python类loads()的实例源码

cache.py 文件源码 项目:isni-reconcile 作者: cmh2166 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def load_object(self, value):
        """Decode a stored value; the inverse of :meth:`dump_object`.

        ``None`` is passed straight through.  A value starting with ``b'!'``
        is unpickled from the bytes after that marker, yielding ``None`` when
        the pickle module reports a ``PickleError``.  Any other value is
        converted with ``int()`` and, if that fails, returned unchanged.
        """
        if value is None:
            return None

        if not value.startswith(b'!'):
            # Unmarked values are integers, or raw data written before
            # serialization was introduced (pre-0.8); keep supporting both.
            try:
                return int(value)
            except ValueError:
                return value

        # Marked value: everything after the '!' is a pickle.
        try:
            return pickle.loads(value[1:])
        except pickle.PickleError:
            return None
cache.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def get(self, key, timeout=None):
        """Fetch the cached element stored under *key*.

        The key is prefixed with ``self.pre_identifier`` before the lookup.
        Returns ``None`` when the client has nothing for the key, or when
        ``self._is_expired`` reports the entry as expired (the entry is then
        removed via ``self.delete_entry``).  Otherwise returns item ``1`` of
        the unpickled entry.

        *timeout* overrides ``self.timeout`` for this call when it is not
        ``None``.
        """
        full_key = self.pre_identifier + key

        raw_entry = self.client.get(full_key)
        if not raw_entry:
            # Cache miss.
            return None

        entry = pickle.loads(raw_entry)

        # An explicit argument wins over the timeout configured on the instance.
        effective_timeout = self.timeout if timeout is None else timeout

        if self._is_expired(entry, effective_timeout):
            # Stale entry: drop it and report a miss.
            self.delete_entry(full_key)
            return None

        return entry[1]
cache.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def load_object(self, value):
        """Turn a stored value back into a Python object.

        This is the counterpart of :meth:`dump_object` and may be called
        with ``None``, which is returned as-is.  Values carrying the ``b'!'``
        prefix are unpickled (``None`` on ``pickle.PickleError``); everything
        else is tried as an integer and otherwise handed back untouched.
        """
        if value is None:
            return None

        pickled = value.startswith(b'!')
        if pickled:
            payload = value[1:]
            try:
                return pickle.loads(payload)
            except pickle.PickleError:
                return None

        try:
            number = int(value)
        except ValueError:
            # Data stored before 0.8 was not serialized at all; return it raw.
            return value
        return number
data_io.py 文件源码 项目:TPN 作者: myfavouritekk 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def tpn_test_iterator(track_path):
    """Load every pickled track found at *track_path*.

    *track_path* may be a zip archive, in which case each archive member is
    unpickled in ``namelist()`` order, or a directory, in which case each
    entry matched by ``<track_path>/*`` is unpickled in sorted path order.

    Returns:
        list of the unpickled track objects.

    Raises:
        NotImplementedError: if *track_path* is neither a zip file nor a
            directory.

    NOTE(review): the contents are unpickled without validation, so this
    must only be pointed at track files from a trusted source.
    """
    tracks = []
    if zipfile.is_zipfile(track_path):
        # The context manager closes the archive even if a member fails to
        # unpickle part-way through.
        with zipfile.ZipFile(track_path) as zf:
            for track_name in zf.namelist():
                tracks.append(cPickle.loads(zf.read(track_name)))
    elif osp.isdir(track_path):
        for track_name in sorted(glob.glob(osp.join(track_path, '*'))):
            # Close each file deterministically instead of leaking the
            # handle until garbage collection.
            with open(track_name, 'rb') as track_file:
                tracks.append(cPickle.loads(track_file.read()))
    else:
        raise NotImplementedError('Only zipfile and directories are supported.')

    return tracks
db.py 文件源码 项目:guernsey 作者: ingnil 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _loadJson(cls, filename):
        """Build a database instance from a JSON file of pickled tables.

        The file must hold a JSON object mapping table names to
        base64-encoded pickles.  Each table is unpickled and set as an
        attribute of a fresh ``cls()`` instance.  The special entry
        ``"_backRefTables"`` is not stored; it lists the names of tables
        whose ``setDatabase(db)`` is called once all tables are attached.
        Finally every attribute of the instance is passed through
        ``cls._loadJsonTableTransform`` and replaced by its result.

        Args:
            filename: path of the JSON file to read.

        Returns:
            The populated ``cls`` instance.

        NOTE(review): the tables are unpickled without validation, so the
        file must come from a trusted source.
        """
        # The with-block closes the handle even when JSON parsing fails.
        with open(filename, "r") as jsonFh:
            jsonDb = json.load(jsonFh)

        db = cls()
        backRefTables = []
        for tableName, b64PickledTable in jsonDb.items():
            table = pickle.loads(base64.b64decode(b64PickledTable))
            if tableName == "_backRefTables":
                backRefTables = table
                continue
            setattr(db, tableName, table)

        for tableName in backRefTables:
            getattr(db, tableName).setDatabase(db)

        # Iterate over a snapshot: attributes are reassigned inside the loop,
        # and the live __dict__ must not be mutated while it is traversed.
        for tableName, table in list(db.__dict__.items()):
            setattr(db, tableName,
                    cls._loadJsonTableTransform(db, tableName, table))

        return db
