def stopPipeline(args, config):
pipelineQueueUtils = PipelineQueueUtils('CANCEL_Q')
pipelineDbUtils = PipelineDbUtils(config)
if args.jobId:
jobInfo = pipelineDbUtils.getJobInfo(select=["current_status", "operation_id", "job_id"],
where={"job_id": args.jobId})
elif args.pipeline:
jobInfo = pipelineDbUtils.getJobInfo(select=["current_status", "operation_id", "job_id"],
where={"pipeline_name": args.pipeline})
elif args.tag:
jobInfo = pipelineDbUtils.getJobInfo(select=["current_status", "operation_id", "job_id"],
where={"tag": args.tag})
for j in jobInfo:
if j.current_status == "RUNNING":
msg = {
"job_id": j.job_id,
"operation_id": j.operation_id
}
pipelineQueueUtils.publish(json.dumps(msg))
评论列表
文章目录