def __init__(self, name, steps=None):
    """Initialise the instance: action speaker, ZeroMQ sockets and pool.

    Args:
        name: Stored unchanged as ``self.name``.
        steps: Optional iterable of objects exposing a ``job_type``
            attribute. One PUSH socket is created per job type.
            ``None`` (the default) is treated as no steps.
    """
    # A ``None`` sentinel replaces the former mutable ``[]`` default, and
    # the argument is materialised once so any iterable (including a
    # generator) can be both iterated and counted.
    step_list = [] if steps is None else list(steps)

    self.name = name
    self.actions = Speaker(
        'actions',
        [
            'available',
            'failed',
            'started',
            'success',
            'metric',
            'error',
            'logs',
        ]
    )
    self.steps = [s.job_type for s in step_list]
    self.total_steps = len(step_list)

    # Sockets: one SUB, one PULL, plus one PUSH per step job type.
    self.context = zmq.Context()
    self.sockets = SocketManager(zmq, self.context)
    self.sockets.create('step-events', zmq.SUB)
    self.sockets.create('jobs-in', zmq.PULL)
    for step in self.steps:
        self.sockets.create(step, zmq.PUSH)

    # bind_action runs before the pool and backend exist; keep this
    # ordering, since its requirements are not visible from here.
    for action in self.actions.actions.keys():
        self.bind_action(action)
    self.total_actions = len(self.actions.actions)

    # Pool capacity is total_actions ** (total_steps + 1), so it grows
    # exponentially with the number of steps.
    self.pool = gevent.pool.Pool(self.total_actions ** (self.total_steps + 1))
    self.greenlets = []
    self._allowed_to_run = True
    # NOTE(review): presumably a polling interval in seconds — confirm.
    self.default_interval = 0.1
    self.backend = StorageBackend()
    self.logger = logging.getLogger('pipeline')
评论列表
文章目录