def test_cli_with_login_username_only(self, mocked_method):
mocked = mock.MagicMock()
mocked.login.return_value = {"Status": "Login Succeeded"}
mocked.push.return_value = [
{"stream": "In process"},
{"status": "Successfully pushed"}]
mocked_method.return_value = mocked
with FakeProjectDirectory() as tmpdir:
add_sh_fake_config(tmpdir)
runner = CliRunner()
result = runner.invoke(
cli, ["dev", "--version", "test", "--apikey", "apikey"])
assert result.exit_code == 0
mocked.login.assert_called_with(
email=None, password=' ',
reauth=False, registry='registry', username='apikey')
mocked.push.assert_called_with(
'registry/user/project:test', decode=True,
insecure_registry=False, stream=True)
python类MagicMock()的实例源码
first_attempt_at_spark_test.py 文件源码
项目:monasca-transform
作者: openstack
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_transform_to_recordstore(self):
# simply verify that the transform method is called first, then
# rdd to recordstore
kafka_stream = MagicMock(name='kafka_stream')
transformed_stream = MagicMock(name='transformed_stream')
kafka_stream.transform.return_value = transformed_stream
MonMetricsKafkaProcessor.transform_to_recordstore(
kafka_stream)
# TODO(someone) figure out why these asserts now fail
# transformed_stream_expected = call.foreachRDD(
# MonMetricsKafkaProcessor.rdd_to_recordstore
# ).call_list()
# kafka_stream_expected = call.transform(
# MonMetricsKafkaProcessor.store_offset_ranges
# ).call_list()
# self.assertEqual(kafka_stream_expected, kafka_stream.mock_calls)
# self.assertEqual(transformed_stream_expected,
# transformed_stream.mock_calls)
def test_transform_service_heartbeat(self, coordinator):
# mock coordinator
fake_kazoo_driver = MagicMock(name="MagicKazooDriver",
spec=KazooDriver)
coordinator.return_value = fake_kazoo_driver
# option1
serv_thread = transform_service.TransformService()
serv_thread.daemon = True
serv_thread.start()
time.sleep(2)
# option2
# mocks dont seem to work when spawning a service
# pid = _spawn_transform_service()
# time.sleep()
# os.kill(pid, signal.SIGNAL_SIGTERM)
fake_kazoo_driver.heartbeat.assert_called_with()
def test_make_vif_subnets(self, m_mk_fixed_ip, m_make_vif_subnet):
subnet_id = mock.sentinel.subnet_id
ip_address = mock.sentinel.ip_address
fixed_ip = mock.sentinel.fixed_ip
subnet = mock.Mock()
subnets = mock.MagicMock()
subnets.__contains__.return_value = True
m_mk_fixed_ip.return_value = fixed_ip
m_make_vif_subnet.return_value = subnet
port = {'fixed_ips': [
{'subnet_id': subnet_id, 'ip_address': ip_address}]}
self.assertEqual([subnet], ovu._make_vif_subnets(port, subnets))
m_make_vif_subnet.assert_called_once_with(subnets, subnet_id)
m_mk_fixed_ip.assert_called_once_with(address=ip_address)
subnet.ips.objects.append.assert_called_once_with(fixed_ip)
def test_get_parent_port(self):
cls = nested_vif.NestedPodVIFDriver
m_driver = mock.Mock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
node_fixed_ip = mock.sentinel.node_fixed_ip
pod_status = mock.MagicMock()
pod_status.__getitem__.return_value = node_fixed_ip
pod = mock.MagicMock()
pod.__getitem__.return_value = pod_status
parent_port = mock.sentinel.parent_port
m_driver._get_parent_port_by_host_ip.return_value = parent_port
cls._get_parent_port(m_driver, neutron, pod)
m_driver._get_parent_port_by_host_ip.assert_called_once()
def test_get_subnet(self, m_osv_subnet, m_osv_network):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
subnet = mock.MagicMock()
network = mock.MagicMock()
subnet_id = mock.sentinel.subnet_id
network_id = mock.sentinel.network_id
neutron_subnet = {'network_id': network_id}
neutron_network = mock.sentinel.neutron_network
neutron.show_subnet.return_value = {'subnet': neutron_subnet}
neutron.show_network.return_value = {'network': neutron_network}
m_osv_subnet.return_value = subnet
m_osv_network.return_value = network
ret = default_subnet._get_subnet(subnet_id)
self.assertEqual(network, ret)
neutron.show_subnet.assert_called_once_with(subnet_id)
neutron.show_network.assert_called_once_with(network_id)
m_osv_subnet.assert_called_once_with(neutron_subnet)
m_osv_network.assert_called_once_with(neutron_network)
network.subnets.objects.append.assert_called_once_with(subnet)
def test_get_in_use_vlan_ids_set(self):
cls = nested_vlan_vif.NestedVlanPodVIFDriver
m_driver = mock.Mock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
vlan_ids = set()
trunk_id = mock.sentinel.trunk_id
vlan_ids.add('100')
port = mock.MagicMock()
port.__getitem__.return_value = '100'
trunk_obj = mock.MagicMock()
trunk_obj.__getitem__.return_value = [port]
trunk = mock.MagicMock()
trunk.__getitem__.return_value = trunk_obj
neutron.show_trunk.return_value = trunk
self.assertEqual(vlan_ids,
cls._get_in_use_vlan_ids_set(m_driver, trunk_id))
def test_request_vif(self):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)
pod = get_pod_obj()
project_id = mock.sentinel.project_id
subnets = mock.sentinel.subnets
security_groups = [mock.sentinel.security_groups]
vif = mock.sentinel.vif
m_driver._get_port_from_pool.return_value = vif
oslo_cfg.CONF.set_override('ports_pool_min',
5,
group='vif_pool')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
self.assertEqual(vif, cls.request_vif(m_driver, pod, project_id,
subnets, security_groups))
def test_request_vif_empty_pool(self, m_eventlet):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)
host_addr = mock.sentinel.host_addr
pod_status = mock.MagicMock()
pod_status.__getitem__.return_value = host_addr
pod = mock.MagicMock()
pod.__getitem__.return_value = pod_status
project_id = mock.sentinel.project_id
subnets = mock.sentinel.subnets
security_groups = [mock.sentinel.security_groups]
m_driver._get_port_from_pool.side_effect = (
exceptions.ResourceNotReady(pod))
self.assertRaises(exceptions.ResourceNotReady, cls.request_vif,
m_driver, pod, project_id, subnets, security_groups)
m_eventlet.assert_called_once()
def test__get_in_use_ports(self):
cls = vif_pool.BaseVIFPool
m_driver = mock.MagicMock(spec=cls)
kubernetes = self.useFixture(k_fix.MockK8sClient()).client
pod = get_pod_obj()
port_id = 'f2c1b73a-6a0c-4dca-b986-0d07d09e0c02'
versioned_object = jsonutils.dumps({
'versioned_object.data': {
'active': True,
'address': 'fa:16:3e:ef:e6:9f',
'id': port_id
}})
pod['metadata']['annotations'][constants.K8S_ANNOTATION_VIF] = (
versioned_object)
items = [pod]
kubernetes.get.return_value = {'items': items}
resp = cls._get_in_use_ports(m_driver)
self.assertEqual(resp, [port_id])
def test__return_ports_to_pool_no_update(self, max_pool, m_sleep):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
port_id = mock.sentinel.port_id
pool_length = 5
m_driver._recyclable_ports = {port_id: pool_key}
m_driver._available_ports_pools = {}
oslo_cfg.CONF.set_override('ports_pool_max',
max_pool,
group='vif_pool')
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
m_driver._get_ports_by_attrs.return_value = [
{'id': port_id, 'security_groups': ['security_group']}]
m_driver._get_pool_size.return_value = pool_length
self.assertRaises(SystemExit, cls._return_ports_to_pool, m_driver)
neutron.update_port.assert_not_called()
neutron.delete_port.assert_not_called()
def test__return_ports_to_pool_delete_port(self, m_sleep):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
port_id = mock.sentinel.port_id
pool_length = 10
vif = mock.sentinel.vif
m_driver._recyclable_ports = {port_id: pool_key}
m_driver._available_ports_pools = {}
m_driver._existing_vifs = {port_id: vif}
oslo_cfg.CONF.set_override('ports_pool_max',
10,
group='vif_pool')
m_driver._get_ports_by_attrs.return_value = [
{'id': port_id, 'security_groups': ['security_group_modified']}]
m_driver._get_pool_size.return_value = pool_length
self.assertRaises(SystemExit, cls._return_ports_to_pool, m_driver)
neutron.update_port.assert_not_called()
neutron.delete_port.assert_called_once_with(port_id)
def test__return_ports_to_pool_delete_exception(self, m_sleep):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
port_id = mock.sentinel.port_id
pool_length = 10
vif = mock.sentinel.vif
m_driver._recyclable_ports = {port_id: pool_key}
m_driver._available_ports_pools = {}
m_driver._existing_vifs = {port_id: vif}
oslo_cfg.CONF.set_override('ports_pool_max',
5,
group='vif_pool')
m_driver._get_ports_by_attrs.return_value = [
{'id': port_id, 'security_groups': ['security_group_modified']}]
m_driver._get_pool_size.return_value = pool_length
neutron.delete_port.side_effect = n_exc.PortNotFoundClient
self.assertRaises(SystemExit, cls._return_ports_to_pool, m_driver)
neutron.update_port.assert_not_called()
neutron.delete_port.assert_called_once_with(port_id)
def test__return_ports_to_pool_delete_key_error(self, m_sleep):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
port_id = mock.sentinel.port_id
pool_length = 10
m_driver._recyclable_ports = {port_id: pool_key}
m_driver._available_ports_pools = {}
m_driver._existing_vifs = {}
oslo_cfg.CONF.set_override('ports_pool_max',
5,
group='vif_pool')
m_driver._get_ports_by_attrs.return_value = [
{'id': port_id, 'security_groups': ['security_group_modified']}]
m_driver._get_pool_size.return_value = pool_length
self.assertRaises(SystemExit, cls._return_ports_to_pool, m_driver)
neutron.update_port.assert_not_called()
neutron.delete_port.assert_not_called()
def test__recover_precreated_ports_empty(self, m_get_subnet, m_to_osvif):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
filtered_ports = []
m_driver._get_ports_by_attrs.return_value = filtered_ports
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
cls._recover_precreated_ports(m_driver)
m_driver._get_ports_by_attrs.assert_called_once()
m_get_subnet.assert_not_called()
m_to_osvif.assert_not_called()
def test__return_ports_to_pool_no_update(self, max_pool, m_sleep):
cls = vif_pool.NestedVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
port_id = mock.sentinel.port_id
pool_length = 5
m_driver._recyclable_ports = {port_id: pool_key}
m_driver._available_ports_pools = {}
oslo_cfg.CONF.set_override('ports_pool_max',
max_pool,
group='vif_pool')
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
m_driver._get_ports_by_attrs.return_value = [
{'id': port_id, 'security_groups': ['security_group']}]
m_driver._get_pool_size.return_value = pool_length
self.assertRaises(SystemExit, cls._return_ports_to_pool, m_driver)
neutron.update_port.assert_not_called()
neutron.delete_port.assert_not_called()
def test_release_vif_parent_not_found(self):
cls = nested_macvlan_vif.NestedMacvlanPodVIFDriver
m_driver = mock.Mock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
port_id = lib_utils.get_hash()
pod = mock.sentinel.pod
vif = mock.Mock()
vif.id = port_id
container_mac = mock.sentinel.mac_address
container_ip = mock.sentinel.ip_address
container_port = self._get_fake_port(port_id, container_ip,
container_mac)
neutron.show_port.return_value = container_port
m_driver.lock = mock.MagicMock(spec=threading.Lock())
m_driver._get_parent_port.side_effect = n_exc.NeutronClientException
self.assertRaises(n_exc.NeutronClientException, cls.release_vif,
m_driver, pod, vif)
neutron.show_port.assert_called_once_with(port_id)
m_driver._get_parent_port.assert_called_once_with(neutron, pod)
m_driver._remove_from_allowed_address_pairs.assert_not_called()
neutron.delete_port.assert_not_called()
def test_has_port_changes(self):
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
m_service = mock.MagicMock()
m_handler._get_service_ports.return_value = [
{'port': 1, 'name': 'X', 'protocol': 'TCP'},
]
m_lbaas_spec = mock.MagicMock()
m_lbaas_spec.ports = [
obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1),
obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2),
]
ret = h_lbaas.LBaaSSpecHandler._has_port_changes(
m_handler, m_service, m_lbaas_spec)
self.assertTrue(ret)
def test_has_port_changes__no_changes(self):
m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
m_service = mock.MagicMock()
m_handler._get_service_ports.return_value = [
{'port': 1, 'name': 'X', 'protocol': 'TCP'},
{'port': 2, 'name': 'Y', 'protocol': 'TCP'}
]
m_lbaas_spec = mock.MagicMock()
m_lbaas_spec.ports = [
obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1),
obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2),
]
ret = h_lbaas.LBaaSSpecHandler._has_port_changes(
m_handler, m_service, m_lbaas_spec)
self.assertFalse(ret)
def test_annotate(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
annotations = {'a1': 'v1', 'a2': 'v2'}
resource_version = "123"
ret = {'metadata': {'annotations': annotations,
"resourceVersion": resource_version}}
data = jsonutils.dumps(ret, sort_keys=True)
m_resp = mock.MagicMock()
m_resp.ok = True
m_resp.json.return_value = ret
m_patch.return_value = m_resp
self.assertEqual(annotations, self.client.annotate(
path, annotations, resource_version=resource_version))
m_patch.assert_called_once_with(self.base_url + path,
data=data, headers=mock.ANY,
cert=(None, None), verify=False)
def test_watch(self, m_get):
path = '/test'
data = [{'obj': 'obj%s' % i} for i in range(3)]
lines = [jsonutils.dumps(i) for i in data]
m_resp = mock.MagicMock()
m_resp.ok = True
m_resp.iter_lines.return_value = lines
m_get.return_value = m_resp
cycles = 3
self.assertEqual(
data * cycles,
list(itertools.islice(self.client.watch(path),
len(data) * cycles)))
self.assertEqual(cycles, m_get.call_count)
self.assertEqual(cycles, m_resp.close.call_count)
m_get.assert_called_with(self.base_url + path, headers={}, stream=True,
params={'watch': 'true'}, cert=(None, None),
verify=False)
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
"""Upload copies files to non-native store correctly with no
progress"""
import shutil
import tempfile
temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp())
temp_src_dir = os.path.dirname(temp_file.name)
temp_dst_dir = tempfile.mkdtemp()
shutil_mock = MagicMock()
shutil_mock.copyfile.return_value = None
with patch('sfctl.custom_app.shutil', new=shutil_mock):
sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
shutil_mock.copyfile.assert_called_once()
temp_file.close()
shutil.rmtree(os.path.dirname(temp_file.name))
shutil.rmtree(temp_dst_dir)
def test__power_status_ironic_exception_index_error(self, mock_get_conn):
mock_connection = mock.MagicMock(spec_set=['get_relays'])
side_effect = IndexError("Gotcha!")
mock_connection.get_relays.side_effect = side_effect
mock_get_conn.return_value = mock_connection
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
info = iboot_power._parse_driver_info(node)
status = iboot_power._power_status(info)
self.assertEqual(states.ERROR, status)
mock_get_conn.assert_called_once_with(info)
mock_connection.get_relays.assert_called_once_with()
def test__power_status_retries(self, mock_get_conn):
self.config(max_retry=1, group='iboot')
mock_connection = mock.MagicMock(spec_set=['get_relays'])
side_effect = TypeError("Surprise!")
mock_connection.get_relays.side_effect = side_effect
mock_get_conn.return_value = mock_connection
node = obj_utils.create_test_node(
self.context,
driver='fake_iboot_fake',
driver_info=INFO_DICT)
info = iboot_power._parse_driver_info(node)
status = iboot_power._power_status(info)
self.assertEqual(states.ERROR, status)
mock_get_conn.assert_called_once_with(info)
self.assertEqual(2, mock_connection.get_relays.call_count)
def test_reboot_good(self, mock_switch, mock_power_status,
mock_sleep_switch):
self.config(reboot_delay=3, group='iboot')
manager = mock.MagicMock(spec_set=['switch', 'sleep'])
mock_power_status.return_value = states.POWER_ON
manager.attach_mock(mock_switch, 'switch')
manager.attach_mock(mock_sleep_switch, 'sleep')
expected = [mock.call.switch(self.info, False),
mock.call.sleep(3),
mock.call.switch(self.info, True)]
with task_manager.acquire(self.context, self.node.uuid) as task:
task.driver.power.reboot(task)
self.assertEqual(expected, manager.mock_calls)
def test_reboot_bad(self, mock_switch, mock_power_status,
mock_sleep_switch):
self.config(reboot_delay=3, group='iboot')
manager = mock.MagicMock(spec_set=['switch', 'sleep'])
mock_power_status.return_value = states.POWER_OFF
manager.attach_mock(mock_switch, 'switch')
manager.attach_mock(mock_sleep_switch, 'sleep')
expected = [mock.call.switch(self.info, False),
mock.call.sleep(3),
mock.call.switch(self.info, True)]
with task_manager.acquire(self.context, self.node.uuid) as task:
self.assertRaises(ironic_exception.PowerStateFailure,
task.driver.power.reboot, task)
self.assertEqual(expected, manager.mock_calls)
def test__execute_nm_command(self, addr_mock, raw_mock):
addr_mock.return_value = ('0x0A', '0x0B')
raw_mock.return_value = ('0x03 0x04', '')
fake_data = {'foo': 'bar'}
fake_command = mock.MagicMock()
fake_parse = mock.MagicMock()
fake_command.return_value = ('0x01', '0x02')
with task_manager.acquire(self.context, self.node.uuid,
shared=False) as task:
nm_vendor._execute_nm_command(task, fake_data, fake_command,
fake_parse)
self.assertEqual('single', task.node.driver_info['ipmi_bridging'])
self.assertEqual('0x0A',
task.node.driver_info['ipmi_target_channel'])
self.assertEqual('0x0B',
task.node.driver_info['ipmi_target_address'])
fake_command.assert_called_once_with(fake_data)
raw_mock.assert_called_once_with(task, '0x01 0x02')
fake_parse.assert_called_once_with(['0x03', '0x04'])
def test_bind_port(self):
self.vif_details = {'ovs_hybrid_plug': True}
network = mock.MagicMock(spec=api.NetworkContext)
port_context = mock.MagicMock(
spec=ctx.PortContext, current={'id': 'CURRENT_CONTEXT_ID'},
segments_to_bind=[self.valid_segment],
network=network)
# when port is bound
self.bind_port(port_context)
# port_context.
# then context binding is setup with returned vif_type and valid
# segment api ID
port_context.set_binding.assert_called_once_with(
self.valid_segment[api.ID], 'ovs', self.vif_details)
def test_fetch_from_url_or_retry_post_json(self):
# mocked requests
identifier = "1csb, 2pah"
base_url = c.http_pdbe
endpoint_url = "api/pdb/entry/summary/"
response = response_mocker(kwargs={}, base_url=base_url,
endpoint_url=endpoint_url,
content_type='application/octet-stream',
post=True, data=identifier)
self.fetch_from_url_or_retry = MagicMock(return_value=response)
url = base_url + endpoint_url + identifier
r = self.fetch_from_url_or_retry(url, json=True, post=True, data=identifier,
header={'application/octet-stream'},
retry_in=None, wait=0,
n_retries=10, stream=False).json()
self.assertEqual(r, json.loads('{"data": "some json formatted output"}'))
buy_states_test.py 文件源码
项目:bitcoin-trading-system
作者: vinicius-ronconi
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def setUp(self):
client = mock.MagicMock()
bootstrap = factory.TrailingOrdersFactory().make_fake_bootstrap(self.INITIAL_SETUP)
logging_patch = mock.patch('trading_system.systems.trailing_orders.system.logging')
self.addCleanup(logging_patch.stop)
logging_patch.start()
self.system = TrailingOrders(client, bootstrap)
self.system.get_pending_orders = mock.MagicMock(return_value=[])
self.set_next_operation = mock.MagicMock()
next_operation_patcher = mock.patch(
'trading_system.systems.trailing_orders.system.TrailingOrders.set_next_operation', self.set_next_operation
)
self.addCleanup(next_operation_patcher.stop)
next_operation_patcher.start()