def __init__(self, filename, user_input, verbose, address, username, password, configuration, upload):
self.user_input = user_input
self.verbose = verbose
self.bam_client = None
self.logger = logging.getLogger(__name__)
self.id_list = {}
self.address = address
self.username = username
self.password = password
self.configuration = configuration
self.filename = filename
self.upload = upload
if self.verbose:
coloredlogs.install(logger=self.logger, level='DEBUG', format="%(levelname)s:%(msg)s")
else:
coloredlogs.install(logger=self.logger, level='WARNING', format="%(levelname)s:%(msg)s")
# <summary>
# Main entry point for the program
# </summary>
# <param name="export" type="boolean">
# Determines application execution path (e.g. whether to import or export data to/from the BAM service)
python类install()的实例源码
def __init__(self):
parser = argparse.ArgumentParser(
description="Deploys to kubernetes with azk.io",
usage='''deploy <command> [<args>]
Commands:
deploy Builds and pushes new images, and syncs Kubernetes.
push Builds and pushes new images, but doesn't update Kubernetes.
sync Creates/updates kubernetes resources, but doesn't build/push anything.
loadconfig Just check the kubernetes config and connectivity.
loadsource Just check that the project source is mounted properly, and generate proposed Kubernetes resources.
'''
)
parser.add_argument('command', help='Subcommand to run')
self.target_setup = None
self.api = None
args = parser.parse_args(sys.argv[1:])
coloredlogs.install(level=('DEBUG'), fmt='%(message)s', isatty=True)
if not hasattr(self, args.command):
parser.print_help()
exit(1)
getattr(self, args.command)()
def main():
parser = ArgParser(default_config_files=['/etc/factoriomcd.ini', '~/.factoriomcd.ini'])
parser.add('-d', '--debug', action='store_true')
parser.add('-v', '--verbose', action='store_true')
parser.add('--log-file', default="/opt/factorio/server.out")
parser.add('--server-id', default="1")
parser.add('--rcon-host', default="localhost")
parser.add('--rcon-password', default="asdasd")
parser.add('--rcon-port', default=31337)
parser.add('--ws-url', default="ws://127.0.0.1:8000/ws_v1/server_callback/1/")
parser.add('--ws-password', default="asdasd")
options = parser.parse_args()
if options.verbose:
coloredlogs.install(level='DEBUG')
logger.debug("FactorioMCd initializing...")
else:
coloredlogs.install(level='INFO')
FactorioMCd(options).run()
def configure_logging(config):
logging.config.dictConfig(config)
# By default the install() function installs a handler on the root logger,
# this means that log messages from your code and log messages from the
# libraries that you use will all show up on the terminal.
coloredlogs.install(level=logging.DEBUG,
fmt='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%I:%S')
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('urllib3.connectionpool').setLevel(logging.WARNING)
if sys.stdout.isatty():
colorama.init(autoreset=True)
else:
# We're being piped, so skip colors
colorama.init(strip=True)
def setup_logger(options):
level = 'INFO'
if options.verbose:
level = 'DEBUG'
coloredlogs.install(
level=level,
level_styles={
'warn': {
'color': 'yellow'
},
'error': {
'color': 'red',
'bold': True,
},
},
fmt='%(message)s',
isatty=True
)
log.debug("Logger set to DEBUG")
def log_warn_only():
"""Drop to warning level and down to get around gen.generate() log.info
output"""
coloredlogs.install(
level='WARNING',
level_styles={
'warn': {
'color': 'yellow'
},
'error': {
'color': 'red',
'bold': True,
},
},
fmt='%(message)s',
isatty=True
)
def __init__(self, logging_queue=None, debug=False):
# self.regular_formatter = logging.Formatter(logging.BASIC_FORMAT)
self.json_formatter = logging.Formatter('\
{"time": "%(asctime)s", "level": "%(levelname)s", "data": "%(message)s"}')
# initalize logging if isn't initialized yet
if not hasattr(self, 'log'):
self.log = logging.getLogger(self.__class__.__name__)
# initialize regular logger
# handler = logging.StreamHandler()
# handler.setFormatter(self.regular_formatter)
# self.log.addHandler(handler)
level = 'DEBUG' if debug else 'INFO'
coloredlogs.install(level=level)
# initialize webui communication
if logging_queue is not None:
interface_handler = InterfaceHandler(logging_queue)
interface_handler.setFormatter(self.json_formatter)
self.log.addHandler(interface_handler)
# self.log.setLevel(logging.DEBUG)
def __init__(self, ws_root, config=None, ws_name=None, log_level=None):
self._ws_root = ws_root
self._ws_config = dict()
if config:
self._ws_config = config
else:
self.load_default_config()
if ws_name:
self.config['name'] = ws_name
if log_level:
self.config['log_level'] = log_level
coloredlogs.install(level=self.config['log_level'])
def __init__(self, workspace, preload=False):
# Assign parameters
coloredlogs.install(level=workspace.log_level)
self._workspace = workspace
self._schemas_local_master = workspace.schemas_local_master
self._schemas_remote_master = workspace.schemas_remote_master
self._schemas = {}
# Configure location for schemas
self.config_schema_locations()
# Keep a library of loaded schemas to avoid re-loading
self._schemas_library = dict()
self._error_msg = ''
# if preload, load local cached schema files
if preload:
self.preload_local_schemas()
def get_logger(args, name = _default_logger_name):
try:
import coloredlogs
coloredlogs.install(
isatty = True,
show_name = False,
show_severity = False,
level = logging.NOTSET,
severity_to_style = {'DEBUG': {'color': 'blue'}},
)
except:
logging.basicConfig(
stream = sys.stdout,
format = '%(asctime)s ' + ghn() + ' %(levelname)-8s %(message)s',
datefmt = "%Y-%m-%d %H:%M:%S",
level = logging.NOTSET,
)
_set_logging_level(args.quiet, args.verbose)
return logging.getLogger(name)
def log(level):
"""Log verbose setter message"""
try:
import logging
import verboselogs
except ImportError:
sys.exit("""You need logging , verboselogs!
install it from http://pypi.python.org/pypi
or run pip install logging verboselogs""")
# set logger level from parent class
logger = verboselogs.VerboseLogger('')
# add the handlers to the logger
logger.setLevel(getattr(logging, level))
return logger
def configure_logger(
logger=None,
log_level='DEBUG',
no_log=False,
file_log=False,
console_log=True,
log_file=None):
if not logger:
logger = get_logger()
if no_log:
logger.setLevel(logging.ERROR)
logger.addHandler(logging.NullHandler())
else:
logger.setLevel(log_level.upper())
fmt = (
"%(asctime)s - %(message)s"
)
fmtr = formatter()
if console_log:
if USE_COLORED_LOGS:
coloredlogs.install(level=os.environ.get('COLOREDLOGS_LOG_LEVEL', log_level.upper()),
fmt=fmt,
field_styles=FIELD_STYLES,
level_styles=LEVEL_STYLES,
overridefmt=LEVEL_FORMATS)
else:
sh = logging.StreamHandler()
sh.setFormatter(fmtr)
sh.setLevel(log_level.upper())
logger.addHandler(sh)
if file_log:
if log_file is not None:
func_log = os.path.abspath(log_file)
os.mkdir(os.path.dirname(func_log))
fh = logging.FileHandler(func_log)
fh.setFormatter(fmtr)
fh.setLevel(log_level)
logger.addHandler(fh)
def main():
if not options.syslog:
coloredlogs.install(level=logging.DEBUG if options.debug else logging.INFO,
fmt='[%(levelname).1s %(asctime)s %(module)s:%(lineno)d] %(message)s',
datefmt='%y%m%d %H:%M:%S')
else:
syslog.enable_system_logging(level=logging.DEBUG if options.debug else logging.INFO,
fmt='vj4[%(process)d] %(programname)s %(levelname).1s %(message)s')
logging.getLogger('sockjs').setLevel(logging.WARNING)
url = urllib.parse.urlparse(options.listen)
if url.scheme == 'http':
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
host, port_str = url.netloc.rsplit(':', 1)
sock.bind((host, int(port_str)))
elif url.scheme == 'unix':
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
os.remove(url.path)
except FileNotFoundError:
pass
sock.bind(url.path)
else:
_logger.error('Invalid listening scheme %s', url.scheme)
return 1
for i in range(1, options.prefork):
pid = os.fork()
if not pid:
break
else:
atexit.register(lambda: os.kill(pid, signal.SIGTERM))
web.run_app(app.Application(), sock=sock, access_log=None, shutdown_timeout=0)
def set_log_level(level):
coloredlogs.install(level=level)
logging.basicConfig(level=level)
# Main menu
def set_log_level(level):
coloredlogs.install(level=level)
logging.basicConfig(level=level)
def set_log_level(level):
coloredlogs.install(level=level)
logging.basicConfig(level=level)
# Main menu
def set_colored_log_level():
global logger
coloredlogs.install(fmt=conf.LOG_FORMAT_DEBUG, datefmt="%m%d %H:%M:%S", level=verboselogs.SPAM)
logging.basicConfig(format=conf.LOG_FORMAT_DEBUG, level=verboselogs.SPAM)
logger = verboselogs.VerboseLogger("dev")
# for logger color reset during test
def main():
global logger
args = get_args()
if args.debug:
coloredlogs.install(level="DEBUG")
logger.debug("Command arguments: %s" % args)
# Adjust file limits
from_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
(soft_limit, hard_limit) = from_limit
soft_limit = min(10000, hard_limit)
to_limit = (soft_limit, hard_limit)
logger.debug("Raising open file limit from %s to %s" % (repr(from_limit), repr(to_limit)))
resource.setrlimit(resource.RLIMIT_NOFILE, to_limit)
try:
result = modes.run(args)
except KeyboardInterrupt:
logger.critical("User abort")
return 5
if result != 0:
logger.error("Command failed")
return result
def main():
args = parse_args()
loglevel = logging.INFO if args.verbose is None else logging.DEBUG
logging.basicConfig(level=loglevel)
coloredlogs.install()
import_new_features(args.node_eui, args.exclude)
def set_logging(lvl='info'):
global logger
try:
if not (isinstance(lvl, int) and lvl in LOG_LEVELS.values()):
lvl = LOG_LEVELS[lvl]
except KeyError:
return False
logging.basicConfig(format=LOG_FORMAT, level=lvl)
if coloredlogs is not None:
coloredlogs.install(lvl, fmt=LOG_FORMAT)
if logger is not None:
logger.setLevel(lvl)
return True
def __init__(self, media):
import cfscrape
self.session = cfscrape.create_scraper()
self.media = media
self.links = []
self.sources = SourceList()
self.logger = logging.getLogger(
__name__ + '.' + self.__class__.__name__
)
formatter = logging.Formatter(
'%(levelname)s - \033[92m{}\033[0m: %(message)s'.format(
self.__class__.__name__
)
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
# Prevent the messages from being propagated to the root logger
self.logger.propagate = 0
self.logger.addHandler(handler)
coloredlogs.install(level='DEBUG')
def client(ip, port, message):
coloredlogs.install()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, port))
try:
print('sending %s to %s' % (message, (ip, port)))
sock.sendall(message)
response = sock.recv(1024)
print "Received: {}".format(response)
finally:
sock.close()
def logged(class_, use_colore=True):
"""logged decorator.
:param class_: add 'logger' attribute to class
"""
coloredlogs.install(level=logging.INFO) if use_colore else None
logging.getLogger("requests").setLevel(logging.WARNING)
# logging.basicConfig(level=logging.INFO, format='%(lineno)d %(message)s')
class_.logger = logging.getLogger(class_.__name__)
return class_
def __init__(self, workspace, prj_root, config=None):
coloredlogs.install(level=workspace.log_level)
self._prj_root = prj_root
self._workspace = workspace
if config:
self._prj_config = config
else:
self.load_default_config()
def log_level(self, value):
self.config['log_level'] = value
coloredlogs.install(level=value)
def __init__(self, workspace=None):
"""
Initialize the Validator.
A workspace may be provided for an easy parameter configuration,
such as location and extension of descriptors, verbosity level, etc.
:param workspace: SONATA workspace object
"""
self._workspace = workspace
self._syntax = True
self._integrity = True
self._topology = True
# create "virtual" workspace if not provided (don't actually create
# file structure)
if not self._workspace:
self._workspace = Workspace('.', log_level='info')
# load configurations from workspace
self._dext = self._workspace.default_descriptor_extension
self._dpath = '.'
self._log_level = self._workspace.log_level
# for package signature validation
self._pkg_signature = None
self._pkg_pubkey = None
# configure logs
coloredlogs.install(level=self._log_level)
# descriptors storage
self._storage = DescriptorStorage()
# syntax validation
self._schema_validator = SchemaValidator(self._workspace, preload=True)
# reset event logger
evtlog.reset()
self.source_id = None
self._fwgraphs = dict()
def configure(self, syntax=None, integrity=None, topology=None,
dpath=None, dext=None, debug=None, pkg_signature=None,
pkg_pubkey=None):
"""
Configure parameters for validation. It is recommended to call this
function before performing a validation.
:param syntax: specifies whether to validate syntax
:param integrity: specifies whether to validate integrity
:param topology: specifies whether to validate network topology
:param dpath: directory to search for function descriptors (VNFDs)
:param dext: extension of descriptor files (default: 'yml')
:param debug: increase verbosity level of logger
:param pkg_signature: String package signature to be validated
:param pkg_pubkey: String package public key to verify signature
"""
# assign parameters
if syntax is not None:
self._syntax = syntax
if integrity is not None:
self._integrity = integrity
if topology is not None:
self._topology = topology
if dext is not None:
self._dext = dext
if dpath is not None:
self._dpath = dpath
if debug is True:
self._workspace.log_level = 'debug'
coloredlogs.install(level='debug')
if debug is False:
self._workspace.log_level = 'info'
coloredlogs.install(level='info')
if pkg_signature is not None:
self._pkg_signature = pkg_signature
if pkg_pubkey is not None:
self._pkg_pubkey = pkg_pubkey
def __init__(self, args):
self.start_time = time.time()
self.service_experiments = list()
self.function_experiments = list()
self.generated_services = list()
# arguments
self.args = args
self.args.ped = os.path.join(os.getcwd(), self.args.ped)
self.work_dir = self.args.work_dir
self.output_dir = self.args.output_dir
# logging setup
coloredlogs.install(level="DEBUG" if args.verbose else "INFO")
LOG.info("SONATA profiling tool initialized")
LOG.debug("Arguments: %r" % self.args)
def getModuleLogger(moduleName):
logger = logging.getLogger(moduleName)
logger.setLevel(logging.INFO)
coloredlogs.install(level='INFO')
return logger
def run_pipeline():
pipeline = Pipeline(
'example1',
[
GenerateFile,
HashFile,
RemoveFile,
]
)
coloredlogs.install(logging.DEBUG)
for x in range(300):
gevent.spawn(pipeline.backend.enqueue_job, Job.new(dict(job_type='generate-file')))
pipeline.loop()