gateway_server_xml_rpc.py 文件源码 项目:creator-system-test-framework 作者: CreatorDev 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def __GetResourceValuesFromReadResponse(self, response, path, resourceType):
        """Return the unpickled array values for *path* in a read response.

        The XML-RPC session call matching *resourceType* is invoked with
        ``(response, path, None)`` and its result is unpickled.

        Raises:
            AwaInvalidArgumentException: if *resourceType* is not one of the
                array resource types handled below.

        NOTE(review): the session result is unpickled as-is; this assumes
        the XML-RPC peer is trusted.
        """
        session = self._xmlrpcSession

        if resourceType == AwaResourceType.StringArray:
            pickled = session.AwaServerReadResponse_GetValuesAsStringArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.IntegerArray:
            pickled = session.AwaServerReadResponse_GetValuesAsIntegerArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.FloatArray:
            pickled = session.AwaServerReadResponse_GetValuesAsFloatArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.BooleanArray:
            pickled = session.AwaServerReadResponse_GetValuesAsBooleanArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.OpaqueArray:
            pickled = session.AwaServerReadResponse_GetValuesAsOpaqueArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.TimeArray:
            pickled = session.AwaServerReadResponse_GetValuesAsTimeArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.ObjectLinkArray:
            pickled = session.AwaServerReadResponse_GetValuesAsObjectLinkArrayPointer(response, path, None)
        else:
            raise AwaInvalidArgumentException("Invalid resource type", resourceType)

        # Every branch yields a pickle, so decode it in one place.
        return pickle.loads(pickled)
gateway_client_xml_rpc.py 文件源码 项目:creator-system-test-framework 作者: CreatorDev 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __GetResourceValuesFromGetResponse(self, response, path, resourceType):
        """Return the unpickled array values for *path* in a get response.

        The XML-RPC session call matching *resourceType* is invoked with
        ``(response, path, None)`` and its result is unpickled.

        Raises:
            AwaInvalidArgumentException: if *resourceType* is not one of the
                array resource types handled below.

        NOTE(review): the session result is unpickled as-is; this assumes
        the XML-RPC peer is trusted.
        """
        session = self._xmlrpcSession

        if resourceType == AwaResourceType.StringArray:
            pickled = session.AwaClientGetResponse_GetValuesAsStringArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.IntegerArray:
            pickled = session.AwaClientGetResponse_GetValuesAsIntegerArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.FloatArray:
            pickled = session.AwaClientGetResponse_GetValuesAsFloatArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.BooleanArray:
            pickled = session.AwaClientGetResponse_GetValuesAsBooleanArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.OpaqueArray:
            pickled = session.AwaClientGetResponse_GetValuesAsOpaqueArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.TimeArray:
            pickled = session.AwaClientGetResponse_GetValuesAsTimeArrayPointer(response, path, None)
        elif resourceType == AwaResourceType.ObjectLinkArray:
            pickled = session.AwaClientGetResponse_GetValuesAsObjectLinkArrayPointer(response, path, None)
        else:
            raise AwaInvalidArgumentException("Invalid resource type", resourceType)

        # Every branch yields a pickle, so decode it in one place.
        return pickle.loads(pickled)
