def graph_filter(self, g_id, dbid, head, conn):
    body = pickle.loads(cherrypy.request.body.read())
    filters = body['filter']
    # Build an optional boolean vertex filter from the request.
    if 'nodes' in filters and filters['nodes'] is not None:
        nf_name = str(uuid4())
        nfn = node_property_map(g_id, nf_name, 'bool', filters['nodes'], conn)['property_map']
        nf = property_maps[g_id][nfn]
    else:
        nf = None
    # Build an optional boolean edge filter from the request.
    if 'links' in filters and filters['links'] is not None:
        lf_name = str(uuid4())
        lfn = link_property_map(g_id, lf_name, 'bool', filters['links'], conn)['property_map']
        lf = property_maps[g_id][lfn]
    else:
        lf = None
    if 'directed' in filters and filters['directed'] is not None:
        directed = filters['directed']
    else:
        directed = None
    if 'reversed' in filters and filters['reversed'] is not None:
        rev = filters['reversed']
    else:
        rev = False
    if 'filter_id' in filters and filters['filter_id'] is not None:
        g2_id = filters['filter_id']
    else:
        g2_id = str(uuid4()).replace('-', '_')
    # Register the filtered view under the new (or supplied) subgraph id and
    # share the parent's property maps and ndarrays with it.
    g2 = gt.GraphView(graphs[g_id], vfilt=nf, efilt=lf, directed=directed, reversed=rev)
    graphs[g2_id] = g2
    prep_pm(g2_id)
    for pm in property_maps[g_id]:
        property_maps[g2_id][pm] = property_maps[g_id][pm]
    for nda in ndarrays[g_id]:
        ndarrays[g2_id][nda] = ndarrays[g_id][nda]
    return json.dumps({'subgraph': g2_id})
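For reference, a minimal sketch of the request payload this handler expects, assuming the caller pickles a dict with a 'filter' key; the filter expressions and the endpoint URL below are placeholders, not part of the original project:

import pickle

payload = pickle.dumps({
    'filter': {
        'nodes': "weight > 0.5",   # placeholder expression for node_property_map
        'links': None,             # no edge filter
        'directed': True,
        'reversed': False,
        'filter_id': None,         # let the server generate a subgraph id
    }
})
# POST `payload` as the raw request body to the graph_filter endpoint, e.g. with
# requests.post('http://localhost:8080/graph/<g_id>/filter', data=payload)  # hypothetical URL
# and read json.loads(response.text)['subgraph'] to get the new subgraph id.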
def fields(self, g_id, dbid, head, conn):
    obj_type = json.loads(head['params'])['type']
    if obj_type in acceptable_types:
        fields = auto_reql(r.db(dbid).table(obj_type).map(lambda n: n.keys()).reduce(
            lambda x, y: r.expr(x + y).distinct()), conn)
        return json.dumps(fields)
def convert(name, converter, process, shell):
    return dill.loads(converter)(get_env(shell.user_ns)[name])
def getRepresentation(name, process):
    obj_class = getClass(name, process)
    converters = pythonwhat.State.State.root_state.converters
    if obj_class in converters:
        repres = convert(name, dill.dumps(converters[obj_class]), process)
        if errored(repres):
            return ReprFail("manual conversion failed")
        else:
            return repres
    else:
        # first try to pickle
        try:
            stream = getStreamPickle(name, process)
            if not errored(stream):
                return pickle.loads(stream)
        except:
            pass
        # if it failed, try to dill
        try:
            stream = getStreamDill(name, process)
            if not errored(stream):
                return dill.loads(stream)
            return ReprFail("dilling inside process failed for %s - write manual converter" % obj_class)
        except PicklingError:
            return ReprFail("undilling of bytestream failed with PicklingError - write manual converter")
        except Exception as e:
            return ReprFail("undilling of bytestream failed for class %s - write manual converter."
                            "Error: %s - %s" % (obj_class, type(e), e))
def loads(x):
    """deserialize python object(s)"""
    try:
        return dill.loads(x)
    except Exception as e:
        logger.exception(e)
        raise
def read(stream):
    """read data from a stream"""
    # hack for `stream.readuntil`
    buffer = b''
    while True:
        buffer += yield from stream.readexactly(1)
        if buffer.endswith(sentinel):
            break
    msg = buffer[:-len(sentinel)]
    msg = loads(msg)
    return msg
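The sending side is not shown in this snippet; a minimal counterpart sketch, assuming the same module-level `sentinel` byte string and that messages are serialized with dill (mirroring the `loads` wrapper above):

import dill

def write(stream, msg):
    """Sketch of the matching writer: serialize and terminate with the sentinel."""
    # `sentinel` must be the same byte string that `read` scans for.
    stream.write(dill.dumps(msg) + sentinel)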
test_persistent_queue.py (project: python-persistent-queue, author: philipbl)
def setup_method(self):
    import dill
    random = str(uuid.uuid4()).replace('-', '')
    filename = '{}_{}'.format(self.__class__.__name__, random)
    self.queue = PersistentQueue(filename,
                                 loads=dill.loads,
                                 dumps=dill.dumps)
def setup_method(self):
    import msgpack
    random = str(uuid.uuid4()).replace('-', '')
    filename = '{}_{}'.format(self.__class__.__name__, random)
    self.queue = PersistentQueue(filename,
                                 loads=msgpack.unpackb,
                                 dumps=msgpack.packb)
def check(obj):
    if not CHECK_SERIALIZATION:
        return
    try:
        dill.loads(dill.dumps(obj))
    except Exception as e:
        logging.error(
            "Couldn't serialize: %s\n'%s'\nBad objects:\n%s" % (
                str(obj), str(e), dill.detect.badobjects(obj, depth=2)))
        raise
def loads(s):
    return dill.loads(s)
def run_dill_encode(payload):
    fun, args = dill.loads(payload)
    return fun(*args)
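run_dill_encode only makes sense together with the code that builds the payload; a small usage sketch, assuming a plain multiprocessing Pool (the helper name apply_async_dill is illustrative, not from the original project):

import dill
from multiprocessing import Pool

def apply_async_dill(pool, fun, args):
    # Pack the callable and its arguments with dill so the pool only ever has
    # to pickle a bytes payload plus the module-level run_dill_encode function.
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encode, (payload,))

# with Pool() as pool:
#     print(apply_async_dill(pool, lambda x: x * 2, (21,)).get())  # -> 42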
def __init__(self, map=map, mapper_pickles=False):
    super().__init__()
    self.map = map
    self.pickle, self.unpickle = ((identity, identity)
                                  if mapper_pickles
                                  else (pickle.dumps, pickle.loads))
def import_object(obj):
    import dill as pickle
    import base64
    # if obj is None:
    #     obj = sys.stdin.read().strip().encode('utf-8')
    if isinstance(obj, str):  # encode text input before base64-decoding it
        obj = obj.strip().encode('utf-8')
    return pickle.loads(gzip.zlib.decompress(base64.b64decode(obj)))
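The export direction is not included above; a minimal sketch of what the matching encoder could look like under the same dill -> zlib -> base64 chain (the name export_object is an assumption):

import base64
import gzip  # gzip.zlib is the zlib module, mirroring the decompress call above
import dill as pickle

def export_object(obj):
    """Hypothetical counterpart of import_object: dill-pickle, compress, base64-encode."""
    return base64.b64encode(gzip.zlib.compress(pickle.dumps(obj)))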
def _run_pickled(pickled):
    function, item = dill.loads(pickled)
    return function(item)
def safe_load(s):
    try:
        o = dill.loads(s)
        return o
    except:
        return None
# return functions that execute as expected on example inputs
def loader(pkl, args):
    f = dill.loads(pkl)
    return f(*args)
def exceptions(self):
    bugs = self.meta.cache.id2bugs(self.__meta_id__)
    out = []
    for b in bugs:
        b["args"] = dill.loads(b["args"])
        b["exception"] = dill.loads(b["exception"])
        out.append(b)
    return out
def loads(clz, dump_str: str):
    """Deserialize a base64-encoded dill dump back into an instance.

    Parameters:
        dump_str (str): the base64-encoded serialized string.

    Returns:
        clz: the deserialized instance of this class.
    """
    return dill.loads(base64.b64decode(dump_str))
def _loads(data):
    """
    Decompress and deserialize.
    """
    return dill.loads(zlib.decompress(data))
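The matching serializer is not part of this snippet; a one-line sketch, assuming the same zlib + dill pairing:

def _dumps(obj):
    """
    Serialize and compress (assumed counterpart of _loads).
    """
    return zlib.compress(dill.dumps(obj))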
def promise_then_job_fn_job(job, promise, *args, **kwargs):
    """
    Toil job that runs a promise created with a then_job_fn handler.

    Takes the promise, and the arguments to forward along to the handler, the
    last of which is the (result, error) pair from the last promise which gets
    processed to just a result.

    Returns the promise's success result and error, as a pair.
    """
    # The pickled handler in this case takes a bunch of arguments: the Toil job,
    # and the success result from the last promise, and then any other arguments
    # or kwargs that the user wanted to pass along.
    then_handler = dill.loads(promise.then_dill)

    # Pull out the results from the last promise
    resolved, rejected = args[-1]
    args = args[:-1]

    if rejected is None:
        # Actually run this child promise
        # Stick the resolved value on
        args = list(args) + [resolved]
        try:
            # Get the result from the then handler and resolve with it
            result = then_handler(job, *args, **kwargs)
            promise.handle_resolve(job, result)
        except Exception as e:
            # Reject with an error if there is one
            Logger.error("".join(traceback.format_exception(*sys.exc_info())))
            promise.handle_reject(job, e)
    else:
        # Parent promise rejected so we should not run
        # Bubble up the error
        promise.handle_reject(job, rejected)

    return (promise.result, promise.err)