def configure_logging(path_to_log_directory):
    """
    Configure the importer logger.

    :param path_to_log_directory: path to the directory to write the log file in
    :return: None
    """
    log_filename = datetime.datetime.now().strftime('%Y-%m-%d') + '.log'
    importer_logger = logging.getLogger('importer_logger')
    importer_logger.setLevel(LOG_LEVEL)
    formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')
    fh = logging.FileHandler(filename=os.path.join(path_to_log_directory, log_filename))
    fh.setLevel(LOG_LEVEL)
    fh.setFormatter(formatter)
    importer_logger.addHandler(fh)
    sh = logging.StreamHandler(sys.stdout)
    sh.setLevel(LOG_LEVEL)
    sh.setFormatter(formatter)
    importer_logger.addHandler(sh)
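A minimal usage sketch for the snippet above. `LOG_LEVEL` is assumed to be a module-level constant such as `logging.INFO`, and the directory path is illustrative; the function expects it to exist already:

LOG_LEVEL = logging.INFO  # assumed module-level constant

configure_logging('/var/log/importer')  # hypothetical, pre-existing directory
logging.getLogger('importer_logger').info('import run started')  # same name retrieves the configured logger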
def create_logger():
    """
    Set up the logging environment.
    """
    log = logging.getLogger()  # root logger
    log.setLevel(logging.INFO)
    format_str = '%(asctime)s - %(levelname)-8s - %(message)s'
    date_format = '%Y-%m-%d %H:%M:%S'
    if HAVE_COLORLOG and os.isatty(2):
        cformat = '%(log_color)s' + format_str
        colors = {'DEBUG': 'reset',
                  'INFO': 'reset',
                  'WARNING': 'bold_yellow',
                  'ERROR': 'bold_red',
                  'CRITICAL': 'bold_red'}
        formatter = colorlog.ColoredFormatter(cformat, date_format,
                                              log_colors=colors)
    else:
        formatter = logging.Formatter(format_str, date_format)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    log.addHandler(stream_handler)
    return logging.getLogger(__name__)
def setup_logging(log_level=logging.INFO):
    """Set up the logging."""
    logging.basicConfig(level=log_level)
    fmt = ("%(asctime)s %(levelname)s (%(threadName)s) "
           "[%(name)s] %(message)s")
    colorfmt = "%(log_color)s{}%(reset)s".format(fmt)
    datefmt = '%Y-%m-%d %H:%M:%S'

    # Suppress overly verbose logs from libraries that aren't helpful
    logging.getLogger('requests').setLevel(logging.WARNING)
    logging.getLogger('urllib3').setLevel(logging.WARNING)
    logging.getLogger('aiohttp.access').setLevel(logging.WARNING)

    try:
        from colorlog import ColoredFormatter
        logging.getLogger().handlers[0].setFormatter(ColoredFormatter(
            colorfmt,
            datefmt=datefmt,
            reset=True,
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'red',
            }
        ))
    except ImportError:
        pass

    logger = logging.getLogger('')
    logger.setLevel(log_level)
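One way to exercise the function above: because the noisy third-party loggers are pinned at WARNING, they stay quiet even when the root level is DEBUG.

setup_logging(logging.DEBUG)
logging.getLogger(__name__).debug('application detail')       # emitted
logging.getLogger('urllib3').debug('connection pool detail')  # suppressed by the WARNING pin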
def kas(argv):
    """
    The main entry point of kas.
    """
    create_logger()

    parser = kas_get_argparser()
    args = parser.parse_args(argv)

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    logging.info('%s %s started', os.path.basename(sys.argv[0]), __version__)

    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, interruption)
    atexit.register(_atexit_handler)

    for plugin in getattr(kasplugin, 'plugins', []):
        if plugin().run(args):
            return

    parser.print_help()
def __init__(self, pool_names, max_restarts=0, options=None):
    self.names = pool_names
    self.queue = multiprocessing.Queue()
    self.pool = dict()
    self.max_restarts = max_restarts
    self.options = options or dict()
    self.dog_path = os.curdir
    self.dog_handler = LiveReload(self)
    # self.dog_observer = Observer()
    # self.dog_observer.schedule(self.dog_handler, self.dog_path, recursive=True)
    if multiprocessing.get_start_method() != 'fork':  # pragma: no cover
        root_logger = logging.getLogger()
        self.log_listener = QueueListener(self.queue, *root_logger.handlers)
    # TODO: Find out how to get the watchdog + livereload working at a later moment.
    # self.dog_observer.start()
    self._restarts = dict()
def init_logger(logger_name):
    # initialize logger
    log = logging.getLogger(logger_name)
    _h = logging.FileHandler('%s/%s' % (
        cfg.CONF.service.service_log_path,
        cfg.CONF.service.service_log_filename))
    _h.setFormatter(logging.Formatter("%(asctime)s - %(pathname)s:"
                                      "%(lineno)s - %(levelname)s"
                                      " - %(message)s"))
    log.addHandler(_h)
    if cfg.CONF.service.enable_debug_log_entries:
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.INFO)
    return log
def configure_logging(self):
    """
    Configure logging to log to stdout as well as to a log file.
    """
    log_level = logging.DEBUG
    log_filename = datetime.now().strftime('%Y-%m-%d') + '.log'
    sp_logger = logging.getLogger('sp_logger')
    sp_logger.setLevel(log_level)
    formatter = logging.Formatter('%(asctime)s : %(levelname)s : %(message)s')
    fh = logging.FileHandler(filename=os.path.join(self.log_dir, log_filename))
    fh.setLevel(log_level)
    fh.setFormatter(formatter)
    sp_logger.addHandler(fh)
    sh = logging.StreamHandler(sys.stdout)
    sh.setLevel(log_level)
    sh.setFormatter(formatter)
    sp_logger.addHandler(sh)
def get_audit_actions(self, date_modified, offset=0, page_length=100):
    """
    Get all actions created after a specified date. If more than 100 actions
    are found, this function pages until it has collected them all.

    :param date_modified: ISO-formatted date/time string. Only actions created
        after this date are returned.
    :param offset: The index to start retrieving actions from
    :param page_length: How many actions to fetch for each page of action results
    :return: Array of action objects
    """
    logger = logging.getLogger('sp_logger')
    actions_url = self.api_url + 'actions/search'
    response = self.authenticated_request_post(
        actions_url,
        data=json.dumps({
            "modified_at": {"from": str(date_modified)},
            "offset": offset,
            "status": [0, 10, 50, 60]
        })
    )
    result = self.parse_json(response.content) if response.status_code == requests.codes.ok else None
    self.log_http_status(response.status_code, 'GET actions')
    if result is None or None in [result.get('count'), result.get('offset'), result.get('total'), result.get('actions')]:
        return None
    return self.get_page_of_actions(logger, date_modified, result, offset, page_length)
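`get_page_of_actions` itself is not shown; here is a hedged sketch of the offset-based paging pattern the docstring describes, with `fetch_page` standing in for one authenticated search request:

def collect_all_actions(fetch_page, page_length=100):
    # Keep requesting pages until offset + page_length covers the reported total.
    actions, offset = [], 0
    while True:
        page = fetch_page(offset, page_length)  # returns a dict with 'actions' and 'total'
        actions.extend(page['actions'])
        offset += page_length
        if offset >= page['total']:
            return actions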
def init(verbose=2, sendto=True, backupCount=5):
    """
    Sets up some simple default handling to make it
    easier for those wrapping this library.
    You do not need to call this function if you
    don't want to; ideally one might want to set up
    things their own way.
    """
    # Add our handlers at the parent level
    add_handler(
        logging.getLogger(SQLALCHEMY_LOGGER),
        sendto=sendto,
        backupCount=backupCount,
    )
    add_handler(
        logging.getLogger(NEWSREAP_LOGGER),
        sendto=sendto,
        backupCount=backupCount,
    )
    if verbose:
        set_verbosity(verbose=verbose)
def forwarder(tasks, interval, batch_size, source, dest):
    '''Forward items from one storage to another.'''
    from .utils import RunFlag, load_manager, redis_client
    from .store import QueueStore
    log = logging.getLogger('dsq.forwarder')

    if not tasks and not source:
        print('--tasks or --source must be provided')
        sys.exit(1)

    s = QueueStore(redis_client(source)) if source else load_manager(tasks).queue
    d = QueueStore(redis_client(dest))
    run = RunFlag()
    while run:
        batch = s.take_many(batch_size)
        if batch['schedule'] or batch['queues']:
            try:
                d.put_many(batch)
            except Exception:
                s.put_many(batch)  # return the batch to the source so nothing is lost
                log.exception('Forward error')
                raise
        else:
            time.sleep(interval)
def main():
    import argparse

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler(sys.stdout))

    parser = argparse.ArgumentParser(description="Linux distro info tool")
    parser.add_argument(
        '--json',
        '-j',
        help="Output in machine readable format",
        action="store_true")
    args = parser.parse_args()

    if args.json:
        logger.info(json.dumps(info(), indent=4, sort_keys=True))
    else:
        logger.info('Name: %s', name(pretty=True))
        distribution_version = version(pretty=True)
        if distribution_version:
            logger.info('Version: %s', distribution_version)
        distribution_codename = codename()
        if distribution_codename:
            logger.info('Codename: %s', distribution_codename)
def add_stderr_logger(level=logging.DEBUG):
    """
    Helper for quickly adding a StreamHandler to the logger. Useful for
    debugging.

    Returns the handler after adding it.
    """
    # This method needs to be in this __init__.py to get the __name__ correct
    # even if urllib3 is vendored within another package.
    logger = logging.getLogger(__name__)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    logger.addHandler(handler)
    logger.setLevel(level)
    logger.debug('Added a stderr logging handler to logger: %s', __name__)
    return handler
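Because the handler is returned, it can be detached once debugging is done. The comment above places this helper in a package `__init__`, so the handler hangs off that package's logger:

handler = add_stderr_logger(logging.INFO)
# ... run the code whose log output should reach stderr ...
logging.getLogger('urllib3').removeHandler(handler)  # assumes the urllib3 case named in the comment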
def configureLogging(level, console, file):
    logger = logging.getLogger()
    logger.setLevel(level)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    if console:
        cons = logging.StreamHandler()
        cons.setLevel(level)
        cons.setFormatter(formatter)
        logger.addHandler(cons)
        print("logging to console")
    if file:
        f = logging.FileHandler(file)
        f.setLevel(level)
        f.setFormatter(formatter)
        logger.addHandler(f)
        print("logging to file {0}".format(file))
def init_logging(logfile, debug=True, level=None):
    """
    Simple configuration of logging.
    """
    if debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO

    # allow user to override exact log_level
    if level:
        log_level = level

    logging.basicConfig(level=log_level,
                        format='%(asctime)s %(levelname)-8s [%(name)s] %(message)s',
                        filename=logfile,
                        filemode='a')
    return logging.getLogger("circus")
def setup_platform(hass, config, add_devices, discovery_info=None):
    """Set up the MyQ garage door."""
    username = config.get(CONF_USERNAME)
    password = config.get(CONF_PASSWORD)
    logger = logging.getLogger(__name__)

    if username is None or password is None:
        logger.error("MyQ Cover - Missing username or password.")
        return

    try:
        brand = BRAND_MAPPINGS[config.get(CONF_BRAND)]
    except KeyError:
        logger.error("MyQ Cover - Missing or unsupported brand. "
                     "Supported brands: %s", ', '.join(SUPPORTED_BRANDS))
        return

    myq = MyQAPI(username, password, brand, logger)
    add_devices(MyQCoverDevice(myq, door) for door in myq.get_garage_doors())
def __init__(self, parent_orb, execparams, poa):
    # The CORBA name this object is registered under
    self.naming_service_name = execparams['NAME_BINDING']
    # The parent ORB for this object
    self.parent_orb = parent_orb
    # The CORBA portable object adapter
    self.poa = poa
    # The uuid assigned to this instance of the component
    self.uuid = execparams['COMPONENT_IDENTIFIER']
    # The storage of property values that don't have getters/setters
    self.propertySet = {}
    execparams_value = " ".join(["%s %s" % x for x in execparams.items()])
    self.propertySet[getId("execparams")] = CF.DataType(id=getId("execparams"),
                                                        value=omniORB.any.to_any(execparams_value))
    # The PID of the child process
    self._pid = None
    self._log = logging.getLogger(self.naming_service_name)

######################################
# Implement the Resource interface
def __init__(self, devmgr=None, uuid=None, label=None, softwareProfile=None):
    self.props = {}
    self.uuid = uuid
    self._devmgr = devmgr
    self._label = label
    self._usageState = CF.Device.IDLE
    self._adminState = CF.Device.UNLOCKED
    self._operationalState = CF.Device.ENABLED
    self._softwareProfile = softwareProfile
    self._compositeDevice = None
    self._log = logging.getLogger(label)
    if self._devmgr:
        self._devmgr.registerDevice(self._this())

# Helper Methods
def main():
    # Set up a console logger.
    console = logging.StreamHandler()
    formatter = logging.Formatter("%(asctime)s %(name)-12s:%(levelname)-8s: %(message)s")
    console.setFormatter(formatter)
    logging.getLogger().addHandler(console)
    logging.getLogger().setLevel(logging.INFO)

    kw = {}
    longopts = ['domainname=', 'verbose']
    opts, args = getopt.getopt(sys.argv[1:], 'v', longopts)
    for opt, val in opts:
        if opt == '--domainname':
            kw['domainName'] = val
        if opt in ['-v', '--verbose']:
            kw['verbose'] = True

    a = QApplication(sys.argv)
    QObject.connect(a, SIGNAL("lastWindowClosed()"), a, SLOT("quit()"))
    w = BrowseWindow(**kw)
    w.show()
    a.exec_()
def __init__(self, resource=None):
    self._mgr_lock = threading.Lock()
    self._ecm = None
    self._logger = logging.getLogger("ossie.events.Manager")
    self._logger.setLevel(logging.INFO)
    self._allow = True
    self._registrations = []
    if resource:
        try:
            self._logger.debug("Requesting Domain Manager Access....")
            dom = resource.getDomainManager()
            self._logger.debug("Requesting EventChannelManager Access....")
            self._ecm = dom.getRef()._get_eventChannelMgr()
            self._logger.debug("Acquired reference to EventChannelManager")
        except Exception:
            # print traceback.format_exc()
            self._logger.warning("EventChannelManager - unable to resolve "
                                 "DomainManager's EventChannelManager")
def run(self, args=None, namespace=None):
    options = self.parser.parse_args(args=args, namespace=namespace)
    enable_pretty_logging()
    logger = logging.getLogger(__name__)
    # TODO: a configure_logger() method?
    if options.debug:
        logging.getLogger().setLevel(logging.INFO)
    if options.verbose:
        if options.verbose >= 1:
            logging.getLogger().setLevel(logging.DEBUG)
        if options.verbose >= 2:
            logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
    try:
        handler = options.handler
    except AttributeError:
        if not callable(self.default_handler):
            raise
        handler = None
    return (handler or self.default_handler)(logger, options)
def setup(name=__name__, level=logging.INFO):
    logger = logging.getLogger(name)
    if logger.handlers:
        return logger
    logger.setLevel(level)
    try:
        # check if click exists to swap the formatter
        import click  # noqa
        formatter = ColorFormatter('[.] %(message)s')
    except ImportError:
        formatter = CustomFormatter('[.] %(message)s')
    handler = logging.StreamHandler(None)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
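The early return on `logger.handlers` makes the helper above idempotent: repeated calls hand back the same configured logger without stacking duplicate handlers. A short sketch, assuming the surrounding module's `ColorFormatter`/`CustomFormatter` classes are available:

log_a = setup('myapp')
log_b = setup('myapp')  # no second handler is attached
assert log_a is log_b and len(log_a.handlers) == 1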
def merge_files(groups, outdir):
    """
    Merge files that belong to the same filename group.
    Merged files are created in the output directory.

    Args:
        groups: Dictionary of filename groups from `group_filenames`.
        outdir: Output path for merged files.
    """
    logger = logging.getLogger("mergeFQs." + "merge")
    for groupname, filenames in groups.items():
        logger.info("Merging group " + groupname + " with " +
                    str(len(filenames)) + " files...")
        outpath = os.path.join(outdir, groupname)
        logger.info("Creating merge file " + outpath + "...")
        with open(outpath, "wb") as outfile:
            for filename in filenames:
                logger.info("Adding " + filename + "...")
                with open(filename, "rb") as fq_file:
                    shutil.copyfileobj(fq_file, outfile)
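The `groups` argument maps each merged output name to the input files that feed it, in order; all names below are illustrative:

groups = {
    'sampleA.fastq': ['sampleA_L001.fastq', 'sampleA_L002.fastq'],
    'sampleB.fastq': ['sampleB_L001.fastq'],
}
merge_files(groups, 'merged_output')  # hypothetical output directory, must already exist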
def main():
    """
    Run main code.

    1. Get arguments
    2. Set up logging
    3. Group filenames
    4. Merge files
    """
    args = get_args()
    setup_logging(args.outdir)
    logger = logging.getLogger("mergeFQs." + __name__)
    logger.info(str(len(args.fastqs)) + " input files provided")
    logger.info("Filename pattern is " + args.pattern)
    pattern = args.pattern.split(args.separator)
    ex_file = args.fastqs[0]
    ex_merge = merge_filename(ex_file, pattern, args.separator)
    logger.info("Example merge: " + ex_file + " -> " +
                os.path.join(args.outdir, ex_merge))
    file_groups = group_filenames(args.fastqs, pattern, args.separator)
    logger.info(str(len(file_groups)) + " file groups found...")
    merge_files(file_groups, args.outdir)
def __init__(
    self,
    interval_in_seconds,
    service_name,
    result_dict,
    max_delay_seconds,
    disable=False
):
    super(SensuAlertManager, self).__init__(interval_in_seconds)
    self._service_name = service_name
    self._setup_ok_result_dict(result_dict)
    self._setup_delayed_result_dict()
    self._setup_disabled_alert_dict()
    self._log = logging.getLogger('{}.util.sensu_alert_manager'.format(service_name))
    self._disable = disable
    self._should_send_sensu_disabled_message = False
    self._max_delay = timedelta(seconds=max_delay_seconds)
def __init__(self, session, api_id, api_hash,
             proxy=None, timeout=timedelta(seconds=5)):
    """Initializes the Telegram client with the specified API ID and Hash.

    Session must always be a Session instance, and an optional proxy
    can also be specified to be used on the connection.
    """
    self.session = session
    self.api_id = int(api_id)
    self.api_hash = api_hash
    self.proxy = proxy
    self._timeout = timeout
    self._logger = logging.getLogger(__name__)

    # Cache "exported" senders 'dc_id: TelegramBareClient' and
    # their corresponding sessions not to recreate them all
    # the time since it's a (somewhat expensive) process.
    self._cached_clients = {}

    # These will be set later
    self.dc_options = None
    self._sender = None

# endregion

# region Connecting
def run_daemon(server, pidfile, daemonize=True):
    """Run the server as a daemon.

    :param server: cutlery (a Spoon or Spork)
    :param pidfile: the file to keep the parent PID
    :param daemonize: if True, fork the processes into a daemon
    :return: None
    """
    logger = logging.getLogger(server.server_logger)
    if daemonize:
        detach(pidfile=pidfile, logger=logger)
    elif pidfile:
        with open(pidfile, "w+") as pidf:
            pidf.write("%s\n" % os.getpid())
    try:
        server.serve_forever()
    finally:
        try:
            os.remove(pidfile)
        except OSError:
            pass
def __init__(self, address):
    self.log = logging.getLogger(self.server_logger)
    self.socket = None
    if ":" in address[0]:
        self.address_family = socket.AF_INET6
    else:
        self.address_family = socket.AF_INET
    self.log.debug("Listening on %s", address)
    super(_SpoonMixIn, self).__init__(address, self.handler_klass,
                                      bind_and_activate=False)
    self.load_config()
    self._setup_socket()
    # Finally, set signals
    if self.signal_reload is not None:
        signal.signal(self.signal_reload, self.reload_handler)
    if self.signal_shutdown is not None:
        signal.signal(self.signal_shutdown, self.shutdown_handler)