python类basicConfig()的实例源码

data_pipeline.py 文件源码 项目:search-MjoLniR 作者: wikimedia 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def main(argv=None):
    args = parse_arguments(argv)
    if args['very_verbose']:
        logging.basicConfig(level=logging.DEBUG)
    elif args['verbose']:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig()
    del args['verbose']
    del args['very_verbose']
    sc = SparkContext(appName="MLR: data collection pipeline")
    # spark info logging is incredibly spammy. Use warn to have some hope of
    # human decipherable output
    sc.setLogLevel('WARN')
    sqlContext = HiveContext(sc)
    run_pipeline(sc, sqlContext, **args)
odl.py 文件源码 项目:functest 作者: opnfv 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main():
    """Entry point"""
    logging.basicConfig()
    odl = ODLTests()
    parser = ODLParser()
    args = parser.parse_args(sys.argv[1:])
    try:
        result = odl.run_suites(ODLTests.default_suites, **args)
        if result != robotframework.RobotFramework.EX_OK:
            return result
        if args['pushtodb']:
            return odl.push_to_db()
        else:
            return result
    except Exception:  # pylint: disable=broad-except
        return robotframework.RobotFramework.EX_RUN_ERROR
cli.py 文件源码 项目:waybackscraper 作者: abrenaut 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def main():
    args = parse_args()

    logging.basicConfig(level=(logging.WARN if args.quiet else logging.INFO))

    # Don't allow more than 10 concurrent requests to the wayback machine
    concurrency = min(args.concurrency, 10)

    # Scrape results are stored in a temporary folder if no folder specified
    target_folder = args.target_folder if args.target_folder else tempfile.gettempdir()
    logger.info('Writing scrape results in the folder {target_folder}'.format(target_folder=target_folder))

    # Parse the period entered by the user (throws an exception if the dates are not correctly formatted)
    from_date = datetime.strptime(args.from_date, CLI_DATE_FORMAT)
    to_date = datetime.strptime(args.to_date, CLI_DATE_FORMAT)

    # The scraper downloads the elements matching the given xpath expression in the target folder
    scraper = Scraper(target_folder, args.xpath)

    # Launch the scraping using the scraper previously instantiated
    scrape_archives(args.website_url, scraper.scrape, from_date, to_date, args.user_agent, timedelta(days=args.delta),
                    concurrency)
__init__.py 文件源码 项目:seqlog 作者: tintoy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def log_to_console(level=logging.WARNING, override_root_logger=False, **kwargs):
    """
    Configure the logging system to send log entries to the console.

    Note that the root logger will not log to Seq by default.

    :param level: The minimum level at which to log.
    :param override_root_logger: Override the root logger, too?
                                 Note - this might cause problems if third-party components try to be clever
                                 when using the logging.XXX functions.
    """

    logging.setLoggerClass(StructuredLogger)

    if override_root_logger:
        _override_root_logger()

    logging.basicConfig(
        style='{',
        handlers=[
            ConsoleStructuredLogHandler()
        ],
        level=level,
        **kwargs
    )
testing.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run(self, result=None):
        logger = logging.getLogger()
        if not logger.handlers:
            logging.basicConfig()
        handler = logger.handlers[0]
        if (len(logger.handlers) > 1 or
                not isinstance(handler, logging.StreamHandler)):
            # Logging has been configured in a way we don't recognize,
            # so just leave it alone.
            super(LogTrapTestCase, self).run(result)
            return
        old_stream = handler.stream
        try:
            handler.stream = StringIO()
            gen_log.info("RUNNING TEST: " + str(self))
            old_error_count = len(result.failures) + len(result.errors)
            super(LogTrapTestCase, self).run(result)
            new_error_count = len(result.failures) + len(result.errors)
            if new_error_count != old_error_count:
                old_stream.write(handler.stream.getvalue())
        finally:
            handler.stream = old_stream
