python类FakeLogger()的实例源码

base.py 文件源码 项目:deb-oslo.utils 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def setUp(self):
        """Prepare the per-test environment before each test method runs.

        Installs, in order: an optional ``Timeout`` fixture driven by the
        ``OS_TEST_TIMEOUT`` environment variable, the ``NestedTempfile`` and
        ``TempHomeDir`` fixtures, optional stdout/stderr capture controlled
        by ``OS_STDOUT_CAPTURE`` / ``OS_STDERR_CAPTURE``, and a ``FakeLogger``
        stored on ``self.log_fixture``.
        """
        super(TestCase, self).setUp()

        raw_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            timeout_secs = int(raw_timeout)
        except ValueError:
            # An unparsable value is treated as "no timeout".
            timeout_secs = 0
        if timeout_secs > 0:
            self.useFixture(fixtures.Timeout(timeout_secs, gentle=True))

        for fixture_cls in (fixtures.NestedTempfile, fixtures.TempHomeDir):
            self.useFixture(fixture_cls())

        # Each standard stream is replaced only when its switch is enabled.
        for env_var, stream_name in (('OS_STDOUT_CAPTURE', 'stdout'),
                                     ('OS_STDERR_CAPTURE', 'stderr')):
            if os.environ.get(env_var) in _TRUE_VALUES:
                captured = self.useFixture(
                    fixtures.StringStream(stream_name)).stream
                self.useFixture(
                    fixtures.MonkeyPatch('sys.' + stream_name, captured))

        self.log_fixture = self.useFixture(fixtures.FakeLogger())
base.py 文件源码 项目:deb-oslo.vmware 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        """Initialise the test environment ahead of every test method.

        Reads ``OS_TEST_TIMEOUT``, ``OS_STDOUT_CAPTURE`` and
        ``OS_STDERR_CAPTURE`` from the process environment and installs the
        matching fixtures, plus ``NestedTempfile``, ``TempHomeDir`` and a
        ``FakeLogger`` (kept on ``self.log_fixture``).
        """
        super(TestCase, self).setUp()

        env = os.environ
        try:
            timeout = int(env.get('OS_TEST_TIMEOUT', 0))
        except ValueError:
            # Invalid timeout values disable the timeout altogether.
            timeout = 0
        if timeout > 0:
            timeout_fixture = fixtures.Timeout(timeout, gentle=True)
            self.useFixture(timeout_fixture)

        self.useFixture(fixtures.NestedTempfile())
        self.useFixture(fixtures.TempHomeDir())

        if env.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
            out_fixture = self.useFixture(fixtures.StringStream('stdout'))
            self.useFixture(
                fixtures.MonkeyPatch('sys.stdout', out_fixture.stream))
        if env.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
            err_fixture = self.useFixture(fixtures.StringStream('stderr'))
            self.useFixture(
                fixtures.MonkeyPatch('sys.stderr', err_fixture.stream))

        self.log_fixture = self.useFixture(fixtures.FakeLogger())
base.py 文件源码 项目:bilean 作者: openstack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def setup_logging(self):
        """Install FakeLogger fixtures for the root and top-level loggers.

        The root capture is stored on ``self.LOG``. Every top-level logger
        name currently registered with the logging manager then gets its own
        FakeLogger, except ``'bilean'`` when it has no entry in
        ``TEST_DEFAULT_LOGLEVELS``.
        """
        # DEBUG when OS_DEBUG is enabled, INFO otherwise.
        root_level = (logging.DEBUG
                      if os.environ.get('OS_DEBUG') in _TRUE_VALUES
                      else logging.INFO)

        # Keep the default capture on self.LOG so tests can still assert on
        # bilean logs.
        self.LOG = self.useFixture(
            fixtures.FakeLogger(level=root_level, format=_LOG_FORMAT))

        top_level_names = {name.split('.')[0]
                           for name in logging.Logger.manager.loggerDict}
        for top_name in top_level_names:
            if top_name in TEST_DEFAULT_LOGLEVELS:
                # A configured level overrides the FakeLogger default.
                extra = {'level': TEST_DEFAULT_LOGLEVELS[top_name]}
            elif top_name != 'bilean':
                extra = {}
            else:
                continue
            self.useFixture(fixtures.FakeLogger(
                name=top_name, format=_LOG_FORMAT, **extra))
