python类Socket()的实例源码

zmq_tools.py 文件源码 项目:esys-pbi 作者: fsxfreak 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, ctx, url):
        self.socket = zmq.Socket(ctx, zmq.PUB)
        self.socket.connect(url)
agent.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _bind_socket(self, socket, addr=None, transport=None):
        """
        Bind a socket using the corresponding transport and address.

        Parameters
        ----------
        socket : zmq.Socket
            Socket to bind.
        addr : str, default is None
            The address to bind to.
        transport : str, AgentAddressTransport, default is None
            Transport protocol.

        Returns
        -------
        addr : str
            The address where the socket binded to.
        """
        if transport == 'tcp':
            host, port = address_to_host_port(addr)
            if not port:
                uri = 'tcp://%s' % self.host
                port = socket.bind_to_random_port(uri)
                addr = self.host + ':' + str(port)
            else:
                socket.bind('tcp://%s' % (addr))
        else:
            if not addr:
                addr = str(unique_identifier())
            if transport == 'ipc':
                addr = config['IPC_DIR'] / addr
            socket.bind('%s://%s' % (transport, addr))
        return addr
agent.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _process_async_rep_event(self, socket, channel, data):
        """
        Process a ASYNC_REP socket's event.

        Parameters
        ----------
        socket : zmq.Socket
            Socket that generated the event.
        channel : AgentChannel
            AgentChannel associated with the socket that generated the event.
        data : bytes
            Data received on the socket.
        """
        message = deserialize_message(message=data,
                                      serializer=channel.serializer)
        address_uuid, request_uuid, data, address = message
        client_address = address.twin()
        if not self.registered(client_address):
            self.connect(address)
        handler = self.handler[socket]
        is_generator = inspect.isgeneratorfunction(handler)
        if is_generator:
            generator = handler(self, data)
            reply = next(generator)
        else:
            reply = handler(self, data)
        self.send(client_address, (address_uuid, request_uuid, reply))
        if is_generator:
            execute_code_after_yield(generator)
agent.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _process_sync_pub_event(self, socket, channel, data):
        """
        Process a SYNC_PUB socket's event.

        Parameters
        ----------
        socket : zmq.Socket
            Socket that generated the event.
        channel : AgentChannel
            AgentChannel associated with the socket that generated the event.
        data : bytes
            Data received on the socket.
        """
        message = deserialize_message(message=data,
                                      serializer=channel.serializer)
        address_uuid, request_uuid, data = message
        handler = self.handler[socket]
        is_generator = inspect.isgeneratorfunction(handler)
        if is_generator:
            generator = handler(self, data)
            reply = next(generator)
        else:
            reply = handler(self, data)
        message = (address_uuid, request_uuid, reply)
        self._send_channel_sync_pub(channel=channel,
                                    message=message,
                                    topic=address_uuid,
                                    general=False)
        if is_generator:
            execute_code_after_yield(generator)
agent.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _process_sub_event(self, socket, addr, data):
        """
        Process a SUB socket's event.

        Parameters
        ----------
        socket : zmq.Socket
            Socket that generated the event.
        addr : AgentAddress
            AgentAddress associated with the socket that generated the event.
        data : bytes
            Data received on the socket.
        """
        handlers = self.handler[socket]

        message = self._process_sub_message(addr.serializer, data)

        for topic in handlers:
            if not data.startswith(topic):
                continue
            # Call the handler (with or without the topic)
            handler = handlers[topic]
            nparams = len(inspect.signature(handler).parameters)
            if nparams == 2:
                handler(self, message)
            elif nparams == 3:
                handler(self, message, topic)
agent.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_unique_external_zmq_sockets(self):
        """
        Return an iterable containing all the zmq.Socket objects from
        `self.socket` which are not internal, without repetition.

        Originally, a socket was internal if its alias was one of the
        following:
            - loopback
            - _loopback_safe
            - inproc://loopback
            - inproc://_loopback_safe

        However, since we are storing more than one entry in the `self.socket`
        dictionary per zmq.socket (by storing its AgentAddress, for example),
        we need a way to simply get all non-internal zmq.socket objects, and
        this is precisely what this function does.
        """
        reserved = ('loopback', '_loopback_safe', 'inproc://loopback',
                    'inproc://_loopback_safe')
        external_sockets = []

        for k, v in self.socket.items():
            if isinstance(k, zmq.sugar.socket.Socket):
                continue
            if isinstance(k, AgentAddress) and k.address in reserved:
                continue
            if k in reserved:
                continue

            external_sockets.append(v)

        return set(external_sockets)
