extractor.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:arthur-redshift-etl 作者: harrystech 项目源码 文件源码
def extract_source(self, source: DataWarehouseSchema,
                       relations: List[RelationDescription]) -> List[RelationDescription]:
        """
        Extract the data of every given relation from one upstream source.

        Each relation is extracted by calling self.extract_table() inside an
        etl.monitor.Monitor context. That call is wrapped in a small callback which is
        handed to retry() together with the retry count read from the
        "arthur_settings.extract_retries" setting.

        When an ETLRuntimeError escapes for a relation, the source name is added to
        self.failed_sources and the relation is added to the returned list. The error
        is re-raised only if the relation is required and self.keep_going is falsy;
        in every other case it is logged and the loop continues with the next relation.

        Returns the list of relations whose extract failed.
        """
        self.logger.info("Extracting %d relation(s) from source '%s'", len(relations), source.name)
        failures = []

        with Timer() as timer:
            # Positions are 1-based because they are reported as "current" of "final".
            for position, relation in enumerate(relations, start=1):

                def _monitored_table_extract(attempt_num):
                    # Everything below is re-evaluated on each attempt, so each attempt
                    # gets its own Monitor with a 1-based attempt number.
                    identifier = relation.identifier
                    monitor_settings = {
                        "options": self.options_info(),
                        "source": self.source_info(source, relation),
                        "destination": {
                            'bucket_name': relation.bucket_name,
                            'object_key': relation.manifest_file_name,
                        },
                        "index": {
                            "current": position,
                            "final": len(relations),
                            "name": source.name,
                        },
                        "dry_run": self.dry_run,
                        "attempt_num": attempt_num + 1,
                    }
                    with etl.monitor.Monitor(identifier, "extract", **monitor_settings):
                        self.extract_table(source, relation)

                try:
                    max_retries = get_config_int("arthur_settings.extract_retries")
                    retry(max_retries, _monitored_table_extract, self.logger)
                except ETLRuntimeError:
                    self.failed_sources.add(source.name)
                    failures.append(relation)

                    required = relation.is_required
                    if required and not self.keep_going:
                        # Fatal case: leave the traceback to whoever handles the re-raise.
                        self.logger.debug("Extract failed for required relation '%s'", relation.identifier)
                        raise

                    if required:
                        self.logger.exception(
                            "Ignoring failure of required relation '%s' and proceeding as requested:",
                            relation.identifier)
                    else:
                        self.logger.exception(
                            "Extract failed for non-required relation '%s':", relation.identifier)

            # Logged while still inside the Timer context, as the timer is part of the message.
            self.logger.info(
                "Finished extract from source '%s': %d succeeded, %d failed (%s)",
                source.name,
                len(relations) - len(failures),
                len(failures),
                timer,
            )
        return failures
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号