testing.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(self, result=None):
        logger = logging.getLogger()
        if not logger.handlers:
            logging.basicConfig()
        handler = logger.handlers[0]
        if (len(logger.handlers) > 1 or
                not isinstance(handler, logging.StreamHandler)):
            # Logging has been configured in a way we don't recognize,
            # so just leave it alone.
            super(LogTrapTestCase, self).run(result)
            return
        old_stream = handler.stream
        try:
            handler.stream = StringIO()
            gen_log.info("RUNNING TEST: " + str(self))
            old_error_count = len(result.failures) + len(result.errors)
            super(LogTrapTestCase, self).run(result)
            new_error_count = len(result.failures) + len(result.errors)
            if new_error_count != old_error_count:
                old_stream.write(handler.stream.getvalue())
        finally:
            handler.stream = old_stream
testing.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run(self, result=None):
        logger = logging.getLogger()
        if not logger.handlers:
            logging.basicConfig()
        handler = logger.handlers[0]
        if (len(logger.handlers) > 1 or
                not isinstance(handler, logging.StreamHandler)):
            # Logging has been configured in a way we don't recognize,
            # so just leave it alone.
            super(LogTrapTestCase, self).run(result)
            return
        old_stream = handler.stream
        try:
            handler.stream = StringIO()
            gen_log.info("RUNNING TEST: " + str(self))
            old_error_count = len(result.failures) + len(result.errors)
            super(LogTrapTestCase, self).run(result)
            new_error_count = len(result.failures) + len(result.errors)
            if new_error_count != old_error_count:
                old_stream.write(handler.stream.getvalue())
        finally:
            handler.stream = old_stream
twitter_news_bot.py 文件源码 项目:twitter-news-bot 作者: aaronshaver 项目源码 文件源码 阅读 59 收藏 0 点赞 0 评论 0
def __init__(self):
        date_time_name = datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S")
        logging.basicConfig(filename=date_time_name + '.log', level=logging.INFO)

        path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
        self.config = configparser.ConfigParser()
        self.config.read(os.path.join(path, "configuration.txt"))
        self.sleep_time = int(self.config.get("settings", "time_between_retweets"))
        self.search_term = self.config.get("settings", "search_query")
        self.tweet_language = self.config.get("settings", "tweet_language")
        self.max_age_in_minutes = int(self.config.get("settings", "max_age_in_minutes"))

        self.last_id_file = self.build_save_point()
        self.savepoint = self.retrieve_save_point(self.last_id_file)

        auth = tweepy.OAuthHandler(self.config.get("twitter", "consumer_key"), self.config.
                                   get("twitter", "consumer_secret"))
        auth.set_access_token(self.config.get("twitter", "access_token"), self.config.
                              get("twitter", "access_token_secret"))
        self.api = tweepy.API(auth)
run.py 文件源码 项目:Round1 作者: general-ai-challenge 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setup_logging(
    default_path='logging.ini',
    default_level=logging.INFO,
    env_key='LOG_CFG'
):
    """Setup logging configuration

    """
    path = default_path
    value = os.getenv(env_key, None)
    if value:
        path = value
    if os.path.exists(path):
        logging.config.fileConfig(default_path)
    else:
        logging.basicConfig(level=default_level)
log.py 文件源码 项目:integration 作者: mendersoftware 项目源码 文件源码 阅读 54 收藏 0 点赞 0 评论 0
def setup_custom_logger(name, testname):
    log_format = "%(asctime)s [%(levelname)s]: >> %(message)s"

    logging.basicConfig(format=log_format, level=logging.INFO)
    logger = logging.getLogger(name)

    for h in list(logger.handlers):
        logger.removeHandler(h)

    consoleHandler = logging.StreamHandler()
    logFormatter = logging.Formatter(log_format)
    logFormatter._fmt = testname + " -- " + logFormatter._fmt
    consoleHandler.setFormatter(logFormatter)
    logger.addHandler(consoleHandler)
    logging.getLogger(name).addHandler(consoleHandler)
    logger.propagate = False
    return logger
