def __init__(self, base='.', create=True, backup=True,
             dry=False, safe=False, quiet=False, diff=True):
    """Record the run-time options verbatim and create the logger.

    All keyword arguments are stored as same-named attributes;
    ``comparing`` starts out False (presumably toggled later by a
    compare pass -- TODO confirm against the rest of the class).
    """
    self.base = base
    self.create = create
    self.backup = backup
    self.dry = dry
    self.safe = safe
    self.quiet = quiet
    self.diff = diff
    self.comparing = False
    self.log = Logger()
Example source snippets showing instantiation of the Python Logger() class
def __init__(self, key, action):
    """Bind *key* to *action* and attach a fresh logger."""
    self.key, self.action = key, action
    self.log = Logger()
def recoverIOC(self, data_path, api_keys):
    """No-op recovery backend: only logs that it was invoked."""
    Logger().info("DummyRecovery.recoverIOC")
def recoverIOC(self, data_path, api_keys):
    """Download every OTX pulse as an OpenIOC 1.0 file under *data_path*.

    Uses the key stored under KEY_NAME in *api_keys* when present,
    falling back to the built-in OTX_KEY.  One ``<pulse id>.ioc`` file
    is written per pulse; failed downloads are logged and skipped.
    """
    logger = Logger()
    logger.info("OTXRecovery.recoverIOC")
    # Record the Starting Time
    startTime = time.time()
    dataPath = data_path + "/" + Format.OPENIOC_10.value + "/"
    key = api_keys.get(KEY_NAME, OTX_KEY)
    # exist_ok avoids the check-then-create race of the original code.
    os.makedirs(dataPath, exist_ok=True)
    otx = OTXv2(key)
    pulses = otx.getall()
    logger.info("Download complete - %s events received" % len(pulses))
    # Loop-invariant request pieces hoisted out of the per-pulse loop.
    headers = {'X-OTX-API-KEY': key, 'User-Agent': OTX_USR_AGT, "Content-Type": "application/json"}
    params = {'format': Format.OPENIOC_10.value}
    payload = json.dumps({})
    # For each pulse get all ioc
    for pulse in pulses:
        n = json_normalize(pulse)
        url = OTX_GET_URL.format(n.id[0])
        file_name = dataPath + n.id[0] + ".ioc"
        # timeout= guards against the request hanging forever.
        response = requests.post(url, params=params, data=payload, headers=headers, timeout=60)
        if not response.ok:
            # Fix: do not persist HTTP error bodies as .ioc files.
            logger.info("Download failed ({}) for pulse {}".format(response.status_code, n.id[0]))
            continue
        with open(file_name, "wb") as code:
            code.write(response.content)
        logger.debug("Download OpenIOC ioc file: " + n.id[0] + " - " + n["name"][0] + " -> " + file_name)
    # Trace the end time and calculate the duration
    endTime = time.time() - startTime
    logger.info('OTXRecovery finished on: ' + str(endTime) + ' seconds')
def parseIndicator(self, iocFile, iocFileName):
    """Parse one OpenIOC XML document into Indicator objects.

    Returns a one-element list holding the parent Indicator (id taken
    from *iocFileName* without its extension); children found in nested
    ``<Indicator>`` elements are attached to ``parent.children``.  An
    unparseable file yields an empty list.
    """
    indicator_to_return = []
    # Read file
    try:
        xmldoc = minidom.parseString(iocFile)
    except Exception:
        # Best-effort parsing: a malformed file is skipped, not fatal.
        logger = Logger()
        logger.info("Ignore IOC file {}".format(iocFile))
    else:
        # Fix: use the public ``documentElement`` accessor (the document
        # root element) instead of minidom's private ``_get_firstChild()``.
        root = xmldoc.documentElement
        # Principal Indicator (renamed from ``id`` to avoid shadowing the builtin)
        ioc_id = os.path.splitext(iocFileName)[0]
        parent_indicator = Indicator(ioc_id, self.getFormat())
        parent_indicator.description = self.getChildrenByTagName(root, "description")
        indicator_to_return.append(parent_indicator)
        children_indicators = []
        for item in root.getElementsByTagName("Indicator"):
            child = Indicator(item.attributes['id'].value, self.getFormat())
            child.operator = item.attributes['operator'].value
            child.evidences = self.__getChildrenEvidences__(item)
            child.parent = parent_indicator
            children_indicators.append(child)
        parent_indicator.children = children_indicators
    return indicator_to_return
