test_slot.py 文件源码

python
阅读 42 收藏 0 点赞 0 评论 0

项目:pg2kinesis 作者: handshake 项目源码 文件源码
def test__enter__(slot):
    """Check that ``slot.__enter__`` returns the slot and sets up its connections.

    Two scenarios are exercised against the ``slot`` fixture:

    1. With the fixture's initial keepalive window, entering requests exactly
       two connections, puts the normal connection in isolation level 0,
       opens a cursor on the replication connection, and does not start
       the keepalive.
    2. With ``_keepalive_window`` set to 1, entering starts the keepalive.
    """
    # NOTE: the keyword must be ``side_effect`` (singular). A misspelt
    # ``side_effects`` is silently stored as a plain attribute on the mock,
    # so every call would return the same auto-created return value instead
    # of handing out one fresh connection mock per call.
    with patch.object(slot, '_get_connection', side_effect=[Mock(), Mock()]) as mock_gc, \
            patch.object(slot, '_send_keepalive') as mock_ka:
        assert slot == slot.__enter__(), 'Returns itself'
        assert mock_gc.call_count == 2

        assert call.set_isolation_level(0) in slot._normal_conn.method_calls, 'make sure we are in autocommit'
        assert call.cursor() in slot._repl_conn.method_calls, 'we opened a cursor'

        assert not mock_ka.called, "with no window we didn't start keep alive"

    # Second scenario: a non-zero window should trigger the keepalive on enter.
    slot._keepalive_window = 1
    with patch.object(slot, '_get_connection', side_effect=[Mock(), Mock()]), \
            patch.object(slot, '_send_keepalive') as mock_ka:
        slot.__enter__()
        assert mock_ka.called, " we started up keepalive"
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号