python类Logger()的实例源码

proxyIPCLayer.py 文件源码 项目:NetPower_TestBed 作者: Vignesh2208 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, logFile, hostIDList):
    """Initialise the core IPC thread: queues, locks, logger, shared buffers.

    :param logFile: forwarded to ``logger.Logger`` as its first argument.
    :param hostIDList: sized collection of host IDs; kept as ``hostList``
        and counted into ``nHosts``.
    """
    threading.Thread.__init__(self)

    # Command queue and the lock named after it.
    self.threadCmdLock = threading.Lock()
    self.threadCmdQueue = []

    # Callback table, its lock, and the outstanding-callback counter.
    self.threadCallbackLock = threading.Lock()
    self.threadCallbackQueue = {}
    self.nPendingCallbacks = 0

    # Collaborators and ID maps start unset; nothing here fills them in.
    self.transportLayer = None
    self.hostIDtoPowerSimID = None
    self.powerSimIDtohostID = None
    self.controlCenterID = -1

    self.sharedBufferArray = shared_buffer_array()
    self.log = logger.Logger(logFile, "core IPC Thread")

    self.hostList = hostIDList
    self.nHosts = len(self.hostList)

    # Runs last so every attribute above already exists.
    self.init_shared_ipc_buffers()
basicHostIPCLayer.py 文件源码 项目:NetPower_TestBed 作者: Vignesh2208 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, hostID, logFile,sharedBufferArray):
    """Initialise a host-side IPC thread.

    :param hostID: identifier of this host; also embedded in the logger tag.
    :param logFile: forwarded to ``logger.Logger`` as its first argument.
    :param sharedBufferArray: stored as-is on the instance.
    """
    threading.Thread.__init__(self)

    # Command queue and the lock named after it.
    self.threadCmdLock = threading.Lock()
    self.threadCmdQueue = []

    # Callback table, its lock, and the outstanding-callback counter.
    self.threadCallbackLock = threading.Lock()
    self.threadCallbackQueue = {}
    self.nPendingCallbacks = 0

    self.hostID = hostID
    self.sharedBufferArray = sharedBufferArray

    # Collaborators and ID maps start unset; nothing here fills them in.
    self.attackLayer = None
    self.hostIDtoPowerSimID = None
    self.powerSimIDtohostID = None

    logger_tag = "Host " + str(hostID) + " IPC Thread"
    self.log = logger.Logger(logFile, logger_tag)

    self.raw_sock = socket.socket(
        socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW
    )
    # Disabled by the original author:
    # self.raw_sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

    self.init_shared_ipc_buffer()
config.py 文件源码 项目:dotdrop 作者: deadc0de6 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, cfgpath):
    """Check *cfgpath*, create the empty lookup tables and load the file.

    :param cfgpath: path to the configuration file.
    :raises ValueError: when the path does not exist, or when
        ``_load_file()`` returns a falsy value.
    """
    path_is_there = os.path.exists(cfgpath)
    if not path_is_there:
        raise ValueError('config file does not exist')

    self.cfgpath = cfgpath
    self.log = Logger()

    # The original author marks these two as "link inside content".
    self.configs = {}
    self.profiles = {}

    # The original author marks these three as "not linked to content".
    self.dotfiles = {}
    self.actions = {}
    self.prodots = {}

    loaded = self._load_file()
    if not loaded:
        raise ValueError('config is not valid')
stix_parser.py 文件源码 项目:shcft 作者: celiadominguez 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def parseIndicator(self, iocFile, iocFileName):
    """Parse a STIX document and return one Indicator per STIX package.

    :param iocFile: XML content handed to ``minidom.parseString``.
    :param iocFileName: name of the file the content came from; used only
        in the log message when parsing fails.
    :return: list of ``Indicator`` objects; empty when the content cannot
        be parsed or holds no ``stix:STIX_Package`` element.
    """
    indicator_to_return = []

    # Read file
    try:
        xmldoc = minidom.parseString(iocFile)
    except Exception:
        # Best effort: an unparsable file is skipped, not fatal.  Log the
        # file *name*; the original logged the whole XML payload.
        logger = Logger()
        logger.info("Ignore IOC file {}".format(iocFileName))
        return indicator_to_return

    for item in xmldoc.getElementsByTagName('stix:STIX_Package'):
        # Named indicator_id so the builtin id() is not shadowed.
        indicator_id = item.attributes['id'].value
        indicator = Indicator(indicator_id, self.getFormat())
        indicator.title = self.getChildrenByTagName(item, 'stix:Title')
        indicator.evidences = self.__getChildrenEvidences__(item)
        indicator_to_return.append(indicator)

    return indicator_to_return
