def test_upload_new(empty_registry: FlowRegistry):
    """Upload a spec into an empty registry and verify what gets stored.

    The refresh endpoint is mocked to answer 200.  The upload is expected to
    succeed for dataset ``me/id`` / flow ``me/id/1``, leaving exactly one
    dataset owned by ``me``, a revision 1 in ``pending`` state and seven
    pipelines, the first of which is ``pending``.
    """
    dataset_id = 'me/id'
    flow_id = 'me/id/1'
    with requests_mock.Mocker() as mocker:
        mocker.get('http://dpp/api/refresh', status_code=200)
        result = upload(generate_token('me'), spec, empty_registry, public_key)

        # Shape of the upload response.
        assert result['success']
        assert result['dataset_id'] == dataset_id
        assert result['flow_id'] == flow_id
        assert result['errors'] == []

        # Exactly one dataset must have been registered.
        datasets = list(empty_registry.list_datasets())
        assert len(datasets) == 1
        dataset = datasets[0]
        assert dataset.owner == 'me'
        assert dataset.identifier == dataset_id
        assert dataset.spec == spec

        # The revision starts at 1 and is still pending.
        latest = empty_registry.get_revision(dataset_id)
        assert latest['revision'] == 1
        assert latest['status'] == 'pending'

        # Pipelines looked up by flow id ...
        flow_pipelines = list(empty_registry.list_pipelines_by_id(flow_id))
        assert len(flow_pipelines) == 7
        assert flow_pipelines[0].status == 'pending'

        # ... and the unfiltered pipeline listing.
        all_pipelines = list(empty_registry.list_pipelines())
        assert len(all_pipelines) == 7
# Python: example source code using the Mocker() class
def test_get_all_drives(self):
    """get_all_drives() should return one DriveObject per drive id served.

    The mocked ``/drives`` endpoint answers with two copies of
    ``drive.json`` whose ids are rewritten to ``'0'`` and ``'1'``.
    """
    def serve_drives(request, context):
        drive_payloads = [get_data('drive.json') for _ in range(2)]
        for index, payload in enumerate(drive_payloads):
            payload['id'] = str(index)
        context.status_code = codes.ok
        return {'value': drive_payloads}

    with requests_mock.Mocker() as mocker:
        mocker.get(self.account.client.API_URI + '/drives', json=serve_drives)
        all_drives = self.root.get_all_drives()
        # Tick off every expected id; none may be left over at the end.
        pending_ids = [str(n) for n in range(2)]
        for drive_id, drive in all_drives.items():
            self.assertIn(drive_id, pending_ids)
            self.assertIsInstance(drive, drives.DriveObject)
            pending_ids.remove(drive_id)
        self.assertEqual(0, len(pending_ids))
def use_item_collection(self, method_name, url, params):
    """Drive a paged item collection and check the item names come back in order.

    :param method_name: Name of the ``self.drive`` method that returns the collection.
    :param url: URL expected for the first page request.
    :param params: Keyword arguments passed to the drive method.

    Each mocked response carries exactly one item from
    ``item_collection.json`` and, while items remain, a next link.
    """
    queued_items = get_data('item_collection.json')['value']
    expected_names = [entry['name'] for entry in queued_items]
    next_link = 'https://get_children'

    def serve_page(request, context):
        page = {'value': [queued_items.pop(0)]}
        if queued_items:
            # More items left, so advertise a follow-up page.
            page['@odata.nextLink'] = next_link
        context.status_code = codes.ok
        return page

    with requests_mock.Mocker() as mocker:
        for page_url in (url, next_link):
            mocker.get(page_url, json=serve_page)
        fetch_collection = getattr(self.drive, method_name)
        collection = fetch_collection(**params)
        seen_names = []
        while collection.has_next:
            seen_names.extend(entry.name for entry in collection.get_next())
        self.assertListEqual(expected_names, seen_names)
