def init(self):
    """Parse the command line, configure logging and open the data store.

    ``self.conf`` is first extended with ``debug``/``logging`` placeholders
    and with ``DataStore.CONFIG_DEFAULTS``; ``self.argv`` is then parsed
    against it via ``readconf.parse_argv`` (non-strict).

    :return: ``(store, argv)`` where ``store`` comes from ``DataStore.new``
        and ``argv`` is what ``parse_argv`` returned alongside ``args``;
        ``(None, [])`` after printing the usage text when the first
        element of that ``argv`` is ``-h`` or ``--help``.
    """
    import DataStore
    import readconf
    import logging
    import sys

    self.conf.update({"debug": None, "logging": None})
    self.conf.update(DataStore.CONFIG_DEFAULTS)

    args, argv = readconf.parse_argv(self.argv, self.conf, strict=False)
    if argv and argv[0] in ('-h', '--help'):
        # Parenthesised single-argument form prints identically on
        # Python 2 and is valid syntax on Python 3.
        print(self.usage())
        return None, []

    # Baseline configuration: bare messages on stdout at DEBUG level.
    logging.basicConfig(
        stream=sys.stdout, level=logging.DEBUG, format="%(message)s")
    if args.logging is not None:
        # An explicit logging dictionary from the configuration is applied
        # on top of the baseline.
        import logging.config as logging_config
        logging_config.dictConfig(args.logging)

    store = DataStore.new(args)
    return store, argv
# Abstract hex-binary conversions for eventual porting to Python 3.
# Example source code for the Python class config()
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def configure_logging(debug=False, verbose=True, stderr=True):
    """Derive a logging configuration from ``LOG_CONFIG`` and install it.

    The function works on a deep copy, so ``LOG_CONFIG`` itself is not
    modified.

    :param debug: when true, every handler level becomes ``"DEBUG"`` and the
        ``stderr`` handler uses the ``"debug"`` formatter; this wins over
        *verbose*.
    :param verbose: when true, every handler level becomes ``"INFO"`` and
        the ``stderr`` handler uses the ``"verbose"`` formatter.
    :param stderr: when true, ``"stderr"`` is appended to the handlers of
        the ``LOG_NAMESPACE`` logger.
    """
    settings = copy.deepcopy(LOG_CONFIG)
    handlers = settings["handlers"]

    # debug takes precedence over verbose; with neither flag set the
    # handler levels and formatter are left exactly as configured.
    if debug:
        level, formatter = "DEBUG", "debug"
    elif verbose:
        level, formatter = "INFO", "verbose"
    else:
        level = formatter = None

    if level is not None:
        for handler in handlers.values():
            handler["level"] = level
        handlers["stderr"]["formatter"] = formatter

    if stderr:
        settings["loggers"][LOG_NAMESPACE]["handlers"].append("stderr")

    logging.config.dictConfig(settings)
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def setup_logging(self, default_path=PATH_LOGGING, default_level=logging.INFO, env_key='LOG_CFG'):
    """Load the JSON logging configuration, falling back to basicConfig.

    The configuration path is the value of the *env_key* environment
    variable when it is set and non-empty, otherwise *default_path*.
    Lookup order:

    1. the path as given;
    2. the same path with every ``"../"`` removed, in which case the
       loaded handlers are passed through ``self._changePath`` first;
    3. neither exists: a notice is printed and ``logging.basicConfig`` is
       called with *default_level*.

    ``self.logconf`` receives the return value of whichever logging call
    was made.
    """
    self.logconf = None
    path = os.getenv(env_key, None) or default_path

    if os.path.exists(path):
        with open(path, 'rt') as handle:
            settings = json.load(handle)
        self.logconf = logging.config.dictConfig(settings)
        return

    # Second chance: the same file addressed without parent-directory hops.
    fallback = path.replace("../", "")
    if os.path.exists(fallback):
        with open(fallback, 'rt') as handle:
            settings = json.load(handle)
        self._changePath(settings["handlers"])
        self.logconf = logging.config.dictConfig(settings)
        return

    print("Configurazione log non trovata (\"%s\"): applico le impostazioni predefinite" % path)
    self.logconf = logging.basicConfig(level=default_level)
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def _get_config(key, default_value=None, required=False):
    """
    Read a configuration value from the environment variables.

    String values are passed through ``_string_to_bool``; when that raises
    ``NonBooleanStringException`` the string is kept unchanged. The resolved
    value is also written back to ``os.environ[key]`` as ``str(value)``.

    :param key: the name of the env variable you are looking for
    :param default_value: value to use if key is not in os.environ
    :param required: if true and key is not set, will raise InvalidConfigException
    :return: the (possibly bool-converted) value of os.environ[key] if key is
        in os.environ, else default_value
    :exception InvalidConfigException - raised when a required config key is not set
    """
    env = os.environ
    if required and key not in env:
        raise InvalidConfigException("Invalid ENV variable. Please check {0}".format(key))

    value = env.get(key, default_value)
    if isinstance(value, basestring):
        try:
            value = _string_to_bool(value)
        except NonBooleanStringException:
            # Not a boolean-looking string: keep it verbatim.
            pass

    # The resolved value (default included) is stored back as text.
    env[key] = str(value)
    return value