md5_scanner.py 文件源码 项目:shcft 作者: celiadominguez 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def checkEvidences(self, filePath):
    """Hash *filePath* with MD5 and flag the matching evidence, if any.

    When the digest is a key of ``self.evidences``, that evidence is marked
    ``compromised`` and *filePath* is appended to its ``proof`` list.

    :param filePath: path of the file to hash.
    :return: None. Any failure is reported with ``traceback.print_exc()``
        and otherwise ignored, so one bad file does not stop the caller.
    """
    try:
        # Hash in fixed-size chunks so large files are never loaded whole.
        digest = hashlib.md5()
        with open(filePath, 'rb') as stream:
            for chunk in iter(lambda: stream.read(65536), b''):
                digest.update(chunk)
        hashValue = digest.hexdigest()

        # One logger serves both messages.
        logger = Logger()
        logger.warn(("Hash file {}: {}").format(filePath, hashValue))

        if hashValue in self.evidences:
            logger.warn("Hash md5 MATCH: %s" % filePath)
            evidence = self.evidences[hashValue]
            evidence.compromised = True
            evidence.proof.append(filePath)

    except Exception:
        # Deliberate best effort: report and carry on.
        traceback.print_exc()
client.py 文件源码 项目:SimpleReactor 作者: LightCong 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, timeout,max_msg_count,msg,connect_num,dst_addr):
    """Create the TCP client and start its daemon I/O thread.

    :param timeout: forwarded to ``tcp_client.TcpClient``.
    :param max_msg_count: stored as ``_max_msg_count``.
    :param msg: stored as ``_msg``.
    :param connect_num: stored as ``_connect_num``.
    :param dst_addr: stored as ``_dst_addr``.
    """
    import logger, tcp_client

    # Assign all plain state first: the I/O thread and the data callback
    # must never see a half-initialised object.  (The original started the
    # thread before these attributes existed.)
    self._max_msg_count = max_msg_count
    self._msg = msg
    self._connect_num = connect_num
    self._dst_addr = dst_addr

    # NOTE(review): the original comments here were mis-encoded; these look
    # like start/end timestamps and receive counters -- confirm.
    self.start_time = 0
    self.end_time = 0
    self.recv_bytes = 0
    self.recv_msg_count = 0

    self.logger = logger.Logger()
    self.tcp_client = tcp_client.TcpClient(timeout, self.logger)
    self.tcp_client.set_app_data_callback(self.on_app_data)

    # Start the thread last.  The ``daemon`` attribute replaces the
    # deprecated setDaemon() call.
    self.io_thread = threading.Thread(target=self.io_thread_func)
    self.io_thread.daemon = True
    self.io_thread.start()
main.py 文件源码 项目:Remote-Integrity-Tool 作者: DearBytes 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def dispatch_remote_integrity_checker(args):
    """
    Run the remote integrity check from start to finish.

    Loads the configuration, fetches the checksum list from the server,
    attaches the logger's dispatchers to the integrity checker's event hook,
    runs the comparison and commits the database.

    :param args: Arguments passed to the script; only ``args.config`` is read
    :return: None
    """
    config = load_config(path=args.config)

    server = Server(config=config)
    server.connect()
    checksum_listing = server.acquire_checksum_list()

    logger = Logger(config=config)
    integrity = Integrity(config=config)

    # Subscribe every notification channel to the "events detected" hook.
    notification_handlers = (
        logger.dispatch_syslog,
        logger.dispatch_events_mail,
        logger.dispatch_telegram_msg,
    )
    for handler in notification_handlers:
        integrity.on_events_detected += handler

    integrity.load_database()
    integrity.identify(checksum_listing)
    integrity.print_statistics()

    database.commit()
