VideoPusher.h

#pragma once
#include <iostream>
#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <glib.h>
#include <boost/shared_ptr.hpp>
#include <mutex>

#ifndef INT64_C 
#define INT64_C(c) (c ## LL) 
#define UINT64_C(c) (c ## ULL) 
#endif 

/// Builds and runs a GStreamer pipeline that reads an RTSP source and pushes
/// it to an RTMP sink (see Run()).
///
/// The raw GStreamer handles are kept as members; SetElesNull() only clears
/// the pointers and does not release anything.
class VideoPusher
{
    /// Thrown by chk() when a required pointer turns out to be null.
    struct Exception : std::exception{};

    /// Returns @p pointer unchanged, or throws Exception if it is null.
    template<typename T>
    T* chk(T* pointer) {
        if (pointer == nullptr) {
            throw Exception();
        }
        return pointer;
    }

    /// Deletes the object behind @p pointer and resets the caller's pointer.
    /// The pointer is taken by reference so that the reset is visible to the
    /// caller; with a by-value parameter the assignment would be a no-op.
    template<typename T>
    void delptr(T*& pointer)
    {
        // `delete` on a null pointer is already a no-op, so no guard is needed.
        delete pointer;
        pointer = nullptr;
    }


public:
    VideoPusher();
    VideoPusher(std::string strlocation, std::string strCode);
    virtual ~VideoPusher();

    /// Runs the push loop.
    /// @param strlocation RTMP URL handed to the sink element.
    /// @param strCode     Codec token used to pick the depay/parse elements.
    /// @param strrequest  RTSP URL handed to the source element.
    bool Run(std::string strlocation, std::string strCode, std::string strrequest);

protected:
    /// "pad-added" signal handler; @p user_data carries the sink pad to link to.
    static void _onPadAdded(GstElement *src, GstPad *src_pad, gpointer user_data);

    /// Sets every GStreamer handle below to nullptr (no unref is performed).
    void SetElesNull();

protected:
    // All handles start out null so they are never read uninitialised,
    // whichever constructor ran.
    GMainLoop  *_loop = nullptr;
    GstBus*     _bus = nullptr;
    GstElement* _pipeline = nullptr;
    GstElement* _source = nullptr;
    GstElement* _depay = nullptr;
    GstElement* _parse = nullptr;
    GstElement* _capsfilter = nullptr;
    GstElement* _queue1 = nullptr;
    GstElement* _rtmpsink = nullptr;
    GstElement* _flvmux = nullptr;
    std::mutex _mutex;          ///< Held by Run() while tearing the pipeline down.
    bool _bStopPush = false;    ///< Run() keeps rebuilding the pipeline while this is false.
};

/// Shared-ownership handle to a VideoPusher.
using VideoPusherPtr = boost::shared_ptr<VideoPusher>;

VideoPusher.cpp

/// "pad-added" handler: links the newly exposed source pad to the sink pad
/// passed as @p user_data.
///
/// @param src       Element that emitted the signal (used for logging only).
/// @param src_pad   The pad that was just added to @p src.
/// @param user_data GstPad* of the downstream sink pad; not owned by this function.
void VideoPusher::_onPadAdded(GstElement *src, GstPad *src_pad, gpointer user_data)
{
    GstPad* sink_pad = static_cast<GstPad*>(user_data);
    if (sink_pad == nullptr) {
        return;
    }

    // The signal fires once per pad the source exposes; only the first one
    // can be linked, so later pads are ignored instead of failing silently.
    if (gst_pad_is_linked(sink_pad)) {
        return;
    }

    const GstPadLinkReturn result = gst_pad_link(src_pad, sink_pad);
    if (GST_PAD_LINK_FAILED(result)) {
        g_printerr("Failed to link pad %s of %s (GstPadLinkReturn %d)\n",
            GST_PAD_NAME(src_pad), GST_ELEMENT_NAME(src), static_cast<int>(result));
    }
}

/// Clears every GStreamer handle held by this object.
/// Only the pointers are reset; nothing is unreffed or freed here.
void VideoPusher::SetElesNull()
{
    // The element handles all share the type GstElement*, so one chained
    // assignment covers them.
    _pipeline = _source = _depay = _parse = _capsfilter
        = _queue1 = _rtmpsink = _flvmux = nullptr;

    _bus = nullptr;
    _loop = nullptr;
}

