pubsub.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:dancedeets-monorepo 作者: mikelambert 项目源码 文件源码
def pull_and_publish_event():
    oauth_tokens = db.OAuthToken.query(
        db.OAuthToken.valid_token == True,
        ndb.OR(db.OAuthToken.next_post_time < datetime.datetime.now(), db.OAuthToken.next_post_time == None)
    ).fetch(100)
    q1 = taskqueue.Queue(EVENT_PULL_QUEUE_HIGH)
    q2 = taskqueue.Queue(EVENT_PULL_QUEUE)
    for token in oauth_tokens:
        logging.info("Can post to OAuthToken %s: %s", token.queue_id(), token)
        tasks = q1.lease_tasks_by_tag(120, 1, token.queue_id())
        q = q1
        if not tasks:
            tasks = q2.lease_tasks_by_tag(120, 1, token.queue_id())
            q = q2
        logging.info("Fetching %d tasks with queue id %s", len(tasks), token.queue_id())
        if tasks:
            # Should only be one task
            if len(tasks) != 1:
                logging.error('Found too many tasks in our lease_tasks_by_tag: %s', len(tasks))
            task = tasks[0]
            # Backwards compatibility for items in the queue
            if '{' not in task.payload:
                data = {
                    'type': 'event',
                    'event_id': task.payload,
                }
            else:
                data = json.loads(task.payload)
            logging.info('Processing data payload: %r', data)
            posted = _post_data_with_authtoken(data, token)
            q.delete_tasks(task)

            # Only mark it up for delay, if we actually posted...
            if posted:
                next_post_time = datetime.datetime.now() + datetime.timedelta(seconds=token.time_between_posts)
                token = token.key.get()
                token.next_post_time = next_post_time
                token.put()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号