Python 类 Mock() 的实例源码

test_streams.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_get_cached_content_ids__calls(self, mock_get, mock_queryset):
        """Check the redis calls made by ``get_cached_content_ids`` with and without ``last_id``."""
        redis_stub = Mock(zrevrange=Mock(return_value=[]))
        mock_get.return_value = redis_stub
        self.stream.stream_type = StreamType.PUBLIC

        # First pass: zrevrank must be skipped when there is no last_id,
        # and zrevrange must start from index 0
        self.stream.get_cached_content_ids()
        self.assertFalse(redis_stub.zrevrank.called)
        redis_stub.zrevrange.assert_called_once_with(
            self.stream.key, 0, self.stream.paginate_by,
        )

        # Second pass: with a last_id the rank is looked up first
        redis_stub.reset_mock()
        self.stream.last_id = self.content2.id
        redis_stub.zrevrank.return_value = 3
        self.stream.get_cached_content_ids()
        redis_stub.zrevrank.assert_called_once_with(self.stream.key, self.content2.id)
        # the stubbed rank is 3 and the range is expected to start at 4
        range_start = 4
        redis_stub.zrevrange.assert_called_once_with(
            self.stream.key, range_start, range_start + self.stream.paginate_by,
        )
test_streams.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_get_cached_range(self, mock_get, mock_queryset):
        """Check the values returned by ``get_cached_range`` and the redis calls it makes."""
        first_id, second_id = self.content2.id, self.content1.id
        self.stream.stream_type = StreamType.PUBLIC
        # each stub gets its own list object, both holding the ids as strings
        mock_zrevrange = Mock(return_value=[str(first_id), str(second_id)])
        mock_hmget = Mock(return_value=[str(first_id), str(second_id)])
        mock_get.return_value = Mock(zrevrange=mock_zrevrange, hmget=mock_hmget)

        ids, throughs = self.stream.get_cached_range(0)

        self.assertEqual(ids, [first_id, second_id])
        self.assertEqual(throughs, {first_id: first_id, second_id: second_id})
        mock_zrevrange.assert_called_once_with(
            self.stream.key, 0, 0 + self.stream.paginate_by,
        )
        throughs_key = BaseStream.get_throughs_key(self.stream.key)
        mock_hmget.assert_called_once_with(throughs_key, keys=[first_id, second_id])

        # Non-zero index
        mock_zrevrange.reset_mock()
        self.stream.get_cached_range(5)
        mock_zrevrange.assert_called_once_with(
            self.stream.key, 5, 5 + self.stream.paginate_by,
        )
test_event_controller.py 文件源码 项目:abodepy 作者: MisterWil 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def tests_event_registration(self):
        """Check ``add_event_callback`` with a valid, a missing and an invalid event group."""
        controller = self.abode.events
        self.assertIsNotNone(controller)

        handler = Mock()

        # A known event group is accepted
        registered = controller.add_event_callback(TIMELINE.ALARM_GROUP, handler)
        self.assertTrue(registered)

        # No event group at all yields a falsy result
        rejected = controller.add_event_callback(None, handler)
        self.assertFalse(rejected)

        # An unknown event group string raises
        with self.assertRaises(abodepy.AbodeException):
            controller.add_event_callback("lol", handler)
test_event_controller.py 文件源码 项目:abodepy 作者: MisterWil 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def tests_timeline_registration(self):
        """Check ``add_timeline_callback`` with valid, missing and invalid timeline events."""
        controller = self.abode.events
        self.assertIsNotNone(controller)

        handler = Mock()

        # A known timeline event is accepted
        registered = controller.add_timeline_callback(TIMELINE.CAPTURE_IMAGE, handler)
        self.assertTrue(registered)

        # No timeline event at all yields a falsy result
        rejected = controller.add_timeline_callback(None, handler)
        self.assertFalse(rejected)

        # Both an unknown string and an unknown dict must raise
        for invalid_event in ("lol", {"lol": "lol"}):
            with self.assertRaises(abodepy.AbodeException):
                controller.add_timeline_callback(invalid_event, handler)
