python类FORWARDER的实例源码

servers.py 文件源码 项目:OldSpeak 作者: 0rbitAeolian 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def execute_command_forwarder():
    from oldspeak.console.parsers.streamer import parser

    args = parser.parse_args(get_sub_parser_argv())
    bootstrap_conf_with_gevent(args)

    device = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)

    device.bind_in(args.subscriber)
    device.bind_out(args.publisher)
    device.setsockopt_in(zmq.SUBSCRIBE, b'')
    if args.subscriber_hwm:
        device.setsockopt_in(zmq.RCVHWM, args.subscriber_hwm)

    if args.publisher_hwm:
        device.setsockopt_out(zmq.SNDHWM, args.publisher_hwm)

    print "oldspeak forwarder started"
    print "date", datetime.utcnow().isoformat()
    print "subscriber", (getattr(args, 'subscriber'))
    print "publisher", (getattr(args, 'publisher'))
    device.start()
test_device.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_device_types(self):
        for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
            dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
            self.assertEqual(dev.device_type, devtype)
            del dev
test_device.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def test_device_types(self):
        for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
            dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
            self.assertEqual(dev.device_type, devtype)
            del dev
test_device.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_device_types(self):
        for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
            dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
            self.assertEqual(dev.device_type, devtype)
            del dev
test_device.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_device_types(self):
        for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
            dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
            self.assertEqual(dev.device_type, devtype)
            del dev
test_device.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_device_types(self):
        for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
            dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
            self.assertEqual(dev.device_type, devtype)
            del dev
test_device.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_device_types(self):
        for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
            dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
            self.assertEqual(dev.device_type, devtype)
            del dev
zmqlib.py 文件源码 项目:idealoom 作者: conversence 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def start_dispatch_thread():
    global INITED, DISPATCHER
    if INITED:
        return
    DISPATCHER = zmq.devices.ThreadDevice(zmq.FORWARDER, zmq.XSUB, zmq.XPUB)
    DISPATCHER.bind_in(INTERNAL_SOCKET)
    DISPATCHER.connect_out(CHANGES_SOCKET)
    DISPATCHER.setsockopt_in(zmq.IDENTITY, b'XSUB')
    DISPATCHER.setsockopt_out(zmq.IDENTITY, b'XPUB')
    DISPATCHER.start()
    #Fix weird nosetests problems. TODO: find and fix underlying problem
    sleep(0.01)
    INITED = True


问题


面经


文章

微信
公众号

扫码关注公众号