def get_blocks(self, block_nums):
requests = (
{
'jsonrpc': '2.0', 'id': block_num, 'method': 'get_block',
'params': [block_num]
} for block_num in block_nums)
batched_requests = chunkify(requests, self.batch_request_size)
coros = (self.fetch(batch) for batch in batched_requests)
first_coros = islice(coros, 0, self.concurrent_tasks_limit)
futures = [asyncio.ensure_future(c) for c in first_coros]
logger.debug(f'inital futures:{len(futures)}')
start = time.perf_counter()
while futures:
await asyncio.sleep(0)
for f in futures:
try:
if f.done():
self._perf_history.append(time.perf_counter() - start)
result = f.result()
futures.remove(f)
logger.debug(f'futures:{len(futures)}')
try:
futures.append(asyncio.ensure_future(next(coros)))
except StopIteration as e:
logger.debug('StopIteration')
except concurrent.futures._base.CancelledError:
return
start = time.perf_counter()
yield result
except KeyboardInterrupt:
logger.debug('client.get blocks kbi')
for f in futures:
f.cancel()
self.close()
return
except Exception as e:
logger.exception(f'client.get_blocks error:{e}')
continue
评论列表
文章目录