def test_main(bucket, crypto, processor):
    """Run ``main`` on two one-record batches and inspect the bucket after each.

    After the first batch the test expects ``main`` to return ``('2', 0)``,
    exactly one object to be listed under the user's prefix, that object's
    key to end in ``.json.gz``, its metadata to declare gzip-encoded JSON,
    and its decompressed body to equal the submitted payload.

    After the second batch it expects ``main`` to return ``('3', 0)`` and
    two objects to be listed under the same prefix.

    ``bucket``, ``crypto`` and ``processor`` are supplied by the caller
    (presumably pytest fixtures -- confirm against the conftest).
    """
    # Payload builder with the crypto argument already bound.
    build_payload = functools.partial(_payload, crypto)

    user = 'foo'
    current_day = date.today()
    prefix = 'v2/sessions/%s/%s/%s/' % (
        current_day.year,
        current_day.month,
        user,
    )

    # --- first upload -----------------------------------------------------
    first_result = main(
        processor,
        [make_record('2', build_payload({'user': user, 'extra': 1}))],
    )
    assert first_result == ('2', 0)

    listed = list(bucket.filter(Prefix=prefix))
    assert len(listed) == 1
    first_key = listed[0].key
    assert first_key.endswith('.json.gz')

    fetched = bucket.get(first_key)
    assert obj_header(fetched, 'ContentEncoding') == 'gzip'
    assert obj_header(fetched, 'ContentType') == 'application/json'

    # Drain and close the body before decoding it.
    stream = fetched['Body']
    compressed = stream.read()
    stream.close()

    decoded = json.loads(gzip.decompress(compressed).decode('utf-8'))
    assert decoded == {'user': user, 'extra': 1}

    # --- second upload ----------------------------------------------------
    second_result = main(
        processor,
        [make_record('3', build_payload({'user': user, 'extra': 2}))],
    )
    assert second_result == ('3', 0)

    listed = list(bucket.filter(Prefix=prefix))
    assert len(listed) == 2


def obj_header(response, name):
    """Return the metadata field ``name`` from a ``bucket.get`` response."""
    return response[name]
评论列表
文章目录