test_pump.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:cyphon 作者: dunbarcyber 项目源码 文件源码
def test_normal_streaming_query(self):
        """
        Exercise Pump.start() on a Pump backed by a streaming Pipe, using a
        query that stays within the Pipe's specs.

        Both collaborators (_factor_query and _process_streaming_query) are
        replaced with Mocks, so only the dispatch performed by start() is
        verified here.
        """
        with LogCapture() as captured_logs:
            pump = self.stream_pump
            query = self.subquery1

            # Stub the factoring step so it yields a single-element query
            # list, and stub the streaming handler so nothing is executed.
            pump._factor_query = Mock(return_value=[query])
            pump._process_streaming_query = Mock()

            pump.start(query)

            # start() must forward its argument unchanged to _factor_query()
            pump._factor_query.assert_called_once_with(query)

            # _process_streaming_query() must receive the first (and only)
            # element of the list returned by _factor_query()
            pump._process_streaming_query.assert_called_once_with(query)

            # called with no expected records; presumably asserts that
            # nothing was logged -- confirm against testfixtures docs
            captured_logs.check()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号