base.py 文件源码 项目:python-zunclient 作者: openstack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def setUp(self):
        """Set up fixtures shared by every test method.

        Covers an optional timeout (``OS_TEST_TIMEOUT``), the
        ``NestedTempfile`` and ``TempHomeDir`` fixtures, optional
        stdout/stderr redirection (``OS_STDOUT_CAPTURE`` /
        ``OS_STDERR_CAPTURE``) and a ``FakeLogger`` exposed as
        ``self.log_fixture``.
        """
        super(TestCase, self).setUp()

        configured = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            seconds = int(configured)
        except ValueError:
            # Fall back to "no timeout" for values that are not integers.
            seconds = 0
        if seconds > 0:
            self.useFixture(fixtures.Timeout(seconds, gentle=True))

        self.useFixture(fixtures.NestedTempfile())
        self.useFixture(fixtures.TempHomeDir())

        # (environment switch, stream name, attribute to monkey-patch)
        captures = (
            ('OS_STDOUT_CAPTURE', 'stdout', 'sys.stdout'),
            ('OS_STDERR_CAPTURE', 'stderr', 'sys.stderr'),
        )
        for switch, label, target in captures:
            if os.environ.get(switch) not in _TRUE_VALUES:
                continue
            replacement = self.useFixture(fixtures.StringStream(label)).stream
            self.useFixture(fixtures.MonkeyPatch(target, replacement))

        self.log_fixture = self.useFixture(fixtures.FakeLogger())
test_build_doubles.py 文件源码 项目:acceptable 作者: canonical-ols 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_logs_on_no_permissions(self):
        # A file with all permission bits cleared must yield None and leave
        # both the "extracting" and the "cannot extract" messages in the log.
        tmp_dir = self.useFixture(fixtures.TempDir())
        log_capture = self.useFixture(fixtures.FakeLogger())

        unreadable = os.path.join(tmp_dir.path, 'path_not_readable')
        with open(unreadable, 'w') as handle:
            handle.write("# You can't read me")
        os.chmod(unreadable, 0)

        self.assertIsNone(
            _build_doubles.extract_schemas_from_file(unreadable))
        expected_messages = (
            'Extracting schemas from %s' % unreadable,
            'Cannot extract schemas: Permission denied',
        )
        for message in expected_messages:
            self.assertThat(log_capture.output, Contains(message))
test_build_doubles.py 文件源码 项目:acceptable 作者: canonical-ols 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_logs_on_syntax_error(self):
        # A file whose content is not valid Python must yield None and leave
        # both the "extracting" and the syntax-error messages in the log.
        tmp_dir = self.useFixture(fixtures.TempDir())
        log_capture = self.useFixture(fixtures.FakeLogger())

        broken_module = os.path.join(tmp_dir.path, 'foo.py')
        with open(broken_module, 'w') as handle:
            handle.write("not valid pyton")

        self.assertIsNone(
            _build_doubles.extract_schemas_from_file(broken_module))
        expected_messages = (
            'Extracting schemas from %s' % broken_module,
            'Cannot extract schemas: invalid syntax (foo.py, line 1)',
        )
        for message in expected_messages:
            self.assertThat(log_capture.output, Contains(message))