def extractIndicatorOfCompromise(self):
    """Walk every supported IOC-format directory and parse all files.

    Returns the flat list of Indicator objects gathered from every
    format sub-directory of ``self.data_path``; a format whose data
    directory is missing (InvalidDataPath) is skipped with a log line.
    """
    logger = Logger()
    logger.info('Start to extract indicator of compromise from data repository: {}'.format(self.data_path))
    # Record the Starting Time
    startTime = time.time()
    indicators_to_return = []
    # Renamed loop variable: don't shadow the builtin ``format``.
    for ioc_format in Format.getFormats():
        iocDB = IocHandler(self.data_path + "/" + ioc_format.value + "/")
        try:
            all_iocs = iocDB.get_all_ioc(ioc_format)
        except InvalidDataPath:
            logger.info("Ignore IOC format {}".format(ioc_format.value))
            continue
        logger.info("Getting IOC files with format: " + ioc_format.value)
        parser = IOCParserFactory.createParser(ioc_format)
        for iocFileName in all_iocs:
            iocFile = iocDB.get_ioc_file(iocFileName)
            # extend() instead of repeated list concatenation (O(n^2) -> O(n)).
            indicators_to_return.extend(parser.parseIndicator(iocFile, iocFileName))
    # Trace the end time and calculate the duration
    endTime = time.time() - startTime
    # len() instead of calling the __len__ dunder directly.
    logger.info('Extract ({}) IOCs finished on: {} seconds'.format(len(indicators_to_return), str(endTime)))
    return indicators_to_return
def checkEvidences(self, indicators):
    """Flag DNS-host evidences whose host appears in a captured DNS entry.

    Populates ``self.evidences`` from *indicators*, then marks every
    evidence whose value occurs in any DNS entry as compromised,
    appending the matching entry to its proof list.
    """
    self.evidences = []
    self.__getEvidences__(indicators, DnsHostScanner.getEvidenteType())
    # Hoisted: one logger, not a new one per match inside the nested loop.
    logger = Logger()
    for evidence in self.evidences:
        host = evidence.value
        # .values() -- the dict keys were never used by the original loop.
        for entry_value in self.dns_entries.values():
            if host in entry_value:
                logger.warn("Host MATCH: %s" % host)
                evidence.compromised = True
                evidence.proof.append(entry_value)
def checkEvidences(self, filePath):
    """Mark the evidence matching *filePath*'s basename as compromised.

    The evidence dictionary is keyed by lowercased file name; a match
    records *filePath* as proof.
    """
    filename = os.path.basename(filePath)
    key = filename.lower()
    if key in self.evidences:
        logger = Logger()
        logger.warn("File Name MATCH: %s" % filename)
        # Fix: index with the same lowercased key used for the membership
        # test; the original indexed with the raw name and raised KeyError
        # for mixed-case file names.
        evidence = self.evidences[key]
        evidence.compromised = True
        evidence.proof.append(filePath)
def enable_logging(self, logger_obj):
    """Adopt *logger_obj* as this object's logger; ignore non-Logger values."""
    if not isinstance(logger_obj, Logger):
        return
    self.logger = logger_obj
def __init__(self):
    """Create the logger and the Slack client, then record start-up."""
    self.logger = Logger()
    self.sc = SlackClient(SLACK_TOKEN)
    self.logger.log("Bot initialized")
def __init__(self):
    """Launch a headless PhantomJS browser and navigate it to URL."""
    self.logger = Logger()
    self.logger.log("New instance of scraper created")
    self.browser = webdriver.PhantomJS('phantomjs')
    self.browser.get(URL)
    self.logger.log("Navigated to url")
    # Give the page time to finish rendering before anything scrapes it.
    time.sleep(5)
def __init__(self):
    """Open (or create) the sqlite database and its appointment table."""
    self.logger = Logger()
    self.db = sqlite3.connect('dmv_db')
    self.cur = self.db.cursor()
    self.cur.execute(
        '''CREATE table IF NOT EXISTS appointment(ts TEXT, location TEXT, appt_time TEXT);''')
    self.db.commit()
def __init__(self):
    """Wire together the three collaborators: database, logger, and bot."""
    # Tuple assignment evaluates left-to-right, preserving construction order.
    self.db, self.logger, self.bot = DB(), Logger(), Bot()
