def execQuery (tree, query, aggcode=None):
    """Execute *query* (and optional *aggcode*) across the controller tree.

    Any hosts missing the query/aggcode source get it pushed first.
    Returns the decoded result list, or [] on any distribution or HTTP
    failure.
    """
    def _distribute (name):
        # Duplicated check_source/send_source pairing from the original,
        # factored out; `!= False` keeps the original `== False` semantics.
        hosts = check_source (tree, name)
        return send_source (hosts, tree, name) != False

    if not _distribute (query['name']):
        return []
    if aggcode and not _distribute (aggcode['name']):
        return []
    req = buildReq ('execQuery', tree, query, aggcode)
    resp, content = r.get (controller, json.dumps (req, default=json_util.default), "pathdump")
    if resp['status'] != '200':
        return []
    return json.loads (content, object_hook=json_util.object_hook)
# Example source code for Python object_hook() usage
def registerQuery (filepath):
    """Register the query source file at *filepath* with the controller.

    Returns [False] when the file cannot be read, [] on an HTTP error,
    otherwise the decoded controller response.
    """
    filename = os.path.basename (filepath)
    try:
        with open (filepath, 'r') as src:
            filedata = src.read()
    except EnvironmentError:
        return [False]
    req = {'api': 'registerQuery', 'name': filename, 'data': filedata}
    resp, content = r.get (controller, json.dumps (req, default=json_util.default), "pathdump")
    if resp['status'] == '200':
        return json.loads (content, object_hook=json_util.object_hook)
    return []
def execRequest (req, url):
    """Fan *req* out to every child of the controller node in parallel.

    One worker thread per child runs `httpcmd` via `wrapper`, which
    appends its (resp, content) pair to the module-global `results`.
    Successful (HTTP 200) payloads are merged and returned as a JSON
    string.

    NOTE(review): `results` is shared module state; concurrent calls to
    this function would interleave responses — confirm callers serialize.
    """
    global results
    workers = []
    tree = req['tree']
    # One thread per direct child of the controller node.
    for child in tree['controller']['child']:
        t = Thread (target = wrapper, args = (httpcmd, (child, req, url),
                                              results))
        workers.append (t)
    for worker in workers:
        worker.start()
    # No timeout on join: a non-responsive child blocks indefinitely.
    for worker in workers:
        worker.join()
    data = []
    for res in results:
        resp, content = res
        # Only merge successful responses; failures are silently dropped.
        if resp['status'] == '200':
            data += json.loads (content, object_hook=json_util.object_hook)
    # Rebind (not mutate) the global so the next call starts empty.
    results = []
    return json.dumps (data, default=json_util.default)
def deserialize(value):
    """Decode a BSON-extended-JSON string into native Python objects."""
    hook = json_util.object_hook
    return loads(value, object_hook=hook)
def installQuery (tree, query, interval):
    """Install *query* on the tree to run periodically at *interval*.

    Pushes the query source to hosts missing it first; returns [] on
    distribution or HTTP failure, else the decoded response.
    """
    hosts = check_source (tree, query['name'])
    if send_source (hosts, tree, query['name']) == False:
        return []
    payload = json.dumps (buildReq ('installQuery', tree, query, None, interval),
                          default=json_util.default)
    resp, content = r.get (controller, payload, "pathdump")
    if resp['status'] == '200':
        return json.loads (content, object_hook=json_util.object_hook)
    return []
def uninstallQuery (tree, query):
    """Remove a previously installed *query*; returns [] on HTTP error."""
    payload = json.dumps (buildReq ('uninstallQuery', tree, query),
                          default=json_util.default)
    resp, content = r.get (controller, payload, "pathdump")
    if resp['status'] == '200':
        return json.loads (content, object_hook=json_util.object_hook)
    return []
def check_source (tree, filename):
    """Ask the controller about source *filename* within *tree*.

    The decoded reply is used by callers as the host list handed to
    send_source.  NOTE(review): unlike the other endpoints here, the HTTP
    status is not checked before decoding — confirm the controller always
    returns a decodable body.
    """
    req = {'api': 'check_source', 'tree': tree, 'name': filename}
    resp, content = r.post (controller, json.dumps (req, default=json_util.default), "pathdump")
    return json.loads (content, object_hook=json_util.object_hook)
def getAggTree (groupnodes):
    """Fetch the aggregation tree for *groupnodes*; {} on HTTP error."""
    req = {'api': 'getAggTree', 'groupnodes': groupnodes}
    resp, content = r.get (controller, json.dumps (req, default=json_util.default), "pathdump")
    if resp['status'] == '200':
        # The controller wraps the tree in a one-element list.
        return json.loads (content, object_hook=json_util.object_hook)[0]
    return {}