DeadReader.py 文件源码 项目:CellsCycle 作者: AQuadroTeam 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def retry_until_success(self, msg, times):
        """Send *msg* on the internal channel until a usable reply arrives.

        Each round sends the message and waits for the reply.  A reply equal
        to ``NOK`` or ``DIE`` is logged and retried after half a second.
        Any other reply is decoded with ``loads`` and ends the loop; when
        *times* equals 2 the decoded reply is also passed to
        ``self.update_birth_information``.
        """
        while True:
            self.internal_channel.send_first_internal_channel_message(msg)
            reply = self.internal_channel.wait_int_message(dont_wait=False)

            if reply == NOK or reply == DIE:
                self.logger.debug("wrong information at sync time, retry in 0.5 seconds")
                time.sleep(0.5)
                continue

            reply = loads(reply)
            if times == 2:
                # not strictly required, per the original author
                self.update_birth_information(reply)
            return
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def deserialize(pkl):
    """Return the object reconstructed from the pickle data *pkl*."""
    obj = pickle.loads(pkl)
    return obj
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def cookie_decode(data, key, digestmod=None):
    """Verify a signed cookie string and return the object it carries.

    The payload is unpickled only when the signature part of *data* matches
    an HMAC of the message computed with *key* and *digestmod* (SHA-256 when
    *digestmod* is not given).  Returns ``None`` when *data* is not an
    encoded cookie or the signature does not match.
    """
    depr(0, 13, "cookie_decode() will be removed soon.",
                "Do not use this API directly.")
    data = tob(data)
    if not cookie_is_encoded(data):
        return None

    signature, message = data.split(tob('?'), 1)
    digest_fn = digestmod or hashlib.sha256
    expected = hmac.new(tob(key), message, digestmod=digest_fn).digest()

    # The first byte of the signature part is skipped before comparing.
    if not _lscmp(signature[1:], base64.b64encode(expected)):
        return None
    return pickle.loads(base64.b64decode(message))
Cookie.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def value_decode(self, val):
        """Return ``(decoded, val)``, where *decoded* is *val* unquoted and
        passed through ``loads``.

        Errors raised while decoding are not handled here and propagate to
        the caller.
        """
        decoded = loads(_unquote(val))
        return decoded, val
Cookie.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def value_decode(self, val):
        """Return ``(decoded, val)`` for the cookie value *val*.

        *val* is unquoted and then passed through ``loads``.  If decoding
        fails, the unquoted string itself is used as the decoded value.
        """
        strval = _unquote(val)
        try:
            return loads(strval), val
        except Exception:
            # Fall back to the plain string when it cannot be decoded.
            # Catching Exception rather than using a bare except lets
            # KeyboardInterrupt and SystemExit propagate.
            return strval, val
caffe_python_layers.py 文件源码 项目:alchemy 作者: voidrank 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def reshape(self, bottom, top):
        while self.redis.llen(config.solver_prototxt) == 0:
            print 'waiting for spider...', self.redis.llen(config.solver_prototxt)
            time.sleep(0.2)

        self.item = cPickle.loads(self.redis.lpop(config.solver_prototxt))
        item = self.item

        for i in range(len(item)):
            top[i].reshape(*item[i].shape)
db_utils.py 文件源码 项目:pybot 作者: spillai 项目源码 文件源码 阅读 128 收藏 0 点赞 0 评论 0
def unpack(self, item):
        """Convert a stored item back into its in-memory form.

        NumPy arrays are returned unchanged.  A ``str`` starting with
        ``'OBJ_'`` is unpickled from the text after that prefix.

        Raises:
            ValueError: for any other kind of item.
        """
        if isinstance(item, np.ndarray):
            return item

        is_pickled = isinstance(item, str) and item.startswith('OBJ_')
        if not is_pickled:
            raise ValueError('Unknown type written to pytables')

        # Drop the 4-character 'OBJ_' marker before unpickling.
        return cPickle.loads(item[4:])
_version133.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def unpicklechops(string):
    """Base64-decode, decompress and unpickle *string* into chops.

    NOTE(review): the decoded data is unpickled without validation, so
    *string* must come from a trusted source.
    """
    # base64.decodestring was removed in Python 3.9; decodebytes is its
    # current name.  Fall back to the old name where decodebytes is missing.
    decode = getattr(base64, 'decodebytes', None) or base64.decodestring
    return loads(zlib.decompress(decode(string)))
mapped_struct.py 文件源码 项目:sharedbuffers 作者: jampp 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def testPackPickleUnpack(self):
        """Check that a schema which went through a pickle round trip can
        unpack data packed by the original schema.

        For every value set in ``self.TEST_VALUES`` a struct is built and
        packed with ``self.schema``, then unpacked with the round-tripped
        schema.  Attributes that were given must be present with equal
        values; slots that were not given must be absent.
        """
        for values in self.TEST_VALUES:
            original = self.Struct(**dict(values.iteritems()))
            roundtripped_schema = cPickle.loads(cPickle.dumps(self.schema))
            unpacked = roundtripped_schema.unpack(self.schema.pack(original))

            for name, expected in values.iteritems():
                self.assertTrue(hasattr(unpacked, name))
                self.assertEqual(getattr(unpacked, name), expected)

            unset_slots = [slot for slot in self.Struct.__slots__
                           if slot not in values]
            for slot in unset_slots:
                self.assertFalse(hasattr(unpacked, slot))
