def start(self):
    """Launch the device by delegating to ``self.run()``.

    Subclasses that use a different launcher are expected to override
    this method. Whatever ``self.run()`` returns is handed back to the
    caller unchanged.
    """
    outcome = self.run()
    return outcome
python类device()的实例源码
def test_devices(self):
    """Smoke-test that ``zmq.devices`` and its submodules can be imported.

    The test passes if none of the imports raises; the imported names
    are deliberately unused.
    """
    import zmq.devices  # noqa: F401
    from zmq.devices import (  # noqa: F401
        basedevice,
        monitoredqueue,
        monitoredqueuedevice,
    )
def device(device_type, isocket, osocket):
    """Start a zeromq device (gevent-compatible).

    Messages are polled and forwarded in Python, so unlike the true
    zmq.device this does not release the GIL. The loop runs forever;
    the function never returns normally.

    Parameters
    ----------
    device_type : (QUEUE, FORWARDER, STREAMER)
        The type of device to start (ignored).
    isocket : Socket
        The Socket instance for the incoming traffic.
    osocket : Socket
        The Socket instance for the outbound traffic. Passing ``-1``
        makes ``isocket`` serve both directions.
    """
    if osocket == -1:
        osocket = isocket

    poller = Poller()
    for sock in (isocket, osocket):
        poller.register(sock, zmq.POLLIN)

    # (source, destination) pairs, examined in this order after each poll.
    # NOTE(review): when both ends are the same socket, the second route
    # performs another recv right after the first one consumed the polled
    # message -- confirm this is intended.
    routes = ((isocket, osocket), (osocket, isocket))

    while True:
        ready = dict(poller.poll())
        for source, sink in routes:
            if source in ready:
                sink.send_multipart(source.recv_multipart())
def run_device(self):
    """Set up the sockets and hand them to ``device``.

    This is the runner method: do not call it directly, call
    ``self.start()`` instead, just like a Thread.
    """
    incoming, outgoing = self._setup_sockets()
    device(self.device_type, incoming, outgoing)
def start(self):
    """Launch the device by delegating to ``self.run()``.

    Subclasses that use a different launcher are expected to override
    this method. Whatever ``self.run()`` returns is handed back to the
    caller unchanged.
    """
    outcome = self.run()
    return outcome
def test_core(self):
    """Smoke-test that the core public names can be imported from ``zmq``.

    The test passes if the import raises nothing; the imported names are
    deliberately unused.
    """
    from zmq import (  # noqa: F401
        Context,
        Socket,
        Poller,
        Frame,
        constants,
        device,
        proxy,
        Stopwatch,
        zmq_version,
        zmq_version_info,
        pyzmq_version,
        pyzmq_version_info,
    )
def main(req_port=None, res_port=None, use_security=False):
    '''main of queue

    Binds a client-facing socket and a server-facing socket on all TCP
    interfaces and runs a ``zmq.QUEUE`` device between them. A
    ``KeyboardInterrupt`` is swallowed so that Ctrl-C leads to a clean
    shutdown; any other exception propagates after cleanup.

    :param req_port: port for clients; defaults to ``env.get_req_port()``
    :param res_port: port for servers; defaults to ``env.get_res_port()``
    :param use_security: when true, keys are installed on both sockets
        through ``Authenticator`` before binding (the certificate
        directory is created first if it does not exist)
    '''
    if req_port is None:
        req_port = env.get_req_port()
    if res_port is None:
        res_port = env.get_res_port()

    # Bind every resource name up front so the ``finally`` clause can tell
    # what was actually created. Without this, a failure while creating the
    # context or a socket would raise NameError during cleanup and hide the
    # real error.
    auth = None
    context = None
    frontend_service = None
    backend_service = None
    try:
        context = zmq.Context()
        # XREP / XREQ are the legacy names of ROUTER / DEALER.
        frontend_service = context.socket(zmq.XREP)
        backend_service = context.socket(zmq.XREQ)

        if use_security:
            key_dir = env.get_server_public_key_dir()
            if not os.path.exists(key_dir):
                create_certificates(key_dir)
            auth = Authenticator.instance(env.get_server_public_key_dir())
            auth.set_server_key(
                frontend_service, env.get_server_secret_key_path())
            auth.set_client_key(
                backend_service,
                env.get_client_secret_key_path(),
                env.get_server_public_key_path())

        frontend_service.bind('tcp://*:{req_port}'.format(req_port=req_port))
        backend_service.bind('tcp://*:{res_port}'.format(res_port=res_port))
        # Runs the queue until it is interrupted or fails.
        zmq.device(zmq.QUEUE, frontend_service, backend_service)
    except KeyboardInterrupt:
        pass
    finally:
        # Same teardown order as before: sockets, then context, then auth.
        if frontend_service is not None:
            frontend_service.close()
        if backend_service is not None:
            backend_service.close()
        if context is not None:
            context.term()
        # ``auth`` is only ever set when ``use_security`` is true.
        if auth is not None:
            auth.stop()