python类object()的实例源码

test_usermanager.py 文件源码 项目:ownbot 作者: michaelimfeld 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_verify_user(self):
        """
            Verifying a user that is listed as unverified in a group must
            succeed and leave the config holding that user (id and username)
            under the group's ``users`` list only.
        """
        manager = self.__get_dummy_object()
        self.__set_config(manager, {"foogroup": {"unverified": ["@foouser"]}})

        # Replace the private load/save config methods with mocks for the
        # duration of the call.
        load_stub = patch.object(manager, "_UserManager__load_config")
        save_stub = patch.object(manager, "_UserManager__save_config")
        with load_stub, save_stub:
            verified = manager.verify_user(1337, "@foouser", "foogroup")
            self.assertTrue(verified)

            verified_user = {"id": 1337, "username": "@foouser"}
            self.assertEqual(manager.config,
                             {"foogroup": {"users": [verified_user]}})
test_usermanager.py 文件源码 项目:ownbot 作者: michaelimfeld 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_add_user_already_in_grp(self):
        """
            add_user must return a falsy result when the username is already
            listed (here: as unverified) in the requested group.
        """
        manager = self.__get_dummy_object()
        self.__set_config(manager, {"foogroup": {"unverified": ["@foouser"]}})

        # Replace the private load/save config methods with mocks for the
        # duration of the call.
        load_stub = patch.object(manager, "_UserManager__load_config")
        save_stub = patch.object(manager, "_UserManager__save_config")
        with load_stub, save_stub:
            added = manager.add_user("@foouser", "foogroup")
            self.assertFalse(added)
test_usermanager.py 文件源码 项目:ownbot 作者: michaelimfeld 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_add_user_verified(self):
        """
            add_user called with both a username and a user_id on an empty
            config must succeed and store the user under the group's
            ``users`` list.
        """
        manager = self.__get_dummy_object()
        self.__set_config(manager, {})

        # Replace the private load/save config methods with mocks for the
        # duration of the call.
        load_stub = patch.object(manager, "_UserManager__load_config")
        save_stub = patch.object(manager, "_UserManager__save_config")
        with load_stub, save_stub:
            added = manager.add_user("@foouser", "foogroup", user_id=1337)
            self.assertTrue(added)

            stored_user = {"id": 1337, "username": "@foouser"}
            self.assertEqual(manager.config,
                             {"foogroup": {"users": [stored_user]}})
core.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def make_test_handler(testcase, *args, **kwargs):
    """
    Build a TestHandler for ``testcase`` and schedule its ``close`` as a
    cleanup on that testcase.

    Parameters
    ----------
    testcase: unittest.TestCase
        The test case that will own the handler; ``handler.close`` is
        registered through ``testcase.addCleanup``.
    *args, **kwargs
        Passed through unchanged to the TestHandler constructor.

    Returns
    -------
    handler: logbook.TestHandler
        The newly created handler.
    """
    log_handler = TestHandler(*args, **kwargs)
    # Registering the bound method defers the close until test cleanup.
    testcase.addCleanup(log_handler.close)
    return log_handler
