python类REP的实例源码

service.py 文件源码 项目:jps 作者: OTL 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, callback, host=None, res_port=None, use_security=False):
        if host is None:
            host = env.get_master_host()
        context = zmq.Context()
        self._socket = context.socket(zmq.REP)
        self._auth = None
        if use_security:
            self._auth = Authenticator.instance(
                env.get_server_public_key_dir())
            self._auth.set_server_key(
                self._socket, env.get_server_secret_key_path())

        if res_port is None:
            res_port = env.get_res_port()
        self._socket.connect(
            'tcp://{host}:{port}'.format(host=host, port=res_port))
        self._callback = callback
        self._thread = None
        self._lock = threading.Lock()
server.py 文件源码 项目:python-zcm 作者: pranav-srinivas-kumar 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, name, priority, actor_context, endpoints, 
                 operation_function, operation_queue):
        """
        Create a server

        Keyword arguments:
        name - Name of the timer
        priority - Priority of the subscriber
        actor_context - ZMQ context of the actor process
        endpoints - A list of endpoint strings
        operation_function - Operation function of the subscriber
        operation_queue - The operation queue object
        """
        self.name = name
        self.priority = priority
        self.endpoints = endpoints
        self.operation_function = operation_function
        self.operation_queue = operation_queue
        self.context = actor_context
        self.server_socket = self.context.socket(zmq.REP)
        for endpoint in self.endpoints:
            self.server_socket.bind(endpoint)
        self.ready = True
        self.func_mutex = Lock()
server.py 文件源码 项目:research-dist 作者: DrawML 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run():
    print("Getting ready for hello world client.  Ctrl-C to exit.\n")
    socket = Ctx.socket(zmq.REP)
    socket.bind(Url)
    while True:
        #  Wait for next request from client
        message = await socket.recv()
        print("Received request: {}".format(message))
        #  Do some "work"
        await asyncio.sleep(1)
        #  Send reply back to client
        message = message.decode('utf-8')
        message = '{}, world'.format(message)
        message = message.encode('utf-8')
        print("Sending reply: {}".format(message))
        await socket.send(message)
base.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def start(self):
        """Create and bind the ZAP socket"""
        self.zap_socket = self.context.socket(zmq.REP)
        self.zap_socket.linger = 1
        self.zap_socket.bind("inproc://zeromq.zap.01")
test_socket.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_subclass(self):
        """subclasses can assign attributes"""
        class S(zmq.Socket):
            a = None
            def __init__(self, *a, **kw):
                self.a=-1
                super(S, self).__init__(*a, **kw)

        s = S(self.context, zmq.REP)
        self.sockets.append(s)
        self.assertEqual(s.a, -1)
        s.a=1
        self.assertEqual(s.a, 1)
        a=s.a
        self.assertEqual(a, 1)
test_socket.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_close_after_destroy(self):
        """s.close() after ctx.destroy() should be fine"""
        ctx = self.Context()
        s = ctx.socket(zmq.REP)
        ctx.destroy()
        # reaper is not instantaneous
        time.sleep(1e-2)
        s.close()
        self.assertTrue(s.closed)
test_context.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_many_sockets(self):
        """opening and closing many sockets shouldn't cause problems"""
        ctx = self.Context()
        for i in range(16):
            sockets = [ ctx.socket(zmq.REP) for i in range(65) ]
            [ s.close() for s in sockets ]
            # give the reaper a chance
            time.sleep(1e-2)
        ctx.term()
test_context.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_destroy(self):
        """Context.destroy should close sockets"""
        ctx = self.Context()
        sockets = [ ctx.socket(zmq.REP) for i in range(65) ]

        # close half of the sockets
        [ s.close() for s in sockets[::2] ]

        ctx.destroy()
        # reaper is not instantaneous
        time.sleep(1e-2)
        for s in sockets:
            self.assertTrue(s.closed)
test_context.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_term_thread(self):
        """ctx.term should not crash active threads (#139)"""
        ctx = self.Context()
        evt = Event()
        evt.clear()

        def block():
            s = ctx.socket(zmq.REP)
            s.bind_to_random_port('tcp://127.0.0.1')
            evt.set()
            try:
                s.recv()
            except zmq.ZMQError as e:
                self.assertEqual(e.errno, zmq.ETERM)
                return
            finally:
                s.close()
            self.fail("recv should have been interrupted with ETERM")
        t = Thread(target=block)
        t.start()

        evt.wait(1)
        self.assertTrue(evt.is_set(), "sync event never fired")
        time.sleep(0.01)
        ctx.term()
        t.join(timeout=1)
        self.assertFalse(t.is_alive(), "term should have interrupted s.recv()")
test_reqrep.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_basic(self):
        s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)

        msg1 = b'message 1'
        msg2 = self.ping_pong(s1, s2, msg1)
        self.assertEqual(msg1, msg2)
