def distributed_write(node=None):
'''
'''
data = request.get_json()
if current_app.debug:
current_app.logger.debug(json.dumps(data, indent=2))
queries = data.get('queries', {})
statuses = data.get('statuses', {})
for guid, results in queries.items():
task = DistributedQueryTask.query.filter(
DistributedQueryTask.guid == guid,
DistributedQueryTask.status == DistributedQueryTask.PENDING,
DistributedQueryTask.node == node,
).first()
if not task:
current_app.logger.error(
"%s - Got result for distributed query not in PENDING "
"state: %s: %s",
request.remote_addr, guid, json.dumps(data)
)
continue
# non-zero status indicates sqlite errors
if not statuses.get(guid, 0):
status = DistributedQueryTask.COMPLETE
else:
current_app.logger.error(
"%s - Got non-zero status code (%d) on distributed query %s",
request.remote_addr, statuses.get(guid), guid
)
status = DistributedQueryTask.FAILED
for columns in results:
result = DistributedQueryResult(
columns,
distributed_query=task.distributed_query,
distributed_query_task=task
)
db.session.add(result)
else:
task.status = status
db.session.add(task)
else:
# need to write last_checkin, last_ip on node
db.session.add(node)
db.session.commit()
return jsonify(node_invalid=False)
评论列表
文章目录