test_event_controller.py 文件源码 项目:abodepy 作者: MisterWil 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def tests_multi_events_callback(self):
        """Check that a callback registered for several event groups receives a timeline update."""
        controller = self.abode.events
        self.assertIsNotNone(controller)

        handler = Mock()

        # Register the same handler for two groups in a single call
        groups = [TIMELINE.ALARM_GROUP, TIMELINE.CAPTURE_GROUP]
        self.assertTrue(controller.add_event_callback(groups, handler))

        # Feed the IR camera timeline event straight into the update handler
        # pylint: disable=protected-access
        payload = json.loads(IRCAMERA.timeline_event())
        controller._on_timeline_update(payload)

        # The handler must have been called with the parsed event
        handler.assert_called_with(payload)
test_event_controller.py 文件源码 项目:abodepy 作者: MisterWil 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def tests_multi_timeline_callback(self):
        """Check that a callback registered for several timeline events receives a timeline update."""
        controller = self.abode.events
        self.assertIsNotNone(controller)

        handler = Mock()

        # Register the same handler for two timeline events in a single call
        timeline_events = [TIMELINE.CAPTURE_IMAGE, TIMELINE.OPENED]
        self.assertTrue(controller.add_timeline_callback(timeline_events, handler))

        # Feed the IR camera timeline event straight into the update handler
        # pylint: disable=protected-access
        payload = json.loads(IRCAMERA.timeline_event())
        controller._on_timeline_update(payload)

        # The handler must have been called with the parsed event
        handler.assert_called_with(payload)
test_event_controller.py 文件源码 项目:abodepy 作者: MisterWil 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def tests_automations_callback(self):
        """Tests that automation updates callback correctly."""
        # Get the event controller
        events = self.abode.events
        self.assertIsNotNone(events)

        # Create mock callback
        automation_callback = Mock()

        # Register the callback for the automation edit group
        self.assertTrue(
            events.add_event_callback(
                TIMELINE.AUTOMATION_EDIT_GROUP, automation_callback))

        # Call the automation update handler directly with an empty JSON
        # object string
        # pylint: disable=protected-access
        events._on_automation_update('{}')

        # The registered callback should have been called with that same string
        automation_callback.assert_called_with('{}')
test_nixio.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_no_modifications(self):
        """Count calls to ``_write_attr_annotations`` across repeated ``write_all_blocks`` runs."""
        annotation_writer = mock.Mock()
        self.io._write_attr_annotations = annotation_writer

        def overwrite_hashes(value):
            # replace every stored object hash with the given value
            for key in self.io._object_hashes.keys():
                self.io._object_hashes[key] = value

        # initial write: the mocked writer must not be called
        self.io.write_all_blocks(self.neo_blocks)
        annotation_writer.assert_not_called()
        self.compare_blocks(self.neo_blocks, self.io.nix_file.blocks)

        # clearing hashes and checking again
        overwrite_hashes(None)
        self.io.write_all_blocks(self.neo_blocks)
        annotation_writer.assert_not_called()

        # changing hashes to force rewrite: one call per stored hash is expected
        overwrite_hashes("_")
        self.io.write_all_blocks(self.neo_blocks)
        self.assertEqual(annotation_writer.call_count, len(self.io._object_hashes))
        self.compare_blocks(self.neo_blocks, self.io.nix_file.blocks)
