def test_init_invalid_path(self, outputs):
    """Check that ``YaraBinding`` rejects a rules path that does not exist.

    ``os.path.exists`` and ``os.path.isdir`` are patched to return
    ``False``; constructing the binding is then expected to raise
    ``BindingError`` with a message naming the missing path.
    """
    # NOTE(review): 'yara' is mapped to None here, exactly as in the
    # "not installed" test -- confirm the path check is reached first.
    with patch.dict('sys.modules', yara=None):
        from fibratus.binding.yar import YaraBinding

        with patch('os.path.exists', return_value=False), \
                patch('os.path.isdir', return_value=False), \
                pytest.raises(BindingError) as excinfo:
            YaraBinding(
                outputs,
                Mock(spec_set=Logger),
                output='amqp',
                path='C:\\yara-rules-invalid',
            )
        # Checked outside the ``raises`` block so the assertion actually runs.
        assert 'C:\\yara-rules-invalid rules path does not exist' in str(excinfo.value)
# Example source code using the Python class Logger()
def test_init_yara_python_not_installed(self, outputs):
    """Check that ``YaraBinding`` fails when the ``yara`` module is unavailable.

    ``sys.modules['yara']`` is set to ``None`` so that importing ``yara``
    fails; constructing the binding is then expected to raise
    ``BindingError`` reporting that yara-python is not installed.
    """
    with patch.dict('sys.modules', yara=None):
        from fibratus.binding.yar import YaraBinding

        with pytest.raises(BindingError) as excinfo:
            YaraBinding(
                outputs,
                Mock(spec_set=Logger),
                output='amqp',
                path='C:\\yara-rules',
            )
        # Checked outside the ``raises`` block so the assertion actually runs.
        assert 'yara-python package is not installed' in str(excinfo.value)
def test_run(self, outputs):
    """Check that ``YaraBinding.run`` invokes ``match`` on the binding's rules.

    The ``yara`` module is replaced with a ``MagicMock`` and the filesystem
    lookups (``os.path.exists``, ``os.path.isdir``, ``glob.glob``) are
    patched so the binding can be constructed without real rule files.
    """
    with patch.dict('sys.modules', yara=MagicMock()):
        from fibratus.binding.yar import YaraBinding

        with patch('os.path.exists', return_value=True), \
                patch('os.path.isdir', return_value=True), \
                patch('glob.glob', return_value=['silent_banker.yar']), \
                patch('yara.compile'):
            binding = YaraBinding(
                outputs,
                Mock(spec_set=Logger),
                output='amqp',
                path='C:\\yara-rules',
            )
            thread = Mock(spec_set=ThreadInfo)
            event = Mock(spec_set=KEvent)
            binding.run(thread_info=thread, kevent=event)
            assert binding._rules.match.called
def logger():
    """Return a new ``Logger`` named ``'testlogger'``."""
    test_logger = Logger('testlogger')
    return test_logger
