def _start(self, *args, **kwargs):
new_pos = args[0]
if operator.isSequenceType(new_pos):
new_pos = new_pos[0]
try:
self.write_attribute('position', new_pos)
except DevFailed as df:
for err in df:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
python类isSequenceType()的实例源码
def iterMove(self, new_pos, timeout=None):
if operator.isSequenceType(new_pos):
new_pos = new_pos[0]
state, pos = self.getAttribute("state"), self.getAttribute("position")
evt_wait = self._getEventWait()
evt_wait.connect(state)
evt_wait.lock()
try:
# evt_wait.waitEvent(DevState.MOVING, equal=False)
time_stamp = time.time()
try:
self.getPositionObj().write(new_pos)
except DevFailed as err_traceback:
for err in err_traceback:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
# putting timeout=0.1 and retries=1 is a patch for the case when
# the initial moving event doesn't arrive do to an unknown
# tango/pytango error at the time
evt_wait.waitEvent(DevState.MOVING, time_stamp,
timeout=0.1, retries=1)
finally:
evt_wait.unlock()
evt_wait.disconnect()
evt_iter_wait = AttributeEventIterator(state, pos)
evt_iter_wait.lock()
try:
for evt_data in evt_iter_wait.events():
src, value = evt_data
if src == state and value != DevState.MOVING:
raise StopIteration
yield value
finally:
evt_iter_wait.unlock()
evt_iter_wait.disconnect()
def _start(self, *args, **kwargs):
new_pos = args[0]
if operator.isSequenceType(new_pos):
new_pos = new_pos[0]
try:
self.write_attribute('position', new_pos)
except DevFailed, df:
for err in df:
if err.reason == 'API_AttrNotAllowed':
raise RuntimeError('%s is already moving' % self)
else:
raise
self.final_pos = new_pos
def appendBuffer(self, d):
if operator.isSequenceType(d):
if isinstance(d, (str, unicode)):
self._buff.append(d)
else:
self._buff.extend(d)
else:
self._buff.append(str(d))
def to_dtype_dformat(data):
"""Transforms the given data parameter (string/ or sequence of string or
sequence of sequence of string/:obj:`DataType`) into a tuple of two
elements (:obj:`DataType`, :obj:`DataFormat`).
:param data: the data information to be transformed
:type data: str or seq<str> or seq<seq<str>>
:return: a tuple <:obj:`DataType`, :obj:`DataFormat`> for the given data
:rtype: tuple<:obj:`DataType`, :obj:`DataFormat`>
"""
import operator
dtype, dformat = data, DataFormat.Scalar
if isinstance(data, (str, unicode)):
dtype, dformat = from_dtype_str(data)
elif operator.isSequenceType(data):
dformat = DataFormat.OneD
dtype = data[0]
if isinstance(dtype, str):
dtype, dformat2 = from_dtype_str(dtype)
if dformat2 == DataFormat.OneD:
dformat = DataFormat.TwoD
elif operator.isSequenceType(dtype):
dformat = DataFormat.TwoD
dtype = dtype[0]
if isinstance(dtype, str):
dtype, _ = from_dtype_str(dtype)
dtype = DTYPE_MAP.get(dtype, DataType.Invalid)
return dtype, dformat
def _outputBlock(self, line):
"""**Unofficial Macro API**.
Sends a line tagged as a block to the output
:param str line: line to be sent"""
if isinstance(line, (str, unicode)):
o = line
elif operator.isSequenceType(line):
o = "\n".join(line)
else:
o = str(line)
self._output("%s\n%s\n%s" % (Macro.BlockStart, o, Macro.BlockFinish))
def _reserveObjs(self, args):
"""**Internal method**. Used to reserve a set of objects for this
macro"""
for obj in args:
# isiterable
if not type(obj) in map(type, ([], ())):
# if not operator.isSequenceType(obj) or type(obj) in
# types.StringTypes:
obj = (obj,)
for sub_obj in obj:
if isinstance(sub_obj, PoolElement):
self.addObj(sub_obj)
def __prepareResult(self, out):
"""**Internal method**. Decodes the given output in order to be able to
send to the result channel
:param out: output value
:return: the output as a sequence of strings
"""
if out is None:
out = ()
if operator.isSequenceType(out) and not type(out) in types.StringTypes:
out = map(str, out)
else:
out = (str(out),)
return out
def point(self, lut, mode=None):
"Map image through lookup table"
self.load()
if not isSequenceType(lut):
# if it isn't a list, it should be a function
if self.mode in ("I", "I;16", "F"):
# check if the function can be used with point_transform
scale, offset = _getscaleoffset(lut)
return self._new(self.im.point_transform(scale, offset))
# for other modes, convert the function to a table
lut = map(lut, range(256)) * self.im.bands
if self.mode == "F":
# FIXME: _imaging returns a confusing error message for this case
raise ValueError("point operation not supported for this mode")
return self._new(self.im.point(lut, mode))
##
# Adds or replaces the alpha layer in this image. If the image
# does not have an alpha layer, it's converted to "LA" or "RGBA".
# The new layer must be either "L" or "1".
#
# @param im The new alpha layer. This can either be an "L" or "1"
# image having the same size as this image, or an integer or
# other color value.