def to_parameters(self):
"""
Fetches a calibration from ROS parameters in the namespace of the current node into this object.
:rtype: None
"""
calib_dict = self.to_dict()
root_params = ['eye_on_hand', 'tracking_base_frame']
if calib_dict['eye_on_hand']:
root_params.append('robot_effector_frame')
else:
root_params.append('robot_base_frame')
for rp in root_params:
rospy.set_param(rp, calib_dict[rp])
transf_params = 'x', 'y', 'z', 'qx', 'qy', 'qz', 'qw'
for tp in transf_params:
rospy.set_param('transformation/'+tp, calib_dict['transformation'][tp])
python类set_param()的实例源码
def _copy_to_parameter_server(self):
for parents, param in self._extract_param_tree(self.type.config_description):
rospy.set_param(self._get_param_name(parents, param['name']), self.config[self._get_config_name(parents,param['name'])])
def update_dr_param(self, section, name, value):
#print 'set %s:%s to %s' % (section, name, value)
#print '~dynamic_reconfigure/piksi__%s__%s' % (section, name), value
rospy.set_param('~dynamic_reconfigure/piksi__%s__%s' % (section, name), value)
def callback_sbp_settings_read_by_index_resp(self, msg, **metadata):
p = msg.setting.split('\0')
try:
i = self.piksi_settings_info[p[0]][p[1]]
p[2] = i['parser'](p[2])
except:
pass
rospy.set_param('~piksi_original_settings/%s/%s' % (p[0],p[1]), p[2])
self.piksi_settings[p[0]][p[1]] = p[2]
self.update_dr_param(p[0], p[1], p[2])
self.settings_index += 1
self.piksi_framer(MsgSettingsReadByIndexReq(index=self.settings_index))
def emit(n, v):
print datetime.datetime.now().strftime('%c'), v
rospy.set_param(o.param, v)
def set_recipe(self, recipe, start_time):
"""
Set the currently running recipe... this is the CouchDB recipe document.
"""
with self.lock:
if self.__recipe is not None:
raise RecipeRunningError("Recipe is already running")
# Set recipe and time
self.__recipe = recipe
self.__start_time = start_time
if self.__start_time is None:
self.__start_time = rospy.get_time()
rospy.set_param(params.CURRENT_RECIPE, recipe["_id"])
rospy.set_param(params.CURRENT_RECIPE_START, self.__start_time )
return self
def clear_recipe(self):
with self.lock:
if self.__recipe is None:
raise RecipeIdleError("No recipe is running")
# Clear recipe and time
rospy.set_param(params.CURRENT_RECIPE, "")
rospy.set_param(params.CURRENT_RECIPE_START, 0)
self.__recipe = None
self.__start_time = None
return self
def register_services(self):
"""Register services for instance"""
rospy.Service(services.START_RECIPE, StartRecipe,
self.start_recipe_service)
rospy.Service(services.STOP_RECIPE, Empty, self.stop_recipe_service)
rospy.set_param(
params.SUPPORTED_RECIPE_FORMATS,
','.join(RECIPE_INTERPRETERS.keys())
)
return self
def set_param(param_name):
"""
POST /api/<version>/param/<param_name> {"value": "x"}
POST to the ROS Parameter Server. Value should be in the value field
of the request body.
"""
if not "value" in request.values:
return error("No value supplied")
rospy.set_param(param_name, request.values["value"])
return "", 204
def setParam(self, name, value):
rospy.set_param(self.prefix + "/" + name, value)
self.updateParamsService(0, [name])
def setParam(self, name, value, group = 0):
rospy.set_param("/cfgroup" + str(group) + "/" + name, value)
self.updateParamsService(group, [name])
def set_param(self, param_name, val):
# Set private var value
setattr(self, '_{}'.format(param_name), val)
if val == None:
full_param_name = '/needybot/blackboard/{}'.format(param_name)
if rospy.has_param(full_param_name):
rospy.delete_param(full_param_name)
self.publish("delete_{}".format(param_name))
else:
rospy.set_param('/needybot/blackboard/{}'.format(param_name), val)
self.publish("set_{}".format(param_name))
def priority_task(self, val):
self.set_param('priority_task', val)
def sequence_tasks(self, val):
self.set_param('sequence_tasks', val)
def simulate(self, val):
self.set_param('simulate', val)
def step_success(self, val):
self.set_param('step_success', val)
def readCurrentCoords():
cc = uarm.get_position()
print 'Current location is x: %2.2fcm, y: %2.2fcm, z: %2.2fcm.' %(float(cc[0]), float(cc[1]), float(cc[2]))
rospy.set_param('current_x', cc[0])
rospy.set_param('current_y', float(cc[1]))
rospy.set_param('current_z', float(cc[2]))
# Read current Angles function
def readCurrentAngles():
ra = {}
ra['s1'] = uarm.get_servo_angle(0)
ra['s2'] = uarm.get_servo_angle(1)
ra['s3'] = uarm.get_servo_angle(2)
ra['s4'] = uarm.get_servo_angle(3)
print 'Four Servo Angles: %2.2f, %2.2f, %2.2f and %2.2f degrees.' %(ra['s1'], ra['s2'],ra['s3'],ra['s4'])
rospy.set_param('servo_1',ra['s1'])
rospy.set_param('servo_2',ra['s2'])
rospy.set_param('servo_3',ra['s3'])
rospy.set_param('servo_4',ra['s4'])
# Read stopper function
def readStopperStatus():
val = uarm.get_tip_sensor()
if val == True:
print 'Stopper is actived'
rospy.set_param('stopper_status','HIGH')
else:
print 'Stopper is not actived'
rospy.set_param('stopper_status','LOW')
# Write single angle
def __init__(self, outside_ros=False):
self.rospack = RosPack()
self.num_workers = 0
self.outside_ros = rospy.get_param('/use_sim_time', outside_ros) # True if work manager <-> controller comm must use ZMQ
self.file_experiment = join(self.rospack.get_path('apex_playground'), 'config', 'experiment.json')
with open(self.file_experiment) as f:
experiment = self.check(json.load(f))
rospy.set_param('/work', experiment)
self.lock_experiment = RLock()
self.failing_workers = {} # association {num_worker: remaining attempts before blacklist}
def _cb_get_work(self, worker):
with self.lock_experiment:
experiment = rospy.get_param('/work')
disabled_workers = rospy.get_param("/experiment/disabled_workers", [])
if worker not in disabled_workers:
for task in range(len(experiment)):
for trial in range(experiment[task]['num_trials']):
status = experiment[task]['progress'][trial]['status']
resuming = status =='taken' and experiment[task]['progress'][trial]['worker'] == worker
# If this work is open or taken and the same worker is requesting some work, distribute it!
if status == 'open' or resuming:
if self.is_completed(task, trial, experiment):
experiment[task]['progress'][trial]['status'] = 'complete'
self.num_workers -= 1
rospy.set_param('/work', experiment)
else:
# This task needs work, distribute it to the worker
experiment[task]['progress'][trial]['status'] = 'taken'
experiment[task]['progress'][trial]['worker'] = worker
rospy.set_param('/work', experiment)
self.num_workers += 1
if resuming:
rospy.logwarn("Resuming worker {} {} from iteration {}/{}".format(worker, experiment[task]['method'],
experiment[task]['progress'][trial]['iteration'],
experiment[task]['num_iterations']))
else:
rospy.logwarn("Distributing {} iterations {} trial {} to worker {}".format(experiment[task]['num_iterations'], experiment[task]['method'], trial, worker))
return dict(method=experiment[task]['method'],
iteration=experiment[task]['progress'][trial]['iteration'],
num_iterations=experiment[task]['num_iterations'],
task=task, trial=trial, work_available=True)
else:
pass
#rospy.logwarn("Worker {} requested work but it is blacklisted".format(worker))
return dict(work_available=False)
def _cb_add_work(self, request):
with self.lock_experiment:
experiment = rospy.get_param('/work')
task = {
"num_iterations": request.num_iterations,
"num_trials": request.num_trials,
"method": request.method
}
experiment.append(task)
rospy.set_param('/work', self.check(experiment))
return AddWorkResponse(task=len(experiment)-1)
def load_params_from_yaml(fp):
from yaml import load
with open(fp, 'r') as infile:
yaml_data = load(infile)
for param in yaml_data:
print "param:", param, ":", yaml_data[param]
rospy.set_param("~"+param, yaml_data[param])
# this function can be used to generate flame graphs easily
def reset(self, condition=0, bottom_face=None, start_motor_positions=None):
if bottom_face is None:
bottom_face = 0
rospy.set_param('/bottom_face', bottom_face + 1)
self._ctrl_pub.publish(String('reset'))
if start_motor_positions is None:
self._set_motor_positions(np.ones(12) * 0.95)
else:
self._set_motor_positions(np.array(start_motor_positions))
time.sleep(0.5)
for _ in range(30):
self._advance_simulation()
def reset(self, bottom_face=0):
rospy.set_param('/bottom_face', bottom_face + 1)
self._ctrl_pub.publish(String('reset'))
def define(self):
self.say=rospy.Publisher('speak_string', String, queue_size=1)
if not rospy.has_param('~words'):
rospy.set_param('~words','???????')
if not rospy.has_param('~SpeakerSubTopic'):
rospy.set_param('~SpeakerSubTopic', 'stop_flag')
self.words=rospy.get_param('~words')
self.topic=rospy.get_param('~SpeakerSubTopic')
def engine_setting(self, name = 'baidu')
rospy.loginfo('currently just support baidu and google')
rospy.set_param('~engine_name', name)
#speaker_settings
def Gender_setting(self, data):
rospy.loginfo('please input man/women')
if data in self.gender_lib:
rospy.set_param('~Gender', data)
def language_setting(self, data):
rospy.loginfo('please input zh (Chinese), ct (Cantonese), en ?English?')
if data in self.language_lib:
rospy.set_param('~LAN', data)
def speed_setting(self, data):
rospy.loginfo('speak speed range is 0 ~ 9')
if data in self.level:
rospy.set_param('~SPEED', data)
else:
rospy.loginfo('currently not support this speak speed level')