def replication_worker(node: str):
    """Replicate queued operations to *node* until cancelled.

    Pulls items off the node's replication queue and pushes each one to
    the remote node, deleting the queue entry only after it has been
    applied successfully.  On network failure the retry delay doubles
    (from MIN_INTERVAL up to MAX_INTERVAL) and is reset to MIN_INTERVAL
    after the next success.

    NOTE: this is an old-style (generator-based) asyncio coroutine —
    drive it through the event loop, do not call it directly.
    """
    if not running:
        return
    node_queue = get_queue_for_node(node)
    logger.debug('[-> {node}] Starting...'.format(node=node))
    INTERVALS[node] = MIN_INTERVAL
    # Route requests through the configured HTTP proxy when one is set.
    connector = aiohttp.ProxyConnector(proxy=config.args.proxy) if config.args.proxy else None
    with aiohttp.ClientSession(connector=connector) as session:
        try:
            while True:
                item = yield from queue_getter(node_queue)
                if not item:
                    continue
                item_id, operation = item
                try:
                    yield from perform_operation(session, node, operation)
                    # Success: back-off window shrinks back to the minimum.
                    INTERVALS[node] = MIN_INTERVAL
                    logger.debug('[-> {node}] Operation replicated successfully'.format(node=node))
                except (IOError, aiohttp.errors.ClientError):
                    logger.exception('[-> {node}] Error during replication'.format(node=node))
                    yield from asyncio.sleep(INTERVALS[node])
                    # Exponential back-off, capped at MAX_INTERVAL.
                    INTERVALS[node] = min(INTERVALS[node] * 2, MAX_INTERVAL)
                else:
                    # Drop the item only once it is known to be applied;
                    # a failed item stays queued and is retried.
                    node_queue.delete(item_id)
        except asyncio.CancelledError:
            logger.debug('[-> {node}] Cancelled.'.format(node=node))
    logger.debug('[-> {node}] Goodbye.'.format(node=node))