test_aws.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:donatemates 作者: donatemates 项目源码 文件源码
def test_scan_table(self):
        """Method to test scanning a table"""

        def scan_func(items, input_val):
            for item in items:
                if item["campaign_status"]["S"] == "complete":
                    input_val['count'] += 1
            return input_val

        campaign_table = DynamoTable('campaigns')

        # Add a record
        for idx in range(0, 10):
            data = {"campaign_id": "my_campaign_{}".format(idx),
                    "notified_on": arrow.utcnow().isoformat(),
                    "campaign_status": "complete"
                    }
            campaign_table.put_item(data)

        # Scan table
        result = {"count": 0}
        campaign_table.scan_table(scan_func, result, "campaign_status")

        self.assertEqual(result["count"], 10)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号