def getFlowCollectionDir():
    """Return the controller's flow-collection directory, or '' on error."""
    payload = json.dumps ({'api': 'getFlowCollDir'}, default=json_util.default)
    resp, content = r.get (controller, payload, "pathdump")
    if resp['status'] == '200':
        # Response body is a one-element list holding the directory path.
        return json.loads (content, object_hook=json_util.object_hook)[0]
    return ''
def run (argv, coll):
    """Query paths for a flow over a time range: argv = [flowid, timeRange].

    The time range is round-tripped through json so json_util's hooks
    decode it, any tzinfo is stripped from datetime endpoints, and paths
    are queried over all links ('*', '*').

    BUG FIX: datetime.replace() returns a NEW object — the original
    discarded the result, so tzinfo was never actually removed.
    """
    flowid = argv[0]
    timeRange = json.loads (json.dumps(argv[1]),
                            object_hook=json_util.object_hook)
    if isinstance (timeRange[0], datetime.datetime):
        timeRange[0] = timeRange[0].replace (tzinfo=None)
    if isinstance (timeRange[1], datetime.datetime):
        timeRange[1] = timeRange[1].replace (tzinfo=None)
    linkID = ('*', '*')
    return pdapi.getPaths (flowid, linkID, timeRange)
def load(data):
    """Parse *data* as JSON through the module's tz-stripping object_hook."""
    hook = object_hook
    return json.loads(data, object_hook=hook)
# We don't want tzinfo in decoded JSON.  This object_hook overrides
# json_util.object_hook's handling of $date.
def object_hook(dct):
    """Delegate to bson's object_hook, stripping tzinfo from $date values."""
    decoded = bson.json_util.object_hook(dct)
    if "$date" in dct:
        # $date decodes to a datetime; drop the timezone to keep it naive.
        return decoded.replace(tzinfo=None)
    return decoded
def load_from_json(file):
    """
    This function loads a json database into a list of dicts.

    Parameters
    ----------
    file : file handler
        An already opened json file handler to the serialized job list
        (one JSON object per line).

    Returns
    -------
    list
        One sanitized dict per job offer, spell-corrected on city names.
    """
    job_list = list()
    city_dict = collections.defaultdict(int)
    # Hoisted out of the loop: the resource path is loop-invariant, so
    # resolving it once avoids a pkg_resources lookup per line.
    city_file = pkg_resources.resource_filename('sfbistats.utils',
                                                'city_locations.csv')
    for l in file.readlines():
        # use dict instead of directly object, better with pandas
        job = JobOfferAnon.from_json(
            json.loads(l, object_hook=json_util.object_hook)).to_dict()
        job['city'] = sanitize_city_name(job['city'])
        job['city'] = sanitize_city_name_for_geoloc(job['city'])
        dep, reg = city_to_dep_region(job['city'], city_file)
        job['department'] = dep
        job['region'] = reg
        job['duration'] = sanitize_duration(job['duration'])
        city_dict[job['city']] += 1
        job_list.append(job)
    job_list = spell_correct(job_list, city_dict)
    return job_list
def mongo_aggregate(cls, query, pipeline, hide_deleted=True):
    """Perform mongo aggregate queries.

    query - dict passed to $match, a pipeline operator (a JSON string is
    also accepted); pipeline - list of dicts or dict of mongodb pipeline
    operators, see
    http://docs.mongodb.org/manual/reference/operator/aggregation-pipeline
    """
    if isinstance(query, basestring):
        query = json.loads(query, object_hook=json_util.object_hook) if query else {}
    if not isinstance(pipeline, (dict, list)):
        raise Exception(_(u"Invalid pipeline! %s" % pipeline))
    if not isinstance(query, dict):
        raise Exception(_(u"Invalid query! %s" % query))
    query = dict_for_mongo(query)
    if hide_deleted:
        # keep only records never soft-deleted: _deleted_at missing or None
        active_filter = {
            "$or": [{"_deleted_at": {"$exists": False}},
                    {"_deleted_at": None}]}
        query = {"$and": [query, active_filter]}
    # $match runs first, then the caller's pipeline stage(s).
    stages = [{'$match': query}]
    if isinstance(pipeline, list):
        stages.extend(pipeline)
    else:
        stages.append(pipeline)
    return xform_instances.aggregate(stages)['result']
