def test_resource_lazy_getattr(self):
    """Reading an attribute that was not supplied up front triggers one get.

    The resource under test is built with only ``uuid`` and ``foo``; the
    manager double is primed to return a fuller resource that also carries
    ``name``.
    """
    resource_id = mock.sentinel.fake_id
    manager = mock.Mock()
    full_info = dict(uuid=resource_id, foo='bar', name='fake_name')
    manager.get.return_value = SampleResource(None, full_info)

    partial_info = dict(uuid=resource_id, foo='bar')
    resource = SampleResource(manager, partial_info)

    # Attributes passed to the constructor are readable, and the resource
    # does not report itself as attached yet.
    self.assertTrue(hasattr(resource, 'foo'))
    self.assertEqual('bar', resource.foo)
    self.assertFalse(resource.has_attached())

    # ``name`` was not in the initial info, so reading it triggers the load.
    self.assertEqual('fake_name', resource.name)
    manager.get.assert_called_once_with(resource_id)
    self.assertTrue(resource.has_attached())

    # An attribute absent from the fetched data as well must still raise.
    self.assertRaises(AttributeError, getattr, resource, 'abc')
python类Mock()的实例源码
def test_power_off_domain_not_running(self, mock_open):
    """power_off must not propagate a DOMAIN_NOT_RUNNING error.

    ``destroy()`` on the domain double raises a ``DumbLibvirtError`` built
    with ``DOMAIN_NOT_RUNNING``; the call is expected to return normally
    and the connection to be closed.
    """
    target = 'domainA'
    connection = mock.Mock()
    domain = mock.Mock()
    connection.lookupByName.return_value = domain
    not_running = DumbLibvirtError(DOMAIN_NOT_RUNNING, conn=connection)
    domain.destroy.side_effect = not_running
    mock_open.return_value = connection

    self.driver.power_off(target)

    domain.destroy.assert_called_with()
    connection.close.assert_called_with()
def test_resource_lazy_getattr(self):
    """Verify that an unknown attribute is fetched through the manager once.

    Only ``uuid`` and ``foo`` are given to the resource under test; the
    manager double returns a resource that additionally has ``name``.
    """
    sentinel_id = mock.sentinel.fake_id
    stub_manager = mock.Mock()
    loaded = SampleResource(
        None, dict(uuid=sentinel_id, foo='bar', name='fake_name'))
    stub_manager.get.return_value = loaded

    lazy = SampleResource(stub_manager, dict(uuid=sentinel_id, foo='bar'))

    # Constructor-supplied data is available and has_attached() is False.
    self.assertTrue(hasattr(lazy, 'foo'))
    self.assertEqual('bar', lazy.foo)
    self.assertFalse(lazy.has_attached())

    # Accessing ``name`` is what triggers the load.
    self.assertEqual('fake_name', lazy.name)
    stub_manager.get.assert_called_once_with(sentinel_id)
    self.assertTrue(lazy.has_attached())

    # Attributes missing from the loaded data keep raising afterwards.
    self.assertRaises(AttributeError, getattr, lazy, 'abc')
def test_when_the_config_key_does_not_exists(self):
    """configure_logging logs a debug message when the key is missing.

    ``conf.get`` is stubbed so that a lookup whose second positional
    argument is ``'logging_config_class'`` raises
    ``AirflowConfigException``; every other lookup returns a dummy string.
    """
    from airflow import logging_config

    def side_effect(*args):
        if args[1] == 'logging_config_class':
            raise AirflowConfigException
        return "bla_bla_from_test"

    # Patch conf.get through patch.object so the original method is put
    # back when the block exits; assigning to conf.get directly would leave
    # the stub in place for every test that runs afterwards.
    fake_get = mock.Mock(side_effect=side_effect)
    with patch.object(logging_config.conf, 'get', new=fake_get):
        with patch.object(logging_config.log, 'debug') as mock_debug:
            logging_config.configure_logging()

    mock_debug.assert_any_call(
        'Could not find key logging_config_class in config')
