def test_upload_new(empty_registry: FlowRegistry):
with requests_mock.Mocker() as mock:
mock.get('http://dpp/api/refresh', status_code=200)
token = generate_token('me')
ret = upload(token, spec, empty_registry, public_key)
assert ret['success']
assert ret['dataset_id'] == 'me/id'
assert ret['flow_id'] == 'me/id/1'
assert ret['errors'] == []
specs = list(empty_registry.list_datasets())
assert len(specs) == 1
first = specs[0]
assert first.owner == 'me'
assert first.identifier == 'me/id'
assert first.spec == spec
revision = empty_registry.get_revision('me/id')
assert revision['revision'] == 1
assert revision['status'] == 'pending'
pipelines = list(empty_registry.list_pipelines_by_id('me/id/1'))
assert len(pipelines) == 7
pipeline = pipelines[0]
assert pipeline.status == 'pending'
pipelines = list(empty_registry.list_pipelines())
assert len(pipelines) == 7
评论列表
文章目录