python类pickle()的实例源码

pickletester.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_dict_chunking(self):
        n = 10  # too small to chunk
        x = dict.fromkeys(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            self.assertIsInstance(s, bytes_types)
            y = self.loads(s)
            self.assert_is_copy(x, y)
            num_setitems = count_opcode(pickle.SETITEMS, s)
            self.assertEqual(num_setitems, proto > 0)

        n = 2500  # expect at least two chunks when proto > 0
        x = dict.fromkeys(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assert_is_copy(x, y)
            num_setitems = count_opcode(pickle.SETITEMS, s)
            if proto == 0:
                self.assertEqual(num_setitems, 0)
            else:
                self.assertTrue(num_setitems >= 2)
pickletester.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_reduce_bad_iterator(self):
        # Issue4176: crash when 4th and 5th items of __reduce__()
        # are not iterators
        class C(object):
            def __reduce__(self):
                # 4th item is not an iterator
                return list, (), None, [], None
        class D(object):
            def __reduce__(self):
                # 5th item is not an iterator
                return dict, (), None, None, []

        # Protocol 0 is less strict and also accept iterables.
        for proto in protocols:
            try:
                self.dumps(C(), proto)
            except (pickle.PickleError):
                pass
            try:
                self.dumps(D(), proto)
            except (pickle.PickleError):
                pass
pickletester.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_badly_quoted_string(self):
        # Issue #17710
        badpickles = [b"S'\n.",
                      b'S"\n.',
                      b'S\' \n.',
                      b'S" \n.',
                      b'S\'"\n.',
                      b'S"\'\n.',
                      b"S' ' \n.",
                      b'S" " \n.',
                      b"S ''\n.",
                      b'S ""\n.',
                      b'S \n.',
                      b'S\n.',
                      b'S.']
        for p in badpickles:
            self.assertRaises(pickle.UnpicklingError, self.loads, p)
pickletester.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_frame_opcodes(self, pickled):
        """
        Check the arguments of FRAME opcodes in a protocol 4+ pickle.
        """
        frame_opcode_size = 9
        last_arg = last_pos = None
        for op, arg, pos in pickletools.genops(pickled):
            if op.name != 'FRAME':
                continue
            if last_pos is not None:
                # The previous frame's size should be equal to the number
                # of bytes up to the current frame.
                frame_size = pos - last_pos - frame_opcode_size
                self.assertEqual(frame_size, last_arg)
            last_arg, last_pos = arg, pos
        # The last frame's size should be equal to the number of bytes up
        # to the pickle's end.
        frame_size = len(pickled) - last_pos - frame_opcode_size
        self.assertEqual(frame_size, last_arg)
pickletester.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_huge_str_32b(self, size):
        data = "abcd" * (size // 4)
        try:
            for proto in protocols:
                if proto == 0:
                    continue
                with self.subTest(proto=proto):
                    try:
                        pickled = self.dumps(data, protocol=proto)
                        header = (pickle.BINUNICODE +
                                  struct.pack("<I", len(data)))
                        data_start = pickled.index(b'abcd')
                        self.assertEqual(
                            header,
                            pickled[data_start-len(header):data_start])
                        self.assertEqual((pickled.rindex(b"abcd") + len(b"abcd") -
                                          pickled.index(b"abcd")), len(data))
                    finally:
                        pickled = None
        finally:
            data = None

    # BINUNICODE (protocols 1, 2 and 3) cannot carry more than 2**32 - 1 bytes
    # of utf-8 encoded unicode. BINUNICODE8 (protocol 4) supports these huge
    # unicode strings however.
__init__.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _ufunc_reconstruct(module, name):
    # The `fromlist` kwarg is required to ensure that `mod` points to the
    # inner-most module rather than the parent package when module name is
    # nested. This makes it possible to pickle non-toplevel ufuncs such as
    # scipy.special.expit for instance.
    mod = __import__(module, fromlist=[name])
    return getattr(mod, name)
__init__.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _ufunc_reduce(func):
    from pickle import whichmodule
    name = func.__name__
    return _ufunc_reconstruct, (whichmodule(func, name), name)
_tblib.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def install():
    try:
        import copy_reg
    except ImportError:
        import copyreg as copy_reg

    copy_reg.pickle(TracebackType, pickle_traceback)

# Added by gevent

# We have to defer the initialization, and especially the import of platform,
# until runtime. If we're monkey patched, we need to be sure to use
# the original __import__ to avoid switching through the hub due to
# import locks on Python 2. See also builtins.py for details.
_tblib.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _import_dump_load():
    global dumps
    global loads
    try:
        import cPickle as pickle
    except ImportError:
        import pickle
    dumps = pickle.dumps
    loads = pickle.loads
_tblib.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def dump_traceback(tb):
    # Both _init and dump/load have to be unlocked, because
    # copy_reg and pickle can do imports to resolve class names; those
    # class names are in this module and greenlet safe though
    _init()
    return dumps(tb)
_tblib.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def install():
    try:
        import copy_reg
    except ImportError:
        import copyreg as copy_reg

    copy_reg.pickle(TracebackType, pickle_traceback)

# Added by gevent

# We have to defer the initialization, and especially the import of platform,
# until runtime. If we're monkey patched, we need to be sure to use
# the original __import__ to avoid switching through the hub due to
# import locks on Python 2. See also builtins.py for details.
_tblib.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _import_dump_load():
    global dumps
    global loads
    try:
        import cPickle as pickle
    except ImportError:
        import pickle
    dumps = pickle.dumps
    loads = pickle.loads
_tblib.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def dump_traceback(tb):
    # Both _init and dump/load have to be unlocked, because
    # copy_reg and pickle can do imports to resolve class names; those
    # class names are in this module and greenlet safe though
    _init()
    return dumps(tb)
dal.py 文件源码 项目:spc 作者: whbrewer 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def save_dbt(self,table, sql_fields_current):
        tfile = self.file_open(table._dbt, 'w')
        pickle.dump(sql_fields_current, tfile)
        self.file_close(tfile)
dal.py 文件源码 项目:spc 作者: whbrewer 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def import_table_definitions(self, path, migrate=False,
                                 fake_migrate=False, items=None):
        pattern = pjoin(path,self._uri_hash+'_*.table')
        if items:
            for tablename, table in items.iteritems():
                # TODO: read all field/table options
                fields = []
                # remove unsupported/illegal Table arguments
                [table.pop(name) for name in ("name", "fields") if
                 name in table]
                if "items" in table:
                    for fieldname, field in table.pop("items").iteritems():
                        # remove unsupported/illegal Field arguments
                        [field.pop(key) for key in ("requires", "name",
                         "compute", "colname") if key in field]
                        fields.append(Field(str(fieldname), **field))
                self.define_table(str(tablename), *fields, **table)
        else:
            for filename in glob.glob(pattern):
                tfile = self._adapter.file_open(filename, 'r')
                try:
                    sql_fields = pickle.load(tfile)
                    name = filename[len(pattern)-7:-6]
                    mf = [(value['sortable'],
                           Field(key,
                                 type=value['type'],
                                 length=value.get('length',None),
                                 notnull=value.get('notnull',False),
                                 unique=value.get('unique',False))) \
                              for key, value in sql_fields.iteritems()]
                    mf.sort(lambda a,b: cmp(a[0],b[0]))
                    self.define_table(name,*[item[1] for item in mf],
                                      **dict(migrate=migrate,
                                             fake_migrate=fake_migrate))
                finally:
                    self._adapter.file_close(tfile)
managers.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, address=None, authkey=None, serializer='pickle'):
        if authkey is None:
            authkey = current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
pickletester.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def opcode_in_pickle(code, pickle):
    for op, dummy, dummy in pickletools.genops(pickle):
        if op.code == code.decode("latin-1"):
            return True
    return False

# Return the number of times opcode code appears in pickle.
pickletester.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def count_opcode(code, pickle):
    n = 0
    for op, dummy, dummy in pickletools.genops(pickle):
        if op.code == code.decode("latin-1"):
            n += 1
    return n
pickletester.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_float(self):
        test_values = [0.0, 4.94e-324, 1e-310, 7e-308, 6.626e-34, 0.1, 0.5,
                       3.14, 263.44582062374053, 6.022e23, 1e30]
        test_values = test_values + [-x for x in test_values]
        for proto in protocols:
            for value in test_values:
                pickle = self.dumps(value, proto)
                got = self.loads(pickle)
                self.assertEqual(value, got)
pickletester.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_long1(self):
        x = 12345678910111213141516178920
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assertEqual(x, y)
            self.assertEqual(opcode_in_pickle(pickle.LONG1, s), proto >= 2)


问题


面经


文章

微信
公众号

扫码关注公众号