test_executable.py 文件源码 项目:systemfixtures 作者: testing-cabal 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_listen(self):
        # The executable is told to listen on a port, sleep and then spawn;
        # the test then connects to 127.0.0.1 on ``self.executable.port``,
        # retrying on transient connection errors.

        # The fixture is installed only for its log-capturing side effect;
        # its return value is not needed.
        self.useFixture(FakeLogger(format="%(asctime)s %(message)s"))
        self.executable.listen(6666)
        self.executable.sleep(1)
        self.executable.spawn()

        # Try to connect to the socket
        sock = socket.socket()
        # Always release the socket, whether the test passes or fails.
        self.addCleanup(sock.close)
        transient = (errno.ECONNREFUSED, errno.ECONNRESET)

        attempts = 10
        for i in range(1, attempts + 1):
            try:
                sock.connect(("127.0.0.1", self.executable.port))
            except socket.error as error:  # pragma: no cover
                if error.errno in transient and i != attempts:
                    # Back off a little longer after each failed attempt.
                    time.sleep(0.05 * i)
                    continue
                logging.error("connection attempt %d failed", i)
                # Bare raise keeps the original traceback intact.
                raise
            break
        self.assertEqual("127.0.0.1", sock.getsockname()[0])
test_subnet.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test__logs_if_suggests_previously_observed_neighbour(self):
        # Note: 10.0.0.0/30 --> 10.0.0.1 and 10.0.0.2 are usable.
        subnet = self.make_Subnet(
            cidr="10.0.0.0/30", gateway_ip=None, dns_servers=None)
        rack_interface = factory.make_Interface(vlan=subnet.vlan)
        seen_now = datetime.now()
        # Both usable addresses have been observed; .2 was seen a day earlier.
        sightings = (
            ("10.0.0.1", seen_now),
            ("10.0.0.2", seen_now - timedelta(days=1)),
        )
        for address, seen_at in sightings:
            factory.make_Discovery(
                ip=address, interface=rack_interface, updated=seen_at)
        maaslog = self.useFixture(FakeLogger("maas"))
        suggested_ip = subnet.get_next_ip_for_allocation()
        self.assertThat(suggested_ip, Equals("10.0.0.2"))
        expected_log = DocTestMatches(
            "Next IP address...observed previously..."
        )
        self.assertThat(maaslog.output, expected_log)
test_views.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test__handle_uncaught_exception_logs_other_failure(self):
        # handle_uncaught_exception() is given a real exc_info tuple and the
        # views logger is expected to record a 500 line plus the traceback.
        handler = views.WebApplicationHandler()
        request = make_request()
        request.path = factory.make_name("path")

        # Raise and catch so that exc_info carries a genuine traceback.
        failure_type = factory.make_exception_type()
        failure_text = factory.make_name("message")
        try:
            raise failure_type(failure_text)
        except failure_type:
            captured_exc_info = sys.exc_info()

        with FakeLogger(views.__name__, logging.ERROR) as maaslog:
            handler.handle_uncaught_exception(
                request=request, resolver=get_resolver(None),
                exc_info=captured_exc_info)

        expected_log = """\
            500 Internal Server Error @ %s
            Traceback (most recent call last):
            ...
            maastesting.factory.TestException#...: %s
            """ % (request.path, failure_text)
        self.assertThat(maaslog.output, DocTestMatches(expected_log))
test_subnet.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test__scan_as_admin_logs_the_fact_that_a_scan_happened(self):
        admin = factory.make_admin()
        handler = SubnetHandler(admin, {})
        subnet = factory.make_Subnet(version=4)
        factory.make_Interface(
            node=factory.make_RackController(), subnet=subnet)
        log_capture = self.useFixture(FakeLogger())
        network = subnet.get_ipnetwork()
        handler.scan({"id": subnet.id})
        # MatchesRegex is used rather than DocTestMatches because usernames
        # can contain characters that confuse DocTestMatches (e.g. periods).
        expected_pattern = (
            "User '%s' initiated a neighbour discovery scan against subnet: %s"
            % (re.escape(admin.username), re.escape(str(network))))
        self.assertThat(log_capture.output, MatchesRegex(expected_pattern))