test_zmq_pub_sub.py 文件源码 项目:integration-prototype 作者: SKA-ScienceDataProcessor 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def recv_messages(zmq_subscriber, timeout_count, message_count):
    """Test utility function.

    Subscriber thread that receives and counts ZMQ messages.

    Args:
        zmq_subscriber (zmq.Socket): ZMQ subscriber socket.
        timeout_count (int): No. of failed receives until exit.
        message_count (int): No. of messages expected to be received.

    Returns:
        (int) Number of messages received.
    """
    # pylint: disable=E1101
    fails = 0  # No. of receives that didn't return a message.
    receive_count = 0  # Total number of messages received.
    while fails < timeout_count:
        try:
            _ = zmq_subscriber.recv_string(flags=zmq.NOBLOCK)
            fails = 0
            receive_count += 1
            if receive_count == message_count:
                break
        except zmq.ZMQError as error:
            if error.errno == zmq.EAGAIN:
                pass
            else:
                raise
        fails += 1
        time.sleep(1e-6)
    return receive_count
sockets.py 文件源码 项目:networkzero 作者: tjguk 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        zmq.Socket.__init__(self, *args, **kwargs)
        #
        # Keep track of which thread this socket was created in
        #
        self.__dict__['_thread'] = threading.current_thread()
handlers.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, interface_or_socket, context=None):
        logging.Handler.__init__(self)
        if isinstance(interface_or_socket, zmq.Socket):
            self.socket = interface_or_socket
            self.ctx = self.socket.context
        else:
            self.ctx = context or zmq.Context()
            self.socket = self.ctx.socket(zmq.PUB)
            self.socket.bind(interface_or_socket)
poll.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _get_descriptors(self):
        """Returns three elements tuple with socket descriptors ready
        for gevent.select.select
        """
        rlist = []
        wlist = []
        xlist = []

        for socket, flags in self.sockets:
            if isinstance(socket, zmq.Socket):
                rlist.append(socket.getsockopt(zmq.FD))
                continue
            elif isinstance(socket, int):
                fd = socket
            elif hasattr(socket, 'fileno'):
                try:
                    fd = int(socket.fileno())
                except:
                    raise ValueError('fileno() must return an valid integer fd')
            else:
                raise TypeError('Socket must be a 0MQ socket, an integer fd '
                                'or have a fileno() method: %r' % socket)

            if flags & zmq.POLLIN:
                rlist.append(fd)
            if flags & zmq.POLLOUT:
                wlist.append(fd)
            if flags & zmq.POLLERR:
                xlist.append(fd)

        return (rlist, wlist, xlist)
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_subclass(self):
        """subclasses can assign attributes"""
        class S(zmq.Socket):
            a = None
            def __init__(self, *a, **kw):
                self.a=-1
                super(S, self).__init__(*a, **kw)

        s = S(self.context, zmq.REP)
        self.sockets.append(s)
        self.assertEqual(s.a, -1)
        s.a=1
        self.assertEqual(s.a, 1)
        a=s.a
        self.assertEqual(a, 1)
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_shadow(self):
        p = self.socket(zmq.PUSH)
        p.bind("tcp://127.0.0.1:5555")
        p2 = zmq.Socket.shadow(p.underlying)
        self.assertEqual(p.underlying, p2.underlying)
        s = self.socket(zmq.PULL)
        s2 = zmq.Socket.shadow(s.underlying)
        self.assertNotEqual(s.underlying, p.underlying)
        self.assertEqual(s.underlying, s2.underlying)
        s2.connect("tcp://127.0.0.1:5555")
        sent = b'hi'
        p2.send(sent)
        rcvd = self.recv(s2)
        self.assertEqual(rcvd, sent)
