def setUp(self):
optparser = BaseOptions()
optparser.parseOptions(['dummyfile.xml', '--debug=%s' % logging._levelNames[log.level].lower()])
self.defaults = RawConfigParser()
configfiles = self.defaults.read(TESTCONF)
xmldata = """
<project name="testproj" code="01">
</project>
"""
node = etree.fromstring(xmldata)
self.project = Project()
self.project.configure_from_node(node, self.defaults, None)
python类_levelNames()的实例源码
def setUp(self):
"""
Prepare for a configuration parse test
"""
optparser = BaseOptions()
optparser.parseOptions(['dummyfile.xml', '--debug=%s' % logging._levelNames[log.level].lower()])
self.defaults = RawConfigParser()
configfiles = self.defaults.read(TESTCONF)
self.project = Project()
def setUp(self):
optparser = BaseOptions()
optparser.parseOptions(['dummyfile.xml', '--debug=%s' % logging._levelNames[log.level].lower()])
self.defaults = RawConfigParser()
configfiles = self.defaults.read(TESTCONF)
self.outfile = StringIO()
def setUp(self):
optparser = BaseOptions()
optparser.parseOptions(['dummyfile.xml', '--debug=%s' % logging._levelNames[log.level].lower()])
self.defaults = RawConfigParser()
configfiles = self.defaults.read(TESTCONF)
self.outfile = StringIO()
def setUp(self):
optparser = BaseOptions()
optparser.parseOptions(['dummyfile.xml', '--debug=%s' % logging._levelNames[log.level].lower()])
self.defaults = RawConfigParser()
configfiles = self.defaults.read(TESTCONF)
self.defaults.get('global', 'dns_domain_name')
xmldata = """
<project name="test" code="qtree">
<site name="sitea" type="primary" location="testlab">
<filer name="testfiler1" type="filer">
<vfiler name="vfiler01" rootaggr="aggr0">
<aggregate name="aggr01">
<volume name="testvol1"/>
</aggregate>
</vfiler>
</filer>
</site>
</project>
"""
node = etree.fromstring(xmldata)
self.proj = Project()
self.proj.configure_from_node(node, self.defaults, None)
self.volume = self.proj.get_volumes()[0]
def setUp(self):
optparser = BaseOptions()
optparser.parseOptions(['dummyfile.xml', '--debug=%s' % logging._levelNames[log.level].lower()])
self.defaults = RawConfigParser()
configfiles = self.defaults.read(TESTCONF)
self.site = Site()
self.site.name = "testsite"
self.site.type = "primary"
self.site.locaion = "testlab"
def setUp(self):
optparser = BaseOptions()
optparser.parseOptions(['dummyfile.xml', '--debug=%s' % logging._levelNames[log.level].lower()])
self.defaults = RawConfigParser()
configfiles = self.defaults.read(TESTCONF)
def _install_handlers(cp, formatters):
"""Install and return handlers"""
hlist = cp["handlers"]["keys"]
if not len(hlist):
return {}
hlist = hlist.split(",")
hlist = _strip_spaces(hlist)
handlers = {}
fixups = [] #for inter-handler references
for hand in hlist:
section = cp["handler_%s" % hand]
klass = section["class"]
fmt = section.get("formatter", "")
try:
klass = eval(klass, vars(logging))
except (AttributeError, NameError):
klass = _resolve(klass)
args = section["args"]
args = eval(args, vars(logging))
h = klass(*args)
if "level" in section:
level = section["level"]
h.setLevel(logging._levelNames[level])
if len(fmt):
h.setFormatter(formatters[fmt])
if issubclass(klass, logging.handlers.MemoryHandler):
target = section.get("target", "")
if len(target): #the target handler may not be loaded yet, so keep for later...
fixups.append((h, target))
handlers[hand] = h
#now all handlers are loaded, fixup inter-handler references...
for h, t in fixups:
h.setTarget(handlers[t])
return handlers
def logging_level(self, value):
if value is None:
value = self._default_logging_level
if isinstance(value, (bytes, unicode)):
try:
level = _levelNames[value.upper()]
except KeyError:
raise ValueError('Unrecognized logging level: {}'.format(value))
else:
try:
level = int(value)
except ValueError:
raise ValueError('Unrecognized logging level: {}'.format(value))
self._logger.setLevel(level)
def logging_level(self, value):
if value is None:
value = self._default_logging_level
if isinstance(value, (bytes, unicode)):
try:
level = _levelNames[value.upper()]
except KeyError:
raise ValueError('Unrecognized logging level: {}'.format(value))
else:
try:
level = int(value)
except ValueError:
raise ValueError('Unrecognized logging level: {}'.format(value))
self._logger.setLevel(level)
def main():
parser = argparse.ArgumentParser(description='Stream logs from rds for a set of db instances.')
parser.add_argument('--db_instance_ids', '-d', nargs='+', type=str, required=True,
help='list of db instance ids')
parser.add_argument('--minutes_in_the_past_to_start', '-m', type=int, default=0,
help=('if logs have not been written to since this many minutes ago, '
'ignore them'))
parser.add_argument('--api_call_delay_seconds', '-a', type=float, default=1.0,
help='time to wait before each API call')
parser.add_argument('--log_state_file', '-s', type=str, default='log_state.json',
help='file path for recording the state of log streaming')
parser.add_argument('--retention_days', '-r', type=int, default=7,
help='number of days to retain log metadata')
parser.add_argument('--log_level', '-l', type=str, default='INFO',
choices=['DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'],
help="log level for this script's logs")
parser.add_argument('--log_filename', '-f', type=str, default='rds_tail_logs.log',
help="log filename for this script's logs")
parser.add_argument('--run_once', '-o', dest='run_once', action='store_true',
help="stream all new logs from all db instances and then exit")
parser.add_argument('--output_format', '-t', choices=['json', 'text'], default='json',
help="output format")
parser.add_argument('--aws_region_name', type=str, help="AWS region name")
parser.add_argument('--aws_profile_name', default='default', help='AWS credentials profile name')
args = parser.parse_args()
os.environ['TZ'] = 'UTC'
time.tzset()
logging.basicConfig(filename=args.log_filename, level=logging._levelNames[args.log_level],
format='%(asctime)s %(message)s')
logging.info('Starting rds log streaming with args: %s', args)
rds = RDS(args.api_call_delay_seconds, args.aws_region_name, args.aws_profile_name)
rds_tail_logs = LogTail(args.log_state_file, args.db_instance_ids,
args.minutes_in_the_past_to_start, rds,
args.retention_days, args.run_once, args.output_format)
rds_tail_logs.stream()
def logging_level(self, value):
if value is None:
value = self._default_logging_level
if isinstance(value, (bytes, unicode)):
try:
level = _levelNames[value.upper()]
except KeyError:
raise ValueError('Unrecognized logging level: {}'.format(value))
else:
try:
level = int(value)
except ValueError:
raise ValueError('Unrecognized logging level: {}'.format(value))
self._logger.setLevel(level)
def logging_level(self, value):
if value is None:
value = self._default_logging_level
if isinstance(value, (bytes, unicode)):
try:
level = _levelNames[value.upper()]
except KeyError:
raise ValueError('Unrecognized logging level: {}'.format(value))
else:
try:
level = int(value)
except ValueError:
raise ValueError('Unrecognized logging level: {}'.format(value))
self._logger.setLevel(level)
def logging_level(self, value):
if value is None:
value = self._default_logging_level
if isinstance(value, (bytes, unicode)):
try:
level = _levelNames[value.upper()]
except KeyError:
raise ValueError('Unrecognized logging level: {}'.format(value))
else:
try:
level = int(value)
except ValueError:
raise ValueError('Unrecognized logging level: {}'.format(value))
self._logger.setLevel(level)
def logging_level(self, value):
if value is None:
value = self._default_logging_level
if isinstance(value, (bytes, unicode)):
try:
level = _levelNames[value.upper()]
except KeyError:
raise ValueError('Unrecognized logging level: {}'.format(value))
else:
try:
level = int(value)
except ValueError:
raise ValueError('Unrecognized logging level: {}'.format(value))
self._logger.setLevel(level)
def set_log_level(level):
"""Set the log-level of the root logger of the logging module.
Args:
level: can be an integer such as 30 (logging.WARN), or a string such as 'WARN'
"""
if isinstance(level, str):
level = logging._levelNames[level]
logger = logging.getLogger() # gets root logger
logger.setLevel(level)
def set_log_level(level):
"""Set the log-level of the root logger of the logging module.
Args:
level: can be an integer such as 30 (logging.WARN), or a string such as 'WARN'
"""
if isinstance(level, str):
level = logging._levelNames[level]
logger = logging.getLogger() # gets root logger
logger.setLevel(level)
def logging_level(self, value):
if value is None:
value = self._default_logging_level
if isinstance(value, (bytes, unicode)):
try:
level = _levelNames[value.upper()]
except KeyError:
raise ValueError('Unrecognized logging level: {}'.format(value))
else:
try:
level = int(value)
except ValueError:
raise ValueError('Unrecognized logging level: {}'.format(value))
self._logger.setLevel(level)
def _install_handlers(cp, formatters):
"""Install and return handlers"""
hlist = cp.get("handlers", "keys")
if not len(hlist):
return {}
hlist = hlist.split(",")
hlist = _strip_spaces(hlist)
handlers = {}
fixups = [] #for inter-handler references
for hand in hlist:
sectname = "handler_%s" % hand
klass = cp.get(sectname, "class")
opts = cp.options(sectname)
if "formatter" in opts:
fmt = cp.get(sectname, "formatter")
else:
fmt = ""
try:
klass = eval(klass, vars(logging))
except (AttributeError, NameError):
klass = _resolve(klass)
args = cp.get(sectname, "args")
args = eval(args, vars(logging))
h = klass(*args)
if "level" in opts:
level = cp.get(sectname, "level")
h.setLevel(logging._levelNames[level])
if len(fmt):
h.setFormatter(formatters[fmt])
if issubclass(klass, logging.handlers.MemoryHandler):
if "target" in opts:
target = cp.get(sectname,"target")
else:
target = ""
if len(target): #the target handler may not be loaded yet, so keep for later...
fixups.append((h, target))
handlers[hand] = h
#now all handlers are loaded, fixup inter-handler references...
for h, t in fixups:
h.setTarget(handlers[t])
return handlers
def _install_handlers(cp, formatters):
"""Install and return handlers"""
hlist = cp.get("handlers", "keys")
if not len(hlist):
return {}
hlist = hlist.split(",")
hlist = _strip_spaces(hlist)
handlers = {}
fixups = [] #for inter-handler references
for hand in hlist:
sectname = "handler_%s" % hand
klass = cp.get(sectname, "class")
opts = cp.options(sectname)
if "formatter" in opts:
fmt = cp.get(sectname, "formatter")
else:
fmt = ""
try:
klass = eval(klass, vars(logging))
except (AttributeError, NameError):
klass = _resolve(klass)
args = cp.get(sectname, "args")
args = eval(args, vars(logging))
h = klass(*args)
if "level" in opts:
level = cp.get(sectname, "level")
h.setLevel(logging._levelNames[level])
if len(fmt):
h.setFormatter(formatters[fmt])
if issubclass(klass, logging.handlers.MemoryHandler):
if "target" in opts:
target = cp.get(sectname,"target")
else:
target = ""
if len(target): #the target handler may not be loaded yet, so keep for later...
fixups.append((h, target))
handlers[hand] = h
#now all handlers are loaded, fixup inter-handler references...
for h, t in fixups:
h.setTarget(handlers[t])
return handlers