sync.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:arthur-redshift-etl 作者: harrystech 项目源码 文件源码
def sync_with_s3(relations: List[RelationDescription], bucket_name: str, prefix: str, dry_run: bool = False) -> None:
    """
    Copy (validated) table design and SQL files from local directory to S3 bucket.

    The relations are first passed to RelationDescription.load_in_parallel. Then, for every
    relation, its design file -- and, for transformations, its SQL file -- is queued and
    uploaded using a thread pool with up to 8 workers. Upload failures are logged and counted
    rather than aborting the remaining uploads.

    Args:
        relations: Relations whose design (and SQL) files should be uploaded.
        bucket_name: Name of the destination S3 bucket.
        prefix: Prefix joined in front of each normalized file path to form the remote name.
        dry_run: Passed through to the uploader; when true, the final "Uploaded ..." summary
            is not logged.

    Raises:
        MissingQueryError: If a transformation has no SQL file name.
        ETLRuntimeError: If at least one upload raised an exception.
    """
    logger.info("Validating %d table design(s) before upload", len(relations))
    RelationDescription.load_in_parallel(relations)

    # Collect (local_filename, remote_filename) pairs before starting any upload so that a
    # missing SQL file aborts the sync before anything has been sent.
    files = []
    for relation in relations:
        relation_files = [relation.design_file_name]
        if relation.is_transformation:
            if relation.sql_file_name:
                relation_files.append(relation.sql_file_name)
            else:
                raise MissingQueryError("Missing matching SQL file for '%s'" % relation.design_file_name)
        for file_name in relation_files:
            local_filename = relation.norm_path(file_name)
            remote_filename = os.path.join(prefix, local_filename)
            files.append((local_filename, remote_filename))

    uploader = etl.s3.S3Uploader(bucket_name, dry_run=dry_run)
    with Timer() as timer:
        # One future per file; exceptions are inspected after the pool has shut down.
        futures = []
        errors = 0
        # TODO With Python 3.6, we should pass in a thread_name_prefix
        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            for local_filename, remote_filename in files:
                futures.append(executor.submit(uploader.__call__, local_filename, remote_filename))
        # Leaving the executor's "with" block waits for all uploads, so every future is done here.
        for future in concurrent.futures.as_completed(futures):
            exception = future.exception()
            if exception is not None:
                logger.error("Failed to upload file: %s", exception)
                errors += 1
    if not dry_run:
        # The closing quote after the S3 URI was missing in the original message.
        logger.info("Uploaded %d of %d file(s) to 's3://%s/%s' (%s)",
                    len(files) - errors, len(files), bucket_name, prefix, timer)
    if errors:
        raise ETLRuntimeError("There were {:d} error(s) during upload".format(errors))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号