def run(self):
    """Prepare logging, configuration and storage, load modules, then start the bot.

    In order, this method:

    1. creates the "GAF Bot" logger with a stdout handler and a file
       handler that overwrites ``bot/logs/last-run.log``;
    2. loads the default guild configuration from JSON;
    3. opens the SQLite database and creates the ``serverSettings`` table
       when it is not present;
    4. loads the core extension, then every module named in
       ``self.config["modules"]``;
    5. hands control to the parent class's ``run`` with the configured token.
    """
    bot_log = Logger("GAF Bot")
    for handler in (
        StreamHandler(sys.stdout, bubble=True),
        FileHandler("bot/logs/last-run.log", bubble=True, mode="w"),
    ):
        bot_log.handlers.append(handler)
    self.logger = bot_log
    self.logger.notice("Logging started")
    self.logger.notice("Bot process started")

    with open("bot/config/defaults/default.guildconfig.json") as defaults_file:
        self.default_guild_config = json.load(defaults_file)
    self.logger.debug("Loaded default guild config")

    self.logger.debug("Connecting to DB")
    self.db_conn = sqlite3.connect("bot/config/guild_configs.db")
    self.logger.notice("DB Connection Established")
    self.db_cursor = self.db_conn.cursor()

    # count(*) always yields exactly one row; its first column is 0 when
    # the serverSettings table has not been created yet.
    table_lookup = "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='serverSettings'"
    self.db_cursor.execute(table_lookup)
    table_count = self.db_cursor.fetchone()[0]
    if not table_count:
        self.logger.error("No table found in DB! Creating new one now")
        self.db_cursor.execute("CREATE TABLE serverSettings (id bigint, settings long)")
        self.logger.debug("Table created")

    self.load_extension("bot.modules.core")
    self.logger.notice("Loaded core module")
    self.logger.notice("Loading other modules")

    # The progress bar and every time.sleep() below are purely cosmetic.
    # They serve no functional purpose and only slow start-up down; the
    # original author kept them because they look nice and explicitly
    # advises against copying this.
    time.sleep(0.5)
    module_names = self.config["modules"].keys()
    for cog in tqdm.tqdm(module_names, desc="Loading modules"):
        extension_path = f"bot.modules.{cog.lower()}"
        self.load_extension(extension_path)
        time.sleep(0.2)
    time.sleep(0.5)

    self.logger.debug("Completed loading modules")
    self.logger.notice("Logging into Discord")
    super().run(self.config["token"], reconnect=True)
def main():
    """Run the pingparsing command-line interface.

    Parses the command-line options, configures logging, gathers ping
    statistics and prints them to stdout as JSON.

    Depending on ``is_use_stdin()``, the statistics come either from
    running ``parse_ping`` for every entry of
    ``options.destination_or_file`` in a process pool, or from parsing
    text read from standard input.

    Returns:
        int: ``0`` once the JSON output has been printed.

    Raises:
        Any exception raised while creating the process pool or while
        collecting a worker's result propagates to the caller unchanged.
    """
    options = parse_option()

    initialize_log_handler(options.log_level)

    logger = logbook.Logger("pingparsing cli")
    logger.level = options.log_level
    pingparsing.set_log_level(options.log_level)

    output = {}
    # NOTE(review): the branch taken when is_use_stdin() is true runs the
    # ping workers, while the other branch reads sys.stdin -- confirm the
    # helper's semantics match its name.
    if is_use_stdin():
        from concurrent import futures

        max_workers = (multiprocessing.cpu_count() * 2
                       if options.max_workers is None else options.max_workers)
        count, deadline = get_ping_param(options)
        logger.debug("max-workers={}, count={}, deadline={}".format(
            max_workers, count, deadline))

        try:
            # The ``with`` statement shuts the pool down on exit, so no
            # explicit shutdown() is needed. Calling it from ``finally``
            # raised UnboundLocalError (hiding the real error) whenever the
            # executor could not be created, e.g. for max_workers <= 0.
            with futures.ProcessPoolExecutor(max_workers) as executor:
                future_list = []
                for dest_or_file in options.destination_or_file:
                    logger.debug("start {}".format(dest_or_file))
                    future_list.append(executor.submit(
                        parse_ping, logger, dest_or_file, options.interface,
                        count, deadline))

                # Collect results in completion order, keyed by the value
                # each worker returns alongside its data.
                for future in futures.as_completed(future_list):
                    key, ping_data = future.result()
                    output[key] = ping_data
        finally:
            logger.debug("shutdown ProcessPoolExecutor")
    else:
        ping_result_text = sys.stdin.read()
        ping_parser = pingparsing.PingParsing()
        ping_parser.parse(ping_result_text)
        output = ping_parser.as_dict()

    # A non-positive indent selects compact, single-line JSON.
    if options.indent <= 0:
        print(json.dumps(output))
    else:
        print(json.dumps(output, indent=options.indent))

    return 0