def forwarder(tasks, interval, batch_size, source, dest):
'''Forward items from one storage to another.'''
from .utils import RunFlag, load_manager, redis_client
from .store import QueueStore
log = logging.getLogger('dsq.forwarder')
if not tasks and not source:
print('--tasks or --source must be provided')
sys.exit(1)
s = QueueStore(redis_client(source)) if source else load_manager(tasks).queue
d = QueueStore(redis_client(dest))
run = RunFlag()
while run:
batch = s.take_many(batch_size)
if batch['schedule'] or batch['queues']:
try:
d.put_many(batch)
except Exception:
s.put_many(batch)
log.exception('Forward error')
raise
else:
time.sleep(interval)
评论列表
文章目录