def __init__(self):
self.petrone = Petrone()
# subscriber
self.sub_flight = rospy.Subscriber('cmd_fly', Int8, self.cb_fly, queue_size=1)
self.sub_cmd = rospy.Subscriber('cmd_vel_raw', Twist, self.cb_cmd, queue_size=1)
self.sub_color = rospy.Subscriber('led_color', String, self.cb_color, queue_size=1)
# publisher
self.pub_battery = rospy.Publisher('battery', Float32, queue_size=1)
self.pub_status_flight = rospy.Publisher('status/flight', Int8, queue_size=1)
self.pub_status_system = rospy.Publisher('status/system', Int8, queue_size=1)
self.pub_imu = rospy.Publisher('imu', Imu, queue_size=1)
# cache
self.is_disconnected = True
self.last_values = defaultdict(lambda: 0)
# flight parameters
self.twist = Twist()
self.twist_at = 0
python类Int8()的实例源码
def __init__(self,
base_set_command='amixer -c 0 sset Master playback',
base_get_command='amixer -c 0 sget Master playback'):
self.base_set_cmd = base_set_command
self.base_get_cmd = base_get_command
self.curr_vol = 0
rospy.loginfo("VolumeManager using base set command: '" +
self.base_set_cmd + "'")
rospy.loginfo("VolumeManager using base get command: '" +
self.base_get_cmd + "'")
self.sub = rospy.Subscriber('~set_volume',
Int8,
self.cb,
queue_size=1)
# Get volume to start publishing
self.curr_vol = self.get_current_volume()
self.pub = rospy.Publisher('~get_volume',
Int8,
latch=True,
queue_size=1)
self.timer = rospy.Timer(rospy.Duration(1.0), self.curr_vol_cb)
def __init__(self, joy_topic):
rospy.loginfo("waiting for petrone")
rospy.wait_for_message('battery', Float32)
rospy.loginfo("found petrone")
# fly control publisher
self.pub_fly = rospy.Publisher('cmd_fly', Int8, queue_size=1)
self.pub_led = rospy.Publisher('led_color', String, queue_size=1)
# subscribe to the joystick at the end to make sure that all required
# services were found
self._buttons = None
rospy.Subscriber(joy_topic, Joy, self.cb_joy)
def curr_vol_cb(self, event):
self.curr_vol = self.get_current_volume()
self.pub.publish(Int8(self.curr_vol))
def callback(data):
global current_msg
global key
global last_message
global buff_msg
#determine game type according to the data head tag!!!!!!!!!
#rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)
#print type(data), type(data.data), len(data.data)
raw_msg = data.data
if not raw_msg == buff_msg:
current_msg = dataProcessing(raw_msg)
if current_msg == last_msg and (not current_msg == None):
key = False
print key
else:
key = True
#TODO change to a mouth color publisher according to the current_msg
pub = rospy.Publisher('mouth_color', Int8, queue_size=10)
#target_color = current_msg[len(current_msg)-1] #this is formal solution
#temp solution
target_color = 1
if task[0] == "complete_astronaut":
target_color = 1
elif task[0] == "complete_clown":
target_color = 2
elif task[0] == "complete_wizard":
target_color = 3
pub.publish(target_color)
buff_msg = raw_msg
def listener2(threadName, delay):#threadName, delay
global f
rospy.Subscriber("mouth_color", Int8, callback2)
# spin() simply keeps python from exiting until this node is stopped
rospy.spin()
def __init__(self, pocket_name):
GazeboObject.__init__(self, pocket_name)
self._active = None
self._last_time_active = None
self._off_delay = 0.1
rospy.Subscriber("/"+pocket_name+"/contact", Int8, self.update_state)
def __init__(self, max_random_start,
observation_dims, data_format, display):
self.max_random_start = max_random_start
self.action_size = 28
self.display = display
self.data_format = data_format
self.observation_dims = observation_dims
self.screen = np.zeros((self.observation_dims[0],self.observation_dims[1]), np.uint8)
#####################
# Followings are not used yet
self.syncLaser = False
self.syncPose = False
self.syncGoal = False
self.syncDir = False
self.sentTime = 0
#####################
self.poseX = 0.0
self.poseY = 0.0
self.prevPoseX = 0.0
self.prevPoseY = 0.0
self.goalX = 0.0
self.goalY = 0.0
self.dir = 0.0
self.prevDist = 0.0
self.terminal = 0
self.sendTerminal = 0
self.clock = Clock()
self.lastClock = Clock()
rospy.wait_for_service('reset_positions')
self.resetStage = rospy.ServiceProxy('reset_positions', EmptySrv)
# Publishers:
self.pub_action_ = rospy.Publisher("dqn/selected_action",Int8,queue_size=1)
self.pub_new_goal_ = rospy.Publisher("dqn/new_goal",EmptyMsg,queue_size=1)
self.pub_rew_ = rospy.Publisher("dqn/lastreward",Float32,queue_size=1)
# Subscribers:
rospy.Subscriber('bridge/laser_image', Image, self.laserCB,queue_size=1)
rospy.Subscriber('bridge/current_dir', Float32, self.directionCB,queue_size=1)
rospy.Subscriber('bridge/impact', EmptyMsg, self.impactCB,queue_size=1)
rospy.Subscriber('bridge/current_pose', Pose, self.positionCB,queue_size=1)
rospy.Subscriber('bridge/goal_pose', Pose, self.targetCB,queue_size=1)
rospy.Subscriber('clock', Clock, self.stepCB,queue_size=1)
def step(self, action, is_training=False):
self.prevPoseX = self.poseX
self.prevPoseY = self.poseY
if action == -1:
# Step with random action
action = int(random.random()*(self.action_size))
msg = Int8()
msg.data = action
self.pub_action_.publish( msg)
if self.display:
cv2.imshow("Screen", self.screen)
#cv2.waitKey(9)
dist = (self.poseX - self.goalX)**2 + (self.poseY - self.goalY)**2
reward = (self.prevDist - dist)/10.0
self.prevDist = dist
if self.terminal == 1:
reward -= 900
#self.new_random_game()
if dist < 0.9:
reward += 300
newStateMSG = EmptyMsg()
self.pub_new_goal_.publish( newStateMSG)
# cv2.waitKey(30)
# Add whatever info you want
info = ""
#rospy.loginfo("Episede ended, reward: %g", reward)
while(self.clock == self.lastClock):
pass
self.lastClock = self.clock
if self.terminal == 2:
self.sendTerminal = 1
if self.terminal == 1:
# rewd = Float32()
# rewd.data = reward
# self.pub_rew_.publish( rewd)
self.terminal = 2
return self.screen, reward, self.sendTerminal, info
#observation, reward, terminal, info = self.env.step(action)
#return self.preprocess(observation), reward, terminal, info
def __init__(self,target_topic):
#styleMap = ['none','astronaut','clown','wizard','dinosaur','animal','doctor','surgeon','teacher']
self.current_msg = None
self.last_msg = None
self.buff_msg = None
self.key = False
self.task = []
self.dm = RobotManager()
self.lastColor = 1
self.currentColor = 1
self.positive_flag = False
self.styleDict = {}
self.styleSet = {}
self.styleMap = ['none','astronaut','clown','wizard','dinosaur','animal','doctor','surgeon','teacher']#newly added 'animal','doctor','surgeon','teacher'
self.colorTask = 1
#temporary solution # 0 hat, 1 eyes, 2 glasses, 3 clothing, 4 shoes, 5 acc
self.styleSet['none'] = ['no hat', 'no face accessory','no costume','regular shoes','no accessory']
self.styleSet['astronaut'] = ["astrounaut helmet","astronaut face accessory","astronaut costume","astronaut shoes","a flag"]
self.styleSet['clown'] = ["clown hat","clown face","clown costume","clown shoes","a horn"]
self.styleSet['wizard'] = ["wizard hat","wizard eye","wizard costume","wizard shoes","a wand"]
self.styleSet['dinosaur'] = ['dinosaur hat', 'dino sunglasses','dinosaur costume','dinosaur shoes','a mini dinosaur']
self.styleSet['animal'] = ['animal hat','animal face','animal costume','animal shoes','a piece of meat']
self.styleSet['doctor'] = ['doctor hat','doctor face accessory','doctor costume','doctor shoes','a document']
self.styleSet['surgeon'] = ['surgeon hat','surgeon face accessory','surgeon costume','surgeon shoes','a surgeon document']
self.styleSet['teacher'] = ['teacher hat','teacher sunglasses','teacher costume','teacher shoes','an apple']
self.game_listener = rospy.Subscriber(target_topic, String, callback = self.callback)
#self.speech_state_listener = rospy.Subscriber("speech_state", String, callback = self.callback2)
self.mouth_pub = rospy.Publisher('mouth_color', Int8, queue_size=10)
self.breath_count = 0
#formal solution
#styleSet['none'] = ['none','none','none','none','none']
###########################################################
#style dictionary
#formal solution
for elem in self.styleMap:
for clothes in self.styleSet[elem]:
self.styleDict[clothes] = elem
self.styleDict['no face accessory'] = 'none'
# Xray Game
# mapX = ["MAKI's eye","MAKI's ear","MAKI's nose"]
# for part in mapX:
# if part not in dialog:
# dialog[part] = "This part is " + part
#print dialog.keys()