def get_stream_writer(self, stream):
"""
Gets the database channel writer
The mongoengine model checks whether a stream_id/datetime pair already exists in the DB (unique pairs)
Should be overridden by users' personal channels - allows for non-mongo outputs.
:param stream: The stream
:return: The stream writer function
"""
def writer(document_collection):
with switch_db(SummaryInstanceModel, 'hyperstream'):
if isinstance(document_collection, StreamInstance):
document_collection = [document_collection]
for t, doc in document_collection:
instance = SummaryInstanceModel(
stream_id=stream.stream_id.as_dict(),
datetime=t,
value=doc)
try:
instance.save()
except NotUniqueError as e:
# Implies that this has already been written to the database
# Raise an error if the value differs from that in the database
logging.warn("Found duplicate document: {}".format(e.message))
existing = SummaryInstanceModel.objects(stream_id=stream.stream_id.as_dict(), datetime=t)[0]
if existing.value != doc:
raise e
except (InvalidDocumentError, InvalidDocument) as e:
# Something wrong with the document - log the error
logging.error(e)
return writer
评论列表
文章目录