python类patch()的实例源码

test_form.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_resource_form_create_valid(self, mock_open):
        dataset = Dataset()

        app = self._get_test_app()
        env, response = _get_resource_new_page_as_sysadmin(app, dataset['id'])
        form = response.forms['resource-edit']

        upload = ('upload', 'valid.csv', VALID_CSV)

        valid_stream = io.BufferedReader(io.BytesIO(VALID_CSV))

        with mock.patch('io.open', return_value=valid_stream):

            submit_and_follow(app, form, env, 'save', upload_files=[upload])

        dataset = call_action('package_show', id=dataset['id'])

        assert_equals(dataset['resources'][0]['validation_status'], 'success')
        assert 'validation_timestamp' in dataset['resources'][0]
test_retry.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _test_should_sleep(self, seconds_left, slept):
        attempt = 5
        timeout = 20
        interval = 3
        randint = 2
        deadline = self.now + seconds_left
        retry = h_retry.Retry(mock.Mock(), timeout=timeout, interval=interval)

        with mock.patch('random.randint') as m_randint, \
                mock.patch('time.sleep') as m_sleep:
            m_randint.return_value = randint

            ret = retry._sleep(deadline, attempt, _EX2())

            self.assertEqual(slept, ret)
            m_randint.assert_called_once_with(1, 2 ** attempt - 1)
            m_sleep.assert_called_once_with(slept)
app_test.py 文件源码 项目:service-fabric-cli 作者: Azure 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
        """Upload copies files to non-native store correctly with no
        progress"""
        import shutil
        import tempfile
        temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp())
        temp_src_dir = os.path.dirname(temp_file.name)
        temp_dst_dir = tempfile.mkdtemp()
        shutil_mock = MagicMock()
        shutil_mock.copyfile.return_value = None
        with patch('sfctl.custom_app.shutil', new=shutil_mock):
            sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
            shutil_mock.copyfile.assert_called_once()
        temp_file.close()
        shutil.rmtree(os.path.dirname(temp_file.name))
        shutil.rmtree(temp_dst_dir)
test_deploy.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_reboot_and_finish_deploy_force_reboot(self, power_action_mock,
                                                   get_pow_state_mock):
        d_info = self.node.driver_info
        d_info['deploy_forces_oob_reboot'] = True
        self.node.driver_info = d_info
        self.node.save()
        self.config(group='ansible',
                    post_deploy_get_power_state_retry_interval=0)
        self.node.provision_state = states.DEPLOYING
        self.node.save()

        with task_manager.acquire(self.context, self.node.uuid) as task:
            with mock.patch.object(task.driver, 'network') as net_mock:
                self.driver.reboot_and_finish_deploy(task)
                net_mock.remove_provisioning_network.assert_called_once_with(
                    task)
                net_mock.configure_tenant_networks.assert_called_once_with(
                    task)
            expected_power_calls = [((task, states.POWER_OFF),),
                                    ((task, states.POWER_ON),)]
            self.assertEqual(expected_power_calls,
                             power_action_mock.call_args_list)
        get_pow_state_mock.assert_not_called()