test_reqrep.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_multiple(self):
        s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)

        for i in range(10):
            msg1 = i*b' '
            msg2 = self.ping_pong(s1, s2, msg1)
            self.assertEqual(msg1, msg2)
test_reqrep.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_bad_send_recv(self):
        s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)

        if zmq.zmq_version() != '2.1.8':
            # this doesn't work on 2.1.8
            for copy in (True,False):
                self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
                self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)

        # I have to have this or we die on an Abort trap.
        msg1 = b'asdf'
        msg2 = self.ping_pong(s1, s2, msg1)
        self.assertEqual(msg1, msg2)
test_reqrep.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_json(self):
        s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
        o = dict(a=10,b=list(range(10)))
        o2 = self.ping_pong_json(s1, s2, o)
test_reqrep.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_large_msg(self):
        s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
        msg1 = 10000*b'X'

        for i in range(10):
            msg2 = self.ping_pong(s1, s2, msg1)
            self.assertEqual(msg1, msg2)
test_zmqstream.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def setUp(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REP)
        self.loop = ioloop.IOLoop.instance()
        self.stream = zmqstream.ZMQStream(self.socket)
test_error.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_again(self):
        s = self.context.socket(zmq.REP)
        self.assertRaises(Again, s.recv, zmq.NOBLOCK)
        self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
        s.close()
test_error.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def atest_ctxterm(self):
        s = self.context.socket(zmq.REP)
        t = Thread(target=self.context.term)
        t.start()
        self.assertRaises(ContextTerminated, s.recv, zmq.NOBLOCK)
        self.assertRaisesErrno(zmq.TERM, s.recv, zmq.NOBLOCK)
        s.close()
        t.join()
test_security.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def zap_handler(self):
        socket = self.context.socket(zmq.REP)
        socket.bind("inproc://zeromq.zap.01")
        try:
            msg = self.recv_multipart(socket)

            version, sequence, domain, address, identity, mechanism = msg[:6]
            if mechanism == b'PLAIN':
                username, password = msg[6:]
            elif mechanism == b'CURVE':
                key = msg[6]

            self.assertEqual(version, b"1.0")
            self.assertEqual(identity, b"IDENT")
            reply = [version, sequence]
            if mechanism == b'CURVE' or \
                (mechanism == b'PLAIN' and username == USER and password == PASS) or \
                (mechanism == b'NULL'):
                reply.extend([
                    b"200",
                    b"OK",
                    b"anonymous",
                    b"\5Hello\0\0\0\5World",
                ])
            else:
                reply.extend([
                    b"400",
                    b"Invalid username or password",
                    b"",
                    b"",
                ])
            socket.send_multipart(reply)
        finally:
            socket.close()
test_monitor.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_monitor(self):
        """Test monitoring interface for sockets."""
        s_rep = self.context.socket(zmq.REP)
        s_req = self.context.socket(zmq.REQ)
        self.sockets.extend([s_rep, s_req])
        s_req.bind("tcp://127.0.0.1:6666")
        # try monitoring the REP socket

        s_rep.monitor("inproc://monitor.rep", zmq.EVENT_ALL)
        # create listening socket for monitor
        s_event = self.context.socket(zmq.PAIR)
        self.sockets.append(s_event)
        s_event.connect("inproc://monitor.rep")
        s_event.linger = 0
        # test receive event for connect event
        s_rep.connect("tcp://127.0.0.1:6666")
        m = recv_monitor_message(s_event)
        if m['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
            # test receive event for connected event
            m = recv_monitor_message(s_event)
        self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")

        # test monitor can be disabled.
        s_rep.disable_monitor()
        m = recv_monitor_message(s_event)
        self.assertEqual(m['event'], zmq.EVENT_MONITOR_STOPPED)
test_device.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_single_socket_forwarder_bind(self):
        if zmq.zmq_version() in ('4.1.1', '4.0.6'):
            raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
        # select random port:
        binder = self.context.socket(zmq.REQ)
        port = binder.bind_to_random_port('tcp://127.0.0.1')
        binder.close()
        time.sleep(0.1)
        req = self.context.socket(zmq.REQ)
        req.connect('tcp://127.0.0.1:%i'%port)
        dev.bind_in('tcp://127.0.0.1:%i'%port)
        dev.start()
        time.sleep(.25)
        msg = b'hello'
        req.send(msg)
        self.assertEqual(msg, self.recv(req))
        del dev
        req.close()
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
        # select random port:
        binder = self.context.socket(zmq.REQ)
        port = binder.bind_to_random_port('tcp://127.0.0.1')
        binder.close()
        time.sleep(0.1)
        req = self.context.socket(zmq.REQ)
        req.connect('tcp://127.0.0.1:%i'%port)
        dev.bind_in('tcp://127.0.0.1:%i'%port)
        dev.start()
        time.sleep(.25)
        msg = b'hello again'
        req.send(msg)
        self.assertEqual(msg, self.recv(req))
        del dev
        req.close()


问题


面经


文章

微信
公众号

扫码关注公众号