def extract_sources(self) -> None:
    """
    Iterate over sources to be extracted and parallelize extraction at the source level.

    Relations are grouped by their ``source_name`` and each group is submitted to a
    thread pool as one unit of work (one thread per source schema).  If
    ``self.keep_going`` is set, we wait for every extraction to finish even after a
    failure; otherwise we stop waiting at the first exception.  Any sources recorded
    in ``self.failed_sources`` and any missing tables returned by the workers are
    logged.

    Raises:
        DataExtractError: if any extraction did not run to completion.
    """
    self.logger.info("Starting to extract %d relation(s) in %d schema(s)", len(self.relations), len(self.schemas))
    self.failed_sources.clear()
    # Guard against an empty schema map: ThreadPoolExecutor raises ValueError for max_workers <= 0.
    max_workers = max(len(self.schemas), 1)
    # TODO With Python 3.6, we should pass in a thread_name_prefix
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        # NOTE(review): groupby only merges *adjacent* items, so this assumes
        # self.relations is already ordered by source_name -- verify against the caller.
        for source_name, relation_group in groupby(self.relations, attrgetter("source_name")):
            future = executor.submit(self.extract_source, self.schemas[source_name], list(relation_group))
            futures.append(future)
        if self.keep_going:
            done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
        else:
            done, not_done = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_EXCEPTION)
    if self.failed_sources:
        self.logger.error("Failed to extract from these source(s): %s", join_with_quotes(self.failed_sources))
    # Note that iterating over result of futures may raise an exception (which surfaces exceptions from threads)
    missing_tables = []  # type: List
    for future in done:
        missing_tables.extend(future.result())
    for table_name in missing_tables:
        self.logger.warning("Failed to extract: '%s'", table_name.identifier)
    if not_done:
        raise DataExtractError("Extract failed to complete for {:d} source(s)".format(len(not_done)))