test_auth.py 文件源码 项目:ownbot 作者: michaelimfeld 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_requires_usergroup_no_acc(self):
        """
            Test requires usergroup decorator if the user has no access
        """
        with patch("ownbot.auth.User") as user_mock:
            user_mock.return_value.has_access.return_value = False

            @ownbot.auth.requires_usergroup("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)
                return True

            bot_mock = Mock(spec=Bot)
            update = self.__get_dummy_update()
            called = my_command_handler(bot_mock, update)

            self.assertIsNone(called)
test_auth.py 文件源码 项目:ownbot 作者: michaelimfeld 项目源码 文件源码 阅读 61 收藏 0 点赞 0 评论 0
def test_requires_usergroup_acc(self):
        """
            Test requires usergroup decorator if the user has access
        """
        with patch("ownbot.auth.User") as user_mock,\
                patch("test_auth.Update") as update_mock:
            user_mock = user_mock.return_value
            user_mock.has_acces.return_value = True

            @ownbot.auth.requires_usergroup("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)
                return True

            bot_mock = Mock(spec=Bot)
            update_mock = Update(1337)
            called = my_command_handler(bot_mock, update_mock)

            self.assertTrue(called)
test_auth.py 文件源码 项目:ownbot 作者: michaelimfeld 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_requires_usergroup_self(self):
        """
            Test requires usergroup decorator with self as first argument.
        """
        with patch("ownbot.auth.User") as user_mock,\
                patch("test_auth.Update") as update_mock:
            user_mock = user_mock.return_value
            user_mock.has_acces.return_value = True

            @ownbot.auth.requires_usergroup("foo")
            def my_command_handler(self, bot, update):
                """Dummy command handler"""
                print(self, bot, update)
                return True

            bot_mock = Mock(spec=Bot)
            update_mock = Update(1337)
            called = my_command_handler(None, bot_mock, update_mock)

            self.assertTrue(called)
test_auth.py 文件源码 项目:ownbot 作者: michaelimfeld 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_assign_first_to(self):
        """
            Test assign first to decorator.
        """
        with patch("ownbot.auth.User") as user_mock,\
                patch("test_auth.Update") as update_mock,\
                patch("ownbot.auth.UserManager") as usrmgr_mock:

            user_mock = user_mock.return_value
            usrmgr_mock.return_value.group_is_empty.return_value = True

            @ownbot.auth.assign_first_to("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)

            bot_mock = Mock(spec=Bot)
            update_mock = Update(1337)
            my_command_handler(bot_mock, update_mock)

            self.assertTrue(usrmgr_mock.return_value.group_is_empty.called)
            self.assertTrue(user_mock.save.called)
test_auth.py 文件源码 项目:ownbot 作者: michaelimfeld 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_assign_first_to_with_self(self):
        """
            Test assign first to decorator with self as first argument.
        """
        with patch("ownbot.auth.User") as user_mock,\
                patch("test_auth.Update") as update_mock,\
                patch("ownbot.auth.UserManager") as usrmgr_mock:

            user_mock = user_mock.return_value
            usrmgr_mock.return_value.group_is_empty.return_value = True

            @ownbot.auth.assign_first_to("foo")
            def my_command_handler(self, bot, update):
                """Dummy command handler"""
                print(self, bot, update)

            bot_mock = Mock(spec=Bot)
            update_mock = Update(1337)
            my_command_handler(None, bot_mock, update_mock)

            self.assertTrue(usrmgr_mock.return_value.group_is_empty.called)
            self.assertTrue(user_mock.save.called)
test_driver.py 文件源码 项目:networking-huawei 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_default_security_group_rest_callback(self):
        try:
            kwargs = test_sg_create
            with mock.patch.object(self.securityGroupDb,
                                   'get_security_group',
                                   return_value=security_group):
                resp = self._mock_req_resp(requests.codes.all_good)
                with mock.patch('requests.request', return_value=resp):
                    self.secGroupSub.\
                        create_security_group(None, None, None, **kwargs)
                resp = self._mock_req_resp(requests.codes.no_content)
                with mock.patch('requests.request', return_value=resp):
                    self.secGroupSub.\
                        create_security_group(None, None, None, **kwargs)
                resp = self._mock_req_resp(requests.codes.not_implemented)
                with mock.patch('requests.request', return_value=resp):
                    self.secGroupSub.\
                        create_security_group(None, None, None, **kwargs)

        except Exception:
            pass
test_form.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_resource_form_create_invalid(self, mock_open):
        dataset = Dataset()

        app = self._get_test_app()
        env, response = _get_resource_new_page_as_sysadmin(app, dataset['id'])
        form = response.forms['resource-edit']

        upload = ('upload', 'invalid.csv', INVALID_CSV)

        invalid_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))

        with mock.patch('io.open', return_value=invalid_stream):

            response = webtest_submit(
                form, 'save', upload_files=[upload], extra_environ=env)

        assert_in('validation', response.body)
        assert_in('missing-value', response.body)
        assert_in('Row 2 has a missing value in column 4', response.body)
test_form.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_resource_form_update_valid(self, mock_open):

        dataset = Dataset(resources=[
            {
                'url': 'https://example.com/data.csv'
            }
        ])

        app = self._get_test_app()
        env, response = _get_resource_update_page_as_sysadmin(
            app, dataset['id'], dataset['resources'][0]['id'])
        form = response.forms['resource-edit']

        upload = ('upload', 'valid.csv', VALID_CSV)

        valid_stream = io.BufferedReader(io.BytesIO(VALID_CSV))

        with mock.patch('io.open', return_value=valid_stream):

            submit_and_follow(app, form, env, 'save', upload_files=[upload])

        dataset = call_action('package_show', id=dataset['id'])

        assert_equals(dataset['resources'][0]['validation_status'], 'success')
        assert 'validation_timestamp' in dataset['resources'][0]
test_form.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_resource_form_update_invalid(self, mock_open):

        dataset = Dataset(resources=[
            {
                'url': 'https://example.com/data.csv'
            }
        ])

        app = self._get_test_app()
        env, response = _get_resource_update_page_as_sysadmin(
            app, dataset['id'], dataset['resources'][0]['id'])
        form = response.forms['resource-edit']

        upload = ('upload', 'invalid.csv', INVALID_CSV)

        invalid_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))

        with mock.patch('io.open', return_value=invalid_stream):

            response = webtest_submit(
                form, 'save', upload_files=[upload], extra_environ=env)

        assert_in('validation', response.body)
        assert_in('missing-value', response.body)
        assert_in('Row 2 has a missing value in column 4', response.body)
