def __init__(self):
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'learning.json')) as f:
self.params = json.load(f)
self.translator = EnvironmentTranslator()
self.learning = None
# User control and critical resources
self.lock_iteration = RLock()
self.ready_for_interaction = True
self.focus = None
self.set_iteration = -1
# Saved experiment files
self.dir = join(self.rospack.get_path('apex_playground'), 'logs')
if not os.path.isdir(self.dir):
os.makedirs(self.dir)
# self.stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# self.source_file = join(self.dir, self.source_name + '.pickle')
self.main_experiment = True
self.condition = ""
self.trial = ""
self.task = ""
# Serving these services
self.service_name_perceive = "learning/perceive"
self.service_name_produce = "learning/produce"
self.service_name_set_interest = "learning/set_interest"
self.service_name_set_iteration = "learning/set_iteration"
self.service_name_interests = "learning/get_interests"
# Publishing these topics
self.pub_focus = rospy.Publisher('learning/current_focus', String, queue_size=1, latch=True)
self.pub_user_focus = rospy.Publisher('learning/user_focus', String, queue_size=1, latch=True)
self.pub_ready = rospy.Publisher('learning/ready_for_interaction', Bool, queue_size=1, latch=True)
self.pub_iteration = rospy.Publisher('iteration', UInt32, queue_size=1, latch=True)
# Using these services
self.service_name_get_perception = "perception/get"
for service in [self.service_name_get_perception]:
rospy.loginfo("Learning node is waiting service {}...".format(service))
rospy.wait_for_service(service)
self.get_state = rospy.ServiceProxy(self.service_name_get_perception, GetSensorialState)
评论列表
文章目录