# Just default
def test_upload(self):
    """Upload a fixture path and check the POST call and parsed response.

    ``guess_type``, the builtin ``open`` and ``MultipartEncoder`` are
    patched for the duration of the inner helper.
    """
    self.client.post.return_value = mock_upload_response(
        '/files/content', httplib.CREATED, None, 'files',
        'POST_response.json')
    fixtures_dir = path.join(path.dirname(__file__), '..', 'fixtures',
                             'files')
    file_path = path.join(fixtures_dir, 'upload.jpg')
    folder_id = 'folder-id'

    # Decorators are applied bottom-up, so the helper receives the
    # MultipartEncoder double first, then open, then guess_type.
    @mock.patch('orangecloud_client.files.guess_type',
                return_value=('image/jpeg', 'binary'))
    @mock.patch('__builtin__.open', spec=open, return_value=MockFile())
    @mock.patch('orangecloud_client.files.MultipartEncoder',
                return_value=mock.Mock())
    def fire_test(encoder_cls, _open, _guess_type):
        # Calling the patched class yields the shared return_value double.
        payload = encoder_cls()
        payload.content_type = 'upload content type'

        uploaded = self.files.upload(file_path, folder_id)

        expected_headers = {'Content-Type': payload.content_type}
        self.client.post.assert_called_with(
            self.client.post.return_value.url,
            data=payload,
            headers=expected_headers)
        self.assertEqual('file-id', uploaded.fileId)
        self.assertEqual('file-name', uploaded.fileName)

    fire_test()
def test_poster_with_mentions_twitter(self, tweepy_mock, tweepy_oauth_mock, phantom_driver):
    """A fetched bulletin is tweeted with media, followed by a mention reply.

    The page served by the PhantomJS double comes from the
    ``data/crs_page.html`` fixture next to this file. After
    ``fetch_and_update()`` the tweepy double must have seen exactly one
    ``update_with_media`` call and one ``update_status`` call, the latter
    replying to the status id returned by the former.
    """
    page_path = os.path.join(os.path.dirname(__file__), 'data',
                             'crs_page.html')
    # Read the fixture inside a context manager so the handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(page_path) as page:
        mock_body = page.read()

    # Start from an empty poster registry containing only the twitter poster.
    posters_register._posters = []
    posters_register.register_poster(tweet_status, 'twitter_done')

    m = mock.Mock()
    m.update_with_media.return_value = mock.Mock(id='1')
    tweepy_mock.return_value = m
    phantom_driver.return_value = MockPhantomJS(mock_body)

    fetch_and_update()

    self.assertEqual(m.update_status.call_count, 1)
    self.assertEqual(m.update_with_media.call_count, 1)
    m.update_status.assert_called_with(
        '@4stagi @patrick91 Nuovo bollettino meteo ?',
        in_reply_to_status_id='1'
    )
def test_power_off_domain_not_running(self, mock_open):
    """A DOMAIN_NOT_RUNNING error from destroy() must not escape power_off.

    The connection double hands out a domain whose ``destroy()`` raises
    ``DumbLibvirtError(DOMAIN_NOT_RUNNING, ...)``; the test passes when
    ``power_off`` returns and the connection has been closed.
    """
    name = 'domainA'
    fake_domain = mock.Mock()
    fake_connection = mock.Mock(
        **{'lookupByName.return_value': fake_domain})
    fake_domain.destroy.side_effect = DumbLibvirtError(
        DOMAIN_NOT_RUNNING, conn=fake_connection)
    mock_open.return_value = fake_connection

    self.driver.power_off(name)

    fake_domain.destroy.assert_called_with()
    fake_connection.close.assert_called_with()