iptables.py 文件源码 项目:SCUTUM 作者: K4YT3X 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, log=False):
    """
    Store the logger and make sure UFW is available.

    Keyword Arguments:
        log {object} -- logger to use; when left at False a new
            ``logger.Logger`` is created (default: {False})

    Raises:
        FileNotFoundError -- raised when UFW is missing and
            ``sysInstallPackage("ufw")`` reports failure
    """
    if log is False:
        from logger import Logger
        log = Logger()
    self.log = log

    # Nothing more to do when the ufw binary is already present.
    if os.path.isfile('/usr/sbin/ufw'):
        return

    print(avalon.FM.BD + avalon.FG.R + "\nWe have detected that you don't have UFW installed!" + avalon.FM.RST)
    print('UFW Firewall function requires UFW to run')

    installed = self.sysInstallPackage("ufw")
    if installed:
        return

    avalon.error("ufw is required for this function. Exiting...")
    raise FileNotFoundError('File: "/usr/sbin/ufw" not found')
adapter.py 文件源码 项目:SCUTUM 作者: K4YT3X 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self, interface, log=False):
    """
    Store the interface and logger and make sure arptables is available.

    Arguments:
        interface {string} -- name of interface to handle
        log {object} -- logger to use; when left at False a new
            ``logger.Logger`` is created (default: {False})

    Raises:
        FileNotFoundError -- raised when arptables is missing and
            ``Installer.sysInstallPackage("arptables")`` reports failure
    """
    if log is False:
        from logger import Logger
        log = Logger()
    self.log = log
    self.interface = interface

    installer = Installer()

    # Either well-known location counts as "installed".
    arptables_present = (
        os.path.isfile('/usr/bin/arptables')
        or os.path.isfile('/sbin/arptables')
    )
    if arptables_present:
        return

    print(avalon.FM.BD + avalon.FG.R + "\nWe have detected that you don't have arptables installed!" + avalon.FM.RST)
    print('SCUTUM requires arptables to run')

    if installer.sysInstallPackage("arptables"):
        return

    avalon.error("arptables is required for scutum. Exiting...")
    raise FileNotFoundError('File: "/usr/bin/arptables" and "/sbin/arptables" not found')
test_logger.py 文件源码 项目:cloudformation-validation-pipeline 作者: awslabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_config(self):
    """config() returns None, builds a LoggerAdapter and stores its fields."""
    instance = Logger()

    settings = dict(
        request_id='request_id',
        original_job_id="original_job_id",
        job_id='job_id',
        artifact_revision_id='artifact_revision_id',
        pipeline_execution_id='pipeline_execution_id',
        pipeline_action='pipeline_action',
        stage_name='stage_name',
        pipeline_name='pipeline_name',
        loglevel='loglevel',
        botolevel='botolevel',
    )
    outcome = instance.config(**settings)
    self.assertEqual(outcome, None)

    self.assertEqual(type(instance.log), logging.LoggerAdapter)
    # 40 and 20 are the numeric values of logging.ERROR and logging.INFO.
    self.assertEqual(logging.getLogger('boto3').level, 40)
    self.assertEqual(instance.log.logger.level, 20)

    # Each of these was passed in with its own name as the value.
    echoed_fields = (
        'request_id',
        'original_job_id',
        'job_id',
        'pipeline_execution_id',
        'artifact_revision_id',
        'pipeline_action',
        'stage_name',
    )
    for field in echoed_fields:
        self.assertEqual(getattr(instance, field), field)
test_checksum_manager.py 文件源码 项目:chaos-monitor 作者: mikefeneley 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUp(self):
    """Open a MySQL connection and start each test from a clean slate."""
    # Database settings.
    self.host, self.port = 'localhost', '3306'

    # Names of the database and tables the tests work with.
    self.test_database_name = "TEST_CHECKSUM_MANAGER_DB"
    self.test_table_name = "TEST_TABLE"
    self.empty_table_name = ""
    self.bad_schema_table = "BAD_SCHEMA_TABLE"

    # A direct connection and cursor let the tester put the server into
    # whatever state a test needs.
    self.connect = mysql.connector.connect(
        user='root',
        passwd='',
        host=self.host,
        port=self.port,
    )
    self.cursor = self.connect.cursor()

    self.db_connector = DBConnector(db_name=self.test_database_name)
    self.logger = Logger()

    self.delete_test_db()