test_dhcp.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_raises_error_when_omshell_not_connected(self):
        omshell_failure = ExternalProcessError(
            returncode=2, cmd=("omshell",), output="")
        self.patch(ExternalProcessError, 'output_as_unicode', 'not connected.')
        omshell = Mock()
        omshell.remove.side_effect = omshell_failure
        mac = factory.make_mac_address()
        with FakeLogger("maas.dhcp") as maaslog:
            raised = self.assertRaises(
                exceptions.CannotRemoveHostMap, dhcp._remove_host_map,
                omshell, mac)
        expected = (
            "Could not remove host map for %s: "
            "The DHCP server could not be reached." % (mac))
        # The same description of the problematic mapping must appear first
        # in the CannotRemoveHostMap exception and then in the maas.dhcp log.
        for actual in (str(raised), maaslog.output):
            self.assertDocTestMatches(expected, actual)
test_dhcp.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_raises_error_when_omshell_crashes(self):
        crash_output = factory.make_name("error").encode("ascii")
        omshell = Mock()
        omshell.create.side_effect = ExternalProcessError(
            returncode=2, cmd=("omshell",), output=crash_output)
        mac = factory.make_mac_address()
        ip = factory.make_ip_address()
        with FakeLogger("maas.dhcp") as maaslog:
            raised = self.assertRaises(
                exceptions.CannotCreateHostMap, dhcp._create_host_map,
                omshell, mac, ip)
        expected = "Could not create host map for %s -> %s: ..." % (mac, ip)
        # The CannotCreateHostMap exception includes a message describing the
        # problematic mapping.
        self.assertDocTestMatches(expected, str(raised))
        # The same message is also written to the maas.dhcp logger.
        self.assertDocTestMatches(expected, maaslog.output)
test_dhcp.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_raises_error_when_omshell_not_connected(self):
        omshell_failure = ExternalProcessError(
            returncode=2, cmd=("omshell",), output="")
        self.patch(ExternalProcessError, 'output_as_unicode', 'not connected.')
        omshell = Mock()
        omshell.create.side_effect = omshell_failure
        mac = factory.make_mac_address()
        ip = factory.make_ip_address()
        with FakeLogger("maas.dhcp") as maaslog:
            raised = self.assertRaises(
                exceptions.CannotCreateHostMap, dhcp._create_host_map,
                omshell, mac, ip)
        expected = (
            "Could not create host map for %s -> %s: "
            "The DHCP server could not be reached." % (mac, ip))
        # The same description of the problematic mapping must appear first
        # in the CannotCreateHostMap exception and then in the maas.dhcp log.
        for actual in (str(raised), maaslog.output):
            self.assertDocTestMatches(expected, actual)
test_dhcp.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test__does_log_other_exceptions_when_restarting(self):
        self.patch_sudo_write_file()
        restart_service = self.patch_restartService()
        restart_service.side_effect = (
            factory.make_exception("DHCP is on strike today"))
        peers = [make_failover_peer_config()]
        networks = fix_shared_networks_failover(
            [make_shared_network()], peers)
        # The restart failure must surface as CannotConfigureDHCP and be
        # recorded by the "maas" logger.
        with FakeLogger("maas") as maaslog, \
                ExpectedException(exceptions.CannotConfigureDHCP):
            yield self.configure(
                factory.make_name('key'),
                peers, networks,
                [make_host()], [make_interface()],
                make_global_dhcp_snippets())
        expected_log = (
            "DHCPv... server failed to restart: "
            "DHCP is on strike today")
        self.assertDocTestMatches(expected_log, maaslog.output)
