def test_get_or_create_db():
server = BootstrapServer("http://test.test:5984")
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/test", status=404, body=""
)
def create_test_db(request, uri, headers):
httpretty.reset()
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/test", status=200
)
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/test", status=500
)
return 201, headers, ""
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/test", body=create_test_db
)
assert "test" not in server
test_db = server.get_or_create("test")
assert "test" in server
python类PUT的实例源码
def test_get_or_create_db():
server = Server("http://test.test:5984")
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/test", status=404, body=""
)
def create_test_db(request, uri, headers):
httpretty.reset()
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/test", status=200
)
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/test", status=500
)
return 201, headers, ""
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/test", body=create_test_db
)
assert "test" not in server
test_db = server.get_or_create("test")
assert "test" in server
def test_upload(self):
httpretty.register_uri(
httpretty.PUT,
OnedriveAPIClient.DOWNLOAD_URL[1].format(path=self._get_path()),
body='')
self.client.upload(self.chunk, TEST_CHUNK_BODY)
def test_upload(self):
httpretty.register_uri(
httpretty.PUT,
OnedriveAPIClient.DOWNLOAD_URL[1].format(path=self._get_path()),
body='')
self.client.upload(self.chunk, TEST_CHUNK_BODY)
def test_upload(self):
httpretty.register_uri(
httpretty.PUT,
OnedriveAPIClient.DOWNLOAD_URL[1].format(path=self._get_path()),
body='')
self.client.upload(self.chunk, TEST_CHUNK_BODY)
def test_create_user():
server = BootstrapServer("http://test.test:5984")
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_users"
)
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/_users/org.couchdb.user%3Atest",
status=201
)
server.create_user("test", "test")
def test_update(self):
"""Function defined to test paystackapi plan update."""
httpretty.register_uri(
httpretty.PUT,
"https://api.paystack.co/plan/78",
content_type='text/json',
body='{"status": true, "contributors": true}',
status=201,
)
response = Plan.update(plan_id=78, interval="monthly", amount=70000)
self.assertEqual(response['status'], True)
def test_update(self):
"""Function defined to test paystackapi customer update."""
httpretty.register_uri(
httpretty.PUT,
"https://api.paystack.co/customer/4013",
content_type='text/json',
body='{"status": true, "contributors": true}',
status=201,
)
response = Customer.update(customer_id=4013, first_name='andela')
self.assertEqual(response['status'], True)
def test_put_method_called(self):
httpretty.register_uri(
httpretty.PUT,
"https://api.paystack.co/customer/4013",
content_type='text/json',
body='{"status": true, "contributors": null}',
status=302,
)
req = PayStackRequests()
response = req.put('customer/4013',
data={'test': 'requests'})
self.assertTrue(response['status'])
def test_init_without_cloud_server(
config, push_design_documents, get_or_create, generate_config
):
runner = CliRunner()
generate_config.return_value = {"test": {"test": "test"}}
httpretty.register_uri(
httpretty.GET, "http://localhost:5984/_config/test/test",
body='"test_val"'
)
httpretty.register_uri(
httpretty.PUT, "http://localhost:5984/_config/test/test"
)
# Show -- Should throw an error because no local server is selected
res = runner.invoke(show)
assert res.exit_code, res.output
# Init -- Should work and push the design documents but not replicate
# anything
res = runner.invoke(init)
assert res.exit_code == 0, res.exception or res.output
assert get_or_create.call_count == len(all_dbs)
assert push_design_documents.call_count == 1
push_design_documents.reset_mock()
# Show -- Should work
res = runner.invoke(show)
assert res.exit_code == 0, res.exception or res.output
# Init -- Should throw an error because a different database is already
# selected
res = runner.invoke(init, ["--db_url", "http://test.test:5984"])
assert res.exit_code, res.output
def test_user_with_cloud_server(config):
runner = CliRunner()
# Register -- Should work
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_users"
)
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/_users/org.couchdb.user%3Atest",
status=201
)
res = runner.invoke(register, input="test\ntest\ntest\n")
assert res.exit_code == 0, res.exception or res.output
# Login -- Should work
httpretty.register_uri(
httpretty.GET, "http://test.test:5984/_session"
)
res = runner.invoke(login, input="test\ntest\n")
assert res.exit_code == 0, res.exception or res.output
# Login -- Should throw an error because you're already logged in as a
# different user
res = runner.invoke(login, input="test2\ntest2\n")
assert res.exit_code, res.output
assert isinstance(res.exception, SystemExit)
# Logout -- Should work
res = runner.invoke(logout)
assert res.exit_code == 0, res.exception or res.output
def test_create_user():
server = Server("http://test.test:5984")
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_users"
)
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/_users/org.couchdb.user%3Atest",
status=201
)
server.create_user("test", "test")
def given_an_existing_user(self):
self.start_mocking_http()
self.fake_response('/users/foo', file='user-foo.json')
self.expect_call('/users/foo', httpretty.PUT)
def given_an_existing_user(self):
self.start_mocking_http()
self.fake_response('/users/admin', file='user-admin.json')
self.expect_call('/users/admin', httpretty.PUT)
def given_an_existing_user(self):
self.start_mocking_http()
self.fake_response('/users/admin', file='user-admin.json')
self.expect_call('/users/admin', httpretty.PUT)
def given_an_existing_user(self):
self.start_mocking_http()
self.fake_response('/users/admin', file='user-admin.json')
self.expect_call('/users/admin', httpretty.PUT)
def given_an_existing_user(self):
self.start_mocking_http()
self.fake_response('/users/giddy', file='user-giddy.json')
self.expect_call('/users/giddy', httpretty.PUT)
def swagger_stub(swagger_files_url):
"""Fixture to stub a microservice from swagger files.
To use this fixture you need to define a swagger fixture named
swagger_files_url with the path to your swagger files, and the url to stub.
Then just add this fixture to your tests and your request pointing to the
urls in swagger_files_url will be managed by the stub.
Example:
@pytest.fixture
def swagger_files_url():
return [('tests/swagger.yaml', 'http://localhost:8000')]
"""
httpretty.enable()
for i in swagger_files_url: # Get all given swagger files and url
base_url = i[1]
s = SwaggerParser(i[0])
swagger_url[base_url] = s
# Register all urls
httpretty.register_uri(
httpretty.GET, re.compile(base_url + r'/.*'),
body=get_data_from_request)
httpretty.register_uri(
httpretty.POST, re.compile(base_url + r'/.*'),
body=get_data_from_request)
httpretty.register_uri(
httpretty.PUT, re.compile(base_url + r'/.*'),
body=get_data_from_request)
httpretty.register_uri(
httpretty.PATCH, re.compile(base_url + r'/.*'),
body=get_data_from_request)
httpretty.register_uri(
httpretty.DELETE, re.compile(base_url + r'/.*'),
body=get_data_from_request)
memory[base_url] = StubMemory(s)
yield memory[base_url]
# Close httpretty
httpretty.disable()
httpretty.reset()
def test_replicate():
# Start a replication
server = BootstrapServer("http://test.test:5984")
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator"
)
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator/test",
status=404
)
def replicate_test_src(request, uri, headers):
httpretty.reset()
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator"
)
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator/test",
status=200, etag="a"
)
return 201, headers, json.dumps({
"id": "test_src", "rev": "a", "ok": True
})
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/_replicator/test",
status=201, content_type="application/json", body=replicate_test_src
)
server.replicate("test", "test_src", "test_dest", continuous=True)
assert "test" in server["_replicator"]
# Make sure replicate is idempotent
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/_replicator/test_src",
status=500
)
server.replicate("test", "test_src", "test_dest", continuous=True)
assert "test" in server["_replicator"]
# Cancel the replication
def cancel_test_replication(request, uri, headers):
httpretty.reset()
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator"
)
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator/test",
status=404
)
return 200, headers, ""
httpretty.register_uri(
httpretty.DELETE, "http://test.test:5984/_replicator/test",
status=200, body=cancel_test_replication
)
server.cancel_replication("test")
assert "test" not in server["_replicator"]
def test_push_design_documents():
server = BootstrapServer("http://test.test:5984")
tempdir = tempfile.mkdtemp()
try:
test_db_path = os.path.join(tempdir, "test")
os.mkdir(test_db_path)
hidden_db_path = os.path.join(tempdir, ".test")
os.mkdir(hidden_db_path)
views_path = os.path.join(test_db_path, "views")
os.mkdir(views_path)
test_view_path = os.path.join(views_path, "test")
os.mkdir(test_view_path)
map_path = os.path.join(test_view_path, "map.js")
with open(map_path, "w+") as f:
f.write("test")
hidden_map_path = os.path.join(test_view_path, ".test.js")
with open(hidden_map_path, "w+") as f:
f.write("test")
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/test"
)
def create_design_doc(request, uri, headers):
obj = {
"_id": "_design/openag",
"views": {
"test": {
"map": "test"
}
}
}
if json.loads(request.body) == obj:
return 200, headers, json.dumps({
"id": "_design/openag",
"rev": "a"
})
else:
return 500, headers, ""
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/test/_design/openag",
status=404
)
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/test/_design/openag",
body=create_design_doc, content_type="application/json"
)
server.push_design_documents(tempdir)
finally:
shutil.rmtree(tempdir)
def test_replicate():
# Start a replication
server = Server("http://test.test:5984")
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator"
)
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator/test",
status=404
)
def replicate_test_src(request, uri, headers):
httpretty.reset()
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator"
)
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator/test",
status=200, etag="a"
)
return 201, headers, json.dumps({
"id": "test_src", "rev": "a", "ok": True
})
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/_replicator/test",
status=201, content_type="application/json", body=replicate_test_src
)
server.replicate("test", "test_src", "test_dest", continuous=True)
assert "test" in server["_replicator"]
# Make sure replicate is idempotent
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/_replicator/test_src",
status=500
)
server.replicate("test", "test_src", "test_dest", continuous=True)
assert "test" in server["_replicator"]
# Cancel the replication
def cancel_test_replication(request, uri, headers):
httpretty.reset()
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator"
)
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator/test",
status=404
)
return 200, headers, ""
httpretty.register_uri(
httpretty.DELETE, "http://test.test:5984/_replicator/test",
status=200, body=cancel_test_replication
)
server.cancel_replication("test")
assert "test" not in server["_replicator"]