pool.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:sardana 作者: sardana-org 项目源码 文件源码
def iterMove(self, new_pos, timeout=None):
        if operator.isSequenceType(new_pos):
            new_pos = new_pos[0]
        state, pos = self.getAttribute("state"), self.getAttribute("position")

        evt_wait = self._getEventWait()
        evt_wait.connect(state)
        evt_wait.lock()
        try:
            # evt_wait.waitEvent(DevState.MOVING, equal=False)
            time_stamp = time.time()
            try:
                self.getPositionObj().write(new_pos)
            except DevFailed as err_traceback:
                for err in err_traceback:
                    if err.reason == 'API_AttrNotAllowed':
                        raise RuntimeError('%s is already moving' % self)
                    else:
                        raise
            self.final_pos = new_pos
            # putting timeout=0.1 and retries=1 is a patch for the case when
            # the initial moving event doesn't arrive do to an unknown
            # tango/pytango error at the time
            evt_wait.waitEvent(DevState.MOVING, time_stamp,
                               timeout=0.1, retries=1)
        finally:
            evt_wait.unlock()
            evt_wait.disconnect()

        evt_iter_wait = AttributeEventIterator(state, pos)
        evt_iter_wait.lock()
        try:
            for evt_data in evt_iter_wait.events():
                src, value = evt_data
                if src == state and value != DevState.MOVING:
                    raise StopIteration
                yield value
        finally:
            evt_iter_wait.unlock()
            evt_iter_wait.disconnect()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号