def assert_create_dir(self, should_request_url, parent_id=None):
    """Check that create_dir() POSTs the expected body to the expected URL.

    API reference:
    https://github.com/OneDrive/onedrive-api-docs/blob/master/items/create.md

    :param should_request_url: URL the POST request is expected to hit.
    :param parent_id: Parent item id forwarded to ``create_dir``.
    """
    folder_name = 'foo'
    conflict_behavior = options.NameConflictBehavior.REPLACE

    def verify_and_reply(request, context):
        payload = request.json()
        self.assertEqual(folder_name, payload['name'])
        self.assertDictEqual({}, payload['folder'])
        self.assertEqual(conflict_behavior, payload['@name.conflictBehavior'])
        context.status_code = codes.created
        return get_data('new_dir_item.json')

    with requests_mock.Mocker() as mocker:
        mocker.post(should_request_url, json=verify_and_reply)
        created = self.drive.create_dir(
            name=folder_name,
            parent_id=parent_id,
            conflict_behavior=conflict_behavior,
        )
        self.assertIsInstance(created, items.OneDriveItem)
def test_update_item(self):
    """update_item() should PATCH the item URI with all the new properties."""
    new_params = {
        'item_id': '123',
        'new_name': 'whatever.doc',
        'new_description': 'This is a dummy description.',
        'new_parent_reference': resources.ItemReference.build(drive_id='aaa', id='012'),
        'new_file_system_info': facets.FileSystemInfoFacet(
            created_time=str_to_datetime('1971-01-01T02:03:04Z'),
            modified_time=str_to_datetime('2008-01-02T03:04:05.06Z')),
    }

    def verify_and_reply(request, context):
        # Every updated property must show up in the request body.
        body = request.json()
        self.assertEqual(body['name'], new_params['new_name'])
        self.assertEqual(body['description'], new_params['new_description'])
        self.assertDictEqual(body['parentReference'], new_params['new_parent_reference'].data)
        self.assertDictEqual(body['fileSystemInfo'], new_params['new_file_system_info'].data)
        return get_data('image_item.json')

    with requests_mock.Mocker() as mocker:
        item_uri = self.drive.get_item_uri(new_params['item_id'], None)
        mocker.patch(item_uri, json=verify_and_reply)
        self.drive.update_item(**new_params)
def test_download_large_file(self):
    """A 5-byte file with a 2-byte GET limit should be fetched in three ranges.

    The mock checks each ``Range`` header against the expected sequence and
    answers with the matching slice of the payload.
    """
    self.drive.config = drive_config.DriveConfig({'max_get_size_bytes': 2})
    in_data = b'12345'
    output = io.BytesIO()
    expected_ranges = ['0-1', '2-3', '4-4']

    def serve_range(request, context):
        self.assertIn('Range', request.headers)
        wanted = expected_ranges.pop(0)
        range_header = request.headers['Range']
        self.assertEqual('bytes=' + wanted, range_header)
        first, last = range_header.split('=', 1)[1].split('-')
        context.status_code = codes.partial
        # HTTP byte ranges are inclusive on both ends.
        return in_data[int(first): int(last) + 1]

    with requests_mock.Mocker() as mocker:
        content_url = self.drive.get_item_uri(item_id='123', item_path=None) + '/content'
        mocker.get(content_url, content=serve_range)
        self.drive.download_file(file=output, size=len(in_data), item_id='123')
    self.assertEqual(in_data, output.getvalue())
def test_upload_small_file(self):
    """A 5-byte upload under the 10-byte PUT limit should go out as one PUT.

    The mock checks the file name in the path, the body length and the
    conflict-behavior query string, then replies with a minimal item.
    """
    self.drive.config = drive_config.DriveConfig({'max_put_size_bytes': 10})
    in_fd = io.BytesIO(b'12345')

    def verify_and_reply(request, context):
        path_url = request.path_url
        # The second-to-last path segment is "<name>:"; drop the colon.
        name = path_url.split('/')[-2][:-1]
        self.assertEqual('test', name)
        self.assertEqual(5, len(request.body.getvalue()))
        query_string = path_url.split('?', 1)[1]
        self.assertEqual(
            '@name.conflictBehavior=' + options.NameConflictBehavior.FAIL,
            query_string,
        )
        context.status_code = codes.created
        return {'id': 'abc', 'name': name, 'size': 5, 'file': {}}

    with requests_mock.Mocker() as mocker:
        upload_url = self.drive.get_item_uri('123', None) + ':/test:/content'
        mocker.put(upload_url, json=verify_and_reply)
        uploaded = self.drive.upload_file(
            'test', data=in_fd, size=5, parent_id='123',
            conflict_behavior=options.NameConflictBehavior.FAIL)
        self.assertIsInstance(uploaded, items.OneDriveItem)