test_recording_processor.py 文件源码 项目:azure-python-devtools 作者: Azure 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_subscription_recording_processor_for_request(self):
        """Check that ``process_request`` swaps the subscription id in a request's URI and body."""
        replacement_id = str(uuid.uuid4())
        processor = SubscriptionRecordingProcessor(replacement_id)

        arm_template = ('https://management.azure.com/subscriptions/{}/providers/Microsoft.ContainerRegistry/'
                        'checkNameAvailability?api-version=2017-03-01')
        graph_template = 'https://graph.windows.net/{}/applications?api-version=1.6'

        for uri_template in (arm_template, graph_template):
            # every request starts out with its own random subscription id
            original_id = str(uuid.uuid4())
            request = mock.Mock()
            request.uri = uri_template.format(original_id)
            request.body = self._mock_subscription_request_body(original_id)

            processor.process_request(request)

            expected_body = self._mock_subscription_request_body(replacement_id)
            self.assertEqual(request.uri, uri_template.format(replacement_id))
            self.assertEqual(request.body, expected_body)
test_core.py 文件源码 项目:SanicMongo 作者: beepaste 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_getattr_with_under(self, *args, **kwargs):
        """Check that ``coll._get_write_mode`` equals ``coll.delegate._get_write_mode``.

        Args:
            *args: Extra positional arguments; accepted and not used in the body.
            **kwargs: Extra keyword arguments; accepted and not used in the body.
        """
        # fake client whose delegate carries the same name as the database
        connection = Mock(spec=core.SanicMongoAgnosticClient)
        delegate = MagicMock()
        delegate.name = 'blabla'
        connection.delegate = delegate
        name = 'blabla'
        db = create_class_with_framework(core.SanicMongoAgnosticDatabase,
                                         asyncio_framework,
                                         self.__module__)(connection, name)

        coll = create_class_with_framework(core.SanicMongoAgnosticCollection,
                                           asyncio_framework,
                                           self.__module__)
        coll = coll(db, name)

        self.assertEqual(coll._get_write_mode, coll.delegate._get_write_mode)
test_core.py 文件源码 项目:SanicMongo 作者: beepaste 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_getitem(self, *args, **kwargs):
        """Check that indexing a collection returns an object of the same collection class.

        Args:
            *args: Extra positional arguments; accepted and not used in the body.
            **kwargs: Extra keyword arguments; accepted and not used in the body.
        """
        # fake client whose delegate carries the same name as the database
        connection = Mock(spec=core.SanicMongoAgnosticClient)
        delegate = MagicMock()
        delegate.name = 'blabla'
        connection.delegate = delegate
        name = 'blabla'
        db = create_class_with_framework(core.SanicMongoAgnosticDatabase,
                                         asyncio_framework,
                                         self.__module__)(connection, name)

        coll = create_class_with_framework(core.SanicMongoAgnosticCollection,
                                           asyncio_framework,
                                           self.__module__)
        coll = coll(db, name)

        other_coll = coll['other']
        # assertIsInstance reports the object and the expected type on failure,
        # where assertTrue(isinstance(...)) only reports "False is not true"
        self.assertIsInstance(other_coll, type(coll))
test_core.py 文件源码 项目:SanicMongo 作者: beepaste 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_find(self, *args, **kwargs):
        """Check that ``find()`` on a collection returns an instance of the framework cursor class."""
        delegate = MagicMock()
        delegate.name = 'blabla'
        connection = Mock(spec=core.SanicMongoAgnosticClient)
        connection.delegate = delegate
        name = 'blabla'

        # database built on the fake connection
        db_class = create_class_with_framework(
            core.SanicMongoAgnosticDatabase, asyncio_framework, self.__module__)
        db = db_class(connection, name)

        # collection that lives in that database
        coll_class = create_class_with_framework(
            core.SanicMongoAgnosticCollection, asyncio_framework, self.__module__)
        coll = coll_class(db, name)

        # cursor class that find() is expected to instantiate
        cursor_class = create_class_with_framework(
            core.SanicMongoAgnosticCursor, asyncio_framework, self.__module__)

        self.assertIsInstance(coll.find(), cursor_class)
