def get(self, x_des, seed=None, bounds=()):
"""
Get the IK by minimization
:param x_des: desired task space pose [[x, y, z], [x, y, z, w]]
:param seed: RobotState message
:param bounds:[(min, max), (min, max), (min, max), ... for each joint]
:return: (bool, joints)
"""
if isinstance(seed, RobotState):
seed = seed.joint_state
elif not isinstance(seed, JointState) and seed is not None:
raise TypeError('ros.IK.get only accepts RS or JS, got {}'.format(type(seed)))
seed = [seed.position[seed.name.index(joint)] for joint in self._ik.joints] if seed is not None else ()
result = self._ik.get(x_des, seed, bounds)
return result[0], JointState(name=self._ik.joints, position=list(result[1]))
python类JointState()的实例源码
def add_viapoint(self, t, obsys, sigmay=1e-6):
"""
Add a viapoint i.e. an observation at a specific time
:param t: Time of observation
:param obsys: RobotState observed at the time
:param sigmay:
:return:
"""
if isinstance(obsys, RobotState):
obsys = obsys.joint_state
elif not isinstance(obsys, JointState):
raise TypeError("ros.ProMP.add_viapoint only accepts RS or JS, got {}".format(type(obsys)))
try:
positions = [obsys.position[obsys.name.index(joint)] for joint in self.joint_names] # Make sure joints are in right order
except KeyError as e:
raise KeyError("Joint {} provided as viapoint is unknown to the demonstrations".format(e))
else:
self.promp.add_viapoint(t, map(float, positions), sigmay)
def __init__(self):
#init code
rospy.init_node("robotGame")
self.currentDist = 1
self.previousDist = 1
self.reached = False
self.tf = TransformListener()
self.left_joint_names = ['yumi_joint_1_l', 'yumi_joint_2_l', 'yumi_joint_3_l', 'yumi_joint_4_l', 'yumi_joint_5_l', 'yumi_joint_6_l', 'yumi_joint_7_l', 'gripper_l_joint']
self.right_joint_names = ['yumi_joint_1_r', 'yumi_joint_2_r', 'yumi_joint_3_r', 'yumi_joint_4_r', 'yumi_joint_5_r', 'yumi_joint_6_r', 'yumi_joint_7_r', 'gripper_r_joint']
self.left_positions = [-2.01081820427463881, 1.4283937236421274, -1.3593418228836045, -0.19315625641494183, 1.7016501799872579, 0.6573540231496411, 3.404315594906305, 0.0]
self.right_positions = [0.01081820427463881, 2.4283937236421274, 0.3593418228836045, -0.19315625641494183, 1.7016501799872579, 0.6573540231496411, 3.404315594906305, 1.8145107750466565]
self.rjv = []
self.ljv = []
self.pub = rospy.Publisher('my_joint_states', JointState, queue_size=1)
self.js = JointState()
self.js.header = Header()
self.js.name = self.left_joint_names + self.right_joint_names
self.js.velocity = []
self.js.effort = []
self.sub = rospy.Subscriber('/joint_states', JointState, self.jsCB)
self.destPos = np.random.uniform(0,0.25, size =(3))
self.reset()
def main():
rospy.init_node('issue_com')
pub = rospy.Publisher(POS_COM_TOPIC, PositionCommand, queue_size=10)
test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
sub = rospy.Subscriber(POS_COM_TOPIC, PositionCommand, listen)
#sub = rospy.Subscriber('/joint_states', JointState, listen)
pc = PositionCommand()
pc.mode = JOINT_SPACE
#pc.arm = PositionCommand.LEFT_ARM
pc.arm = 1#PositionCommand.RIGHT_ARM
pc.data = np.zeros(7)
r = rospy.Rate(1)
#while not rospy.is_shutdown():
# pub.publish(pc)
# r.sleep()
# print 'published!'
r.sleep()
test_pub.publish(Empty())
pub.publish(pc)
def __init__(self):
self.topics = {
"ball": {"topic": "environment/ball", "sub": None, "data": CircularState(), "type": CircularState},
"light": {"topic": "environment/light", "sub": None, "data": UInt8(), "type": UInt8},
"sound": {"topic": "environment/sound", "sub": None, "data": Float32(), "type": Float32},
"ergo_state": {"topic": "ergo/state", "sub": None, "data": CircularState(), "type": CircularState},
"joy1": {"topic": "sensors/joystick/1", "sub": None, "data": Joy(), "type": Joy},
"joy2": {"topic": "sensors/joystick/2", "sub": None, "data": Joy(), "type": Joy},
"torso_l_j": {"topic": "{}/joint_state".format("poppy_torso"), "sub": None, "data": JointState(), "type": JointState},
"torso_l_eef": {"topic": "{}/left_arm/end_effector_pose".format("poppy_torso"), "sub": None, "data": PoseStamped(), "type": PoseStamped},
"torso_r_eef": {"topic": "{}/right_arm/end_effector_pose".format("poppy_torso"), "sub": None, "data": PoseStamped(), "type": PoseStamped}
}
self.topics["ball"]["sub"] = rospy.Subscriber(self.topics["ball"]["topic"], self.topics["ball"]["type"], self.cb_ball)
self.topics["light"]["sub"] = rospy.Subscriber(self.topics["light"]["topic"], self.topics["light"]["type"], self.cb_light)
self.topics["sound"]["sub"] = rospy.Subscriber(self.topics["sound"]["topic"], self.topics["sound"]["type"], self.cb_sound)
self.topics["ergo_state"]["sub"] = rospy.Subscriber(self.topics["ergo_state"]["topic"], self.topics["ergo_state"]["type"], self.cb_ergo)
self.topics["joy1"]["sub"] = rospy.Subscriber(self.topics["joy1"]["topic"], self.topics["joy1"]["type"], self.cb_joy1)
self.topics["joy2"]["sub"] = rospy.Subscriber(self.topics["joy2"]["topic"], self.topics["joy2"]["type"], self.cb_joy2)
self.topics["torso_l_j"]["sub"] = rospy.Subscriber(self.topics["torso_l_j"]["topic"], self.topics["torso_l_j"]["type"], self.cb_torso_l_j)
self.topics["torso_l_eef"]["sub"] = rospy.Subscriber(self.topics["torso_l_eef"]["topic"], self.topics["torso_l_eef"]["type"], self.cb_torso_l_eef)
self.topics["torso_r_eef"]["sub"] = rospy.Subscriber(self.topics["torso_r_eef"]["topic"], self.topics["torso_r_eef"]["type"], self.cb_torso_r_eef)
def main():
rospy.init_node('issue_com')
pub = rospy.Publisher(POS_COM_TOPIC, PositionCommand, queue_size=10)
test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
sub = rospy.Subscriber(POS_COM_TOPIC, PositionCommand, listen)
#sub = rospy.Subscriber('/joint_states', JointState, listen)
pc = PositionCommand()
pc.mode = JOINT_SPACE
#pc.arm = PositionCommand.LEFT_ARM
pc.arm = 1#PositionCommand.RIGHT_ARM
pc.data = np.zeros(7)
r = rospy.Rate(1)
#while not rospy.is_shutdown():
# pub.publish(pc)
# r.sleep()
# print 'published!'
r.sleep()
test_pub.publish(Empty())
pub.publish(pc)
def _init_pubsub(self):
self.joint_states_pub = rospy.Publisher('joint_states', JointState)
self.odom_pub = rospy.Publisher('odom', Odometry)
self.sensor_state_pub = rospy.Publisher('~sensor_state', TurtlebotSensorState)
self.operating_mode_srv = rospy.Service('~set_operation_mode', SetTurtlebotMode, self.set_operation_mode)
self.digital_output_srv = rospy.Service('~set_digital_outputs', SetDigitalOutputs, self.set_digital_outputs)
if self.drive_mode == 'twist':
self.cmd_vel_sub = rospy.Subscriber('cmd_vel', Twist, self.cmd_vel)
self.drive_cmd = self.robot.direct_drive
elif self.drive_mode == 'drive':
self.cmd_vel_sub = rospy.Subscriber('cmd_vel', Drive, self.cmd_vel)
self.drive_cmd = self.robot.drive
elif self.drive_mode == 'turtle':
self.cmd_vel_sub = rospy.Subscriber('cmd_vel', Turtle, self.cmd_vel)
self.drive_cmd = self.robot.direct_drive
else:
rospy.logerr("unknown drive mode :%s"%(self.drive_mode))
self.transform_broadcaster = None
if self.publish_tf:
self.transform_broadcaster = tf.TransformBroadcaster()
def nearest_neighbour(self, fk_dict, scale_orient=40):
def query_database(link, fk):
x_vector = deepcopy(fk[0])
if self.with_orient:
x_vector += (np.array(fk[1]) / scale_orient).tolist()
res = self.trees[link]['tree'].query(x_vector)
joints = self.trees[link]['data']['data'][res[1]][:-7]
joint_names = self.trees[link]['data']['names'][:-7]
return joints.tolist(), joint_names.tolist()
state = JointState()
for key, value in fk_dict.iteritems():
joints, joint_names = query_database(key, value)
state.position += joints
state.name += joint_names
return state
def main():
rospy.init_node('issue_com')
pub = rospy.Publisher(POS_COM_TOPIC, PositionCommand, queue_size=10)
test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
sub = rospy.Subscriber(POS_COM_TOPIC, PositionCommand, listen)
#sub = rospy.Subscriber('/joint_states', JointState, listen)
pc = PositionCommand()
pc.mode = JOINT_SPACE
#pc.arm = PositionCommand.LEFT_ARM
pc.arm = 1#PositionCommand.RIGHT_ARM
pc.data = np.zeros(7)
r = rospy.Rate(1)
#while not rospy.is_shutdown():
# pub.publish(pc)
# r.sleep()
# print 'published!'
r.sleep()
test_pub.publish(Empty())
pub.publish(pc)
def getAnglesFromDict(d):# NOTE Fails currently if dict is None
"""
Converts a dictionary to a angles of a JointState()
:param d (dict): The dictionary to be converted
:return (sensor_msgs.msg.JointState): The angles
"""
# print datapath
if d is None:
rospy.logerr("Could not get angles for non existing dictionary")
return None
js=JointState()
for n,v in d.items():
js.name.append(n.encode())
js.position.append(v)
return js
def timer_callback(self, event):
msg = JointState()
self.mutex.acquire()
if time.time() - self.last_command_time < 0.5:
dq = cartesian_control(self.joint_transforms,
self.x_current, self.x_target,
False, self.q_current, self.q0_desired)
msg.velocity = dq
elif time.time() - self.last_red_command_time < 0.5:
dq = cartesian_control(self.joint_transforms,
self.x_current, self.x_current,
True, self.q_current, self.q0_desired)
msg.velocity = dq
else:
msg.velocity = numpy.zeros(7)
self.mutex.release()
self.pub_vel.publish(msg)
def set_start_state(self, msg):
"""
Specify a start state for the group.
Parameters
----------
msg : moveit_msgs/RobotState
Examples
--------
>>> from moveit_msgs.msg import RobotState
>>> from sensor_msgs.msg import JointState
>>> joint_state = JointState()
>>> joint_state.header = Header()
>>> joint_state.header.stamp = rospy.Time.now()
>>> joint_state.name = ['joint_a', 'joint_b']
>>> joint_state.position = [0.17, 0.34]
>>> moveit_robot_state = RobotState()
>>> moveit_robot_state.joint_state = joint_state
>>> group.set_start_state(moveit_robot_state)
"""
self._g.set_start_state(conversions.msg_to_string(msg))
def go(self, joints = None, wait = True):
""" Set the target of the group and then move the group to the specified target """
if type(joints) is bool:
wait = joints
joints = None
elif type(joints) is JointState:
self.set_joint_value_target(joints)
elif type(joints) is Pose:
self.set_pose_target(joints)
elif not joints == None:
try:
self.set_joint_value_target(self.get_remembered_joint_values()[joints])
except:
self.set_joint_value_target(joints)
if wait:
return self._g.move()
else:
return self._g.async_move()
def __init__(self, server):
self.server = server
self.mutex = Lock()
# Publisher to send commands
self.pub_command = rospy.Publisher("/motion_planning_goal", geometry_msgs.msg.Transform,
queue_size=1)
self.listener = tf.TransformListener()
# Subscribes to information about what the current joint values are.
rospy.Subscriber("/joint_states", sensor_msgs.msg.JointState,
self.joint_states_callback)
# Publisher to set robot position
self.pub_reset = rospy.Publisher("/joint_command", JointState, queue_size=1)
rospy.sleep(0.5)
# Wait for validity check service
rospy.wait_for_service("check_state_validity")
self.state_valid_service = rospy.ServiceProxy('check_state_validity',
moveit_msgs.srv.GetStateValidity)
self.reset_robot()
def js_cb(self, a):
rospy.loginfo('Received array')
js = JointState()
js.name = ['head_pan', 'right_s0', 'right_s1', 'right_e0', 'right_e1', 'right_w0', 'right_w1', 'right_w2', 'left_s0', 'left_s1', 'left_e0', 'left_e1', 'left_w0', 'left_w1', 'left_w2']
jList = a.data
jMatrix = np.reshape(jList,(jList.shape[0]/15,15))
i = 0
for pos in jMatrix:
rospy.loginfo(pos)
js.position = pos
gsvr = GetStateValidityRequest()
rs = RobotState()
rs.joint_state = js
gsvr.robot_state = rs
gsvr.group_name = "both_arms"
resp = self.coll_client(gsvr)
if (resp.valid == False):
rospy.loginfo('false')
rospy.loginfo(i)
self.js_pub.publish(0)
return
i = i + 1
self.js_pub.publish(1)
rospy.loginfo('true')
return
def to_joint_state(state):
if isinstance(state, RobotState):
state = state.joint_state
elif not isinstance(state, JointState):
raise TypeError("ROSBridge.to_joint_trajectory only accepts RT or JT, got {}".format(type(trajectory)))
return state
def get(self, state):
"""
Get the FK
:param state: RobotState message to get the forward kinematics for
:return: [[x, y, z], [x, y, z, w]]
"""
if isinstance(state, RobotState):
state = state.joint_state
elif not isinstance(state, JointState):
raise TypeError('ros.FK.get only accepts RS or JS, got {}'.format(type(state)))
return self._fk.get([state.position[state.name.index(joint)] for joint in self.joints])
def set_start(self, obsy, sigmay=1e-6):
if isinstance(obsy, RobotState):
obsy = obsy.joint_state
elif not isinstance(obsy, JointState):
raise TypeError("ros.ProMP.set_start only accepts RS or JS, got {}".format(type(obsy)))
try:
positions = [obsy.position[obsy.name.index(joint)] for joint in self.joint_names] # Make sure joints are in right order
except KeyError as e:
raise KeyError("Joint {} provided as start state is unknown to the demonstrations".format(e))
else:
self.promp.set_start(map(float, positions), sigmay)
def __init__(self):
self.joint_name = rospy.get_param("~joint_name")
self.command_pub = rospy.Publisher("command", Float64, queue_size=1)
self.joint_sub = rospy.Subscriber("joint_state", JointState,
self.joint_state_callback, queue_size=1)
def __init__(self):
self.joint_name = rospy.get_param("~joint_name")
self.joint_state = JointState()
self.joint_state.name.append(self.joint_name)
self.joint_state.position.append(0.0)
self.joint_state.velocity.append(0.0)
self.joint_pub = rospy.Publisher("joint_states", JointState, queue_size=1)
self.command_sub = rospy.Subscriber("command", Float64,
self.command_callback, queue_size=1)
def fk_service_client(limb = "right"):
ns = "ExternalTools/" + limb + "/PositionKinematicsNode/FKService"
fksvc = rospy.ServiceProxy(ns, SolvePositionFK)
fkreq = SolvePositionFKRequest()
joints = JointState()
joints.name = ['right_j0', 'right_j1', 'right_j2', 'right_j3',
'right_j4', 'right_j5', 'right_j6']
joints.position = [0.763331, 0.415979, -1.728629, 1.482985,
-1.135621, -1.674347, -0.496337]
# Add desired pose for forward kinematics
fkreq.configuration.append(joints)
# Request forward kinematics from base to "right_hand" link
fkreq.tip_names.append('right_hand')
try:
rospy.wait_for_service(ns, 5.0)
resp = fksvc(fkreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return False
# Check if result valid
if (resp.isValid[0]):
rospy.loginfo("SUCCESS - Valid Cartesian Solution Found")
rospy.loginfo("\nFK Cartesian Solution:\n")
rospy.loginfo("------------------")
rospy.loginfo("Response Message:\n%s", resp)
else:
rospy.logerr("INVALID JOINTS - No Cartesian Solution Found.")
return False
return True
def check_get_states_init(self):
if self.hand_state_subscriber is None:
self.hand_state_subscriber = rospy.Subscriber("/joint_states", JointState, self.recv_joint_state)
rospy.sleep(0.1)
def run(self):
""" Run our code """
rospy.loginfo("Start laser test code")
#self.blank_image()
#for x in range(0, 99):
# self.add_point(x * 1.0, x * 1.0, 10)
#cv2.imshow("Hackme", self.img)
#cv2.waitKey(0)
self.joint_subscriber = rospy.Subscriber("/multisense/joint_states", JointState, self.recv_joint_state)
self.scan_subscriber = rospy.Subscriber("/multisense/lidar_scan", LaserScan, self.recv_scan)
self.spindle = rospy.Publisher("/multisense/set_spindle_speed", Float64, queue_size=10)
self.set_x_range(-1.0, 1.0)
self.set_y_range(-1.0, 1.0)
self.set_z_range(0.0, 2.0)
rospy.sleep(0.1)
rospy.loginfo("Setting spindle going")
self.spindle.publish(Float64(1.0))
#self.zarj.zps.look_down()
rospy.loginfo("Spin forever, hopefully receiving data...")
while True:
rospy.sleep(1.0)
#print "Got cloud {}x{}".format(resp.cloud.height, resp.cloud.width)
#img = self.create_image_from_cloud(resp.cloud.points)
#cv2.destroyAllWindows()
#print "New image"
#cv2.imshow("My image", img)
#cv2.waitKey(1)
#print resp
#cv_image = self.bridge.imgmsg_to_cv2(resp.cloud, "bgr8")
#cv2.imshow("Point cloud", cv_image)
def get_endeffector_pos(self, pos_only=True):
"""
:param pos_only: only return postion
:return:
"""
fkreq = SolvePositionFKRequest()
joints = JointState()
joints.name = self.ctrl.limb.joint_names()
joints.position = [self.ctrl.limb.joint_angle(j)
for j in joints.name]
# Add desired pose for forward kinematics
fkreq.configuration.append(joints)
fkreq.tip_names.append('right_hand')
try:
rospy.wait_for_service(self.name_of_service, 5)
resp = self.fksvc(fkreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return False
pos = np.array([resp.pose_stamp[0].pose.position.x,
resp.pose_stamp[0].pose.position.y,
resp.pose_stamp[0].pose.position.z])
return pos
def move_with_impedance(self, des_joint_angles):
"""
non-blocking
"""
js = JointState()
js.name = self.ctrl.limb.joint_names()
js.position = [des_joint_angles[n] for n in js.name]
self.imp_ctrl_publisher.publish(js)
def fk_service_client(limb = "right"):
ns = "ExternalTools/" + limb + "/PositionKinematicsNode/FKService"
fksvc = rospy.ServiceProxy(ns, SolvePositionFK)
fkreq = SolvePositionFKRequest()
joints = JointState()
joints.name = ['right_j0', 'right_j1', 'right_j2', 'right_j3',
'right_j4', 'right_j5', 'right_j6']
joints.position = [0.763331, 0.415979, -1.728629, 1.482985,
-1.135621, -1.674347, -0.496337]
# Add desired pose for forward kinematics
fkreq.configuration.append(joints)
# Request forward kinematics from base to "right_hand" link
fkreq.tip_names.append('right_hand')
try:
rospy.wait_for_service(ns, 5.0)
resp = fksvc(fkreq)
except (rospy.ServiceException, rospy.ROSException), e:
rospy.logerr("Service call failed: %s" % (e,))
return False
# Check if result valid
if (resp.isValid[0]):
rospy.loginfo("SUCCESS - Valid Cartesian Solution Found")
rospy.loginfo("\nFK Cartesian Solution:\n")
rospy.loginfo("------------------")
rospy.loginfo("Response Message:\n%s", resp)
else:
rospy.loginfo("INVALID JOINTS - No Cartesian Solution Found.")
return True
def joint_state_from_cmd(cmd):
js = JointState()
js.name = cmd.keys()
js.position = cmd.values()
return js
def __init__(self, limb = "right"):
# control parameters
self._rate = 1000.0 # Hz
self._missed_cmds = 20.0 # Missed cycles before triggering timeout
# create our limb instance
self._limb = intera_interface.Limb(limb)
# initialize parameters
self._springs = dict()
self._damping = dict()
self._des_angles = dict()
# create cuff disable publisher
cuff_ns = 'robot/limb/' + limb + '/suppress_cuff_interaction'
self._pub_cuff_disable = rospy.Publisher(cuff_ns, Empty, queue_size=1)
# verify robot is enabled
print("Getting robot state... ")
self._rs = intera_interface.RobotEnable(CHECK_VERSION)
self._init_state = self._rs.state().enabled
print("Enabling robot... ")
self._rs.enable()
print("Running. Ctrl-c to quit")
rospy.Subscriber("desired_joint_pos", JointState, self._set_des_pos)
rospy.Subscriber("release_spring", Float32, self._release)
rospy.Subscriber("imp_ctrl_active", Int64, self._imp_ctrl_active)
self.max_stiffness = 20
self.time_to_maxstiffness = .3 ######### 0.68
self.t_release = rospy.get_time()
self._imp_ctrl_is_active = True
for joint in self._limb.joint_names():
self._springs[joint] = 30
self._damping[joint] = 4
saveAccEff.py 文件源码
项目:cu-perception-manipulation-stack
作者: correlllab
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def subscribe(self):
self.subscribeAcc = rospy.Subscriber("/robot/accelerometer/left_accelerometer/state",Imu, self.callback)
self.subscribeEff = rospy.Subscriber("/robot/joint_states", JointState, self.callback2)
rospy.spin()
def main():
rospy.init_node('issue_com')
pub = rospy.Publisher(TRIAL_COM_TOPIC, TrialCommand, queue_size=10)
test_pub = rospy.Publisher(TEST_TOPIC, Empty, queue_size=10)
sub = rospy.Subscriber(POS_COM_TOPIC, TrialCommand, listen)
sub2 = rospy.Subscriber(RESULT_TOPIC, SampleResult, listen_report)
#sub = rospy.Subscriber('/joint_states', JointState, listen)
tc = TrialCommand()
T = 1
tc.controller = get_lin_gauss_test(T=T)
tc.T = T
tc.frequency = 20.0
# NOTE: ordering of datatypes in state is determined by the order here
tc.state_datatypes = [JOINT_ANGLES, JOINT_VELOCITIES]
tc.obs_datatypes = tc.state_datatypes
tc.ee_points = EE_SITES.reshape(EE_SITES.size).tolist()
r = rospy.Rate(1)
#while not rospy.is_shutdown():
# pub.publish(pc)
# r.sleep()
# print 'published!'
r.sleep()
test_pub.publish(Empty())
pub.publish(tc)
rospy.spin()