def test_copy_item(self):
    """copy_item() should POST an async copy request and return a session.

    The mock checks the required headers and the body, then answers 202
    with a ``Location`` header pointing at a monitor URL.
    """
    new_parent = resources.ItemReference.build(id='123abc', path='/foo/bar')
    new_name = '456.doc'
    required_headers = (
        ('Content-Type', 'application/json'),
        ('Prefer', 'respond-async'),
    )

    def verify_and_reply(request, context):
        # Verify that necessary headers are properly set.
        sent_headers = request.headers
        for header_name, header_value in required_headers:
            self.assertIn(header_name, sent_headers)
            self.assertEqual(header_value, sent_headers[header_name])
        # Verify that request body is correct.
        payload = request.json()
        self.assertEqual(new_name, payload['name'])
        self.assertDictEqual(new_parent.data, payload['parentReference'])
        # Set response.
        context.status_code = codes.accepted
        context.headers['Location'] = 'https://foo.bar/monitor'
        return None

    with requests_mock.Mocker() as mocker:
        copy_url = self.drive.get_item_uri(None, '123') + ':/action.copy'
        mocker.post(copy_url, content=verify_and_reply)
        session = self.drive.copy_item(new_parent, item_path='123', new_name=new_name)
        self.assertIsInstance(session, resources.AsyncCopySession)
def test_request_cls_none():
    """An API declared with ``request_cls = None`` should still return the response model."""
    class Person(Model):
        name = StringType()

    class PersonAPI(HTTPEater):
        request_cls = None
        response_cls = Person
        url = 'http://example.com/person'

    expected_person = Person({'name': 'John'})
    api = PersonAPI(name=expected_person.name)

    with requests_mock.Mocker() as mocker:
        mocker.get(api.url, json=expected_person.to_primitive(), headers=JSON_HEADERS)
        actual_person = api()
        assert actual_person == expected_person
def test_data_error_raised():
    """Calling the API should raise DataError when the response fails model validation.

    The mocked response carries a three-character name while the model
    requires at least four characters.
    """
    class Person(Model):
        name = StringType(required=True, min_length=4)

    class PersonAPI(HTTPEater):
        request_cls = Person
        response_cls = Person
        url = 'http://example.com/person'

    api = PersonAPI(name='John')
    json_headers = CaseInsensitiveDict({
        'Content-Type': 'application/json'
    })

    with pytest.raises(DataError), requests_mock.Mocker() as mocker:
        mocker.get(api.url, json={'name': 'Joh'}, headers=json_headers)
        api()
def test_url_formatting():
    """The ``{request_model.name}`` placeholder in the URL should be filled from the request model."""
    class Person(Model):
        name = StringType()

    class GetPersonAPI(HTTPEater):
        request_cls = Person
        response_cls = Person
        url = 'http://example.com/person/{request_model.name}/'

    expected_url = 'http://example.com/person/John/'
    api = GetPersonAPI(name='John')
    assert api.url == expected_url

    with requests_mock.Mocker() as mocker:
        mocker.get(expected_url, json={'name': 'John'}, headers=JSON_HEADERS)
        person = api()
        assert person.name == 'John'
def test_get_request_kwargs_url():
    """
    Check that a ``get_request_kwargs`` override can replace the request url.
    """
    class URLManipulatingAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://not-the-real-url.com/'

        def get_request_kwargs(self, request_model: Union[Model, None], **kwargs):
            kwargs.update(url='http://the-real-url.com')
            return kwargs

    expected_url = 'http://the-real-url.com'
    api = URLManipulatingAPI()

    with requests_mock.Mocker() as mocker:
        # Only the overridden url is registered with the mock.
        mocker.get(expected_url, json={}, headers=JSON_HEADERS)
        api()
        assert api.url == expected_url
def test_get_request_kwargs_method():
    """
    Check that a ``get_request_kwargs`` override can replace the HTTP method.
    """
    class MethodManipulatingAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

        def get_request_kwargs(self, request_model: Union[Model, None], **kwargs):
            kwargs.update(method='post')
            return kwargs

    api = MethodManipulatingAPI()

    with requests_mock.Mocker() as mocker:
        # Only POST is registered with the mock.
        mocker.post(api.url, json={}, headers=JSON_HEADERS)
        api()
        assert api.method == 'post'
