def extract_source(self, source: DataWarehouseSchema,
                   relations: List[RelationDescription]) -> List[RelationDescription]:
    """
    Extract the data of every given relation from one upstream source, in order.

    Each relation is extracted through ``self.extract_table`` inside an
    ``etl.monitor.Monitor`` context. The attempt is driven by ``retry`` using the
    count read from the "arthur_settings.extract_retries" setting.

    When an ``ETLRuntimeError`` escapes, the source name is added to
    ``self.failed_sources`` and the relation is recorded as failed. Processing then
    continues, except for a required relation when ``self.keep_going`` is falsy; in
    that case the error is re-raised.

    Returns the relations whose extraction failed (empty if all succeeded).
    """
    total = len(relations)
    self.logger.info("Extracting %d relation(s) from source '%s'", total, source.name)
    failures = []

    with Timer() as timer:
        for position, rel in enumerate(relations, start=1):
            try:
                # The closure is invoked by retry() within this same iteration,
                # so capturing the loop variables `rel` / `position` is safe here.
                def _run_attempt(attempt_num):
                    # NOTE(review): retry() presumably passes a 0-based attempt number,
                    # which is shifted to 1-based for the monitor — TODO confirm.
                    with etl.monitor.Monitor(
                        rel.identifier,
                        "extract",
                        options=self.options_info(),
                        source=self.source_info(source, rel),
                        destination={'bucket_name': rel.bucket_name,
                                     'object_key': rel.manifest_file_name},
                        index={"current": position, "final": total, "name": source.name},
                        dry_run=self.dry_run,
                        attempt_num=attempt_num + 1,
                    ):
                        self.extract_table(source, rel)

                retry(get_config_int("arthur_settings.extract_retries"), _run_attempt, self.logger)
            except ETLRuntimeError:
                self.failed_sources.add(source.name)
                failures.append(rel)
                # Only a required relation without keep_going aborts the whole source.
                if rel.is_required and not self.keep_going:
                    self.logger.debug("Extract failed for required relation '%s'", rel.identifier)
                    raise
                if rel.is_required:
                    self.logger.exception("Ignoring failure of required relation '%s' and proceeding as requested:",
                                          rel.identifier)
                else:
                    self.logger.exception("Extract failed for non-required relation '%s':", rel.identifier)

        succeeded = total - len(failures)
        self.logger.info("Finished extract from source '%s': %d succeeded, %d failed (%s)",
                         source.name, succeeded, len(failures), timer)
    return failures
评论列表
文章目录