python类Logger()的实例源码

doubleMA.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def analyze(context=None, results=None):
    """Plot the algorithm's period return against the benchmark's.

    Both series are drawn on the upper subplot (``211``) of a new figure,
    the algorithm in blue and the benchmark in red, and the figure is then
    shown with ``plt.show()``.

    Parameters
    ----------
    context : object, optional
        Not used by this function.
    results : object
        Must expose ``algorithm_period_return`` and
        ``benchmark_period_return``, each offering a ``plot`` method.
    """
    import matplotlib.pyplot as plt
    import logbook

    # Install a stderr handler on logbook's application-wide stack.
    stderr_handler = logbook.StderrHandler()
    stderr_handler.push_application()
    log = logbook.Logger('Algorithm')  # created here but not used below

    figure = plt.figure()
    upper_axes = figure.add_subplot(211)

    algorithm_returns = results.algorithm_period_return
    algorithm_returns.plot(ax=upper_axes, color='blue', legend=u'????')
    upper_axes.set_ylabel(u'??')
    benchmark_returns = results.benchmark_period_return
    benchmark_returns.plot(ax=upper_axes, color='red', legend=u'????')

    plt.show()

# capital_base is the base value of capital
#
logging.py 文件源码 项目:combine 作者: llllllllll 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _get_logger_for_contextmanager(log):
    """Get the canonical logger from a context manager.

    Parameters
    ----------
    log : Logger or None
        The explicit logger passed to the context manager.

    Returns
    -------
    log : Logger
        The logger to use in the context manager.
    """
    if log is not None:
        return log

    # We need to walk up through the context manager, then through
    # @contextmanager and finally into the top level calling frame.
    return _logger_for_frame(_getframe(3))
worker.py 文件源码 项目:DPQ 作者: DiggerPlus 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, queues, name=None, rv_ttl=500, connection=None):  # noqa
    """Initialise the worker's connection, queues and bookkeeping state.

    Parameters
    ----------
    queues : Queue or iterable
        A single ``Queue`` is wrapped in a one-element list; anything else
        is stored as given and then checked by ``validate_queues()``.
    name : optional
        Stored as ``_name``.
    rv_ttl : int, optional
        Stored as ``rv_ttl``.
    connection : optional
        When None, ``resolve_connection()`` supplies the connection.
    """
    self.connection = (
        resolve_connection() if connection is None else connection
    )
    self._name = name
    self.queues = [queues] if isinstance(queues, Queue) else queues
    self.validate_queues()
    self.rv_ttl = rv_ttl

    # Initial lifecycle state.
    self._state = 'starting'
    self._is_horse = False
    self._horse_pid = 0
    self._stopped = False

    self.log = Logger('worker')
    self.failed_queue = get_failed_queue(connection=self.connection)
capture.py 文件源码 项目:GSM-scanner 作者: yosriayed 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, display_filter=None, only_summaries=False, eventloop=None,
             decryption_key=None, encryption_type='wpa-pwd', output_file=None,
             decode_as=None, tshark_path=None):
    """Store the capture options and validate the encryption standard.

    When no ``eventloop`` is supplied, ``setup_eventloop()`` is called.
    ``encryption_type`` is lower-cased and must be one of
    ``SUPPORTED_ENCRYPTION_STANDARDS``.

    Raises
    ------
    UnknownEncyptionStandardException
        If ``encryption_type`` is empty or not a supported standard.
    """
    self._packets = []
    self.current_packet = 0
    self.display_filter = display_filter
    self.only_summaries = only_summaries
    self.output_file = output_file
    self.running_processes = set()
    self.loaded = False
    self.decode_as = decode_as
    self.log = logbook.Logger(self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL)
    self.tshark_path = tshark_path
    self.debug = False

    self.eventloop = eventloop
    if self.eventloop is None:
        self.setup_eventloop()

    # Normalise once; a falsy encryption type is never supported.
    standard = encryption_type.lower() if encryption_type else None
    if standard is None or standard not in self.SUPPORTED_ENCRYPTION_STANDARDS:
        supported = ', '.join(self.SUPPORTED_ENCRYPTION_STANDARDS)
        raise UnknownEncyptionStandardException(
            "Only the following standards are supported: %s." % supported)
    self.encryption = (decryption_key, standard)