def test_get_request_kwargs_session():
    """
    Check that a ``get_request_kwargs`` override can supply its own session.
    """
    credentials = ('john', 's3cr3t')

    class SessionManipulatingAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

        def get_request_kwargs(self, request_model: Union[Model, None], **kwargs):
            custom_session = requests.Session()
            custom_session.auth = ('john', 's3cr3t')
            kwargs['session'] = custom_session
            return kwargs

    api = SessionManipulatingAPI()

    with requests_mock.Mocker() as mocker:
        mocker.get(api.url, json={}, headers=JSON_HEADERS)
        api()
        assert api.session.auth == credentials
def test_requests_timeout():
    """A ``requests.Timeout`` raised during the request should surface as EaterTimeoutError."""
    class GetPersonAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

    def raise_timeout(*args, **kwargs):  # pylint: disable=unused-argument
        raise requests.Timeout()

    api = GetPersonAPI()

    with requests_mock.Mocker() as mocker:
        # The text callback raises instead of producing a body.
        mocker.get('http://example.com/', text=raise_timeout)
        with pytest.raises(EaterTimeoutError):
            api()
def test_requests_connecterror():
    """A ``requests.ConnectionError`` raised during the request should surface as EaterConnectError."""
    class GetPersonAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

    def raise_connection_error(*args, **kwargs):  # pylint: disable=unused-argument
        raise requests.ConnectionError()

    api = GetPersonAPI()

    with requests_mock.Mocker() as mocker:
        # The text callback raises instead of producing a body.
        mocker.get('http://example.com/', text=raise_connection_error)
        with pytest.raises(EaterConnectError):
            api()
def test_non_json_content_response():
    """A ``text/plain`` response should make the API call raise NotImplementedError."""
    class GetPersonAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

    api = GetPersonAPI()
    plain_text_headers = CaseInsensitiveDict({
        'Content-Type': 'text/plain'
    })

    with requests_mock.Mocker() as mocker:
        mocker.get('http://example.com/', text='Hello world', headers=plain_text_headers)
        with pytest.raises(NotImplementedError):
            api()
def stub_request_metadata(badge=False):
    """Stub the classifiers request and, when asked, the badge request.

    :param badge: When true, additionally register a stub for
        ``Update.badge_url`` that serves the bundled ``badge.svg``.
    :return: A ``Classifiers`` instance created while the classifiers
        stub is active.
    """
    if badge:
        # NOTE(review): this Mocker's context ends right after the stub is
        # registered, so the badge stub does not appear to stay active for
        # the caller -- confirm whether that is intended.
        with requests_mock.Mocker() as badge_mock:
            with open('bootstrap_py/tests/data/badge.svg') as svg_file:
                svg_body = svg_file.read()
            badge_mock.get(Update.badge_url, text=svg_body, status_code=200)

    with requests_mock.Mocker() as classifiers_mock:
        with open('bootstrap_py/data/classifiers.txt') as listing_file:
            listing_body = listing_file.read()
        classifiers_mock.get(Classifiers.url, text=listing_body, status_code=200)
        return Classifiers()
def test_write_points_from_dataframe(self):
    """A timestamp-indexed frame should be posted as line protocol.

    The same body is expected both without ``tags`` and with ``tags=None``.
    """
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    frame = pd.DataFrame(
        data=[["1", 1, 1.0], ["2", 2, 2.0]],
        index=[epoch, epoch + timedelta(hours=1)],
        columns=["column_one", "column_two", "column_three"],
    )
    expected = (
        b"foo column_one=\"1\",column_two=1i,column_three=1.0 0\n"
        b"foo column_one=\"2\",column_two=2i,column_three=2.0 "
        b"3600000000000\n"
    )

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.POST,
                       "http://localhost:8086/write",
                       status_code=204)
        cli = DataFrameClient(database='db')

        # First without tags, then with an explicit tags=None.
        for extra_kwargs in ({}, {'tags': None}):
            cli.write_points(frame, 'foo', **extra_kwargs)
            self.assertEqual(m.last_request.body, expected)
