python类Mock()的实例源码

test_merger.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_merge_wip_skip_outdated():
    from jenkins_epo.extensions.core import MergerExtension

    ext = MergerExtension('merger', Mock())
    ext.current = Mock()
    ext.current.SETTINGS.COLLABORATORS = ['collaborator']
    ext.current.last_commit.date = datetime.now()
    ext.current.opm.author = 'collaborator'
    ext.current.opm.date = datetime.now()
    ext.current.opm_denied = []
    ext.current.wip = True
    ext.current.opm_processed = Mock(
        date=ext.current.opm.date + timedelta(minutes=5)
    )

    yield from ext.run()

    assert not ext.current.head.comment.mock_calls
    assert not ext.current.head.merge.mock_calls
test_merger.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_merge_fail():
    from jenkins_epo.extensions.core import MergerExtension, ApiError

    ext = MergerExtension('merger', Mock())
    ext.current = Mock()
    ext.current.SETTINGS.COLLABORATORS = ['collaborator']
    ext.current.last_commit.date = datetime.now()
    ext.current.opm.author = 'collaborator'
    ext.current.opm_denied = []
    ext.current.wip = None
    ext.current.last_commit.fetch_combined_status = CoroutineMock(
        return_value={'state': 'success'}
    )
    ext.current.head.merge.side_effect = ApiError('url', {}, dict(
        code=405, json=dict(message="error")
    ))

    yield from ext.run()

    assert not ext.current.head.delete_branch.mock_calls
test_merger.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_merge_success():
    from jenkins_epo.extensions.core import MergerExtension

    ext = MergerExtension('merger', Mock())
    ext.current = Mock()
    ext.current.opm = Mock(author='author')
    ext.current.opm_denied = []
    ext.current.last_merge_error = None
    ext.current.last_commit.fetch_combined_status = CoroutineMock(
        return_value={'state': 'success'}
    )
    ext.current.wip = None

    yield from ext.run()

    assert ext.current.head.merge.mock_calls
    assert not ext.current.head.comment.mock_calls
    assert ext.current.head.delete_branch.mock_calls
test_security.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_process_allow():
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import SecurityExtension

    ext = SecurityExtension('sec', Mock())
    ext.current = ext.bot.current
    ext.current.head.author = 'contributor'
    ext.current.security_feedback_processed = None
    ext.current.SETTINGS.COLLABORATORS = ['owner']
    ext.current.denied_instructions = [
        Instruction(author='contributor', name='skip')
    ]

    ext.process_instruction(Instruction(
        author='owner', name='allow'
    ))

    assert 'contributor' in ext.current.SETTINGS.COLLABORATORS
    assert not ext.current.denied_instructions
test_poll.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_skip_outdated():
    from jenkins_epo.extensions.jenkins import PollExtension

    ext = PollExtension('test', Mock())
    ext.current = ext.bot.current
    ext.current.head.sha = 'cafed0d0'
    ext.current.cancel_queue = []
    ext.current.job_specs = {'job': Mock()}
    ext.current.job_specs['job'].name = 'job'
    ext.current.jobs = {}
    ext.current.jobs['job'] = job = Mock()
    job.fetch_builds = CoroutineMock()
    job.process_builds.return_value = builds = [Mock()]
    build = builds[0]
    build.is_outdated = True

    yield from ext.run()

    assert not builds[0].is_running.mock_calls
    assert 0 == len(ext.current.cancel_queue)
test_poll.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_skip_build_not_running():
    from jenkins_epo.extensions.jenkins import PollExtension

    ext = PollExtension('test', Mock())
    ext.current = ext.bot.current
    ext.current.head.sha = 'cafed0d0'
    ext.current.cancel_queue = []
    ext.current.job_specs = {'job': Mock()}
    ext.current.job_specs['job'].name = 'job'
    ext.current.jobs = {}
    ext.current.jobs['job'] = job = Mock()
    job.fetch_builds = CoroutineMock()
    job.process_builds.return_value = builds = [Mock()]
    build = builds[0]
    build.is_outdated = False
    build.is_running = False

    yield from ext.run()

    assert 0 == len(ext.current.cancel_queue)