util.py 文件源码 项目:saltyrtc-server-python 作者: saltyrtc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_logger(name=None, level=None):
    """
    Create a :class:`logbook.Logger` and add it to the logger group.

    Arguments:
        - `name`: Optional sub-logger name; appended to ``saltyrtc``
          with a dot. Without it the logger is named ``saltyrtc``.
        - `level`: A :mod:`logbook` logging level. Defaults to
          ``logbook.NOTSET``.
    """
    if _logger_convert_level_handler is None:
        _logging_error()

    # From here on logbook is expected to be usable
    effective_level = logbook.NOTSET if level is None else level
    if name is None:
        qualified_name = 'saltyrtc'
    else:
        qualified_name = '.'.join(('saltyrtc', name))

    # Build the logger and register it with the shared group
    new_logger = logbook.Logger(name=qualified_name, level=effective_level)
    logger_group.add_logger(new_logger)
    return new_logger
heart.py 文件源码 项目:oshino 作者: CodersOfTheNight 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def register_augments(client: processor.QClient,
                      augments_cfg: list,
                      logger: Logger):
    """Register every valid augment from ``augments_cfg`` on ``client``.

    An augment whose ``is_valid()`` is falsy is reported through
    ``logger.warn`` and skipped. For the rest, the augment instance is
    printed and its ``activate`` is registered under ``augment.key``.
    """
    for augment in augments_cfg:
        if augment.is_valid():
            inst = augment.instance
            print(inst)
            processor.register_augment(client,
                                       augment.key,
                                       inst.activate,
                                       logger)
        else:
            message = "Augment '{0}' failed to pass validation".format(augment)
            logger.warn(message)
heart.py 文件源码 项目:oshino 作者: CodersOfTheNight 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def instrumentation(client: processor.QClient,
                    logger: Logger,
                    interval: int,
                    delta: int,
                    events_count: int,
                    pending_events: int):
    """Emit the self-monitoring events for one loop iteration.

    Sends, through ``client.event``: a heartbeat (with ``interval * 1.5``
    truncated to an int), the time delta of the iteration, the number of
    collected events and the number of pending events.

    Parameters:
        client: client whose ``event`` callable is handed to each sender.
        logger: logger passed through to each sender.
        interval: configured loop interval.
        delta: time the iteration took.
        events_count: number of events collected in this iteration.
        pending_events: number of still-pending events.
    """
    send_heartbeat(client.event, logger, int(interval * 1.5))
    send_timedelta(client.event, logger, delta, interval)
    send_metrics_count(client.event, logger, events_count)
    # Report the pending count itself; previously ``events_count`` was sent
    # here by mistake and ``pending_events`` was never used.
    send_pending_events_count(client.event, logger, pending_events)
heart.py 文件源码 项目:oshino 作者: CodersOfTheNight 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
async def main_loop(cfg: Config,
                    logger: Logger,
                    transport_cls: Generic[T],
                    continue_fn: callable,
                    loop: BaseEventLoop):
    """Run the collect / instrument / flush cycle until told to stop.

    This must be a coroutine function: the body awaits ``step``,
    ``processor.flush`` and ``asyncio.sleep``, and the result is expected
    to be driven by an event loop (e.g. ``loop.run_until_complete``).

    Parameters:
        cfg: configuration providing ``riemann``, ``agents``, ``augments``,
            ``executor_class``, ``executors_count`` and ``interval``.
        logger: logger used for augment registration, instrumentation
            and flushing.
        transport_cls: callable building the transport from host and port.
        continue_fn: called after every iteration; a falsy result ends
            the loop.
        loop: event loop that receives the default executor and is passed
            to ``step``.
    """
    riemann = cfg.riemann
    transport = transport_cls(riemann.host, riemann.port)
    client = processor.QClient(transport)
    agents = create_agents(cfg.agents)
    register_augments(client, cfg.augments, logger)
    executor = cfg.executor_class(max_workers=cfg.executors_count)
    loop.set_default_executor(executor)

    init(agents)

    while True:
        ts = time()
        (done, pending) = await step(client,
                                     agents,
                                     timeout=cfg.interval * 1.5,
                                     loop=loop)

        te = time()
        td = te - ts
        instrumentation(client,
                        logger,
                        cfg.interval,
                        td,
                        len(client.queue.events),
                        len(pending))

        await processor.flush(client, transport, logger)
        if continue_fn():
            # Sleep for what is left of the interval. ``asyncio.sleep`` no
            # longer accepts ``loop=`` (removed in Python 3.10); inside a
            # coroutine it uses the running loop.
            await asyncio.sleep(cfg.interval - int(td))
        else:
            logger.info("Stopping Oshino")
            break

    client.on_stop()
