python类LoggerAdapter()的实例源码

server.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def server_main(args):
  """
  Implement the server side of the rpki-rtr (RPKI-to-router) protocol.
  Other than one PF_UNIX socket inode, this doesn't write anything to
  disk, so it can be run with minimal privileges.  Most of the work has
  already been done by the database generator, so all this server has
  to do is pass the results along to a client.

  Reads args.rpki_rtr_dir, args.refresh, args.retry and args.expire.
  Calls sys.exit() with the OSError if changing to args.rpki_rtr_dir
  fails, and exits with status 0 on KeyboardInterrupt.
  """

  # Every record logged through this adapter carries a "connection"
  # entry holding whatever _hostport_tag() returned at startup.
  logger = logging.LoggerAdapter(logging.root, dict(connection = _hostport_tag()))

  logger.debug("[Starting]")

  if args.rpki_rtr_dir:
    try:
      os.chdir(args.rpki_rtr_dir)
    except OSError as e:
      # The "as" form is accepted by Python 2.6+ and Python 3 alike.
      sys.exit(e)

  # Stays None until the KickmeChannel exists, so the cleanup in the
  # finally clause is skipped when construction fails early.
  kickme = None
  try:
    server = rpki.rtr.server.ServerChannel(logger = logger, refresh = args.refresh, retry = args.retry, expire = args.expire)
    kickme = rpki.rtr.server.KickmeChannel(server = server)
    asyncore.loop(timeout = None)
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Theorized race condition
  except KeyboardInterrupt:
    sys.exit(0)
  finally:
    # Ignore any further SIGINT before running the cleanup below.
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Observed race condition
    if kickme is not None:
      kickme.cleanup()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, sock = None):
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asynchat.async_chat.__init__(self, sock)
    self.buffer = []
    self.timer = rpki.async.timer(self.handle_timeout)
    self.restart()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, hostport):
    """
    Record hostport, start with no client and an empty queue, and emit
    a "Created" debug message through a logger adapter whose context is
    this object.
    """
    log_extra = {"context": self}
    self.logger = logging.LoggerAdapter(self.logger, log_extra)
    self.hostport = hostport
    self.client = None
    # NOTE(review): statement order is kept as-is; formatting the record
    # may inspect the attributes set so far -- confirm before reordering.
    self.logger.debug("Created")
    self.queue = list()
server.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def server_main(args):
  """
  Implement the server side of the rpki-rtr (RPKI-to-router) protocol.
  Other than one PF_UNIX socket inode, this doesn't write anything to
  disk, so it can be run with minimal privileges.  Most of the work has
  already been done by the database generator, so all this server has
  to do is pass the results along to a client.

  Reads args.rpki_rtr_dir, args.refresh, args.retry and args.expire.
  Calls sys.exit() with the OSError if changing to args.rpki_rtr_dir
  fails, and exits with status 0 on KeyboardInterrupt.
  """

  # Every record logged through this adapter carries a "connection"
  # entry holding whatever _hostport_tag() returned at startup.
  logger = logging.LoggerAdapter(logging.root, dict(connection = _hostport_tag()))

  logger.debug("[Starting]")

  if args.rpki_rtr_dir:
    try:
      os.chdir(args.rpki_rtr_dir)
    except OSError as e:
      # The "as" form is accepted by Python 2.6+ and Python 3 alike.
      sys.exit(e)

  # Stays None until the KickmeChannel exists, so the cleanup in the
  # finally clause is skipped when construction fails early.
  kickme = None
  try:
    server = rpki.rtr.server.ServerChannel(logger = logger, refresh = args.refresh, retry = args.retry, expire = args.expire)
    kickme = rpki.rtr.server.KickmeChannel(server = server)
    asyncore.loop(timeout = None)
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Theorized race condition
  except KeyboardInterrupt:
    sys.exit(0)
  finally:
    # Ignore any further SIGINT before running the cleanup below.
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Observed race condition
    if kickme is not None:
      kickme.cleanup()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def __init__(self, sock = None):
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asynchat.async_chat.__init__(self, sock)
    self.buffer = []
    self.timer = rpki.async.timer(self.handle_timeout)
    self.restart()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, hostport):
    """
    Record hostport, start with no client and an empty queue, and emit
    a "Created" debug message through a logger adapter whose context is
    this object.
    """
    log_extra = {"context": self}
    self.logger = logging.LoggerAdapter(self.logger, log_extra)
    self.hostport = hostport
    self.client = None
    # NOTE(review): statement order is kept as-is; formatting the record
    # may inspect the attributes set so far -- confirm before reordering.
    self.logger.debug("Created")
    self.queue = list()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self, sock = None):
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asynchat.async_chat.__init__(self, sock)
    self.buffer = []
    self.timer = rpki.async.timer(self.handle_timeout)
    self.restart()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, hostport):
    """
    Record hostport, start with no client and an empty queue, and emit
    a "Created" debug message through a logger adapter whose context is
    this object.
    """
    log_extra = {"context": self}
    self.logger = logging.LoggerAdapter(self.logger, log_extra)
    self.hostport = hostport
    self.client = None
    # NOTE(review): statement order is kept as-is; formatting the record
    # may inspect the attributes set so far -- confirm before reordering.
    self.logger.debug("Created")
    self.queue = list()
