extractor.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:arthur-redshift-etl 作者: harrystech 项目源码 文件源码
def extract_sources(self) -> None:
        """
        Extract all relations, running one extraction task per source in parallel.

        Relations are grouped by their ``source_name``. Every group is passed to
        ``self.extract_source`` together with the matching entry in ``self.schemas``.
        Whatever the tasks return is collected as "missing tables" and logged as warnings.

        If ``self.keep_going`` is true, we wait until all tasks have completed. Otherwise we
        stop waiting as soon as one task raises an exception. (Note that leaving the executor's
        ``with`` block still waits for the tasks that were already submitted.)

        Raises:
            DataExtractError: if some tasks had not completed when we stopped waiting.
            Exception: any exception raised by ``self.extract_source`` in a worker thread
                surfaces here when the result of its future is retrieved.
        """
        self.logger.info("Starting to extract %d relation(s) in %d schema(s)", len(self.relations), len(self.schemas))
        self.failed_sources.clear()

        # Group relations by source while preserving the order in which sources first appear.
        # Unlike itertools.groupby, this does not depend on the relations being sorted by source,
        # so no source is ever submitted (and extracted) more than once.
        source_names = []
        relations_by_source = {}
        for relation in self.relations:
            source_name = relation.source_name
            if source_name not in relations_by_source:
                relations_by_source[source_name] = []
                source_names.append(source_name)
            relations_by_source[source_name].append(relation)

        # ThreadPoolExecutor raises ValueError for max_workers < 1, so ask for at least one worker
        # even when there are no schemas (and thus nothing to extract).
        max_workers = max(1, len(self.schemas))
        return_when = concurrent.futures.ALL_COMPLETED if self.keep_going else concurrent.futures.FIRST_EXCEPTION

        # TODO With Python 3.6, we should pass in a thread_name_prefix
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self.extract_source, self.schemas[source_name], relations_by_source[source_name])
                for source_name in source_names
            ]
            done, not_done = concurrent.futures.wait(futures, return_when=return_when)
        if self.failed_sources:
            self.logger.error("Failed to extract from these source(s): %s", join_with_quotes(self.failed_sources))

        # Note that iterating over result of futures may raise an exception (which surfaces exceptions from threads)
        missing_tables = []  # type: List
        for future in done:
            missing_tables.extend(future.result())
        for table_name in missing_tables:
            self.logger.warning("Failed to extract: '%s'", table_name.identifier)
        if not_done:
            raise DataExtractError("Extract failed to complete for {:d} source(s)".format(len(not_done)))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号