def setUp(self):
    """Create the instance-templates pipeline under test with mocked collaborators."""
    self.maxDiff = None
    self.resource_name = 'instance_templates'
    self.cycle_timestamp = '20001225T120000Z'
    self.configs = fake_configs.FAKE_CONFIGS

    # Autospec'd stand-ins for the API client and DAO handed to the pipeline.
    self.mock_compute_client = mock.create_autospec(compute.ComputeClient)
    self.mock_dao = mock.create_autospec(
        instance_template_dao.InstanceTemplateDao)

    pipeline_cls = (
        load_instance_templates_pipeline.LoadInstanceTemplatesPipeline)
    self.pipeline = pipeline_cls(
        self.cycle_timestamp,
        self.configs,
        self.mock_compute_client,
        self.mock_dao)

    # Keep only the loadable projects that appear in the fake
    # instance-templates map.
    templates_by_project = (
        fake_instance_templates.FAKE_PROJECT_INSTANCE_TEMPLATES_MAP)
    self.project_ids = templates_by_project.keys()
    self.projects = [
        project_dao.ProjectDao.map_row_to_object(row)
        for row in fake_projects.EXPECTED_LOADABLE_PROJECTS
        if row['project_id'] in self.project_ids]
python类create_autospec()的实例源码
load_instance_templates_pipeline_test.py 文件源码
项目:forseti-security
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
load_appengine_pipeline_test.py 文件源码
项目:forseti-security
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def setUp(self):
    """Create the App Engine pipeline under test with mocked collaborators."""
    self.cycle_timestamp = '20001225T120000Z'
    self.configs = fake_configs.FAKE_CONFIGS

    # Autospec'd stand-ins for the API client and DAO handed to the pipeline.
    self.mock_appengine = mock.create_autospec(appengine.AppEngineClient)
    self.mock_dao = mock.create_autospec(appengine_dao.AppEngineDao)

    pipeline_cls = load_appengine_pipeline.LoadAppenginePipeline
    self.pipeline = pipeline_cls(
        self.cycle_timestamp,
        self.configs,
        self.mock_appengine,
        self.mock_dao)

    # Keep only the loadable projects that appear in the fake
    # applications map.
    applications_by_project = (
        fake_appengine_applications.FAKE_PROJECT_APPLICATIONS_MAP)
    self.project_ids = applications_by_project.keys()
    self.projects = [
        project_dao.ProjectDao.map_row_to_object(row)
        for row in fake_projects.EXPECTED_LOADABLE_PROJECTS
        if row['project_id'] in self.project_ids]
load_instance_group_managers_pipeline_test.py 文件源码
项目:forseti-security
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def setUp(self):
    """Create the instance-group-managers pipeline under test with mocks."""
    self.maxDiff = None
    self.resource_name = 'instance_group_managers'
    self.cycle_timestamp = '20001225T120000Z'
    self.configs = fake_configs.FAKE_CONFIGS

    # Autospec'd stand-ins for the API client and DAO handed to the pipeline.
    self.mock_compute_client = mock.create_autospec(compute.ComputeClient)
    self.mock_dao = mock.create_autospec(
        instance_group_manager_dao.InstanceGroupManagerDao)

    pipeline_module = load_instance_group_managers_pipeline
    self.pipeline = pipeline_module.LoadInstanceGroupManagersPipeline(
        self.cycle_timestamp,
        self.configs,
        self.mock_compute_client,
        self.mock_dao)

    # Keep only the loadable projects that appear in the fake
    # instance-group-managers map.
    managers_by_project = (
        fake_instance_group_managers.FAKE_PROJECT_INSTANCE_GROUP_MANAGERS_MAP)
    self.project_ids = managers_by_project.keys()
    self.projects = [
        project_dao.ProjectDao.map_row_to_object(row)
        for row in fake_projects.EXPECTED_LOADABLE_PROJECTS
        if row['project_id'] in self.project_ids]