heart.py 文件源码 项目:oshino 作者: CodersOfTheNight 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def start_loop(cfg: Config, noop=False):
    """Configure log handlers, then run ``main_loop`` to completion.

    A stdout ``StreamHandler`` at ``cfg.log_level`` is always installed.
    When ``cfg.sentry_dsn`` is set, a ``SentryHandler`` at ERROR level is
    added as well; an ``InvalidDsn`` is logged as a warning and skipped.
    The event loop is closed when ``main_loop`` finishes or raises.

    Parameters:
        cfg: configuration object.
        noop: forwarded to ``cfg.riemann.transport`` to pick the transport.
    """
    handlers = [StreamHandler(sys.stdout, level=cfg.log_level)]
    logger = Logger("Heart")

    version_banner = "Initializing Oshino v{0}".format(get_version())
    logger.info(version_banner)
    interval_banner = (
        "Running forever in {0} seconds interval. Press Ctrl+C to exit"
        .format(cfg.interval)
    )
    logger.info(interval_banner)

    if cfg.sentry_dsn:
        try:
            sentry_client = SentryClient(cfg.sentry_dsn)
            sentry_handler = SentryHandler(sentry_client,
                                           level=logbook.ERROR,
                                           bubble=True)
            handlers.append(sentry_handler)
        except InvalidDsn:
            logger.warn("Invalid Sentry DSN '{0}' providen. Skipping"
                        .format(cfg.sentry_dsn))

    NestedSetup(handlers).push_application()

    loop = create_loop()
    try:
        transport_cls = cfg.riemann.transport(noop)
        coroutine = main_loop(cfg, logger, transport_cls, forever, loop=loop)
        loop.run_until_complete(coroutine)
    finally:
        loop.close()
__init__.py 文件源码 项目:oshino 作者: CodersOfTheNight 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_logger(self):
    """Return a new ``Logger`` named after this object's class."""
    channel = self.__class__.__name__
    return Logger(channel)
logging.py 文件源码 项目:combine 作者: llllllllll 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def log_duration(operation, level='info', log=None):
    """Log when an operation starts and how long it took once it ends.

    This is a generator with a single ``yield``. The completion message is
    written in a ``finally`` clause, so it is logged even when the wrapped
    code raises.

    Parameters
    ----------
    operation : str
        Description of what is being timed.
    level : str, optional
        Level name for both messages; upper-cased before use.
    log : Logger, optional
        The logger to write to. When omitted, the logger is resolved by
        ``_get_logger_for_contextmanager``.
    """
    log = _get_logger_for_contextmanager(log)
    level_name = level.upper()

    log.log(level_name, operation)
    started_at = datetime.now()
    try:
        yield
    finally:
        elapsed = datetime.now() - started_at
        log.log(
            level_name,
            'completed {} (completed in {})',
            operation,
            naturaldelta(elapsed),
        )
logging.py 文件源码 项目:combine 作者: llllllllll 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _logger_for_frame(f):
    """Look up the logger for the module a stack frame belongs to.

    Parameters
    ----------
    f : frame
        The stack frame whose globals provide the module ``__name__``.

    Returns
    -------
    logger : Logger
        Whatever ``_mem_logger`` returns for that module name.
    """
    module_name = f.f_globals['__name__']
    return _mem_logger(module_name)
