def test_replicate():
# Start a replication
server = Server("http://test.test:5984")
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator"
)
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator/test",
status=404
)
def replicate_test_src(request, uri, headers):
httpretty.reset()
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator"
)
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator/test",
status=200, etag="a"
)
return 201, headers, json.dumps({
"id": "test_src", "rev": "a", "ok": True
})
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/_replicator/test",
status=201, content_type="application/json", body=replicate_test_src
)
server.replicate("test", "test_src", "test_dest", continuous=True)
assert "test" in server["_replicator"]
# Make sure replicate is idempotent
httpretty.register_uri(
httpretty.PUT, "http://test.test:5984/_replicator/test_src",
status=500
)
server.replicate("test", "test_src", "test_dest", continuous=True)
assert "test" in server["_replicator"]
# Cancel the replication
def cancel_test_replication(request, uri, headers):
httpretty.reset()
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator"
)
httpretty.register_uri(
httpretty.HEAD, "http://test.test:5984/_replicator/test",
status=404
)
return 200, headers, ""
httpretty.register_uri(
httpretty.DELETE, "http://test.test:5984/_replicator/test",
status=200, body=cancel_test_replication
)
server.cancel_replication("test")
assert "test" not in server["_replicator"]
评论列表
文章目录