def prepare(self):
    """Install mock controllers on ``routes`` and build the Flask test app.

    Each collaborator is replaced by a fresh ``mock.Mock`` that is stored
    both on this object and on the ``routes`` module under the same name.
    """
    collaborator_names = (
        'instance_ctl',
        'volume_ctl',
        'volume_type_ctl',
        'entity_ctl',
        'app_ctl',
        'auth_adapter',
    )
    for attr in collaborator_names:
        double = mock.Mock()
        setattr(self, attr, double)
        setattr(routes, attr, double)

    self.app = flask.Flask("almanach")
    self.app.register_blueprint(routes.api)
def test_build_version_with_valid_package_name(self):
    """build.check reports name and version of the configured package."""
    settings.HEARTBEAT = {'package_name': 'foo'}
    found = Mock(project_name='foo', version='1.0.0')

    # WorkingSet.find is stubbed to return the fake distribution.
    with mock.patch.object(build.WorkingSet, 'find', return_value=found):
        result = build.check(request=None)
        assert result == {'name': 'foo', 'version': '1.0.0'}
def test_get_distribution_list(self, dist_list):
    """distribution_list.check returns name/version dicts for each entry.

    Three fake distributions named 0, 1 and 2 are supplied; the entries
    for 1 and 2 are checked.
    """
    fakes = [Mock(project_name=index, version='1.0.0') for index in range(3)]
    dist_list.return_value = fakes

    result = distribution_list.check(request=None)

    for expected_name in (1, 2):
        assert {'version': '1.0.0', 'name': expected_name} in result
def test__lshw(self, udev_mock, po, wm, output_helper, hwd):
    """hwd._lshw() turns the canned lshw stdout into the expected result.

    ``po`` is primed with a process double whose ``communicate()`` yields
    the sample stdout from ``output_helper``.
    NOTE(review): ``po`` and ``wm`` look like Popen/which patches; their
    decorators are not visible here - confirm.
    """
    wm.return_value = '/valid/path'
    udev_mock.return_value = "mocked_udevadm_out"

    fake_process = mock.Mock()
    sample = output_helper.lshw_out
    fake_process.communicate.return_value = (sample['stdout'], 'error')
    po.return_value = fake_process

    result = hwd._lshw()

    assert result == output_helper.lshw_out['expected_return']
def test__udevadm(self, po, output_helper, hwd):
    """hwd._udevadm('/dev/sda') parses the canned udevadm stdout."""
    # additional test for virtualized env?
    sample = output_helper._udevadm_out
    fake_process = mock.Mock(
        **{'communicate.return_value': (sample['stdout'], 'error')})
    po.return_value = fake_process

    result = hwd._udevadm('/dev/sda')

    assert result == output_helper._udevadm_out['expected_return']
def test__updates_needed_not(self, po, apt):
    """
    No updates pending: the process double reports "0;0" as the second
    element of communicate() and _updates_needed() must return False.
    """
    fake_process = mock.Mock()
    fake_process.communicate.return_value = ("", "0;0")
    po.return_value = fake_process

    pending = apt._updates_needed()

    assert po.called is True
    assert pending is False
def test__updates_needed(self, po, apt):
    """
    Updates pending: the process double reports "1;1" as the second
    element of communicate() and _updates_needed() must return True.
    """
    fake_process = mock.Mock()
    fake_process.communicate.return_value = ("", "1;1")
    po.return_value = fake_process

    pending = apt._updates_needed()

    assert po.called is True
    assert pending is True
def core_mock(self):
    """Return a fresh ``mock.Mock`` to stand in for the core object."""
    stand_in = mock.Mock()
    return stand_in