fixtures.py 文件源码 项目:catalyst 作者: enigmampc 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def init_class_fixtures(cls):
    """Attach a ``Logger`` and an application-bound log handler to ``cls``.

    The parent fixtures run first; the handler produced by
    ``make_log_handler()`` is entered via ``enter_class_context``.
    """
    super(WithLogger, cls).init_class_fixtures()
    cls.log = Logger()
    bound_handler = cls.make_log_handler().applicationbound()
    cls.log_handler = cls.enter_class_context(bound_handler)
logbook_tests.py 文件源码 项目:apm-agent-python 作者: elastic 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def logbook_logger():
    """Return a logbook ``Logger`` named after this module."""
    module_logger = logbook.Logger(__name__)
    return module_logger
base_controller.py 文件源码 项目:nflpool 作者: prcutler 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, request):
    """Store the request, load the shared layout macro and set up logging.

    The logger is named ``Ctrls/<ClassName>`` with every occurrence of
    ``Controller`` removed from the class name.
    """
    self.request = request
    self.build_cache_id = static_cache.build_cache_id

    # Pull the 'layout' macro out of the shared layout template.
    renderer = pyramid.renderers.get_renderer('nflpool:templates/shared/_layout.pt')
    template = renderer.implementation()
    self.layout = template.macros['layout']

    short_name = type(self).__name__.replace("Controller", "")
    self.log = logbook.Logger('Ctrls/' + short_name)
log_service.py 文件源码 项目:nflpool 作者: prcutler 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_startup_log():
    """Return a logbook ``Logger`` for the "App" channel."""
    startup_log = logbook.Logger("App")
    return startup_log
worker.py 文件源码 项目:DPQ 作者: DiggerPlus 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def main_work_horse(self, job):
    """Entry point of the newly spawned work horse; never returns.

    Performs ``job`` and terminates the process with exit status 0 when
    ``perform_job`` reports success, 1 otherwise.
    """
    # Re-seed after fork() so the horse does not produce the same random
    # sequences as the worker it was forked from.
    random.seed()
    self._is_horse = True
    self.log = Logger('horse')

    succeeded = self.perform_job(job)
    exit_status = int(not succeeded)

    # A forked child must leave through os._exit() rather than the
    # regular sys.exit().
    os._exit(exit_status)
helpers.py 文件源码 项目:OdooQuant 作者: haogefeifei 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_logger(name, debug=True):
    """Push an application-wide handler and return a logger for ``name``.

    With ``debug`` truthy, records go to stdout through a
    ``StreamHandler``; otherwise a ``NullHandler`` is pushed. The logger
    is named after the basename of ``name``, and logbook timestamps are
    switched to local time.
    """
    logbook.set_datetime_format('local')
    if debug:
        handler = StreamHandler(sys.stdout)
    else:
        handler = NullHandler()
    handler.push_application()
    channel = os.path.basename(name)
    return Logger(channel)
log.py 文件源码 项目:python_tk_adb 作者: liwanlei 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_logger(name='monkey??????????', file_log=file_stream, level=''):
    """Logger factory: push console and file handlers, return a Logger.

    Two handlers are pushed onto the current thread's stack: a
    ``ColorizedStderrHandler`` and a ``TimedRotatingFileHandler`` whose
    base file name is ``<name>.log`` under ``LOG_DIR``. logbook timestamps
    are switched to local time.

    Parameters:
        name: channel name of the logger and base name of the log file.
        file_log: accepted for compatibility; not used by this function.
        level: level for the stderr handler. An empty value (the default)
            means "no threshold" and is mapped to ``logbook.NOTSET``.

    Returns:
        A ``logbook.Logger`` named ``name``.
    """
    logbook.set_datetime_format('local')

    # '' is not a level name logbook can resolve, so passing the default
    # straight through would raise LookupError while building the handler.
    stderr_level = level or logbook.NOTSET

    ColorizedStderrHandler(bubble=False, level=stderr_level).push_thread()
    logbook.TimedRotatingFileHandler(
            os.path.join(LOG_DIR, '%s.log' % name),
            date_format='%Y-%m-%d-%H', bubble=True, encoding='utf-8').push_thread()
    return logbook.Logger(name)
