worker_tests.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:easy-job 作者: inb-co 项目源码 文件源码
def test_normal_start(self, runner, process, getLogger):
        """Check that ``RabbitMQInitializer.start()`` spawns one process per worker.

        ``runner``, ``process`` and ``getLogger`` are mock objects supplied to
        the test (presumably by ``mock.patch`` decorators outside this
        snippet -- confirm against the enclosing class).

        The test verifies that:
          * a single log record is emitted at level 10 announcing the
            number of workers being started,
          * ``process`` is constructed and ``.start()``-ed once per worker,
          * ``start()`` returns whatever ``runner`` returns,
          * the object handed to the first process is a ``RabbitMQWorker``.
        """
        assert isinstance(process, mock.MagicMock)

        # Arrange
        worker_count = 4
        fake_logger = mock.MagicMock()
        getLogger.return_value = fake_logger
        expected_outcome = "SOME_RETURN_VALUE"
        runner.return_value = expected_outcome
        backend_config = {
            'result_backend_class': 'easy_job.result_backends.dummy.DummyBackend'
        }
        worker_options = {
            'serialization_method': 'json',
            'queue_name': 'queue',
            'rabbitmq_configs': {},
        }

        # Act
        initializer = RabbitMQInitializer(
            count=worker_count,
            logger="logger",
            result_backend=backend_config,
            options=worker_options,
        )
        outcome = initializer.start()

        # Assert
        announcement = "Starting {} RabbitMQ workers...".format(worker_count)
        fake_logger.log.assert_called_once_with(10, announcement)

        # Each worker contributes a constructor call followed by .start().
        calls_per_worker = [
            mock.call(args=(mock.ANY,), target=mock.ANY),
            mock.call().start(),
        ]
        process.assert_has_calls(calls_per_worker * worker_count)
        self.assertEqual(outcome, expected_outcome)

        # Inspect the first recorded call: (name, positional args, keyword args).
        _name, _positional, first_call_kwargs = process.mock_calls[0]
        spawned_worker = first_call_kwargs['args'][0]
        self.assertIsInstance(spawned_worker, RabbitMQWorker)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号