test_core.py 文件源码 项目:SanicMongo 作者: beepaste 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_get_item(self, *args, **kwargs):
        """Check that indexing a database returns an instance of the framework collection class.

        Args:
            *args: Extra positional arguments; accepted and not used in the body.
            **kwargs: Extra keyword arguments; accepted and not used in the body.
        """
        connection = Mock(spec=core.SanicMongoAgnosticClient)
        delegate = MagicMock()
        connection.delegate = delegate
        name = 'blabla'
        db = create_class_with_framework(core.SanicMongoAgnosticDatabase,
                                         asyncio_framework,
                                         self.__module__)(connection, name)

        # class the indexed result is compared against
        comp_coll = create_class_with_framework(
            core.SanicMongoAgnosticCollection,
            asyncio_framework,
            self.__module__)

        coll = db['bla']
        # assertIsInstance reports the object and the expected type on failure,
        # where assertTrue(isinstance(...)) only reports "False is not true"
        self.assertIsInstance(coll, comp_coll)
test_metaprogramming.py 文件源码 项目:SanicMongo 作者: beepaste 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_asynchornize(self):
        """Check that an ``asynchronize``-wrapped method returns a Future and runs its body.

        The test is a generator (it uses ``yield from``); it is presumably
        driven by a coroutine-aware test decorator not visible in this snippet.
        """

        test_mock = Mock()

        class TestClass:

            @classmethod
            def _get_db(cls):
                # the fake db only carries the framework attribute set here
                db = Mock()
                db._framework = asyncio_framework
                return db

            @metaprogramming.asynchronize
            def sync(self):
                test_mock()

        testobj = TestClass()
        # assertIsInstance reports the object and the expected type on failure,
        # where assertTrue(isinstance(...)) only reports "False is not true"
        self.assertIsInstance(testobj.sync(), Future)
        yield from testobj.sync()
        self.assertTrue(test_mock.called)
test_metaprogramming.py 文件源码 项目:SanicMongo 作者: beepaste 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_asynchornize_with_exception(self):
        """Check that an exception raised inside an ``asynchronize``-wrapped method propagates.

        The test is a generator (it uses ``yield from``); it is presumably
        driven by a coroutine-aware test decorator not visible in this snippet.
        """

        class TestClass:

            @classmethod
            def _get_db(cls):
                # the fake db only carries the framework attribute set here
                db = Mock()
                db._framework = asyncio_framework
                return db

            @metaprogramming.asynchronize
            def sync(self):
                raise Exception

        testobj = TestClass()
        # the exception from the wrapped body must surface when the result is awaited
        with self.assertRaises(Exception):
            yield from testobj.sync()
test_metaprogramming.py 文件源码 项目:SanicMongo 作者: beepaste 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_create_attribute_with_attribute_error(self):
        """Check that ``create_attribute`` raises AttributeError for a name the class lacks."""

        test_mock = Mock()

        class BaseTestClass:

            @classmethod
            def _get_db(cls):
                # the fake db only carries the framework attribute set here
                db = Mock()
                db._framework = asyncio_framework
                return db

            def some_method(self):
                test_mock()

        class TestClass(BaseTestClass):

            # shadows the inherited some_method with an Async attribute
            some_method = metaprogramming.Async()

        test_class = TestClass
        # neither class above defines 'some_other_method'
        with self.assertRaises(AttributeError):
            test_class.some_method = TestClass.some_method.create_attribute(
                TestClass, 'some_other_method')
test_core.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_errors_reset():
    """Check that ``reset-errors`` sets ``error_reset`` and that running posts no comment."""
    from jenkins_epo.bot import Error, Instruction
    from jenkins_epo.utils import parse_datetime
    from jenkins_epo.extensions.core import ErrorExtension

    extension = ErrorExtension('error', Mock())
    extension.current = extension.bot.current
    extension.current.denied_instructions = []
    # a single error with a fixed 2016 timestamp
    old_error = Error('message', parse_datetime('2016-08-03T15:58:47Z'))
    extension.current.errors = [old_error]
    extension.current.error_reset = None

    reset = Instruction(name='reset-errors', author='bot', date=datetime.utcnow())
    extension.process_instruction(reset)
    assert extension.current.error_reset

    yield from extension.run()

    assert not extension.current.head.comment.mock_calls