test_utils.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_delete_file_not_deleted_if_resources_first(self, mock_open):

        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/{}'.format(resource_id)

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        patcher.fs.CreateFile(path)

        assert os.path.exists(path)
        with mock.patch('ckanext.validation.utils.get_local_upload_path',
                        return_value=path):
            delete_local_uploaded_file(resource_id)

        assert not os.path.exists(path)
        assert os.path.exists('/doesnt_exist/resources')

        patcher.tearDown()
test_utils.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_delete_passes_if_os_exeception(self, mock_open):

        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/{}/{}/{}'.format(
            resource_id[0:3], resource_id[3:6], resource_id[6:]
        )

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        patcher.fs.CreateFile(path)

        assert os.path.exists(path)
        with mock.patch('ckanext.validation.utils.os.remove',
                        side_effect=OSError):

            delete_local_uploaded_file(resource_id)


        patcher.tearDown()
test_logic.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_validation_fails_on_upload(self, mock_open):

        invalid_file = StringIO.StringIO()
        invalid_file.write(INVALID_CSV)

        mock_upload = MockFieldStorage(invalid_file, 'invalid.csv')

        dataset = factories.Dataset()

        invalid_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))

        with mock.patch('io.open', return_value=invalid_stream):

            with assert_raises(t.ValidationError) as e:

                    call_action(
                        'resource_create',
                        package_id=dataset['id'],
                        format='CSV',
                        upload=mock_upload
                    )

        assert 'validation' in e.exception.error_dict
        assert 'missing-value' in str(e.exception)
        assert 'Row 2 has a missing value in column 4' in str(e.exception)
test_logic.py 文件源码 项目:ckanext-validation 作者: frictionlessdata 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_validation_passes_on_upload(self, mock_open):

        invalid_file = StringIO.StringIO()
        invalid_file.write(VALID_CSV)

        mock_upload = MockFieldStorage(invalid_file, 'invalid.csv')

        dataset = factories.Dataset()

        valid_stream = io.BufferedReader(io.BytesIO(VALID_CSV))

        with mock.patch('io.open', return_value=valid_stream):

            resource = call_action(
                'resource_create',
                package_id=dataset['id'],
                format='CSV',
                upload=mock_upload
            )

        assert_equals(resource['validation_status'], 'success')
        assert 'validation_timestamp' in resource
test_swift_context.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_get_swift_hash_env(self, mock_config, mock_service_name):
        mock_config.return_value = None
        mock_service_name.return_value = "testsvc"
        tmpfile = tempfile.mktemp()
        swift_context.SWIFT_HASH_FILE = tmpfile
        with mock.patch('lib.swift_context.os.environ.get') as mock_env_get:
            mock_env_get.return_value = str(uuid.uuid4())
            hash_ = swift_context.get_swift_hash()
            mock_env_get.assert_has_calls([
                mock.call('JUJU_MODEL_UUID'),
                mock.call('JUJU_ENV_UUID',
                          mock_env_get.return_value)
            ])

        with open(tmpfile, 'r') as fd:
            self.assertEqual(hash_, fd.read())

        self.assertTrue(mock_config.called)
