python类dumps()的实例源码

workload_mapping.py 文件源码 项目:ottertune 作者: cmu-db 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def compress(workload_state):
        return zlib.compress(pickle.dumps(workload_state))
test_threading.py 文件源码 项目:sudkamp-langs-machines-python 作者: thundergolfer 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_pickle(self):
        import sys
        if sys.version_info < (3, 4):
            import dill as pickle
        else:
            import pickle

        # go to non initial state B
        self.stuff.to_B()
        # pickle Stuff model
        dump = pickle.dumps(self.stuff)
        self.assertIsNotNone(dump)
        stuff2 = pickle.loads(dump)
        self.assertTrue(stuff2.machine.is_state("B"))
        # check if machines of stuff and stuff2 are truly separated
        stuff2.to_A()
        self.stuff.to_C()
        self.assertTrue(stuff2.machine.is_state("A"))
        thread = Thread(target=stuff2.forward)
        thread.start()
        # give thread some time to start
        time.sleep(0.01)
        # both objects should be in different states
        # and also not share locks
        begin = time.time()
        # stuff should not be locked and execute fast
        self.assertTrue(self.stuff.machine.is_state("C"))
        fast = time.time()
        # stuff2 should be locked and take about 1 second
        # to be executed
        self.assertTrue(stuff2.machine.is_state("B"))
        blocked = time.time()
        self.assertAlmostEqual(fast-begin, 0, delta=0.1)
        self.assertAlmostEqual(blocked-begin, 1, delta=0.1)


# Same as TestLockedTransition but with LockedHierarchicalMachine
test_nesting.py 文件源码 项目:sudkamp-langs-machines-python 作者: thundergolfer 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_pickle(self):
        import sys
        if sys.version_info < (3, 4):
            import dill as pickle
        else:
            import pickle

        states = ['A', 'B', {'name': 'C', 'children': ['1', '2', {'name': '3', 'children': ['a', 'b', 'c']}]},
          'D', 'E', 'F']
        transitions = [
            {'trigger': 'walk', 'source': 'A', 'dest': 'B'},
            {'trigger': 'run', 'source': 'B', 'dest': 'C'},
            {'trigger': 'sprint', 'source': 'C', 'dest': 'D'}
        ]
        m = self.stuff.machine_cls(states=states, transitions=transitions, initial='A')
        m.walk()
        dump = pickle.dumps(m)
        self.assertIsNotNone(dump)
        m2 = pickle.loads(dump)
        self.assertEqual(m.state, m2.state)
        m2.run()
        if State.separator in '_':
            m2.to_C_3_a()
            m2.to_C_3_b()
        else:
            m2.to_C.s3.a()
            m2.to_C.s3.b()
test_queue.py 文件源码 项目:kq 作者: joowani 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_enqueue_job(producer, logger):
    producer_cls, producer_inst = producer

    queue = Queue(hosts='host:7000', topic='foo', timeout=300)

    old_job = Job(
        id='2938401',
        timestamp=int(time.time()),
        topic='bar',
        func=failure_func,
        args=[1, 2],
        kwargs={'a': 3},
        timeout=100,
    )
    new_job = queue.enqueue(old_job)

    assert isinstance(new_job, Job)
    assert isinstance(new_job.id, str)
    assert isinstance(new_job.timestamp, int)
    assert old_job.id != new_job.id
    assert old_job.timestamp <= new_job.timestamp
    assert new_job.topic == 'foo'
    assert new_job.func == failure_func
    assert new_job.args == [1, 2]
    assert new_job.kwargs == {'a': 3}
    assert new_job.timeout == 300
    assert new_job.key is None

    producer_inst.send.assert_called_with(
        'foo', dill.dumps(new_job), key=None
    )
    logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))
test_queue.py 文件源码 项目:kq 作者: joowani 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_enqueue_job_with_key(producer, logger):
    producer_cls, producer_inst = producer

    queue = Queue(hosts='host:7000', topic='foo', timeout=300)

    old_job = Job(
        id='2938401',
        timestamp=int(time.time()),
        topic='bar',
        func=failure_func,
        args=[1, 2],
        kwargs={'a': 3},
        timeout=100,
        key='bar',
    )
    new_job = queue.enqueue_with_key('baz', old_job)

    assert isinstance(new_job, Job)
    assert isinstance(new_job.id, str)
    assert isinstance(new_job.timestamp, int)
    assert old_job.id != new_job.id
    assert old_job.timestamp <= new_job.timestamp
    assert new_job.topic == 'foo'
    assert new_job.func == failure_func
    assert new_job.args == [1, 2]
    assert new_job.kwargs == {'a': 3}
    assert new_job.timeout == 300
    assert new_job.key == 'baz'

    producer_inst.send.assert_called_with(
        'foo', dill.dumps(new_job), key='baz'
    )
    logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))
util.py 文件源码 项目:FeatureHub 作者: HDI-Project 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def run_isolated(f, *args):
    """Execute `f(args)` in an isolated environment.

    First, uses dill to serialize the function. Unfortunately, pickle is unable
    to serialize some functions, so we must serialize and deserialize the
    function ourselves.
    """
    f_dill = dill.dumps(f)
    with Pool(1) as pool:
        return pool.apply(_get_function_and_execute, (f_dill, *args))