test_core.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_error_denied():
    """Check ``reset-denied`` handling and when a run comments about denied instructions."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import ErrorExtension

    extension = ErrorExtension('error', Mock())
    extension.current = extension.bot.current
    extension.current.denied_instructions = [Mock()]
    extension.current.errors = []
    extension.current.error_reset = None

    # reset-denied must leave no denied instructions behind
    reset = Instruction(name='reset-denied', author='bot')
    extension.process_instruction(reset)
    assert not extension.current.denied_instructions

    # nothing denied: running must not post a comment
    yield from extension.run()
    assert not extension.current.head.comment.mock_calls

    # one denied instruction: running must post a comment
    extension.current.denied_instructions = [Mock(author='toto')]
    yield from extension.run()
    assert extension.current.head.comment.mock_calls
test_create_jobs.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_yml_found(mocker, SETTINGS):
    """Run YamlExtension against a stubbed file fetch and check the resulting job specs.

    With ``jobs_filter`` set to ``['*', '-skip']`` and a fetched file defining
    ``job`` and ``skip``, ``job`` must be in ``job_specs`` and ``skip`` must not.
    """
    # both patches are applied before the extension module is imported
    github_mock = mocker.patch('jenkins_epo.extensions.core.GITHUB')
    job_mock = mocker.patch('jenkins_epo.extensions.core.Job')
    from jenkins_epo.extensions.core import YamlExtension

    job_mock.jobs_filter = ['*', '-skip']
    SETTINGS.update(YamlExtension.SETTINGS)

    extension = YamlExtension('ext', Mock())
    extension.current = extension.bot.current
    extension.current.yaml = {'job': {}}

    fetch_stub = CoroutineMock(return_value="job: command\nskip: command")
    github_mock.fetch_file_contents = fetch_stub

    head = extension.current.head
    head.repository.url = 'https://github.com/owner/repo.git'
    head.repository.jobs = {}

    yield from extension.run()

    assert github_mock.fetch_file_contents.mock_calls
    job_specs = extension.current.job_specs
    assert 'job' in job_specs
    assert 'skip' not in job_specs
test_create_jobs.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_yml_comment_dict():
    """Check that ``yaml`` and ``params`` instructions both end up in the job's parameters."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import YamlExtension

    extension = YamlExtension('ext', Mock())
    extension.current = extension.bot.current
    extension.current.yaml = {}

    # first a full yaml instruction, then a params-only instruction, in this order
    yaml_instruction = Instruction(
        author='a', name='yaml', args={'job': {'parameters': {'PARAM1': 1}}},
    )
    params_instruction = Instruction(
        author='a', name='params', args={'job': {'PARAM2': 1}},
    )
    extension.process_instruction(yaml_instruction)
    extension.process_instruction(params_instruction)

    parameters = extension.current.yaml['job']['parameters']
    assert 'PARAM1' in parameters
    assert 'PARAM2' in parameters
test_create_jobs.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_job_update():
    """Check that the first action from ``process_job_specs`` is the existing job's ``update``."""
    from jenkins_epo.extensions.jenkins import CreateJobsExtension

    extension = CreateJobsExtension('createjob', Mock())
    extension.current = extension.bot.current
    extension.current.refresh_jobs = None

    # the name is assigned after construction, as in Mock's usual ``name`` handling
    new_spec = Mock(config={})
    new_spec.name = 'new_job'
    extension.current.job_specs = {'new_job': new_spec}

    # existing job whose spec reports that it does not contain the new one
    existing_job = Mock()
    existing_job.spec.contains.return_value = False
    extension.current.jobs = {'new_job': existing_job}

    results = list(extension.process_job_specs())
    assert results

    action, _spec = results[0]

    assert action == existing_job.update
