Python 函数 device() 的实例源码

basedevice.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def start(self):
        """Launch the device by delegating to ``self.run()``.

        Subclasses may override this to plug in a different launcher.
        Returns whatever ``self.run()`` returns.
        """
        outcome = self.run()
        return outcome
test_imports.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_devices(self):
        """Check that ``zmq.devices`` and its device modules are importable.

        The test passes when none of the imports raises.
        """
        import zmq.devices
        from zmq.devices import (
            basedevice,
            monitoredqueue,
            monitoredqueuedevice,
        )
device.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def device(device_type, isocket, osocket):
    """Relay multipart messages between two sockets (gevent-compatible).

    Unlike the true zmq.device, this does not release the GIL.

    Parameters
    ----------
    device_type : (QUEUE, FORWARDER, STREAMER)
        Accepted for signature compatibility; this function never reads it.
    isocket : Socket
        The Socket instance for the incoming traffic.
    osocket : Socket
        The Socket instance for the outbound traffic. Passing ``-1`` makes
        ``isocket`` serve both directions.

    Notes
    -----
    The polling loop has no exit condition, so this call only ends when an
    exception propagates out of it.
    """
    poller = Poller()
    if osocket == -1:
        # A single-socket device: relay on the incoming socket itself.
        osocket = isocket
    poller.register(isocket, zmq.POLLIN)
    poller.register(osocket, zmq.POLLIN)

    while True:
        ready = dict(poller.poll())
        if isocket in ready:
            # Incoming side is readable: forward the message outbound.
            inbound = isocket.recv_multipart()
            osocket.send_multipart(inbound)
        if osocket in ready:
            # Outbound side is readable: forward the message back inbound.
            outbound = osocket.recv_multipart()
            isocket.send_multipart(outbound)
basedevice.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run_device(self):
        """Set up the sockets and hand them to ``device``.

        This is the runner method. Do not call it directly; call
        ``self.start()`` instead, just like a Thread.
        """
        sockets = self._setup_sockets()
        # ``_setup_sockets`` must yield exactly two items: incoming, outgoing.
        incoming, outgoing = sockets
        device(self.device_type, incoming, outgoing)
basedevice.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def start(self):
        """Launch the device by delegating to ``self.run()``.

        Subclasses may override this to plug in a different launcher.
        Returns whatever ``self.run()`` returns.
        """
        outcome = self.run()
        return outcome
test_imports.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_core(self):
        """Check that the core public names are importable from ``zmq``.

        The test passes when the import raises nothing.
        """
        from zmq import (
            Context,
            Socket,
            Poller,
            Frame,
            constants,
            device,
            proxy,
            Stopwatch,
            zmq_version,
            zmq_version_info,
            pyzmq_version,
            pyzmq_version_info,
        )
queue.py 文件源码 项目:jps 作者: OTL 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def main(req_port=None, res_port=None, use_security=False):
    '''main of queue

    Binds a frontend socket for clients and a backend socket for servers,
    then passes both to ``zmq.device(zmq.QUEUE, ...)``. A
    ``KeyboardInterrupt`` is swallowed so the queue can be stopped quietly;
    any other exception propagates after cleanup.

    :param req_port: port for clients; when ``None`` it is taken from
        ``env.get_req_port()``
    :param res_port: port for servers; when ``None`` it is taken from
        ``env.get_res_port()``
    :param use_security: when true, keys are set on both sockets through
        ``Authenticator`` (certificates are created first if the server
        public key directory does not exist)
    '''
    if req_port is None:
        req_port = env.get_req_port()
    if res_port is None:
        res_port = env.get_res_port()
    # Bind every resource name up front. If creating the context or a
    # socket fails, the ``finally`` clause must not hit an unbound local,
    # because that UnboundLocalError would hide the real failure.
    context = None
    frontend_service = None
    backend_service = None
    auth = None
    try:
        context = zmq.Context()
        frontend_service = context.socket(zmq.XREP)
        backend_service = context.socket(zmq.XREQ)
        if use_security:
            if not os.path.exists(env.get_server_public_key_dir()):
                create_certificates(env.get_server_public_key_dir())
            auth = Authenticator.instance(env.get_server_public_key_dir())
            auth.set_server_key(
                frontend_service, env.get_server_secret_key_path())
            auth.set_client_key(backend_service, env.get_client_secret_key_path(),
                                env.get_server_public_key_path())
        frontend_service.bind('tcp://*:{req_port}'.format(req_port=req_port))
        backend_service.bind('tcp://*:{res_port}'.format(res_port=res_port))
        zmq.device(zmq.QUEUE, frontend_service, backend_service)
    except KeyboardInterrupt:
        pass
    finally:
        # Release only what was actually created, sockets before context.
        if frontend_service is not None:
            frontend_service.close()
        if backend_service is not None:
            backend_service.close()
        if context is not None:
            context.term()
        if use_security and auth is not None:
            auth.stop()


问题


面经


文章

微信
公众号

扫码关注公众号