def query_mongo_minimal(
        cls, query, fields, sort, start=0, limit=DEFAULT_LIMIT,
        count=False, hide_deleted=True):
    """Run a find() over xform_instances from JSON-encoded parameters.

    query/fields/sort arrive as JSON strings (or falsy values); start and
    limit page the cursor; when count is truthy a single [{"count": n}]
    row is returned instead of a cursor.
    """
    # Default projection: exclude the userform-id column.
    fields_to_select = {cls.USERFORM_ID: 0}
    # TODO: give more detailed error messages to 3rd parties
    # using the API when json.loads fails
    query = json.loads(
        query, object_hook=json_util.object_hook) if query else {}
    query = dict_for_mongo(query)
    if hide_deleted:
        # display only active elements
        # join existing query with deleted_at_query on an $and
        query = {"$and": [query, {"_deleted_at": None}]}
    # fields must be a string array i.e. '["name", "age"]'
    fields = json.loads(
        fields, object_hook=json_util.object_hook) if fields else []
    # TODO: current mongo (2.0.4 of this writing)
    # cant mix including and excluding fields in a single query
    if type(fields) == list and len(fields) > 0:
        # Whitelist the requested fields (keys encoded for mongo rules).
        fields_to_select = dict(
            [(_encode_for_mongo(field), 1) for field in fields])
    sort = json.loads(
        sort, object_hook=json_util.object_hook) if sort else {}
    cursor = xform_instances.find(query, fields_to_select)
    if count:
        return [{"count": cursor.count()}]
    if start < 0 or limit < 0:
        raise ValueError(_("Invalid start/limit params"))
    cursor.skip(start).limit(limit)
    # NOTE(review): sort.keys()[0] assumes Python 2 (list-returning keys()).
    if type(sort) == dict and len(sort) == 1:
        sort_key = sort.keys()[0]
        # TODO: encode sort key if it has dots
        sort_dir = int(sort[sort_key])  # -1 for desc, 1 for asc
        cursor.sort(_encode_for_mongo(sort_key), sort_dir)
    # set batch size
    cursor.batch_size = cls.DEFAULT_BATCHSIZE
    return cursor
def query_mongo(username, id_string, query=None, hide_deleted=True):
    """Debug variant: run *query* against xform_instances as-is.

    NOTE(review): username/id_string/hide_deleted are currently unused;
    the production filtering (dict_for_mongo, USERFORM_ID scoping,
    _deleted_at) had been commented out while debugging and is removed
    here as dead code.  Restore it before production use.
    """
    print("incoming query", query)
    qry = query
    print(qry)
    # Typo fixed in the debug output: "cpount" -> "count".
    print("count", xform_instances.find(qry).count())
    return xform_instances.find(qry)
def _loads(content, fmt=None):
if fmt == 'toml':
return toml.loads(content)
elif fmt == 'json':
return json.loads(content, object_hook=json_util.object_hook)
elif fmt == 'python':
return ast.literal_eval(content)
elif fmt == 'pickle':
return pickle.loads(content)
else:
return content
def mongo_aggregate(cls, query, pipeline, hide_deleted=True):
    """Perform mongo aggregate queries
    query - is a dict which is to be passed to $match, a pipeline operator
    pipeline - list of dicts or dict of mongodb pipeline operators,
    http://docs.mongodb.org/manual/reference/operator/aggregation-pipeline
    """
    # Accept a JSON-encoded query string as well as a dict.
    if isinstance(query, basestring):
        query = json.loads(
            query, object_hook=json_util.object_hook) if query else {}
    if not (isinstance(pipeline, dict) or isinstance(pipeline, list)):
        raise Exception(_(u"Invalid pipeline! %s" % pipeline))
    if not isinstance(query, dict):
        raise Exception(_(u"Invalid query! %s" % query))
    query = dict_for_mongo(query)
    if hide_deleted:
        # display only active elements: _deleted_at missing or None
        deleted_at_query = {
            "$or": [{"_deleted_at": {"$exists": False}},
                    {"_deleted_at": None}]}
        # join existing query with deleted_at_query on an $and
        query = {"$and": [query, deleted_at_query]}
    # $match always runs first, then the caller's pipeline stage(s).
    k = [{'$match': query}]
    if isinstance(pipeline, list):
        k.extend(pipeline)
    else:
        k.append(pipeline)
    # NOTE(review): aggregate(...)['result'] is the pre-pymongo-3 return
    # shape — confirm the driver version in use.
    results = xform_instances.aggregate(k)
    return results['result']
