def __init__(self):
"""
Create HeadLiftJoy object.
"""
# params
self._head_axes = rospy.get_param('~head_axes', 3)
self._lift_axes = rospy.get_param('~lift_axes', 3)
self._head_button = rospy.get_param('~head_button', 4)
self._lift_button = rospy.get_param('~lift_button', 5)
# pubs
self._head_pub = rospy.Publisher('head_angle', Float64, queue_size=1)
self._lift_pub = rospy.Publisher('lift_height', Float64, queue_size=1)
# subs
self._joy_sub = rospy.Subscriber('joy', Joy, self._joy_cb, queue_size=1)
python类Joy()的实例源码
def __init__(self):
self.joy_pub = rospy.Publisher('sensors/joystick/1', Joy, queue_size=1)
self.joy_pub2 = rospy.Publisher('sensors/joystick/2', Joy, queue_size=1)
self.rospack = RosPack()
self.rate = rospy.Rate(20)
count = len(devices.gamepads)
if count < 2:
rospy.logerr("Sensors: Expecting 2 joysticks but found only {}, exiting".format(count))
sys.exit(-1)
gamepads = sorted(devices.gamepads, key=lambda pad: int(str(pad.name)[-1]))
rospy.loginfo(gamepads)
self.joysticks = [JoystickThread(pad) for pad in gamepads]
[joystick.start() for joystick in self.joysticks]
def __init__(self):
self.topics = {
"ball": {"topic": "environment/ball", "sub": None, "data": CircularState(), "type": CircularState},
"light": {"topic": "environment/light", "sub": None, "data": UInt8(), "type": UInt8},
"sound": {"topic": "environment/sound", "sub": None, "data": Float32(), "type": Float32},
"ergo_state": {"topic": "ergo/state", "sub": None, "data": CircularState(), "type": CircularState},
"joy1": {"topic": "sensors/joystick/1", "sub": None, "data": Joy(), "type": Joy},
"joy2": {"topic": "sensors/joystick/2", "sub": None, "data": Joy(), "type": Joy},
"torso_l_j": {"topic": "{}/joint_state".format("poppy_torso"), "sub": None, "data": JointState(), "type": JointState},
"torso_l_eef": {"topic": "{}/left_arm/end_effector_pose".format("poppy_torso"), "sub": None, "data": PoseStamped(), "type": PoseStamped},
"torso_r_eef": {"topic": "{}/right_arm/end_effector_pose".format("poppy_torso"), "sub": None, "data": PoseStamped(), "type": PoseStamped}
}
self.topics["ball"]["sub"] = rospy.Subscriber(self.topics["ball"]["topic"], self.topics["ball"]["type"], self.cb_ball)
self.topics["light"]["sub"] = rospy.Subscriber(self.topics["light"]["topic"], self.topics["light"]["type"], self.cb_light)
self.topics["sound"]["sub"] = rospy.Subscriber(self.topics["sound"]["topic"], self.topics["sound"]["type"], self.cb_sound)
self.topics["ergo_state"]["sub"] = rospy.Subscriber(self.topics["ergo_state"]["topic"], self.topics["ergo_state"]["type"], self.cb_ergo)
self.topics["joy1"]["sub"] = rospy.Subscriber(self.topics["joy1"]["topic"], self.topics["joy1"]["type"], self.cb_joy1)
self.topics["joy2"]["sub"] = rospy.Subscriber(self.topics["joy2"]["topic"], self.topics["joy2"]["type"], self.cb_joy2)
self.topics["torso_l_j"]["sub"] = rospy.Subscriber(self.topics["torso_l_j"]["topic"], self.topics["torso_l_j"]["type"], self.cb_torso_l_j)
self.topics["torso_l_eef"]["sub"] = rospy.Subscriber(self.topics["torso_l_eef"]["topic"], self.topics["torso_l_eef"]["type"], self.cb_torso_l_eef)
self.topics["torso_r_eef"]["sub"] = rospy.Subscriber(self.topics["torso_r_eef"]["topic"], self.topics["torso_r_eef"]["type"], self.cb_torso_r_eef)
def _joy_cb(self, msg):
"""
The joy/game-pad message callback.
:type msg: Joy
:param msg: The incoming joy message.
"""
if msg.buttons[self._head_button] == 1:
angle_deg = ((msg.axes[self._head_axes] + 1.0) / 2.0) * SUM_HEAD_ANGLE + MIN_HEAD_ANGLE
rospy.logdebug('Send head angle: {} degrees.'.format(angle_deg))
self._head_pub.publish(data=angle_deg)
if msg.buttons[self._lift_button] == 1:
lift_mm = abs(msg.axes[self._lift_axes]) * SUM_LIFT_HEIGHT + MIN_LIFT_HEIGHT
rospy.logdebug('Send lift height: {} mm.'.format(lift_mm))
self._lift_pub.publish(data=abs(msg.axes[self._lift_axes]))
def _on_joy(self, msg):
""" callback for messages from joystick input
Args:
msg(Joy): a joystick input message
"""
raise NotImplementedError()
def _on_joy(self, msg):
""" callback for messages from joystick input
Args:
msg(Joy): a joystick input message
"""
self._controls['btnLeft'] = (msg.buttons[2] == 1)
self._controls['btnUp'] = (msg.buttons[3] == 1)
self._controls['btnDown'] = (msg.buttons[0] == 1)
self._controls['btnRight'] = (msg.buttons[1] == 1)
self._controls['dPadUp'] = (msg.axes[7] > 0.5)
self._controls['dPadDown'] = (msg.axes[7] < -0.5)
self._controls['dPadLeft'] = (msg.axes[6] > 0.5)
self._controls['dPadRight'] = (msg.axes[6] < -0.5)
self._controls['leftStickHorz'] = msg.axes[0]
self._controls['leftStickVert'] = msg.axes[1]
self._controls['rightStickHorz'] = msg.axes[3]
self._controls['rightStickVert'] = msg.axes[4]
self._controls['leftBumper'] = (msg.buttons[4] == 1)
self._controls['rightBumper'] = (msg.buttons[5] == 1)
self._controls['leftTrigger'] = (msg.axes[2] < 0.0)
self._controls['rightTrigger'] = (msg.axes[5] < 0.0)
self._controls['function1'] = (msg.buttons[6] == 1)
self._controls['function2'] = (msg.buttons[10] == 1)
def _on_joy(self, msg):
""" callback for messages from joystick input
Args:
msg(Joy): a joystick input message
"""
self._controls['btnLeft'] = (msg.buttons[2] == 1)
self._controls['btnUp'] = (msg.buttons[3] == 1)
self._controls['btnDown'] = (msg.buttons[0] == 1)
self._controls['btnRight'] = (msg.buttons[1] == 1)
self._controls['dPadUp'] = (msg.axes[7] > 0.5)
self._controls['dPadDown'] = (msg.axes[7] < -0.5)
self._controls['dPadLeft'] = (msg.axes[6] > 0.5)
self._controls['dPadRight'] = (msg.axes[6] < -0.5)
self._controls['leftStickHorz'] = msg.axes[0]
self._controls['leftStickVert'] = msg.axes[1]
self._controls['rightStickHorz'] = msg.axes[3]
self._controls['rightStickVert'] = msg.axes[4]
self._controls['leftBumper'] = (msg.buttons[4] == 1)
self._controls['rightBumper'] = (msg.buttons[5] == 1)
self._controls['leftTrigger'] = (msg.axes[2] < 0.0)
self._controls['rightTrigger'] = (msg.axes[5] < 0.0)
self._controls['function1'] = (msg.buttons[6] == 1)
self._controls['function2'] = (msg.buttons[7] == 1)
def _on_joy(self, msg):
""" callback for messages from joystick input
Args:
msg(Joy): a joystick input message
"""
self._controls['btnLeft'] = (msg.buttons[15] == 1)
self._controls['btnUp'] = (msg.buttons[12] == 1)
self._controls['btnDown'] = (msg.buttons[14] == 1)
self._controls['btnRight'] = (msg.buttons[13] == 1)
self._controls['dPadUp'] = (msg.buttons[4] == 1)
self._controls['dPadDown'] = (msg.buttons[6] == 1)
self._controls['dPadLeft'] = (msg.buttons[7] == 1)
self._controls['dPadRight'] = (msg.buttons[5] == 1)
self._controls['leftStickHorz'] = msg.axes[0]
self._controls['leftStickVert'] = msg.axes[1]
self._controls['rightStickHorz'] = msg.axes[2]
self._controls['rightStickVert'] = msg.axes[3]
self._controls['leftBumper'] = (msg.buttons[10] == 1)
self._controls['rightBumper'] = (msg.buttons[11] == 1)
self._controls['leftTrigger'] = (msg.buttons[8] == 1)
self._controls['rightTrigger'] = (msg.buttons[9] == 1)
self._controls['function1'] = (msg.buttons[0] == 1)
self._controls['function2'] = (msg.buttons[3] == 1)
def __init__(self):
self.lastButtonState = 0
self.buttonWasPressed = False
rospy.Subscriber("/joy", Joy, self.joyChanged)
def __init__(self):
self.initial_poses = {}
self.planning_groups_tips = {}
self.tf_listener = tf.TransformListener()
self.marker_lock = threading.Lock()
self.prev_time = rospy.Time.now()
self.counter = 0
self.history = StatusHistory(max_length=10)
self.pre_pose = PoseStamped()
self.pre_pose.pose.orientation.w = 1
self.current_planning_group_index = 0
self.current_eef_index = 0
self.initialize_poses = False
self.initialized = False
self.parseSRDF()
self.plan_group_pub = rospy.Publisher('/rviz/moveit/select_planning_group', String, queue_size=5)
self.updatePlanningGroup(0)
self.updatePoseTopic(0, False)
self.joy_pose_pub = rospy.Publisher("/joy_pose", PoseStamped, queue_size=1)
self.plan_pub = rospy.Publisher("/rviz/moveit/plan", Empty, queue_size=5)
self.execute_pub = rospy.Publisher("/rviz/moveit/execute", Empty, queue_size=5)
self.update_start_state_pub = rospy.Publisher("/rviz/moveit/update_start_state", Empty, queue_size=5)
self.update_goal_state_pub = rospy.Publisher("/rviz/moveit/update_goal_state", Empty, queue_size=5)
self.interactive_marker_sub = rospy.Subscriber("/rviz_moveit_motion_planning_display/robot_interaction_interactive_marker_topic/update_full",
InteractiveMarkerInit,
self.markerCB, queue_size=1)
self.sub = rospy.Subscriber("/joy", Joy, self.joyCB, queue_size=1)
def __init__(self, fig, gs, rows, cols, actions_arr):
"""
Constructs an ActionPanel assuming actions_arr is an array of
fully initialized actions.
Each action must have: key, name, func.
Each action can have: axis_pos, keyboard_binding, ps3_binding.
"""
assert len(actions_arr) <= rows*cols, 'Too many actions to put into gridspec.'
self._fig = fig
self._gs = gridspec.GridSpecFromSubplotSpec(rows, cols, subplot_spec=gs)
self._axarr = [plt.subplot(self._gs[i]) for i in range(len(actions_arr))]
# Read keyboard_bindings and ps3_bindings from config
self._actions = {action.key: action for action in actions_arr}
for key, action in self._actions.iteritems():
if key in config['keyboard_bindings']:
action.kb = config['keyboard_bindings'][key]
if key in config['ps3_bindings']:
action.pb = config['ps3_bindings'][key]
self._buttons = None
self._initialize_buttons()
self._cid = self._fig.canvas.mpl_connect('key_press_event', self.on_key_press)
if ROS_ENABLED:
self._ps3_count = 0
rospy.Subscriber(config['ps3_topic'], Joy, self.ps3_callback)
def publish_joy(self, x, y, publisher):
joy = Joy()
joy.header.stamp = rospy.Time.now()
joy.axes.append(y)
joy.axes.append(x)
publisher.publish(joy)
def __init__(self):
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f:
self.params = json.load(f)
self.button = Button(self.params)
self.rate = rospy.Rate(self.params['publish_rate'])
self.button.switch_led(False)
# Service callers
self.robot_reach_srv_name = '{}/reach'.format(self.params['robot_name'])
self.robot_compliant_srv_name = '{}/set_compliant'.format(self.params['robot_name'])
rospy.loginfo("Ergo node is waiting for poppy controllers...")
rospy.wait_for_service(self.robot_reach_srv_name)
rospy.wait_for_service(self.robot_compliant_srv_name)
self.reach_proxy = rospy.ServiceProxy(self.robot_reach_srv_name, ReachTarget)
self.compliant_proxy = rospy.ServiceProxy(self.robot_compliant_srv_name, SetCompliant)
rospy.loginfo("Controllers connected!")
self.state_pub = rospy.Publisher('ergo/state', CircularState, queue_size=1)
self.button_pub = rospy.Publisher('sensors/buttons/help', Bool, queue_size=1)
self.goals = []
self.goal = 0.
self.joy_x = 0.
self.joy_y = 0.
self.motion_started_joy = 0.
self.js = JointState()
rospy.Subscriber('sensors/joystick/{}'.format(self.params["control_joystick_id"]), Joy, self.cb_joy)
rospy.Subscriber('{}/joint_state'.format(self.params['robot_name']), JointState, self.cb_js)
rospy.Subscriber('sensors/button_leds/pause', Bool, self.cb_bt_led)
self.t = rospy.Time.now()
self.srv_reset = None
self.extended = False
self.standby = False
self.last_activity = rospy.Time.now()
self.delta_t = rospy.Time.now()
def __init__(self):
self.joy_pub = rospy.Publisher('sensors/joystick/1', Joy, queue_size=1)
self.joy_pub2 = rospy.Publisher('sensors/joystick/2', Joy, queue_size=1)
self.ball_pub = rospy.Publisher('environment/ball', CircularState, queue_size=1)
self.light_pub = rospy.Publisher('environment/light', UInt8, queue_size=1)
self.sound_pub = rospy.Publisher('environment/sound', Float32, queue_size=1)
self.rospack = RosPack()
with open(join(self.rospack.get_path('apex_playground'), 'config', 'ergo.json')) as f:
self.ergo_params = json.load(f)
with open(join(self.rospack.get_path('apex_playground'), 'config', 'environment.json')) as f:
self.env_params = json.load(f)
self.rate = rospy.Rate(self.ergo_params['publish_rate'])
vrep_port = rospy.get_param('vrep/environment_port', 29997)
self.simulation_id = vrep.simxStart('127.0.0.1', vrep_port, True, False, 5000, 5)
# Object names in V-Rep
self.joystick_left_joints = ['Joystick_2_Axis_2', 'Joystick_2_Axis_1']
self.joystick_right_joints = ['Joystick_1_Axis_2', 'Joystick_1_Axis_1']
self.ball_name = 'TennisBall'
self.arena_name = 'Arena'
if self.ergo_params["control_joystick_id"] != 2:
useless_joy = self.joystick_left_joints
self.joystick_left_joints = self.joystick_right_joints
self.joystick_right_joints = useless_joy
self.joints = JointTracker(self.joystick_left_joints + self.joystick_right_joints, self.simulation_id)
self.objects = ObjectTracker([self.ball_name, self.arena_name], self.simulation_id)
self.conversions = EnvironmentConversions()
def start_subbing(self):
stick_sub = rospy.Subscriber("joy", Joy, self.handle_joystick)
def __init__(self, fig, gs, rows, cols, actions_arr):
"""
Constructs an ActionPanel assuming actions_arr is an array of
fully initialized actions.
Each action must have: key, name, func.
Each action can have: axis_pos, keyboard_binding, ps3_binding.
"""
assert len(actions_arr) <= rows*cols, 'Too many actions to put into gridspec.'
self._fig = fig
self._gs = gridspec.GridSpecFromSubplotSpec(rows, cols, subplot_spec=gs)
self._axarr = [plt.subplot(self._gs[i]) for i in range(len(actions_arr))]
# Read keyboard_bindings and ps3_bindings from config
self._actions = {action.key: action for action in actions_arr}
for key, action in self._actions.iteritems():
if key in config['keyboard_bindings']:
action.kb = config['keyboard_bindings'][key]
if key in config['ps3_bindings']:
action.pb = config['ps3_bindings'][key]
self._buttons = None
self._initialize_buttons()
self._cid = self._fig.canvas.mpl_connect('key_press_event', self.on_key_press)
if ROS_ENABLED:
self._ps3_count = 0
rospy.Subscriber(config['ps3_topic'], Joy, self.ps3_callback)
def __init__(self, joy_topic):
rospy.loginfo("waiting for petrone")
rospy.wait_for_message('battery', Float32)
rospy.loginfo("found petrone")
# fly control publisher
self.pub_fly = rospy.Publisher('cmd_fly', Int8, queue_size=1)
self.pub_led = rospy.Publisher('led_color', String, queue_size=1)
# subscribe to the joystick at the end to make sure that all required
# services were found
self._buttons = None
rospy.Subscriber(joy_topic, Joy, self.cb_joy)
def joy_listener():
# start node
rospy.init_node("turtlesim_joy", anonymous=True)
# subscribe to joystick messages on topic "joy"
rospy.Subscriber("joy", Joy, tj_callback, queue_size=1)
# keep node alive until stopped
rospy.spin()
# called when joy message is received
def __init__(self, swarmcontroller):
self._sc = swarmcontroller
topic = rospy.get_param("~joy_topic", "joy")
rospy.Subscriber(topic, Joy, self._joystickUpdate)
self._buttons = None
def __init__(self):
self.pub = rospy.Publisher('/mavros/rc/override', OverrideRCIn,
queue_size=10)
self.sub = rospy.Subscriber('/joy', Joy, self.callback)
def joy_listener():
# start node
rospy.init_node("turtlesim_joy", anonymous=True)
# subscribe to joystick messages on topic "joy"
rospy.Subscriber("joy", Joy, tj_callback, queue_size=1)
# keep node alive until stopped
rospy.spin()
# called when joy message is received
def __init__(self, scale=1.0, offset=0.0, deadband=0.1):
"""
Maps joystick input to robot control.
@type scale: float
@param scale: scaling applied to joystick values [1.0]
@type offset: float
@param offset: joystick offset values, post-scaling [0.0]
@type deadband: float
@param deadband: deadband post scaling and offset [0.1]
Raw joystick valuess are in [1.0...-1.0].
"""
sub = rospy.Subscriber("/joy", Joy, self._on_joy)
self._scale = scale
self._offset = offset
self._deadband = deadband
self._controls = {}
self._buttons = {}
self._sticks = {}
button_names = (
'btnLeft', 'btnUp', 'btnDown', 'btnRight',
'dPadUp', 'dPadDown', 'dPadLeft', 'dPadRight',
'leftBumper', 'rightBumper',
'leftTrigger', 'rightTrigger',
'function1', 'function2')
stick_names = (
'leftStickHorz', 'leftStickVert',
'rightStickHorz', 'rightStickVert')
#doing this with lambda won't work
def gen_val_func(name, type_name):
def val_func():
return type_name(
name in self._controls and self._controls[name])
return val_func
for name in button_names:
self._buttons[name] = ButtonTransition(gen_val_func(name, bool))
for name in stick_names:
self._sticks[name] = StickTransition(gen_val_func(name, float))
def __init__(self):
rate = rospy.Rate(5)
rospy.Subscriber("/bluetooth_teleop/joy", Joy, self.joy_callback)
# Initialize button variables for button input
self.x = 0
self.circ = 0
self.sq = self.tri = self.L1 = self.R1 = self.share = self.options = self.p4 = self.L3 = self.R3 = self.DL = self.DR = self.DU = self.DD = 0
# Boolean variable used for tracking the status of object tracking process
self.active = False
objtrack_process = None
rospy.loginfo("In start")
while not rospy.is_shutdown():
# If object tracking process is not currenlty active
if self.active == False:
# Start Object Tracking if circle button pressed
if (self.circ == 1):
rospy.loginfo("Joystick code received, commencing object tracking protocol...")
self.active = True
package = 'obj_track'
executable = 'track.py'
node = roslaunch.core.Node(package, executable)
launch = roslaunch.scriptapi.ROSLaunch()
launch.start()
objtrack_process = launch.launch(node)
# If object tracking process is active
else:
# Stop Object Tracking if x button pressed
if (self.x == 1):
rospy.loginfo("Joystick code recieved, terminating object tracking protocol")
self.active = False
objtrack_process.stop()
# Reset button variables for next pass
self.x = 0
self.circ = 0
# callback function maps button data observed from joystick topic