load_forwarding_rules_pipeline_test.py 文件源码
项目:forseti-security
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def setUp(self):
    """Create the forwarding-rules pipeline under test with mocked collaborators."""
    self.resource_name = 'forwarding_rules'
    self.cycle_timestamp = '20001225T120000Z'
    self.configs = fake_configs.FAKE_CONFIGS

    # Autospec'd stand-ins for the API client and DAO handed to the pipeline.
    self.mock_compute_client = mock.create_autospec(compute.ComputeClient)
    self.mock_dao = mock.create_autospec(frdao.ForwardingRulesDao)

    pipeline_cls = load_forwarding_rules_pipeline.LoadForwardingRulesPipeline
    self.pipeline = pipeline_cls(
        self.cycle_timestamp,
        self.configs,
        self.mock_compute_client,
        self.mock_dao)

    # Keep only the loadable projects that appear in the fake
    # forwarding-rules map.
    rules_by_project = fake_forwarding_rules.FAKE_PROJECT_FWD_RULES_MAP
    self.project_ids = rules_by_project.keys()
    self.projects = [
        project_dao.ProjectDao.map_row_to_object(row)
        for row in fake_projects.EXPECTED_LOADABLE_PROJECTS
        if row['project_id'] in self.project_ids]
load_service_accounts_pipeline_test.py 文件源码
项目:forseti-security
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def setUp(self):
    """Create the service-accounts pipeline under test with mocked collaborators."""
    self.cycle_timestamp = '20001225T120000Z'
    self.configs = fake_configs.FAKE_CONFIGS

    # Autospec'd stand-ins for the IAM client and DAO handed to the pipeline.
    self.mock_iam = mock.create_autospec(iam.IAMClient)
    self.mock_dao = mock.create_autospec(service_account_dao.ServiceAccountDao)

    pipeline_cls = load_service_accounts_pipeline.LoadServiceAccountsPipeline
    self.pipeline = pipeline_cls(
        self.cycle_timestamp,
        self.configs,
        self.mock_iam,
        self.mock_dao)

    # Keep only the loadable projects that appear in the fake
    # service-accounts map.
    accounts_by_project = (
        fake_service_accounts.FAKE_PROJECT_SERVICE_ACCOUNTS_MAP)
    self.project_ids = accounts_by_project.keys()
    self.projects = [
        project_dao.ProjectDao.map_row_to_object(row)
        for row in fake_projects.EXPECTED_LOADABLE_PROJECTS
        if row['project_id'] in self.project_ids]
load_instance_groups_pipeline_test.py 文件源码
项目:forseti-security
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def setUp(self):
    """Create the instance-groups pipeline under test with mocked collaborators."""
    self.maxDiff = None
    self.resource_name = 'instance_groups'
    self.cycle_timestamp = '20001225T120000Z'
    self.configs = fake_configs.FAKE_CONFIGS

    # Autospec'd stand-ins for the API client and DAO handed to the pipeline.
    self.mock_compute_client = mock.create_autospec(compute.ComputeClient)
    self.mock_dao = mock.create_autospec(instance_group_dao.InstanceGroupDao)

    pipeline_cls = load_instance_groups_pipeline.LoadInstanceGroupsPipeline
    self.pipeline = pipeline_cls(
        self.cycle_timestamp,
        self.configs,
        self.mock_compute_client,
        self.mock_dao)

    # Keep only the loadable projects that appear in the fake
    # instance-groups map.
    groups_by_project = fake_instance_groups.FAKE_PROJECT_INSTANCE_GROUPS_MAP
    self.project_ids = groups_by_project.keys()
    self.projects = [
        project_dao.ProjectDao.map_row_to_object(row)
        for row in fake_projects.EXPECTED_LOADABLE_PROJECTS
        if row['project_id'] in self.project_ids]
def setUp(self):
    """Patch ``getuser`` and ``subprocess`` in the executors module and build a job.

    NOTE(review): both patchers are started here but not stopped in this
    block; presumably a matching tearDown stops them -- confirm.
    """
    # getpass.getuser() inside the executors module reports 'mockuser'.
    self.getuser_patch = mock.patch(
        'slivka.scheduler.executors.getpass.getuser',
        return_value='mockuser')
    self.getuser_patch.start()

    # Replace the whole subprocess module seen by the executors module.
    self.subprocess_patch = mock.patch(
        'slivka.scheduler.executors.subprocess', autospec=True)
    self.mock_subprocess = self.subprocess_patch.start()

    # Whatever Popen(...) returns answers communicate() with the canned
    # qstat output and an empty second element.
    popen_instance = self.mock_subprocess.Popen.return_value
    popen_instance.communicate.return_value = (self.qstat_output, '')

    executor = mock.create_autospec(Executor)
    executor.result_paths = []
    self.job = GridEngineJob('', '', executor)
