def next_job(self, job):
    '''Send a finished job back to the coordinator and retrieve the next one in exchange.

    Args:
        job (WorkerJob): job that was finished by a worker and whose results are to be
            digested by the coordinator

    Returns:
        WorkerJob. Next job of one of the running epochs that will get associated with
            the worker from the finished job and put into state 'running'. On a remote
            worker, a falsy response from the chief is returned unchanged instead of
            being unpickled.
    '''
    # NOTE(review): `is_chief` is looked up outside this method (not on `self`);
    # confirm it is the intended chief/worker flag.
    if is_chief:
        # Look the epoch up while holding the lock: finished epochs are removed from
        # `_epochs_running` under this same lock (below), so scanning the list without
        # it could race with a concurrent removal and miss the epoch entirely.
        with self._lock:
            epoch = next(
                (candidate for candidate in self._epochs_running
                 if candidate.id == job.epoch_id),
                None,
            )
            if epoch is not None:
                # Let the epoch finish the job
                epoch.finish_job(job)
                # Check, if epoch is done now
                if epoch.done():
                    # If it declares itself done, move it from 'running' to 'done' collection
                    self._epochs_running.remove(epoch)
                    self._epochs_done.append(epoch)
                    # Show the short and/or full WER report
                    log_info(epoch)
        if epoch is None:
            # There was no running epoch found for this job - this should never happen.
            log_error('There is no running epoch of id %d for job with ID %d.' % (job.epoch_id, job.id))
        # Hand out the next job outside the lock, as before, in case get_job acquires it too.
        return self.get_job(job.worker)

    # We are a remote worker and have to hand over to the chief worker by HTTP
    result = self._talk_to_chief('', data=pickle.dumps(job))
    if result:
        # SECURITY: pickle.loads can execute arbitrary code embedded in the payload.
        # This is only safe if the chief's HTTP endpoint is reachable solely by trusted peers.
        result = pickle.loads(result)
    return result
评论列表
文章目录