test_s3transfer.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:s3transfer 作者: boto 项目源码 文件源码
def test_callback_called_once_with_sigv4(self):
        """Regression test for #98: progress callback must not double count.

        With signature version 4 the callback was being invoked twice, so
        the total number of bytes reported exceeded the file size. Upload
        a 10 MiB file and check that the reported total matches its size
        exactly.
        """
        object_key = '10mb.txt'
        expected_size = 10 * 1024 * 1024

        self.amount_seen = 0
        counter_lock = threading.Lock()

        def record_progress(amount):
            # The lock guards the running total in case the callback is
            # invoked from more than one thread during the upload.
            with counter_lock:
                self.amount_seen += amount

        sigv4_config = Config(signature_version='s3v4')
        client = self.session.create_client(
            's3', self.region, config=sigv4_config)
        transfer = s3transfer.S3Transfer(client)

        filename = self.files.create_file_with_size(
            object_key, filesize=expected_size)
        transfer.upload_file(
            filename, self.bucket_name, object_key,
            callback=record_progress)
        # Remove the uploaded object once the test finishes.
        self.addCleanup(self.delete_object, object_key)

        self.assertEqual(self.amount_seen, expected_size)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号