INTEGRATE_ME.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def rpc_server(socket, protocol, dispatcher):
    """Receive multipart RPC messages forever, answering each in a greenlet.

    A message that cannot be received is logged and skipped. Each received
    message is handed to ``gevent.spawn``; its last frame is parsed as the
    request and replaced with the serialized response before the message
    is sent back on ``socket``.
    """
    log = Logger('rpc_server')
    log.debug('starting up...')

    # assuming protocol is threadsafe and dispatcher is theadsafe, as long
    # as its immutable
    def handle_client(message):
        payload = message[-1]
        try:
            request = protocol.parse_request(payload)
        except RPCError as err:
            log.exception(err)
            response = err.error_respond()
        else:
            response = dispatcher.dispatch(request)
            log.debug('Response okay: %r', response)

        # overwrite the request frame with the reply and send it back
        message[-1] = response.serialize()
        log.debug('Replying %s to %r', message[-1], message[0])
        socket.send_multipart(message)

    while True:
        try:
            message = socket.recv_multipart()
        except Exception as err:
            log.warning('Failed to receive message from client, ignoring...')
            log.exception(err)
            continue

        log.debug('Received message %s from %r', message[-1], message[0])
        gevent.spawn(handle_client, message)
log.py 文件源码 项目:jiekou-python3 作者: liwanlei 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_logger(name='jiekou', file_log=file_stream, level=''):
    """Logger factory: push console and file handlers, return a Logger.

    Two handlers are pushed onto the current thread's stack: a
    ``ColorizedStderrHandler`` and a ``TimedRotatingFileHandler`` whose
    base file name is ``<name>.log`` under ``LOG_DIR``. logbook timestamps
    are switched to local time.

    Parameters:
        name: channel name of the logger and base name of the log file.
        file_log: accepted for compatibility; not used by this function.
        level: level for the stderr handler. An empty value (the default)
            means "no threshold" and is mapped to ``logbook.NOTSET``.

    Returns:
        A ``logbook.Logger`` named ``name``.
    """
    logbook.set_datetime_format('local')

    # '' is not a level name logbook can resolve, so passing the default
    # straight through would raise LookupError while building the handler.
    stderr_level = level or logbook.NOTSET

    ColorizedStderrHandler(bubble=False, level=stderr_level).push_thread()
    logbook.TimedRotatingFileHandler(
            os.path.join(LOG_DIR, '%s.log' % name),
            date_format='%Y-%m-%d-%H', bubble=True, encoding='utf-8').push_thread()
    return logbook.Logger(name)
sqlitebiter.py 文件源码 项目:sqlitebiter 作者: thombashi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def make_logger(channel_name, log_level):
    """Create the channel logger and apply ``log_level`` everywhere.

    The logger is disabled when ``log_level`` equals ``QUIET_LOG_LEVEL``.
    The same level is then set on the logger and forwarded to
    ``ptr``, ``simplesqlite`` and ``appconfigpy``.
    """
    import appconfigpy

    channel_logger = logbook.Logger(channel_name)
    if log_level == QUIET_LOG_LEVEL:
        channel_logger.disable()
    channel_logger.level = log_level

    # Keep the dependent libraries at the same verbosity.
    for library in (ptr, simplesqlite, appconfigpy):
        library.set_log_level(log_level)

    return channel_logger
lognew.py 文件源码 项目:FXTest 作者: liwanlei 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_logger(name='jiekou', file_log=file_stream, level=''):
    """Logger factory: push console and file handlers, return a Logger.

    Two handlers are pushed onto the current thread's stack: a
    ``ColorizedStderrHandler`` and a ``TimedRotatingFileHandler`` whose
    base file name is ``<name>.log`` under ``LOG_DIR``. logbook timestamps
    are switched to local time.

    Parameters:
        name: channel name of the logger and base name of the log file.
        file_log: accepted for compatibility; not used by this function.
        level: level for the stderr handler. An empty value (the default)
            means "no threshold" and is mapped to ``logbook.NOTSET``.

    Returns:
        A ``logbook.Logger`` named ``name``.
    """
    logbook.set_datetime_format('local')

    # '' is not a level name logbook can resolve, so passing the default
    # straight through would raise LookupError while building the handler.
    stderr_level = level or logbook.NOTSET

    ColorizedStderrHandler(bubble=False, level=stderr_level).push_thread()
    logbook.TimedRotatingFileHandler(
            os.path.join(LOG_DIR, '%s.log' % name),
            date_format='%Y-%m-%d-%H', bubble=True, encoding='utf-8').push_thread()
    return logbook.Logger(name)