test_service_account.py 文件源码
项目:google-auth-library-python
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_before_request_refreshes(self, jwt_grant):
    """before_request on not-yet-valid credentials should trigger a refresh."""
    credentials = self.make_credentials()

    # The mocked grant hands back a token that expires 500 seconds from now.
    access_token = 'token'
    expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
    jwt_grant.return_value = (access_token, expiry, None)

    request = mock.create_autospec(transport.Request, instance=True)

    # Freshly made credentials carry no usable token yet.
    assert not credentials.valid

    # Preparing a request is what should kick off the refresh.
    url = 'http://example.com?a=1#3'
    headers = {}
    credentials.before_request(request, 'GET', url, headers)

    # The grant endpoint was hit ...
    assert jwt_grant.called

    # ... and the credentials are usable afterwards.
    assert credentials.valid
test_grpc.py 文件源码
项目:google-auth-library-python
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_call_no_refresh(self):
    """Calling the plugin should hand the bearer token to the callback once."""
    credentials = CredentialsStub()
    request = mock.create_autospec(transport.Request)
    auth_plugin = google.auth.transport.grpc.AuthMetadataPlugin(
        credentials, request)

    metadata_context = mock.create_autospec(
        grpc.AuthMetadataContext, instance=True)
    metadata_context.method_name = mock.sentinel.method_name
    metadata_context.service_url = mock.sentinel.service_url

    done_callback = mock.create_autospec(grpc.AuthMetadataPluginCallback)

    auth_plugin(metadata_context, done_callback)

    # Build the expectation after the call so it reflects the token the
    # credentials hold at that point.
    expected_metadata = [
        (u'authorization', u'Bearer {}'.format(credentials.token))]
    done_callback.assert_called_once_with(expected_metadata, None)
test_grpc.py 文件源码
项目:google-auth-library-python
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_call_refresh(self):
    """With an expiry at datetime.min + CLOCK_SKEW the token should become 'token1'."""
    credentials = CredentialsStub()
    credentials.expiry = datetime.datetime.min + _helpers.CLOCK_SKEW
    request = mock.create_autospec(transport.Request)
    auth_plugin = google.auth.transport.grpc.AuthMetadataPlugin(
        credentials, request)

    metadata_context = mock.create_autospec(
        grpc.AuthMetadataContext, instance=True)
    metadata_context.method_name = mock.sentinel.method_name
    metadata_context.service_url = mock.sentinel.service_url

    done_callback = mock.create_autospec(grpc.AuthMetadataPluginCallback)

    auth_plugin(metadata_context, done_callback)

    assert credentials.token == 'token1'

    # The callback must have received the token held after the call.
    expected_metadata = [
        (u'authorization', u'Bearer {}'.format(credentials.token))]
    done_callback.assert_called_once_with(expected_metadata, None)
def test_remind(self):
    """
    remind should send the reminder mail and mark the job as done.
    """
    job = self.create_job(state='queued')
    with mock_instance('sndlatr.gmail.Mailman') as mailman:
        # The reply built by the (mocked) mailman is the mail that must be
        # sent out.
        mail = mock.create_autospec(spec=gae_mail.EmailMessage)
        mailman.build_reply.return_value = mail
        job.remind('token')
        mailman.send_mail.assert_called_with(mail)
        mailman.build_reply.assert_called_with(job.thread_id_int,
                                               mock.ANY)
        mailman.quit.assert_called_with()
    # Re-read the job through its key before checking the resulting state.
    job = job.key.get()
    # assertEqual replaces the deprecated alias assertEquals (removed in
    # Python 3.12) and matches the other tests in this file.
    self.assertEqual(job.state, 'done')
def test_find_reply(self):
    """find_reply should return the first mail whose id is not yet known."""
    job = self.create_job(known_message_ids=['abc1'])
    mailman = mock.create_autospec(gmail.Mailman)

    # Thread of three mails; only the first id is known to the job.
    thread = [create_thread_mail(message_id=message_id)
              for message_id in ('abc1', 'abc2', 'abc3')]
    mailman.get_thread.return_value = thread

    found = job.find_reply(mailman)

    # The second mail is the first unknown one.
    self.assertEqual(found, thread[1])
def test_find_reply_none(self):
    """find_reply should return None when there is no unknown reply."""
    job = self.create_job(known_message_ids=['abc1', 'abc2'])
    mailman = mock.create_autospec(gmail.Mailman)

    # Every mail in the thread has an id the job already knows.
    thread = [create_thread_mail(message_id=message_id)
              for message_id in ('abc1', 'abc2')]
    mailman.get_thread.return_value = thread

    found = job.find_reply(mailman)
    self.assertIsNone(found)
