python类Logger()的实例源码

yar.py 文件源码 项目:fibratus 作者: rabbitstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_invalid_path(self, outputs):
        """Building a YaraBinding on a missing rules directory must fail.

        ``os.path.exists`` and ``os.path.isdir`` are patched to report that
        the path is absent; the constructor is then expected to raise
        ``BindingError`` with a message naming the offending path.
        """
        rules_path = 'C:\\yara-rules-invalid'
        # Stub yara as an importable module (the same way test_run does) so
        # the binding cannot bail out on a missing package before it gets
        # the chance to validate the rules path. A ``None`` entry in
        # sys.modules makes ``import yara`` raise ImportError.
        with patch.dict('sys.modules', **{
            'yara': MagicMock(),
        }):
            from fibratus.binding.yar import YaraBinding
            with patch('os.path.exists', return_value=False), \
                 patch('os.path.isdir', return_value=False):
                with pytest.raises(BindingError) as e:
                    YaraBinding(outputs,
                                Mock(spec_set=Logger), output='amqp', path=rules_path)
        # The message check has to live outside the ``pytest.raises`` block:
        # any statement placed after the raising call inside that block is
        # skipped, which would leave the assertion silently unexecuted.
        assert ('%s rules path does not exist' % rules_path) in str(e.value)
yar.py 文件源码 项目:fibratus 作者: rabbitstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_yara_python_not_installed(self, outputs):
        """Building a YaraBinding without the yara package must fail.

        ``sys.modules['yara']`` is set to ``None`` so that importing yara
        raises ImportError; the constructor is then expected to raise
        ``BindingError`` with the "not installed" message.
        """
        with patch.dict('sys.modules', **{
            'yara': None,
        }):
            from fibratus.binding.yar import YaraBinding
            with pytest.raises(BindingError) as e:
                YaraBinding(outputs,
                            Mock(spec_set=Logger), output='amqp', path='C:\\yara-rules')
        # The message check has to live outside the ``pytest.raises`` block:
        # any statement placed after the raising call inside that block is
        # skipped, which would leave the assertion silently unexecuted.
        assert 'yara-python package is not installed' in str(e.value)
yar.py 文件源码 项目:fibratus 作者: rabbitstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_run(self, outputs):
        """Running the binding must call ``match`` on its ``_rules`` object.

        yara is replaced by a MagicMock module, the rules directory is faked
        as existing and containing one rule file, and ``yara.compile`` is
        patched out before the binding is created and run.
        """
        fake_modules = {'yara': MagicMock()}
        with patch.dict('sys.modules', **fake_modules):
            from fibratus.binding.yar import YaraBinding
            with patch('os.path.exists', return_value=True), \
                 patch('os.path.isdir', return_value=True), \
                 patch('glob.glob', return_value=['silent_banker.yar']), \
                 patch('yara.compile'):
                binding = YaraBinding(
                    outputs,
                    Mock(spec_set=Logger),
                    output='amqp',
                    path='C:\\yara-rules',
                )
                thread = Mock(spec_set=ThreadInfo)
                event = Mock(spec_set=KEvent)
                binding.run(thread_info=thread, kevent=event)
                assert binding._rules.match.called
conftest.py 文件源码 项目:python-logbook-systemd 作者: vivienm 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def logger():
    """Return a new ``Logger`` named ``'testlogger'``."""
    test_logger = Logger('testlogger')
    return test_logger