server.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def server_main(args):
  """
  Implement the server side of the rpki-rtr (RPKI-to-router) protocol.
  Other than one PF_UNIX socket inode, this doesn't write anything to
  disk, so it can be run with minimal privileges.  Most of the work has
  already been done by the database generator, so all this server has
  to do is pass the results along to a client.

  Reads args.rpki_rtr_dir, args.refresh, args.retry and args.expire.
  Calls sys.exit() with the OSError if changing to args.rpki_rtr_dir
  fails, and exits with status 0 on KeyboardInterrupt.
  """

  # Every record logged through this adapter carries a "connection"
  # entry holding whatever _hostport_tag() returned at startup.
  logger = logging.LoggerAdapter(logging.root, dict(connection = _hostport_tag()))

  logger.debug("[Starting]")

  if args.rpki_rtr_dir:
    try:
      os.chdir(args.rpki_rtr_dir)
    except OSError as e:
      # The "as" form is accepted by Python 2.6+ and Python 3 alike.
      sys.exit(e)

  # Stays None until the KickmeChannel exists, so the cleanup in the
  # finally clause is skipped when construction fails early.
  kickme = None
  try:
    server = rpki.rtr.server.ServerChannel(logger = logger, refresh = args.refresh, retry = args.retry, expire = args.expire)
    kickme = rpki.rtr.server.KickmeChannel(server = server)
    asyncore.loop(timeout = None)
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Theorized race condition
  except KeyboardInterrupt:
    sys.exit(0)
  finally:
    # Ignore any further SIGINT before running the cleanup below.
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Observed race condition
    if kickme is not None:
      kickme.cleanup()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, hostport):
    """
    Record hostport, start with no client and an empty queue, and emit
    a "Created" debug message through a logger adapter whose context is
    this object.
    """
    log_extra = {"context": self}
    self.logger = logging.LoggerAdapter(self.logger, log_extra)
    self.hostport = hostport
    self.client = None
    # NOTE(review): statement order is kept as-is; formatting the record
    # may inspect the attributes set so far -- confirm before reordering.
    self.logger.debug("Created")
    self.queue = list()
server.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def server_main(args):
  """
  Implement the server side of the rpki-rtr (RPKI-to-router) protocol.
  Other than one PF_UNIX socket inode, this doesn't write anything to
  disk, so it can be run with minimal privileges.  Most of the work has
  already been done by the database generator, so all this server has
  to do is pass the results along to a client.

  Reads args.rpki_rtr_dir, args.refresh, args.retry and args.expire.
  Calls sys.exit() with the OSError if changing to args.rpki_rtr_dir
  fails, and exits with status 0 on KeyboardInterrupt.
  """

  # Every record logged through this adapter carries a "connection"
  # entry holding whatever _hostport_tag() returned at startup.
  logger = logging.LoggerAdapter(logging.root, dict(connection = _hostport_tag()))

  logger.debug("[Starting]")

  if args.rpki_rtr_dir:
    try:
      os.chdir(args.rpki_rtr_dir)
    except OSError as e:
      # The "as" form is accepted by Python 2.6+ and Python 3 alike.
      sys.exit(e)

  # Stays None until the KickmeChannel exists, so the cleanup in the
  # finally clause is skipped when construction fails early.
  kickme = None
  try:
    server = rpki.rtr.server.ServerChannel(logger = logger, refresh = args.refresh, retry = args.retry, expire = args.expire)
    kickme = rpki.rtr.server.KickmeChannel(server = server)
    asyncore.loop(timeout = None)
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Theorized race condition
  except KeyboardInterrupt:
    sys.exit(0)
  finally:
    # Ignore any further SIGINT before running the cleanup below.
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Observed race condition
    if kickme is not None:
      kickme.cleanup()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, sock = None):
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asynchat.async_chat.__init__(self, sock)
    self.buffer = []
    self.timer = rpki.async.timer(self.handle_timeout)
    self.restart()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def __init__(self, hostport):
    """
    Record hostport, start with no client and an empty queue, and emit
    a "Created" debug message through a logger adapter whose context is
    this object.
    """
    log_extra = {"context": self}
    self.logger = logging.LoggerAdapter(self.logger, log_extra)
    self.hostport = hostport
    self.client = None
    # NOTE(review): statement order is kept as-is; formatting the record
    # may inspect the attributes set so far -- confirm before reordering.
    self.logger.debug("Created")
    self.queue = list()