test_urls.py 文件源码 项目:wagtail-sharing 作者: cfpb 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Replace wagtail's root ``urlpatterns`` with three dummy patterns and
        reload ``wagtailsharing.urls`` so it is rebuilt on top of them.

        The patch is undone automatically via ``addCleanup``. The resulting
        patterns are exposed to the tests as ``self.urlpatterns``.
        """
        def test_view():
            pass  # pragma: no cover

        # Use the dummy view as the callback. Previously the ``url`` factory
        # itself was passed as the view and ``test_view`` was left unused.
        root_patterns = [
            url(r'^foo/$', test_view, name='foo'),
            url(r'^((?:[\w\-]+/)*)$', test_view, name='wagtail_serve'),
            url(r'^bar/$', test_view, name='bar'),
        ]

        self.patcher = patch.object(
            wagtail.wagtailcore.urls,
            'urlpatterns',
            root_patterns
        )
        self.patcher.start()
        self.addCleanup(self.patcher.stop)

        # Re-import so the module picks up the patched root patterns.
        reload(wagtailsharing.urls)
        self.urlpatterns = wagtailsharing.urls.urlpatterns
test_apiutils.py 文件源码 项目:lecli 作者: rapid7 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_replace_loggroup_section(mocked_configparser_class):
    """
    replace_loggroup_section must drop the log-groups section from the live
    config and leave the CLI favorites section present on the parser mock.

    The user config directory is created if neither it nor the config file
    exists, and both are removed again (best effort) even when an assertion
    fails.
    """
    config_dir = api_utils.user_config_dir(cli.lecli.__name__)
    if not os.path.exists(api_utils.CONFIG_FILE_PATH):
        if not os.path.exists(config_dir):
            os.makedirs(config_dir)

    try:
        loggroups_section = api_utils.LOGGROUPS_SECTION
        config_parser_mock = mocked_configparser_class.return_value
        config_parser_mock.add_section(loggroups_section)
        with patch.object(api_utils.CONFIG, 'items',
                           return_value=[('test-log-group-favs', ["test-log-key1", "test-log-key2"])]):
            api_utils.replace_loggroup_section()
            assert not api_utils.CONFIG.has_section(loggroups_section)
            assert config_parser_mock.has_section(api_utils.CLI_FAVORITES_SECTION)
    finally:
        # Cleanup is deliberately best effort: the file or directory may not
        # exist, or the directory may not be empty.
        try:
            os.remove(api_utils.CONFIG_FILE_PATH)
            os.rmdir(config_dir)
        except OSError:
            pass
test_server.py 文件源码 项目:ucsmsdk_samples 作者: CiscoUcs 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_valid_server_pool_add_slot(query_mock, add_mo_mock, commit_mock):
    """
    server_pool_add_slot with default parameters must return a
    ComputePooledSlot carrying the requested chassis and slot ids as strings.
    """
    # All UCSM interactions are mocked: the query yields an existing pool and
    # add/commit report success.
    commit_mock.return_value = True
    add_mo_mock.return_value = True
    query_mock.return_value = ComputePool(parent_mo_or_dn="org-root",
                                          name="test-pool")
    ucs_handle = UcsHandle('169.254.1.1', 'admin', 'password')

    # Scenario: default parameters
    pooled_slot = server_pool_add_slot(ucs_handle, 1, 8)

    # The returned object is of the expected managed-object type ...
    assert isinstance(pooled_slot, ComputePooledSlot)
    # ... and the numeric ids were stored in their string form.
    assert pooled_slot.chassis_id == "1"
    assert pooled_slot.slot_id == "8"

# Patch UcsHandle.commit to simulate ucsm interaction w/o real ucsm
test_rpc.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_non_backoff_exception(
        self, container_factory, rabbit_config, rpc_proxy
    ):
        """ An exception that is not a Backoff surfaces to the RPC caller as
        a RemoteError carrying the original exception type name
        """
        class Boom(Exception):
            pass

        class Service(object):
            name = "service"

            @rpc
            def method(self):
                raise Boom()

        service_container = container_factory(Service, rabbit_config)
        service_container.start()

        with pytest.raises(RemoteError) as raised:
            rpc_proxy.service.method()

        remote_error = raised.value
        assert remote_error.exc_type == "Boom"
test_rpc.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def container(self, container_factory):
        """
        Build (without starting) a container for a service whose three RPC
        entrypoints all raise Backoff but declare different
        ``expected_exceptions``: none, only UserException, and both
        UserException and Backoff.
        """
        user_exception = self.UserException

        class Service(object):
            name = "service"

            @rpc
            def nothing_expected(self):
                raise Backoff()

            @rpc(expected_exceptions=user_exception)
            def something_expected(self):
                raise Backoff()

            @rpc(expected_exceptions=(user_exception, Backoff))
            def backoff_expected(self):
                raise Backoff()

        amqp_config = {'AMQP_URI': 'memory://localhost'}
        return container_factory(Service, amqp_config)
conftest.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def entrypoint_tracker():
    """
    Return a fresh CallTracker: a small recorder that stores each tracked
    call as a dict of its keyword arguments.
    """

    class CallTracker(object):
        """Ordered log of tracked calls; ``len()`` gives the call count."""

        def __init__(self):
            # One dict per ``track`` invocation, in call order.
            self.calls = []

        def __len__(self):
            return len(self.calls)

        def track(self, **record):
            """Append the given keyword arguments as one call record."""
            self.calls.append(record)

        def get_results(self):
            """Return the ``'result'`` entry of every recorded call."""
            results = []
            for record in self.calls:
                results.append(record['result'])
            return results

        def get_exceptions(self):
            """Return the ``'exc_info'`` entry of every recorded call."""
            exceptions = []
            for record in self.calls:
                exceptions.append(record['exc_info'])
            return exceptions

    tracker = CallTracker()
    return tracker
test_retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def container(self, container_factory, rabbit_config, counter):
        """
        Start and return a container for a service with two RPC methods,
        ``slow`` and ``quick``. Each raises its backoff exception while its
        counter reads at most 1 after incrementing, and returns its own name
        afterwards.
        """

        class Service(object):
            name = "service"

            backoff = BackoffPublisher()

            @rpc
            def slow(self):
                attempts = counter["slow"].increment()
                if attempts <= 1:
                    raise SlowBackoff()
                return "slow"

            @rpc
            def quick(self):
                attempts = counter["quick"].increment()
                if attempts <= 1:
                    raise QuickBackoff()
                return "quick"

        started = container_factory(Service, rabbit_config)
        started.start()
        return started
test_retry.py 文件源码 项目:nameko-amqp-retry 作者: nameko 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def container(self, container_factory, rabbit_config, counter):
        """
        Start and return a container for a service whose single RPC method
        ``bad`` raises a Backoff subclass with a negative schedule entry.
        """

        class Service(object):
            name = "service"

            backoff = BackoffPublisher()

            @rpc
            def bad(self):
                # The Backoff subclass is declared per call, with a schedule
                # of one negative delay.
                class BadBackoff(Backoff):
                    schedule = (-10, )
                raise BadBackoff()

        started = container_factory(Service, rabbit_config)
        started.start()
        return started
test_updates.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_noop_model_direct_update(self):
        """ Calling update() with no effective change must not hit the session. """
        m0 = TestUpdateModel.create(count=5, text='monkey')

        # No arguments at all: nothing to write.
        with patch.object(self.session, 'execute') as execute:
            m0.update()
        assert execute.call_count == 0

        # Same value as already stored: still nothing to write.
        with patch.object(self.session, 'execute') as execute:
            m0.update(count=5)
        assert execute.call_count == 0

        # NOTE(review): the key-column updates below are only exercised; no
        # assertion is made on the execute call count for them.
        for key_column in ('partition', 'cluster'):
            with patch.object(self.session, 'execute') as execute:
                m0.update(**{key_column: getattr(m0, key_column)})
test_updates.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_noop_model_assignation_update(self):
        """ Re-assigning a column to its current value and saving must not hit the session. """
        # Work on a freshly fetched copy rather than the created instance, to
        # rule out any hidden variable cache effect.
        m0 = TestUpdateModel.create(count=5, text='monkey')
        m1 = TestUpdateModel.get(partition=m0.partition, cluster=m0.cluster)

        # Plain save with nothing touched.
        with patch.object(self.session, 'execute') as execute:
            m1.save()
        assert execute.call_count == 0

        # Regular column set to the value it already has.
        with patch.object(self.session, 'execute') as execute:
            m1.count = 5
            m1.save()
        assert execute.call_count == 0

        # Key columns set to the values they already have.
        for key_column in ('partition', 'cluster'):
            with patch.object(self.session, 'execute') as execute:
                setattr(m1, key_column, getattr(m0, key_column))
                m1.save()
            assert execute.call_count == 0
test_updates.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_only_set_values_is_updated(self):
        """
        Saving an instance whose row was deleted writes only the set columns

        @since 3.9
        @jira_ticket PYTHON-657
        @expected_result the non updated column is None and the
        updated column has the set value

        @test_category object_mapper
        """

        ModelWithDefault.create(id=1, mf={1: 1}, dummy=1).save()

        # Load the row, then delete it behind the loaded instance's back.
        stale = ModelWithDefault.filter(id=1).first()
        ModelWithDefault.objects(id=1).delete()

        stale.mf = {1: 2}
        udt = UDT(age=1, mf={2: 3})
        udt_default = UDT(age=1, mf={2: 3})
        stale.udt = udt
        stale.udt_default = udt_default
        stale.save()

        # 'dummy' was never re-assigned on the stale instance, so it is
        # expected to come back as None.
        expected_row = {
            'id': 1,
            'dummy': None,
            'mf': {1: 2},
            "udt": udt,
            "udt_default": udt_default,
        }
        self.assertEqual(ModelWithDefault.get()._as_dict(), expected_row)
test_policies.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_speculative_and_timeout(self):
        """
        Test to ensure the timeout is honored when using speculative execution
        @since 3.10
        @jira_ticket PYTHON-750
        @expected_result speculative retries are scheduled at a fixed period
        for as long as the timeout allows.

        @test_category metadata
        """
        # Connection.send_msg is mocked so that no message is actually sent;
        # otherwise a response might arrive and the number of hosts queried
        # would be unpredictable.
        with patch.object(Connection, "send_msg", return_value=100):
            query = "INSERT INTO test3rf.test (k, v) VALUES (0, 1);"
            statement = SimpleStatement(query, is_idempotent=True)

            # The OperationTimedOut ends up stored on the response future.
            # session.execute would raise it, so the query is issued
            # asynchronously and the future's event is waited on directly.
            response_future = self.session.execute_async(
                statement,
                execution_profile='spec_ep_brr_lim',
                timeout=2.2,
            )
            response_future._event.wait(4)
            self.assertIsInstance(response_future._final_exception,
                                  OperationTimedOut)

            # This is because 2.2 / 0.4 + 1 = 6
            attempted = response_future.attempted_hosts
            self.assertEqual(len(attempted), 6)
test_blockchain.py 文件源码 项目:crankycoin 作者: cranklin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_get_genesis_block_whenCalled_thenCreatesAndReturnsBlockWithGenesisTransactions(self):
        """
        get_genesis_block must construct exactly one Block, with index 0, the
        two genesis transactions and zeros for the remaining arguments.
        """
        genesis_recipients = (
            "03dd1e57d05d9cab1d8d9b727568ad951ac2d9ecd082bc36f69e021b8427812924",
            "03dd1e3defd36c8c0c7282ca1a324851efdb15f742cac0c5b258ef7b290ece9e5d",
        )
        genesis_transactions = [
            Transaction("0", recipient, 500000, 0)
            for recipient in genesis_recipients
        ]

        # Blockchain.__init__ is stubbed out and the Block class is replaced
        # by a mock so that only the constructor call is observed.
        init_stub = patch.object(Blockchain, '__init__', return_value=None)
        block_stub = patch('crankycoin.blockchain.Block')
        with init_stub, block_stub as patched_Block:
            patched_Block._calculate_block_hash.return_value = "mock_block_hash"
            subject = Blockchain()

            subject.get_genesis_block()

            patched_Block.assert_called_once_with(0, genesis_transactions, 0, 0, 0)
test_blockchain.py 文件源码 项目:crankycoin 作者: cranklin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_check_hash_and_hash_pattern_whenBlockHasValidHashAndPattern_thenReturnsTrue(self):
        """
        With the hash difficulty mocked to 4 and a block hash starting with
        four zeros, _check_hash_and_hash_pattern must return None.

        NOTE(review): the test name says "ReturnsTrue" but the assertion
        checks for None.
        """
        transaction = Mock(Transaction)
        transaction_fields = (
            ("source", "from"),
            ("timestamp", 1498923800),
            ("destination", "to"),
            ("amount", 50),
            ("signature", "signature"),
            ("tx_hash", "transaction_hash"),
        )
        for field, value in transaction_fields:
            setattr(transaction, field, value)

        mock_block = Mock(Block)
        block_fields = (
            ("current_hash", "0000_valid_block_hash"),
            ("index", 35),
            ("previous_hash", "0000_valid_previous_hash"),
            ("transactions", [transaction]),
            ("nonce", 37),
            ("timestamp", 12341234),
        )
        for field, value in block_fields:
            setattr(mock_block, field, value)

        init_stub = patch.object(Blockchain, '__init__', return_value=None)
        difficulty_stub = patch.object(Blockchain, 'calculate_hash_difficulty', return_value=4)
        with init_stub, difficulty_stub:
            subject = Blockchain()

            outcome = subject._check_hash_and_hash_pattern(mock_block)

            self.assertIsNone(outcome)
test_blockchain.py 文件源码 项目:crankycoin 作者: cranklin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_check_hash_and_hash_pattern_whenBlockHasInvalidHash_thenReturnsFalse(self):
        """
        _check_hash_and_hash_pattern must raise InvalidHash with a
        "Block Hash Mismatch" message for the mocked block.

        NOTE(review): the test name says "ReturnsFalse" but the behavior under
        test is a raised InvalidHash.
        """
        mock_block = Mock(Block)

        transaction = Mock(Transaction)
        transaction.source = "from"
        transaction.timestamp = 1498923800
        transaction.destination = "to"
        transaction.amount = 50
        transaction.signature = "signature"
        transaction.tx_hash = "transaction_hash"

        with patch.object(Blockchain, '__init__', return_value=None) as patched_init:
            mock_block.current_hash = "0000_valid_block_hash"
            mock_block.index = 35
            mock_block.previous_hash = "0000_valid_previous_hash"
            mock_block.transactions = [transaction]
            mock_block.nonce = 37
            mock_block.timestamp = 12341234
            subject = Blockchain()

            with self.assertRaises(InvalidHash) as context:
                subject._check_hash_and_hash_pattern(mock_block)
            # The message check must sit outside the assertRaises block:
            # statements after the raising call inside it never run.
            self.assertTrue("Block Hash Mismatch" in str(context.exception))
test_blockchain.py 文件源码 项目:crankycoin 作者: cranklin 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_check_hash_and_hash_pattern_whenBlockHasInvalidPattern_thenReturnsFalse(self):
        """
        With the hash difficulty mocked to 4 and a block hash that does not
        start with zeros, _check_hash_and_hash_pattern must raise InvalidHash
        with an "Incompatible Block Hash" message.

        NOTE(review): the test name says "ReturnsFalse" but the behavior under
        test is a raised InvalidHash.
        """
        mock_block = Mock(Block)

        transaction = Mock(Transaction)
        transaction.source = "from"
        transaction.timestamp = 1498923800
        transaction.destination = "to"
        transaction.amount = 50
        transaction.signature = "signature"
        transaction.tx_hash = "transaction_hash"

        with patch.object(Blockchain, '__init__', return_value=None) as patched_init, \
                patch.object(Blockchain, 'calculate_hash_difficulty', return_value=4) as patched_calculate_hash_difficulty:
            mock_block.current_hash = "invalid_block_hash"
            mock_block.index = 35
            mock_block.previous_hash = "0000_valid_previous_hash"
            mock_block.transactions = [transaction]
            mock_block.nonce = 37
            mock_block.timestamp = 12341234
            subject = Blockchain()

            with self.assertRaises(InvalidHash) as context:
                subject._check_hash_and_hash_pattern(mock_block)
            # The message check must sit outside the assertRaises block:
            # statements after the raising call inside it never run.
            self.assertTrue("Incompatible Block Hash" in str(context.exception))
test_blockchain.py 文件源码 项目:crankycoin 作者: cranklin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_mine_block_whenOneTransaction_andIncorrectTransactionHash_thenReturnsNone(self):
        """
        When the only queued transaction carries an incorrect tx_hash,
        mine_block must return None after polling the unconfirmed queue twice
        (once for the transaction, once for the terminating None).
        """
        transaction = Mock(Transaction)
        transaction_fields = (
            ("source", "from"),
            ("timestamp", 1498923800),
            ("destination", "to"),
            ("amount", 25),
            ("fee", 0.1),
            ("signature", "signature"),
            ("tx_hash", "incorrect_transaction_hash"),
        )
        for field, value in transaction_fields:
            setattr(transaction, field, value)

        latest_block = Mock(Block)
        latest_block.index = 35
        latest_block.current_hash = "latest_block_hash"

        init_stub = patch.object(Blockchain, '__init__', return_value=None)
        latest_stub = patch.object(Blockchain, 'get_latest_block', return_value=latest_block)
        # The queue yields the transaction once and then reports empty.
        pop_stub = patch.object(Blockchain, 'pop_next_unconfirmed_transaction', side_effect=[transaction, None])
        with init_stub, latest_stub, pop_stub as patched_pop_next_unconfirmed_transaction:
            subject = Blockchain()

            mined = subject.mine_block("reward_address")

            self.assertIsNone(mined)
            self.assertEqual(patched_pop_next_unconfirmed_transaction.call_count, 2)
test_blockchain.py 文件源码 项目:crankycoin 作者: cranklin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_mine_block_whenOneDuplicateTransaction_thenReturnsNone(self):
        """
        When the only queued transaction is reported as a duplicate of one in
        an existing block, mine_block must return None after polling the
        unconfirmed queue twice and checking the transaction hash for
        duplicates.
        """
        transaction = Mock(Transaction)
        transaction.source = "from"
        transaction.timestamp = 1498923800
        transaction.destination = "to"
        transaction.amount = 25
        transaction.fee = 0.1
        transaction.signature = "signature"
        transaction.tx_hash = "transaction_hash"

        block_id_with_same_transaction = 38
        latest_block = Mock(Block)
        latest_block.index = 35
        latest_block.current_hash = "latest_block_hash"
        with patch.object(Blockchain, '__init__', return_value=None) as patched_init, \
                patch.object(Blockchain, 'get_latest_block', return_value=latest_block) as patched_get_latest_block, \
                patch.object(Blockchain, 'pop_next_unconfirmed_transaction', side_effect=[transaction, None]) as patched_pop_next_unconfirmed_transaction, \
                patch.object(Blockchain, 'find_duplicate_transactions', return_value=block_id_with_same_transaction) as patched_find_duplicate_transactions:
            subject = Blockchain()

            resp = subject.mine_block("reward_address")

            self.assertIsNone(resp)
            self.assertEqual(patched_pop_next_unconfirmed_transaction.call_count, 2)
            # Was misspelled "asssert_called_once_with", which a mock accepts
            # as an ordinary attribute, so nothing was ever verified.
            patched_find_duplicate_transactions.assert_called_once_with("transaction_hash")
test_blockchain.py 文件源码 项目:crankycoin 作者: cranklin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_push_unconfirmed_transaction_thenPushesTransactionAndReturnsNone(self):
        """
        push_unconfirmed_transaction must return a truthy value and leave
        exactly one entry in the unconfirmed-transactions queue.

        NOTE(review): the test name says "ReturnsNone" but the assertion
        expects a truthy result.
        """
        transaction_one = dict([
            ('from', 'from'),
            ('timestamp', 1498923800),
            ('to', 'to'),
            ('amount', 1),
            ('signature', 'signature_one'),
            ('hash', "transaction_hash_one"),
        ])
        with patch.object(Blockchain, '__init__', return_value=None):
            subject = Blockchain()
            # __init__ is stubbed out, so the queue has to be provided here.
            subject.unconfirmed_transactions = Queue()

            pushed = subject.push_unconfirmed_transaction(transaction_one)

            self.assertTrue(pushed)
            queued = subject.unconfirmed_transactions.qsize()
            self.assertEqual(queued, 1)
test_node.py 文件源码 项目:crankycoin 作者: cranklin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_broadcast_transaction_thenBroadcastsToAllNodes(self):
        """
        broadcast_transaction must refresh the node list once and POST the
        serialized transaction to the /transactions endpoint of every known
        full node (in any order).
        """
        peers = ("127.0.0.1", "127.0.0.2", "127.0.0.3")
        # Every peer is expected to receive the same JSON body.
        payload = {'transaction': '{"amount": 0, "destination": "destination", "fee": 0, "signature": null, "source": "source", "timestamp": 1508823223, "tx_hash": null}'}

        init_stub = patch.object(FullNode, '__init__', return_value=None)
        refresh_stub = patch.object(FullNode, 'request_nodes_from_all')
        clock_stub = patch("crankycoin.time.time", return_value="1508823223")
        post_stub = patch("crankycoin.requests.post")
        with init_stub, refresh_stub as patched_request_nodes_from_all, \
                clock_stub, post_stub as patched_requests:

            transaction = Transaction("source", "destination", 0, 0)
            node = FullNode("127.0.0.1", "reward_address")
            node.full_nodes = set(peers)

            node.broadcast_transaction(transaction)

            patched_request_nodes_from_all.assert_called_once()
            expected_posts = [
                call("http://{}:30013/transactions".format(peer), json=payload)
                for peer in peers
            ]
            # Second argument is any_order: the peer set has no fixed order.
            patched_requests.assert_has_calls(expected_posts, True)
test_node.py 文件源码 项目:crankycoin 作者: cranklin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_broadcast_transaction_whenRequestException_thenFailsGracefully(self):
        """
        Even when every POST raises a RequestException, broadcast_transaction
        must not propagate the error and must still attempt every known full
        node (in any order).
        """
        peers = ("127.0.0.1", "127.0.0.2", "127.0.0.3")
        # Every peer is expected to receive the same JSON body.
        payload = {'transaction': '{"amount": 0, "destination": "destination", "fee": 0, "signature": null, "source": "source", "timestamp": 1508823223, "tx_hash": null}'}

        init_stub = patch.object(FullNode, '__init__', return_value=None)
        refresh_stub = patch.object(FullNode, 'request_nodes_from_all')
        clock_stub = patch("crankycoin.time.time", return_value="1508823223")
        failing_post_stub = patch("crankycoin.requests.post", side_effect=requests.exceptions.RequestException())
        with init_stub, refresh_stub as patched_request_nodes_from_all, \
                clock_stub, failing_post_stub as patched_requests:

            transaction = Transaction("source", "destination", 0, 0)
            node = FullNode("127.0.0.1", "reward_address")
            node.full_nodes = set(peers)

            node.broadcast_transaction(transaction)

            patched_request_nodes_from_all.assert_called_once()
            expected_posts = [
                call("http://{}:30013/transactions".format(peer), json=payload)
                for peer in peers
            ]
            # Second argument is any_order: the peer set has no fixed order.
            patched_requests.assert_has_calls(expected_posts, True)
test_node.py 文件源码 项目:crankycoin 作者: cranklin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_request_block_whenIndexIsNumeric_thenRequestsCorrectBlockFromNode(self):
        """
        request_block with a numeric index must GET /block/<index> from the
        given node and return a block populated from the JSON response.
        """
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = '{"nonce": 12345, "index": 29, "transactions": [], "timestamp": 1234567890, "current_hash": "current_hash", "previous_hash": "previous_hash"}'

        init_stub = patch.object(FullNode, '__init__', return_value=None)
        # Block.current_hash is replaced by a property mock returning a fixed
        # value.
        hash_stub = patch("crankycoin.node.Block.current_hash", new_callable=PropertyMock)
        get_stub = patch("crankycoin.requests.get", return_value=mock_response)
        with init_stub, hash_stub as patched_block_current_hash, \
                get_stub as patched_requests:
            patched_block_current_hash.return_value = "current_hash"
            node = FullNode("127.0.0.1", "reward_address")

            fetched = node.request_block("127.0.0.2", "30013", 29)

            self.assertIsNotNone(fetched)
            self.assertEqual(fetched.index, 29)
            self.assertEqual(fetched.transactions, [])
            self.assertEqual(fetched.previous_hash, "previous_hash")
            self.assertEqual(fetched.current_hash, "current_hash")
            self.assertEqual(fetched.timestamp, 1234567890)
            self.assertEqual(fetched.nonce, 12345)
            patched_requests.assert_called_once_with('http://127.0.0.2:30013/block/29')
test_slot.py 文件源码 项目:pg2kinesis 作者: handshake 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def test__enter__(slot):
    """
    __enter__ must return the slot itself, obtain two connections, put the
    normal connection into isolation level 0 and open a cursor on the
    replication connection. The keepalive sender is started only when a
    keepalive window is configured.
    """
    # ``side_effect`` (singular) makes each _get_connection call return the
    # next mock in the list. The previous ``side_effects`` spelling only set
    # an unused attribute on the patch mock.
    with patch.object(slot, '_get_connection', side_effect=[Mock(), Mock()]) as mock_gc, \
            patch.object(slot, '_send_keepalive') as mock_ka:
        assert slot == slot.__enter__(), 'Returns itself'
        assert mock_gc.call_count == 2

        assert call.set_isolation_level(0) in slot._normal_conn.method_calls, 'make sure we are in autocommit'
        assert call.cursor() in slot._repl_conn.method_calls, 'we opened a cursor'

        assert not mock_ka.called, "with no window we didn't start keep alive"

    slot._keepalive_window = 1
    with patch.object(slot, '_get_connection', side_effect=[Mock(), Mock()]) as mock_gc, \
            patch.object(slot, '_send_keepalive') as mock_ka:
        slot.__enter__()
        assert mock_ka.called, " we started up keepalive"
test_slot.py 文件源码 项目:pg2kinesis 作者: handshake 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_delete_slot(slot):
    """
    Exercise delete_slot against three failures of drop_replication_slot:

    1. ProgrammingError with pgcode UNDEFINED_OBJECT: expected to be
       swallowed.
    2. ProgrammingError with any other pgcode: expected to propagate.
    3. An arbitrary Exception: expected to propagate.

    In every case the slot must have been dropped by the name 'pg2kinesis'.
    """
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=psycopg2.errorcodes.UNDEFINED_OBJECT):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.drop_replication_slot = Mock(side_effect=pe)
        slot.delete_slot()
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')

    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=-1):
        pe = psycopg2.ProgrammingError()
        # Stub the method delete_slot is being tested against; the previous
        # code stubbed create_replication_slot here.
        slot._repl_cursor.drop_replication_slot = Mock(side_effect=pe)
        with pytest.raises(psycopg2.ProgrammingError) as e_info:
            slot.delete_slot()
        # These checks sit outside pytest.raises (statements after the
        # raising call inside it never run) but inside the pgcode patch,
        # which is what makes pgcode read as -1.
        slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
        assert e_info.value.pgcode == -1

    slot._repl_cursor.drop_replication_slot = Mock(side_effect=Exception)
    with pytest.raises(Exception):
        slot.delete_slot()
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
test_stream.py 文件源码 项目:pg2kinesis 作者: handshake 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test__init__():
    """
    StreamWriter construction against a mocked boto3 client whose
    create_stream raises a ClientError:

    * with code 'ResourceInUseException' the constructor must carry on and
      request the 'stream_exists' waiter;
    * with any other code the ClientError must propagate and the waiter must
      not be requested.
    """
    mock_client = Mock()
    with patch.object(boto3, 'client', return_value=mock_client):
        error_response = {'Error': {'Code': 'ResourceInUseException'}}
        mock_client.create_stream = Mock(side_effect=ClientError(error_response, 'create_stream'))

        StreamWriter('blah')
        assert mock_client.create_stream.called
        assert call.get_waiter('stream_exists') in mock_client.method_calls, "We handled stream existence"

        error_response = {'Error': {'Code': 'Something else'}}
        mock_client.create_stream = Mock(side_effect=ClientError(error_response, 'create_stream'))

        # Clear the calls recorded by the first scenario.
        mock_client.reset_mock()
        with pytest.raises(ClientError):
            StreamWriter('blah')
        # These checks sit outside pytest.raises: statements after the raising
        # call inside it never run.
        assert mock_client.create_stream.called
        assert call.get_waiter('stream_exists') not in mock_client.method_calls, "never reached"
test_updates.py 文件源码 项目:cqlmapper 作者: reddit 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_noop_model_direct_update(self):
        """
        update() with no effective change must not hit the session, and
        update() naming a key column must raise ValidationError.
        """
        m0 = TestUpdateModel.create(self.conn, count=5, text='monkey')

        # No column arguments at all: nothing to write.
        with patch.object(self.conn.session, 'execute') as execute:
            m0.update(self.conn)
        assert execute.call_count == 0

        # Same value as already stored: still nothing to write.
        with patch.object(self.conn.session, 'execute') as execute:
            m0.update(self.conn, count=5)
        assert execute.call_count == 0

        # Key columns are rejected even when given their current value.
        for key_column in ('partition', 'cluster'):
            with self.assertRaises(ValidationError):
                m0.update(self.conn, **{key_column: getattr(m0, key_column)})


问题


面经


文章

微信
公众号

扫码关注公众号