def setUp(self):
    """Build an SnmpClient wired to a mocked ``oneliner_cmdgen`` module.

    The command generator, community data and transport target factories
    all return predictable doubles/sentinels. Every ``ErrorIndication``
    instance found in ``errind`` is also collected as
    ``(class name, instance)`` pairs.
    """
    super(TestSnmpClient, self).setUp()

    self.command_generator_mock = mock.Mock()
    self.pysnmp_mock = mock.Mock()

    cmdgen = mock.Mock()
    cmdgen.CommandGenerator.return_value = self.command_generator_mock
    cmdgen.CommunityData.return_value = sentinel.community_data
    cmdgen.UdpTransportTarget.return_value = sentinel.udp_transport_target
    self.oneliner_cmdgen = cmdgen

    self.snmp_client = snmp_client.SnmpClient(
        oneliner_cmdgen=self.oneliner_cmdgen,
        host=sentinel.hostname,
        port=sentinel.port,
        community=sentinel.community,
        timeout=sentinel.timeout,
        retries=sentinel.retries,
    )

    # Look each name up in the module's own __dict__ (names that dir()
    # reports but the dict lacks yield None and are filtered out).
    module_values = (errind.__dict__.get(name) for name in dir(errind))
    self.all_error_indications = [
        (value.__class__.__name__, value)
        for value in module_values
        if isinstance(value, ErrorIndication)]
def test_power_on(self, mock_open):
    """power_on opens a connection, creates the domain and closes again.

    NOTE(review): ``'hello'`` is presumably the connection URI the driver
    fixture was built with - confirm against the class setUp.
    """
    domain = mock.Mock()
    connection = mock.Mock(**{'lookupByName.return_value': domain})
    mock_open.return_value = connection

    self.driver.power_on('domainA')

    mock_open.assert_called_with('hello')
    connection.lookupByName.assert_called_with('domainA')
    domain.create.assert_called_with()
    connection.close.assert_called_with()
def test_power_off(self, mock_open):
    """power_off opens a connection, destroys the domain and closes again.

    NOTE(review): ``'hello'`` is presumably the connection URI the driver
    fixture was built with - confirm against the class setUp.
    """
    domain = mock.Mock()
    connection = mock.Mock(**{'lookupByName.return_value': domain})
    mock_open.return_value = connection

    self.driver.power_off('domainA')

    mock_open.assert_called_with('hello')
    connection.lookupByName.assert_called_with('domainA')
    domain.destroy.assert_called_with()
    connection.close.assert_called_with()
def test_get_power_state_on(self, mock_open):
    """get_power_state returns POWER_ON when isActive() yields 1."""
    domain = mock.Mock(**{'isActive.return_value': 1})
    connection = mock.Mock(**{'lookupByName.return_value': domain})
    mock_open.return_value = connection

    expected = drivers.POWER_ON
    self.assertEqual(expected, self.driver.get_power_state('domainA'))

    mock_open.assert_called_with('hello')
    connection.lookupByName.assert_called_with('domainA')
    domain.isActive.assert_called_with()
    connection.close.assert_called_with()
def test_get_power_state_off(self, mock_open):
    """get_power_state returns POWER_OFF when isActive() yields 0."""
    domain = mock.Mock(**{'isActive.return_value': 0})
    connection = mock.Mock(**{'lookupByName.return_value': domain})
    mock_open.return_value = connection

    expected = drivers.POWER_OFF
    self.assertEqual(expected, self.driver.get_power_state('domainA'))

    mock_open.assert_called_with('hello')
    connection.lookupByName.assert_called_with('domainA')
    domain.isActive.assert_called_with()
    connection.close.assert_called_with()
def test_get_power_state_domain_not_found(self, mock_open):
    """A failing lookupByName surfaces as DeviceNotFound; connection closes."""
    connection = mock.Mock()
    lookup_failure = libvirt.libvirtError(
        'virDomainLookupByName() failed', conn=connection)
    connection.lookupByName.side_effect = lookup_failure
    mock_open.return_value = connection

    self.assertRaises(
        drivers.DeviceNotFound, self.driver.get_power_state, 'domainA')

    connection.close.assert_called_with()
def test_power_on_domain_not_found(self, mock_open):
    """power_on raises DeviceNotFound when the lookup fails, then closes."""
    connection = mock.Mock()
    lookup_failure = libvirt.libvirtError(
        'virDomainLookupByName() failed', conn=connection)
    connection.lookupByName.side_effect = lookup_failure
    mock_open.return_value = connection

    self.assertRaises(
        drivers.DeviceNotFound, self.driver.power_on, 'domainA')

    connection.close.assert_called_with()