def __init__(self, num_actions, discount, exploration_prob, step_size, logging=True):
    """Initialise a Q-learning agent.

    num_actions      -- size of the discrete action space
    discount         -- future-reward discount factor
    exploration_prob -- epsilon for epsilon-greedy exploration
    step_size        -- learning rate for weight updates
    logging          -- forwarded to the Logger
    """
    self.actions = range(num_actions)
    self.discount, self.step_size = discount, step_size
    self.exploration_prob = exploration_prob
    self.num_iters = 1
    # Sparse feature weights: Counter defaults unseen keys to 0.
    self.weights = collections.Counter()
    self.logger = logger.Logger(agent_name='QLearningAgent', logging=logging)
    # No experience yet.
    self.prev_state = self.prev_action = None
def test_log_epoch_empty_log(self):
    """log_epoch on an empty log must still write all three .npz files."""
    log = logger.Logger(agent_name='test')
    log.log_epoch(epoch=0)
    log_dir = log.log_dir
    for fname in ('actions.npz', 'rewards.npz', 'losses.npz'):
        self.assertTrue(os.path.isfile(os.path.join(log_dir, fname)))
    # Clean up the directory the Logger created.
    shutil.rmtree(log_dir)
# class TestMovingAverage(unittest.TestCase):
# def test_moving_average_single_item_window(self):
# arr = [1,2,3]
# actual = logger.moving_average(arr, 1)
# self.assertSequenceEqual(actual, arr)
# def test_moving_average_small_window(self):
# arr = [1,2,3,4,5,6,7]
# actual = logger.moving_average(arr, 2)
# expected = [0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5]
# self.assertSequenceEqual(actual, expected)
# def test_moving_average_small_window_large_variance(self):
# arr = [0,9,0,9,0]
# actual = logger.moving_average(arr, 3)
# expected = [3, 3, 6, 3, 3]
# self.assertSequenceEqual(actual, expected)
# def test_moving_average_large_window_large_variance(self):
# arr = [0,9,0,9,0]
# actual = logger.moving_average(arr, 4)
# expected = [2.25, 2.25, 4.5, 4.5, 2.25]
# self.assertSequenceEqual(actual, expected)
def __init__(self):
    """Restore the last-known processing state from the status store."""
    self.log = Logger(prog_dir)
    # Each value may be absent; status.get then yields None.
    for attr in ("last_hwconfig", "last_position", "last_line_count", "raft_multi"):
        setattr(self, attr, status.get(attr))
def main():
    """Entry point: launch the GUI when no CLI arguments are given,
    otherwise process the g-code file named on the command line."""
    if len(sys.argv) < 2:
        # GUI mode
        GUI().show_gui()
        return

    parser = argparse.ArgumentParser()
    parser.add_argument("file", help="Path to g-code file to process")
    parser.add_argument("hw_config", help="Extruder/hotend configuration", choices=HW_CONFIGS)
    parser.add_argument("--debug", help="Show debug prints", action="store_true")
    parser.add_argument("--lines", help="Purge lines to print after filament change", type=int,
                        default=LINE_COUNT_DEFAULT)
    parser.add_argument("--position",
                        help="Purge tower position. Default Auto. Auto will try to find a position with enough free space for the tower",
                        choices=TOWER_POSITIONS, default=AUTO)
    args = parser.parse_args()

    options = Settings()
    options.hw_config = args.hw_config
    options.purge_lines = args.lines
    options.tower_position = args.position

    log = Logger(prog_dir, gui=False, debug=args.debug)
    printer_type = detect_file_type(args.file, log)
    processor = printer_type(log, options)
    result_file = processor.process(args.file)
    log.info("New file saved: %s" % result_file)
def __init__(self, host_addr, timeout):
    """Create the TCP server bound to *host_addr* and route its
    application-data events to ``on_app_data``."""
    import logger
    import tcp_server
    self.logger = logger.Logger()
    self.tcp_server = tcp_server.TcpServer(host_addr, timeout, self.logger)
    self.tcp_server.set_app_data_callback(self.on_app_data)
def __init__(self):
    """Build the 0.1 s event loop with a fresh logger."""
    import loop
    self._loop = loop.EventLoop(0.1, logger.Logger())
test_logger.py source file
Project: cloudformation-validation-pipeline
Author: awslabs
Project source code
File source code
Reads: 27
Favorites: 0
Likes: 0
Comments: 0
def test_init(self, config):
    """A fresh Logger starts with container-level defaults and reads config."""
    config.return_value = None
    instance = Logger()
    # Before any job is attached, both job ids are unset and the request
    # id carries the container-initialisation sentinel.
    self.assertIsNone(instance.job_id)
    self.assertEqual(instance.request_id, 'CONTAINER_INIT')
    self.assertIsNone(instance.original_job_id)
    config.assert_called()