base_controller.py 文件源码 项目:cookiecutter-pyramid-talk-python-starter 作者: mikeckennedy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, request):
    """Store the request and create the controller's logger.

    The logger is named ``Ctrls/<ClassName>`` with every occurrence of
    ``Controller`` removed from the class name.
    """
    self.request = request
    self.build_cache_id = static_cache.build_cache_id

    short_name = type(self).__name__.replace("Controller", "")
    self.log = logbook.Logger('Ctrls/' + short_name)
log_service.py 文件源码 项目:cookiecutter-pyramid-talk-python-starter 作者: mikeckennedy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_startup_log():
    """Return a logbook ``Logger`` for the "App" channel."""
    startup_log = logbook.Logger("App")
    return startup_log
base_controller.py 文件源码 项目:cookiecutter-course 作者: mikeckennedy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, request):
    """Store the request and create the controller's logger.

    The logger is named ``Ctrls/<ClassName>`` with every occurrence of
    ``Controller`` removed from the class name.
    """
    self.request = request
    self.build_cache_id = static_cache.build_cache_id

    short_name = type(self).__name__.replace("Controller", "")
    self.log = logbook.Logger('Ctrls/' + short_name)
log_service.py 文件源码 项目:cookiecutter-course 作者: mikeckennedy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_startup_log():
    """Return a logbook ``Logger`` for the "App" channel."""
    startup_log = logbook.Logger("App")
    return startup_log
main.py 文件源码 项目:Pixie 作者: GetRektByMe 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
    """Initialise the bot, its HTTP user agent and its logging.

    The command prefix comes from ``setup_file["discord"]``. The log level
    is read from ``setup_file["log_level"]`` (falling back to INFO when the
    key is missing or names no attribute of ``logbook``) and is applied to
    both the bot's logger and the stdlib root logger. ``args`` and
    ``kwargs`` are accepted but not forwarded.
    """
    super().__init__(command_prefix=when_mentioned_or(setup_file["discord"]["command_prefix"]),
                     description="A bot for weebs programmed by Recchan")

    # Pixie identifies itself with its own user agent
    self.http.user_agent = user_agent

    # Logging: send stdlib logging through logbook, print to stderr
    redirect_logging()
    stderr_handler = StreamHandler(sys.stderr)
    stderr_handler.push_application()

    self.logger = Logger("Pixie")
    configured_level = setup_file.get("log_level", "INFO")
    self.logger.level = getattr(logbook, configured_level, logbook.INFO)
    logging.root.setLevel(self.logger.level)
utils.py 文件源码 项目:exconf 作者: maginetv 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_logger(logger_name="magine-services"):
    """Return a logbook ``Logger`` for the ``logger_name`` channel."""
    named_logger = logbook.Logger(logger_name)
    return named_logger
yar.py 文件源码 项目:fibratus 作者: rabbitstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_init(self, outputs):
    """Constructing ``YaraBinding`` compiles the rule file found by glob.

    ``yara`` is replaced in ``sys.modules`` before the binding is imported,
    and the filesystem lookups are patched to report one rule file.
    """
    fake_modules = {'yara': MagicMock()}
    with patch.dict('sys.modules', **fake_modules):
        from fibratus.binding.yar import YaraBinding
        with patch('os.path.exists', return_value=True), \
                patch('os.path.isdir', return_value=True), \
                patch('glob.glob', return_value=['silent_banker.yar']), \
                patch('yara.compile') as compile_mock:
            logger_mock = Mock(spec_set=Logger)
            YaraBinding(outputs, logger_mock, output='amqp', path='C:\\yara-rules')

            expected_rule = os.path.join('C:\\yara-rules', 'silent_banker.yar')
            compile_mock.assert_called_with(expected_rule)


问题


面经


文章

微信
公众号

扫码关注公众号