test_client.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:cloudcam 作者: revmischa 项目源码 文件源码
def gst_rtp_pipeline(test_params):
    """Run a GStreamer pipeline that receives H.264 RTP video and decodes it.

    This is a generator: it builds and starts the pipeline, yields the
    ``GstMainLoopThread`` driving it, and tears everything down when the
    generator is resumed or closed. It is presumably meant to be wrapped as
    a pytest fixture or context manager -- no decorator is visible here, so
    confirm against the caller.

    The pipeline is ``udpsrc -> rtph264depay -> h264parse -> avdec_h264 ->
    appsink``. Each sample reaching the appsink is passed to
    ``loop_thread.gst_new_buffer`` and bus messages are passed to
    ``loop_thread.gst_bus_call``.

    Args:
        test_params: object whose ``rtp_port`` attribute is the UDP port to
            listen on for incoming RTP packets.

    Yields:
        The started ``GstMainLoopThread`` instance.

    Raises:
        RuntimeError: if an element cannot be created (e.g. a GStreamer
            plugin is not installed), two elements cannot be linked, or the
            pipeline refuses to go to the PLAYING state.
    """
    print("starting gstreamer rtp pipeline..")
    GObject.threads_init()
    Gst.init(None)

    # Elements in stream order; ElementFactory.make() returns None (rather
    # than raising) when the factory is unavailable, so check explicitly.
    factory_names = ("udpsrc", "rtph264depay", "h264parse", "avdec_h264", "appsink")
    elements = []
    for factory_name in factory_names:
        element = Gst.ElementFactory.make(factory_name, None)
        if element is None:
            raise RuntimeError(
                "could not create GStreamer element '%s'" % factory_name)
        elements.append(element)
    udpsrc = elements[0]
    sink = elements[-1]

    udpsrc.set_property('port', test_params.rtp_port)
    udpsrc.set_property('caps', Gst.caps_from_string('application/x-rtp, encoding-name=H264, payload=96'))

    pipeline = Gst.Pipeline.new("rtp-pipeline")
    for element in elements:
        pipeline.add(element)

    # Link each element to its downstream neighbour; link() reports failure
    # through its return value only.
    linked_pairs = zip(factory_names, elements, factory_names[1:], elements[1:])
    for src_name, src, dst_name, dst in linked_pairs:
        if not src.link(dst):
            raise RuntimeError(
                "could not link GStreamer elements '%s' -> '%s'" % (src_name, dst_name))

    # gst event loop/bus
    loop_thread = GstMainLoopThread()

    # Have the appsink emit "new-sample" so every decoded frame is handed to
    # loop_thread.gst_new_buffer().
    sink.set_property("emit-signals", True)
    sink.connect("new-sample", loop_thread.gst_new_buffer, sink)

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", loop_thread.gst_bus_call, loop_thread.loop)

    try:
        # start playback and listen for events
        if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
            raise RuntimeError("could not set GStreamer rtp pipeline to PLAYING")
        loop_thread.start()

        yield loop_thread
    finally:
        # Always release the pipeline, even when the consumer of the
        # generator raised, so the UDP port and the main loop do not leak.
        print("stopping gstreamer rtp pipeline..")
        loop_thread.loop.quit()
        pipeline.set_state(Gst.State.NULL)
        # Every add_signal_watch() must be balanced by remove_signal_watch().
        bus.remove_signal_watch()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号