def test_write_points_from_dataframe_with_period_index(self):
    """A frame indexed by daily ``pd.Period`` values should be posted as line protocol."""
    period_index = [pd.Period('1970-01-01'), pd.Period('1970-01-02')]
    frame = pd.DataFrame(
        data=[["1", 1, 1.0], ["2", 2, 2.0]],
        index=period_index,
        columns=["column_one", "column_two", "column_three"],
    )
    expected = (
        b"foo column_one=\"1\",column_two=1i,column_three=1.0 0\n"
        b"foo column_one=\"2\",column_two=2i,column_three=2.0 "
        b"86400000000000\n"
    )

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.POST,
                       "http://localhost:8086/write",
                       status_code=204)
        cli = DataFrameClient(database='db')
        cli.write_points(frame, "foo")
        self.assertEqual(m.last_request.body, expected)
def test_write(self):
    """write() should serialise a single point into one line-protocol line."""
    point = {
        "measurement": "cpu_load_short",
        "tags": {"host": "server01", "region": "us-west"},
        "time": "2009-11-10T23:00:00Z",
        "fields": {"value": 0.64},
    }
    payload = {
        "database": "mydb",
        "retentionPolicy": "mypolicy",
        "points": [point],
    }

    with requests_mock.Mocker() as m:
        m.register_uri(
            requests_mock.POST,
            "http://localhost:8086/write",
            status_code=204
        )
        cli = InfluxDBClient(database='db')
        cli.write(payload)

        self.assertEqual(
            m.last_request.body,
            b"cpu_load_short,host=server01,region=us-west "
            b"value=0.64 1257894000000000000\n",
        )
def test_write_points_toplevel_attributes(self):
    """Tags passed to write_points() should be merged into the written line."""
    expected_line = (
        'cpu_load_short,host=server01,region=us-west,tag=hello '
        'value=0.64 1257894000123456000\n'
    )

    with requests_mock.Mocker() as m:
        m.register_uri(
            requests_mock.POST,
            "http://localhost:8086/write",
            status_code=204
        )
        cli = InfluxDBClient(database='db')
        cli.write_points(
            self.dummy_points,
            database='testdb',
            tags={"tag": "hello"},
            retention_policy="somepolicy"
        )
        sent_body = m.last_request.body.decode('utf-8')
        self.assertEqual(expected_line, sent_body)