def setup_logging(
    default_path='logging.ini',
    default_level=logging.INFO,
    env_key='LOG_CFG'
):
    """Set up the logging configuration.

    The configuration file is the one named by the *env_key* environment
    variable when that is set and non-empty, otherwise *default_path*.
    If the chosen file exists it is loaded with
    ``logging.config.fileConfig``; otherwise ``logging.basicConfig`` is
    called with *default_level*.

    :param default_path: configuration file used when the environment
        variable is unset or empty.
    :param default_level: level for the ``basicConfig`` fallback.
    :param env_key: name of the environment variable that may override
        the configuration file path.
    """
    path = os.getenv(env_key, None) or default_path
    if os.path.exists(path):
        # Load the file whose existence was just checked; previously the
        # override was checked but ``default_path`` was loaded instead.
        logging.config.fileConfig(path)
    else:
        logging.basicConfig(level=default_level)
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def configure_logging(logging_config, logging_settings):
    """Apply the default logging setup, then the user-supplied one.

    :param logging_config: dotted path of the logging configuration
        callable, resolved with ``import_string``; when falsy, no logging
        configuration is applied at all.
    :param logging_settings: value handed to that callable; when falsy the
        callable is resolved but not invoked.
    """
    if not sys.warnoptions:
        # No warning options were supplied, so send warnings through the
        # logging system.
        logging.captureWarnings(True)
        # RemovedInNextVersionWarning derives from DeprecationWarning, which
        # is hidden by default; force the "default" filter action for it.
        warnings.simplefilter("default", RemovedInNextVersionWarning)

    if not logging_config:
        return

    # Resolve the configuration callable before touching the logging state.
    configure = import_string(logging_config)

    logging.config.dictConfig(DEFAULT_LOGGING)

    # Layer the caller's settings on top of the defaults.
    if logging_settings:
        configure(logging_settings)
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def _applyConfigurationToValues(self, parser, config, values):
    """Apply config-file settings to *values*, reporting bad entries.

    :param parser: option parser forwarded to ``self._processConfigValue``.
    :param config: iterable of ``(name, value, filename)`` triples.
    :param values: object forwarded to ``self._processConfigValue``
        together with each setting.

    Names listed in ``option_blacklist`` are skipped. Unknown options and
    invalid option values are reported through ``self._file_error`` rather
    than propagated from here.
    """
    for name, value, filename in config:
        if name in option_blacklist:
            continue
        try:
            self._processConfigValue(name, value, values, parser)
        # ``except ... as`` is accepted by Python 2.6+ and Python 3; the
        # older comma form is a syntax error on Python 3.
        except NoSuchOptionError as exc:
            self._file_error(
                "Error reading config file %r: "
                "no such option %r" % (filename, exc.name),
                name=name, filename=filename)
        except optparse.OptionValueError as exc:
            # Show the config-file spelling of the option instead of the
            # command-line '--name' form in the message.
            msg = str(exc).replace('--' + name, repr(name), 1)
            self._file_error("Error reading config file %r: "
                             "%s" % (filename, msg),
                             name=name, filename=filename)
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def process_alert(self, route):
    """Queue *route* for each of its recipients that is known to this object.

    The recipient ids come from ``route["recipient_ids"]`` (de-duplicated);
    when ``self.data`` has a ``"*"`` entry, that entry is targeted as well.
    Ids missing from ``self.data`` are ignored. The route is appended to a
    recipient's ``"routes"`` list only while that list is shorter than the
    recipient's ``config["max_routes"]``; otherwise it is dropped and a
    debug message is logged.
    """
    targets = list(set(route["recipient_ids"]))
    logging.debug("Processing alert for {}, recipients {}".format(
        str(route), str(targets)
    ))

    if "*" in self.data:
        targets.append("*")

    for target_id in (rid for rid in targets if rid in self.data):
        entry = self.data[target_id]
        pending = entry["routes"]
        if len(pending) < entry["config"]["max_routes"]:
            pending.append(route)
        else:
            logging.debug("Discarding route {} for {}: buffer full ".format(
                route["prefix"], target_id
            ))
