def test_write_csv_to_s3_existing(generate_data):
bucket = 'test-bucket'
key = 'test.csv'
conn = boto3.resource('s3', region_name='us-west-2')
conn.create_bucket(Bucket=bucket)
utils.write_csv_to_s3(generate_data(["foo"]), bucket, key)
utils.write_csv_to_s3(generate_data(["foo", "bar"]), bucket, key)
body = (
conn
.Object(bucket, key)
.get()['Body']
.read().decode('utf-8')
)
# header + 2x row = 3
assert len(body.rstrip().split('\n')) == 3
评论列表
文章目录