Python dumps() usage examples from real-world source code
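
Every snippet below revolves around the same pattern: a dumps() call turns an in-memory object into a byte string (or, for json.dumps, a text string) that can be stored, hashed, or sent over the wire, and the matching loads() call restores it. A minimal standard-library round trip, with illustrative values, looks like this:

import pickle

data = {"name": "example", "values": [1, 2, 3]}

# Serialize to bytes; protocol -1 selects the highest protocol available.
blob = pickle.dumps(data, protocol=-1)

# Deserialize back into an equivalent object.
restored = pickle.loads(blob)
assert restored == data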

static_output.py (project: yt, author: yt-project)
def __new__(cls, filename=None, *args, **kwargs):
        if not isinstance(filename, string_types):
            obj = object.__new__(cls)
            # The Stream frontend uses a StreamHandler object to pass metadata
            # to __init__.
            is_stream = (hasattr(filename, 'get_fields') and
                         hasattr(filename, 'get_particle_type'))
            if not is_stream:
                obj.__init__(filename, *args, **kwargs)
            return obj
        apath = os.path.abspath(filename)
        cache_key = (apath, cPickle.dumps(args), cPickle.dumps(kwargs))
        if ytcfg.getboolean("yt", "skip_dataset_cache"):
            obj = object.__new__(cls)
        elif cache_key not in _cached_datasets:
            obj = object.__new__(cls)
            if obj._skip_cache is False:
                _cached_datasets[cache_key] = obj
        else:
            obj = _cached_datasets[cache_key]
        return obj
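
The yt snippet above uses cPickle.dumps on the constructor arguments to build a hashable cache key, since args and kwargs may contain unhashable values such as dicts. A stripped-down sketch of the same idea (the make_cache_key name is hypothetical):

import os
import pickle

def make_cache_key(filename, args, kwargs):
    # Pickled bytes are hashable even when the arguments themselves are not.
    return (os.path.abspath(filename),
            pickle.dumps(args), pickle.dumps(kwargs))

Note that pickled output is not guaranteed to be byte-stable across interpreter runs (dict ordering and protocol changes can vary), so this pattern suits an in-process cache like _cached_datasets rather than a persistent one.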
test_convolution_2d.py (project: chainer-deconv, author: germanRos)
def check_pickling(self, x_data):
        x = chainer.Variable(x_data)
        y = self.link(x)
        y_data1 = y.data

        del x, y

        pickled = pickle.dumps(self.link, -1)
        del self.link
        self.link = pickle.loads(pickled)

        x = chainer.Variable(x_data)
        y = self.link(x)
        y_data2 = y.data

        gradient_check.assert_allclose(y_data1, y_data2, atol=0, rtol=0)
cache.py (project: cloud-custodian, author: capitalone)
def save(self, key, data):
        try:
            with open(self.cache_path, 'wb') as fh:
                self.data[pickle.dumps(key)] = data
                pickle.dump(self.data, fh, protocol=2)
        except Exception as e:
            log.warning("Could not save cache %s err: %s" % (
                self.cache_path, e))
            if not os.path.exists(self.cache_path):
                directory = os.path.dirname(self.cache_path)
                log.info('Generating Cache directory: %s.' % directory)
                try:
                    os.makedirs(directory)
                except Exception as e:
                    log.warning("Could not create directory: %s err: %s" % (
                        directory, e))
utils.py (project: odin, author: imito)
def update(self, items):
    if self.read_only:
      return
    query = """UPDATE {tb} SET value=(?) WHERE key=(?);"""
    if isinstance(items, Mapping):
      items = items.items()
    # ====== check if update is in cache ====== #
    db_update = []
    for key, value in items:
      key = str(key)
      if key in self.current_cache:
        self.current_cache[key] = value
      else:
        db_update.append((marshal.dumps(value), key))
    # ====== perform DB update ====== #
    self.cursor.executemany(query.format(tb=self._current_table), db_update)
    self.connection.commit()
    return self
model_test.py (project: odin, author: imito)
def test_complex_transform(self):
        with TemporaryDirectory() as temp:
            from sklearn.pipeline import Pipeline
            path = os.path.join(temp, 'audio.sph')
            urlretrieve(filename=path,
                        url='https://s3.amazonaws.com/ai-datasets/sw02001.sph')
            f = Pipeline([
                ('step1', model.SpeechTransform('mspec', fs=8000, vad=True)),
                ('step2', model.Transform(lambda x: (x[0][:, :40],
                                                     x[1].astype(str)))),
                ('step3', model.Transform(lambda x: (np.sum(x[0]),
                                                    ''.join(x[1].tolist()))))
            ])
            x = f.transform(path)
            f = cPickle.loads(cPickle.dumps(f))
            y = f.transform(path)
            self.assertEqual(x[0], y[0])
            self.assertEqual(y[0], -3444229.0)
            self.assertEqual(x[1], y[1])
test_unit.py (project: sqlalchemy-validation, author: blazelibs)
def test_pickling(self):
        so = ex.SomeObj(minlen=5)
        assert so._sav.entity.minlen == 5
        pstr = pickle.dumps(so)
        del so

        so2 = pickle.loads(pstr)
        assert so2._sav.entity.minlen == 5

        # make sure it's a weakref
        vh = so2._sav
        del so2
        gc.collect()

        try:
            vh.entity
            assert False, 'expected exception'
        except EntityRefMissing:
            pass
test.py (project: chainer-glu, author: musyoku)
def check_pickling(self, x_data):
        x = chainer.Variable(x_data)
        y = self.link(x)
        y_data1 = y.data

        del x, y

        pickled = pickle.dumps(self.link, -1)
        del self.link
        self.link = pickle.loads(pickled)

        x = chainer.Variable(x_data)
        y = self.link(x)
        y_data2 = y.data

        testing.assert_allclose(y_data1, y_data2, atol=0, rtol=0)
sqlite_recorder.py (project: OpenMDAO, author: OpenMDAO)
def record_metadata_solver(self, recording_requester):
        """
        Record solver metadata.

        Parameters
        ----------
        recording_requester: <Solver>
            The Solver that would like to record its metadata.
        """
        path = recording_requester._system.pathname
        solver_class = type(recording_requester).__name__
        if not path:
            path = 'root'
        id = "{}.{}".format(path, solver_class)

        solver_options = pickle.dumps(recording_requester.options,
                                      pickle.HIGHEST_PROTOCOL)

        with self.con:
            self.con.execute(
                "INSERT INTO solver_metadata(id, solver_options, solver_class) "
                "VALUES(?,?,?)", (id, sqlite3.Binary(solver_options), solver_class))
web_recorder.py (project: OpenMDAO, author: OpenMDAO)
def _record_driver_metadata(self, driver_class, model_viewer_data):
        """
        Record driver metadata.

        Parameters
        ----------
        driver_class : str
            The name of the driver type.
        model_viewer_data : JSON Object
            All model viewer data, including variable name relationships.
        """
        driver_metadata_dict = {
            'id': driver_class,
            'model_viewer_data': model_viewer_data
        }
        driver_metadata = json.dumps(driver_metadata_dict)

        requests.post(self._endpoint + '/' + self._case_id + '/driver_metadata',
                      data=driver_metadata, headers=self._headers)
test_simple.py (project: pywren, author: pywren)
def test_map(self):

        def plus_one(x):
            return x + 1
        N = 10

        x = np.arange(N)
        futures_original = self.wrenexec.map(plus_one, x)
        futures_str = pickle.dumps(futures_original)
        futures = pickle.loads(futures_str)

        result_count = 0
        while result_count < N:

            fs_dones, fs_notdones = pywren.wait(futures)
            result_count = len(fs_dones)

        res = np.array([f.result() for f in futures])
        np.testing.assert_array_equal(res, x + 1)
util.py (project: deb-python-cassandra-driver, author: openstack)
def _serialize_key(self, key):
        return cPickle.dumps(key)
gpickle.py (project: pybel, author: pybel)
def to_bytes(graph, protocol=HIGHEST_PROTOCOL):
    """Converts a graph to bytes with pickle. Note that the pickle module has some incompatibilities between Python
    2 and 3. To export a universally importable pickle, choose 0, 1, or 2.

    :param BELGraph graph: A BEL network
    :param int protocol: Pickling protocol to use
    :return: Pickled bytes representing the graph
    :rtype: bytes

    .. seealso:: https://docs.python.org/3.6/library/pickle.html#data-stream-format
    """
    raise_for_not_bel(graph)
    return dumps(graph, protocol=protocol)
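
As the docstring notes, a pickle written with the default HIGHEST_PROTOCOL on Python 3 cannot be read by Python 2. If cross-version portability matters, pass an explicit low protocol; a hedged usage sketch (graph stands in for a real BELGraph instance):

# Protocol 2 is the highest protocol that Python 2 can still read.
portable = to_bytes(graph, protocol=2)

# Python-3-only consumers can keep the faster default.
fast = to_bytes(graph)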
shortcuts.py (project: serialtime, author: ianlini)
def save_pklgz(obj, path, log_description=None, logger=None,
               logging_level=logging.INFO, verbose_start=True,
               verbose_end=True, end_in_new_line=True, log_prefix="..."):
    if log_description is None:
        log_description = "Pickling to " + path
    with SimpleTimer(log_description, logger, logging_level, verbose_start,
                     verbose_end, end_in_new_line, log_prefix):
        pkl = cPickle.dumps(obj, protocol=cPickle.HIGHEST_PROTOCOL)
        with gzip.open(path, "wb") as fp:
            fp.write(pkl)
parallel.py (project: rltools, author: sisl)
def _dumps(o):
    return cPickle.dumps(o, protocol=-1)
test_function_set.py (project: chainer-deconv, author: germanRos)
def test_pickle_cpu(self):
        fs2_serialized = pickle.dumps(self.fs2)
        fs2_loaded = pickle.loads(fs2_serialized)
        self.assertTrue((self.fs2.b.p.data == fs2_loaded.b.p.data).all())
        self.assertTrue(
            (self.fs2.fs1.a.p.data == fs2_loaded.fs1.a.p.data).all())
test_function_set.py (project: chainer-deconv, author: germanRos)
def test_pickle_gpu(self):
        self.fs2.to_gpu()
        fs2_serialized = pickle.dumps(self.fs2)
        fs2_loaded = pickle.loads(fs2_serialized)
        fs2_loaded.to_cpu()
        self.fs2.to_cpu()

        self.assertTrue((self.fs2.b.p.data == fs2_loaded.b.p.data).all())
        self.assertTrue(
            (self.fs2.fs1.a.p.data == fs2_loaded.fs1.a.p.data).all())
test_function_set.py (project: chainer-deconv, author: germanRos)
def test_pickle_cpu(self):
        s = pickle.dumps(self.fs)
        fs2 = pickle.loads(s)
        self.check_equal_fs(self.fs, fs2)
test_function_set.py (project: chainer-deconv, author: germanRos)
def test_pickle_gpu(self):
        self.fs.to_gpu()
        s = pickle.dumps(self.fs)
        fs2 = pickle.loads(s)

        self.fs.to_cpu()
        fs2.to_cpu()
        self.check_equal_fs(self.fs, fs2)
cache.py (project: cloud-custodian, author: capitalone)
def get(self, key):
        return self.data.get(pickle.dumps(key))
cache.py (project: cloud-custodian, author: capitalone)
def save(self, key, data):
        self.data[pickle.dumps(key)] = data
cache.py (project: cloud-custodian, author: capitalone)
def get(self, key):
        k = pickle.dumps(key)
        return self.data.get(k)
test_cache.py (project: cloud-custodian, author: capitalone)
def test_get(self):
        # mock the pickle and assign it to the data variable
        test_pickle = pickle.dumps(
            {pickle.dumps(self.test_key): self.test_value}, protocol=2)
        self.test_cache.data = pickle.loads(test_pickle)

        # assert
        self.assertEquals(self.test_cache.get(self.test_key), self.test_value)
        self.assertEquals(self.test_cache.get(self.bad_key), None)
utils.py (project: odin, author: imito)
def __setstate__(self, states):
    path, read_only, cache_size = states
    if not os.path.exists(path):
      raise ValueError("Cannot find store NoSQL database at path: %s."
                       "If you have moved the database, the dumps from "
                       "cannot restore the previous intance." % path)
    self._restore_dict(path, read_only, cache_size)
    self._path = path
    self._read_only = read_only
    self._cache_size = cache_size
utils.py (project: odin, author: imito)
def _flush(self, save_all=False):
    curr_tab = self.current_table
    tables = self.get_all_tables() if save_all else [curr_tab]
    for tab in tables:
      self.set_table(tab)
      if len(self.current_cache) > 0:
        self.cursor.executemany(
            "INSERT INTO {tb} VALUES (?, ?)".format(tb=tab),
            [(str(k), marshal.dumps(v.tolist()) if isinstance(v, np.ndarray)
              else marshal.dumps(v))
             for k, v in self.current_cache.items()])
        self.connection.commit()
        self.current_cache.clear()
    # restore the last table
    return self.set_table(curr_tab)
decorators.py (project: odin, author: imito)
def func_to_str(func):
  # convert the code object to bytes
  code = cPickle.dumps(array("B", marshal.dumps(func.__code__)),
                       protocol=cPickle.HIGHEST_PROTOCOL)
  closure = None
  if func.__closure__ is not None:
    print("[WARNING] function: %s contains closure, which cannot be "
          "serialized." % str(func))
    closure = tuple([c.cell_contents for c in func.__closure__])
  defaults = func.__defaults__
  return (code, closure, defaults)
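
The inverse operation rebuilds a function from the marshalled code object. A hedged sketch of what a matching deserializer could look like (the str_to_func name and the handling of globals are assumptions, closure restoration is omitted, and odin's actual implementation may differ):

import marshal
import pickle
import types

def str_to_func(serialized, globals_dict):
    code_bytes, closure, defaults = serialized
    # pickle.loads recovers the array('B', ...); marshal.loads rebuilds the
    # code object (this only works on the same Python version that wrote it).
    code = marshal.loads(pickle.loads(code_bytes).tobytes())
    return types.FunctionType(code, globals_dict, None, defaults)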
decorators.py (project: odin, author: imito)
def __init__(self, func, *args, **kwargs):
    super(functionable, self).__init__()
    self._function = func
    self.__name__ = self._function.__name__
    try:  # sometimes the source cannot be retrieved
      self._source = inspect.getsource(self._function)
    except Exception as e:
      print("[WARNING] Cannot get source code of function:", func,
            "(error:%s)" % str(e))
      self._source = None
    # try to pickle the function directly
    try:
      self._sandbox = cPickle.dumps(self._function,
          protocol=cPickle.HIGHEST_PROTOCOL)
    except Exception:
      self._sandbox = _serialize_function_sandbox(func, self._source)
    # ====== store argsmap ====== #
    argspec = inspect.getargspec(func)
    argsmap = OrderedDict([(i, _ArgPlaceHolder_()) for i in argspec.args])
    # store defaults
    if argspec.defaults is not None:
      for name, arg in zip(argspec.args[::-1], argspec.defaults[::-1]):
        argsmap[name] = arg
    # update positional arguments
    for name, arg in zip(argspec.args, args):
      argsmap[name] = arg
    # update kw arguments
    argsmap.update(kwargs)
    self._argsmap = argsmap

  # ==================== Pickling methods ==================== #
__init__.py (project: odin, author: imito)
def is_pickleable(x):
  try:
    cPickle.dumps(x, protocol=cPickle.HIGHEST_PROTOCOL)
    return True
  except cPickle.PickleError:
    return False
work_pool.py (project: batchup, author: Britefury)
def _serialise_args(shared_objects, args):  # pragma: no cover
    serialised_args = []
    for arg in args:
        if isinstance(arg, _SharedConstant):
            value = arg.value
            key = id(value)
            if key not in shared_objects:
                shared_objects[key] = dumps(value)
            ref = _SharedRef(key=key)
            serialised = ref
        else:
            serialised = arg
        serialised_args.append(serialised)
    return tuple(serialised_args)
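
On the worker side, the process presumably reverses this mapping: a _SharedRef is looked up in the shared-object table and unpickled once, while plain arguments pass through untouched. A hedged sketch of that counterpart (the _deserialise_args name and the per-worker cache are assumptions, not batchup's actual API; _SharedRef is the class from the snippet above):

from pickle import loads

def _deserialise_args(shared_objects, cache, args):
    # Resolve _SharedRef placeholders back to real objects, unpickling each
    # shared constant at most once per worker process.
    resolved = []
    for arg in args:
        if isinstance(arg, _SharedRef):
            if arg.key not in cache:
                cache[arg.key] = loads(shared_objects[arg.key])
            resolved.append(cache[arg.key])
        else:
            resolved.append(arg)
    return tuple(resolved)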
test_pickle_unpickle_theano_fn.py (project: Theano-Deep-learning, author: GeekLiB)
def test_pickle_unpickle_with_reoptimization():
    mode = theano.config.mode
    if mode in ["DEBUG_MODE", "DebugMode"]:
        mode = "FAST_RUN"
    x1 = T.fmatrix('x1')
    x2 = T.fmatrix('x2')
    x3 = theano.shared(numpy.ones((10, 10), dtype=floatX))
    x4 = theano.shared(numpy.ones((10, 10), dtype=floatX))
    y = T.sum(T.sum(T.sum(x1 ** 2 + x2) + x3) + x4)

    updates = OrderedDict()
    updates[x3] = x3 + 1
    updates[x4] = x4 + 1
    f = theano.function([x1, x2], y, updates=updates, mode=mode)

    # now pickle the compiled theano fn
    string_pkl = pickle.dumps(f, -1)

    in1 = numpy.ones((10, 10), dtype=floatX)
    in2 = numpy.ones((10, 10), dtype=floatX)

    # test unpickle with optimization
    default = theano.config.reoptimize_unpickled_function
    try:
        # the default is True
        theano.config.reoptimize_unpickled_function = True
        f_ = pickle.loads(string_pkl)
        assert f(in1, in2) == f_(in1, in2)
    finally:
        theano.config.reoptimize_unpickled_function = default
test_pickle_unpickle_theano_fn.py (project: Theano-Deep-learning, author: GeekLiB)
def test_pickle_unpickle_without_reoptimization():
    mode = theano.config.mode
    if mode in ["DEBUG_MODE", "DebugMode"]:
        mode = "FAST_RUN"
    x1 = T.fmatrix('x1')
    x2 = T.fmatrix('x2')
    x3 = theano.shared(numpy.ones((10, 10), dtype=floatX))
    x4 = theano.shared(numpy.ones((10, 10), dtype=floatX))
    y = T.sum(T.sum(T.sum(x1**2 + x2) + x3) + x4)

    updates = OrderedDict()
    updates[x3] = x3 + 1
    updates[x4] = x4 + 1
    f = theano.function([x1, x2], y, updates=updates, mode=mode)

    # now pickle the compiled theano fn
    string_pkl = pickle.dumps(f, -1)

    # compute f value
    in1 = numpy.ones((10, 10), dtype=floatX)
    in2 = numpy.ones((10, 10), dtype=floatX)

    # test unpickle without optimization
    default = theano.config.reoptimize_unpickled_function
    try:
        # the default is True
        theano.config.reoptimize_unpickled_function = False
        f_ = pickle.loads(string_pkl)
        assert f(in1, in2) == f_(in1, in2)
    finally:
        theano.config.reoptimize_unpickled_function = default