def _flush_recipient(self, recipient):
    """Send one email to *recipient* listing its buffered routes.

    ``recipient["config"]["info"]["email"]`` may be a single address or a
    list of addresses; a list is de-duplicated. The message body is
    ``self.template`` formatted with ``id``, ``from_addr``, ``subject`` and
    ``routes_list`` (the latter produced by ``self._format_list_of_routes``),
    and delivery is delegated to ``self._send_email``.
    """
    configured = recipient["config"]["info"]["email"]
    if isinstance(configured, list):
        email_addresses = list(set(configured))
    else:
        email_addresses = [configured]

    logging.info("Sending email to {} ({}) for {}".format(
        recipient["id"],
        ", ".join(email_addresses),
        ", ".join([route["prefix"] for route in recipient["routes"]])
    ))

    body = self.template.format(
        id=recipient["id"],
        from_addr=self.from_addr,
        subject=self.subject,
        routes_list=self._format_list_of_routes(recipient["routes"]),
    )

    msg = MIMEText(body)
    msg['Subject'] = self.subject
    msg['From'] = self.from_addr
    msg['To'] = ", ".join(email_addresses)

    self._send_email(self.from_addr, email_addresses, msg.as_string())
def process_alert(self, route):
    """Queue *route* for each of its recipients that is known to this object.

    The recipient ids come from ``route["recipient_ids"]`` (de-duplicated);
    when ``self.data`` has a ``"*"`` entry, that entry is targeted as well.
    Ids missing from ``self.data`` are ignored. The route is appended to a
    recipient's ``"routes"`` list only while that list is shorter than the
    recipient's ``config["max_routes"]``; otherwise it is dropped and a
    debug message is logged.
    """
    targets = list(set(route["recipient_ids"]))
    logging.debug("Processing alert for {}, recipients {}".format(
        str(route), str(targets)
    ))

    if "*" in self.data:
        targets.append("*")

    for target_id in (rid for rid in targets if rid in self.data):
        entry = self.data[target_id]
        pending = entry["routes"]
        if len(pending) < entry["config"]["max_routes"]:
            pending.append(route)
        else:
            logging.debug("Discarding route {} for {}: buffer full ".format(
                route["prefix"], target_id
            ))
