def test_scan_table(self):
    """Verify that scanning the campaigns table counts all completed items.

    Ten items whose ``campaign_status`` is ``"complete"`` are written to the
    table. ``scan_table`` is then invoked with a counting callback and a
    shared accumulator dict, and the accumulated count must equal 10.
    """

    def count_complete(items, input_val):
        """Increment ``input_val['count']`` for each completed item."""
        # Scanned values are read through an "S" key -- presumably DynamoDB's
        # typed string attribute form; confirm against DynamoTable.scan_table.
        for item in items:
            if item["campaign_status"]["S"] != "complete":
                continue
            input_val['count'] += 1
        return input_val

    table = DynamoTable('campaigns')

    # Seed the table with ten completed campaigns, each with a unique id.
    for idx in range(10):
        campaign_id = "my_campaign_{}".format(idx)
        notified_on = arrow.utcnow().isoformat()
        table.put_item({
            "campaign_id": campaign_id,
            "notified_on": notified_on,
            "campaign_status": "complete",
        })

    # Run the scan; the callback mutates the accumulator in place.
    # NOTE(review): the third argument is presumably the attribute to
    # retrieve for each item -- verify against scan_table's signature.
    tally = {"count": 0}
    table.scan_table(count_complete, tally, "campaign_status")

    self.assertEqual(tally["count"], 10)
评论列表
文章目录