def get_boot_action(self, action_id):
"""Query for a single boot action by ID.
:param action_id: string ULID bootaction id
"""
try:
with self.db_engine.connect() as conn:
query = self.ba_status_tbl.select().where(
self.ba_status_tbl.c.action_id == ulid2.decode_ulid_base32(
action_id))
rs = conn.execute(query)
r = rs.fetchone()
if r is not None:
ba_dict = dict(r)
ba_dict['action_id'] = bytes(ba_dict['action_id'])
ba_dict['identity_key'] = bytes(ba_dict['identity_key'])
ba_dict['task_id'] = uuid.UUID(bytes=ba_dict['task_id'])
return ba_dict
else:
return None
except Exception as ex:
self.logger.error(
"Error querying boot action %s" % action_id, exc_info=ex)
评论列表
文章目录