test_create_jobs.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_jenkins_fails_existing(mocker):
    """Check that a failing ``job.update`` is called and leaves an entry in ``current.errors``."""
    process_job_specs = mocker.patch(
        'jenkins_epo.extensions.jenkins.CreateJobsExtension.process_job_specs'
    )

    from jenkins_yml.job import Job as JobSpec
    from jenkins_epo.extensions.jenkins import CreateJobsExtension

    extension = CreateJobsExtension('createjob', Mock())
    extension.current = extension.bot.current
    extension.current.errors = []
    extension.current.head.sha = 'cafed0d0'
    extension.current.head.repository.jobs = {'job': Mock()}
    extension.current.job_specs = {'job': JobSpec.factory('job', 'toto')}

    # existing job whose update coroutine always raises
    failing_job = Mock()
    extension.current.jobs = {'job': failing_job}
    failing_job.update = CoroutineMock(side_effect=Exception('POUET'))

    # the patched generator hands that failing update to run()
    process_job_specs.return_value = [(failing_job.update, Mock())]

    yield from extension.run()

    assert extension.current.errors
    assert failing_job.update.mock_calls
test_stages.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_second_stage():
    """Check that ``test`` is the current stage for stages ``['build', 'test']`` and these specs."""
    from jenkins_epo.extensions.jenkins import StagesExtension

    extension = StagesExtension('stages', Mock())
    extension.current = Mock()
    extension.current.head.ref = 'pr'
    extension.current.SETTINGS.STAGES = ['build', 'test']

    # two specs with empty config; only 'test' has a matching job below
    job_specs = {}
    for spec_name in ('test', 'missing'):
        spec = Mock(config={})
        spec.name = spec_name
        job_specs[spec_name] = spec
    extension.current.job_specs = job_specs

    test_job = Mock()
    test_job.list_contexts.return_value = ['test']
    extension.current.jobs = {'test': test_job}

    extension.current.statuses = {}

    yield from extension.run()

    assert extension.current.current_stage.name == 'test'
test_stages.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_no_test_stage():
    """Check that ``build`` is the current stage for stages ``['build', 'deploy']`` and one spec."""
    from jenkins_epo.extensions.jenkins import StagesExtension

    extension = StagesExtension('stages', Mock())
    extension.current = Mock()
    extension.current.head.ref = 'pr'
    extension.current.SETTINGS.STAGES = ['build', 'deploy']

    # a single spec named 'build' with empty config
    build_spec = Mock(config={})
    build_spec.name = 'build'
    extension.current.job_specs = {'build': build_spec}

    build_job = Mock()
    build_job.list_contexts.return_value = ['build']
    extension.current.jobs = {'build': build_job}

    extension.current.statuses = {}

    yield from extension.run()

    assert 'build' == extension.current.current_stage.name
    assert 'build' in extension.current.job_specs
    assert 'build' in ext.current.job_specs
test_stages.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_branches_limit():
    """Check that a spec configured with ``branches='master'`` is dropped when head ref is ``pr``."""
    from jenkins_epo.extensions.jenkins import StagesExtension

    extension = StagesExtension('stages', Mock())
    extension.current = Mock()
    extension.current.head.ref = 'pr'
    extension.current.SETTINGS.STAGES = ['test']

    # the only spec is configured for the 'master' branch
    limited_spec = Mock(config={'branches': 'master'})
    limited_spec.name = 'job'
    extension.current.job_specs = {'job': limited_spec}

    existing_job = Mock()
    existing_job.list_contexts.return_value = ['job']
    extension.current.jobs = {'job': existing_job}

    extension.current.statuses = {}

    yield from extension.run()

    assert extension.current.current_stage.name == 'test'
    assert 'job' not in extension.current.job_specs
