def setUp(self):
    """Create a ``TrailingOrders`` system backed by mocks for each test.

    The ``logging`` name in the system module and
    ``TrailingOrders.set_next_operation`` are patched; both patchers are
    registered with ``addCleanup`` before being started so they are always
    stopped when the test finishes.
    """
    fake_client = mock.MagicMock()
    fake_bootstrap = factory.TrailingOrdersFactory().make_fake_bootstrap(self.INITIAL_SETUP)

    # Replace the ``logging`` name looked up by the system module.
    log_patcher = mock.patch('trading_system.systems.trailing_orders.system.logging')
    self.addCleanup(log_patcher.stop)
    log_patcher.start()

    self.system = TrailingOrders(fake_client, fake_bootstrap)
    # Stub: the system reports no pending orders unless a test overrides it.
    self.system.get_pending_orders = mock.MagicMock(return_value=[])

    # The replacement is kept on the test case so tests can assert on it.
    self.set_next_operation = mock.MagicMock()
    operation_patcher = mock.patch(
        'trading_system.systems.trailing_orders.system.TrailingOrders.set_next_operation',
        self.set_next_operation,
    )
    self.addCleanup(operation_patcher.stop)
    operation_patcher.start()
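# Example source code for the Python class patch()
# File source: sell_states_test.py
# Project: bitcoin-trading-system
# Author: vinicius-ronconi
# Project source
# File source
# Views: 16
# Favorites: 0
# Likes: 0
# Comments: 0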
def test_push(self, skipper_runner_run_mock, requests_get_mock):
    """``push`` runs docker tag, push and rmi when the tag is not listed.

    The mocked ``requests.get`` response carries a tag list that does not
    contain ``1234567``; the runner mock is then expected to have been
    called with ``docker tag``, ``docker push`` and ``docker rmi``, in that
    order.

    :param skipper_runner_run_mock: mock standing in for the command runner
        (injected by a decorator outside this view).
    :param requests_get_mock: mock standing in for ``requests.get``
        (injected by a decorator outside this view).
    """
    # One return code per expected runner call (tag, push, rmi). With only
    # two entries the third call would raise StopIteration from the mock.
    skipper_runner_run_mock.side_effect = [0, 0, 0]
    push_params = ['my_image']

    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        requests_get_mock.return_value = requests_response_mock

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )

    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_push_already_in_registry(self, skipper_runner_run_mock, requests_get_mock):
    """``push`` skips ``docker push`` when the tag list already has the tag.

    The mocked ``requests.get`` response lists ``1234567`` among its tags;
    the runner mock is expected to have been called with ``docker tag``
    followed by ``docker rmi`` only.
    """
    skipper_runner_run_mock.side_effect = [0, 0]
    listed_tags = {
        'name': 'my_image',
        'tags': ['latest', 'aaaaaaa', 'bbbbbbb', "1234567"],
    }

    with mock.patch('requests.Response', autospec=True) as response_cls:
        fake_response = response_cls.return_value
        fake_response.json.return_value = listed_tags
        requests_get_mock.return_value = fake_response

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image'],
        )

    skipper_runner_run_mock.assert_has_calls([
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ])
def test_push_already_in_registry_with_force(self, skipper_runner_run_mock, requests_get_mock):
    """``push --force`` pushes even though the tag list already has the tag.

    The mocked ``requests.get`` response lists ``1234567`` among its tags
    and ``--force`` is passed on the command line; the runner mock is
    expected to have been called with ``docker tag``, ``docker push`` and
    ``docker rmi``, in that order.

    :param skipper_runner_run_mock: mock standing in for the command runner
        (injected by a decorator outside this view).
    :param requests_get_mock: mock standing in for ``requests.get``
        (injected by a decorator outside this view).
    """
    # One return code per expected runner call (tag, push, rmi). With only
    # two entries the third call would raise StopIteration from the mock.
    skipper_runner_run_mock.side_effect = [0, 0, 0]
    push_params = ['my_image', "--force"]

    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        # The tag must be present in the mocked listing, otherwise this test
        # is indistinguishable from a plain push of a new tag and ``--force``
        # is never what makes the push happen.
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb', "1234567"]
        }
        requests_get_mock.return_value = requests_response_mock

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=push_params
        )

    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_push_fail(self, skipper_runner_run_mock, requests_get_mock):
    """``push`` exits with code 1 when the second runner call returns 1.

    The runner mock returns 0 and then 1; the CLI result's exit code is
    asserted to be 1 and the runner is expected to have been called with
    ``docker tag`` followed by ``docker push``.
    """
    skipper_runner_run_mock.side_effect = [0, 1]
    listed_tags = {
        'name': 'my_image',
        'tags': ['latest', 'aaaaaaa', 'bbbbbbb'],
    }

    with mock.patch('requests.Response', autospec=True) as response_cls:
        fake_response = response_cls.return_value
        fake_response.json.return_value = listed_tags
        requests_get_mock.return_value = fake_response

        cli_result = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image'],
        )

    self.assertEqual(cli_result.exit_code, 1)
    skipper_runner_run_mock.assert_has_calls([
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
    ])
def test_push_rmi_fail(self, skipper_runner_run_mock, requests_get_mock):
    """``push`` still exits with code 0 when the third runner call returns 1.

    The runner mock returns 0, 0 and then 1; the CLI result's exit code is
    asserted to be 0 and the runner is expected to have been called with
    ``docker tag``, ``docker push`` and ``docker rmi``, in that order.
    """
    skipper_runner_run_mock.side_effect = [0, 0, 1]
    listed_tags = {
        'name': 'my_image',
        'tags': ['latest', 'aaaaaaa', 'bbbbbbb'],
    }

    with mock.patch('requests.Response', autospec=True) as response_cls:
        fake_response = response_cls.return_value
        fake_response.json.return_value = listed_tags
        requests_get_mock.return_value = fake_response

        cli_result = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image'],
        )

    self.assertEqual(cli_result.exit_code, 0)
    skipper_runner_run_mock.assert_has_calls([
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ])
def test_push_with_defaults_from_config_file(self, skipper_runner_run_mock, requests_get_mock):
    """``push`` works when its settings come from ``config.load_defaults()``.

    No ``global_params`` are passed; the CLI is invoked with
    ``defaults=config.load_defaults()`` instead. The runner mock is expected
    to have been called with ``docker tag``, ``docker push`` and
    ``docker rmi``, in that order.

    :param skipper_runner_run_mock: mock standing in for the command runner
        (injected by a decorator outside this view).
    :param requests_get_mock: mock standing in for ``requests.get``
        (injected by a decorator outside this view).
    """
    # One return code per expected runner call (tag, push, rmi). With only
    # two entries the third call would raise StopIteration from the mock.
    skipper_runner_run_mock.side_effect = [0, 0, 0]
    push_params = ['my_image']

    with mock.patch('requests.Response', autospec=True) as requests_response_class_mock:
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }
        requests_get_mock.return_value = requests_response_mock

        self._invoke_cli(
            defaults=config.load_defaults(),
            subcmd='push',
            subcmd_params=push_params
        )

    expected_commands = [
        mock.call(['docker', 'tag', 'my_image:1234567', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'push', 'registry.io:5000/my_image:1234567']),
        mock.call(['docker', 'rmi', 'registry.io:5000/my_image:1234567']),
    ]
    skipper_runner_run_mock.assert_has_calls(expected_commands)
def test_get_retry_logging(log_stream):
    """Check the exact retry log when every GET is answered with 429.

    ``Session.get`` is patched to always return a 429 response whose
    ``Retry-After`` header is ``"3"``. ``make_call("")`` must raise
    ``UnavailableError`` and the text captured by the ``log_stream`` fixture
    must match the expected transcript character for character.
    """
    expected_log = """UMAPI timeout...service unavailable (code 429 on try 1)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 2)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 3)
UMAPI timeout...giving up after 3 attempts (6 seconds).
"""
    stream, logger = log_stream
    with mock.patch("umapi_client.connection.requests.Session.get") as mock_get:
        mock_get.return_value = MockResponse(429, headers={"Retry-After": "3"})
        conn = Connection(**dict(mock_connection_params, logger=logger))
        with pytest.raises(UnavailableError):
            conn.make_call("")
        stream.flush()
        log = stream.getvalue()  # kept as a local so `pytest -l` shows the exact log
        assert log == expected_log
# log_stream fixture defined in conftest.py
def test_post_retry_logging(log_stream):
    """Check the exact retry log when every POST is answered with 429.

    ``Session.post`` is patched to always return a 429 response whose
    ``Retry-After`` header is ``"3"``. ``make_call("", [3, 5])`` must raise
    ``UnavailableError`` and the text captured by the ``log_stream`` fixture
    must match the expected transcript character for character.
    """
    # NOTE(review): the second positional argument presumably makes
    # ``make_call`` issue a POST rather than a GET -- confirm in Connection.
    expected_log = """UMAPI timeout...service unavailable (code 429 on try 1)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 2)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 3)
UMAPI timeout...giving up after 3 attempts (6 seconds).
"""
    stream, logger = log_stream
    with mock.patch("umapi_client.connection.requests.Session.post") as mock_post:
        mock_post.return_value = MockResponse(429, headers={"Retry-After": "3"})
        conn = Connection(**dict(mock_connection_params, logger=logger))
        with pytest.raises(UnavailableError):
            conn.make_call("", [3, 5])
        stream.flush()
        log = stream.getvalue()  # kept as a local so `pytest -l` shows the exact log
        assert log == expected_log
def test_plug(self, mock_plug):
    """``os_vif.plug`` results in one ``mock_plug`` call with the VIF and info.

    The stevedore ``ExtensionManager`` is patched so that ``names`` returns
    ``['foobar']`` and item lookup returns an ``Extension`` wrapping
    ``NoOpPlugin``. ``mock_plug`` is injected by a decorator outside this
    view.
    """
    noop_extension = extension.Extension(
        name="noop",
        entry_point="os-vif",
        plugin=NoOpPlugin,
        obj=None,
    )
    with mock.patch('stevedore.extension.ExtensionManager.names',
                    return_value=['foobar']):
        with mock.patch('stevedore.extension.ExtensionManager.__getitem__',
                        return_value=noop_extension):
            os_vif.initialize()
            fake_vif = mock.MagicMock(plugin_name='noop')
            os_vif.plug(fake_vif, mock.sentinel.info)
            mock_plug.assert_called_once_with(fake_vif, mock.sentinel.info)
def test_unplug(self, mock_unplug):
    """``os_vif.unplug`` results in one ``mock_unplug`` call with the VIF and info.

    The stevedore ``ExtensionManager`` is patched so that ``names`` returns
    ``['foobar']`` and item lookup returns an ``Extension`` (named
    ``"demo"``) wrapping ``NoOpPlugin``. ``mock_unplug`` is injected by a
    decorator outside this view.
    """
    demo_extension = extension.Extension(
        name="demo",
        entry_point="os-vif",
        plugin=NoOpPlugin,
        obj=None,
    )
    with mock.patch('stevedore.extension.ExtensionManager.names',
                    return_value=['foobar']):
        with mock.patch('stevedore.extension.ExtensionManager.__getitem__',
                        return_value=demo_extension):
            os_vif.initialize()
            fake_vif = mock.MagicMock(plugin_name='noop')
            os_vif.unplug(fake_vif, mock.sentinel.info)
            mock_unplug.assert_called_once_with(fake_vif, mock.sentinel.info)
def test_wrap_consumer(self, m_retry_type, m_logging_type):
    """``_wrap_consumer`` returns the logging handler built around the retry handler.

    The retry type must be called with the consumer (plus any
    ``exceptions`` keyword), the logging type must be called with the retry
    type's return value, and the logging type's return value must be what
    ``_wrap_consumer`` returns. Both type mocks are injected by decorators
    outside this view.
    """
    m_retry_type.return_value = mock.sentinel.retry_handler
    m_logging_type.return_value = mock.sentinel.logging_handler

    # The base-class initializer is replaced while the pipeline is built.
    with mock.patch.object(h_dis.EventPipeline, '__init__'):
        pipeline = h_pipeline.ControllerPipeline(mock.sentinel.thread_group)
        wrapped = pipeline._wrap_consumer(mock.sentinel.consumer)

    self.assertEqual(mock.sentinel.logging_handler, wrapped)
    m_logging_type.assert_called_with(mock.sentinel.retry_handler)
    m_retry_type.assert_called_with(mock.sentinel.consumer,
                                    exceptions=mock.ANY)
def test_annotate_diff_resource_vers_no_conflict(self, m_patch, m_count):
    """``annotate`` re-sends the PATCH with the newer resourceVersion after a conflict.

    The first PATCH response is a conflict; ``client.get`` then returns an
    object holding the same annotations under resourceVersion ``"456"``.
    ``annotate`` must return the annotations, and PATCH must have been
    called first with the ``"123"`` payload and then with the ``"456"``
    payload.
    """
    m_count.return_value = list(range(1, 5))
    path = '/test'
    annotations = {'a1': 'v1', 'a2': 'v2'}
    stale_version = "123"
    fresh_version = "456"

    def versioned(version):
        # Object carrying the test annotations under the given version.
        return {'metadata': {
            'annotations': annotations,
            'resourceVersion': version}}

    def patch_call(payload):
        # The PATCH invocation expected for a given serialized payload.
        return mock.call(self.base_url + path,
                         data=payload,
                         headers=mock.ANY,
                         cert=(None, None), verify=False)

    stale_obj = versioned(stale_version)
    fresh_obj = versioned(fresh_version)
    stale_payload = jsonutils.dumps(stale_obj, sort_keys=True)
    fresh_payload = jsonutils.dumps(fresh_obj, sort_keys=True)

    conflict_resp = mock.MagicMock(ok=False,
                                   status_code=requests.codes.conflict)
    ok_resp = mock.MagicMock(ok=True)
    ok_resp.json.return_value = stale_obj
    m_patch.side_effect = [conflict_resp, ok_resp]

    with mock.patch.object(self.client, 'get', return_value=fresh_obj):
        returned = self.client.annotate(
            path, annotations, resource_version=stale_version)
        self.assertEqual(annotations, returned)

    m_patch.assert_has_calls([patch_call(stale_payload),
                              patch_call(fresh_payload)])
def test_annotate_diff_resource_vers_no_annotation(self, m_patch, m_count):
    """``annotate`` re-sends the PATCH when the server copy has no annotations.

    The first PATCH response is a conflict; ``client.get`` then returns an
    object that has only resourceVersion ``"456"`` and no annotations.
    ``annotate`` must return the annotations, and PATCH must have been
    called first with the ``'123'`` payload and then with the same
    annotations under ``'456'``.
    """
    m_count.return_value = list(range(1, 5))
    path = '/test'
    annotations = {'a1': 'v1', 'a2': 'v2'}
    first_version = '123'
    server_version = '456'

    def patch_call(payload):
        # The PATCH invocation expected for a given serialized payload.
        return mock.call(self.base_url + path,
                         data=payload,
                         headers=mock.ANY,
                         cert=(None, None), verify=False)

    first_obj = {'metadata': {
        'annotations': annotations,
        'resourceVersion': first_version}}
    # Serialize before building the second object: the shallow copy below
    # shares the nested 'metadata' dict, so the version bump also changes
    # ``first_obj``.
    first_payload = jsonutils.dumps(first_obj, sort_keys=True)

    server_obj = {'metadata': {
        'resourceVersion': server_version}}

    second_obj = first_obj.copy()
    second_obj['metadata']['resourceVersion'] = server_version
    second_payload = jsonutils.dumps(second_obj, sort_keys=True)

    conflict_resp = mock.MagicMock(ok=False,
                                   status_code=requests.codes.conflict)
    ok_resp = mock.MagicMock(ok=True)
    ok_resp.json.return_value = second_obj
    m_patch.side_effect = (conflict_resp, ok_resp)

    with mock.patch.object(self.client, 'get', return_value=server_obj):
        returned = self.client.annotate(
            path, annotations,
            resource_version=first_version)
        self.assertEqual(annotations, returned)

    m_patch.assert_has_calls([patch_call(first_payload),
                              patch_call(second_payload)])
def test_annotate_diff_resource_vers_conflict(self, m_patch, m_count):
    """``annotate`` raises when the server copy holds different annotations.

    Every PATCH response is a conflict and ``client.get`` returns an object
    whose annotations (``{'a1': 'v2'}``) differ from the requested ones.
    ``annotate`` must raise ``K8sClientException`` and PATCH must have been
    called exactly once, with the ``"123"`` payload.
    """
    m_count.return_value = list(range(1, 5))
    path = '/test'
    annotations = {'a1': 'v1', 'a2': 'v2'}
    stale_version = "123"
    server_version = "456"

    requested_obj = {'metadata': {
        'annotations': annotations,
        'resourceVersion': stale_version}}
    server_obj = {'metadata': {
        'annotations': {'a1': 'v2'},
        'resourceVersion': server_version}}
    requested_payload = jsonutils.dumps(requested_obj, sort_keys=True)

    m_patch.return_value = mock.MagicMock(
        ok=False, status_code=requests.codes.conflict)

    with mock.patch.object(self.client, 'get', return_value=server_obj):
        with self.assertRaises(exc.K8sClientException):
            self.client.annotate(path, annotations,
                                 resource_version=stale_version)

    m_patch.assert_called_once_with(self.base_url + path,
                                    data=requested_payload,
                                    headers=mock.ANY,
                                    cert=(None, None), verify=False)
def test_run(self, m_count):
    """``Async._run`` passes the queued event to the handler exactly once.

    The queue mock always reports empty and always returns the same
    sentinel event from ``get``; ``m_count`` (injected by a decorator
    outside this view) yields a single value. ``time.sleep`` is patched
    while ``_run`` executes.
    """
    queue = mock.Mock(**{
        'empty.return_value': True,
        'get.return_value': mock.sentinel.event,
    })
    handler = mock.Mock()
    m_count.return_value = [1]
    async_handler = h_async.Async(handler, mock.Mock(), mock.Mock(),
                                  queue_depth=1)

    with mock.patch('time.sleep'):
        async_handler._run(mock.sentinel.group, queue)

    handler.assert_called_once_with(mock.sentinel.event)
def test_run_empty(self, m_count):
    """``Async._run`` handles each queued event and stops on ``Empty``.

    ``queue.get`` yields two sentinel events and then raises
    ``six_queue.Empty``; the handler must have been called once per event,
    in order, and no more. ``time.sleep`` is patched while ``_run``
    executes.
    """
    events = [mock.sentinel.event1, mock.sentinel.event2]
    queue = mock.Mock(**{
        'empty.return_value': True,
        'get.side_effect': events + [six_queue.Empty()],
    })
    handler = mock.Mock()
    m_count.return_value = list(range(5))
    async_handler = h_async.Async(handler, mock.Mock(), mock.Mock())

    with mock.patch('time.sleep'):
        async_handler._run(mock.sentinel.group, queue)

    handler.assert_has_calls(list(map(mock.call, events)))
    self.assertEqual(len(events), handler.call_count)
def test__prepare_variables_configdrive_file(self, mem_req_mock):
    """``_prepare_variables`` writes a non-URL configdrive to a temp file.

    The node's ``instance_info['configdrive']`` is set to ``'fake-content'``
    and ``tempdir`` is configured as ``/path/to/tmpfiles``. The returned
    variables must describe the configdrive as a file located at
    ``<tempdir>/<node uuid>.cndrive``, and ``open`` in ``ansible_deploy``
    must have been used to write the content to that path.
    ``mem_req_mock`` is injected by a decorator outside this view.
    """
    instance_info = self.node.instance_info
    instance_info['configdrive'] = 'fake-content'
    self.node.instance_info = instance_info
    self.node.save()
    self.config(tempdir='/path/to/tmpfiles')

    cndrive_path = '/path/to/tmpfiles/%s.cndrive' % self.node.uuid
    image_vars = {"url": "http://image",
                  "validate_certs": "yes",
                  "source": "fake-image",
                  "mem_req": 2000,
                  "disk_format": "qcow2",
                  "checksum": "md5:checksum"}
    expected = {"image": image_vars,
                'configdrive': {'type': 'file',
                                'location': cndrive_path}}

    # ``create=True`` because ``open`` is not an attribute of the module
    # until it is patched in.
    with mock.patch.object(ansible_deploy, 'open', mock.mock_open(),
                           create=True) as open_mock:
        with task_manager.acquire(self.context, self.node.uuid) as task:
            actual = ansible_deploy._prepare_variables(task)
            self.assertEqual(expected, actual)
        open_mock.assert_has_calls((
            mock.call(cndrive_path, 'w'),
            mock.call().__enter__(),
            mock.call().write('fake-content'),
            mock.call().__exit__(None, None, None)))