def execute_command_forwarder():
from oldspeak.console.parsers.streamer import parser
args = parser.parse_args(get_sub_parser_argv())
bootstrap_conf_with_gevent(args)
device = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
device.bind_in(args.subscriber)
device.bind_out(args.publisher)
device.setsockopt_in(zmq.SUBSCRIBE, b'')
if args.subscriber_hwm:
device.setsockopt_in(zmq.RCVHWM, args.subscriber_hwm)
if args.publisher_hwm:
device.setsockopt_out(zmq.SNDHWM, args.publisher_hwm)
print "oldspeak forwarder started"
print "date", datetime.utcnow().isoformat()
print "subscriber", (getattr(args, 'subscriber'))
print "publisher", (getattr(args, 'publisher'))
device.start()
python类FORWARDER的实例源码
def test_device_types(self):
for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
self.assertEqual(dev.device_type, devtype)
del dev
def test_device_types(self):
for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
self.assertEqual(dev.device_type, devtype)
del dev
def test_device_types(self):
for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
self.assertEqual(dev.device_type, devtype)
del dev
def test_device_types(self):
for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
self.assertEqual(dev.device_type, devtype)
del dev
def test_device_types(self):
for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
self.assertEqual(dev.device_type, devtype)
del dev
def test_device_types(self):
for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
self.assertEqual(dev.device_type, devtype)
del dev
def start_dispatch_thread():
global INITED, DISPATCHER
if INITED:
return
DISPATCHER = zmq.devices.ThreadDevice(zmq.FORWARDER, zmq.XSUB, zmq.XPUB)
DISPATCHER.bind_in(INTERNAL_SOCKET)
DISPATCHER.connect_out(CHANGES_SOCKET)
DISPATCHER.setsockopt_in(zmq.IDENTITY, b'XSUB')
DISPATCHER.setsockopt_out(zmq.IDENTITY, b'XPUB')
DISPATCHER.start()
#Fix weird nosetests problems. TODO: find and fix underlying problem
sleep(0.01)
INITED = True