test_power.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_query_all_nodes_swallows_PowerActionFail(self):
        first_node, second_node = self.make_nodes(2)
        second_new_state = self.pick_alternate_state(
            second_node['power_state'])
        mock_get_power_state = self.patch(power, 'get_power_state')
        failure_text = factory.make_name("error")
        # First query fails with PowerActionFail, second one succeeds.
        mock_get_power_state.side_effect = [
            fail(exceptions.PowerActionFail(failure_text)),
            succeed(second_new_state),
        ]
        suppress_reporting(self)

        with FakeLogger("maas.power", level=logging.DEBUG) as maaslog:
            yield power.query_all_nodes([first_node, second_node])

        expected_log = """\
            hostname-...: Could not query power state: %s.
            hostname-...: Power state has changed from ... to ...
            """ % failure_text
        self.assertDocTestMatches(expected_log, maaslog.output)
test_power.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_query_all_nodes_swallows_PowerError(self):
        first_node, second_node = self.make_nodes(2)
        second_new_state = self.pick_alternate_state(
            second_node['power_state'])
        mock_get_power_state = self.patch(power, 'get_power_state')
        failure_text = factory.make_name("error")
        # First query fails with PowerError, second one succeeds.
        mock_get_power_state.side_effect = [
            fail(PowerError(failure_text)),
            succeed(second_new_state),
        ]
        suppress_reporting(self)

        with FakeLogger("maas.power", level=logging.DEBUG) as maaslog:
            yield power.query_all_nodes([first_node, second_node])

        # Node fields are read only after the query has completed.
        substitutions = (
            first_node['hostname'], failure_text,
            second_node['hostname'], second_node['power_state'],
            second_new_state,
        )
        expected_log = """\
            %s: Could not query power state: %s.
            %s: Power state has changed from %s to %s.
            """ % substitutions
        self.assertDocTestMatches(expected_log, maaslog.output)
test_power.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 59 收藏 0 点赞 0 评论 0
def test_query_all_nodes_swallows_NoSuchNode(self):
        first_node, second_node = self.make_nodes(2)
        second_new_state = self.pick_alternate_state(
            second_node['power_state'])
        mock_get_power_state = self.patch(power, 'get_power_state')
        # First query fails with NoSuchNode, second one succeeds.
        mock_get_power_state.side_effect = [
            fail(exceptions.NoSuchNode()),
            succeed(second_new_state),
        ]
        suppress_reporting(self)

        with FakeLogger("maas.power", level=logging.DEBUG) as maaslog:
            yield power.query_all_nodes([first_node, second_node])

        expected_log = """\
            hostname-...: Could not update power state: no such node.
            hostname-...: Power state has changed from ... to ...
            """
        self.assertDocTestMatches(expected_log, maaslog.output)
test_service_monitor.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test___performServiceAction_logs_error_if_action_fails(self):
        service = make_fake_service(SERVICE_STATE.ON)
        service_monitor = self.make_service_monitor()
        mock_exec_action = self.patch(
            service_monitor, "_execSystemDServiceAction")
        stderr_text = factory.make_name("error")
        # The patched action returns (1, "", stderr_text).
        mock_exec_action.return_value = (1, "", stderr_text)
        action = factory.make_name("action")
        with FakeLogger(
                "maas.service_monitor", level=logging.ERROR) as maaslog, \
                ExpectedException(ServiceActionError):
            yield service_monitor._performServiceAction(service, action)

        expected_log = "Service '%s' failed to %s: %s" % (
            service.name, action, stderr_text)
        self.assertDocTestMatches(expected_log, maaslog.output)
test_service_monitor.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test___ensureService_logs_warning_in_mismatch_process_state(self):
        service = make_fake_service(SERVICE_STATE.ON)
        service_monitor = self.make_service_monitor([service])

        # Report the service as ON but with an unexpected process state.
        bogus_process_state = factory.make_name("invalid_state")
        reported = ServiceState(SERVICE_STATE.ON, bogus_process_state)
        self.patch(
            service_monitor, "getServiceState").return_value = succeed(
                reported)

        with FakeLogger(
                "maas.service_monitor", level=logging.WARNING) as maaslog:
            yield service_monitor._ensureService(service)
        expected_log = (
            "Service '%s' is %s but not in the expected state of "
            "'%s', its current state is '%s'." % (
                service.service_name, SERVICE_STATE.ON.value,
                service_monitor.PROCESS_STATE[SERVICE_STATE.ON],
                bogus_process_state))
        self.assertDocTestMatches(expected_log, maaslog.output)
