summary_channel.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:SPHERE-HyperStream 作者: IRC-SPHERE 项目源码 文件源码
def get_stream_writer(self, stream):
        """
        Gets the database channel writer
        The mongoengine model checks whether a stream_id/datetime pair already exists in the DB (unique pairs)
        Should be overridden by users' personal channels - allows for non-mongo outputs.
        :param stream: The stream
        :return: The stream writer function
        """

        def writer(document_collection):
            with switch_db(SummaryInstanceModel, 'hyperstream'):
                if isinstance(document_collection, StreamInstance):
                    document_collection = [document_collection]

                for t, doc in document_collection:
                    instance = SummaryInstanceModel(
                        stream_id=stream.stream_id.as_dict(),
                        datetime=t,
                        value=doc)
                    try:
                        instance.save()
                    except NotUniqueError as e:
                        # Implies that this has already been written to the database
                        # Raise an error if the value differs from that in the database
                        logging.warn("Found duplicate document: {}".format(e.message))
                        existing = SummaryInstanceModel.objects(stream_id=stream.stream_id.as_dict(), datetime=t)[0]
                        if existing.value != doc:
                            raise e
                    except (InvalidDocumentError, InvalidDocument) as e:
                        # Something wrong with the document - log the error
                        logging.error(e)
        return writer
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号