base_consumer_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_get_log_message(
        self,
        log_consumer_instance,
        publish_log_messages,
        log_message,
        log_topic
    ):
        with mock.patch(
            'yelp_kafka.discovery.get_region_cluster',
            return_value=get_config().cluster_config
        ):
            with log_consumer_instance as consumer:
                publish_log_messages(log_topic, log_message, count=1)
                asserter = ConsumerAsserter(
                    consumer=consumer,
                    expected_message=log_message
                )
                _message = consumer.get_message(blocking=True, timeout=TIMEOUT)
                asserter.assert_messages([_message], expected_count=1)
base_consumer_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_base_consumer_without_cluster_name(
        self,
        topic,
        consumer_init_kwargs
    ):
        with mock.patch(
            'yelp_kafka.discovery.get_kafka_cluster'
        ) as mock_get_kafka_cluster, mock.patch(
            'kafka_utils.util.config.ClusterConfig.__init__',
            return_value=None
        ) as mock_cluster_config_init:
            consumer = BaseConsumer(
                topic_to_consumer_topic_state_map={topic: None},
                auto_offset_reset='largest',
                **consumer_init_kwargs
            )
            consumer._region_cluster_config
            assert mock_get_kafka_cluster.call_count == 0
            config = get_config()
            mock_cluster_config_init.assert_called_once_with(
                type='standard',
                name='data_pipeline',
                broker_list=config.kafka_broker_list,
                zookeeper=config.kafka_zookeeper
            )
copy_table_to_blackhole_table_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_setup_connections(
        self,
        base_path,
        refresh_batch,
        cluster
    ):
        with mock.patch(
            base_path + '.TransactionManager'
        ) as mock_manager, mock.patch.object(
            refresh_batch,
            'get_connection_set_from_cluster'
        ) as mock_get_conn:
            refresh_batch.setup_connections()
            mock_manager.assert_called_once_with(
                cluster_name=cluster,
                ro_replica_name=cluster,
                rw_replica_name=cluster,
                connection_set_getter=mock_get_conn
            )
copy_table_to_blackhole_table_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_after_row_processing(self, refresh_batch, write_session, rw_conn):
        with mock.patch.object(
            refresh_batch,
            'throttle_to_replication'
        ) as throttle_mock, mock.patch.object(
            refresh_batch,
            '_wait_for_throughput',
            return_value=None
        ) as mock_wait:
            # count can be anything since self.avg_throughput_cap is set to None
            refresh_batch.unlock_tables(write_session)
            refresh_batch.throttle_throughput(count=0)

        assert write_session.rollback.call_count == 1
        write_session.execute.assert_called_once_with('UNLOCK TABLES')
        assert write_session.commit.call_count == 1
        throttle_mock.assert_called_once_with(rw_conn)
        assert mock_wait.call_count == 1
        assert refresh_batch.avg_rows_per_second_cap == refresh_batch.DEFAULT_AVG_ROWS_PER_SECOND_CAP
copy_table_to_blackhole_table_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_create_table_from_src_table(
        self,
        refresh_batch,
        fake_original_table,
        fake_new_table,
        show_table_query,
        write_session
    ):
        with mock.patch.object(
            refresh_batch,
            '_execute_query',
            autospec=True
        ) as mock_execute:
            mock_execute.return_value.fetchone.return_value = [
                'test_db',
                fake_original_table
            ]
            refresh_batch.create_table_from_src_table(write_session)
            calls = [
                mock.call(write_session, show_table_query),
                mock.call(write_session, fake_new_table)
            ]
            mock_execute.assert_has_calls(calls, any_order=True)
producer_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_log_error_on_exception(self, message):
        with mock.patch.object(
            data_pipeline._clog_writer.clog,
            'log_line',
            side_effect=RandomException()
        ) as mock_log_line, mock.patch.object(
            data_pipeline._clog_writer,
            'logger'
        ) as mock_logger:
            writer = ClogWriter()
            writer.publish(message)

        call_args = "Failed to scribe message - {}".format(str(message))

        assert mock_log_line.called
        assert mock_logger.error.call_args_list[0] == mock.call(call_args)