def test_power_off_domain_not_found(self, mock_open):
    """power_off raises DeviceNotFound when the lookup fails, then closes."""
    connection = mock.Mock()
    lookup_failure = libvirt.libvirtError(
        'virDomainLookupByName() failed', conn=connection)
    connection.lookupByName.side_effect = lookup_failure
    mock_open.return_value = connection

    self.assertRaises(
        drivers.DeviceNotFound, self.driver.power_off, 'domainA')

    connection.close.assert_called_with()
def test_power_on_generic_error_goes_through(self, mock_open):
    """An arbitrary exception from create() propagates; connection closes."""
    domain = mock.Mock(**{'create.side_effect': SpecificException()})
    connection = mock.Mock(**{'lookupByName.return_value': domain})
    mock_open.return_value = connection

    self.assertRaises(SpecificException, self.driver.power_on, 'domainA')

    connection.close.assert_called_with()
def test_power_on_stays_connected(self, mock_open):
    """power_on creates the domain but leaves the connection open.

    NOTE(review): this presumably runs against a driver configured to
    keep its connection alive; that configuration is not visible here.
    """
    domain = mock.Mock()
    connection = mock.Mock(**{'lookupByName.return_value': domain})
    mock_open.return_value = connection

    self.driver.power_on('domainA')

    mock_open.assert_called_with('hello')
    connection.lookupByName.assert_called_with('domainA')
    domain.create.assert_called_with()
    # Unlike the plain power_on test, close() must never have been called.
    self.assertFalse(connection.close.called)
def setUp(self):
    """Create the PDU under test and start an SNMP harness around it.

    The harness is bound to 127.0.0.1 on a port drawn at random from
    20000-30000 and uses the community string ``'test1212'``.
    """
    super(PDUTestCase, self).setUp()

    self.community = 'test1212'
    self.core_mock = mock.Mock()
    self.pdu = self.pdu_class(name='test_pdu', core=self.core_mock)

    listen_address = '127.0.0.1'
    listen_port = random.randint(20000, 30000)
    self.pdu_test_harness = pysnmp_handler.SNMPPDUHarness(
        self.pdu, listen_address, listen_port, self.community)
    self.pdu_test_harness.start()
def mocked_resource():
    """Return a fresh ``mock.Mock`` standing in for a resource object."""
    resource = mock.Mock()
    return resource
def test_backup_mysql_status_updated(mock_get_dst,
                                     mock_backup_stream,
                                     mock_prep_status,
                                     mock_mysql_source,
                                     config_content):
    """backup_mysql must query the destination's status exactly once.

    The configuration is rendered from ``config_content`` with an ``ssh``
    destination on port ``1234`` and parsed from an in-memory buffer.
    """
    s_config = config_content.format(destination="ssh", port='1234')
    buf = StringIO.StringIO(s_config)
    config = ConfigParser.ConfigParser()
    config.readfp(buf)

    mock_dst = mock.Mock()
    mock_get_dst.return_value = mock_dst

    backup_mysql("hourly", config)

    # Check call_count explicitly: on mock releases that predate
    # Mock.assert_called_once(), that name is just an auto-created child
    # mock and calling it passes no matter how often status() was called.
    assert mock_dst.status.call_count == 1
test_rolling_upgrades.py 文件源码
项目:asg-rolling-upgrade
作者: crunch-accounting
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def mock_paginator():
    """Return a Mock restricted to the ``botocore.client.Paginator`` API."""
    paginator = mock.Mock(spec=botocore.client.Paginator)
    return paginator
test_rolling_upgrades.py 文件源码
项目:asg-rolling-upgrade
作者: crunch-accounting
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def mock_as_client(mock_paginator):
    """Return a client double whose ``get_paginator()`` yields the argument.

    Any call to ``get_paginator`` on the returned mock, whatever its
    arguments, returns ``mock_paginator``.
    """
    return mock.Mock(**{'get_paginator.return_value': mock_paginator})