tasks.py 文件源码 项目:pythonwhat 作者: datacamp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getStreamPickle(name, process, shell):
    try:
        return pickle.dumps(get_env(shell.user_ns)[name])
    except:
        return None
tasks.py 文件源码 项目:pythonwhat 作者: datacamp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def getStreamDill(name, process, shell):
    try:
        return dill.dumps(get_env(shell.user_ns)[name])
    except:
        return None
tasks.py 文件源码 项目:pythonwhat 作者: datacamp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getRepresentation(name, process):
    obj_class = getClass(name, process)
    converters = pythonwhat.State.State.root_state.converters
    if obj_class in converters:
        repres = convert(name, dill.dumps(converters[obj_class]), process)
        if (errored(repres)):
            return ReprFail("manual conversion failed")
        else: 
            return repres
    else:
        # first try to pickle
        try:
            stream = getStreamPickle(name, process)
            if not errored(stream): return pickle.loads(stream)
        except: 
            pass

        # if it failed, try to dill
        try:
            stream = getStreamDill(name, process)
            if not errored(stream): return dill.loads(stream)
            return ReprFail("dilling inside process failed for %s - write manual converter" % obj_class)
        except PicklingError:
            return ReprFail("undilling of bytestream failed with PicklingError - write manual converter")
        except Exception as e:
            return ReprFail("undilling of bytestream failed for class %s - write manual converter."
                            "Error: %s - %s" % (obj_class, type(e), e))
protocol.py 文件源码 项目:cess 作者: frnsys 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def dumps(x):
    """serialize python object(s)"""
    try:
        return dill.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
    except Exception as e:
        logger.info("Failed to serialize %s", x)
        logger.exception(e)
        raise
protocol.py 文件源码 项目:cess 作者: frnsys 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def write(stream, msg):
    """write data to a stream"""
    msg = dumps(msg)
    yield stream.write(msg + sentinel)
test_persistent_queue.py 文件源码 项目:python-persistent-queue 作者: philipbl 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def setup_method(self):
        import dill

        random = str(uuid.uuid4()).replace('-', '')
        filename = '{}_{}'.format(self.__class__.__name__, random)
        self.queue = PersistentQueue(filename,
                                     loads=dill.loads,
                                     dumps=dill.dumps)
test_persistent_queue.py 文件源码 项目:python-persistent-queue 作者: philipbl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setup_method(self):
        import msgpack

        random = str(uuid.uuid4()).replace('-', '')
        filename = '{}_{}'.format(self.__class__.__name__, random)
        self.queue = PersistentQueue(filename,
                                     loads=msgpack.unpackb,
                                     dumps=msgpack.packb)
serialization.py 文件源码 项目:kubeface 作者: hammerlab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def check(obj):
    if not CHECK_SERIALIZATION:
        return
    try:
        dill.loads(dill.dumps(obj))
    except Exception as e:
        logging.error(
            "Couldn't serialize: %s\n'%s'\nBad objects:\n%s" % (
                str(obj), str(e), dill.detect.badobjects(obj, depth=2)))
        raise
serialization.py 文件源码 项目:kubeface 作者: hammerlab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def dumps(obj):
    check(obj)
    return dill.dumps(obj, protocol=PICKLE_PROTOCOL)
model_builder.py 文件源码 项目:plasma 作者: jnkh 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_unique_id(self):
        num_epochs = self.conf['training']['num_epochs']
        this_conf = deepcopy(self.conf)
        #don't make hash dependent on number of epochs.
        this_conf['training']['num_epochs'] = 0
        unique_id =  hash(dill.dumps(this_conf))
        return unique_id
tester.py 文件源码 项目:Flow-Guided-Feature-Aggregation 作者: msracver 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def apply_async(pool,fun,args):
    payload=dill.dumps((fun,args))
    return pool.apply_async(run_dill_encode,(payload,))
mapping.py 文件源码 项目:pyabc 作者: neuralyzer 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, map=map, mapper_pickles=False):
        super().__init__()
        self.map = map
        self.pickle, self.unpickle = ((identity, identity)
                                      if mapper_pickles
                                      else (pickle.dumps, pickle.loads))
storage.py 文件源码 项目:mbot 作者: michaelkuty 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def save_state(self, state):
        """Save state"""

        with open(state.state_path, 'wb') as f:

            data = pickle.dumps(state, pickle.HIGHEST_PROTOCOL)

            if self.encrypt:
                data = self.encrypt_data(data)

            pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
storage.py 文件源码 项目:mbot 作者: michaelkuty 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def save_state(self, state):
        """Save state"""

        data = pickle.dumps(state, pickle.HIGHEST_PROTOCOL)

        if self.encrypt:
            data = self.encrypt_data(data)

        return self.bucket.put_object(Key=state.state_path,
                                      Body=data)


问题


面经


文章

微信
公众号

扫码关注公众号