test_poll.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_skip_current_sha(mocker):
    from jenkins_epo.extensions.jenkins import PollExtension

    ext = PollExtension('test', Mock())
    ext.current = ext.bot.current
    ext.current.cancel_queue = []
    ext.current.head.ref = 'branch'
    ext.current.head.sha = 'bab1'
    ext.current.job_specs = {'job': Mock()}
    ext.current.job_specs['job'].name = 'job'
    ext.current.jobs = {}
    ext.current.jobs['job'] = job = Mock()
    job.list_contexts.return_value = []
    job.fetch_builds = CoroutineMock()
    job.process_builds.return_value = builds = [Mock()]
    build = builds[0]
    build.is_outdated = False
    build.is_running = True
    build.ref = 'branch'
    build.sha = ext.current.head.sha

    yield from ext.run()

    assert 0 == len(ext.current.cancel_queue)
test_poll.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_cancel(mocker):
    from jenkins_epo.extensions.jenkins import PollExtension

    ext = PollExtension('test', Mock())
    ext.current = ext.bot.current
    ext.current.cancel_queue = []
    ext.current.head.ref = 'branch'
    ext.current.head.sha = 'bab1'
    ext.current.job_specs = {'job': Mock()}
    ext.current.job_specs['job'].name = 'job'
    ext.current.jobs = {}
    ext.current.jobs['job'] = job = Mock()
    job.fetch_builds = CoroutineMock()
    job.process_builds.return_value = builds = [Mock()]
    build = builds[0]
    build.is_outdated = False
    build.is_running = True
    build.ref = 'branch'
    build.sha = '01d'
    build.url = 'url://'
    build.commit_status = dict()

    yield from ext.run()

    assert 1 == len(ext.current.cancel_queue)
test_jenkins.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_cancel_ignore_other(mocker):
    Build = mocker.patch('jenkins_epo.extensions.jenkins.Build')
    from jenkins_epo.extensions.jenkins import (
        CancellerExtension, CommitStatus, NotOnJenkins
    )

    Build.from_url = CoroutineMock(side_effect=NotOnJenkins())

    commit = Mock()

    ext = CancellerExtension('test', Mock())
    ext.current = ext.bot.current
    ext.current.head.sha = 'cafed0d0'
    ext.current.poll_queue = []
    ext.current.cancel_queue = [
        (commit, CommitStatus(
            context='ci/...', target_url='circleci://1', state='pending',
        )),
    ]
    ext.current.last_commit.fetch_statuses.return_value = []
    ext.current.last_commit.maybe_update_status = CoroutineMock()

    yield from ext.run()

    assert not commit.maybe_update_status.mock_calls
test_jenkins.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_poll_lost_build(mocker):
    JENKINS = mocker.patch('jenkins_epo.extensions.jenkins.JENKINS')
    Build = mocker.patch('jenkins_epo.extensions.jenkins.Build')
    from jenkins_epo.extensions.jenkins import CancellerExtension, CommitStatus

    commit = Mock()
    commit.maybe_update_status = CoroutineMock()

    ext = CancellerExtension('test', Mock())
    ext.current = ext.bot.current
    ext.current.head.sha = 'cafed0d0'
    ext.current.poll_queue = []
    ext.current.cancel_queue = [
        (commit, CommitStatus(context='job', target_url='jenkins://job/1')),
    ]
    ext.current.last_commit.fetch_statuses.return_value = []

    JENKINS.baseurl = 'jenkins://'
    Build.from_url = CoroutineMock()

    yield from ext.run()

    assert Build.from_url.mock_calls
    assert commit.maybe_update_status.mock_calls