poll.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def unregister(self, socket):
        """Remove a 0MQ socket or native fd for I/O monitoring.

        Parameters
        ----------
        socket : Socket
            The socket instance to stop polling.
        """
        idx = self._map.pop(socket)
        self.sockets.pop(idx)
        # shift indices after deletion
        for socket, flags in self.sockets[idx:]:
            self._map[socket] -= 1
handlers.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, interface_or_socket, context=None):
        logging.Handler.__init__(self)
        if isinstance(interface_or_socket, zmq.Socket):
            self.socket = interface_or_socket
            self.ctx = self.socket.context
        else:
            self.ctx = context or zmq.Context()
            self.socket = self.ctx.socket(zmq.PUB)
            self.socket.bind(interface_or_socket)
poll.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_descriptors(self):
        """Returns three elements tuple with socket descriptors ready
        for gevent.select.select
        """
        rlist = []
        wlist = []
        xlist = []

        for socket, flags in self.sockets:
            if isinstance(socket, zmq.Socket):
                rlist.append(socket.getsockopt(zmq.FD))
                continue
            elif isinstance(socket, int):
                fd = socket
            elif hasattr(socket, 'fileno'):
                try:
                    fd = int(socket.fileno())
                except:
                    raise ValueError('fileno() must return an valid integer fd')
            else:
                raise TypeError('Socket must be a 0MQ socket, an integer fd '
                                'or have a fileno() method: %r' % socket)

            if flags & zmq.POLLIN:
                rlist.append(fd)
            if flags & zmq.POLLOUT:
                wlist.append(fd)
            if flags & zmq.POLLERR:
                xlist.append(fd)

        return (rlist, wlist, xlist)
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_subclass(self):
        """subclasses can assign attributes"""
        class S(zmq.Socket):
            a = None
            def __init__(self, *a, **kw):
                self.a=-1
                super(S, self).__init__(*a, **kw)

        s = S(self.context, zmq.REP)
        self.sockets.append(s)
        self.assertEqual(s.a, -1)
        s.a=1
        self.assertEqual(s.a, 1)
        a=s.a
        self.assertEqual(a, 1)
poll.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def register(self, socket, flags=POLLIN|POLLOUT):
        """p.register(socket, flags=POLLIN|POLLOUT)

        Register a 0MQ socket or native fd for I/O monitoring.

        register(s,0) is equivalent to unregister(s).

        Parameters
        ----------
        socket : zmq.Socket or native socket
            A zmq.Socket or any Python object having a ``fileno()`` 
            method that returns a valid file descriptor.
        flags : int
            The events to watch for.  Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
            If `flags=0`, socket will be unregistered.
        """
        if flags:
            if socket in self._map:
                idx = self._map[socket]
                self.sockets[idx] = (socket, flags)
            else:
                idx = len(self.sockets)
                self.sockets.append((socket, flags))
                self._map[socket] = idx
        elif socket in self._map:
            # uregister sockets registered with no events
            self.unregister(socket)
        else:
            # ignore new sockets with no events
            pass
poll.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def unregister(self, socket):
        """Remove a 0MQ socket or native fd for I/O monitoring.

        Parameters
        ----------
        socket : Socket
            The socket instance to stop polling.
        """
        idx = self._map.pop(socket)
        self.sockets.pop(idx)
        # shift indices after deletion
        for socket, flags in self.sockets[idx:]:
            self._map[socket] -= 1
handlers.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, interface_or_socket, context=None):
        logging.Handler.__init__(self)
        if isinstance(interface_or_socket, zmq.Socket):
            self.socket = interface_or_socket
            self.ctx = self.socket.context
        else:
            self.ctx = context or zmq.Context()
            self.socket = self.ctx.socket(zmq.PUB)
            self.socket.bind(interface_or_socket)
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_subclass(self):
        """subclasses can assign attributes"""
        class S(zmq.Socket):
            a = None
            def __init__(self, *a, **kw):
                self.a=-1
                super(S, self).__init__(*a, **kw)

        s = S(self.context, zmq.REP)
        self.sockets.append(s)
        self.assertEqual(s.a, -1)
        s.a=1
        self.assertEqual(s.a, 1)
        a=s.a
        self.assertEqual(a, 1)


问题


面经


文章

微信
公众号

扫码关注公众号