test_service_monitor.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test___ensureService_logs_debug_in_expected_states(self):
        state = SERVICE_STATE.ON
        service = make_fake_service(state)
        service_monitor = self.make_service_monitor([service])

        # Report the service in exactly the process state mapped to ON.
        matching_process_state = service_monitor.PROCESS_STATE[state]
        reported = ServiceState(SERVICE_STATE.ON, matching_process_state)
        self.patch(
            service_monitor, "getServiceState").return_value = succeed(
                reported)

        with FakeLogger(
                "maas.service_monitor", level=logging.DEBUG) as maaslog:
            yield service_monitor._ensureService(service)
        expected_log = "Service '%s' is %s and '%s'." % (
            service.service_name, state, matching_process_state)
        self.assertDocTestMatches(expected_log, maaslog.output)
test_service_monitor.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def test___ensureService_logs_mismatch_for_dead_process_state(self):
        service = make_fake_service(SERVICE_STATE.OFF)
        service_monitor = self.make_service_monitor([service])

        # Report the service as DEAD with an unexpected process state.
        bogus_process_state = factory.make_name("invalid")
        reported = ServiceState(SERVICE_STATE.DEAD, bogus_process_state)
        self.patch(
            service_monitor, "getServiceState").return_value = succeed(
                reported)

        with FakeLogger(
                "maas.service_monitor", level=logging.WARNING) as maaslog:
            yield service_monitor._ensureService(service)
        expected_log = (
            "Service '%s' is %s but not in the expected state of "
            "'%s', its current state is '%s'." % (
                service.service_name, SERVICE_STATE.DEAD.value,
                service_monitor.PROCESS_STATE[SERVICE_STATE.DEAD],
                bogus_process_state))
        self.assertDocTestMatches(expected_log, maaslog.output)
test_service_monitor.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test___ensureService_performs_start_for_off_service(self):
        service = make_fake_service(SERVICE_STATE.ON)
        service_monitor = self.make_service_monitor([service])

        # State is reported as OFF/"waiting" first, then ON/"running".
        state_before = succeed(ServiceState(SERVICE_STATE.OFF, "waiting"))
        state_after = succeed(ServiceState(SERVICE_STATE.ON, "running"))
        self.patch(service_monitor, "getServiceState").side_effect = [
            state_before, state_after]
        mock_perform_action = self.patch(
            service_monitor, "_performServiceAction")
        mock_perform_action.return_value = succeed(None)

        with FakeLogger(
                "maas.service_monitor", level=logging.INFO) as maaslog:
            yield service_monitor._ensureService(service)
        self.assertThat(
            mock_perform_action, MockCalledOnceWith(service, "start"))
        expected_log = """\
            Service '%s' is not on, it will be started.
            Service '%s' has been started and is 'running'.
            """ % (service.service_name, service.service_name)
        self.assertDocTestMatches(expected_log, maaslog.output)
test_service_monitor.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test__ensureService_performs_stop_for_on_service(self):
        service = make_fake_service(SERVICE_STATE.OFF)
        service_monitor = self.make_service_monitor([service])

        # State is reported as ON/"running" first, then OFF/"waiting".
        state_before = succeed(ServiceState(SERVICE_STATE.ON, "running"))
        state_after = succeed(ServiceState(SERVICE_STATE.OFF, "waiting"))
        self.patch(service_monitor, "getServiceState").side_effect = [
            state_before, state_after]
        mock_perform_action = self.patch(
            service_monitor, "_performServiceAction")
        mock_perform_action.return_value = succeed(None)

        with FakeLogger(
                "maas.service_monitor", level=logging.INFO) as maaslog:
            yield service_monitor._ensureService(service)
        self.assertThat(
            mock_perform_action, MockCalledOnceWith(service, "stop"))
        expected_log = """\
            Service '%s' is not off, it will be stopped.
            Service '%s' has been stopped and is 'waiting'.
            """ % (service.service_name, service.service_name)
        self.assertDocTestMatches(expected_log, maaslog.output)
