def _step(self, exc=None):
    """
    Run one step of the task, announcing the start of execution first.

    While `self._in_progress` is falsy, this call:

    * stores the current UTC time in `self._start`;
    * builds an `EventSource` from the task execution ID, plus the task
      template ID and the workflow template/execution IDs when a template
      or a workflow is attached, and stores it in `self._source`;
    * sets `self._in_progress` to True, so the block is skipped on later
      calls unless the flag is reset elsewhere;
    * dispatches a `TaskExecState.BEGIN` event through `self._broker`, with
      `self._inputs` as its content.

    In every case the call is then forwarded to the parent `_step()`.

    :param exc: passed through unchanged to the parent `_step()`.
    """
    if not self._in_progress:
        self._start = datetime.now(timezone.utc)

        # Collect the identifiers describing where the event originates.
        # The template and workflow entries are optional.
        origin_ids = {'task_exec_id': self.uid}
        if self._template:
            origin_ids['task_template_id'] = self._template.uid
        if self._workflow:
            origin_ids.update(
                workflow_template_id=self._workflow.template.uid,
                workflow_exec_id=self._workflow.uid,
            )
        self._source = EventSource(**origin_ids)

        # Raise the flag before dispatching, so the BEGIN event is not
        # prepared again on a later step.
        self._in_progress = True

        begin_event = {
            'type': TaskExecState.BEGIN.value,
            'content': self._inputs,
        }
        # NOTE(review): `_workflow_exec_id` is read straight from the
        # EventSource instance; whether it may be unset when no workflow
        # is attached depends on EventSource -- confirm.
        self._broker.dispatch(
            begin_event,
            topics=workflow_exec_topics(self._source._workflow_exec_id),
            source=self._source,
        )

    super()._step(exc)
评论列表
文章目录