python类Logger()的实例源码

py23.py 文件源码 项目:otRebuilder 作者: Pal3love 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def callHandlers(self, record):
        """Dispatch *record* to every applicable handler of this logger chain.

        Mirrors Python 3.5's ``logging.Logger.callHandlers``: starting at this
        logger, each handler whose level admits the record gets it, then the
        walk continues to the parent as long as ``propagate`` is true.  When
        no handler at all was encountered, ``logging.lastResort`` is used if
        set; otherwise a one-time warning is written to ``sys.stderr``.
        """
        handler_count = 0
        node = self
        while node:
            for handler in node.handlers:
                handler_count += 1
                if record.levelno >= handler.level:
                    handler.handle(record)
            # Stop climbing as soon as a logger refuses to propagate.
            node = node.parent if node.propagate else None

        if handler_count != 0:
            return

        if logging.lastResort:
            if record.levelno >= logging.lastResort.level:
                logging.lastResort.handle(record)
        elif logging.raiseExceptions and not self.manager.emittedNoHandlerWarning:
            sys.stderr.write("No handlers could be found for logger"
                             " \"%s\"\n" % self.name)
            # Remember that the warning was emitted so it is shown only once.
            self.manager.emittedNoHandlerWarning = True
log.py 文件源码 项目:virtualbmc 作者: umago 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, debug=False, logfile=None):
        """Initialise the 'VirtualBMC' logger with one handler and a level.

        :param debug: when true the logger level is DEBUG, otherwise INFO.
        :param logfile: path of a log file; when ``None`` a
            ``logging.StreamHandler`` is used instead of a file handler.

        An ``IOError`` raised while the handler is being set up is swallowed,
        so the logger may end up without a handler and without the requested
        level.
        """
        logging.Logger.__init__(self, 'VirtualBMC')
        try:
            if logfile is not None:
                self.handler = logging.FileHandler(logfile)
            else:
                self.handler = logging.StreamHandler()

            formatter = logging.Formatter(DEFAULT_LOG_FORMAT)
            self.handler.setFormatter(formatter)
            self.addHandler(self.handler)

            if debug:
                self.setLevel(logging.DEBUG)
            else:
                self.setLevel(logging.INFO)

        except IOError as e:
            # "except IOError, e" is Python 2-only syntax and a SyntaxError
            # on Python 3; the "as" form works on Python 2.6+ and 3.
            # NOTE(review): the errno test has no effect -- every IOError is
            # ignored, not only EACCES. Confirm whether other errors should
            # be re-raised.
            if e.errno == errno.EACCES:
                pass
log_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def setUp(self):
        # Colour is switched off in the constructor and fake colour codes are
        # patched in afterwards: nothing can be guaranteed about $TERM when
        # the tests run, and testing with colour off would hide potential
        # encoding issues caused by the control characters.
        formatter = LogFormatter(color=False)
        formatter._colors = {logging.ERROR: u("\u0001")}
        formatter._normal = u("\u0002")
        self.formatter = formatter

        # Instantiate Logger directly so getLogger's cache is not involved.
        logger = logging.Logger('LogFormatterTest')
        logger.propagate = False
        self.logger = logger

        # The handler writes into a file inside a fresh temporary directory.
        self.tempdir = tempfile.mkdtemp()
        self.filename = os.path.join(self.tempdir, 'log.out')
        handler = self.make_handler(self.filename)
        self.handler = handler
        handler.setFormatter(self.formatter)
        self.logger.addHandler(handler)
factories.py 文件源码 项目:aio-service-client 作者: alfred82santa 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def __call__(self, spec=None, spec_loader=None, plugins=None,
                 parser=None, serializer=None, logger=None, **kwargs):
        """Resolve loadable arguments, then delegate to the parent factory.

        * ``spec_loader``, when given, is used to load ``spec``.
        * ``plugins`` and ``logger`` are passed through the loader helpers;
          a ``TypeError`` from those helpers leaves the value untouched.
        * ``parser`` and ``serializer`` given as strings are replaced by the
          class the loader returns for them.
        """
        def _class_from_name(candidate):
            # Only string values are treated as loadable class names.
            if isinstance(candidate, str):
                return self.loader.load_class(candidate)
            return candidate

        if spec_loader:
            spec = load_spec_by_spec_loader(spec_loader, self.loader)

        try:
            plugins = self.iter_loaded_item_list(plugins, BasePlugin)
        except TypeError:  # pragma: no cover
            pass

        parser = _class_from_name(parser)
        serializer = _class_from_name(serializer)

        try:
            logger = self.load_item(logger, Logger)
        except TypeError:  # pragma: no cover
            pass

        parent_factory = super(ServiceClientFactory, self)
        return parent_factory.__call__(spec=spec, plugins=plugins, parser=parser,
                                       serializer=serializer, logger=logger, **kwargs)