monitor_cli.py 文件源码 项目:chaos-monitor 作者: mikefeneley 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self):
    """
    Create the argument parser, the usage hint strings and the logger.
    """
    self.parser = argparse.ArgumentParser()

    # Usage hints for the checksum tuple database options.
    self.help_af = (
        "To add a file to checksum tuple database, type: 'cmon -af <filename>'"
    )
    self.help_rf = (
        "To remove a file from checksum tuple database, type: 'cmon -rf <filename>'"
    )
    self.help_lf = (
        "To get all tuples from checksum tuple database, type: 'cmon -lf'"
    )

    # Usage hints for the recipient database options.
    self.help_ar = (
        "To add an email to recipient database, type: 'cmon -ar <email>'"
    )
    self.help_rr = (
        "To remove an email from recipient database, type: 'cmon -rr <email>'"
    )
    self.help_lr = (
        "To get list of recipients from recipient database, type: 'cmon -lr'"
    )

    # Usage hints for the daemon control options.
    self.start = "To start the daemon, type: 'cmon --start'"
    self.stop = "To stop the daemon, type: 'cmon --stop'"
    self.status = "To get the status of daemon, type: 'cmon --status'"
    self.restart = "To restart the daemon, type: 'cmon --restart'"

    # build_parser() runs after all hint strings are assigned; the logger
    # is created afterwards, as in the original.
    self.build_parser()
    self.logger = Logger()
notification.py 文件源码 项目:chaos-monitor 作者: mikefeneley 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, email_server="127.0.0.1", email_port=587,
             email_username="", email_pwd=""):
    """
    Record the SMTP endpoint and credentials and create a logger.

    :param email_server: IP Address of SMTP server for sending mail.
    :type email_server: string
    :param email_port: Port to use to send email
    :type email_port: int
    :param email_username: Authentication username for SMTP server.
    :type email_username: string
    :param email_pwd: Authentication password for SMTP server.
    :type email_pwd: string
    """
    # Where to send mail.
    self.email_server = email_server
    self.email_port = email_port

    # Credentials; stored under gmail_* names whatever the server is.
    self.gmail_user = email_username
    self.gmail_pwd = email_pwd

    self.logger = Logger()
test_logger.py 文件源码 项目:aws-service-catalog-validation-pipeline 作者: awslabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_config(self):
    """config() returns None, builds a LoggerAdapter and stores its fields."""
    instance = Logger()

    settings = dict(
        request_id='request_id',
        original_job_id="original_job_id",
        job_id='job_id',
        artifact_revision_id='artifact_revision_id',
        pipeline_execution_id='pipeline_execution_id',
        pipeline_action='pipeline_action',
        stage_name='stage_name',
        pipeline_name='pipeline_name',
        loglevel='loglevel',
        botolevel='botolevel',
    )
    outcome = instance.config(**settings)
    self.assertEqual(outcome, None)

    self.assertEqual(type(instance.log), logging.LoggerAdapter)
    # 40 and 20 are the numeric values of logging.ERROR and logging.INFO.
    self.assertEqual(logging.getLogger('boto3').level, 40)
    self.assertEqual(instance.log.logger.level, 20)

    # Each of these was passed in with its own name as the value.
    echoed_fields = (
        'request_id',
        'original_job_id',
        'job_id',
        'pipeline_execution_id',
        'artifact_revision_id',
        'pipeline_action',
        'stage_name',
    )
    for field in echoed_fields:
        self.assertEqual(getattr(instance, field), field)