test_image_download_service.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_logs_other_errors(self):
        service = ImageDownloadService(
            sentinel.rpc, sentinel.tftp_root, Clock())

        # Make the download attempt fail with a ZeroDivisionError.
        boom = ZeroDivisionError("Such a shame I can't divide by zero")
        self.patch(
            service, "maybe_start_download").return_value = defer.fail(boom)

        with FakeLogger("maas") as maaslog:
            with TwistedLoggerFixture() as twisted_log:
                download = service.try_download()

        self.assertEqual(None, extract_result(download))
        # The short message goes to the "maas" logger ...
        expected_maaslog = (
            "Failed to download images: "
            "Such a shame I can't divide by zero")
        self.assertDocTestMatches(expected_maaslog, maaslog.output)
        # ... and the failure with its traceback goes to the Twisted log.
        expected_twisted_log = """\
            Downloading images failed.
            Traceback (most recent call last):
            Failure: builtins.ZeroDivisionError: Such a shame ...
            """
        self.assertDocTestMatches(expected_twisted_log, twisted_log.output)
test_node_power_monitor_service.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_try_query_nodes_logs_other_errors(self):
        service = self.make_monitor_service()
        self.patch(npms, "getRegionClient").return_value = sentinel.client
        sentinel.client.localIdent = factory.make_UUID()

        # Make the node query fail with a ZeroDivisionError.
        boom = ZeroDivisionError("Such a shame I can't divide by zero")
        self.patch(service, "query_nodes").return_value = fail(boom)

        with FakeLogger("maas") as maaslog:
            with TwistedLoggerFixture():
                query = service.try_query_nodes()

        self.assertEqual(None, extract_result(query))
        expected_maaslog = (
            "Failed to query nodes' power status: "
            "Such a shame I can't divide by zero")
        self.assertDocTestMatches(expected_maaslog, maaslog.output)
base.py 文件源码 项目:eclsdk 作者: nttcom 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        """Configure fixtures that every test method relies on.

        ``OS_TEST_TIMEOUT`` selects an optional ``Timeout`` fixture;
        ``OS_STDOUT_CAPTURE`` and ``OS_STDERR_CAPTURE`` select replacement of
        ``sys.stdout`` / ``sys.stderr`` with string streams. The
        ``NestedTempfile``, ``TempHomeDir`` and ``FakeLogger`` fixtures are
        always installed; the logger is kept on ``self.log_fixture``.
        """
        super(TestCase, self).setUp()

        timeout_setting = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            timeout_setting = int(timeout_setting)
        except ValueError:
            # A value that is not an integer means no timeout is applied.
            timeout_setting = 0
        wants_timeout = timeout_setting > 0
        if wants_timeout:
            self.useFixture(
                fixtures.Timeout(timeout_setting, gentle=True))

        self.useFixture(fixtures.NestedTempfile())
        self.useFixture(fixtures.TempHomeDir())

        def _capture(stream_name, patch_target):
            # Swap the named standard stream for an in-memory string stream.
            buffer_ = self.useFixture(
                fixtures.StringStream(stream_name)).stream
            self.useFixture(fixtures.MonkeyPatch(patch_target, buffer_))

        if os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
            _capture('stdout', 'sys.stdout')
        if os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES:
            _capture('stderr', 'sys.stderr')

        self.log_fixture = self.useFixture(fixtures.FakeLogger())