directory_okta_test.py 文件源码 项目:user-sync.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setUp(self):
        # Minimal stand-in for an HTTP response: a status code plus the
        # JSON-encoded payload as text.
        class MockResponse:
            def __init__(self, status_code, data):
                self.text = json.dumps(data)
                self.status_code = status_code

        self.mock_response = MockResponse

        # Keep a reference to the real constructor before replacing it with a
        # mock, so the connector can be built without running its real init.
        self.orig_directory_init = OktaDirectoryConnector.__init__
        OktaDirectoryConnector.__init__ = mock.Mock(return_value=None)

        connector = OktaDirectoryConnector({})
        connector.options = {'all_users_filter': None, 'group_filter_format': '{group}'}
        connector.logger = mock.create_autospec(logging.Logger)
        connector.groups_client = okta.UserGroupsClient('example.com', 'xyz')
        self.directory = connector
directory_okta_test.py 文件源码 项目:user-sync.py 作者: adobe-apiplatform 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def setUp(self):
        # Minimal stand-in for an HTTP response: a status code plus the
        # JSON-encoded payload as text.
        class MockResponse:
            def __init__(self, status_code, data):
                self.text = json.dumps(data)
                self.status_code = status_code

        self.mock_response = MockResponse

        # Keep a reference to the real constructor before replacing it with a
        # mock, so the connector can be built without running its real init.
        self.orig_directory_init = OktaDirectoryConnector.__init__
        OktaDirectoryConnector.__init__ = mock.Mock(return_value=None)

        connector = OktaDirectoryConnector({})
        connector.logger = mock.create_autospec(logging.Logger)
        connector.groups_client = okta.UserGroupsClient('example.com', 'xyz')
        self.directory = connector
bot.py 文件源码 项目:sketal 作者: vk-brain 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def init_logger(self, logger):
        """Attach a file handler and a stream handler to the bot logger.

        When *logger* is falsy a new ``logging.Logger`` named "sketal" is
        created, at DEBUG level if ``self.settings.DEBUG`` is true and INFO
        otherwise.  The file handler (``logs.txt``, always DEBUG) is stored
        as ``self.logger_file``; the resulting logger as ``self.logger``.
        """
        if not logger:
            default_level = logging.DEBUG if self.settings.DEBUG else logging.INFO
            logger = logging.Logger("sketal", level=default_level)

        shared_format = logging.Formatter(
            fmt=u'%(filename)-10s [%(asctime)s] %(levelname)-8s: %(message)s',
            datefmt='%y.%m.%d %H:%M:%S',
        )

        to_file = logging.FileHandler('logs.txt')
        to_file.setLevel(logging.DEBUG)
        to_file.setFormatter(shared_format)
        self.logger_file = to_file

        to_stream = logging.StreamHandler()
        if self.settings.DEBUG:
            to_stream.setLevel(level=logging.DEBUG)
        else:
            to_stream.setLevel(level=logging.INFO)
        to_stream.setFormatter(shared_format)

        # File handler first, stream handler second.
        for attached in (to_file, to_stream):
            logger.addHandler(attached)

        self.logger = logger
vk_api.py 文件源码 项目:sketal 作者: vk-brain 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, vk_client, logger=None):
        """Store the client, pick a logger and reset the queue state.

        A truthy *logger* is used as is; otherwise a new ``logging.Logger``
        named "vk_reqque" is created.
        """
        self.logger = logger if logger else logging.Logger("vk_reqque")
        self.vk_client = vk_client

        # State flags, all initially off.
        self.hold = self.release = self.processing = False

        # Request counter and the time value associated with clearing it.
        self._requests_done = 0
        self.requests_done_clear_time = 0

        self.queue = asyncio.Queue()
