import pickle, zlib

def compress(workload_state):
    return zlib.compress(pickle.dumps(workload_state))
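A matching decompress helper is not shown on the page; a minimal sketch, assuming the bytes were produced by compress above:

import pickle, zlib

def decompress(blob):
    # Reverse of compress: inflate the zlib stream, then unpickle.
    return pickle.loads(zlib.decompress(blob))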
Example source code for Python's dumps()
From test_threading.py (project: sudkamp-langs-machines-python, author: thundergolfer):
def test_pickle(self):
    import sys
    if sys.version_info < (3, 4):
        import dill as pickle
    else:
        import pickle
    # go to non initial state B
    self.stuff.to_B()
    # pickle Stuff model
    dump = pickle.dumps(self.stuff)
    self.assertIsNotNone(dump)
    stuff2 = pickle.loads(dump)
    self.assertTrue(stuff2.machine.is_state("B"))
    # check if machines of stuff and stuff2 are truly separated
    stuff2.to_A()
    self.stuff.to_C()
    self.assertTrue(stuff2.machine.is_state("A"))
    thread = Thread(target=stuff2.forward)
    thread.start()
    # give thread some time to start
    time.sleep(0.01)
    # both objects should be in different states
    # and also not share locks
    begin = time.time()
    # stuff should not be locked and execute fast
    self.assertTrue(self.stuff.machine.is_state("C"))
    fast = time.time()
    # stuff2 should be locked and take about 1 second
    # to be executed
    self.assertTrue(stuff2.machine.is_state("B"))
    blocked = time.time()
    self.assertAlmostEqual(fast - begin, 0, delta=0.1)
    self.assertAlmostEqual(blocked - begin, 1, delta=0.1)
# Same as TestLockedTransition but with LockedHierarchicalMachine
From test_nesting.py (project: sudkamp-langs-machines-python, author: thundergolfer):
def test_pickle(self):
    import sys
    if sys.version_info < (3, 4):
        import dill as pickle
    else:
        import pickle
    states = ['A', 'B',
              {'name': 'C', 'children':
               ['1', '2', {'name': '3', 'children': ['a', 'b', 'c']}]},
              'D', 'E', 'F']
    transitions = [
        {'trigger': 'walk', 'source': 'A', 'dest': 'B'},
        {'trigger': 'run', 'source': 'B', 'dest': 'C'},
        {'trigger': 'sprint', 'source': 'C', 'dest': 'D'}
    ]
    m = self.stuff.machine_cls(states=states, transitions=transitions, initial='A')
    m.walk()
    dump = pickle.dumps(m)
    self.assertIsNotNone(dump)
    m2 = pickle.loads(dump)
    self.assertEqual(m.state, m2.state)
    m2.run()
    if State.separator in '_':
        m2.to_C_3_a()
        m2.to_C_3_b()
    else:
        m2.to_C.s3.a()
        m2.to_C.s3.b()
def test_enqueue_job(producer, logger):
    producer_cls, producer_inst = producer
    queue = Queue(hosts='host:7000', topic='foo', timeout=300)
    old_job = Job(
        id='2938401',
        timestamp=int(time.time()),
        topic='bar',
        func=failure_func,
        args=[1, 2],
        kwargs={'a': 3},
        timeout=100,
    )
    new_job = queue.enqueue(old_job)
    assert isinstance(new_job, Job)
    assert isinstance(new_job.id, str)
    assert isinstance(new_job.timestamp, int)
    assert old_job.id != new_job.id
    assert old_job.timestamp <= new_job.timestamp
    assert new_job.topic == 'foo'
    assert new_job.func == failure_func
    assert new_job.args == [1, 2]
    assert new_job.kwargs == {'a': 3}
    assert new_job.timeout == 300
    assert new_job.key is None
    producer_inst.send.assert_called_with(
        'foo', dill.dumps(new_job), key=None
    )
    logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))
def test_enqueue_job_with_key(producer, logger):
    producer_cls, producer_inst = producer
    queue = Queue(hosts='host:7000', topic='foo', timeout=300)
    old_job = Job(
        id='2938401',
        timestamp=int(time.time()),
        topic='bar',
        func=failure_func,
        args=[1, 2],
        kwargs={'a': 3},
        timeout=100,
        key='bar',
    )
    new_job = queue.enqueue_with_key('baz', old_job)
    assert isinstance(new_job, Job)
    assert isinstance(new_job.id, str)
    assert isinstance(new_job.timestamp, int)
    assert old_job.id != new_job.id
    assert old_job.timestamp <= new_job.timestamp
    assert new_job.topic == 'foo'
    assert new_job.func == failure_func
    assert new_job.args == [1, 2]
    assert new_job.kwargs == {'a': 3}
    assert new_job.timeout == 300
    assert new_job.key == 'baz'
    producer_inst.send.assert_called_with(
        'foo', dill.dumps(new_job), key='baz'
    )
    logger.info.assert_called_once_with('Enqueued: {}'.format(new_job))
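Both tests construct a Job record with fields id, timestamp, topic, func, args, kwargs, timeout, and key. The real class comes from the queue library under test; a hypothetical minimal stand-in for reading the tests:

from collections import namedtuple

# Hypothetical stand-in; the actual Job lives in the tested library.
Job = namedtuple('Job', ['id', 'timestamp', 'topic', 'func',
                         'args', 'kwargs', 'timeout', 'key'])
Job.__new__.__defaults__ = (None,)  # key is optional, as in the first test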
def run_isolated(f, *args):
    """Execute `f(args)` in an isolated environment.

    First, uses dill to serialize the function. Unfortunately, pickle is
    unable to serialize some functions, so we must serialize and
    deserialize the function ourselves.
    """
    f_dill = dill.dumps(f)
    with Pool(1) as pool:
        return pool.apply(_get_function_and_execute, (f_dill, *args))
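A usage sketch (hypothetical, assuming run_isolated and its _get_function_and_execute helper are importable): the lambda below is exactly the kind of callable that stdlib pickle rejects but dill serializes, which is the reason run_isolated exists.

if __name__ == '__main__':
    # pickle.dumps(lambda a, b: a + b) raises; dill handles it.
    print(run_isolated(lambda a, b: a + b, 2, 3))  # prints 5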
def getStreamPickle(name, process, shell):
    try:
        return pickle.dumps(get_env(shell.user_ns)[name])
    except:
        return None
def getStreamDill(name, process, shell):
    try:
        return dill.dumps(get_env(shell.user_ns)[name])
    except:
        return None
def getRepresentation(name, process):
    obj_class = getClass(name, process)
    converters = pythonwhat.State.State.root_state.converters
    if obj_class in converters:
        repres = convert(name, dill.dumps(converters[obj_class]), process)
        if errored(repres):
            return ReprFail("manual conversion failed")
        else:
            return repres
    else:
        # first try to pickle
        try:
            stream = getStreamPickle(name, process)
            if not errored(stream):
                return pickle.loads(stream)
        except:
            pass
        # if that failed, try dill
        try:
            stream = getStreamDill(name, process)
            if not errored(stream):
                return dill.loads(stream)
            return ReprFail("dilling inside process failed for %s - write manual converter" % obj_class)
        except PicklingError:
            return ReprFail("undilling of bytestream failed with PicklingError - write manual converter")
        except Exception as e:
            return ReprFail("undilling of bytestream failed for class %s - write manual converter. "
                            "Error: %s - %s" % (obj_class, type(e), e))
def dumps(x):
    """Serialize Python object(s)."""
    try:
        return dill.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
    except Exception as e:
        logger.info("Failed to serialize %s", x)
        logger.exception(e)
        raise
def write(stream, msg):
    """Write data to a stream."""
    msg = dumps(msg)
    yield stream.write(msg + sentinel)
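The reader side is not shown. A minimal counterpart sketch, assuming a Tornado IOStream and that messages were framed by write above:

from tornado import gen

@gen.coroutine
def read(stream):
    # Read one framed message up to (and including) the sentinel,
    # strip the sentinel, then deserialize with dill.
    msg = yield stream.read_until(sentinel)
    raise gen.Return(dill.loads(msg[:-len(sentinel)]))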
From test_persistent_queue.py (project: python-persistent-queue, author: philipbl):
def setup_method(self):
    import dill
    random = str(uuid.uuid4()).replace('-', '')
    filename = '{}_{}'.format(self.__class__.__name__, random)
    self.queue = PersistentQueue(filename,
                                 loads=dill.loads,
                                 dumps=dill.dumps)
Another snippet from test_persistent_queue.py (project: python-persistent-queue, author: philipbl):
def setup_method(self):
    import msgpack
    random = str(uuid.uuid4()).replace('-', '')
    filename = '{}_{}'.format(self.__class__.__name__, random)
    self.queue = PersistentQueue(filename,
                                 loads=msgpack.unpackb,
                                 dumps=msgpack.packb)
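The two fixtures differ only in the serializer pair handed to PersistentQueue: dill round-trips arbitrary Python objects, while msgpack only handles plain data structures. A quick illustration of that difference:

import dill
import msgpack

double = dill.loads(dill.dumps(lambda x: x * 2))
assert double(3) == 6                        # dill round-trips callables

packed = msgpack.packb([1, 2, 3])
assert msgpack.unpackb(packed) == [1, 2, 3]  # msgpack: plain data only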
def check(obj):
    if not CHECK_SERIALIZATION:
        return
    try:
        dill.loads(dill.dumps(obj))
    except Exception as e:
        logging.error(
            "Couldn't serialize: %s\n'%s'\nBad objects:\n%s" % (
                str(obj), str(e), dill.detect.badobjects(obj, depth=2)))
        raise
def dumps(obj):
    check(obj)
    return dill.dumps(obj, protocol=PICKLE_PROTOCOL)
def get_unique_id(self):
    num_epochs = self.conf['training']['num_epochs']
    this_conf = deepcopy(self.conf)
    # don't make the hash dependent on the number of epochs
    this_conf['training']['num_epochs'] = 0
    unique_id = hash(dill.dumps(this_conf))
    return unique_id
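One caveat: Python's built-in hash() is salted per interpreter run for bytes (PYTHONHASHSEED), so the ID above is not stable across processes. A deterministic variant, sketched with hashlib:

import hashlib

# Hypothetical replacement for the hash(...) line above: sha256 of the
# serialized config is reproducible across runs, unlike built-in hash().
unique_id = hashlib.sha256(dill.dumps(this_conf)).hexdigest()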
def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encode, (payload,))
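run_dill_encode is referenced but not defined in this snippet; a sketch of the conventional worker-side counterpart, assuming it simply unpacks and calls:

import dill

def run_dill_encode(payload):
    # Worker side: rebuild the (function, args) pair and invoke it.
    fun, args = dill.loads(payload)
    return fun(*args)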
def __init__(self, map=map, mapper_pickles=False):
    super().__init__()
    self.map = map
    self.pickle, self.unpickle = ((identity, identity)
                                  if mapper_pickles
                                  else (pickle.dumps, pickle.loads))
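identity is not defined in the snippet; it is presumably the usual no-op passthrough:

def identity(x):
    # No-op "serializer" used when the mapper already pickles payloads.
    return x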
def save_state(self, state):
    """Save state"""
    with open(state.state_path, 'wb') as f:
        data = pickle.dumps(state, pickle.HIGHEST_PROTOCOL)
        if self.encrypt:
            data = self.encrypt_data(data)
        pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
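A matching load_state is not shown; a minimal sketch reversing the steps above (decrypt_data is assumed to be the inverse of encrypt_data):

def load_state(self, state_path):
    # Reverse of save_state: unpickle the outer envelope, optionally
    # decrypt, then unpickle the actual state object.
    with open(state_path, 'rb') as f:
        data = pickle.load(f)
    if self.encrypt:
        data = self.decrypt_data(data)  # hypothetical inverse helper
    return pickle.loads(data)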
def save_state(self, state):
    """Save state"""
    data = pickle.dumps(state, pickle.HIGHEST_PROTOCOL)
    if self.encrypt:
        data = self.encrypt_data(data)
    return self.bucket.put_object(Key=state.state_path,
                                  Body=data)
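Assuming self.bucket is a boto3 Bucket resource, a read path could mirror this as follows (a sketch, not the project's actual method):

def load_state(self, state_path):
    # Fetch the object body from S3, optionally decrypt, then unpickle.
    data = self.bucket.Object(state_path).get()['Body'].read()
    if self.encrypt:
        data = self.decrypt_data(data)  # hypothetical inverse helper
    return pickle.loads(data)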