def instantiate(p):
print("*** instantiate ***")
print(p)
with rlock:
global logger
logger = logging.getLogger("freepydius-logger")
logger.setLevel(logging.INFO)
handler = TimedRotatingFileHandler(_LOG_FILE,
when="midnight",
interval=1)
formatter = logging.Formatter("%(asctime)s %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
log = Log("INSTANCE")
log.log(( ('Response', 'created'), ))
# return 0 for success or -1 for failure
return 0
python类getLogger()的实例源码
def monitor():
"""Wrapper to call console with a loop."""
devicelist = (
{
"address": "3c4fc5",
"cat": 0x05,
"subcat": 0x0b,
"firmware": 0x00
},
{
"address": "43af9b",
"cat": 0x02,
"subcat": 0x1a,
"firmware": 0x00
}
)
log = logging.getLogger(__name__)
loop = asyncio.get_event_loop()
asyncio.async(console(loop, log, devicelist))
loop.run_forever()
def __init__(self):
"""Create the Protocol object."""
self.log = logging.getLogger(__name__)
self._codelist = []
self.add(0x50, name='INSTEON Standard Message Received', size=11)
self.add(0x51, name='INSTEON Extended Message Received', size=25)
self.add(0x52, name='X10 Message Received', size=4)
self.add(0x53, name='ALL-Linking Completed', size=10)
self.add(0x54, name='Button Event Report', size=3)
self.add(0x55, name='User Reset Detected', size=2)
self.add(0x56, name='ALL-Link CLeanup Failure Report', size=2)
self.add(0x57, name='ALL-Link Record Response', size=10)
self.add(0x58, name='ALL-Link Cleanup Status Report', size=3)
self.add(0x60, name='Get IM Info', size=2, rsize=9)
self.add(0x61, name='Send ALL-Link Command', size=5, rsize=6)
self.add(0x62, name='INSTEON Fragmented Message', size=8, rsize=9)
self.add(0x64, name='Start ALL-Linking', size=4, rsize=5)
self.add(0x65, name='Cancel ALL-Linking', size=4)
self.add(0x67, name='Reset the IM', size=2, rsize=3)
self.add(0x69, name='Get First ALL-Link Record', size=2)
self.add(0x6a, name='Get Next ALL-Link Record', size=2)
self.add(0x73, name='Get IM Configuration', size=2, rsize=6)
def __init__(self, strat):
self.log = logging.getLogger('strategy.StrategyIterator({})'.format(str(strat)))
self.strategy = strat
sig = inspect.signature(strat.generate)
params = sig.parameters
kws = {}
self.log.debug('init')
for kw in params:
if kw in strat._kws:
self.log.debug('add keyword {kw}'.format(kw=kw))
kws[kw] = strat._kws[kw]
elif params[kw].kind == inspect.Parameter.VAR_KEYWORD:
self.log.debug('merge keywords on VAR_KEYWORD {kw}'.format(kw=kw))
kws.update(strat._kws)
break
self._generator = strat.generate(strat._depth, *strat._args, **kws)
def add_stderr_logger(level=logging.DEBUG):
"""
Helper for quickly adding a StreamHandler to the logger. Useful for
debugging.
Returns the handler after adding it.
"""
# This method needs to be in this __init__.py to get the __name__ correct
# even if urllib3 is vendored within another package.
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
logger.addHandler(handler)
logger.setLevel(level)
logger.debug('Added a stderr logging handler to logger: %s', __name__)
return handler
# ... Clean up.
def main():
"""Run newer stuffs."""
logging.basicConfig(format=LOGGING_FORMAT)
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
add_debug(parser)
add_app(parser)
add_env(parser)
add_region(parser)
add_properties(parser)
parser.add_argument("--elb-subnet", help="Subnetnet type, e.g. external, internal", required=True)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
log.debug('Parsed arguments: %s', args)
spinnakerapps = SpinnakerDns(
app=args.app, env=args.env, region=args.region, prop_path=args.properties, elb_subnet=args.elb_subnet)
spinnakerapps.create_elb_dns()
def main():
"""Destroy any DNS related resources of an application
Records in any Hosted Zone for an Environment will be deleted.
"""
logging.basicConfig(format=LOGGING_FORMAT)
parser = argparse.ArgumentParser(description=main.__doc__)
add_debug(parser)
add_app(parser)
add_env(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
assert destroy_dns(**vars(args))
def main():
"""Command to create IAM Instance Profiles, Roles, Users, and Groups.
IAM Roles will retain any attached Managed Policies. Inline Policies that do
not match the name *iam-project_repo_policy* will also be left untouched.
**WARNING**: Inline Policies named *iam-project_repo_policy* will be
rewritten.
"""
logging.basicConfig(format=LOGGING_FORMAT)
parser = argparse.ArgumentParser(description=main.__doc__)
add_debug(parser)
add_app(parser)
add_env(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
assert create_iam_resources(**args.__dict__)
def main():
"""CLI entrypoint for scaling policy creation"""
logging.basicConfig(format=LOGGING_FORMAT)
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
add_debug(parser)
add_app(parser)
add_properties(parser)
add_env(parser)
add_region(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
log.debug('Parsed arguments: %s', args)
asgpolicy = AutoScalingPolicy(app=args.app, prop_path=args.properties, env=args.env, region=args.region)
asgpolicy.create_policy()
def main():
"""Entry point for ELB creation"""
logging.basicConfig(format=LOGGING_FORMAT)
parser = argparse.ArgumentParser(description='Example with non-optional arguments')
add_debug(parser)
add_app(parser)
add_env(parser)
add_region(parser)
add_properties(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
elb = SpinnakerELB(app=args.app, env=args.env, region=args.region, prop_path=args.properties)
elb.create_elb()
def main():
"""Send Slack notification to a configured channel."""
logging.basicConfig(format=LOGGING_FORMAT)
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
add_debug(parser)
add_app(parser)
add_env(parser)
add_properties(parser)
args = parser.parse_args()
logging.getLogger(__package__.split(".")[0]).setLevel(args.debug)
log.debug('Parsed arguements: %s', args)
if "prod" not in args.env:
log.info('No slack message sent, not a production environment')
else:
log.info("Sending slack message, production environment")
slacknotify = SlackNotification(app=args.app, env=args.env, prop_path=args.properties)
slacknotify.post_message()
def __init__(self, app='', trigger_job='', prop_path='', base='', runway_dir=''):
self.log = logging.getLogger(__name__)
self.header = {'content-type': 'application/json'}
self.here = os.path.dirname(os.path.realpath(__file__))
self.runway_dir = os.path.expandvars(os.path.expanduser(runway_dir or ''))
self.base = base
self.trigger_job = trigger_job
self.generated = get_details(app=app)
self.app_name = self.generated.app_name()
self.group_name = self.generated.project
self.settings = get_properties(prop_path)
self.environments = self.settings['pipeline']['env']
def main():
"""Create Lambda events."""
logging.basicConfig(format=LOGGING_FORMAT)
log = logging.getLogger(__name__)
parser = argparse.ArgumentParser(description=main.__doc__)
add_debug(parser)
add_app(parser)
add_env(parser)
add_properties(parser)
add_region(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
log.debug('Parsed arguments: %s', args)
lambda_function = LambdaFunction(app=args.app, env=args.env, region=args.region, prop_path=args.properties)
lambda_function.create_lambda_function()
lambda_event = LambdaEvent(app=args.app, env=args.env, region=args.region, prop_path=args.properties)
lambda_event.create_lambda_events()
def main():
"""Create any API Gateway event related resources."""
logging.basicConfig(format=LOGGING_FORMAT)
parser = argparse.ArgumentParser(description=main.__doc__)
add_debug(parser)
add_app(parser)
add_env(parser)
add_region(parser)
add_properties(parser)
args = parser.parse_args()
logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
apigateway = APIGateway(**vars(args))
apigateway.setup_lambda_api()
def __init__(self, app='', env='', region='', rules={}, prop_path=''):
self.log = logging.getLogger(__name__)
self.generated = get_details(app=app, env=env)
self.trigger_settings = rules
self.app_name = self.generated.app_name()
self.env = env
self.account_id = get_env_credential(env=self.env)['accountId']
self.region = region
self.properties = get_properties(properties_file=prop_path, env=self.env)
session = boto3.Session(profile_name=env, region_name=region)
self.client = session.client('apigateway')
self.lambda_client = session.client('lambda')
self.api_version = self.lambda_client.meta.service_model.api_version
self.api_id = self.find_api_id()
self.resource_id, self.parent_id = self.find_resource_ids()
def __init__(self):
self.db = cms.config['db']
self.logger = logging.getLogger('redberry')
def __init__(self, methodName):
super(RedTestCase, self).__init__(methodName)
self.test_client = app.test_client()
self.db = db
self.logger = logging.getLogger('redberry.tests')
self.url_prefix = url_prefix
def init_logger():
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')
logger = logging.getLogger('redberry')
logger.setLevel(logging.DEBUG)
console = logging.StreamHandler()
console.setFormatter(formatter)
logger.addHandler(console)
def __init__(self, *args):
self.log = logging.getLogger(resource_filename(__name__, __file__))
self.cluster = Cluster(args[0])
self.connection = self.cluster.connect()
self.connection.row_factory = tuple_factory
self.connection.execute("CREATE KEYSPACE IF NOT EXISTS public \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")
self.connection.execute("CREATE KEYSPACE IF NOT EXISTS internal \
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };")
self.connection.execute("CREATE TABLE IF NOT EXISTS public.users ( \
name text PRIMARY KEY, \
n text, \
e text, \
secret text);")
self.connection.execute("CREATE TABLE IF NOT EXISTS public.contracts ( \
id uuid PRIMARY KEY, \
owner text, \
package text, \
template blob, \
example blob);")