utils.py 文件源码 项目:discover-books 作者: shispt 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_logger(name, filename, level=logging.DEBUG, fmt=None):
    """Build a standalone logger that writes to a stream and to *filename*.

    The logger is instantiated directly (not via ``logging.getLogger``) and
    gets a ``StreamHandler`` followed by a ``FileHandler``, both sharing one
    formatter.  *fmt* falls back to a default layout when falsy.
    """
    layout = fmt or '%(asctime)s-%(name)s-%(levelname)-10s%(message)s'
    shared_formatter = logging.Formatter(fmt=layout, datefmt='%Y-%m-%d %H:%M:%S')

    result = logging.Logger(name)
    # Stream handler first, file handler second.
    for sink in (logging.StreamHandler(), logging.FileHandler(filename)):
        sink.setFormatter(shared_formatter)
        result.addHandler(sink)
    result.setLevel(level)
    return result
embedding.py 文件源码 项目:embeddings 作者: vzhong 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def ensure_file(name, url=None, force=False, logger=logging.getLogger(), postprocess=None):
        """
        Make sure the requested file is present in the cache, downloading it when needed.

        Args:
            name (str): name of the file.
            url (str): url to download the file from, if it doesn't exist.
            force (bool): whether to force the download, regardless of the existence of the file.
            logger (logging.Logger): logger to log results.
            postprocess (function): optional callable applied after the download; it is called as ``f(fname)``.

        Returns:
            str: file name of the downloaded file.

        Raises:
            Exception: when a download is required but no ``url`` was given.

        """
        fname = Embedding.path(name)

        # Nothing to do when the file is already cached and no refresh is forced.
        if path.isfile(fname) and not force:
            return fname

        if not url:
            raise Exception('{} does not exist!'.format(fname))

        logger.critical('Downloading from {} to {}'.format(url, fname))
        Embedding.download_file(url, fname)
        if postprocess:
            postprocess(fname)
        return fname
dispatch.py 文件源码 项目:Brightside 作者: BrighterCommand 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def __init__(self,
                 connection: Connection,
                 consumer: BrightsideConsumerConfiguration,
                 consumer_factory: Callable[[Connection, BrightsideConsumerConfiguration, logging.Logger], BrightsideConsumer],
                 command_processor_factory: Callable[[str], CommandProcessor],
                 mapper_func: Callable[[BrightsideMessage], Request]) -> None:
        """
        Hold the configuration parameters for one consumer. One or more performers can be created from this, each of
        which is a message pump reading from a queue.
        :param connection: The connection to the broker
        :param consumer: The consumer we want to create (routing key, queue etc)
        :param consumer_factory: A factory to create a consumer to read from a broker
        :param command_processor_factory: Creates a command processor configured for a pipeline
        :param mapper_func: Maps between messages on the queue and requests (commands/events)
        """
        # Broker-side collaborators.
        self._connection, self._consumer = connection, consumer
        # Factories, kept for later use.
        self._consumer_factory = consumer_factory
        self._command_processor_factory = command_processor_factory
        # Message -> request translation.
        self._mapper_func = mapper_func
__init__.py 文件源码 项目:centos-base-consul 作者: zeroc0d3lab 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_fallback_logger(stream=None):
    """Return the module-wide fallback logger, creating it on first use.

    The logger is built once and stored in the ``_fallback_logger`` global;
    later calls return that object and ignore *stream*.  It is a 'powerline'
    logger at WARNING level with one ``StreamHandler`` on *stream*, wrapped
    in a ``PowerlineLogger``.
    """
    global _fallback_logger
    if not _fallback_logger:
        threshold = logging.WARNING

        stream_handler = logging.StreamHandler(stream)
        stream_handler.setLevel(threshold)
        stream_handler.setFormatter(
            logging.Formatter('%(asctime)s:%(levelname)s:%(message)s'))

        base_logger = logging.Logger('powerline')
        base_logger.setLevel(threshold)
        base_logger.addHandler(stream_handler)

        _fallback_logger = PowerlineLogger(None, base_logger, '_fallback_')
    return _fallback_logger
__init__.py 文件源码 项目:centos-base-consul 作者: zeroc0d3lab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def create_logger(self):
        '''Create logger

        Used to create the logger unless one was already specified at
        initialization. Delegates to the module-level ``create_logger``
        using this object's configuration attributes.

        :return: Three objects:

            #. :py:class:`logging.Logger` instance.
            #. :py:class:`PowerlineLogger` instance.
            #. Function, output of :py:func:`gen_module_attr_getter`.
        '''
        logger_options = dict(
            common_config=self.common_config,
            use_daemon_threads=self.use_daemon_threads,
            ext=self.ext,
            imported_modules=self.imported_modules,
            stream=self.default_log_stream,
        )
        return create_logger(**logger_options)
