def is_oneiric():
if path.exists('/etc/issue'):
rospy.logdebug('/etc/issue exists')
with open('/etc/issue') as f:
contents = f.readline()
rospy.logdebug('contents are {0}'.format(contents))
return '11.10' in contents
else:
rospy.logdebug('/etc/issue does not exist')
python类logdebug()的实例源码
def is_lucid_or_maverick():
if path.exists('/etc/issue'):
rospy.logdebug('/etc/issue exists')
with open('/etc/issue') as f:
contents = f.readline()
rospy.logdebug('contents are {0}'.format(contents))
return '10.04' in contents or '10.10' in contents
else:
rospy.logdebug('/etc/issue does not exist')
def update_filter(self, timestep, estimate, ranges):
"""Update position filter.
Args:
timestep (float): Time elapsed since last update.
estimate (StateEstimate): Position estimate to update.
ranges (list of floats): Range measurements.
Returns:
new_estimate (StateEstimate): Updated position estimate.
outlier_flag (bool): Flag indicating whether the measurement was rejected as an outlier.
"""
num_of_units = len(ranges)
x = estimate.state
P = estimate.covariance
# Compute process matrix and covariance matrices
F, Q, R = self._compute_process_and_covariance_matrices(timestep)
# rospy.logdebug('F: {}'.format(F))
# rospy.logdebug('Q: {}'.format(Q))
# rospy.logdebug('R: {}'.format(R))
# Prediction
x = np.dot(F, x)
P = np.dot(F, np.dot(P, F.T)) + Q
# Update
n = np.copy(x)
H = np.zeros((num_of_units, x.size))
z = np.zeros((num_of_units, 1))
h = np.zeros((num_of_units, 1))
for i in xrange(self.ikf_iterations):
n, K, outlier_flag = self._ikf_iteration(x, n, ranges, h, H, z, estimate, R)
if outlier_flag:
new_estimate = estimate
else:
new_state = n
new_covariance = np.dot((np.eye(6) - np.dot(K, H)), P)
new_estimate = UWBTracker.StateEstimate(new_state, new_covariance)
return new_estimate, outlier_flag
def handle_process(self, proc, err):
"""
Takes a running subprocess.Popen object `proc`, rosdebugs everything it
prints to stdout, roswarns everything it prints to stderr, and raises
`err` if it fails
"""
poll = select.poll()
poll.register(proc.stdout)
poll.register(proc.stderr)
while proc.poll() is None and not rospy.is_shutdown():
res = poll.poll(1)
for fd, evt in res:
if not (evt & select.POLLIN):
continue
if fd == proc.stdout.fileno():
line = proc.stdout.readline().strip()
if line:
rospy.logdebug(line)
elif fd == proc.stderr.fileno():
line = proc.stderr.readline().strip()
if line:
rospy.logwarn(line)
if proc.poll():
proc.terminate()
proc.wait()
raise RuntimeError("Process interrupted by ROS shutdown")
if proc.returncode:
raise err
def setUp(self):
self.namespace = rospy.get_namespace()
rospy.logdebug("Initializing test_publish_to_topics in namespace:" +
self.namespace)
self.variables = [("air_flush_on", 1),
("air_temperature", 23),
("light_intensity_blue", 1),
("light_intensity_red", 1),
("light_intensity_white", 1),
("nutrient_flora_duo_a", 5),
("nutrient_flora_duo_b", 5),
("water_potential_hydrogen", 6)
]
# self.topic_ending = ["raw", "measured", "commanded", "desired"]
rospy.init_node(NAME, log_level=rospy.DEBUG)
def callback(self, data):
print(data)
rospy.logdebug("Received message: {}".format(data))
def connect(self):
if self.pseudo:
rospy.loginfo('Connected to pseudo sensor')
return
try:
self.serial = serial.Serial(self.serial_port, 19200, timeout=1)
rospy.logdebug("self.serial.isOpen() = {}".format(self.serial.isOpen()))
if not self.sensor_is_connected:
self.sensor_is_connected = True
rospy.loginfo('Connected to sensor')
except:
if self.sensor_is_connected:
self.sensor_is_connected = False
rospy.logwarn('Unable to connect to sensor')
def trace(msg, *args):
if TRACE:
msg = '\nTRACE> ' + msg
rospy.logdebug(msg, *args)
def handleJointAngles(self, msg):
rospy.logdebug("Received a joint angle target")
try:
# Note: changeAngles() and setAngles() are non-blocking functions.
if (msg.relative==0):
self.motionProxy.setAngles(list(msg.joint_names), list(msg.joint_angles), msg.speed)
else:
self.motionProxy.changeAngles(list(msg.joint_names), list(msg.joint_angles), msg.speed)
except RuntimeError,e:
rospy.logerr("Exception caught:\n%s", e)
def handleJointStiffness(self, msg):
rospy.logdebug("Received a joint angle stiffness")
try:
self.motionProxy.setStiffnesses(list(msg.name), list(msg.effort))
except RuntimeError,e:
rospy.logerr("Exception caught:\n%s", e)
def executeJointTrajectoryAction(self, goal):
rospy.loginfo("JointTrajectory action executing");
names, angles, times = self.jointTrajectoryGoalMsgToAL(goal)
rospy.logdebug("Received trajectory for joints: %s times: %s", str(names), str(times))
rospy.logdebug("Trajectory angles: %s", str(angles))
task_id = None
running = True
#Wait for task to complete
while running and not self.jointTrajectoryServer.is_preempt_requested() and not rospy.is_shutdown():
#If we haven't started the task already...
if task_id is None:
# ...Start it in another thread (thanks to motionProxy.post)
task_id = self.motionProxy.post.angleInterpolation(names, angles, times, (goal.relative==0))
#Wait for a bit to complete, otherwise check we can keep running
running = self.motionProxy.wait(task_id, self.poll_rate)
# If still running at this point, stop the task
if running and task_id:
self.motionProxy.stop( task_id )
jointTrajectoryResult = JointTrajectoryResult()
jointTrajectoryResult.goal_position.header.stamp = rospy.Time.now()
jointTrajectoryResult.goal_position.position = self.motionProxy.getAngles(names, True)
jointTrajectoryResult.goal_position.name = names
if not self.checkJointsLen(jointTrajectoryResult.goal_position):
self.jointTrajectoryServer.set_aborted(jointTrajectoryResult)
rospy.logerr("JointTrajectory action error in result: sizes mismatch")
elif running:
self.jointTrajectoryServer.set_preempted(jointTrajectoryResult)
rospy.logdebug("JointTrajectory preempted")
else:
self.jointTrajectoryServer.set_succeeded(jointTrajectoryResult)
rospy.loginfo("JointTrajectory action done")
def executeJointStiffnessAction(self, goal):
rospy.loginfo("JointStiffness action executing");
names, angles, times = self.jointTrajectoryGoalMsgToAL(goal)
rospy.logdebug("Received stiffness trajectory for joints: %s times: %s", str(names), str(times))
rospy.logdebug("stiffness values: %s", str(angles))
task_id = None
running = True
#Wait for task to complete
while running and not self.jointStiffnessServer.is_preempt_requested() and not rospy.is_shutdown():
#If we haven't started the task already...
if task_id is None:
# ...Start it in another thread (thanks to motionProxy.post)
task_id = self.motionProxy.post.stiffnessInterpolation(names, angles, times)
#Wait for a bit to complete, otherwise check we can keep running
running = self.motionProxy.wait(task_id, self.poll_rate)
# If still running at this point, stop the task
if running and task_id:
self.motionProxy.stop( task_id )
jointStiffnessResult = JointTrajectoryResult()
jointStiffnessResult.goal_position.header.stamp = rospy.Time.now()
jointStiffnessResult.goal_position.position = self.motionProxy.getStiffnesses(names)
jointStiffnessResult.goal_position.name = names
if not self.checkJointsLen(jointStiffnessResult.goal_position):
self.jointStiffnessServer.set_aborted(jointStiffnessResult)
rospy.logerr("JointStiffness action error in result: sizes mismatch")
elif running:
self.jointStiffnessServer.set_preempted(jointStiffnessResult)
rospy.logdebug("JointStiffness preempted")
else:
self.jointStiffnessServer.set_succeeded(jointStiffnessResult)
rospy.loginfo("JointStiffness action done")
def onTactileChanged(self, strVarName, value, strMessage):
"Called when tactile touch status changes in ALMemory"
if strVarName == "FrontTactilTouched":
self.tactile.button = self.tactileTouchFrontButton
elif strVarName == "MiddleTactilTouched":
self.tactile.button = self.tactileTouchMiddleButton
elif strVarName == "RearTactilTouched":
self.tactile.button = self.tactileTouchRearButton
self.tactile.state = int(value);
self.tactilePub.publish(self.tactile)
rospy.logdebug("tactile touched: name=%s, value=%d, message=%s.", strVarName, value, strMessage);
def onBumperChanged(self, strVarName, value, strMessage):
"Called when bumper status changes in ALMemory"
if strVarName == "RightBumperPressed":
self.bumper.bumper = self.bumperRightButton
elif strVarName == "LeftBumperPressed":
self.bumper.bumper = self.bumperLeftButton
self.bumper.state = int(value);
self.bumperPub.publish(self.bumper)
rospy.logdebug("bumper pressed: name=%s, value=%d, message=%s.", strVarName, value, strMessage);
def _image_cb(self, img):
self._image_counter += 1
if self._image_counter > self._THROTTLE:
rospy.logdebug('publish image')
self._image_pub.publish(img)
self._image_counter = 0
def _depth_cb(self, img):
self._depth_counter += 1
if self._depth_counter > self._THROTTLE:
rospy.logdebug('publish depth')
self._depth_pub.publish(img)
self._depth_counter = 0
def _depth_info_cb(self, info):
rospy.logdebug('publish depth info')
self._depth_info_pub.publish(info)
def __init__(self):
if not hasattr(self, 'writer'):
rospy.logdebug("initializing summary writer.")
now = datetime.datetime.now()
self.directory = self.get_output_folder('summaries') + now.strftime("/%Y-%m-%d-%s")
self.writer = tf.train.SummaryWriter(self.directory)
def __init__(self, batch_size=1, output_size=[28, 28], input=""):
self.name = 'inputlayer-%08x' % random.getrandbits(32)
self.output_size = output_size
self.input = input
self.batch_size = batch_size
self.batch = []
with tf.name_scope(self.name) as n_scope:
self.name_scope = n_scope
self.input_placeholder = tf.placeholder(dtype=tf.float32, shape=(self.batch_size, output_size[0], output_size[1], 1), name='input')
rospy.logdebug("?? Input Layer initalized")
def _robot_reboot(self):
"""
Perform a soft-reset of the Create
"""
msg ="Soft-rebooting turtlebot to passive mode."
rospy.logdebug(msg)
self._diagnostics.node_status(msg,"warn")
self._set_digital_outputs([0, 0, 0])
self.robot.soft_reset()
time.sleep(2.0)