def _cb_get_work(self, worker):
    """Hand the next unfinished (task, trial) pair to ``worker``.

    The experiment description is read from the ``/work`` ROS parameter
    while ``self.lock_experiment`` is held. Tasks are scanned in list order
    and, within each task, trials in index order. A trial is a candidate
    when its status is ``'open'``, or when it is ``'taken'`` and its
    recorded worker equals ``worker`` (a resume). For each candidate:

    * if ``self.is_completed(task, trial, experiment)`` is true, the trial
      is marked ``'complete'``, ``/work`` is written back and the scan
      moves on to the next trial;
    * otherwise the trial is marked ``'taken'`` by ``worker``, ``/work`` is
      written back and the trial is returned as work.

    Workers listed in ``/experiment/disabled_workers`` never receive work.

    Args:
        worker: Identifier of the requesting worker. It is compared against
            the entries of ``/experiment/disabled_workers`` and against the
            ``'worker'`` field stored on taken trials.

    Returns:
        dict: ``dict(work_available=False)`` when the worker is disabled or
        no trial is eligible; otherwise a dict with the keys ``method``,
        ``iteration``, ``num_iterations``, ``task``, ``trial`` and
        ``work_available=True``.
    """
    with self.lock_experiment:
        experiment = rospy.get_param('/work')
        disabled_workers = rospy.get_param("/experiment/disabled_workers", [])

        # Disabled workers are refused silently (no log entry is emitted).
        if worker in disabled_workers:
            return dict(work_available=False)

        for task, spec in enumerate(experiment):
            for trial in range(spec['num_trials']):
                progress = spec['progress'][trial]
                status = progress['status']
                # The 'worker' field is only read for taken trials.
                resuming = status == 'taken' and progress['worker'] == worker

                # Only open trials, or trials this same worker already
                # holds, can be distributed.
                if not (status == 'open' or resuming):
                    continue

                if self.is_completed(task, trial, experiment):
                    # Nothing left to do on this trial: close it and keep
                    # looking for other work.
                    progress['status'] = 'complete'
                    # NOTE(review): this also decrements for trials that
                    # were merely 'open' (never counted), and the branch
                    # below increments again on resume -- confirm the
                    # intended meaning of num_workers.
                    self.num_workers -= 1
                    rospy.set_param('/work', experiment)
                    continue

                # This trial needs work: assign it to the worker and
                # persist the assignment before answering.
                progress['status'] = 'taken'
                progress['worker'] = worker
                rospy.set_param('/work', experiment)
                self.num_workers += 1

                if resuming:
                    rospy.logwarn("Resuming worker {} {} from iteration {}/{}".format(
                        worker, spec['method'],
                        progress['iteration'],
                        spec['num_iterations']))
                else:
                    rospy.logwarn("Distributing {} iterations {} trial {} to worker {}".format(
                        spec['num_iterations'], spec['method'], trial, worker))

                return dict(method=spec['method'],
                            iteration=progress['iteration'],
                            num_iterations=spec['num_iterations'],
                            task=task, trial=trial, work_available=True)

    # Every trial is either complete or held by another worker.
    return dict(work_available=False)
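# (Web-page residue from the original paste: "comment list" / "table of contents" navigation labels -- not part of the code.)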