bool VideoPusher::Run(std::string strLocation,
    std::string strCode, std::string strrequest)
{
	_bStopPush = false;
  
	while (!_bStopPush)
	{
		SetElesNull();

		gboolean terminate = FALSE;

		gst_init(nullptr, nullptr);

		std::stringstream stream;
		stream << "pipeline" << g_pipelinenum++;
		std::string strname;
		strname = stream.str();

		_pipeline = chk(gst_pipeline_new(strname.c_str()));
		_source = chk(gst_element_factory_make("rtspsrc", "src"));
		_depay = chk(gst_element_factory_make(("rtp" + strCode + "depay").c_str(), "depay"));
		_parse = chk(gst_element_factory_make((strCode + "parse").c_str(), "parse"));
		_flvmux = chk(gst_element_factory_make("flvmux", "flvmux"));
		_queue1 = chk(gst_element_factory_make("queue", "queue"));
		_capsfilter = chk(gst_element_factory_make("capsfilter", "filter"));
		_rtmpsink = chk(gst_element_factory_make("rtmpsink", "sink"));

		//g_object_set(_source, "protocols", 0x00000004, NULL);
		g_object_set(_source, "latency", 0, NULL);
		g_object_set(_capsfilter, "caps-change-mode", 1, NULL);
		g_object_set(_rtmpsink, "location", strLocation.c_str(), NULL);

		gst_bin_add_many(GST_BIN(_pipeline), _source, _depay, _parse, _flvmux, _capsfilter, _queue1, _rtmpsink, NULL);
		g_signal_connect(_source, "pad-added", G_CALLBACK(&_onPadAdded), gst_element_get_static_pad(_depay, "sink"));
		gboolean bsuccess = gst_element_link_many(_depay, _parse, _flvmux, _capsfilter, _queue1, _rtmpsink, NULL);
		if (!bsuccess) {
			g_print("Failed to link one or more elements!\n");
			gst_element_unlink_many(_depay, _parse, _flvmux, _capsfilter, _queue1, _rtmpsink, NULL);
			Sleep(1000);
			continue;
		}
		g_object_set(_source, "location", strrequest.c_str(), NULL);
		GstCaps* caps = gst_caps_new_simple(
			"video/x-raw",
			"format", G_TYPE_STRING, "rgb",
			"width", G_TYPE_INT, 426,
			"height", G_TYPE_INT, 240,
			"framerate", GST_TYPE_FRACTION, 25, 1,
			NULL);
		g_object_set(_capsfilter, "caps", caps, NULL);
		gst_caps_unref(caps);
		GstStateChangeReturn res = gst_element_set_state(_pipeline, GST_STATE_PLAYING);
		if (res == GST_STATE_CHANGE_FAILURE)
		{
			g_printerr("Unable to set the pipeline to the playing state.\n");
			gst_object_unref(_pipeline);
			Sleep(1000);
			continue;
		}
		GstMessage *msg;
		/* Listen to the bus */
		//_bus = gst_element_get_bus(_pipeline);
		_bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
		do
		{
			msg = gst_bus_timed_pop_filtered(_bus, GST_CLOCK_TIME_NONE,
				GstMessageType(GST_MESSAGE_STATE_CHANGED |
				GST_MESSAGE_ERROR | GST_MESSAGE_EOS));
			/* Parse message */
			if (msg != NULL)
			{
				GError *err;
				gchar *debug_info;
				switch (GST_MESSAGE_TYPE(msg))
				{
				case GST_MESSAGE_ERROR:
				{
					gst_message_parse_error(msg, &err, &debug_info);
					g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
					g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
					g_clear_error(&err);
					g_free(debug_info);
					terminate = TRUE;
				}
				break;
				case GST_MESSAGE_EOS:
				{
					g_print("End-Of-Streamreached.\n");
					terminate = TRUE;
				}
				break;
				case GST_MESSAGE_STATE_CHANGED:
				{
					/* We are onlyinterested in state-changed messages from the pipeline */
					if (GST_MESSAGE_SRC(msg) == GST_OBJECT(_pipeline))
					{
						GstState old_state, new_state, pending_state;
						gst_message_parse_state_changed(msg,
							&old_state,
							&new_state,
							&pending_state);
						g_print("Pipeline state changed from %s to %s:\n",
							gst_element_state_get_name(old_state),
							gst_element_state_get_name(new_state));
						if (pending_state == GST_STATE_NULL)
						{
							terminate = TRUE;
						}
					}
				}
				break;
				default:
				{
					/* We shouldnot reach here */
					g_printerr("Unexpected message received.\n");
					break;
				}
				}
				gst_message_unref(msg);
				//  std::this_thread::sleep_for(std::chrono::milliseconds(200));
			}

		} while (!terminate);

		/* Free resources */
		try
		{
			std::lock_guard<std::mutex> lock(_mutex);
			gst_object_unref(_bus);
			gst_element_set_state(_pipeline, GST_STATE_PAUSED);
			gst_element_set_state(_pipeline, GST_STATE_READY);
			gst_element_set_state(_pipeline, GST_STATE_NULL);
			gst_object_unref(_pipeline);
		}
		catch (std::exception &e)
		{
			cout << e.what();
			return true;
		}
		catch (...)
		{
			return true;
		}
	}

   

    return true;
}

main.cpp

#include "VideoPusher.h"

/// Entry point: pushes one RTSP stream to one RTMP endpoint.
///
/// Usage: <program> [rtmp-url] [rtsp-url] [codec]
/// Any argument that is omitted falls back to the built-in default below.
///
/// @return 0 when VideoPusher::Run reports success, 1 otherwise.
int main(int argc, char* argv[])
{
    // NOTE(review): the default RTSP URL embeds a user name and password;
    // prefer passing the URL on the command line over keeping it in source.
    const std::string strRtmp = (argc > 1) ? argv[1] : "rtmp://localhost:1945/live/room";
    const std::string strRtsp = (argc > 2) ? argv[2] : "rtsp://admin:HuaWei123@59.51.115.31/LiveMedia/ch1/Media1";
    const std::string strCode = (argc > 3) ? argv[3] : "h264";

    VideoPusher pusher;
    const bool bSuccess = pusher.Run(strRtmp, strCode, strRtsp);
    if (!bSuccess)
    {
        std::cerr << "VideoPusher::Run failed\n";
    }

    // Keep the console open until a key is pressed.
    getchar();
    return bSuccess ? 0 : 1;
}