server.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def server_main(args):
  """
  Implement the server side of the rpki-rtr (RPKI-to-router) protocol.
  Other than one PF_UNIX socket inode, this doesn't write anything to
  disk, so it can be run with minimal privileges.  Most of the work has
  already been done by the database generator, so all this server has
  to do is pass the results along to a client.

  Reads args.rpki_rtr_dir, args.refresh, args.retry and args.expire.
  Calls sys.exit() with the OSError if changing to args.rpki_rtr_dir
  fails, and exits with status 0 on KeyboardInterrupt.
  """

  # Every record logged through this adapter carries a "connection"
  # entry holding whatever _hostport_tag() returned at startup.
  logger = logging.LoggerAdapter(logging.root, dict(connection = _hostport_tag()))

  logger.debug("[Starting]")

  if args.rpki_rtr_dir:
    try:
      os.chdir(args.rpki_rtr_dir)
    except OSError as e:
      # The "as" form is accepted by Python 2.6+ and Python 3 alike.
      sys.exit(e)

  # Stays None until the KickmeChannel exists, so the cleanup in the
  # finally clause is skipped when construction fails early.
  kickme = None
  try:
    server = rpki.rtr.server.ServerChannel(logger = logger, refresh = args.refresh, retry = args.retry, expire = args.expire)
    kickme = rpki.rtr.server.KickmeChannel(server = server)
    asyncore.loop(timeout = None)
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Theorized race condition
  except KeyboardInterrupt:
    sys.exit(0)
  finally:
    # Ignore any further SIGINT before running the cleanup below.
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Observed race condition
    if kickme is not None:
      kickme.cleanup()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, sock = None):
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asynchat.async_chat.__init__(self, sock)
    self.buffer = []
    self.timer = rpki.async.timer(self.handle_timeout)
    self.restart()
server.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def server_main(args):
  """
  Implement the server side of the rpki-rtr (RPKI-to-router) protocol.
  Other than one PF_UNIX socket inode, this doesn't write anything to
  disk, so it can be run with minimal privileges.  Most of the work has
  already been done by the database generator, so all this server has
  to do is pass the results along to a client.

  Reads args.rpki_rtr_dir, args.refresh, args.retry and args.expire.
  Calls sys.exit() with the OSError if changing to args.rpki_rtr_dir
  fails, and exits with status 0 on KeyboardInterrupt.
  """

  # Every record logged through this adapter carries a "connection"
  # entry holding whatever _hostport_tag() returned at startup.
  logger = logging.LoggerAdapter(logging.root, dict(connection = _hostport_tag()))

  logger.debug("[Starting]")

  if args.rpki_rtr_dir:
    try:
      os.chdir(args.rpki_rtr_dir)
    except OSError as e:
      # The "as" form is accepted by Python 2.6+ and Python 3 alike.
      sys.exit(e)

  # Stays None until the KickmeChannel exists, so the cleanup in the
  # finally clause is skipped when construction fails early.
  kickme = None
  try:
    server = rpki.rtr.server.ServerChannel(logger = logger, refresh = args.refresh, retry = args.retry, expire = args.expire)
    kickme = rpki.rtr.server.KickmeChannel(server = server)
    asyncore.loop(timeout = None)
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Theorized race condition
  except KeyboardInterrupt:
    sys.exit(0)
  finally:
    # Ignore any further SIGINT before running the cleanup below.
    signal.signal(signal.SIGINT, signal.SIG_IGN) # Observed race condition
    if kickme is not None:
      kickme.cleanup()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, sock = None):
    self.logger = logging.LoggerAdapter(self.logger, dict(context = self))
    asynchat.async_chat.__init__(self, sock)
    self.buffer = []
    self.timer = rpki.async.timer(self.handle_timeout)
    self.restart()