tools.py 文件源码 项目:CAL 作者: HPCC-Cloud-Computing 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        """Capture log output through a FakeLogger stored on ``self.logger``.

        The root logger is set to DEBUG while the first root handler is
        limited to DEBUG or INFO depending on ``OS_DEBUG``. When the capture
        level is above DEBUG, a ``NullHandler`` at DEBUG is added as well and
        the ``migrate.versioning.api`` logger is raised to WARNING.
        """
        super(StandardLogging, self).setUp()

        # set root logger to debug
        root_logger = std_logging.getLogger()
        root_logger.setLevel(std_logging.DEBUG)

        # supports collecting debug level for local runs
        capture_level = (std_logging.DEBUG
                         if os.environ.get('OS_DEBUG') in _TRUE_VALUES
                         else std_logging.INFO)

        # Collect logs
        log_format = '%(asctime)s %(levelname)s [%(name)s] %(message)s'
        self.logger = self.useFixture(
            fixtures.FakeLogger(format=log_format, level=None))
        # TODO(sdague): why can't we send level through the fake
        # logger? Tests prove that it breaks, but it's worth getting
        # to the bottom of.
        root_logger.handlers[0].setLevel(capture_level)

        if capture_level <= std_logging.DEBUG:
            return

        # Just attempt to format debug level logs, but don't save them
        null_handler = NullHandler()
        self.useFixture(
            fixtures.LogHandler(null_handler, nuke_handlers=False))
        null_handler.setLevel(std_logging.DEBUG)

        # Don't log every single DB migration step
        migrate_logger = std_logging.getLogger('migrate.versioning.api')
        migrate_logger.setLevel(std_logging.WARNING)
base.py 文件源码 项目:deckhand 作者: att-comdev 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        """Run the parent setUp, then install a FakeLogger for 'deckhand'."""
        super(DeckhandTestCase, self).setUp()
        deckhand_log_capture = fixtures.FakeLogger('deckhand')
        self.useFixture(deckhand_log_capture)
obj_fixtures.py 文件源码 项目:deb-oslo.versionedobjects 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
        """Capture log output through a FakeLogger stored on ``self.logger``.

        The root logger is set to DEBUG while the first root handler is
        limited to DEBUG or INFO depending on ``OS_DEBUG``. When the capture
        level is above DEBUG, a ``NullHandler`` at DEBUG is added as well.
        """
        super(StandardLogging, self).setUp()

        # set root logger to debug
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.DEBUG)

        # supports collecting debug level for local runs
        debug_requested = os.environ.get('OS_DEBUG') in _TRUE_VALUES
        capture_level = logging.DEBUG if debug_requested else logging.INFO

        # Collect logs
        log_format = '%(asctime)s %(levelname)s [%(name)s] %(message)s'
        capture = fixtures.FakeLogger(format=log_format, level=None)
        self.logger = self.useFixture(capture)
        # TODO(sdague): why can't we send level through the fake
        # logger? Tests prove that it breaks, but it's worth getting
        # to the bottom of.
        first_root_handler = root_logger.handlers[0]
        first_root_handler.setLevel(capture_level)

        if capture_level <= logging.DEBUG:
            return

        # Just attempt to format debug level logs, but don't save them
        null_handler = NullHandler()
        self.useFixture(
            fixtures.LogHandler(null_handler, nuke_handlers=False))
        null_handler.setLevel(logging.DEBUG)
base.py 文件源码 项目:imapautofiler 作者: dhellmann 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def setUp(self):
        """Install a DEBUG-level FakeLogger and replace ``sys.stdout``."""
        super().setUp()
        # Capture logging
        log_capture = fixtures.FakeLogger(level=logging.DEBUG)
        self.useFixture(log_capture)
        # Capture printing
        stdout_fixture = self.useFixture(fixtures.StringStream('stdout'))
        self.useFixture(
            fixtures.MonkeyPatch('sys.stdout', stdout_fixture.stream))


问题


面经


文章

微信
公众号

扫码关注公众号