def test_get_cached_content_ids__calls(self, mock_get, mock_queryset):
mock_redis = Mock(zrevrange=Mock(return_value=[]))
mock_get.return_value = mock_redis
self.stream.stream_type = StreamType.PUBLIC
self.stream.get_cached_content_ids()
# Skips zrevrank if not last_id
self.assertFalse(mock_redis.zrevrank.called)
# Calls zrevrange with correct parameters
mock_redis.zrevrange.assert_called_once_with(self.stream.key, 0, self.stream.paginate_by)
mock_redis.reset_mock()
# Calls zrevrank with last_id
self.stream.last_id = self.content2.id
mock_redis.zrevrank.return_value = 3
self.stream.get_cached_content_ids()
mock_redis.zrevrank.assert_called_once_with(self.stream.key, self.content2.id)
mock_redis.zrevrange.assert_called_once_with(self.stream.key, 4, 4 + self.stream.paginate_by)
python类Mock()的实例源码
def test_get_cached_range(self, mock_get, mock_queryset):
self.stream.stream_type = StreamType.PUBLIC
mock_zrevrange = Mock(return_value=[str(self.content2.id), str(self.content1.id)])
mock_hmget = Mock(return_value=[str(self.content2.id), str(self.content1.id)])
mock_redis = Mock(zrevrange=mock_zrevrange, hmget=mock_hmget)
mock_get.return_value = mock_redis
ids, throughs = self.stream.get_cached_range(0)
self.assertEqual(ids, [self.content2.id, self.content1.id])
self.assertEqual(throughs, {self.content2.id: self.content2.id, self.content1.id: self.content1.id})
mock_zrevrange.assert_called_once_with(self.stream.key, 0, 0 + self.stream.paginate_by)
mock_hmget.assert_called_once_with(BaseStream.get_throughs_key(self.stream.key), keys=[
self.content2.id, self.content1.id,
])
# Non-zero index
mock_zrevrange.reset_mock()
self.stream.get_cached_range(5)
mock_zrevrange.assert_called_once_with(self.stream.key, 5, 5 + self.stream.paginate_by)
def tests_event_registration(self):
"""Tests that events register correctly."""
# Get the event controller
events = self.abode.events
self.assertIsNotNone(events)
# Create mock callback
callback = Mock()
# Test that a valid event registers
self.assertTrue(
events.add_event_callback(TIMELINE.ALARM_GROUP, callback))
# Test that no event group returns false
self.assertFalse(events.add_event_callback(None, callback))
# Test that an invalid event throws exception
with self.assertRaises(abodepy.AbodeException):
events.add_event_callback("lol", callback)
def tests_timeline_registration(self):
"""Tests that timeline events register correctly."""
# Get the event controller
events = self.abode.events
self.assertIsNotNone(events)
# Create mock callback
callback = Mock()
# Test that a valid timeline event registers
self.assertTrue(
events.add_timeline_callback(
TIMELINE.CAPTURE_IMAGE, callback))
# Test that no timeline event returns false
self.assertFalse(events.add_timeline_callback(None, callback))
# Test that an invalid timeline event string throws exception
with self.assertRaises(abodepy.AbodeException):
events.add_timeline_callback("lol", callback)
# Test that an invalid timeline event dict throws exception
with self.assertRaises(abodepy.AbodeException):
events.add_timeline_callback({"lol": "lol"}, callback)
def tests_multi_events_callback(self):
"""Tests that multiple event updates callback correctly."""
# Get the event controller
events = self.abode.events
self.assertIsNotNone(events)
# Create mock callback
callback = Mock()
# Register our events
self.assertTrue(
events.add_event_callback(
[TIMELINE.ALARM_GROUP, TIMELINE.CAPTURE_GROUP],
callback))
# Call our events callback method and trigger a capture group event
# pylint: disable=protected-access
event_json = json.loads(IRCAMERA.timeline_event())
events._on_timeline_update(event_json)
# Ensure our callback was called
callback.assert_called_with(event_json)
def tests_multi_timeline_callback(self):
"""Tests that multiple timeline updates callback correctly."""
# Get the event controller
events = self.abode.events
self.assertIsNotNone(events)
# Create mock callback
callback = Mock()
# Register our events
self.assertTrue(
events.add_timeline_callback(
[TIMELINE.CAPTURE_IMAGE, TIMELINE.OPENED], callback))
# Call our events callback method and trigger a capture group event
# pylint: disable=protected-access
event_json = json.loads(IRCAMERA.timeline_event())
events._on_timeline_update(event_json)
# Ensure our callback was called
callback.assert_called_with(event_json)
def tests_automations_callback(self):
"""Tests that automation updates callback correctly."""
# Get the event controller
events = self.abode.events
self.assertIsNotNone(events)
# Create mock callbacks
automation_callback = Mock()
# Register our events
self.assertTrue(
events.add_event_callback(
TIMELINE.AUTOMATION_EDIT_GROUP, automation_callback))
# Call our events callback method and trigger a capture group event
# pylint: disable=protected-access
events._on_automation_update('{}')
# Our capture callback should get one, but our alarm should not
automation_callback.assert_called_with('{}')
def test_no_modifications(self):
self.io._write_attr_annotations = mock.Mock()
self.io.write_all_blocks(self.neo_blocks)
self.io._write_attr_annotations.assert_not_called()
self.compare_blocks(self.neo_blocks, self.io.nix_file.blocks)
# clearing hashes and checking again
for k in self.io._object_hashes.keys():
self.io._object_hashes[k] = None
self.io.write_all_blocks(self.neo_blocks)
self.io._write_attr_annotations.assert_not_called()
# changing hashes to force rewrite
for k in self.io._object_hashes.keys():
self.io._object_hashes[k] = "_"
self.io.write_all_blocks(self.neo_blocks)
callcount = self.io._write_attr_annotations.call_count
self.assertEqual(callcount, len(self.io._object_hashes))
self.compare_blocks(self.neo_blocks, self.io.nix_file.blocks)
def test_subscription_recording_processor_for_request(self):
replaced_subscription_id = str(uuid.uuid4())
rp = SubscriptionRecordingProcessor(replaced_subscription_id)
uri_templates = ['https://management.azure.com/subscriptions/{}/providers/Microsoft.ContainerRegistry/'
'checkNameAvailability?api-version=2017-03-01',
'https://graph.windows.net/{}/applications?api-version=1.6']
for template in uri_templates:
mock_sub_id = str(uuid.uuid4())
mock_request = mock.Mock()
mock_request.uri = template.format(mock_sub_id)
mock_request.body = self._mock_subscription_request_body(mock_sub_id)
rp.process_request(mock_request)
self.assertEqual(mock_request.uri, template.format(replaced_subscription_id))
self.assertEqual(mock_request.body,
self._mock_subscription_request_body(replaced_subscription_id))
def test_getattr_with_under(self, *args, **kwrags):
connection = Mock(spec=core.SanicMongoAgnosticClient)
delegate = MagicMock()
delegate.name = 'blabla'
connection.delegate = delegate
name = 'blabla'
db = create_class_with_framework(core.SanicMongoAgnosticDatabase,
asyncio_framework,
self.__module__)(connection, name)
coll = create_class_with_framework(core.SanicMongoAgnosticCollection,
asyncio_framework,
self.__module__)
coll = coll(db, name)
self.assertEqual(coll._get_write_mode, coll.delegate._get_write_mode)
def test_getitem(self, *args, **kwargs):
connection = Mock(spec=core.SanicMongoAgnosticClient)
delegate = MagicMock()
delegate.name = 'blabla'
connection.delegate = delegate
name = 'blabla'
db = create_class_with_framework(core.SanicMongoAgnosticDatabase,
asyncio_framework,
self.__module__)(connection, name)
coll = create_class_with_framework(core.SanicMongoAgnosticCollection,
asyncio_framework,
self.__module__)
coll = coll(db, name)
other_coll = coll['other']
self.assertTrue(isinstance(other_coll, type(coll)))
def test_find(self, *args, **kwargs):
connection = Mock(spec=core.SanicMongoAgnosticClient)
delegate = MagicMock()
delegate.name = 'blabla'
connection.delegate = delegate
name = 'blabla'
db = create_class_with_framework(core.SanicMongoAgnosticDatabase,
asyncio_framework,
self.__module__)(connection, name)
coll = create_class_with_framework(core.SanicMongoAgnosticCollection,
asyncio_framework,
self.__module__)(db, name)
cursor = create_class_with_framework(core.SanicMongoAgnosticCursor,
asyncio_framework,
self.__module__)
self.assertIsInstance(coll.find(), cursor)
def test_get_item(self, *args, **kwargs):
connection = Mock(spec=core.SanicMongoAgnosticClient)
delegate = MagicMock()
connection.delegate = delegate
name = 'blabla'
db = create_class_with_framework(core.SanicMongoAgnosticDatabase,
asyncio_framework,
self.__module__)(connection, name)
comp_coll = create_class_with_framework(
core.SanicMongoAgnosticCollection,
asyncio_framework,
self.__module__)
coll = db['bla']
self.assertTrue(isinstance(coll, comp_coll))
def test_asynchornize(self):
test_mock = Mock()
class TestClass:
@classmethod
def _get_db(cls):
db = Mock()
db._framework = asyncio_framework
return db
@metaprogramming.asynchronize
def sync(self):
test_mock()
testobj = TestClass()
self.assertTrue(isinstance(testobj.sync(), Future))
yield from testobj.sync()
self.assertTrue(test_mock.called)
def test_asynchornize_with_exception(self):
class TestClass:
@classmethod
def _get_db(cls):
db = Mock()
db._framework = asyncio_framework
return db
@metaprogramming.asynchronize
def sync(self):
raise Exception
testobj = TestClass()
with self.assertRaises(Exception):
yield from testobj.sync()
def test_create_attribute_with_attribute_error(self):
test_mock = Mock()
class BaseTestClass:
@classmethod
def _get_db(cls):
db = Mock()
db._framework = asyncio_framework
return db
def some_method(self):
test_mock()
class TestClass(BaseTestClass):
some_method = metaprogramming.Async()
test_class = TestClass
with self.assertRaises(AttributeError):
test_class.some_method = TestClass.some_method.create_attribute(
TestClass, 'some_other_method')
def test_errors_reset():
from jenkins_epo.bot import Error, Instruction
from jenkins_epo.utils import parse_datetime
from jenkins_epo.extensions.core import ErrorExtension
ext = ErrorExtension('error', Mock())
ext.current = ext.bot.current
ext.current.denied_instructions = []
ext.current.errors = [
Error('message', parse_datetime('2016-08-03T15:58:47Z')),
]
ext.current.error_reset = None
ext.process_instruction(
Instruction(name='reset-errors', author='bot', date=datetime.utcnow())
)
assert ext.current.error_reset
yield from ext.run()
assert not ext.current.head.comment.mock_calls
def test_error_denied():
from jenkins_epo.bot import Instruction
from jenkins_epo.extensions.core import ErrorExtension
ext = ErrorExtension('error', Mock())
ext.current = ext.bot.current
ext.current.denied_instructions = [Mock()]
ext.current.errors = []
ext.current.error_reset = None
ext.process_instruction(
Instruction(name='reset-denied', author='bot')
)
assert not ext.current.denied_instructions
yield from ext.run()
assert not ext.current.head.comment.mock_calls
ext.current.denied_instructions = [Mock(author='toto')]
yield from ext.run()
assert ext.current.head.comment.mock_calls
def test_yml_found(mocker, SETTINGS):
GITHUB = mocker.patch('jenkins_epo.extensions.core.GITHUB')
Job = mocker.patch('jenkins_epo.extensions.core.Job')
from jenkins_epo.extensions.core import YamlExtension
Job.jobs_filter = ['*', '-skip']
SETTINGS.update(YamlExtension.SETTINGS)
ext = YamlExtension('ext', Mock())
ext.current = ext.bot.current
ext.current.yaml = {'job': dict()}
GITHUB.fetch_file_contents = CoroutineMock(
return_value="job: command\nskip: command",
)
head = ext.current.head
head.repository.url = 'https://github.com/owner/repo.git'
head.repository.jobs = {}
yield from ext.run()
assert GITHUB.fetch_file_contents.mock_calls
assert 'job' in ext.current.job_specs
assert 'skip' not in ext.current.job_specs
def test_yml_comment_dict():
from jenkins_epo.bot import Instruction
from jenkins_epo.extensions.core import YamlExtension
ext = YamlExtension('ext', Mock())
ext.current = ext.bot.current
ext.current.yaml = {}
ext.process_instruction(Instruction(
author='a', name='yaml', args=dict(job=dict(parameters=dict(PARAM1=1)))
))
ext.process_instruction(Instruction(
author='a', name='params', args=dict(job=dict(PARAM2=1))
))
assert 'PARAM1' in ext.current.yaml['job']['parameters']
assert 'PARAM2' in ext.current.yaml['job']['parameters']
def test_job_update():
from jenkins_epo.extensions.jenkins import CreateJobsExtension
ext = CreateJobsExtension('createjob', Mock())
ext.current = ext.bot.current
ext.current.refresh_jobs = None
ext.current.job_specs = {'new_job': Mock(config=dict())}
ext.current.job_specs['new_job'].name = 'new_job'
job = Mock()
job.spec.contains.return_value = False
ext.current.jobs = {'new_job': job}
res = [x for x in ext.process_job_specs()]
assert res
action, spec = res[0]
assert action == job.update
def test_jenkins_fails_existing(mocker):
process_job_specs = mocker.patch(
'jenkins_epo.extensions.jenkins.CreateJobsExtension.process_job_specs'
)
from jenkins_yml.job import Job as JobSpec
from jenkins_epo.extensions.jenkins import CreateJobsExtension
ext = CreateJobsExtension('createjob', Mock())
ext.current = ext.bot.current
ext.current.errors = []
ext.current.head.sha = 'cafed0d0'
ext.current.head.repository.jobs = {'job': Mock()}
ext.current.job_specs = dict(job=JobSpec.factory('job', 'toto'))
job = Mock()
ext.current.jobs = {'job': job}
job.update = CoroutineMock(side_effect=Exception('POUET'))
process_job_specs.return_value = [(job.update, Mock())]
yield from ext.run()
assert ext.current.errors
assert job.update.mock_calls
def test_second_stage():
from jenkins_epo.extensions.jenkins import StagesExtension
ext = StagesExtension('stages', Mock())
ext.current = Mock()
ext.current.head.ref = 'pr'
ext.current.SETTINGS.STAGES = ['build', 'test']
ext.current.job_specs = specs = {
'test': Mock(config=dict()),
'missing': Mock(config=dict()),
}
specs['test'].name = 'test'
specs['missing'].name = 'missing'
ext.current.jobs = jobs = {
'test': Mock(),
}
jobs['test'].list_contexts.return_value = ['test']
ext.current.statuses = {}
yield from ext.run()
assert ext.current.current_stage.name == 'test'
def test_no_test_stage():
from jenkins_epo.extensions.jenkins import StagesExtension
ext = StagesExtension('stages', Mock())
ext.current = Mock()
ext.current.head.ref = 'pr'
ext.current.SETTINGS.STAGES = ['build', 'deploy']
ext.current.job_specs = specs = {
'build': Mock(config=dict()),
}
specs['build'].name = 'build'
ext.current.jobs = jobs = {
'build': Mock(),
}
jobs['build'].list_contexts.return_value = ['build']
ext.current.statuses = {}
yield from ext.run()
assert 'build' == ext.current.current_stage.name
assert 'build' in ext.current.job_specs
def test_branches_limit():
from jenkins_epo.extensions.jenkins import StagesExtension
ext = StagesExtension('stages', Mock())
ext.current = Mock()
ext.current.head.ref = 'pr'
ext.current.SETTINGS.STAGES = ['test']
ext.current.job_specs = specs = {
'job': Mock(config=dict(branches='master')),
}
specs['job'].name = 'job'
ext.current.jobs = jobs = {'job': Mock()}
jobs['job'].list_contexts.return_value = ['job']
ext.current.statuses = {}
yield from ext.run()
assert ext.current.current_stage.name == 'test'
assert 'job' not in ext.current.job_specs
def test_comment_deny():
from jenkins_epo.bot import Instruction
from jenkins_epo.extensions.core import MergerExtension
ext = MergerExtension('merger', Mock())
ext.current = Mock()
ext.current.SETTINGS.COLLABORATORS = []
ext.current.last_commit.date = datetime.now()
ext.current.opm = {}
ext.current.opm_denied = [Instruction(
author='noncollaborator', name='opm',
date=ext.current.last_commit.date + timedelta(hours=1),
)]
yield from ext.run()
assert ext.current.head.comment.mock_calls
def test_deny_outdated_opm():
from jenkins_epo.bot import Instruction
from jenkins_epo.extensions.core import MergerExtension
ext = MergerExtension('merger', Mock())
ext.current = Mock()
ext.current.SETTINGS.COLLABORATORS = ['collaborator']
ext.current.last_commit.date = datetime.now()
ext.current.opm = {}
ext.current.opm_denied = []
ext.process_instruction(Instruction(
author='collaborator', name='opm',
date=ext.current.last_commit.date - timedelta(hours=1),
))
assert not ext.current.opm
def test_deny_non_collaborator_processed():
from jenkins_epo.bot import Instruction
from jenkins_epo.extensions.core import MergerExtension
ext = MergerExtension('merger', Mock())
ext.current = Mock()
ext.current.SETTINGS.COLLABORATORS = []
ext.current.last_commit.date = datetime.now()
ext.current.opm = None
ext.current.opm_denied = [Instruction(
author='noncollaborator', name='opm',
)]
ext.process_instruction(Instruction(
author='bot', name='opm-processed',
date=ext.current.last_commit.date + timedelta(hours=2),
))
assert not ext.current.opm
assert not ext.current.opm_denied
def test_accept_lgtm():
from jenkins_epo.bot import Instruction
from jenkins_epo.extensions.core import MergerExtension
ext = MergerExtension('merger', Mock())
ext.current = Mock()
ext.current.SETTINGS.COLLABORATORS = ['collaborator']
ext.current.last_commit.date = commit_date = datetime.now()
ext.current.opm = None
ext.current.opm_denied = []
ext.process_instruction(Instruction(
author='collaborator', name='opm',
date=commit_date + timedelta(hours=1),
))
assert 'collaborator' == ext.current.opm.author
assert not ext.current.opm_denied
def test_merge_wip():
from jenkins_epo.extensions.core import MergerExtension
ext = MergerExtension('merger', Mock())
ext.current = Mock()
ext.current.SETTINGS.COLLABORATORS = ['collaborator']
ext.current.last_commit.date = datetime.now()
ext.current.opm.author = 'collaborator'
ext.current.opm.date = datetime.now()
ext.current.opm_denied = []
ext.current.opm_processed = None
ext.current.wip = True
yield from ext.run()
assert ext.current.head.comment.mock_calls
body = ext.current.head.comment.call_args[1]['body']
assert '@collaborator' in body
assert not ext.current.head.merge.mock_calls