def test_BrowserReturnPositioner():
    """show() should open the browser and hide() should close it."""
    browser = mock.create_autospec(EpiphanyBrowser)
    template = jinja2.Template('genre: {{ genre }}')
    scratch_dir = tempfile.mkdtemp()
    try:
        positioner = BrowserReturnPositioner(browser, template, scratch_dir)

        positioner.show('infraA[?3]')
        assert browser.open.called

        positioner.hide()
        assert browser.close.called
    finally:
        # Remove the scratch directory even when an assertion above fails.
        shutil.rmtree(scratch_dir)
def test_KintoneLogger():
    """log_nfc_connected should pass the system id and a JSON message to add_log."""
    fake_kintone = mock.create_autospec(Kintone)
    kintone_logger = KintoneLogger('system1', fake_kintone)

    kintone_logger.log_nfc_connected('0123', 'user-hoge')

    add_log = fake_kintone.add_log
    assert add_log.called

    # call_args[0] holds the positional arguments of the last call.
    positional_args = add_log.call_args[0]
    system_id, json_msg = positional_args
    payload = json.loads(json_msg)

    assert system_id == 'system1'
    assert payload['employee_id'] == '0123'
    assert payload['user_code'] == 'user-hoge'
def setUp(self):
    """Build an OktaDirectoryConnector whose real ``__init__`` is bypassed."""
    # Remember the real constructor before swapping in a no-op mock.
    # NOTE(review): presumably restored in tearDown -- not visible here.
    self.orig_directory_init = OktaDirectoryConnector.__init__
    OktaDirectoryConnector.__init__ = mock.Mock(return_value=None)

    # Since __init__ is mocked out, the attributes are set by hand.
    connector = OktaDirectoryConnector({})
    connector.options = {
        'all_users_filter': None,
        'group_filter_format': '{group}',
    }
    connector.logger = mock.create_autospec(logging.Logger)
    connector.groups_client = okta.UserGroupsClient('example.com', 'xyz')

    self.directory = connector
def setUp(self):
    """Expose a MockResponse class and a connector whose ``__init__`` is bypassed."""

    class MockResponse:
        """Minimal response stand-in: a status code, JSON text and no links."""

        def __init__(self, status_code, data):
            self.links = {}
            self.status_code = status_code
            # The payload is exposed as its JSON serialisation.
            self.text = json.dumps(data)

    self.mock_response = MockResponse

    # Remember the real constructor before swapping in a no-op mock.
    # NOTE(review): presumably restored in tearDown -- not visible here.
    self.orig_directory_init = OktaDirectoryConnector.__init__
    OktaDirectoryConnector.__init__ = mock.Mock(return_value=None)

    # Since __init__ is mocked out, the attributes are set by hand.
    connector = OktaDirectoryConnector({})
    connector.logger = mock.create_autospec(logging.Logger)
    connector.groups_client = okta.UserGroupsClient('example.com', 'xyz')
    connector.user_identity_type = 'enterpriseID'

    self.directory = connector
def recruiter(self, active_config):
    """Return an MTurkRecruiter whose ``mturkservice`` is an autospec'd mock.

    ``active_config`` is extended with ``mode`` set to ``sandbox`` before
    the recruiter is constructed.
    """
    from dallinger.mturk import MTurkService
    from dallinger.recruiters import MTurkRecruiter

    # Swap out os and get_base_url as seen from dallinger.recruiters.
    recruiters_patch = mock.patch.multiple(
        'dallinger.recruiters',
        os=mock.DEFAULT,
        get_base_url=mock.DEFAULT)

    with recruiters_patch as mocks:
        mocks['get_base_url'].return_value = 'http://fake-domain'
        mocks['os'].getenv.return_value = 'fake-host-domain'

        service_factory = mock.create_autospec(MTurkService)
        active_config.extend({'mode': u'sandbox'})

        mturk_recruiter = MTurkRecruiter()
        service = service_factory('fake key', 'fake secret')
        mturk_recruiter.mturkservice = service

        # Canned answers for the two service calls configured here.
        service.check_credentials.return_value = True
        service.create_hit.return_value = {'type_id': 'fake type id'}
        return mturk_recruiter