case.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Swap the target logger's handlers for a capturing one.

        ``self.logger_name`` may be a ``logging.Logger`` or a logger name.
        The logger's previous handlers, level and propagate flag are saved on
        ``self`` before being replaced, and the capturing handler's watcher
        is returned.
        """
        target = self.logger_name
        if not isinstance(target, logging.Logger):
            target = logging.getLogger(target)
        self.logger = target

        formatter = logging.Formatter(self.LOGGING_FORMAT)
        capture = _CapturingHandler()
        capture.setFormatter(formatter)
        self.watcher = capture.watcher

        # Snapshot the current configuration before overriding it.
        self.old_handlers = target.handlers[:]
        self.old_level = target.level
        self.old_propagate = target.propagate

        target.handlers = [capture]
        target.setLevel(self.level)
        target.propagate = False
        return capture.watcher
Cli.py 文件源码 项目:QXSConsolas 作者: qxsch 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def run(self, argv = None, data = None, logger = None):
        """
        Runs the function

        argv    passed to the argument parser's parseArguments()
        data    optional Configuration; stored as self.data when given
        logger  optional logging.Logger; stored as self.logger when given

        Returns the result of the application callable, or 1 when argument
        validation or the application raises an exception (the exception is
        logged).
        """
        if logger is not None:
            assert isinstance(logger, logging.Logger), "logger is not a valid logging.Logger"
            self.logger = logger
        if data is not None:
            assert isinstance(data, Configuration), "data is not a valid QXSConsolas.Configuration.Configuration"
            self.data = data

        self.options, self.arguments = self._argparser.parseArguments(argv)
        if self._argparser.loglevel == 1:
            self._configureConsoleLoggers(logging.NOTSET, True)
        elif self._argparser.loglevel == -1:
            self._configureConsoleLoggers(logging.CRITICAL, False)
        try:
            self._argparser.validateRequiredArguments()
            return self._app(ApplicationData(self))
        except Exception as e:
            # The "logger" parameter defaults to None, so it must not be used
            # here: log through the instance logger (which equals the
            # parameter whenever one was passed) and fall back to a module
            # logger if the instance has none.
            active_logger = getattr(self, "logger", None)
            if active_logger is None:
                active_logger = logging.getLogger(__name__)
            active_logger.exception(e)
            return 1
DeployRoles.py 文件源码 项目:QXSConsolas 作者: qxsch 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setRoleInfo(self, logger, envname, envconfig, rolename, roleconfig):
        """
        Sets the role info during deployment and backup/restores.

        logger     logging.Logger
                   a logger; anything that is not a logging.Logger is
                   replaced by the root logger
        envname    string
                   name of the environment, where the app should be deployed
        envconfig  QXSConsolas.Configuration.Configuration
                   configuration of the environment, where the app should be deployed
        rolename   string
                   name of the role, where the app should be deployed
        roleconfig QXSConsolas.Configuration.Configuration
                   configuration of the role, where the app should be deployed
        """
        if isinstance(logger, logging.Logger):
            self._logger = logger
        else:
            self._logger = logging.getLogger()
        self._envname = envname
        self._envconfig = envconfig
        self._rolename = rolename
        self._roleconfig = roleconfig
Cli.py 文件源码 项目:QXSConsolas 作者: qxsch 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def run(self, argv = None, data = None, logger = None):
        """
        Runs the function

        argv    passed to the argument parser's parseArguments()
        data    optional Configuration; stored as self.data when given
        logger  optional logging.Logger; stored as self.logger when given

        Returns the result of the application callable, or 1 when argument
        validation or the application raises an exception (the exception is
        logged).
        """
        if logger is not None:
            assert isinstance(logger, logging.Logger), "logger is not a valid logging.Logger"
            self.logger = logger
        if data is not None:
            assert isinstance(data, Configuration), "data is not a valid QXSConsolas.Configuration.Configuration"
            self.data = data

        self.options, self.arguments = self._argparser.parseArguments(argv)
        if self._argparser.loglevel == 1:
            self._configureConsoleLoggers(logging.NOTSET, True)
        elif self._argparser.loglevel == -1:
            self._configureConsoleLoggers(logging.CRITICAL, False)
        try:
            self._argparser.validateRequiredArguments()
            return self._app(ApplicationData(self))
        except Exception as e:
            # The "logger" parameter defaults to None, so it must not be used
            # here: log through the instance logger (which equals the
            # parameter whenever one was passed) and fall back to a module
            # logger if the instance has none.
            active_logger = getattr(self, "logger", None)
            if active_logger is None:
                active_logger = logging.getLogger(__name__)
            active_logger.exception(e)
            return 1
log.py 文件源码 项目:virtualbmc 作者: openstack 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def __init__(self, debug=False, logfile=None):
        """Initialise the 'VirtualBMC' logger with one handler and a level.

        :param debug: when true the logger level is DEBUG, otherwise INFO.
        :param logfile: path of a log file; when ``None`` a
            ``logging.StreamHandler`` is used instead of a file handler.

        An ``IOError`` raised during the setup is swallowed.
        """
        logging.Logger.__init__(self, 'VirtualBMC')
        try:
            if logfile is None:
                self.handler = logging.StreamHandler()
            else:
                self.handler = logging.FileHandler(logfile)

            self.handler.setFormatter(logging.Formatter(DEFAULT_LOG_FORMAT))
            self.addHandler(self.handler)

            self.setLevel(logging.DEBUG if debug else logging.INFO)

        except IOError as e:
            # The errno comparison has no effect: the error is ignored
            # whatever its errno is.
            if e.errno == errno.EACCES:
                pass
test_python.py 文件源码 项目:mdk 作者: datawire 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_logPassthrough(self):
        """If MDKHandler is used, logging via stdlib is passed to MDK."""
        logger = logging.Logger("mylog")
        logger.setLevel(logging.DEBUG)
        mdk, tracer = create_mdk_with_faketracer()
        session = mdk.session()
        session.trace("DEBUG")
        logger.addHandler(MDKHandler(mdk, lambda: session))

        # One message per stdlib level; note that ``warning`` logs "warnz".
        emissions = [
            (logger.debug, "debugz"),
            (logger.info, "infoz"),
            (logger.warning, "warnz"),
            (logger.error, "errorz"),
            (logger.critical, "criticalz"),
        ]
        for emit, text in emissions:
            emit(text)

        expected = []
        for level in ["debug", "info", "warn", "error", "critical"]:
            expected.append({"level": level.upper(), "category": "mylog",
                             "text": level + "z",
                             "context": session._context.traceId})
        self.assertEqual(tracer.messages, expected)
test_python.py 文件源码 项目:mdk 作者: datawire 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_sessions(self):
        """
        The given session's context is used; if no session is available a default
        session is used.
        """
        mdk, tracer = create_mdk_with_faketracer()
        session1, session3 = mdk.session(), mdk.session()

        # Sessions handed out one per call, in order; the middle call yields
        # None to exercise the default-session fallback.
        pending = [session1, None, session3]

        def get_session():
            return pending.pop(0)

        handler = MDKHandler(mdk, get_session)
        logger = logging.Logger("mylog")
        logger.addHandler(handler)
        for _ in range(3):
            logger.info("hello")

        seen_contexts = [entry["context"] for entry in tracer.messages]
        expected_sessions = [session1, handler._default_session, session3]
        self.assertEqual(seen_contexts,
                         [s._context.traceId for s in expected_sessions])
test_python.py 文件源码 项目:mdk 作者: datawire 项目源码 文件源码 阅读 79 收藏 0 点赞 0 评论 0
def tes_withinARequest(self):
        """
        When logging inside a Flask route, the MDK Session for the request is used
        if MDKLoggingHandler was set up.
        """
        # NOTE(review): the name starts with "tes_", not "test", so
        # prefix-based test discovery would presumably skip this method --
        # confirm whether it was disabled on purpose.
        logger = logging.Logger("logz")
        mdk, tracer = create_mdk_with_faketracer()
        app = make_flask_app(logger)
        mdk_setup(app, mdk=mdk)

        # The handler is attached only after the app has been set up.
        handler = MDKLoggingHandler(mdk)
        logger.addHandler(handler)
        client = app.test_client()
        client.get("/")
        # Only the most recent traced message is inspected: its text must be
        # "hello: " followed by its own context value.
        message = tracer.messages[-1]
        self.assertEqual("hello: " + message["context"], message["text"])
heap_streamer.py 文件源码 项目:integration-prototype 作者: SKA-ScienceDataProcessor 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, config, frame_shape, log=Logger(__name__)):
        """Creates and sets up SPEAD streams.

        The configuration of streams is passed in via the ``config`` argument.

        The dimensions of the visibility data must be specified in order
        to initialise the payload. This is a tuple of dimensions defined in the
        ICD.

        Args:
            config (dict): Dictionary of settings (see above).
            frame_shape (tuple): Dimensions of the payload visibility data.
            log (logging.Logger): Python logging object.
        """
        # NOTE(review): the default ``log`` is built once, when the function
        # is defined, so every instance created without an explicit ``log``
        # shares that single Logger object.
        # NOTE(review): the original docstring ended "defined in the ICD as:"
        # with nothing after it -- the list of dimensions appears to be
        # missing; confirm against the ICD.
        self._config = config
        self._frame_shape = frame_shape
        self._log = log
        # The helper calls below presumably rely on the attributes assigned
        # before them -- keep this order unless the helpers are checked.
        self._heap_descriptor = self._init_heap_descriptor()
        self._streams = list()
        self._heap_counter = 0
        self._send_timer = 0
        self._heap_size = self._get_heap_size()
        self._create_streams()
        self._payload = self._init_payload()
LoggingConfig.py 文件源码 项目:robot-camera-platform 作者: danionescu0 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_logger(self) -> logging.Logger:
        """Configure the root logger and return it.

        A rotating file handler (3 backups, UTF-8) and a stream handler are
        added to the root logger, sharing one formatter.  The level is DEBUG
        when the debug flag is set and WARNING otherwise, applied both to the
        root logger and to the two handlers.  Every call adds a new pair of
        handlers.
        """
        formatter = logging.Formatter(fmt='%(levelname)s (%(threadName)-10s) :%(name)s: %(message)s '
                                          '(%(asctime)s; %(filename)s:%(lineno)d)',
                                      datefmt="%Y-%m-%d %H:%M:%S")
        rotating_file = logging.handlers.RotatingFileHandler(self.__filename,
                                                             encoding='utf8',
                                                             maxBytes=self.__max_bytes,
                                                             backupCount=3)
        console = logging.StreamHandler()

        self.__root_logger = logging.getLogger()
        level = logging.DEBUG if self.__debug else logging.WARNING
        self.__root_logger.setLevel(level)

        # File handler first, console handler second.
        for sink in (rotating_file, console):
            sink.setFormatter(formatter)
            sink.setLevel(level)
            self.__root_logger.addHandler(sink)

        return self.__root_logger
utils.py 文件源码 项目:calmjs 作者: calmjs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def enable_pretty_logging(logger='calmjs', level=logging.DEBUG, stream=None):
    """
    Shorthand to enable pretty logging.

    *logger* may be a ``logging.Logger`` or a logger name.  A formatted
    ``StreamHandler`` on *stream* is added and the logger level is set to
    *level*.  The returned ``cleanup`` callable removes that handler again
    and restores the level the logger had before the call.
    """

    target = logger if isinstance(logger, logging.Logger) else logging.getLogger(logger)
    previous_level = target.level

    pretty_handler = logging.StreamHandler(stream)
    pretty_handler.setFormatter(logging.Formatter(
        u'%(asctime)s %(levelname)s %(name)s %(message)s'))
    target.addHandler(pretty_handler)
    target.setLevel(level)

    def cleanup():
        target.removeHandler(pretty_handler)
        target.level = previous_level

    return cleanup
api.py 文件源码 项目:openbrokerapi 作者: eruvanos 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def serve(services: List[Service],
          credentials: BrokerCredentials,
          logger: logging.Logger = logging.root,
          port=5000,
          debug=False):
    """
    Create a Flask app exposing the given broker and run it on 0.0.0.0.

    :param services: Services that this broker provides
    :param credentials: Username and password that will be required to communicate with service broker
    :param logger: Used for api logs. This will not influence Flasks logging behavior
    :param port: Port
    :param debug: Enables debugging in flask app
    """

    # Imported here, not at module level, as in the original.
    from flask import Flask

    flask_app = Flask(__name__)
    broker_blueprint = get_blueprint(services, credentials, logger)

    logger.debug("Register openbrokerapi blueprint")
    flask_app.register_blueprint(broker_blueprint)

    logger.info("Start Flask on 0.0.0.0:%s" % port)
    flask_app.run('0.0.0.0', port, debug)
loader.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def __init__(self, log=None):
        """A base class for config loaders.

        log : instance of :class:`logging.Logger` to use.
              When omitted, the logger returned by ``self._log_default()`` is
              used and a debug message is emitted through it.

        Examples
        --------

        >>> cl = ConfigLoader()
        >>> config = cl.load_config()
        >>> config
        {}
        """
        self.clear()
        if log is not None:
            self.log = log
            return

        self.log = self._log_default()
        self.log.debug('Using default logger')
test_logging.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def test_persistent_loggers(self):
        # Logger objects are persistent and retain their configuration, even
        #  if visible references are destroyed.
        self.root_logger.setLevel(logging.INFO)
        foo = logging.getLogger("foo")
        self._watch_for_survival(foo)
        foo.setLevel(logging.DEBUG)
        # The root logger is at INFO, so only the DEBUG message sent through
        # "foo" (message '2') is expected in the output below.
        self.root_logger.debug(self.next_message())
        foo.debug(self.next_message())
        self.assert_log_lines([
            ('foo', 'DEBUG', '2'),
        ])
        # Drop the only local reference to the logger.
        del foo
        # foo has survived.
        # NOTE(review): "_assertTruesurvival" looks like the result of a
        # mechanical rename of "_assert_survival" -- confirm the helper
        # exists under this name.
        self._assertTruesurvival()
        # foo has retained its settings.
        bar = logging.getLogger("foo")
        bar.debug(self.next_message())
        self.assert_log_lines([
            ('foo', 'DEBUG', '2'),
            ('foo', 'DEBUG', '3'),
        ])
logutil.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def set_direct_console_logger(cls, loglevel=logging.INFO):
        """
        Set up the 'console' child of the root logger with a stream handler.

        The logger itself is set to DEBUG; *loglevel* is applied to the
        handler added here.

        Parameters:
            loglevel (int): numeric value of the logging level (e.g. DEBUG == 10)

        Returns:
            logger (Logger): the root logger's child named 'console'
        """
        console_handler = logging.StreamHandler()
        console_handler.setLevel(loglevel)

        console_logger = cls.get_root_logger().getChild("console")
        console_logger.setLevel(logging.DEBUG)
        console_logger.addHandler(console_handler)
        console_logger.propagate = True
        return console_logger
logutil.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_module_logger(cls, mod, logdir):
        """
        Return the logging.Logger dedicated to the given module.
        When "<placement>:<name>" is not yet registered in cls._module_loggers,
        cls.create_module_logger() is called first to configure it.

        Parameters:
            mod (Module): module to return a logger for
            logdir (str): the log directory path

        Returns:
            (logger): logging.Logger specific to the given ec2rlcore.module
        """
        registry_key = "{}:{}".format(mod.placement, mod.name)
        if registry_key not in cls._module_loggers:
            cls.create_module_logger(mod, logdir)

        # Resolves to the logger "ec2rl.module.<placement>.<name>".
        modules_root = logging.getLogger("ec2rl").getChild("module")
        return modules_root.getChild(mod.placement).getChild(mod.name)
logger.py 文件源码 项目:odooku-compat 作者: adaptivdesign 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def log(self, lvl, msg, *args, **kwargs):
        """Forward a metric found in ``extra`` to statsd, then log *msg*.

        When ``kwargs["extra"]`` carries a truthy metric name, value and type,
        the matching ``gauge``/``increment``/``histogram`` method is called;
        unknown types are ignored.  Any exception raised while doing so is
        reported as a warning.  *msg* is logged afterwards only when truthy.
        """
        try:
            stats = kwargs.get("extra", None)
            if stats is not None:
                name = stats.get(METRIC_VAR, None)
                amount = stats.get(VALUE_VAR, None)
                kind = stats.get(MTYPE_VAR, None)
                if name and amount and kind:
                    if kind == GAUGE_TYPE:
                        self.gauge(name, amount)
                    elif kind == COUNTER_TYPE:
                        self.increment(name, amount)
                    elif kind == HISTOGRAM_TYPE:
                        self.histogram(name, amount)
        except Exception:
            logging.Logger.warning(self, "Failed to log to statsd", exc_info=True)

        if not msg:
            return
        logging.Logger.log(self, lvl, msg, *args, **kwargs)


问题


面经


文章

微信
公众号

扫码关注公众号