def pull_and_publish_event():
    """Publish at most one queued item for each OAuth token that may post now.

    Fetches up to 100 tokens that are flagged valid and whose
    ``next_post_time`` is either unset or earlier than the current time.
    For each token, one task tagged with the token's ``queue_id()`` is
    leased from the high-priority pull queue, falling back to the regular
    pull queue when the high-priority one returns nothing.

    The task payload is handed to ``_post_data_with_authtoken`` and the task
    is then deleted regardless of the outcome. The token's
    ``next_post_time`` is pushed forward by ``time_between_posts`` seconds
    only when the post was reported as successful.
    """
    # These comparisons build query filters rather than booleans, so they
    # must stay as `==` (rewriting them with `is` would break the query).
    is_valid = db.OAuthToken.valid_token == True  # noqa: E712
    is_due = ndb.OR(
        db.OAuthToken.next_post_time < datetime.datetime.now(),
        db.OAuthToken.next_post_time == None,  # noqa: E711
    )
    postable_tokens = db.OAuthToken.query(is_valid, is_due).fetch(100)

    priority_queue = taskqueue.Queue(EVENT_PULL_QUEUE_HIGH)
    regular_queue = taskqueue.Queue(EVENT_PULL_QUEUE)

    for token in postable_tokens:
        logging.info("Can post to OAuthToken %s: %s", token.queue_id(), token)

        # Try the high-priority queue first; only fall back to the regular
        # queue when nothing could be leased for this token's tag.
        # Arguments are (120, 1, tag): a 120 second lease on at most one task.
        source_queue = priority_queue
        leased = source_queue.lease_tasks_by_tag(120, 1, token.queue_id())
        if not leased:
            source_queue = regular_queue
            leased = source_queue.lease_tasks_by_tag(120, 1, token.queue_id())
        logging.info("Fetching %d tasks with queue id %s", len(leased), token.queue_id())
        if not leased:
            continue

        # Should only be one task
        if len(leased) != 1:
            logging.error('Found too many tasks in our lease_tasks_by_tag: %s', len(leased))
        task = leased[0]

        # Backwards compatibility for items in the queue: a payload without
        # a '{' is treated as a bare event id instead of a JSON document.
        if '{' in task.payload:
            data = json.loads(task.payload)
        else:
            data = {'type': 'event', 'event_id': task.payload}
        logging.info('Processing data payload: %r', data)

        posted = _post_data_with_authtoken(data, token)
        # The task is removed whether or not the post succeeded.
        source_queue.delete_tasks(task)

        # Only mark it up for delay, if we actually posted...
        if not posted:
            continue
        next_post_time = datetime.datetime.now() + datetime.timedelta(seconds=token.time_between_posts)
        # Re-read the entity before writing -- presumably so changes made to
        # it while posting are not overwritten by the stale copy (TODO confirm).
        fresh_token = token.key.get()
        fresh_token.next_post_time = next_post_time
        fresh_token.put()
评论列表
文章目录