bot.py 文件源码 项目:GAFBot 作者: DiNitride 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run(self):
        """Set up logging, config and DB state, load modules, then start the bot.

        Steps, in order: attach stdout and file handlers to a new logger,
        read the default guild config JSON, open the SQLite guild-config
        database (creating the ``serverSettings`` table when it is missing),
        load the core extension followed by every module listed in
        ``self.config["modules"]``, and finally hand over to the parent
        class's ``run`` with the configured token.
        """
        bot_log = Logger("GAF Bot")
        stdout_handler = StreamHandler(sys.stdout, bubble=True)
        bot_log.handlers.append(stdout_handler)
        file_handler = FileHandler("bot/logs/last-run.log", bubble=True, mode="w")
        bot_log.handlers.append(file_handler)
        self.logger = bot_log
        for startup_message in ("Logging started", "Bot process started"):
            self.logger.notice(startup_message)

        with open("bot/config/defaults/default.guildconfig.json") as config_file:
            self.default_guild_config = json.load(config_file)
            self.logger.debug("Loaded default guild config")

        self.logger.debug("Connecting to DB")
        self.db_conn = sqlite3.connect("bot/config/guild_configs.db")
        self.logger.notice("DB Connection Established")
        self.db_cursor = self.db_conn.cursor()
        # sqlite_master lists every schema object; a zero count means the
        # settings table has not been created yet.
        self.db_cursor.execute(
            "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='serverSettings'"
        )
        table_count = self.db_cursor.fetchone()[0]
        if not table_count:
            self.logger.error("No table found in DB! Creating new one now")
            self.db_cursor.execute("CREATE TABLE serverSettings (id bigint, settings long)")
            self.logger.debug("Table created")

        self.load_extension("bot.modules.core")
        self.logger.notice("Loaded core module")

        self.logger.notice("Loading other modules")
        # The progress bar and the sleeps around it are purely cosmetic
        # (the original author says as much): they add nothing functional
        # and only slow start-up down so the bar is visible.
        time.sleep(0.5)
        module_names = self.config["modules"].keys()
        progress = tqdm.tqdm(module_names, desc="Loading modules")
        for module_name in progress:
            self.load_extension(f"bot.modules.{module_name.lower()}")
            time.sleep(0.2)
        time.sleep(0.5)
        self.logger.debug("Completed loading modules")

        self.logger.notice("Logging into Discord")
        super().run(self.config["token"], reconnect=True)
cli.py 文件源码 项目:pingparsing 作者: thombashi 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main():
    """Entry point of the pingparsing command line tool.

    Parses the command line options, configures logging, gathers ping
    results either by running ``parse_ping`` for every destination/file in
    a process pool or by parsing text read from standard input, and prints
    the collected results as JSON.

    Returns:
        0 once the JSON output has been printed.
    """
    options = parse_option()

    initialize_log_handler(options.log_level)

    logger = logbook.Logger("pingparsing cli")
    logger.level = options.log_level
    pingparsing.set_log_level(options.log_level)

    output = {}
    # NOTE(review): this branch pings the given destinations while the
    # ``else`` branch reads stdin, which looks inverted relative to the
    # helper's name -- confirm against the definition of is_use_stdin().
    if is_use_stdin():
        from concurrent import futures

        max_workers = (multiprocessing.cpu_count() * 2
                       if options.max_workers is None else options.max_workers)
        count, deadline = get_ping_param(options)
        logger.debug("max-workers={}, count={}, deadline={}".format(
            max_workers, count, deadline))

        # The ``with`` statement already shuts the pool down on exit, so no
        # explicit ``executor.shutdown()`` is needed. Keeping the
        # try/finally inside the ``with`` also guarantees ``executor`` is
        # bound: a failure while constructing the pool now propagates
        # as-is instead of being masked by a NameError in ``finally``.
        with futures.ProcessPoolExecutor(max_workers) as executor:
            try:
                future_list = []
                for dest_or_file in options.destination_or_file:
                    logger.debug("start {}".format(dest_or_file))
                    future_list.append(executor.submit(
                        parse_ping, logger, dest_or_file, options.interface,
                        count, deadline))

                for future in futures.as_completed(future_list):
                    key, ping_data = future.result()
                    output[key] = ping_data
            finally:
                logger.debug("shutdown ProcessPoolExecutor")
    else:
        ping_result_text = sys.stdin.read()
        ping_parser = pingparsing.PingParsing()
        ping_parser.parse(ping_result_text)
        output = ping_parser.as_dict()

    # A non-positive indent means compact output; ``indent=None`` is the
    # json.dumps default and produces exactly that.
    indent = options.indent if options.indent > 0 else None
    print(json.dumps(output, indent=indent))

    return 0


问题


面经


文章

微信
公众号

扫码关注公众号