mapped_struct.py 文件源码 项目:sharedbuffers 作者: jampp 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, buf, offset = 0, idmap = None, idmap_size = 1024):
        """Map a packed buffer: read its header, index and stored schema.

        buf: the buffer to map.
        offset: where the mapped data starts inside ``buf``; when non-zero
            the buffer is re-wrapped so that all later offsets are relative
            to this position.
        idmap: cache object stored on the instance; a new
            ``Cache(idmap_size)`` is created when omitted.
        idmap_size: size passed to ``Cache`` when ``idmap`` is not given.

        Raises ValueError when the buffer requires a newer reader version,
        when the stored schema location or contents are invalid, when a
        schema-less buffer is mapped without a schema being available, or
        when a non-empty buffer does not look like a new-format buffer.
        """
        if idmap is None:
            idmap = Cache(idmap_size)
        self.offset = offset
        if offset != 0:
            self.buf = buf = buffer(buf, offset)
        else:
            self.buf = buf
        # The base header sits at the very start of the (offset-adjusted) buffer.
        self.total_size, self.index_offset, self.index_elements = self._Header.unpack_from(buf, 0)
        # The index is read as index_elements uint64 values located at index_offset.
        self.index = numpy.frombuffer(buf, 
            offset = self.index_offset, 
            dtype = numpy.uint64, 
            count = self.index_elements)
        self.idmap = idmap

        # Heuristic: if the first index entry is at or past the end of both
        # headers, there is room for the extended header, so treat the buffer
        # as the new format.
        if self.index_elements > 0 and self.index[0] >= (self._Header.size + self._NewHeader.size):
            # New version, most likely
            # The extended header immediately follows the base header.
            self.version, min_reader_version, self.schema_offset, self.schema_size = self._NewHeader.unpack_from(
                buf, self._Header.size)
            if self._CURRENT_VERSION < min_reader_version:
                raise ValueError((
                    "Incompatible buffer, this buffer needs a reader with support for version %d at least, "
                    "this reader supports up to version %d") % (
                        min_reader_version,
                        self._CURRENT_VERSION
                    ))
            if self.schema_offset and self.schema_size:
                # Reject schema locations that fall outside the buffer before reading them.
                if self.schema_offset > len(buf) or (self.schema_size + self.schema_offset) > len(buf):
                    raise ValueError("Corrupted input - bad schema location")
                # NOTE(review): the embedded schema is unpickled from the
                # buffer itself; this assumes the buffer is trusted.
                stored_schema = cPickle.loads(bytes(buffer(buf, self.schema_offset, self.schema_size)))
                if not isinstance(stored_schema, Schema):
                    raise ValueError("Corrupted input - unrecognizable schema")
                # self.schema is read here before this method assigns it;
                # presumably it is provided at class level - TODO confirm.
                # The stored schema is adopted when none is set or when the
                # set one is not compatible with it.
                if self.schema is None or not self.schema.compatible(stored_schema):
                    self.schema = stored_schema
            elif self.schema is None:
                raise ValueError("Cannot map schema-less buffer without specifying schema")
        elif self.index_elements > 0:
            # Non-empty buffer that did not pass the new-format check above.
            raise ValueError("Cannot reliably map version-0 buffers")
utils.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _handle_serialization(func):
    """Wrap *func* so that it accepts pickled arguments and returns a
    pickled result.

    The returned callable takes ``(session, params)``.  ``params['params']``
    must be a pickle of a dict with ``'args'`` and ``'kwargs'``; these are
    forwarded to ``func(session, *args, **kwargs)`` and the return value is
    pickled before being handed back.
    """
    def wrapped(session, params):
        call = pickle.loads(params['params'])
        positional, named = call['args'], call['kwargs']
        result = func(session, *positional, **named)
        return pickle.dumps(result)
    return wrapped
session.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def call_plugin_serialized(self, plugin, fn, *args, **kwargs):
        """Call plugin function *fn* with pickled arguments and return the
        unpickled result.

        The positional and keyword arguments are pickled together as a dict
        with the keys ``'args'`` and ``'kwargs'`` and sent through
        ``self.call_plugin`` under the ``'params'`` key; the value that
        comes back is unpickled and returned.
        """
        payload = pickle.dumps({'args': args, 'kwargs': kwargs})
        reply = self.call_plugin(plugin, fn, {'params': payload})
        return pickle.loads(reply)
cache.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self, key):
        """Return the unpickled value cached under *key*.

        ``None`` is returned when the key is missing, when the entry has
        expired (an expiry of ``0`` means it never expires), or when
        unpickling raises ``pickle.PickleError``.
        """
        try:
            expiry, blob = self._cache[key]
            still_valid = expiry == 0 or expiry > time()
            if not still_valid:
                return None
            return pickle.loads(blob)
        except (KeyError, pickle.PickleError):
            return None


问题


面经


文章

微信
公众号

扫码关注公众号