utils.py 文件源码 项目:LoLVRSpectate 作者: Fire-Proof 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setup_logging(debug=False, os_info=True):
    if os.environ.get("LOLVRSPECTATE_DEBUG") == "1":
        debug = True

    if not debug:
        format_ = '%(asctime)-15s || %(message)s'
        logging.basicConfig(filename="LoLVRSpectate.log", format=format_, level=logging.INFO, filemode="w")
        logging.getLogger().addHandler(logging.StreamHandler())  # Log both to file and console
    else:
        logging.basicConfig(level=logging.DEBUG)

    if os_info:
        logging.info("Win platform = {}".format(platform.platform()))
        if 'PROGRAMFILES(X86)' in os.environ:
            logging.info("System Arch = {}".format("64 bit"))
        else:
            logging.info("System Arch = {}".format("32 bit"))
        logging.info("Python version = {}".format(sys.version))
        logging.info("VorpX exclusion = {}".format(is_excluded()))
sum_and_product.py 文件源码 项目:robograph 作者: csparpa 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def logged_sum_and_product(list_of_numbers):
    v = value.Value(value=list_of_numbers)
    s = apply.Apply(function=sum)
    m = apply.Apply(function=lambda c: reduce(lambda x, y: x * y, c))
    b = buffers.Buffer()

    logging.basicConfig(level=logging.ERROR)
    p = printer.LogPrinter(logger=logging.getLogger(__name__),
                           loglevel=logging.ERROR)

    g = graph.Graph('logged_sum_and_product', [v, s, m, b, p])

    g.connect(p, b, 'message')
    g.connect(b, s, 'sum value')
    g.connect(b, m, 'product value')
    g.connect(s, v, 'argument')
    g.connect(m, v, 'argument')

    return g
sum_and_product.py 文件源码 项目:robograph 作者: csparpa 项目源码 文件源码 阅读 94 收藏 0 点赞 0 评论 0
def logged_sum_and_product(list_of_numbers):
    v = value.Value(value=list_of_numbers)
    s = apply.Apply(function=sum)
    m = apply.Apply(function=lambda c: reduce(lambda x, y: x * y, c))
    b = buffers.Buffer()

    logging.basicConfig(level=logging.ERROR)
    p = printer.LogPrinter(logger=logging.getLogger(__name__),
                           loglevel=logging.ERROR)

    g = graph.Graph('logged_sum_and_product', [v, s, m, b, p])

    g.connect(p, b, 'message')
    g.connect(b, s, 'sum value')
    g.connect(b, m, 'product value')
    g.connect(s, v, 'argument')
    g.connect(m, v, 'argument')

    return g
test_intent.py 文件源码 项目:bothub-sdk-python 作者: bothub-studio 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_dispatch_should_trigger_intent_and_default():
    logging.basicConfig(level=logging.DEBUG)
    bot = MockBot()
    intent_slots = fixture_intent_slots()
    state = IntentState(bot, intent_slots)
    dispatcher = DefaultDispatcher(bot, state)
    dispatcher.dispatch({'content': '/intent credentials', 'channel': 'fakechannel'}, None)
    dispatcher.dispatch({'content': 'my token', 'channel': 'fakechannel'}, None)
    dispatcher.dispatch({'content': 'my secret token', 'channel': 'fakechannel'}, None)
    dispatcher.dispatch({'content': 'hello', 'channel': 'fakechannel'}, None)
    assert len(bot.executed) == 2
    executed = bot.executed.pop(0)
    assert executed == Executed('set_credentials', ('my token', 'my secret token'))
    executed = bot.executed.pop(0)
    assert executed == Executed('on_default', ({'content': 'hello', 'channel': 'fakechannel'},
                                               None))
main.py 文件源码 项目:Wall-EEG 作者: neurotechuoft 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def set_logging(args):
    '''
    Sets up logging capability
    :param args: argparse.Namespace
    :return: None
    '''
    if args.log:
        print("Logging Enabled: " + str(args.log))
        logging.basicConfig(filename="OBCI.log",
                            format='%(asctime)s - %(levelname)s : %(message)s',
                            level=logging.DEBUG)
        logging.getLogger('yapsy').setLevel(logging.DEBUG)
        logging.info('---------LOG START-------------')
        logging.info(args)
    else:
        print("main.py: Logging Disabled.")
