def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url)

    with context.begin_transaction():
        context.run_migrations()
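This function is the offline-mode hook from an Alembic env.py, so the bare `config` and `context` names come from the alembic package. A minimal sketch of the surrounding scaffolding, mirroring the default template that `alembic init` generates:

    # Minimal env.py scaffolding (assumed; matches the default `alembic init` template).
    from alembic import context

    # config is the Alembic Config object, which provides
    # access to the values within the alembic.ini file in use.
    config = context.config

    if context.is_offline_mode():
        run_migrations_offline()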
import json
import logging
import logging.config
import logging.handlers
import os


def setup_logging(default_path='logging.json',
                  default_level=logging.INFO, env_key='LOG_CFG'):
    """Set up logging configuration."""
    path = default_path
    value = os.getenv(env_key, None)
    if value:
        path = value
    if os.path.exists(path):
        with open(path, 'rt') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    else:
        logging.basicConfig(level=default_level)


# Forward all root-logger records to a UDP log server on localhost.
socketHandler = logging.handlers.DatagramHandler(
    'localhost', logging.handlers.DEFAULT_UDP_LOGGING_PORT)
rootLogger = logging.getLogger('')
rootLogger.addHandler(socketHandler)
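For reference, a minimal configuration of the kind dictConfig() accepts; the handler and formatter names here are illustrative, not from the original project:

    # Minimal dictConfig schema; "version": 1 is the only valid schema version.
    example_config = {
        "version": 1,
        "formatters": {
            "simple": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"}
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "formatter": "simple",
                "level": "INFO"
            }
        },
        "root": {"level": "INFO", "handlers": ["console"]}
    }

    logging.config.dictConfig(example_config)  # same call setup_logging() makes after json.load()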
def main(argv):
    """MySQL binlog to Google Pub/Sub entry point.

    Args:
        argv (list): list of command line arguments
    """
    args = _setup_arg_parser(argv)
    conf_file = args.conf
    if conf_file:
        os.environ['BINLOG2GPUBSUB_CONF_FILE'] = conf_file

    if args.logconf:
        logging.config.fileConfig(args.logconf, disable_existing_loggers=False)
    else:
        logging.basicConfig()
    if args.loglevel:
        logging.root.setLevel(logging.getLevelName(args.loglevel.upper()))

    import mysqlbinlog2gpubsub
    mysqlbinlog2gpubsub.start_publishing()
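_setup_arg_parser is not shown in the snippet. A plausible argparse-based sketch that yields the .conf, .logconf, and .loglevel attributes used above; only those attribute names are known, so the flag spellings are assumptions:

    import argparse

    def _setup_arg_parser(argv):
        # Hypothetical reconstruction: flag names below are guesses.
        parser = argparse.ArgumentParser(description='MySQL binlog to Google Pub/Sub')
        parser.add_argument('--conf', help='path to publisher config file')
        parser.add_argument('--logconf', help='path to logging config file (fileConfig format)')
        parser.add_argument('--loglevel', help='root log level name, e.g. INFO or DEBUG')
        return parser.parse_args(argv)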
def extract_batch(dataset, config):
    # `args` is a module-level namespace of command-line flags in the original project.
    with tf.device("/cpu:0"):
        bboxer = PriorBoxGrid(config)
        data_provider = slim.dataset_data_provider.DatasetDataProvider(
            dataset, num_readers=2,
            common_queue_capacity=512, common_queue_min=32)
        if args.segment:
            im, bbox, gt, seg = data_provider.get(['image', 'object/bbox', 'object/label',
                                                   'image/segmentation'])
        else:
            im, bbox, gt = data_provider.get(['image', 'object/bbox', 'object/label'])
            seg = tf.expand_dims(tf.zeros(tf.shape(im)[:2]), 2)

        im = tf.to_float(im) / 255
        bbox = yxyx_to_xywh(tf.clip_by_value(bbox, 0.0, 1.0))
        im, bbox, gt, seg = data_augmentation(im, bbox, gt, seg, config)
        inds, cats, refine = bboxer.encode_gt_tf(bbox, gt)

        return tf.train.shuffle_batch([im, inds, refine, cats, seg],
                                      args.batch_size, 2048, 64, num_threads=4)
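yxyx_to_xywh is a project helper that is not shown. A minimal sketch, assuming it converts normalized [ymin, xmin, ymax, xmax] corners (the TF-Slim bounding-box convention) into [x, y, width, height]:

    def yxyx_to_xywh(bbox):
        # Assumed semantics: unpack corner coordinates, re-pack as x, y, w, h.
        ymin, xmin, ymax, xmax = tf.unstack(bbox, axis=-1)
        return tf.stack([xmin, ymin, xmax - xmin, ymax - ymin], axis=-1)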
def __init__(self, pathname, **settings):
    """Initial config for the singleton Baka framework.

    :param pathname: the name of the application package
    :param settings: optional dict of settings for Pyramid configuration
    """
    self.import_name = pathname
    self.settings = settings
    self.__include = {}
    self.__trafaret = trafaret_yaml

    # Only set up a default log handler if the
    # end-user application didn't set anything up.
    if not (logging.root.handlers and log.level == logging.NOTSET and settings.get('LOGGING')):
        formatter = logging.Formatter(logging_format)
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        log.addHandler(handler)
        log.setLevel(logging.INFO)
def resource(self, path, **kwargs):
    def decorator(wrapped, depth=1):
        route_name = kwargs.pop("route_name", None)
        route_name = route_name or wrapped.__name__
        route_name = kwargs.pop("name", route_name)
        wrapped.route_name = route_name

        def callback(scanner, name, cls):
            # Deferred by venusian: runs at config.scan() time, after `info` below is bound.
            config = scanner.config.with_package(info.module)
            config.add_route(route_name, path, factory=cls)
            config.add_view(default_options_view, route_name=route_name,
                            request_method='OPTIONS', permission=NO_PERMISSION_REQUIRED)
            config.add_view(unsupported_method_view, route_name=route_name, renderer='json')

        for method in METHODS:
            setattr(wrapped, method, type('ViewDecorator%s' % method,
                                          (ViewDecorator, object),
                                          {'request_method': method,
                                           'state': wrapped,
                                           'kwargs': kwargs}))
        info = venusian.attach(wrapped, callback, 'pyramid', depth=depth)
        return wrapped
    return decorator
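A hedged usage sketch, assuming `app` is the framework instance and that the per-method ViewDecorator attributes attached above register method-specific views; the class name, route, and handler signature are illustrative, not from the source:

    # Illustrative only: names and signatures are assumptions.
    @app.resource('/users/{id}')
    class UserResource(object):
        def __init__(self, request):
            self.request = request

    @UserResource.GET()
    def get_user(resource, request):
        return {'id': request.matchdict['id']}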
def run(self, host=None, port=None, **options):
    """Application runner for the development stage; not for production.

    :param host: hostname for the application server
    :param port: port number
    :param options: dict of options for the werkzeug WSGI server
    """
    settings = self.config.get_settings()
    _host = '127.0.0.1'
    _port = 5000
    host = host or _host
    port = int(port or _port)

    options.setdefault('use_reloader', settings.get('debug_all'))
    options.setdefault('use_debugger', settings.get('debug_all'))

    from werkzeug.serving import run_simple
    run_simple(host, port, self.config.make_wsgi_app(), **options)
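Typical development usage, assuming `app` is the framework instance; host and port values are illustrative:

    # Serve on all interfaces for local development;
    # reloader/debugger default to the `debug_all` setting.
    app.run(host='0.0.0.0', port=8080)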
def _logging_config(config_parser, disable_existing_loggers=False):
    """
    Helper that allows us to use an existing ConfigParser object to load logging
    configurations instead of a filename.

    Note: this code is essentially copied from `logging.config.fileConfig`,
    except that we skip loading the file.
    """
    formatters = logging.config._create_formatters(config_parser)
    # critical section
    logging._acquireLock()
    try:
        logging._handlers.clear()
        del logging._handlerList[:]
        # Handlers add themselves to logging._handlers
        handlers = logging.config._install_handlers(config_parser, formatters)
        logging.config._install_loggers(config_parser, handlers, disable_existing_loggers)
    finally:
        logging._releaseLock()
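A small usage sketch: building the ConfigParser in memory (standard fileConfig-style INI sections) and handing it straight to the helper, which is the whole point of skipping the file:

    import configparser

    INI = """
    [loggers]
    keys=root

    [handlers]
    keys=console

    [formatters]
    keys=simple

    [logger_root]
    level=INFO
    handlers=console

    [handler_console]
    class=StreamHandler
    level=INFO
    formatter=simple
    args=(sys.stderr,)

    [formatter_simple]
    format=%(asctime)s %(levelname)s %(message)s
    """

    parser = configparser.ConfigParser()
    parser.read_string(INI)
    _logging_config(parser)  # no file on disk required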
def main(args=None):
    args = get_parser_args(args)
    _logging_config(args.config)
    logger = logging.getLogger(__name__)

    logger.info("Loading configurations")
    slackbot_config = resources.SlackBotConfig.from_config(args.config)

    # Since we can't inject the settings into the bot, load all the settings
    # into the module.
    slackbot_config.load_into_settings_module(slackbot.settings)
    # Load the config into the settings...
    # TODO: PR to be able to inject settings instead of automagically loading them from a module
    slackbot.settings.SLACK_JIRA_CONF = args.config

    logger.info("Starting slackbot")
    bot = slackbot.bot.Bot()
    bot.run()
def my_log():
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        handlers=[logging.FileHandler('message.log', 'a', 'utf-8')])

    # Name loggers hierarchically: 'app.' plus the module name.
    _log = logging.getLogger('app.' + __name__)
    host = '10.0.0.175'
    port = 8080
    # Don't pre-format with 'xxxx' % (aa, bb); pass the arguments so
    # logging formats the message lazily, only if it is actually emitted.
    _log.error('error to connect to %s:%d', host, port)
    _log.addFilter(FilterFunc('foo'))  # drop records emitted from foo() (FilterFunc defined elsewhere)

    lgg = logging.getLogger('app.network.client')
    lgg.propagate = False  # don't pass records up to ancestor loggers
    lgg.error('do you see me?')  # not shown: no handler here and propagation is off
    lgg.setLevel(logging.CRITICAL)
    lgg.error('now you see me?')  # still not shown: ERROR is below this logger's CRITICAL level

    logging.disable(logging.DEBUG)  # globally disable records at DEBUG level and below
    # Load the logging setup from a config file, typically once in main();
    # assumes `from logging import config`.
    config.fileConfig('applogcfg.ini')
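FilterFunc is not a standard-library class. A plausible sketch, assuming it rejects records created inside a given function name:

    class FilterFunc(logging.Filter):
        """Hypothetical reconstruction: drop records emitted from `funcname`."""
        def __init__(self, funcname):
            super().__init__()
            self.funcname = funcname

        def filter(self, record):
            # record.funcName is set by logging to the calling function's name.
            return record.funcName != self.funcname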
def setup_logging(config_file_path, log_level=logging.INFO):
    """
    Logging configuration helper.

    :param config_file_path: file path to the logging configuration file.
        https://docs.python.org/3/library/logging.config.html#object-connections
    :param log_level: defaults to logging.INFO
    :return: None - access the logger by name as described in the config--or the "root" logger as a backup.
    """
    try:
        with open(config_file_path, 'rt') as f:
            config = json.load(f)
        logging.config.dictConfig(config)
    except (ValueError, IOError, OSError):
        # json.JSONDecodeError is throwable in Python 3.5+ -- a subclass of ValueError
        logging.basicConfig(level=log_level)  # basicConfig's keyword is `level`, not `log_level`
        logging.root.exception(
            "Could not load specified logging configuration '{}'. Verify the filepath exists and is compliant with: "
            "[https://docs.python.org/3/library/logging.config.html#object-connections]".format(config_file_path))
def _load_config():
    # Fills the global CONFIG dictionary using default and custom config.
    # Returns an error message if the custom config is invalid.
    global CONFIG
    try:
        cfg = _load_default_config()
        custom_cfg = _load_custom_config()
        if custom_cfg:
            CONFIG = _merge(cfg, custom_cfg)
        else:
            CONFIG = cfg
    except yaml.YAMLError as exc:
        # Try to point to the line that threw an error
        if hasattr(exc, 'problem_mark'):
            mark = exc.problem_mark
            return 'Error in YAML at position: ({}:{})'.format(mark.line + 1,
                                                               mark.column + 1)
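_merge is a project helper that is not shown. A minimal sketch, assuming it deep-merges the custom mapping over the defaults, recursing into nested dicts and letting the custom value win everywhere else:

    def _merge(base, override):
        # Hypothetical reconstruction of the helper used above.
        merged = dict(base)
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = _merge(merged[key], value)  # recurse into nested sections
            else:
                merged[key] = value  # custom value wins
        return merged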
def get(entity, param):
    """
    Returns the configuration value belonging to a specified entity
    (e.g. neo4j) and parameter (e.g. host).

    :param entity: The configuration entity
    :param param: The configuration parameter
    :return: The configuration value
    :raises ValueError: if a requested parameter is not configured
    """
    try:
        value = _get_config()[entity][param]
        LOGGER.debug('Found config: {}:{} => {}'.format(entity, param, value))
        return value  # reuse the looked-up value instead of querying the config a second time
    except KeyError:
        # Should _never_ happen in production!
        msg = 'Parameter {} is not present for entity {}!'.format(param,
                                                                  entity)
        LOGGER.critical(msg)
        raise ValueError(msg)
def _setup_requests(app):
    def _init_request():
        session = request.environ['beaker.session']
        session.save()
        _setup_connector(
            app=current_app,
            app_config=current_app.config,
            session=session
        )

    @app.before_request
    def before_request():
        return _init_request()
def make_web_app():
    logging.config.dictConfig(config.LOGGING_CONFIG)
    settings = {
        'debug': constants.DEBUG,
        'template_path': os.path.join(
            os.path.dirname(__file__), "web", "templates"
        ),
        'static_path': os.path.join(
            os.path.dirname(__file__), 'web', 'static'
        ),
        # The key must be exactly 'default_handler_class'; a stray trailing
        # space in the key would make Tornado silently ignore the setting.
        'default_handler_class': BaseHandler,
    }
    app = tornado.web.Application(url_handlers, **settings)
    return app
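Typical startup for the returned application; the port number is illustrative:

    import tornado.ioloop

    app = make_web_app()
    app.listen(8888)                         # bind the HTTP server
    tornado.ioloop.IOLoop.current().start()  # run the event loop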