decision_task_poller.py source code

python

Project: botoflow  Author: boto
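import sys
import time

import six


# Method excerpted from the poller class in decision_task_poller.py
# (the enclosing class body is not shown here).
def single_poll(self, next_page_token=None):
    """Issue one PollForDecisionTask request and return the raw response."""
    poll_time = time.time()
    try:
        kwargs = {'domain': self.domain,
                  'taskList': {'name': self.task_list},
                  'identity': self.identity}
        # Only pass a page token when continuing a paginated event history
        if next_page_token is not None:
            kwargs['nextPageToken'] = next_page_token
        # old botocore throws TypeError when unable to establish SWF connection;
        # it is not caught here and propagates to the caller
        return self.worker.client.poll_for_decision_task(**kwargs)

    except KeyboardInterrupt:
        # sleep before actually exiting as the connection is not yet closed
        # on the other end (wait out the rest of the 60-second long poll)
        # Clamp at zero: time.sleep() raises ValueError on a negative value
        sleep_time = max(0.0, 60 - (time.time() - poll_time))
        six.print_("Exiting in {0:.1f}s...".format(sleep_time), file=sys.stderr)
        time.sleep(sleep_time)
        raise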