train.py 文件源码 项目:TextRankPlus 作者: zuoxiaolei 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def run():
    '''
    ????
    '''
    reload(sys)
    sys.setdefaultencoding('utf8')
    program = os.path.basename(sys.argv[0])
    logger = logging.getLogger(program)
    logging.basicConfig(format='%(asctime)s: %(levelname)s: %(message)s')
    logging.root.setLevel(level=logging.INFO)
    logger.info("running %s" % ' '.join(sys.argv))

    outp1 = r'wiki_model'
    outp2 = r'vector.txt'
    model = Word2Vec(sentences, size=400, window=5, min_count=5, workers=multiprocessing.cpu_count())
    model.save(outp1)
    model.wv.save_word2vec_format(outp2, binary=False)

    testData = ['??','??','??','??']
    for i in testData:
        temp = model.most_similar(i)
        for j in temp:
            print '%f %s'%(j[1],j[0])
        print ''
simulate_reporter.py 文件源码 项目:chainerboard 作者: koreyou 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(infile, outfile, time_interval, quiet):
    logging.basicConfig(level=logging.WARN if quiet else logging.INFO)
    logger = logging.getLogger(__name__)
    logger.info('loading input file %s ...' % infile)
    with open(infile) as fin:
        # Do not use click.File because we want close the file asap
        data = json.load(fin)
    n = len(data)
    logger.info(
        'loading input file %s done. %d data found.'% (infile, n))
    for i in xrange(len(data)):
        logger.info('Sleeping for %d sec [%d/%d] ...' % (time_interval, i+1, n))
        time.sleep(time_interval)
        with open(outfile, 'w') as fout:
            json.dump(data[:(i+1)], fout)
        logger.info('Dumped %dth/%d data to %s' % (i+1, n, outfile))
dar_backup.py 文件源码 项目:atoolbox 作者: liweitianux 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():
    parser = argparse.ArgumentParser(
        description="Backup system/data using dar and par2")
    parser.add_argument("-c", "--config", dest="config", required=True,
                        help="configuration file for dar and archive. " +
                        "NOTE: the backup archive will be placed under " +
                        "the same directory as this configuration file")
    parser.add_argument("-n", "--dry-run", dest="dry_run", action="store_true",
                        help="dry run, do not perform any action")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true",
                        help="show verbose information")
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.INFO)

    settings = DarSettings(args.config, verbose=args.verbose,
                           dry_run=args.dry_run)
    dar = DarBackup(settings)
    dar.run(dry_run=args.dry_run)
backup.py 文件源码 项目:atoolbox 作者: liweitianux 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def main():
    parser = argparse.ArgumentParser(
        description="Backup files preserving metadata")
    parser.add_argument("-n", "--dry-run", dest="dryrun", action="store_true",
                        help="dry run, do not perform actual action")
    parser.add_argument("-q", "--quiet", dest="quiet", action="store_true",
                        help="be quiet")
    parser.add_argument("-d", "--debug", dest="debug", action="store_true",
                        help="show verbose debug information")
    parser.add_argument("config", help="configuration file")
    args = parser.parse_args()

    if args.quiet and not args.dryrun:
        logging.basicConfig(level=logging.WARNING)
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)

    now = datetime.now()
    logger.info("=== %s @ %s ===" % (" ".join(sys.argv), now.isoformat()))
    if args.dryrun:
        logger.info("*** DRY RUN ***")
    backup = Backup(args.config, dryrun=args.dryrun, debug=args.debug)
    backup.backup()
    backup.cleanup()
    logger.info("=== Backup Finished! @ %s ===" % datetime.now().isoformat())
cli.py 文件源码 项目:mau-mau 作者: obestwalter 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    logging.basicConfig(format=LOG.FMT, level=logging.INFO)
    try:
        fire.Fire(Cli)
    except KeyboardInterrupt:
        log.error("\nfatal: lost game by chickening out!")
        sys.exit(1)


问题


面经


文章

微信
公众号

扫码关注公众号