simulation.py 文件源码 项目:CS-1-Programming-Fundamentals 作者: Product-College-Courses 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, population_size, vacc_percentage, virus_name,
             mortality_rate, basic_repro_num, initial_infected=1):
    """Store the simulation parameters and derive the log file name.

    ``vacc_percentage`` and ``initial_infected`` are used only to build
    ``file_name`` here; they are not kept as attributes.
    """
    # Virus characteristics.
    self.virus_name = virus_name
    self.mortality_rate = mortality_rate
    self.basic_repro_num = basic_repro_num

    # Population bookkeeping.
    self.population_size = population_size
    self.population = []
    self.next_person_id = 0
    self.total_infected = 0
    self.current_infected = 0

    name_template = "{}_simulation_pop_{}_vp_{}_infected_{}.txt"
    self.file_name = name_template.format(
        virus_name, population_size, vacc_percentage, initial_infected)

    # TODO: Create a Logger object and bind it to self.logger.  Use it to
    # log every important event during the simulation, and remember to call
    # the logger methods in the corresponding parts of the simulation.
    self.logger = None

    # IDs of the people who catch the infection during the current time
    # step.  At the end of each step self._infect_newly_infected() is called
    # and this list is reset to empty.
    self.newly_infected = []
    # TODO: Call self._create_population() with the correct parameters and
    # store the array it returns in self.population.
func.py 文件源码 项目:auto_proxy 作者: peablog 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_logger():
    """Return the shared logger, creating it on first use.

    The instance lives in the module-level ``log_instance``; a falsy value
    there means "not created yet".
    """
    global log_instance
    if not log_instance:
        log_instance = Logger('log.txt')
    return log_instance
main.py 文件源码 项目:importacsv 作者: rasertux 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
    """Create the logger and hand it to a new ImportaController."""
    shared_logger = Logger()
    self.logger = shared_logger
    self.control = ImportaController(shared_logger)
urlsnuffler.py 文件源码 项目:Web-Stalker 作者: Dylan-halls 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
    """Create the module-wide logger and empty the previous session's log.

    Binds the module-level ``log`` to a fresh ``logger.Logger`` and
    truncates ``Stalker.log`` in the current working directory.
    """
    global log
    log = logger.Logger()
    log.info('Sniffer is starting up')
    # Opening in 'w' mode already truncates the file and the context manager
    # closes it, so no explicit write or close() is needed.
    with open('Stalker.log', 'w'):
        pass
    log.info('Wiped previous session')
basicNetworkServiceLayer.py 文件源码 项目:NetPower_TestBed 作者: Vignesh2208 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self,hostID,logFile,hostID_To_IP) :
    """Initialise a host's network-layer thread.

    :param hostID: identifier of this host; used as a key into
        *hostID_To_IP* and embedded in the logger tag.
    :param logFile: forwarded to ``logger.Logger`` as its first argument.
    :param hostID_To_IP: mapping whose entry for *hostID* unpacks into
        ``(hostIP, listenPort)``.
    """
    threading.Thread.__init__(self)

    # Command queue and the lock named after it.
    self.threadCmdLock = threading.Lock()
    self.threadCmdQueue = []

    self.hostID = hostID
    self.IPMap = hostID_To_IP
    own_endpoint = self.IPMap[self.hostID]
    self.hostIP, self.listenPort = own_endpoint

    logger_tag = "Host " + str(hostID) + " Network Layer Thread"
    self.log = logger.Logger(logFile, logger_tag)

    # Collaborators and ID maps start unset; nothing here fills them in.
    self.hostIDtoPowerSimID = None
    self.powerSimIDtohostID = None
    self.attackLayer = None
proxyNetworkLayer.py 文件源码 项目:NetPower_TestBed 作者: Vignesh2208 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self,logFile,powerSimIP) :
    """Initialise the core network-layer thread.

    :param logFile: forwarded to ``logger.Logger`` as its first argument.
    :param powerSimIP: stored as-is on the instance.
    """
    threading.Thread.__init__(self)

    # Command queue and the lock named after it.
    self.threadCmdLock = threading.Lock()
    self.threadCmdQueue = []

    # Receive buffer and the lock named after it.
    self.NetLayerRxLock = threading.Lock()
    self.NetLayerRxBuffer = []

    self.powerSimIP = powerSimIP
    self.log = logger.Logger(logFile, "core Network Layer Thread")

    # Starts unset; nothing here fills it in.
    self.transportLayer = None