def _flush_recipient(self, recipient):
    """Send one email to *recipient* listing its buffered routes.

    ``recipient["config"]["info"]["email"]`` may be a single address or a
    list of addresses; a list is de-duplicated. The message body is
    ``self.template`` formatted with ``id``, ``from_addr``, ``subject`` and
    ``routes_list`` (the latter produced by ``self._format_list_of_routes``),
    and delivery is delegated to ``self._send_email``.
    """
    configured = recipient["config"]["info"]["email"]
    if isinstance(configured, list):
        email_addresses = list(set(configured))
    else:
        email_addresses = [configured]

    logging.info("Sending email to {} ({}) for {}".format(
        recipient["id"],
        ", ".join(email_addresses),
        ", ".join([route["prefix"] for route in recipient["routes"]])
    ))

    body = self.template.format(
        id=recipient["id"],
        from_addr=self.from_addr,
        subject=self.subject,
        routes_list=self._format_list_of_routes(recipient["routes"]),
    )

    msg = MIMEText(body)
    msg['Subject'] = self.subject
    msg['From'] = self.from_addr
    msg['To'] = ", ".join(email_addresses)

    self._send_email(self.from_addr, email_addresses, msg.as_string())
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()
def setup_logging(verbosity_level, save_debug_log):
    """Route warnings to logging, set up console logging, release the delayed handler.

    :param verbosity_level: forwarded unchanged to ``setup_console_logging``.
    :param save_debug_log: when true, only a placeholder message is printed;
        the call that would set up debug logging to a file is commented out.
    """
    logging.captureWarnings(True)

    # Disabled: loading a logging configuration file.
    # if config['logging']['config_file']:
    #     # Logging config from file must be read before other handlers are
    #     # added. If not, the other handlers will have no effect.
    #     try:
    #         path = config['logging']['config_file']
    #         logging.config.fileConfig(path, disable_existing_loggers=False)
    #     except Exception as e:
    #         # Catch everything as logging does not specify what can go wrong.
    #         logger.error('Loading logging config %r failed. %s', path, e)

    setup_console_logging(verbosity_level)

    if save_debug_log:
        print('Here we would call setup_debug_logging_to_file(config)')
        # setup_debug_logging_to_file(config)

    _delayed_handler.release()
def setup_logger():
    """Attach a log handler to the root logger, colorized when possible.

    With ``colorlog`` importable, a ``StreamHandler`` using a
    ``ColoredFormatter`` is added to the root logger and the root level is
    set to ``DEBUG``. Otherwise ``logging.basicConfig`` is called with a
    plain format and a warning about the missing package is logged.

    Nothing is returned.
    """
    from gettext import gettext as _  # noqa
    try:
        # The import must live inside the ``try``: it is the statement that
        # raises ImportError when colorlog is not installed.
        from colorlog import ColoredFormatter

        formatter = ColoredFormatter(
            "%(asctime)s %(name)-12s (%(threadName)-9s) %(log_color)s%(levelname)-8s%(reset)s (%(funcName)-5s) %(message_log_color)s%(message)s",  # noqa
            datefmt=None,
            reset=True,
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'bold_red',
                'TRACE': 'purple'
            },
            secondary_log_colors={
                'message': {
                    'ERROR': 'red',
                    'CRITICAL': 'red',
                    'DEBUG': 'yellow',
                    'INFO': 'yellow,bg_blue'
                }
            },
            style='%'
        )

        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        logging.getLogger('').addHandler(handler)
        logging.root.setLevel(logging.DEBUG)
    except ImportError:
        # No color available, use default config
        logging.basicConfig(format='%(levelname)s: %(message)s')
        # ``logging.warn`` is a deprecated alias and is gone in Python 3.13.
        logging.warning("Disabling color, you really want to install colorlog.")
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option is read from ``config`` and passed to
    ``context.configure``; no Engine is created in this function, so no
    DBAPI has to be available.

    Calls to ``context.execute()`` here emit the given string to the
    script output.
    """
    context.configure(url=config.get_main_option("sqlalchemy.url"))
    with context.begin_transaction():
        context.run_migrations()