def test_query(self):
    """query() should expose the points of the first result of the mocked response."""
    example_response = (
        '{"results": [{"series": [{"measurement": "sdfsdfsdf", '
        '"columns": ["time", "value"], "values": '
        '[["2009-11-10T23:00:00Z", 0.64]]}]}, {"series": '
        '[{"measurement": "cpu_load_short", "columns": ["time", "value"], '
        '"values": [["2009-11-10T23:00:00Z", 0.64]]}]}]}'
    )
    expected_points = [{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]

    with requests_mock.Mocker() as m:
        m.register_uri(
            requests_mock.GET,
            "http://localhost:8086/query",
            text=example_response
        )
        result_sets = self.cli.query('select * from foo')
        first_points = list(result_sets[0].get_points())
        self.assertListEqual(first_points, expected_points)
def test_create_retention_policy_default(self):
    """With ``default=True`` the generated query should end in ``default``."""
    example_response = '{"results":[{}]}'

    with requests_mock.Mocker() as m:
        m.register_uri(
            requests_mock.GET,
            "http://localhost:8086/query",
            text=example_response
        )
        self.cli.create_retention_policy(
            'somename', '1d', 4, default=True, database='db'
        )
        # Inspect the ``q`` parameter of the request that was actually sent.
        sent_query = m.last_request.qs['q'][0]
        self.assertEqual(
            sent_query,
            'create retention policy "somename" on '
            '"db" duration 1d replication 4 default'
        )
def test_create_retention_policy(self):
    """Without ``default`` the generated query should omit the ``default`` keyword."""
    example_response = '{"results":[{}]}'

    with requests_mock.Mocker() as m:
        m.register_uri(
            requests_mock.GET,
            "http://localhost:8086/query",
            text=example_response
        )
        self.cli.create_retention_policy(
            'somename', '1d', 4, database='db'
        )
        # Inspect the ``q`` parameter of the request that was actually sent.
        sent_query = m.last_request.qs['q'][0]
        self.assertEqual(
            sent_query,
            'create retention policy "somename" on '
            '"db" duration 1d replication 4'
        )
def test_get_list_retention_policies(self):
    """get_list_retention_policies() should map each row to a dict keyed by column name."""
    example_response = (
        '{"results": [{"series": [{"values": [["fsfdsdf", "24h0m0s", 2]],'
        ' "columns": ["name", "duration", "replicaN"]}]}]}'
    )
    expected_policies = [
        {'duration': '24h0m0s', 'name': 'fsfdsdf', 'replicaN': 2},
    ]

    with requests_mock.Mocker() as m:
        m.register_uri(
            requests_mock.GET,
            "http://localhost:8086/query",
            text=example_response
        )
        policies = self.cli.get_list_retention_policies()
        self.assertListEqual(policies, expected_policies)
def test_get_list_users(self):
    """get_list_users() should map each row to a dict keyed by column name."""
    example_response = (
        '{"results":[{"series":[{"columns":["user","admin"],'
        '"values":[["test",false]]}]}]}'
    )
    expected_users = [{'user': 'test', 'admin': False}]

    with requests_mock.Mocker() as m:
        m.register_uri(
            requests_mock.GET,
            "http://localhost:8086/query",
            text=example_response
        )
        users = self.cli.get_list_users()
        self.assertListEqual(users, expected_users)
def test_write_points_from_dataframe(self):
    """A ``{name: frame}`` mapping should be posted as a JSON series list.

    The index is expected to appear as a trailing ``time`` column in seconds.
    """
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    frame = pd.DataFrame(
        data=[["1", 1, 1.0], ["2", 2, 2.0]],
        index=[epoch, epoch + timedelta(hours=1)],
        columns=["column_one", "column_two", "column_three"],
    )
    expected_series = [
        {
            "points": [
                ["1", 1, 1.0, 0],
                ["2", 2, 2.0, 3600],
            ],
            "name": "foo",
            "columns": ["column_one", "column_two", "column_three", "time"],
        },
    ]

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.POST,
                       "http://localhost:8086/db/db/series")
        cli = DataFrameClient(database='db')
        cli.write_points({"foo": frame})
        sent_series = json.loads(m.last_request.body)
        self.assertListEqual(sent_series, expected_series)
def test_write_points_from_dataframe_with_float_nan(self):
    """A NaN cell in the frame should arrive as JSON ``null`` in the posted series."""
    epoch = pd.Timestamp('1970-01-01 00:00+00:00')
    frame = pd.DataFrame(
        data=[[1, float("NaN"), 1.0], [2, 2, 2.0]],
        index=[epoch, epoch + timedelta(hours=1)],
        columns=["column_one", "column_two", "column_three"],
    )
    expected_series = [
        {
            "points": [
                [1, None, 1.0, 0],
                [2, 2, 2.0, 3600],
            ],
            "name": "foo",
            "columns": ["column_one", "column_two", "column_three", "time"],
        },
    ]

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.POST,
                       "http://localhost:8086/db/db/series")
        cli = DataFrameClient(database='db')
        cli.write_points({"foo": frame})
        sent_series = json.loads(m.last_request.body)
        self.assertListEqual(sent_series, expected_series)
def test_write_points_from_dataframe_with_period_index(self):
    """A frame indexed by daily ``pd.Period`` values should be posted as a JSON series list.

    The periods are expected to appear as a trailing ``time`` column in seconds.
    """
    period_index = [pd.Period('1970-01-01'), pd.Period('1970-01-02')]
    frame = pd.DataFrame(
        data=[["1", 1, 1.0], ["2", 2, 2.0]],
        index=period_index,
        columns=["column_one", "column_two", "column_three"],
    )
    expected_series = [
        {
            "points": [
                ["1", 1, 1.0, 0],
                ["2", 2, 2.0, 86400],
            ],
            "name": "foo",
            "columns": ["column_one", "column_two", "column_three", "time"],
        },
    ]

    with requests_mock.Mocker() as m:
        m.register_uri(requests_mock.POST,
                       "http://localhost:8086/db/db/series")
        cli = DataFrameClient(database='db')
        cli.write_points({"foo": frame})
        sent_series = json.loads(m.last_request.body)
        self.assertListEqual(sent_series, expected_series)