proxyTransportLayer.py 文件源码 项目:NetPower_TestBed 作者: Vignesh2208 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self,logFile,IPCLayer,NetworkServiceLayer) :
    """Initialise the core transport-layer thread.

    :param logFile: forwarded to ``logger.Logger`` as its first argument.
    :param IPCLayer: stored as ``self.IPCLayer``.
    :param NetworkServiceLayer: stored as ``self.NetServiceLayer``.
    """
    threading.Thread.__init__(self)

    # Command queue and the lock named after it.
    self.threadCmdLock = threading.Lock()
    self.threadCmdQueue = []

    # Callback table, its lock, and the outstanding-callback counter.
    self.threadCallbackLock = threading.Lock()
    self.threadCallbackQueue = {}
    self.nPendingCallbacks = 0

    self.log = logger.Logger(logFile, "core Transport Layer Thread")

    # Neighbouring layers.
    self.IPCLayer = IPCLayer
    self.NetServiceLayer = NetworkServiceLayer

    # IPv4 datagram socket.
    self.sendSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
agents.py 文件源码 项目:rogueinabox 作者: rogueinabox 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, configs):
    """Build the RogueBox, its UI and logger, then register the key handler.

    :param configs: mapping read for the "userinterface" and "verbose"
        keys; also handed to ``RogueBox``.
    """
    self.c = configs
    # init roguebox
    self.rb = RogueBox(configs)
    self.ui = UIManager.init(configs["userinterface"], self.rb)
    self.l = Logger(
        log_depth=configs["verbose"],
        log_targets=["file", "ui"],
        ui=self.ui,
    )
    self.ui.on_key_press(self._act_callback)
agents.py 文件源码 项目:rogueinabox 作者: rogueinabox 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, configs):
    """Build the RogueBox, its UI and logger, then register key and timer hooks.

    :param configs: mapping read for the "userinterface" and "verbose"
        keys; also handed to ``RogueBox``.
    """
    self.rb = RogueBox(configs)
    # Set to None first so the attribute exists before the UI is created.
    self._pending_action_timer = None
    self.ui = UIManager.init(configs["userinterface"], self.rb)
    self.l = Logger(
        log_depth=configs["verbose"],
        log_targets=["file", "ui"],
        ui=self.ui,
    )
    self.ui.on_key_press(self._keypress_callback)

    # Value passed to ui.on_timer_end together with the action callback.
    self._timer_value = 100
    self._pending_action_timer = self.ui.on_timer_end(
        self._timer_value, self._act_callback
    )
agents.py 文件源码 项目:rogueinabox 作者: rogueinabox 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, configs):
    """Create the game wrapper, managers, logger and initial state.

    :param configs: mapping read for "model_manager", "history_manager",
        "logsonfile", "gui", "userinterface" and "verbose".  It is also
        mutated: "iteration", "actions" and "actions_num" are written.
    """
    import history, models

    # class instances
    self.rogomatic = StalkOMatic(configs)
    model_manager_cls = getattr(models, configs["model_manager"])
    self.model_manager = model_manager_cls(self.rogomatic)
    history_manager_cls = getattr(history, configs["history_manager"])
    self.history_manager = history_manager_cls(self)

    # configs (self.configs and configs are the same object)
    self.configs = configs
    configs["iteration"] = 1
    configs["actions"] = self.rogomatic.get_actions()
    configs["actions_num"] = len(configs["actions"])

    # gui stuff: pick log targets, and pass the UI along when there is one
    log_targets = ["file"] if configs["logsonfile"] else []
    if configs["gui"]:
        self.ui = UIManager.init(configs["userinterface"], self.rogomatic)
        self._pending_action = None
        log_targets.append("ui")
        self.l = Logger(
            log_depth=configs["verbose"],
            log_targets=log_targets,
            ui=self.ui,
        )
    else:
        log_targets.append("terminal")
        self.l = Logger(log_depth=configs["verbose"], log_targets=log_targets)

    # state: state and old_state start as the same object
    initial_state = self.model_manager.reshape_initial_state(
        self.rogomatic.compute_state()
    )
    self.state = self.old_state = initial_state
    self.last_pos = self.rogomatic.player_pos
    self.same_pos_count = 0
    self.starting = False