def query_mongo_minimal(
        cls, query, fields, sort, start=0, limit=DEFAULT_LIMIT,
        count=False, hide_deleted=True):
    """Build and run a find() over xform_instances from JSON-string params.

    Returns [{"count": n}] when count is truthy, otherwise a cursor paged
    to [start, start+limit) and optionally sorted on a single key.
    """
    # TODO: give more detailed error messages to 3rd parties
    # using the API when json.loads fails
    decode = lambda s: json.loads(s, object_hook=json_util.object_hook)
    query = decode(query) if query else {}
    query = dict_for_mongo(query)
    if hide_deleted:
        # restrict to active (never soft-deleted) records
        query = {"$and": [query, {"_deleted_at": None}]}
    # fields must be a string array i.e. '["name", "age"]'
    fields = decode(fields) if fields else []
    # TODO: current mongo (2.0.4 of this writing)
    # cant mix including and excluding fields in a single query
    if type(fields) == list and len(fields) > 0:
        fields_to_select = dict(
            (_encode_for_mongo(field), 1) for field in fields)
    else:
        # Default projection: exclude the userform-id column.
        fields_to_select = {cls.USERFORM_ID: 0}
    sort = decode(sort) if sort else {}
    cursor = xform_instances.find(query, fields_to_select)
    if count:
        return [{"count": cursor.count()}]
    if start < 0 or limit < 0:
        raise ValueError(_("Invalid start/limit params"))
    cursor.skip(start).limit(limit)
    if type(sort) == dict and len(sort) == 1:
        sort_key = sort.keys()[0]
        # TODO: encode sort key if it has dots
        sort_dir = int(sort[sort_key])  # -1 for desc, 1 for asc
        cursor.sort(_encode_for_mongo(sort_key), sort_dir)
    cursor.batch_size = cls.DEFAULT_BATCHSIZE
    return cursor
def query_mongo(username, id_string, query=None, hide_deleted=True):
    """Find instances for one user's form, hiding soft-deleted rows."""
    if query:
        query = json.loads(query, object_hook=json_util.object_hook)
    else:
        query = {}
    query = dict_for_mongo(query)
    # Scope results to this user's form.
    query[USERFORM_ID] = u'{0}_{1}'.format(username, id_string)
    if hide_deleted:
        # display only active elements
        # join existing query with deleted_at_query on an $and
        query = {"$and": [query, {"_deleted_at": None}]}
    return xform_instances.find(query)
def handleRequest (req):
    """Handle a tree query at this node.

    Leaf nodes answer directly; relay nodes fan the request out to all
    children (plus a local worker for themselves) in threads, merge the
    accumulated results, and for execQuery-with-aggcode post-process the
    merged data.  Results accumulate in the module-global `query_results`,
    which is reset before returning.
    """
    global query_results
    Tree = req['tree']
    cur = helper.getCurNodeID ()
    # Leaf node: no children, answer locally.
    if len (Tree[cur]['child']) == 0:
        return helper.handleLeafNode (req)
    # From now on, the following handles when the current node is a relay node
    workers = []
    # 1) create a worker thread at the current node
    (func, argv) = helper.getThreadArgument (True, req)
    t = Thread (target = helper.wrapper, args = (func, argv, query_results))
    workers.append (t)
    # 2) deliver query to child nodes
    for child in Tree[cur]['child']:
        (func, argv) = helper.getThreadArgument (False, req, child)
        # further optimization (should be implemented): construct a subtree for
        # each child and pass it on to the httpcmd as argument
        t = Thread (target = helper.wrapper, args = (func, argv,
                                                     query_results))
        workers.append (t)
    # 3) start workers
    for worker in workers:
        worker.start()
    # 4) wait until workers finish -> this part might be hung forever
    for worker in workers:
        worker.join()
    data=[]
    for res in query_results:
        # HTTP results arrive as (resp, content) tuples whose resp carries
        # a content-type header; local results are already decoded lists.
        if len(res) > 0 and type(res) == type(()) and 'content-type' in res[0]:
            resp, content = res
            content = json.loads (content, object_hook=json_util.object_hook)
        else:
            content = res
        data += content
    # reset variables (rebind the global so the next request starts empty)
    query_results = []
    if req['api'] == 'execQuery' and 'aggcode' in req:
        # 5) process collected data using AggCode
        return helper.processCollectedData (req['aggcode'], data)
    else:
        return data