producer_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_meteorite_on_off(
        self,
        create_message,
        registered_schema,
        producer,
        enable_meteorite,
        expected_call_count
    ):
        with mock.patch.object(
            data_pipeline.tools.meteorite_wrappers.StatsCounter,
            'process',
            autospec=True
        ) as mock_stats_counter:
            producer.enable_meteorite = enable_meteorite
            m = create_message(registered_schema, timeslot=1.0)
            producer.publish(m)
            assert mock_stats_counter.call_count == expected_call_count
producer_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_sensu_process_called_once_inside_window(
        self,
        create_message,
        registered_schema,
        producer,
        message_count
    ):
        with mock.patch.object(
            data_pipeline.tools.sensu_ttl_alerter.SensuTTLAlerter,
            'process',
            autospec=True,
            return_value=None
        ) as mock_sensu_ttl_process:
            producer.enable_sensu = True
            m1 = create_message(registered_schema, timeslot=1.0)
            for i in range(message_count):
                producer.publish(m1)
            assert mock_sensu_ttl_process.call_count == 1
producer_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_ensure_messages_published_fails_when_overpublished(
        self, topic, messages, producer, topic_offsets
    ):
        for message in messages:
            producer.publish(message)
        producer.flush()

        with pytest.raises(
            PublicationUnensurableError
        ), mock.patch.object(
            data_pipeline.producer,
            'logger'
        ) as mock_logger:
            producer.ensure_messages_published(messages[:2], topic_offsets)

            self._assert_logged_info_correct(
                mock_logger,
                len(messages),
                topic,
                topic_offsets,
                message_count=len(messages[:2])
            )
producer_test.py 文件源码 项目:data_pipeline 作者: Yelp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_publish_fails_after_retry(self, message, producer):
        # TODO(DATAPIPE-606|clin) investigate better way than mocking response
        with mock.patch.object(
            producer._kafka_producer.kafka_client,
            'send_produce_request',
            side_effect=[FailedPayloadsError]
        ) as mock_send_request, capture_new_messages(
            message.topic
        ) as get_messages, pytest.raises(
            MaxRetryError
        ):
            orig_topic_to_offset_map = self.get_orig_topic_to_offset_map(producer)
            producer.publish(message)
            producer.flush()

            messages = get_messages()
            assert len(messages) == 0
            assert mock_send_request.call_count == self.max_retry_count
            self.assert_new_topic_to_offset_map(
                producer,
                message.topic,
                orig_topic_to_offset_map,
                published_message_count=0
            )
buy_states_test.py 文件源码 项目:bitcoin-trading-system 作者: vinicius-ronconi 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
        client = mock.MagicMock()
        bootstrap = factory.TrailingOrdersFactory().make_fake_bootstrap(self.INITIAL_SETUP)

        logging_patch = mock.patch('trading_system.systems.trailing_orders.system.logging')
        self.addCleanup(logging_patch.stop)
        logging_patch.start()

        self.system = TrailingOrders(client, bootstrap)
        self.system.get_pending_orders = mock.MagicMock(return_value=[])

        self.set_next_operation = mock.MagicMock()
        next_operation_patcher = mock.patch(
            'trading_system.systems.trailing_orders.system.TrailingOrders.set_next_operation', self.set_next_operation
        )
        self.addCleanup(next_operation_patcher.stop)
        next_operation_patcher.start()
trailing_orders_test.py 文件源码 项目:bitcoin-trading-system 作者: vinicius-ronconi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _setup_operation(self, next_operation):
        client = mock.Mock()
        bootstrap = TrailingOrdersFactory().make_fake_bootstrap(
            TrailingOrderSetup.make(
                next_operation=next_operation,
                start_value=self.START_VALUE,
                stop_value=self.STOP_VALUE,
                reversal=self.REVERSAL,
                stop_loss=self.STOP_LOSS,
                operational_cost=0.2,
                profit=0.5,
            )
        )

        logging_patch = mock.patch('trading_system.systems.trailing_orders.system.logging')
        self.addCleanup(logging_patch.stop)
        logging_patch.start()

        self.system = TrailingOrders(client, bootstrap)


问题


面经


文章

微信
公众号

扫码关注公众号