def handleRequest (req):
global query_results
Tree = req['tree']
cur = helper.getCurNodeID ()
if len (Tree[cur]['child']) == 0:
return helper.handleLeafNode (req)
# From now on, the following handles when the current node is a relay node
workers = []
# 1) create a worker thread at the current node
(func, argv) = helper.getThreadArgument (True, req)
t = Thread (target = helper.wrapper, args = (func, argv, query_results))
workers.append (t)
# 2) deliver query to child nodes
for child in Tree[cur]['child']:
(func, argv) = helper.getThreadArgument (False, req, child)
# further optimization (should be implemented): construct a subtree for
# each child and pass it on to the httpcmd as argument
t = Thread (target = helper.wrapper, args = (func, argv,
query_results))
workers.append (t)
# 3) start workers
for worker in workers:
worker.start()
# 4) wait unitl workers finish -> this part might be hung forever
for worker in workers:
worker.join()
data=[]
for res in query_results:
if len(res) > 0 and type(res) == type(()) and 'content-type' in res[0]:
resp, content = res
content = json.loads (content, object_hook=json_util.object_hook)
else:
content = res
data += content
# reset variables
query_results = []
if req['api'] == 'execQuery' and 'aggcode' in req:
# 4) process collected data using AggCode
return helper.processCollectedData (req['aggcode'], data)
else:
return data
评论列表
文章目录