http.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self, hostport):
    """
    Record hostport, start with no client and an empty queue, and emit
    a "Created" debug message through a logger adapter whose context is
    this object.
    """
    log_extra = {"context": self}
    self.logger = logging.LoggerAdapter(self.logger, log_extra)
    self.hostport = hostport
    self.client = None
    # NOTE(review): statement order is kept as-is; formatting the record
    # may inspect the attributes set so far -- confirm before reordering.
    self.logger.debug("Created")
    self.queue = list()
zenpacklib.py 文件源码 项目:docker-zenoss4 作者: krull 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, _source_location=None):
        """Store the source location and create a context-prefixing logger.

        Args:
            _source_location: stored unchanged as self.source_location.
        """

        # Adapter whose messages are prefixed with the string form of the
        # "context" entry from its extra dict (this object, see below).
        class LogAdapter(logging.LoggerAdapter):
            def process(self, msg, kwargs):
                context = self.extra['context']
                prefixed = '%s %s' % (context, msg)
                return prefixed, kwargs

        self.source_location = _source_location
        self.speclog = LogAdapter(LOG, dict(context=self))
logger.py 文件源码 项目:cloudformation-validation-pipeline 作者: awslabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def config(self, request_id='CONTAINER_INIT', original_job_id=None, job_id=None,
               artifact_revision_id=None, pipeline_execution_id=None, pipeline_action=None,
               stage_name=None, pipeline_name=None, loglevel='warning', botolevel='critical'):
        """Set up root logging and store the pipeline context on this object.

        The root logger gets the requested level and a JSON-shaped record
        format on its first handler (a StreamHandler is added when it has
        none). The boto3, botocore, nose and s3transfer loggers get their
        own level. Every identifier argument is stored as a same-named
        attribute, and self.log becomes a LoggerAdapter over the root logger.

        Args:
            request_id (str): lambda request id.
            original_job_id (str): [optional] pipeline job_id from the first request in this run.
            job_id (str): [optional] pipeline job_id for the current invocation (differs from
                          original_job_id if this is a continuation invocation).
            artifact_revision_id (str): [optional] commit id for the current revision.
            pipeline_execution_id (str): [optional] pipeline execution id (same for all
                                         actions/stages in this pipeline run).
            pipeline_action (str): [optional] pipeline action name.
            stage_name (str): [optional] pipeline stage name.
            pipeline_name (str): [optional] pipeline name.
            loglevel (str): [optional] logging verbosity, defaults to warning; a name that
                            is not an attribute of the logging module falls back to INFO.
            botolevel (str): [optional] boto logging verbosity, defaults to critical; a name
                             that is not an attribute of the logging module falls back to ERROR.
        """

        # Resolve both level names before touching any logger.
        root_level = getattr(logging, loglevel.upper(), logging.INFO)
        library_level = getattr(logging, botolevel.upper(), logging.ERROR)

        root = logging.getLogger()
        root.setLevel(root_level)
        for library in ('boto3', 'botocore', 'nose', 's3transfer'):
            logging.getLogger(library).setLevel(library_level)

        record_format = '{"time_stamp": "%(asctime)s", "log_level": "%(levelname)s", "data": %(message)s}\n'
        if not root.handlers:
            root.addHandler(logging.StreamHandler())
        # Only the first handler is reformatted.
        root.handlers[0].setFormatter(logging.Formatter(record_format))

        self.log = logging.LoggerAdapter(root, {})
        self.request_id = request_id
        self.original_job_id = original_job_id
        self.job_id = job_id
        self.pipeline_execution_id = pipeline_execution_id
        self.artifact_revision_id = artifact_revision_id
        self.pipeline_action = pipeline_action
        self.stage_name = stage_name
        self.pipeline_name = pipeline_name


问题


面经


文章

微信
公众号

扫码关注公众号