test_plugins.py 文件源码 项目:sauna 作者: NicolasLM 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_get_network_data(self, time_mock, sleep_mock):
        time_mock.side_effect = [1, 2]

        Counter = namedtuple('Counter',
                             ['bytes_sent', 'bytes_recv', 'packets_sent',
                              'packets_recv'])

        first_counter = Counter(bytes_sent=54000, bytes_recv=12000,
                                packets_sent=50, packets_recv=100)
        second_counter = Counter(bytes_sent=108000, bytes_recv=36000,
                                 packets_sent=75, packets_recv=150)

        m = mock.Mock()
        m.side_effect = [
            {'eth0': first_counter}, {'eth0': second_counter}
        ]

        self.network.psutil.net_io_counters = m
        kb_ul, kb_dl, p_ul, p_dl = self.network.get_network_data(
            interface='eth0', delay=1)
        self.assertEqual(kb_ul, 54000)
        self.assertEqual(kb_dl, 24000)
        self.assertEqual(p_ul, 25)
        self.assertEqual(p_dl, 50)
test_registry.py 文件源码 项目:trellio 作者: artificilabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_xsubscribe(service_a1, service_d1, registry):
    # assert service_d1 == {}
    registry.register_service(
        packet={'params': service_a1}, registry_protocol=mock.Mock())
    registry.register_service(
        packet={'params': service_d1}, registry_protocol=mock.Mock())
    registry._xsubscribe(packet={'params': service_d1})

    protocol = mock.Mock()
    params = {
        'name': service_a1['name'],
        'version': service_a1['version'],
        'endpoint': service_d1['events'][0]['endpoint']
    }
    registry.get_subscribers(packet={'params': params, 'request_id': str(uuid.uuid4())}, protocol=protocol)
    assert subscriber_returned_successfully(protocol.send.call_args_list[0][0][0], service_d1)
test_utils_tasks.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_non_post_entity_types_are_skipped(self, mock_logger):
        process_entity_retraction(Mock(entity_type="foo"), Mock())
        mock_logger.assert_called_with("Ignoring retraction of entity_type %s", "foo")
test_utils_tasks.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_does_nothing_if_content_doesnt_exist(self, mock_logger):
        process_entity_retraction(Mock(entity_type="Post", target_guid="bar"), Mock())
        mock_logger.assert_called_with("Retracted remote content %s cannot be found", "bar")
test_utils_tasks.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_does_nothing_if_content_is_not_local(self, mock_logger):
        process_entity_retraction(Mock(entity_type="Post", target_guid=self.local_content.guid), Mock())
        mock_logger.assert_called_with(
            "Retracted remote content %s cannot be found", self.local_content.guid,
        )
test_utils_tasks.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_does_nothing_if_content_author_is_not_same_as_remote_profile(self, mock_logger):
        remote_profile = Mock()
        process_entity_retraction(Mock(entity_type="Post", target_guid=self.content.guid), remote_profile)
        mock_logger.assert_called_with(
            "Content %s is not owned by remote retraction profile %s", self.content, remote_profile
        )
test_utils_tasks.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_deletes_content(self):
        process_entity_retraction(
            Mock(entity_type="Post", target_guid=self.content.guid, handle=self.content.author.handle),
            self.content.author
        )
        with self.assertRaises(Content.DoesNotExist):
            Content.objects.get(id=self.content.id)
test_utils_tasks.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_returns_none_on_exception(self, mock_post):
        entity = make_federable_retraction(Mock(), Mock())
        self.assertIsNone(entity)
test_utils_tasks.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_remote_profile_public_key_is_returned(self, mock_from_remote, mock_retrieve):
        remote_profile = Mock(rsa_public_key="foo")
        mock_retrieve.return_value = mock_from_remote.return_value = remote_profile
        self.assertEqual(sender_key_fetcher("bar"), "foo")
        mock_retrieve.assert_called_once_with("bar")
        mock_from_remote.assert_called_once_with(remote_profile)
test_views.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_returns_legacy_xml_payload(self):
        self.assertEqual(
            DiasporaReceiveViewMixin.get_payload_from_request(Mock(body="foobar", POST={"xml": "barfoo"})),
            "barfoo",
        )


问题


面经


文章

微信
公众号

扫码关注公众号