dynamo.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:livebridge 作者: dpa-newslab 项目源码 文件源码
def insert_post(self, **kwargs):
        params = {
            "TableName": self.table_name,
            "Item": {
                "target_id": {"S": kwargs.get("target_id")},
                "post_id": {"S": str(kwargs.get("post_id"))},
                "source_id": {"S": kwargs.get("source_id")},
                "text": {"S": kwargs.get("text") or " "},
                "sticky": {"N": str(int(kwargs.get("sticky", False)))},
                "created": {"S": datetime.strftime(kwargs.get("created"), self.date_fmt)},
                "updated": {"S": datetime.strftime(kwargs.get("updated"), self.date_fmt)},
            }
        }
        # add doc at target if present
        if kwargs.get("target_doc"):
            params["Item"]["target_doc"] = {"S": json.dumps(kwargs["target_doc"])}
        db = await self.db
        try:
            response = await db.put_item(**params)
            if response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 200:
                logger.info("[DB] Post {} {} was saved!".format(kwargs["source_id"], kwargs["post_id"]))
                return True
        except BotoCoreError as exc:
            logger.error("[DB] Error when saving {}".format(kwargs))
            logger.error(exc)
        return False
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号