def retrieve(self, flow_name, task_name, task_id):
if not self.is_connected():
self.connect()
try:
record = PostgresBase.session.query(self.query_table).\
filter_by(worker_id=task_id).\
one()
except (NoResultFound, MultipleResultsFound):
raise
except SQLAlchemyError:
PostgresBase.session.rollback()
raise
assert record.worker == task_name
task_result = record.task_result
if not self.is_real_task_result(task_result):
# we synced results to S3, retrieve them from there
# We do not care about some specific version, so no time-based collisions possible
return self.s3.retrieve_task_result(
record.ecosystem.name,
record.package.name,
record.version.identifier,
task_name
)
return task_result
postgres_base.py 文件源码
python
阅读 22
收藏 0
点赞 0
评论 0
评论列表
文章目录