python类unpackb()的实例源码

pyxis.py 文件源码 项目:ml-pyxis 作者: vicolab 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def get_sample(self, i):
        """Return the ith sample from `data_db`.

        Parameter
        ---------
        i : int
            Sample number; must satisfy ``0 <= i < self.nb_samples``.

        Returns
        -------
        dict
            One entry per stored key. Each key is passed through
            `decode_str` and each value is the result of unpacking its
            msgpack payload with `decode_data` as the object hook.

        Raises
        ------
        IndexError
            If `i` is negative or not smaller than `self.nb_samples`.
        """
        if 0 > i or self.nb_samples <= i:
            raise IndexError('The selected sample number is out of range: %d'
                             % i)

        # Convert the sample number to a string with leading zeros
        key = encode_str('{:010}'.format(i))

        with self._lmdb_env.begin(db=self.data_db) as txn:
            # Read msgpack from LMDB and decode each value in it
            packed = msgpack.unpackb(txn.get(key))

            # Build a fresh dict instead of popping/inserting keys on the
            # dict being iterated: mutating a dict during iteration may skip
            # entries or visit the freshly inserted ones again.
            # Keys are stored as byte objects (hence the `decode_str`)
            return {
                decode_str(name): msgpack.unpackb(blob, object_hook=decode_data)
                for name, blob in packed.items()
            }