test_merger.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_comment_deny():
    """Check that running with a denied ``opm`` instruction posts a comment on the head."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    merger = MergerExtension('merger', Mock())
    merger.current = Mock()
    merger.current.SETTINGS.COLLABORATORS = []
    merger.current.last_commit.date = datetime.now()
    merger.current.opm = {}

    # the denied instruction is dated one hour after the last commit
    denied = Instruction(
        author='noncollaborator', name='opm',
        date=merger.current.last_commit.date + timedelta(hours=1),
    )
    merger.current.opm_denied = [denied]

    yield from merger.run()

    assert merger.current.head.comment.mock_calls
test_merger.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_deny_outdated_opm():
    """Check that an ``opm`` dated before the last commit does not set ``current.opm``."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    merger = MergerExtension('merger', Mock())
    merger.current = Mock()
    merger.current.SETTINGS.COLLABORATORS = ['collaborator']
    merger.current.last_commit.date = datetime.now()
    merger.current.opm = {}
    merger.current.opm_denied = []

    # the instruction is dated one hour before the last commit
    outdated = Instruction(
        author='collaborator', name='opm',
        date=merger.current.last_commit.date - timedelta(hours=1),
    )
    merger.process_instruction(outdated)

    assert not merger.current.opm
test_merger.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_deny_non_collaborator_processed():
    """Check that ``opm-processed`` leaves both ``opm`` and ``opm_denied`` empty."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    merger = MergerExtension('merger', Mock())
    merger.current = Mock()
    merger.current.SETTINGS.COLLABORATORS = []
    merger.current.last_commit.date = datetime.now()
    merger.current.opm = None
    denied = Instruction(author='noncollaborator', name='opm')
    merger.current.opm_denied = [denied]

    # the bot's instruction is dated two hours after the last commit
    processed = Instruction(
        author='bot', name='opm-processed',
        date=merger.current.last_commit.date + timedelta(hours=2),
    )
    merger.process_instruction(processed)

    assert not merger.current.opm
    assert not merger.current.opm_denied
test_merger.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_accept_lgtm():
    """Check that a collaborator's ``opm`` dated after the last commit is stored as ``current.opm``."""
    from jenkins_epo.bot import Instruction
    from jenkins_epo.extensions.core import MergerExtension

    merger = MergerExtension('merger', Mock())
    merger.current = Mock()
    merger.current.SETTINGS.COLLABORATORS = ['collaborator']
    commit_date = datetime.now()
    merger.current.last_commit.date = commit_date
    merger.current.opm = None
    merger.current.opm_denied = []

    # the instruction is dated one hour after the last commit
    approval = Instruction(
        author='collaborator', name='opm',
        date=commit_date + timedelta(hours=1),
    )
    merger.process_instruction(approval)

    assert 'collaborator' == merger.current.opm.author
    assert not merger.current.opm_denied
test_merger.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_merge_wip():
    """Check that a run with ``wip`` set comments to the opm author and does not merge."""
    from jenkins_epo.extensions.core import MergerExtension

    merger = MergerExtension('merger', Mock())
    merger.current = Mock()
    merger.current.SETTINGS.COLLABORATORS = ['collaborator']
    # the commit date is taken first, the opm date right after it
    merger.current.last_commit.date = datetime.now()
    merger.current.opm.author = 'collaborator'
    merger.current.opm.date = datetime.now()
    merger.current.opm_denied = []
    merger.current.opm_processed = None
    merger.current.wip = True

    yield from merger.run()

    head = merger.current.head
    assert head.comment.mock_calls
    # call_args[1] holds the keyword arguments of the last comment call
    comment_kwargs = head.comment.call_args[1]
    assert '@collaborator' in comment_kwargs['body']
    assert not head.merge.mock_calls


问题


面经


文章

微信
公众号

扫码关注公众号