def test_checkpoint_access(ctx, checkpoints_path):
    """Check that checkpoint data under ``checkpoints_path`` can be read.

    Resolves the checkpoint set via
    ``sbds.checkpoints.required_checkpoints_for_range`` (called with
    ``start=1, end=0``), opens its ``checkpoint_paths`` through
    ``fileinput`` using ``checkpoint_opener_wrapper`` as the open hook,
    and parses the first line as JSON to read its ``block_num``.

    Outcome (all messages go to stderr):

    * truthy ``block_num`` -> "Success: loaded block N", exit code 0
    * falsy ``block_num``  -> "Failed to load block", exit code 127
    * any exception        -> "Fail: <error>", exit code 127

    Args:
        ctx: Click context; only ``ctx.exit`` is used.
        checkpoints_path: Location passed through as ``path=`` to
            ``required_checkpoints_for_range``.
    """
    # The exit code is recorded here and ``ctx.exit`` is called only after
    # the try/except.  In Click >= 7 ``ctx.exit`` raises
    # ``click.exceptions.Exit`` (a RuntimeError subclass); calling it inside
    # the ``try`` let the broad handler below swallow a successful exit and
    # report it as "Fail: ..." with code 127.
    exit_code = None
    try:
        checkpoint_set = sbds.checkpoints.required_checkpoints_for_range(
            path=checkpoints_path, start=1, end=0)
        with fileinput.FileInput(
                mode='r',
                files=checkpoint_set.checkpoint_paths,
                openhook=checkpoint_opener_wrapper(encoding='utf8')) as blocks:
            for block in blocks:
                block_num = json.loads(block)['block_num']
                if block_num:
                    click.echo(
                        'Success: loaded block %s' % block_num, err=True)
                    exit_code = 0
                else:
                    click.echo('Failed to load block', err=True)
                    exit_code = 127
                # Only the first block is inspected: one readable record is
                # enough to decide the outcome.
                break
    except Exception as e:
        # Top-level CLI boundary: report any access/parsing error and fail.
        click.echo('Fail: %s' % e, err=True)
        exit_code = 127

    # NOTE(review): when no block is yielded at all, nothing is printed and
    # no explicit exit happens (unchanged from the previous behavior) --
    # confirm whether an empty checkpoint set should count as a failure.
    if exit_code is not None:
        ctx.exit(code=exit_code)
评论列表
文章目录