agents.py 文件源码 项目:rogueinabox 作者: rogueinabox 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, configs):
    """Create the game wrapper, managers, logger, state and models.

    :param configs: mapping read for "model_manager", "history_manager",
        "logsonfile", "gui", "userinterface" and "verbose".  It is also
        mutated: "iteration", "actions" and "actions_num" are written.
    """
    import models, history

    # class instances
    self.rb = RogueBox(configs)
    model_manager_cls = getattr(models, configs["model_manager"])
    self.model_manager = model_manager_cls(self.rb)
    history_manager_cls = getattr(history, configs["history_manager"])
    self.history_manager = history_manager_cls(self)

    # configs (self.configs and configs are the same object)
    self.configs = configs
    configs["iteration"] = 1
    configs["actions"] = self.rb.get_actions()
    configs["actions_num"] = len(configs["actions"])

    # gui stuff: pick log targets, and pass the UI along when there is one
    log_targets = ["file"] if configs["logsonfile"] else []
    if configs["gui"]:
        self.ui = UIManager.init(configs["userinterface"], self.rb)
        self._pending_action = None
        log_targets.append("ui")
        self.l = Logger(
            log_depth=configs["verbose"],
            log_targets=log_targets,
            ui=self.ui,
        )
    else:
        log_targets.append("terminal")
        self.l = Logger(log_depth=configs["verbose"], log_targets=log_targets)

    # state: state and old_state start as the same object
    initial_state = self.model_manager.reshape_initial_state(
        self.rb.compute_state()
    )
    self.state = self.old_state = initial_state

    # model: the target model is built separately, then given the main
    # model's weights
    self.model = self.model_manager.build_model()
    self.target_model = self.model_manager.build_model()
    self.target_model.set_weights(self.model.get_weights())

    # resume from file
    # load weights, transitions history and parameters from assets, if any
    self._load_progress()
pedetailer.py 文件源码 项目:Malshare-Crawler 作者: roj4s 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, pe_db_address=None, logger=None, pe_db_handler=None):
    """Fill in defaults for the collaborators and connect to the PE database.

    :param pe_db_address: database address; "pedb.db" when None.
    :param logger: logger to use; a new ``Logger()`` when None.
    :param pe_db_handler: stored as-is on the instance.
    """
    self.logger = Logger() if logger is None else logger
    self.pe_db_address = "pedb.db" if pe_db_address is None else pe_db_address
    self.pe_db_handler = pe_db_handler
    self.pe_db = self.pe_connect_to_database(self.pe_db_address)
manage.py 文件源码 项目:tm 作者: tulians 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, path="/opt/tm/logs/", project_name=None):
    """Tasks manager constructor.

    :param path: forwarded to both ``Logger`` and ``Partials``.
    :param project_name: forwarded to ``Logger``.
    """
    self.log = Logger(path, project_name)

    # Create the three task tables when the existence check fails.
    already_set_up = self._check_if_exists()
    if not already_set_up:
        self.create_table("NotStarted", "WorkingOn", "Completed")

    # Queue for the most recent new, modified, in-progress or completed
    # tasks.  Each task should be saved as a tuple together with its table,
    # and the queue should not hold more than 10 tasks.
    self.recent_tasks = TaskCache(10)

    # Tells the application whether partial commits are still waiting to be
    # pushed to the remote repository.
    self.partials_exist = Partials(path)
validator.py 文件源码 项目:proxypool 作者: sml2h3 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self):
    """Create the validator's logger and empty result containers."""
    self.logger = Logger('validator.py')
    # Settings come from the module-level VALIDATE_CONFIG.
    self.valid_cfg = VALIDATE_CONFIG
    self.proxies = {}
    self.result = []
crawler.py 文件源码 项目:proxypool 作者: sml2h3 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self):
    """Start with an empty proxy list and a logger tagged 'crawler.py'."""
    self.proxylist = list()
    self.logger = Logger('crawler.py')
server.py 文件源码 项目:proxypool 作者: sml2h3 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self,PORT):
    """Remember the port, create the logger and immediately call run().

    :param PORT: stored as ``self.port``.
    """
    self.port = PORT
    self.logger = Logger('server.py')
    # run() is invoked from the constructor, so construction returns only
    # when run() does.
    self.run()


问题


面经


文章

微信
公众号

扫码关注公众号