def send(self, res):
    """Buffer ``res`` and flush one chunk to disk once the buffer is full.

    Every call appends ``res`` to ``self.buffer``.

    While the buffer holds fewer than ``self.chunk_size`` items, the call
    returns the Deferred shared by the current batch
    (``self.current_task``), creating a fresh one when there is none or the
    previous one has already been called.

    Once the buffer holds at least ``self.chunk_size`` items, the first
    ``chunk_size`` items are removed from the buffer and dumped as JSON
    (keys sorted) to a new ``<UTC ISO timestamp>-scan.json`` file inside
    ``self.out_dir``. The write is dispatched through
    ``threads.deferToThread`` and ``self.current_task`` is reset to ``None``
    so the next call starts a new batch.

    Args:
        res: Item to buffer; it is passed to ``json.dump`` as part of a
            chunk, so it presumably has to be JSON-serializable.

    Returns:
        The batch Deferred when nothing was flushed; otherwise the Deferred
        returned by ``threads.deferToThread`` for the write, chained to the
        batch Deferred when one is pending.
    """
    self.buffer.append(res)

    if len(self.buffer) >= self.chunk_size:
        # Buffer is full: detach exactly one chunk and write it to disk.
        chunk = self.buffer[:self.chunk_size]
        self.buffer = self.buffer[self.chunk_size:]
        log_path = os.path.join(
            self.out_dir,
            "%s-scan.json" % (datetime.datetime.utcnow().isoformat())
        )

        def write():
            # The context manager closes the file even if json.dump raises.
            with open(log_path, "w") as wf:
                json.dump(chunk, wf, sort_keys=True)

        write_task = threads.deferToThread(write)
        pending = self.current_task
        self.current_task = None
        # Chain only to a batch Deferred that exists and has not been called
        # yet. There is none when this very call filled the buffer (for
        # example chunk_size == 1), and chaining to None would raise instead
        # of reporting the write.
        if pending is not None and not pending.called:
            write_task.chainDeferred(pending)
        return write_task

    # Buffer is not full: hand back the Deferred shared by the current batch.
    if not self.current_task or self.current_task.called:
        self.current_task = defer.Deferred()
    return self.current_task
评论列表
文章目录