def test_merge_wip_skip_outdated():
from jenkins_epo.extensions.core import MergerExtension
ext = MergerExtension('merger', Mock())
ext.current = Mock()
ext.current.SETTINGS.COLLABORATORS = ['collaborator']
ext.current.last_commit.date = datetime.now()
ext.current.opm.author = 'collaborator'
ext.current.opm.date = datetime.now()
ext.current.opm_denied = []
ext.current.wip = True
ext.current.opm_processed = Mock(
date=ext.current.opm.date + timedelta(minutes=5)
)
yield from ext.run()
assert not ext.current.head.comment.mock_calls
assert not ext.current.head.merge.mock_calls
python类Mock()的实例源码
def test_merge_fail():
from jenkins_epo.extensions.core import MergerExtension, ApiError
ext = MergerExtension('merger', Mock())
ext.current = Mock()
ext.current.SETTINGS.COLLABORATORS = ['collaborator']
ext.current.last_commit.date = datetime.now()
ext.current.opm.author = 'collaborator'
ext.current.opm_denied = []
ext.current.wip = None
ext.current.last_commit.fetch_combined_status = CoroutineMock(
return_value={'state': 'success'}
)
ext.current.head.merge.side_effect = ApiError('url', {}, dict(
code=405, json=dict(message="error")
))
yield from ext.run()
assert not ext.current.head.delete_branch.mock_calls
def test_merge_success():
from jenkins_epo.extensions.core import MergerExtension
ext = MergerExtension('merger', Mock())
ext.current = Mock()
ext.current.opm = Mock(author='author')
ext.current.opm_denied = []
ext.current.last_merge_error = None
ext.current.last_commit.fetch_combined_status = CoroutineMock(
return_value={'state': 'success'}
)
ext.current.wip = None
yield from ext.run()
assert ext.current.head.merge.mock_calls
assert not ext.current.head.comment.mock_calls
assert ext.current.head.delete_branch.mock_calls
def test_process_allow():
from jenkins_epo.bot import Instruction
from jenkins_epo.extensions.core import SecurityExtension
ext = SecurityExtension('sec', Mock())
ext.current = ext.bot.current
ext.current.head.author = 'contributor'
ext.current.security_feedback_processed = None
ext.current.SETTINGS.COLLABORATORS = ['owner']
ext.current.denied_instructions = [
Instruction(author='contributor', name='skip')
]
ext.process_instruction(Instruction(
author='owner', name='allow'
))
assert 'contributor' in ext.current.SETTINGS.COLLABORATORS
assert not ext.current.denied_instructions
def test_skip_outdated():
from jenkins_epo.extensions.jenkins import PollExtension
ext = PollExtension('test', Mock())
ext.current = ext.bot.current
ext.current.head.sha = 'cafed0d0'
ext.current.cancel_queue = []
ext.current.job_specs = {'job': Mock()}
ext.current.job_specs['job'].name = 'job'
ext.current.jobs = {}
ext.current.jobs['job'] = job = Mock()
job.fetch_builds = CoroutineMock()
job.process_builds.return_value = builds = [Mock()]
build = builds[0]
build.is_outdated = True
yield from ext.run()
assert not builds[0].is_running.mock_calls
assert 0 == len(ext.current.cancel_queue)
def test_skip_build_not_running():
from jenkins_epo.extensions.jenkins import PollExtension
ext = PollExtension('test', Mock())
ext.current = ext.bot.current
ext.current.head.sha = 'cafed0d0'
ext.current.cancel_queue = []
ext.current.job_specs = {'job': Mock()}
ext.current.job_specs['job'].name = 'job'
ext.current.jobs = {}
ext.current.jobs['job'] = job = Mock()
job.fetch_builds = CoroutineMock()
job.process_builds.return_value = builds = [Mock()]
build = builds[0]
build.is_outdated = False
build.is_running = False
yield from ext.run()
assert 0 == len(ext.current.cancel_queue)
def test_skip_current_sha(mocker):
from jenkins_epo.extensions.jenkins import PollExtension
ext = PollExtension('test', Mock())
ext.current = ext.bot.current
ext.current.cancel_queue = []
ext.current.head.ref = 'branch'
ext.current.head.sha = 'bab1'
ext.current.job_specs = {'job': Mock()}
ext.current.job_specs['job'].name = 'job'
ext.current.jobs = {}
ext.current.jobs['job'] = job = Mock()
job.list_contexts.return_value = []
job.fetch_builds = CoroutineMock()
job.process_builds.return_value = builds = [Mock()]
build = builds[0]
build.is_outdated = False
build.is_running = True
build.ref = 'branch'
build.sha = ext.current.head.sha
yield from ext.run()
assert 0 == len(ext.current.cancel_queue)
def test_cancel(mocker):
from jenkins_epo.extensions.jenkins import PollExtension
ext = PollExtension('test', Mock())
ext.current = ext.bot.current
ext.current.cancel_queue = []
ext.current.head.ref = 'branch'
ext.current.head.sha = 'bab1'
ext.current.job_specs = {'job': Mock()}
ext.current.job_specs['job'].name = 'job'
ext.current.jobs = {}
ext.current.jobs['job'] = job = Mock()
job.fetch_builds = CoroutineMock()
job.process_builds.return_value = builds = [Mock()]
build = builds[0]
build.is_outdated = False
build.is_running = True
build.ref = 'branch'
build.sha = '01d'
build.url = 'url://'
build.commit_status = dict()
yield from ext.run()
assert 1 == len(ext.current.cancel_queue)
def test_cancel_ignore_other(mocker):
Build = mocker.patch('jenkins_epo.extensions.jenkins.Build')
from jenkins_epo.extensions.jenkins import (
CancellerExtension, CommitStatus, NotOnJenkins
)
Build.from_url = CoroutineMock(side_effect=NotOnJenkins())
commit = Mock()
ext = CancellerExtension('test', Mock())
ext.current = ext.bot.current
ext.current.head.sha = 'cafed0d0'
ext.current.poll_queue = []
ext.current.cancel_queue = [
(commit, CommitStatus(
context='ci/...', target_url='circleci://1', state='pending',
)),
]
ext.current.last_commit.fetch_statuses.return_value = []
ext.current.last_commit.maybe_update_status = CoroutineMock()
yield from ext.run()
assert not commit.maybe_update_status.mock_calls
def test_poll_lost_build(mocker):
JENKINS = mocker.patch('jenkins_epo.extensions.jenkins.JENKINS')
Build = mocker.patch('jenkins_epo.extensions.jenkins.Build')
from jenkins_epo.extensions.jenkins import CancellerExtension, CommitStatus
commit = Mock()
commit.maybe_update_status = CoroutineMock()
ext = CancellerExtension('test', Mock())
ext.current = ext.bot.current
ext.current.head.sha = 'cafed0d0'
ext.current.poll_queue = []
ext.current.cancel_queue = [
(commit, CommitStatus(context='job', target_url='jenkins://job/1')),
]
ext.current.last_commit.fetch_statuses.return_value = []
JENKINS.baseurl = 'jenkins://'
Build.from_url = CoroutineMock()
yield from ext.run()
assert Build.from_url.mock_calls
assert commit.maybe_update_status.mock_calls
def test_get_network_data(self, time_mock, sleep_mock):
time_mock.side_effect = [1, 2]
Counter = namedtuple('Counter',
['bytes_sent', 'bytes_recv', 'packets_sent',
'packets_recv'])
first_counter = Counter(bytes_sent=54000, bytes_recv=12000,
packets_sent=50, packets_recv=100)
second_counter = Counter(bytes_sent=108000, bytes_recv=36000,
packets_sent=75, packets_recv=150)
m = mock.Mock()
m.side_effect = [
{'eth0': first_counter}, {'eth0': second_counter}
]
self.network.psutil.net_io_counters = m
kb_ul, kb_dl, p_ul, p_dl = self.network.get_network_data(
interface='eth0', delay=1)
self.assertEqual(kb_ul, 54000)
self.assertEqual(kb_dl, 24000)
self.assertEqual(p_ul, 25)
self.assertEqual(p_dl, 50)
def test_xsubscribe(service_a1, service_d1, registry):
# assert service_d1 == {}
registry.register_service(
packet={'params': service_a1}, registry_protocol=mock.Mock())
registry.register_service(
packet={'params': service_d1}, registry_protocol=mock.Mock())
registry._xsubscribe(packet={'params': service_d1})
protocol = mock.Mock()
params = {
'name': service_a1['name'],
'version': service_a1['version'],
'endpoint': service_d1['events'][0]['endpoint']
}
registry.get_subscribers(packet={'params': params, 'request_id': str(uuid.uuid4())}, protocol=protocol)
assert subscriber_returned_successfully(protocol.send.call_args_list[0][0][0], service_d1)
def test_non_post_entity_types_are_skipped(self, mock_logger):
process_entity_retraction(Mock(entity_type="foo"), Mock())
mock_logger.assert_called_with("Ignoring retraction of entity_type %s", "foo")
def test_does_nothing_if_content_doesnt_exist(self, mock_logger):
process_entity_retraction(Mock(entity_type="Post", target_guid="bar"), Mock())
mock_logger.assert_called_with("Retracted remote content %s cannot be found", "bar")
def test_does_nothing_if_content_is_not_local(self, mock_logger):
process_entity_retraction(Mock(entity_type="Post", target_guid=self.local_content.guid), Mock())
mock_logger.assert_called_with(
"Retracted remote content %s cannot be found", self.local_content.guid,
)
def test_does_nothing_if_content_author_is_not_same_as_remote_profile(self, mock_logger):
remote_profile = Mock()
process_entity_retraction(Mock(entity_type="Post", target_guid=self.content.guid), remote_profile)
mock_logger.assert_called_with(
"Content %s is not owned by remote retraction profile %s", self.content, remote_profile
)
def test_deletes_content(self):
process_entity_retraction(
Mock(entity_type="Post", target_guid=self.content.guid, handle=self.content.author.handle),
self.content.author
)
with self.assertRaises(Content.DoesNotExist):
Content.objects.get(id=self.content.id)
def test_returns_none_on_exception(self, mock_post):
entity = make_federable_retraction(Mock(), Mock())
self.assertIsNone(entity)
def test_remote_profile_public_key_is_returned(self, mock_from_remote, mock_retrieve):
remote_profile = Mock(rsa_public_key="foo")
mock_retrieve.return_value = mock_from_remote.return_value = remote_profile
self.assertEqual(sender_key_fetcher("bar"), "foo")
mock_retrieve.assert_called_once_with("bar")
mock_from_remote.assert_called_once_with(remote_profile)
def test_returns_legacy_xml_payload(self):
self.assertEqual(
DiasporaReceiveViewMixin.get_payload_from_request(Mock(body="foobar", POST={"xml": "barfoo"})),
"barfoo",
)