def create_logger():
"""
Setup the logging environment
"""
log = logging.getLogger() # root logger
log.setLevel(logging.INFO)
format_str = '%(asctime)s - %(levelname)-8s - %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'
if HAVE_COLORLOG and os.isatty(2):
cformat = '%(log_color)s' + format_str
colors = {'DEBUG': 'reset',
'INFO': 'reset',
'WARNING': 'bold_yellow',
'ERROR': 'bold_red',
'CRITICAL': 'bold_red'}
formatter = colorlog.ColoredFormatter(cformat, date_format,
log_colors=colors)
else:
formatter = logging.Formatter(format_str, date_format)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
log.addHandler(stream_handler)
return logging.getLogger(__name__)
python类INFO的实例源码
def init_logger(self, args):
level = logging.INFO
if args.verbose:
level = logging.VERBOSE
if args.debug:
level = logging.DEBUG
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s',
level=level)
Rthandler = RotatingFileHandler('arbitrage.log', maxBytes=100*1024*1024,backupCount=10)
Rthandler.setLevel(level)
formatter = logging.Formatter('%(asctime)-12s [%(levelname)s] %(message)s')
Rthandler.setFormatter(formatter)
logging.getLogger('').addHandler(Rthandler)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
def setup_logging(log_level=logging.INFO):
"""Set up the logging."""
logging.basicConfig(level=log_level)
fmt = ("%(asctime)s %(levelname)s (%(threadName)s) "
"[%(name)s] %(message)s")
colorfmt = "%(log_color)s{}%(reset)s".format(fmt)
datefmt = '%Y-%m-%d %H:%M:%S'
# Suppress overly verbose logs from libraries that aren't helpful
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('aiohttp.access').setLevel(logging.WARNING)
try:
from colorlog import ColoredFormatter
logging.getLogger().handlers[0].setFormatter(ColoredFormatter(
colorfmt,
datefmt=datefmt,
reset=True,
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red',
}
))
except ImportError:
pass
logger = logging.getLogger('')
logger.setLevel(log_level)
def setup_logging(verbose=0, colors=False, name=None):
"""Configure console logging. Info and below go to stdout, others go to stderr.
:param int verbose: Verbosity level. > 0 print debug statements. > 1 passed to sphinx-build.
:param bool colors: Print color text in non-verbose mode.
:param str name: Which logger name to set handlers to. Used for testing.
"""
root_logger = logging.getLogger(name)
root_logger.setLevel(logging.DEBUG if verbose > 0 else logging.INFO)
formatter = ColorFormatter(verbose > 0, colors)
if colors:
colorclass.Windows.enable()
handler_stdout = logging.StreamHandler(sys.stdout)
handler_stdout.setFormatter(formatter)
handler_stdout.setLevel(logging.DEBUG)
handler_stdout.addFilter(type('', (logging.Filter,), {'filter': staticmethod(lambda r: r.levelno <= logging.INFO)}))
root_logger.addHandler(handler_stdout)
handler_stderr = logging.StreamHandler(sys.stderr)
handler_stderr.setFormatter(formatter)
handler_stderr.setLevel(logging.WARNING)
root_logger.addHandler(handler_stderr)
def __init__(self, queue, DEBUG=config.DEBUG, reset=False, socksport=None):
if not socksport:
socksport = config.SOCKS_PORT
## TODO add checks that a socks proxy is even open
## TODO add Tor checks to make sure circuits are operating
threading.Thread.__init__(self)
self.reset = reset # Whether to check if a url has been collected
self.queue = queue # Multithreading queue of urls
self.proxysettings = [
'--proxy=127.0.0.1:%s' % socksport,
'--proxy-type=socks5',
]
#self.proxysettings = [] # DEBUG
#self.ignore_ssl = ['--ignore-ssl-errors=true', '--ssl-protocols=any']
self.ignore_ssl = []
self.service_args = self.proxysettings + self.ignore_ssl
self.failcount = 0 # Counts failures
self.donecount = 0 # Counts successes
self.tor = tor.tor() # Manages Tor via control port
if DEBUG: # PhantomJS sends a lot of data if debug set to DEBUG
logging.basicConfig(level=logging.INFO)
def init_logging(logfile):
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(module)s: %(message)s',
datefmt='%m/%d/%Y %H:%M:%S' )
fh = logging.FileHandler(logfile)
# ch = logging.StreamHandler()
fh.setFormatter(formatter)
# ch.setFormatter(formatter)
# fh.setLevel(logging.INFO)
# ch.setLevel(logging.INFO)
# logging.getLogger().addHandler(ch)
logging.getLogger().addHandler(fh)
logging.getLogger().setLevel(logging.INFO)
return logging
# prepare logging.
def init_logger(logger_name):
# initialize logger
log = logging.getLogger(logger_name)
_h = logging.FileHandler('%s/%s' % (
cfg.CONF.service.service_log_path,
cfg.CONF.service.service_log_filename))
_h.setFormatter(logging.Formatter("'%(asctime)s - %(pathname)s:"
"%(lineno)s - %(levelname)s"
" - %(message)s'"))
log.addHandler(_h)
if cfg.CONF.service.enable_debug_log_entries:
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.INFO)
return log
def hidden_cursor(file):
# The Windows terminal does not support the hide/show cursor ANSI codes,
# even via colorama. So don't even try.
if WINDOWS:
yield
# We don't want to clutter the output with control characters if we're
# writing to a file, or if the user is running with --quiet.
# See https://github.com/pypa/pip/issues/3418
elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
yield
else:
file.write(HIDE_CURSOR)
try:
yield
finally:
file.write(SHOW_CURSOR)
def open_spinner(message):
# Interactive spinner goes directly to sys.stdout rather than being routed
# through the logging system, but it acts like it has level INFO,
# i.e. it's only displayed if we're at level INFO or better.
# Non-interactive spinner goes through the logging system, so it is always
# in sync with logging configuration.
if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
spinner = InteractiveSpinner(message)
else:
spinner = NonInteractiveSpinner(message)
try:
with hidden_cursor(sys.stdout):
yield spinner
except KeyboardInterrupt:
spinner.finish("canceled")
raise
except Exception:
spinner.finish("error")
raise
else:
spinner.finish("done")
def main():
args = get_args()
logging.basicConfig(
format='%(asctime)s %(message)s',
filename=os.path.join(args.outdir, "NanoQC.log"),
level=logging.INFO)
logging.info("NanoQC started.")
sizeRange = length_histogram(
fqin=gzip.open(args.fastq, 'rt'),
name=os.path.join(args.outdir, "SequenceLengthDistribution.png"))
fq = get_bin(gzip.open(args.fastq, 'rt'), sizeRange)
logging.info("Using {} reads for plotting".format(len(fq)))
fqbin = [dat[0] for dat in fq]
qualbin = [dat[1] for dat in fq]
logging.info("Creating plots...")
per_base_sequence_content_and_quality(fqbin, qualbin, args.outdir, args.format)
logging.info("per base sequence content and quality completed.")
logging.info("Finished!")
def init_logging(logfile, debug=True, level=None):
"""
Simple configuration of logging.
"""
if debug:
log_level = logging.DEBUG
else:
log_level = logging.INFO
# allow user to override exact log_level
if level:
log_level = level
logging.basicConfig(level=log_level,
format='%(asctime)s %(levelname)-8s [%(name)s] %(message)s',
filename=logfile,
filemode='a')
return logging.getLogger("circus")
def hidden_cursor(file):
# The Windows terminal does not support the hide/show cursor ANSI codes,
# even via colorama. So don't even try.
if WINDOWS:
yield
# We don't want to clutter the output with control characters if we're
# writing to a file, or if the user is running with --quiet.
# See https://github.com/pypa/pip/issues/3418
elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
yield
else:
file.write(HIDE_CURSOR)
try:
yield
finally:
file.write(SHOW_CURSOR)
def open_spinner(message):
# Interactive spinner goes directly to sys.stdout rather than being routed
# through the logging system, but it acts like it has level INFO,
# i.e. it's only displayed if we're at level INFO or better.
# Non-interactive spinner goes through the logging system, so it is always
# in sync with logging configuration.
if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
spinner = InteractiveSpinner(message)
else:
spinner = NonInteractiveSpinner(message)
try:
with hidden_cursor(sys.stdout):
yield spinner
except KeyboardInterrupt:
spinner.finish("canceled")
raise
except Exception:
spinner.finish("error")
raise
else:
spinner.finish("done")
def main():
# Set up a console logger.
console = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s %(name)-12s:%(levelname)-8s: %(message)s")
console.setFormatter(formatter)
logging.getLogger().addHandler(console)
logging.getLogger().setLevel(logging.INFO)
kw = {}
longopts = ['domainname=', 'verbose']
opts, args = getopt.getopt(sys.argv[1:], 'v', longopts)
for opt, val in opts:
if opt == '--domainname':
kw['domainName'] = val
if opt in ['-v', '--verbose']:
kw['verbose'] = True
a = QApplication(sys.argv)
QObject.connect(a,SIGNAL("lastWindowClosed()"),a,SLOT("quit()"))
w = BrowseWindow(**kw)
w.show()
a.exec_()
def ConvertLog4ToCFLevel( log4level ):
if log4level == logging.FATAL+1 :
return CF.LogLevels.OFF
if log4level == logging.FATAL :
return CF.LogLevels.FATAL
if log4level == logging.ERROR :
return CF.LogLevels.ERROR
if log4level == logging.WARN :
return CF.LogLevels.WARN
if log4level == logging.INFO :
return CF.LogLevels.INFO
if log4level == logging.DEBUG :
return CF.LogLevels.DEBUG
if log4level == logging.TRACE :
return CF.LogLevels.TRACE
if log4level == logging.NOTSET:
return CF.LogLevels.ALL
return CF.LogLevels.INFO
def ConvertToLog4Level( newLevel ):
level = logging.INFO
if newLevel == CF.LogLevels.OFF :
level=logging.FATAL+1
if newLevel == CF.LogLevels.FATAL :
level=logging.FATAL
if newLevel == CF.LogLevels.ERROR :
level=logging.ERROR
if newLevel == CF.LogLevels.WARN :
level=logging.WARN
if newLevel == CF.LogLevels.INFO:
level=logging.INFO
if newLevel == CF.LogLevels.DEBUG:
level=logging.DEBUG
if newLevel == CF.LogLevels.TRACE:
level=logging.TRACE
if newLevel == CF.LogLevels.ALL:
level=logging.TRACE
return level
def __init__(self, resource=None ):
self._mgr_lock = threading.Lock()
self._ecm = None
self._logger = logging.getLogger("ossie.events.Manager")
self._logger.setLevel(logging.INFO)
self._allow = True
self._registrations=[]
if resource :
try:
self._logger.debug("Requesting Domain Manager Access....")
dom = resource.getDomainManager()
self._logger.debug("Requesting EventChannelManager Access....")
self._ecm = dom.getRef()._get_eventChannelMgr()
self._logger.debug("Acquired reference to EventChannelManager")
except:
#print traceback.format_exc()
self._logger.warn("EventChannelManager - unable to resolve DomainManager's EventChannelManager ")
pass
def run(self, args=None, namespace=None):
options = self.parser.parse_args(args=args, namespace=namespace)
enable_pretty_logging()
logger = logging.getLogger(__name__)
# todo configure_logger() method ?
if options.debug:
logging.getLogger('root').setLevel(logging.INFO)
if options.verbose:
if options.verbose >= 1:
logging.getLogger('root').setLevel(logging.DEBUG)
if options.verbose >= 2:
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO if options.verbose < 2 else logging.DEBUG)
try:
handler = options.handler
except AttributeError as e:
if not callable(self.default_handler):
raise
handler = None
return (handler or self.default_handler)(logger, options)
def setup(name=__name__, level=logging.INFO):
logger = logging.getLogger(name)
if logger.handlers:
return logger
logger.setLevel(level)
try:
# check if click exists to swap the logger
import click # noqa
formatter = ColorFormatter('[.] %(message)s')
except ImportError:
formatter = CustomFormatter('[.] %(message)s')
handler = logging.StreamHandler(None)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
def __init__(self, appname, dllname=None, logtype="Application"):
logging.Handler.__init__(self)
try:
import win32evtlogutil, win32evtlog
self.appname = appname
self._welu = win32evtlogutil
if not dllname:
dllname = os.path.split(self._welu.__file__)
dllname = os.path.split(dllname[0])
dllname = os.path.join(dllname[0], r'win32service.pyd')
self.dllname = dllname
self.logtype = logtype
self._welu.AddSourceToRegistry(appname, dllname, logtype)
self.deftype = win32evtlog.EVENTLOG_ERROR_TYPE
self.typemap = {
logging.DEBUG : win32evtlog.EVENTLOG_INFORMATION_TYPE,
logging.INFO : win32evtlog.EVENTLOG_INFORMATION_TYPE,
logging.WARNING : win32evtlog.EVENTLOG_WARNING_TYPE,
logging.ERROR : win32evtlog.EVENTLOG_ERROR_TYPE,
logging.CRITICAL: win32evtlog.EVENTLOG_ERROR_TYPE,
}
except ImportError:
print("The Python Win32 extensions for NT (service, event "\
"logging) appear not to be available.")
self._welu = None
def main():
parser = build_cli_parser("Grab all binaries from a Cb server")
parser.add_argument('-d', '--destdir', action='store', help='Destination directory to place the events',
default=os.curdir)
# TODO: we don't have a control on the "start" value in the query yet
# parser.add_argument('--start', action='store', dest='startvalue', help='Start from result number', default=0)
parser.add_argument('-v', action='store_true', dest='verbose', help='Enable verbose debugging messages',
default=False)
args = parser.parse_args()
cb = get_cb_response_object(args)
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
# startvalue = args.startvalue
startvalue = 0
return dump_all_binaries(cb, args.destdir, startvalue)
def __init__(self, session, api_key, service_key):
"""
:param session: The Flask requests object used to connect to PD
:param api_key: The PD read-only, V2 API key
:param service_key: The PD service name which is interrogated
"""
self._api_key = api_key
self._service_key = service_key
self.timezone = 'UTC'
logging.basicConfig(level=logging.INFO)
self._s = session
self._headers = {
'Accept': 'application/vnd.pagerduty+json;version=2',
'Authorization': 'Token token=' + self._api_key
}
self._s.headers.update(self._headers)
def instantiate(p):
print("*** instantiate ***")
print(p)
with rlock:
global logger
logger = logging.getLogger("freepydius-logger")
logger.setLevel(logging.INFO)
handler = TimedRotatingFileHandler(_LOG_FILE,
when="midnight",
interval=1)
formatter = logging.Formatter("%(asctime)s %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
log = Log("INSTANCE")
log.log(( ('Response', 'created'), ))
# return 0 for success or -1 for failure
return 0
def __init__(self, apply_light_policy_interval = 10, device_detection_interval = 10, device_offline_delay = 10, logging_level = logging.INFO):
self.__yeelight_detection_thread = None
self.__device_detection_thread = None
self.__device_detection_thread_woker = {}
self.__device_detection_thread_rlock = threading.Lock()
self.__thread_rlock = threading.Lock()
self.__apply_light_policy_thread = None
self.__current_geo = None
self.__compiled_policy = []
self.__compiled_policy_date = None
self.__device_on_monitor = []
self.__device_online = []
self.__device_detection_interval = device_detection_interval
self.__apply_light_policy_interval = apply_light_policy_interval
self.__device_offline_delay = device_offline_delay
self.__config = {}
self.__RUNNING = False
# a few setups
self.register_signal_handler()
self.__setup_log(logging_level = logging_level)
self.__logger.info("Controller instance created")
def cli(ctx, registry, build_container_image, build_container_tag, build_container_net, verbose):
"""
Easily dockerize your Git repository
"""
logging_level = logging.DEBUG if verbose else logging.INFO
utils.configure_logging(name='skipper', level=logging_level)
ctx.obj['registry'] = registry
ctx.obj['build_container_image'] = build_container_image
ctx.obj['build_container_net'] = build_container_net
ctx.obj['git_revision'] = build_container_tag == 'git:revision'
ctx.obj['build_container_tag'] = git.get_hash() if ctx.obj['git_revision'] else build_container_tag
ctx.obj['env'] = ctx.default_map.get('env', {})
ctx.obj['containers'] = ctx.default_map.get('containers')
ctx.obj['volumes'] = ctx.default_map.get('volumes')
ctx.obj['workdir'] = ctx.default_map.get('workdir')
ctx.obj['container_context'] = ctx.default_map.get('container_context')
def __init__(self, args, logger=None, mode=None):
self.commandLine = CommandLine(args)
if mode is not None:
self.commandLine.mode = mode
if logger is None:
logging.basicConfig(format="%(asctime)-15s %(levelname)s [%(filename)s:%(lineno)d-%(thread)d] %(message)s")
logger = logging.getLogger()
logger.setLevel(logging.INFO)
self.logger = logger
self.mode = self.__detectMode(self.commandLine.mode)
self.config = self.__loadConfig()
self.inventory = Inventory(logger, self.config)
self.commands = {
'migrate': MigrateCommand(self),
'list-migrations': ListMigrationsCommand(self),
'version': VersionCommand(self),
'help':HelpCommand(self) }
self.defaultCommand = HelpCommand(self)
def log(self, message, level=logging.DEBUG, depth=0):
"""Prepend string to log messages to denote class."""
if depth <= 0:
prefix = 'AmazonAccountUtils: '
else:
prefix = "\t" * depth
if level == CRITICAL:
self.logger.critical(prefix + str(message))
elif level == ERROR:
self.logger.error(prefix + str(message))
elif level == WARNING:
self.logger.warning(prefix + str(message))
elif level == INFO:
self.logger.info(prefix + str(message))
else:
self.logger.debug(prefix + str(message))
def test_missing_variable(self):
"""Test if ``WriteTensorBoard`` handles missing image variables as expected."""
bad_epoch_data = {'valid': {}}
with mock.patch.dict('sys.modules', **{'cv2': cv2_mock}):
# test ignore
hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
on_missing_variable='ignore')
with LogCapture(level=logging.INFO) as log_capture:
hook.after_epoch(42, bad_epoch_data)
log_capture.check()
# test warn
warn_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
on_missing_variable='warn')
with LogCapture(level=logging.INFO) as log_capture2:
warn_hook.after_epoch(42, bad_epoch_data)
log_capture2.check(('root', 'WARNING', '`plot` not found in epoch data.'))
# test error
raise_hook = WriteTensorBoard(output_dir=self.tmpdir, model=self.get_model(), image_variables=['plot'],
on_missing_variable='error')
with self.assertRaises(KeyError):
raise_hook.after_epoch(42, bad_epoch_data)
def main(args=None):
from fontTools import configLogger
if args is None:
args = sys.argv[1:]
options = Options()
args = options.parse_opts(args)
if len(args) < 1:
print("usage: pyftmerge font...", file=sys.stderr)
return 1
configLogger(level=logging.INFO if options.verbose else logging.WARNING)
if options.timing:
timer.logger.setLevel(logging.DEBUG)
else:
timer.logger.disabled = True
merger = Merger(options=options)
font = merger.merge(args)
outfile = 'merged.ttf'
with timer("compile and save font"):
font.save(outfile)
def main():
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
logging.info("LEDBAT TEST SINK starting")
loop = asyncio.get_event_loop()
listen = loop.create_datagram_endpoint(PeerProtocol, local_addr=("0.0.0.0", 6778))
transport, protocol = loop.run_until_complete(listen)
if os.name == 'nt':
def wakeup():
# Call again later
loop.call_later(0.5, wakeup)
loop.call_later(0.5, wakeup)
try:
loop.run_forever()
except KeyboardInterrupt:
pass