abstractClient.py 文件源码 项目:zatt 作者: simonacca 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _request(self, message):
        """Send `message` to `self.server_address` and return the decoded reply.

        The reply is read until `recv` returns an empty block, then unpacked
        with msgpack. If the reply is a mapping whose ``'type'`` is
        ``'redirect'``, `self.server_address` is replaced by the advertised
        ``'leader'`` and the request is sent again.

        :param message: object to serialise with msgpack and send.
        :returns: the unpacked response of the server that finally answered.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(self.server_address)
            # sendall() keeps writing until the whole payload has been handed
            # to the kernel; a bare send() may stop after a partial write.
            sock.sendall(msgpack.packb(message, use_bin_type=True))

            # Collect the blocks and join once instead of growing a bytes
            # object block by block.
            blocks = []
            while True:
                block = sock.recv(128)
                if not block:
                    break
                blocks.append(block)
        finally:
            # Release the descriptor even when connect/send/recv raised.
            sock.close()

        resp = msgpack.unpackb(b''.join(blocks), encoding='utf-8')
        if 'type' in resp and resp['type'] == 'redirect':
            self.server_address = tuple(resp['leader'])
            resp = self._request(message)
        return resp
test_app.py 文件源码 项目:deb-python-falcon 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_list_images(client):
    """GET ``/images`` must return the expected msgpack document with HTTP 200."""
    image_entry = {
        'href': '/images/1eaf6ef1-7f2d-4ecc-a8d5-6e8adba7cc0e.png'
    }
    expected = {'images': [image_entry]}

    reply = client.simulate_get('/images')
    decoded = msgpack.unpackb(reply.content, encoding='utf-8')

    assert decoded == expected
    assert reply.status == falcon.HTTP_OK


# With clever composition of fixtures, we can observe what happens with
# the mock injected into the image resource.
cache.py 文件源码 项目:bowtie 作者: jwkvam 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def load(key):
    """Request the value stored under `key` and return the unpacked reply.

    A ``cache_load`` event carrying the packed key is emitted, and the reply
    delivered to the callback is awaited with ``timeout=10``.

    Parameters
    ----------
    key : str
        The key to lookup the value stored.

    Returns
    -------
    object
        The msgpack-decoded reply. (The original documentation states that
        this is None when the key is not in the cache.)

    """
    reply = LightQueue(1)

    # Inside a request the module-level `emit` is used; otherwise go through
    # the socketio extension registered on the current app.
    if flask.has_request_context():
        send = emit
    else:
        send = flask.current_app.extensions['socketio'].emit
    send('cache_load', {'data': pack(key)}, callback=reply.put)

    raw = reply.get(timeout=10)
    return msgpack.unpackb(bytes(raw), encoding='utf8')
test_encoders.py 文件源码 项目:dd-trace-py 作者: DataDog 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_encode_traces_msgpack(self):
        """Encode two traces of two spans each and inspect the decoded output."""
        # test encoding for MsgPack format
        traces = [
            [Span(name='client.testing', tracer=None) for _ in range(2)]
            for _ in range(2)
        ]

        encoded = MsgpackEncoder().encode_traces(traces)
        decoded = msgpack.unpackb(encoded)

        # the encoder output must have the msgpack type, and decoding it
        # must give back two traces holding two spans each
        ok_(isinstance(encoded, msgpack_type))
        eq_(len(decoded), 2)
        for trace_idx in range(2):
            eq_(len(decoded[trace_idx]), 2)
        for trace_idx in range(2):
            for span_idx in range(2):
                eq_(b'client.testing', decoded[trace_idx][span_idx][b'name'])
test_sentinel.py 文件源码 项目:pysoa 作者: eventbrite 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_simple_send_and_receive(self):
        """A queued message can be popped back and decodes to the payload."""
        queue = 'test_simple_send_receive'
        client = self._set_up_client()
        payload = {'test': queue}

        client.send_message_to_queue(
            queue_key=queue,
            message=msgpack.packb(payload),
            expiry=10,
            capacity=10,
            connection=client.get_connection(queue),
        )

        message = None
        for _ in range(3):
            # Message will be on random server; stop asking once it is found
            if not message:
                message = client.get_connection(queue).lpop(queue)

        self.assertIsNotNone(message)
        self.assertEqual(payload, msgpack.unpackb(message, encoding='utf-8'))
test_sentinel.py 文件源码 项目:pysoa 作者: eventbrite 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_services_send_receive(self):
        """Same round trip with three sentinel services configured."""
        queue = 'test_services_send_receive'
        client = self._set_up_client(sentinel_services=['service1', 'service2', 'service3'])
        payload = {'test': queue}

        client.send_message_to_queue(
            queue_key=queue,
            message=msgpack.packb(payload),
            expiry=10,
            capacity=10,
            connection=client.get_connection(queue),
        )

        message = None
        for _ in range(3):
            # Message will be on random server; stop asking once it is found
            if not message:
                message = client.get_connection(queue).lpop(queue)

        self.assertIsNotNone(message)
        self.assertEqual(payload, msgpack.unpackb(message, encoding='utf-8'))
test_sentinel.py 文件源码 项目:pysoa 作者: eventbrite 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_no_hosts_send_receive(self):
        """Round trip through a `SentinelRedisClient` built with no arguments."""
        queue = 'test_no_hosts_send_receive'
        client = SentinelRedisClient()
        payload = {'test': queue}

        client.send_message_to_queue(
            queue_key=queue,
            message=msgpack.packb(payload),
            expiry=10,
            capacity=10,
            connection=client.get_connection(queue),
        )

        message = None
        for _ in range(3):
            # Message will be on random server; stop asking once it is found
            if not message:
                message = client.get_connection(queue).lpop(queue)

        self.assertIsNotNone(message)
        self.assertEqual(payload, msgpack.unpackb(message, encoding='utf-8'))
test_standard.py 文件源码 项目:pysoa 作者: eventbrite 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_no_hosts_yields_single_default_host(self):
        """Round trip through a `StandardRedisClient` built with no arguments."""
        queue = 'test_no_hosts_yields_single_default_host'
        client = StandardRedisClient()
        payload = {'test': queue}

        client.send_message_to_queue(
            queue_key=queue,
            message=msgpack.packb(payload),
            expiry=10,
            capacity=10,
            connection=client.get_connection(queue),
        )

        connection = client.get_connection(queue)
        message = connection.lpop(queue)

        self.assertIsNotNone(message)
        self.assertEqual(payload, msgpack.unpackb(message, encoding='utf-8'))
test_standard.py 文件源码 项目:pysoa 作者: eventbrite 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_string_host_yields_single_host(self):
        """Round trip through a `StandardRedisClient` given one host URL."""
        queue = 'test_string_host_yields_single_host'
        client = StandardRedisClient(hosts=['redis://localhost:1234/0'])
        payload = {'test': queue}

        client.send_message_to_queue(
            queue_key=queue,
            message=msgpack.packb(payload),
            expiry=10,
            capacity=10,
            connection=client.get_connection(queue),
        )

        connection = client.get_connection(queue)
        message = connection.lpop(queue)

        self.assertIsNotNone(message)
        self.assertEqual(payload, msgpack.unpackb(message, encoding='utf-8'))
Client.py 文件源码 项目:server 作者: arrchat 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def process_incoming_data(self, data):
            """Decode one incoming msgpack packet and send the matching replies.

            A `ConfirmationPacket` needs no answer. A `ResponsePacket` is
            acknowledged with a confirmation carrying its `response_id`. Any
            other packet is acknowledged with its `query_id` and then
            answered with the result of `packet.build_response()`.

            Failures are reported on stdout (exception summary followed by
            the traceback) and are not re-raised.

            :param data: raw msgpack bytes received from the peer.
            """
            try:
                raw_packet = msgpack.unpackb(data)
                self.logger.debug('Incoming: %s', json.dumps(raw_packet))
                packet = self.clients.ipackets.make_packet(self, raw_packet)
                if isinstance(packet, Packets.IncomingPackets.ConfirmationPacket):
                    pass
                elif isinstance(packet, Packets.IncomingPackets.ResponsePacket):
                    self.send_outgoing_packet(Packets.OutgoingPacket.ConfirmationPacket(packet.response_id))
                else:
                    self.send_outgoing_packet(Packets.OutgoingPacket.ConfirmationPacket(packet.query_id))
                    self.send_outgoing_packet(Packets.OutgoingPacket.ResponsePacket(packet.build_response()))

            except Exception as e:
                # Format the exception itself: Python 3 exceptions have no
                # `.message` attribute, so reading it here raised
                # AttributeError from inside the handler and hid the real error.
                print('{}: {}'.format(type(e).__name__, e))
                print(traceback.format_exc())
transport.py 文件源码 项目:mercury 作者: jr0d 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def receive(self):
        """Receive one multipart message and return its address and payload.

        Declared ``async`` because the body awaits the socket and
        `send_error`; ``await`` inside a plain ``def`` is a syntax error.

        :returns: tuple ``(address, message)`` where ``address`` is
            ``parsed_message['address']`` and ``message`` is the
            msgpack-decoded ``parsed_message['message']``.
        :raises MercuryClientException: if the multipart message cannot be
            parsed, or its payload cannot be unpacked. For an unpack failure
            an error is first sent back to the sender's address.
        """
        multipart = await self.socket.recv_multipart()
        parsed_message = parse_multipart_message(multipart)

        if not parsed_message:
            log.error('Received junk off the wire')
            raise MercuryClientException('Message is malformed')

        try:
            message = msgpack.unpackb(parsed_message['message'], encoding='utf-8')
        except TypeError as type_error:
            log.error('Received unpacked, non-string type: %s : %s' % (type(parsed_message),
                                                                       type_error))
            await self.send_error(parsed_message['address'], 'Client error, message is not packed')
            raise MercuryClientException('Message is malformed')

        except (msgpack.UnpackException, msgpack.ExtraData) as msgpack_exception:
            log.error('Received invalid request: %s' % str(
                msgpack_exception))

            await self.send_error(parsed_message['address'], 'Client error, message is malformed')
            raise MercuryClientException('Message is malformed')

        return parsed_message['address'], message
async_router_req_client.py 文件源码 项目:mercury 作者: jr0d 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
async def transceiver(self, payload):
        """Sends and receives messages.

        Declared ``async`` because the body awaits the socket and the poller;
        ``await`` inside a plain ``def`` is a syntax error.

        :param payload: A dict representing the message to send.
        :returns: Whatever `self.check_and_return` returns for the
            msgpack-decoded response.
        :raises IOError: if `self.response_timeout` is set and the poller
            reports nothing within ``response_timeout * 1000``.
        """

        packed = msgpack.packb(payload)

        await self.socket.send_multipart([packed])

        if self.response_timeout:
            if not await self.poller.poll(self.response_timeout * 1000):
                raise IOError('Timeout while waiting for server response')

        rep = await self.socket.recv()

        return self.check_and_return(msgpack.unpackb(rep, encoding='utf-8'))
transport.py 文件源码 项目:mercury 作者: jr0d 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def full_req_transceiver(zmq_url, data):
    """Used to send data and close connection.

    :param zmq_url: URL for the socket to connect to.
    :param data: The data to send.
    :returns: The unpacked response.
    """
    # TODO: Harden this
    # TODO: Add linger and POLLIN support : https://github.com/zeromq/pyzmq/issues/132

    # Serialise first, so data that cannot be packed never opens a socket.
    packed = msgpack.packb(data)

    ctx, socket = get_ctx_and_connect_req_socket(zmq_url)
    socket.send_multipart([packed])

    rep = socket.recv()

    # Release the socket and the context before decoding, so a reply that
    # cannot be unpacked does not leak them.
    socket.close()
    ctx.term()

    return msgpack.unpackb(rep, encoding='utf-8')
transport.py 文件源码 项目:mercury 作者: jr0d 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def transceiver(self, payload):
        """Send `payload` and return the checked, decoded response.

        :param payload: A dict representing the message to send.
        :returns: Whatever `self.check_and_return` returns for the
            msgpack-decoded response.
        :raises IOError: if `self.response_timeout` is set and the poller
            reports nothing within ``response_timeout * 1000``.
        """
        # TODO: Harden this
        # TODO: Add linger and POLLIN support :
        # https://github.com/zeromq/pyzmq/issues/132

        # blocks
        self.socket.send_multipart([msgpack.packb(payload)])

        # Only wait on the poller when a timeout has been configured.
        if self.response_timeout and not self.poller.poll(self.response_timeout * 1000):
            raise IOError('Timeout while waiting for server response')

        # blocks
        reply = self.socket.recv()
        decoded = msgpack.unpackb(reply, encoding='utf-8')
        return self.check_and_return(decoded)
yatesock.py 文件源码 项目:YATE 作者: GarethNelson 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def parser_thread(self):
        """Parse queued packets and dispatch them to the per-type queues.

        Runs while `self.active` is true. Each item taken from
        `self.parse_q` is a ``(data, addr)`` pair. `data` is zlib-decompressed
        and msgpack-decoded into ``(msg_type, msg_params, msg_id, ...)``, and
        ``(msg_params, msg_id, addr)`` is put on ``self.in_queues[msg_type]``.

        A packet that cannot be decompressed, decoded or dispatched is
        logged through `yatelog.minor_exception` and dropped; it does not
        stop the loop.
        """
        while self.active:
            eventlet.greenthread.sleep(0)
            data, addr = None, None
            while data is None:
                eventlet.greenthread.sleep(0)
                try:
                    data, addr = self.parse_q.get()
                # `except Exception` rather than a bare except, so that
                # exceptions outside the Exception hierarchy (interpreter
                # exit, greenlet kill) still stop this loop.
                except Exception:
                    yatelog.minor_exception('YATESock','Failed during parse receive')

            gc.disable() # performance hack for msgpack
            try:
                # Decompress inside the try: a corrupt packet must be logged
                # and skipped, not allowed to terminate the parser.
                msg        = msgpack.unpackb(zlib.decompress(data),use_list = False)
                msg_type   = msg[0]
                msg_params = msg[1]
                msg_id     = msg[2]
                self.in_queues[msg_type].put((msg_params,msg_id,addr))
            except Exception:
                yatelog.minor_exception('YATESock','Error while parsing packet from %s:%s' % addr)
            finally:
                # Always switch the collector back on, whatever happened above.
                gc.enable()
usb2.py 文件源码 项目:flux_line_bot 作者: blesscat 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _begin_handshake(self):
        """Read the device's handshake frame and answer it.

        The buffer is fed once (``timeout=0.1``) and then drained; only the
        last frame unpacked before `_unpack_buffer` reports ``None`` is
        considered. That frame must have channel ``0xff`` and fin ``0xfe``.
        Its msgpack body supplies ``"session"``, which is stored on
        `self.session` and echoed back together with the client version.

        :returns: True when a valid handshake frame was answered, False when
            no frame was available or the frame had another channel/fin.
        """
        self._feed_buffer(timeout=0.1)

        # Drain the buffer, remembering only the most recent frame.
        latest = None
        frame = self._unpack_buffer()
        while frame[0] is not None:
            latest = frame
            frame = self._unpack_buffer()

        if latest is None:
            return False

        channel_idx, buf, fin = latest
        if not (channel_idx == 0xff and fin == 0xfe):
            return False

        hello = msgpack.unpackb(buf, use_list=False, encoding="utf8",
                                unicode_errors="ignore")
        self.session = hello["session"]
        logger.debug("Get handshake session: %s", self.session)

        answer = {"session": self.session,
                  "client": "fluxclient-%s" % __version__}
        self.send_object(0xff, answer)
        return True
usb2.py 文件源码 项目:flux_line_bot 作者: blesscat 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _complete_handshake(self):
        """Check the device's reply to the handshake answer.

        The buffer is fed once (``timeout=0.05``) and a single frame is
        unpacked. The handshake succeeds only when that frame has channel
        ``0xfe`` and fin ``0xfe`` and its msgpack body carries the same
        ``"session"`` as `self.session`.

        :returns: True on a matching session, False otherwise (session
            mismatch, other channel/fin, or no frame at all).
        """
        self._feed_buffer(timeout=0.05)
        channel_idx, buf, fin = self._unpack_buffer()

        if channel_idx != 0xfe or fin != 0xfe:
            # Not a handshake reply: either nothing arrived or it is some
            # other frame.
            if channel_idx is None:
                logger.debug("USB handshake response timeout")
            else:
                logger.debug("USB handshake response wrong channel: 0x%02x",
                             channel_idx)
            return False

        reply = msgpack.unpackb(buf, use_list=False, encoding="utf8",
                                unicode_errors="ignore")
        if reply["session"] == self.session:
            logger.debug("USB handshake completed")
            return True

        logger.debug("Recv handshake session: %s", reply["session"])
        logger.debug("Handshake failed")
        return False
contract.py 文件源码 项目:shisetsu 作者: KixPanganiban 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def receive(msgpack_string, contract_digest=None):
        """Unpack a msgpack string into the appropriate Contract type.
        If `contract_digest` is set, only returns the Response/Failure
        if its digest matches the `contract_digest`, and returns False if it
        doesn't. A Request is always returned as-is.

        :param msgpack_string: msgpack-encoded payload whose first element
            is one of the `Contract.CONTRACT_*` type markers.
        :param contract_digest: optional digest compared with the contract's
            `request_digest`.
        :returns: a Request, Response or Failure, or False on a digest
            mismatch.
        :raises ValueError: if the first element is not a known type marker.
        """
        payload = msgpack.unpackb(msgpack_string)
        kind = payload[0]
        if kind == Contract.CONTRACT_REQUEST:
            contract = Request.load(payload)
        elif kind == Contract.CONTRACT_RESPONSE:
            contract = Response.load(payload)
        elif kind == Contract.CONTRACT_FAILURE:
            contract = Failure.load(payload)
        else:
            # Without this branch `contract` stays unbound and the code
            # below fails with an unrelated UnboundLocalError.
            raise ValueError('Unknown contract type: %r' % (kind,))

        if contract_digest is not None and not isinstance(contract, Request):
            if contract.request_digest == contract_digest:
                return contract
            else:
                return False

        return contract
test_newspec.py 文件源码 项目:deb-msgpack-python 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_ext():
    """Each ExtType must pack to the exact expected bytes and unpack back."""
    def check(ext, packed):
        assert packb(ext) == packed
        assert unpackb(packed) == ext

    # (ext payload, expected wire bytes), in the order they are checked
    cases = [
        (b'Z', b'\xd4\x42Z'),                    # fixext 1
        (b'ZZ', b'\xd5\x42ZZ'),                  # fixext 2
        (b'Z'*4, b'\xd6\x42' + b'Z'*4),          # fixext 4
        (b'Z'*8, b'\xd7\x42' + b'Z'*8),          # fixext 8
        (b'Z'*16, b'\xd8\x42' + b'Z'*16),        # fixext 16
        # ext 8
        (b'', b'\xc7\x00\x42'),
        (b'Z'*255, b'\xc7\xff\x42' + b'Z'*255),
        # ext 16
        (b'Z'*256, b'\xc8\x01\x00\x42' + b'Z'*256),
        (b'Z'*0xffff, b'\xc8\xff\xff\x42' + b'Z'*0xffff),
        # ext 32
        (b'Z'*0x10000, b'\xc9\x00\x01\x00\x00\x42' + b'Z'*0x10000),
    ]
    for data, wire in cases:
        check(ExtType(0x42, data), wire)
    # The largest ext 32 case (0xffffffff payload bytes) is left out on
    # purpose: it needs large memory.
test_extension.py 文件源码 项目:deb-msgpack-python 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_extension_type():
    """Round-trip an `array.array` through msgpack as an application ExtType.

    `default` turns the array into ``ExtType(123, <raw bytes>)`` when
    packing, and `ext_hook` rebuilds the array from those bytes when
    unpacking.
    """
    def default(obj):
        print('default called', obj)
        if isinstance(obj, array.array):
            typecode = 123 # application specific typecode
            # array.tostring() was removed in Python 3.9; prefer tobytes()
            # and fall back only where that spelling does not exist.
            try:
                data = obj.tobytes()
            except AttributeError:
                data = obj.tostring()
            return ExtType(typecode, data)
        raise TypeError("Unknown type object %r" % (obj,))

    def ext_hook(code, data):
        print('ext_hook called', code, data)
        assert code == 123
        obj = array.array('d')
        # Same story for fromstring(), removed in Python 3.9.
        try:
            obj.frombytes(data)
        except AttributeError:
            obj.fromstring(data)
        return obj

    obj = [42, b'hello', array.array('d', [1.1, 2.2, 3.3])]
    s = msgpack.packb(obj, default=default)
    obj2 = msgpack.unpackb(s, ext_hook=ext_hook)
    assert obj == obj2
test_memoryview.py 文件源码 项目:deb-msgpack-python 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _runtest(format, nbytes, expected_header, expected_prefix, use_bin_type):
    """Pack a memoryview of `nbytes` worth of 255-valued items and verify it.

    :param format: array type code used to build the source array.
    :param nbytes: expected size in bytes of the array's data.
    :param expected_header: expected first byte of the packed output.
    :param expected_prefix: expected length prefix following the header
        (may be empty).
    :param use_bin_type: forwarded to `packb`.
    """
    # build the source array and a memoryview over it
    source = array(format)
    item_count = nbytes // source.itemsize
    source.fromlist([255] * item_count)
    raw = get_data(source)
    view = make_memoryview(source)

    # pack, unpack, and rebuild the array from what came back
    packed = packb(view, use_bin_type=use_bin_type)
    rebuilt = make_array(format, unpackb(packed))

    # offset at which the payload starts: one header byte plus the prefix
    payload_start = 1 + len(expected_prefix)

    # right amount of data
    assert len(raw) == nbytes
    # header byte
    assert packed[:1] == expected_header
    # length prefix, if any
    assert packed[1:payload_start] == expected_prefix
    # payload
    assert packed[payload_start:] == raw
    # round trip
    assert source == rebuilt
EndpointProcessor.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__( self, parent, socket ):
        """Read the proxy's connection header, then wrap the socket with TLS.

        :param parent: owner object; its `log` callable and `sslContext` are
            used here.
        :param socket: connected socket on which the proxy first sends a
            4-byte ``'!I'`` length followed by that many bytes of msgpack.
        :raises DisconnectException: if the peer closes the connection
            before the header is complete, or if wrapping the socket fails.
        """
        self.parent = parent

        def _recvExact( sock, nBytes ):
            # recv() may return fewer bytes than requested, so keep reading
            # until exactly nBytes have arrived.
            chunks = []
            remaining = nBytes
            while 0 < remaining:
                chunk = sock.recv( remaining )
                if not chunk:
                    # An empty read means the peer is gone.
                    raise DisconnectException
                chunks.append( chunk )
                remaining -= len( chunk )
            return b''.join( chunks )

        # A simple connection header sent by the proxy before the connection
        # content, it encapsulates the original connection source information.
        headerSize = struct.unpack( '!I', _recvExact( socket, 4 ) )[ 0 ]
        self.address = msgpack.unpackb( _recvExact( socket, headerSize ) )
        self.parent.log( 'Remote address: %s' % str( self.address ) )

        try:
            socket = parent.sslContext.wrap_socket( socket, 
                                                    server_side = True, 
                                                    do_handshake_on_connect = True,
                                                    suppress_ragged_eofs = True )
        except:
            # Any failure to wrap the socket is reported as a disconnect.
            raise DisconnectException
        self.s = socket
        self.aid = None
        self.lock = Semaphore( 1 )
        self.r = rpcm( isHumanReadable = True, isDebug = self.parent.log )
        self.r.loadSymbols( Symbols.lookups )
        self.connId = uuid.uuid4()
        self.hostName = None
        self.int_ip = None
        self.ext_ip = None
        self.tags = []
ObjectsDb.py 文件源码 项目:lc_cloud 作者: refractionPOINT 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def decode( cls, data, withRouting = False, isFullDump = False ):
        """Decode a base64-wrapped msgpack record.

        :param data: base64 text of a msgpack document.
        :param withRouting: when true (and `isFullDump` is false) the
            ``'routing'`` entry is extracted and returned as well.
        :param isFullDump: when true the whole decoded document is the event.
        :returns: ``( routing, event )`` if `withRouting` and not
            `isFullDump`, otherwise just ``event``. Any failure while
            decoding yields None for both values.
        """
        routing, event = None, None
        try:
            record = msgpack.unpackb( base64.b64decode( data ), use_list = True )
            if isFullDump:
                event = record
                cls._dataToUtf8( event )
            else:
                if 'event' in record:
                    event = record[ 'event' ]
                    cls._dataToUtf8( event )
                if withRouting and 'routing' in record:
                    routing = record[ 'routing' ]
                    cls._dataToUtf8( routing )
        except:
            # Best effort: a record that cannot be decoded yields nothing,
            # even if part of it had already been extracted.
            routing, event = None, None

        if isFullDump or not withRouting:
            return event
        return routing, event
dsc_database.py 文件源码 项目:dsc2 作者: stephenslab 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, db_prefix, master_names):
        """Set up empty result tables and load the ``<db_prefix>.map.mpk`` file.

        :param db_prefix: path prefix; the file ``db_prefix + '.map.mpk'``
            must exist.
        :param master_names: names of the master tables; if this is None,
            the last block will be used as master table.
        :raises ResultDBError: if the map file does not exist.
        """
        self.db_prefix = db_prefix
        # If this is None, then the last block will be used
        # As master table
        self.master_names = master_names
        # different tables; one exec per table
        self.data = {}
        # master tables
        self.master = {}
        # list of exec names that are the last step in sequence
        self.last_block = []
        # key = block name, item = exec name
        self.groups = {}

        map_file = self.db_prefix + '.map.mpk'
        if not os.path.isfile(map_file):
            raise ResultDBError("DSC filename database is corrupted!")
        # Read through a context manager so the file handle is closed
        # deterministically instead of whenever it gets garbage collected.
        with open(map_file, 'rb') as stream:
            self.maps = msgpack.unpackb(stream.read(), encoding = 'utf-8',
                                        object_pairs_hook = OrderedDict)
msgpack.py 文件源码 项目:frontera-docs-zh_CN 作者: xsren 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def decode(self, buffer):
        """Decode a msgpack message into a ``(name, ...)`` tuple.

        The first element of the unpacked object is a short tag that selects
        the message kind:

        * ``pc`` gives ``('page_crawled', response)``
        * ``le`` gives ``('links_extracted', request, [request, ...])``
        * ``us`` gives ``('update_score', request, obj[2], obj[3])``
        * ``re`` gives ``('request_error', request, native-str error)``
        * ``as`` gives ``('add_seeds', [request, ...])``
        * ``njid`` gives ``('new_job_id', int)``
        * ``of`` gives ``('offset', int, int)``

        :param buffer: msgpack-encoded message.
        :raises TypeError: if the tag is not one of the above.
        """
        obj = unpackb(buffer, encoding='utf-8')

        # Depending on how the tag was packed it arrives as bytes or as
        # text; normalise it so the comparisons below work for both.
        tag = obj[0]
        if isinstance(tag, bytes):
            tag = tag.decode('utf-8')

        if tag == 'pc':
            return ('page_crawled',
                    self._response_from_object(obj[1]))
        if tag == 'le':
            return ('links_extracted',
                    self._request_from_object(obj[1]),
                    [self._request_from_object(x) for x in obj[2]])
        if tag == 'us':
            return ('update_score', self._request_from_object(obj[1]), obj[2], obj[3])
        if tag == 're':
            return ('request_error', self._request_from_object(obj[1]), to_native_str(obj[2]))
        if tag == 'as':
            return ('add_seeds', [self._request_from_object(x) for x in obj[1]])
        if tag == 'njid':
            return ('new_job_id', int(obj[1]))
        if tag == 'of':
            return ('offset', int(obj[1]), int(obj[2]))
        # Raise, do not return, the error: returning the exception object
        # handed callers a TypeError instance in place of a message tuple.
        raise TypeError('Unknown message type')
Client.py 文件源码 项目:Sample-Code 作者: meigrafd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update(self):
        """Receive frames and show them on the GUI label while `self.running`.

        Each iteration reads a ``'<L'`` length header from the connection.
        A zero length only causes a short sleep. Otherwise that many bytes
        are read, decoded with msgpack (`msgpack_numpy.decode` as the object
        hook), turned into an image and set on `self.gui.stream_label`.
        """
        header_fmt = '<L'
        while self.running:
            # Read the length of the image as a 32-bit unsigned int.
            header = self.connection.read(struct.calcsize(header_fmt))
            (data_len,) = struct.unpack(header_fmt, header)

            if not data_len:
                time.sleep(0.001)
                continue

            printD('Updating...')
            printD('data_len: %s' % data_len)
            payload = self.connection.read(data_len)
            frame = msgpack.unpackb(payload, object_hook=msgpack_numpy.decode)
            printD('Frame received')

            photo = ImageTk.PhotoImage(Image.fromarray(frame))
            label = self.gui.stream_label
            label.configure(image=photo)
            # Also stored as an attribute of the label, as in the original.
            label.image = photo
            printD("image updated")
Client.py 文件源码 项目:Sample-Code 作者: meigrafd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update_2(self):
        """Receive at most one frame, then reschedule itself via ``after(70, ...)``.

        Does nothing at all when `self.running` equals False. Otherwise a
        ``'<L'`` length header is read; for a non-zero length the frame is
        read, decoded with msgpack (`msgpack_numpy.decode` as the object
        hook) and set on `self.gui.stream_label`. In both cases the method
        is scheduled again on `self.gui.master`.
        """
        if self.running == False:
            return

        # Read the length of the image as a 32-bit unsigned int.
        header_size = struct.calcsize('<L')
        (data_len,) = struct.unpack('<L', self.connection.read(header_size))

        if data_len:
            printD('Updating...')
            printD('data_len: %s' % data_len)
            payload = self.connection.read(data_len)
            frame = msgpack.unpackb(payload, object_hook=msgpack_numpy.decode)
            printD('Frame received')

            photo = ImageTk.PhotoImage(Image.fromarray(frame))
            label = self.gui.stream_label
            label.configure(image=photo)
            # Also stored as an attribute of the label, as in the original.
            label.image = photo

        self.gui.master.after(70, self.update_2)
test_msgs.py 文件源码 项目:bearded-avenger-sdk-py 作者: csirtgadgets 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_msgs_recv():
    """`Msg().recv` must return the four fields of a canned PING message."""

    def _recv_multipart():
        # Canned wire message handed to Msg.recv in place of real socket I/O.
        ping = Msg(id=msgpack.packb(1234), mtype=Msg.PING, token='token1234', data=[])
        return ping.to_list()

    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.recv_multipart = _recv_multipart

    received = Msg().recv(sock)

    msg_id = received[0]
    assert msgpack.unpackb(msg_id) == 1234
    assert received[1] == 'token1234'
    assert received[2] == 'ping'
    assert received[3] == '[]'
scanner.py 文件源码 项目:redis-memory-analyzer 作者: gamenet 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def resolve_types(self, ret):
        """Yield ``(type, key)`` pairs for the keys in `ret`, then empty `ret`.

        Types come from `self.resolve_types_script` (msgpack-decoded) until
        that call fails with a `ResponseError` mentioning ``CROSSSLOT``.
        From then on `self.pipeline_mode` is set and
        `self.resolve_with_pipe` is used instead. Any other `ResponseError`
        is re-raised.

        :param ret: list of keys; it is cleared once all pairs were yielded.
        """
        if self.pipeline_mode:
            key_with_types = self.resolve_with_pipe(ret)
        else:
            try:
                packed = self.resolve_types_script(ret)
                key_with_types = msgpack.unpackb(packed)
            except ResponseError as err:
                if "CROSSSLOT" not in repr(err):
                    raise err
                # Fall back to the pipeline and keep using it afterwards.
                key_with_types = self.resolve_with_pipe(ret)
                self.pipeline_mode = True

        for idx in range(len(ret)):
            yield key_with_types[idx], ret[idx]

        ret.clear()


问题


面经


文章

微信
公众号

扫码关注公众号