def query_mongo(cls, username, id_string, query, fields, sort, start=0,
                limit=DEFAULT_LIMIT, count=False, hide_deleted=True):
    """Run a find() scoped to one user's form, with projection and sort.

    query/fields/sort may each be JSON strings or already-decoded values.
    Returns [{"count": n}] when count is truthy, otherwise a paged cursor.
    """
    # Default projection: exclude the userform-id column.
    fields_to_select = {cls.USERFORM_ID: 0}
    # TODO: give more detailed error messages to 3rd parties
    # using the API when json.loads fails
    if isinstance(query, basestring):
        query = json.loads(query, object_hook=json_util.object_hook)
    query = query if query else {}
    query = dict_for_mongo(query)
    # Scope the query to this user's form.
    query[cls.USERFORM_ID] = u'%s_%s' % (username, id_string)
    # check if query contains an _uuid and if it is a valid ObjectID
    # (original comment said "_id"; the code actually inspects '_uuid')
    if '_uuid' in query and ObjectId.is_valid(query['_uuid']):
        query['_uuid'] = ObjectId(query['_uuid'])
    if hide_deleted:
        # display only active elements
        # join existing query with deleted_at_query on an $and
        query = {"$and": [query, {"_deleted_at": None}]}
    # fields must be a string array i.e. '["name", "age"]'
    if isinstance(fields, basestring):
        fields = json.loads(fields, object_hook=json_util.object_hook)
    fields = fields if fields else []
    # TODO: current mongo (2.0.4 of this writing)
    # cant mix including and excluding fields in a single query
    if type(fields) == list and len(fields) > 0:
        fields_to_select = dict(
            [(_encode_for_mongo(field), 1) for field in fields])
    if isinstance(sort, basestring):
        sort = json.loads(sort, object_hook=json_util.object_hook)
    sort = sort if sort else {}
    cursor = xform_instances.find(query, fields_to_select)
    if count:
        return [{"count": cursor.count()}]
    if start < 0 or limit < 0:
        raise ValueError(_("Invalid start/limit params"))
    cursor.skip(start).limit(limit)
    # NOTE(review): sort.keys()[0] assumes Python 2 (list-returning keys()).
    if type(sort) == dict and len(sort) == 1:
        sort_key = sort.keys()[0]
        # TODO: encode sort key if it has dots
        sort_dir = int(sort[sort_key])  # -1 for desc, 1 for asc
        cursor.sort(_encode_for_mongo(sort_key), sort_dir)
    # set batch size
    cursor.batch_size = cls.DEFAULT_BATCHSIZE
    return cursor
def query_mongo(cls, username, id_string, query, fields, sort, start=0,
                limit=DEFAULT_LIMIT, count=False, hide_deleted=True):
    """Run a scoped find() for one user's form.

    query/fields/sort may each be JSON strings or already-decoded values;
    returns [{"count": n}] when count is truthy, else a paged cursor.
    """
    hook = json_util.object_hook
    # TODO: give more detailed error messages to 3rd parties
    # using the API when json.loads fails
    if isinstance(query, basestring):
        query = json.loads(query, object_hook=hook)
    query = dict_for_mongo(query or {})
    # Scope results to this user's form.
    query[cls.USERFORM_ID] = u'%s_%s' % (username, id_string)
    # promote a valid _uuid string to a real ObjectId
    if '_uuid' in query and ObjectId.is_valid(query['_uuid']):
        query['_uuid'] = ObjectId(query['_uuid'])
    if hide_deleted:
        # only records never marked deleted
        query = {"$and": [query, {"_deleted_at": None}]}
    # fields must be a string array i.e. '["name", "age"]'
    if isinstance(fields, basestring):
        fields = json.loads(fields, object_hook=hook)
    fields = fields or []
    if type(fields) == list and len(fields) > 0:
        # TODO: current mongo (2.0.4 of this writing)
        # cant mix including and excluding fields in a single query
        fields_to_select = dict(
            (_encode_for_mongo(field), 1) for field in fields)
    else:
        fields_to_select = {cls.USERFORM_ID: 0}
    if isinstance(sort, basestring):
        sort = json.loads(sort, object_hook=hook)
    sort = sort or {}
    cursor = xform_instances.find(query, fields_to_select)
    if count:
        return [{"count": cursor.count()}]
    if start < 0 or limit < 0:
        raise ValueError(_("Invalid start/limit params"))
    cursor.skip(start).limit(limit)
    if type(sort) == dict and len(sort) == 1:
        sort_key = sort.keys()[0]
        # TODO: encode sort key if it has dots
        sort_dir = int(sort[sort_key])  # -1 for desc, 1 for asc
        cursor.sort(_encode_for_mongo(sort_key), sort_dir)
    cursor.batch_size = cls.DEFAULT_BATCHSIZE
    return cursor