python类shutdown()的实例源码

config.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _config(props, category=None, disable_existing_loggers=1):
  """Tear down the current logging setup and rebuild it from ``props``.

  The logging module is shut down first; then, while the logging module
  lock is held, the handler registries are emptied and handlers/loggers
  are reinstalled through ``_install_handlers`` and ``_install_loggers``.

  :param props: configuration handed to ``_install_handlers`` and
      ``_install_loggers``.
  :param category: forwarded unchanged to ``_install_loggers``.
  :param disable_existing_loggers: forwarded unchanged to
      ``_install_loggers``.
  :raises Exception: any error raised while installing is printed with
      ``traceback.print_exc()`` and then re-raised to the caller.
  """
  logging.shutdown()

  # Critical section, patterned after logging.config.
  logging._acquireLock()
  try:
    logging._handlers.clear()
    del logging._handlerList[:]
    # Handlers add themselves to logging._handlers
    handlers = _install_handlers(props)
    _install_loggers(props, handlers, category, disable_existing_loggers)
  except Exception:
    traceback.print_exc()
    # A bare ``raise`` re-raises the active exception with its original
    # traceback intact (``raise e`` resets it on Python 2).
    raise
  finally:
    # Always give the lock back, whether installation succeeded or not.
    logging._releaseLock()
logwriter.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def cancel(self):
        """Exit cleanly: flush every write buffer and stop the running timers.

        Sets ``self.finished``, writes out pending log data, shuts the
        logging module down, cancels the flush and log-rotation timers,
        and then cancels each optional timer whose setting equals ``True``.
        The timestamp timer is deliberately left alone (its cancellation
        was disabled in the original code).
        """
        self.finished.set()

        # Push everything still buffered out before the timers go away.
        self.WriteToLogFile()
        self.FlushLogWriteBuffers("Flushing buffers prior to exiting")
        logging.shutdown()

        # These two timers are cancelled unconditionally.
        self.flushtimer.cancel()
        self.logrotatetimer.cancel()

        # (settings section, option name, timer attribute) for every timer
        # that is only cancelled when its feature is switched on.
        optional_timers = (
            ('E-mail', 'SMTP Send Email', 'emailtimer'),
            ('Log Maintenance', 'Delete Old Logs', 'oldlogtimer'),
            ('Zip', 'Zip Enable', 'ziptimer'),
        )
        for section, option, timer_attr in optional_timers:
            if self.settings[section][option] == True:
                # Looked up lazily: the attribute is only touched when the
                # corresponding option is enabled.
                getattr(self, timer_attr).cancel()
log_test.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
def test_it(self, mock_register, mock_get, mock_except_hook, mock_sys):
        """Verify the logging setup done by ``self._call()`` under ``--debug``.

        Expects the root logger to be set to DEBUG with exactly two
        handlers added (one ``MemoryHandler`` targeting a ``StreamHandler``,
        the other a ``StreamHandler``), ``logging.shutdown`` to be
        registered, and the installed excepthook to delegate to the
        patched except hook with ``debug=True``.
        """
        mock_sys.argv = ['--debug']
        mock_sys.version_info = sys.version_info
        self._call()

        root_logger = mock_get()
        root_logger.setLevel.assert_called_once_with(logging.DEBUG)
        self.assertEqual(root_logger.addHandler.call_count, 2)

        memory_handler_cls = logging.handlers.MemoryHandler
        memory_handler = None
        for add_call in root_logger.addHandler.call_args_list:
            candidate = add_call[0][0]
            first_memory_handler = (
                memory_handler is None
                and isinstance(candidate, memory_handler_cls))
            if not first_memory_handler:
                # Anything that is not the first MemoryHandler must be a
                # stream handler.
                self.assertTrue(isinstance(candidate, logging.StreamHandler))
                continue
            memory_handler = candidate
        self.assertTrue(
            isinstance(memory_handler.target, logging.StreamHandler))

        mock_register.assert_called_once_with(logging.shutdown)
        mock_sys.excepthook(1, 2, 3)
        mock_except_hook.assert_called_once_with(
            memory_handler, 1, 2, 3, debug=True, log_path=mock.ANY)
rocket.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution and return its future.

        :param fn: callable to run.
        :param args: positional arguments for ``fn``.
        :param kwargs: keyword arguments for ``fn``.
        :returns: the ``WSGIFuture`` created for the work item, or ``False``
            when the shutdown lock could not be acquired.
        :raises RuntimeError: if the pool has already been shut down.
        """
        if not self._shutdown_lock.acquire():
            return False

        # try/finally guarantees the lock is released on every path,
        # including when creating or queueing the work item raises.
        try:
            if self._shutdown:
                raise RuntimeError(
                    'Cannot schedule new futures after shutdown')

            f = WSGIFuture(self.futures)
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
        finally:
            self._shutdown_lock.release()
ezpy.py 文件源码 项目:ezpy 作者: jhermann 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run():
    """Run setup and the main loop, mapping common interruptions to exit codes.

    A ``KeyboardInterrupt`` exits with status 2; an ``IOError`` whose errno
    is ``EPIPE`` exits with status 0; any other ``IOError`` propagates.
    Logging is shut down on every path.
    """
    try:
        setup()
        try:
            parser, args = make_argparser()
            mainloop(parser, args)
        except KeyboardInterrupt:
            sys.stderr.flush()
            sys.exit(2)
        except IOError as io_error:
            if io_error.errno != errno.EPIPE:
                raise
            # Broken pipe: downstream is done with our output.
            sys.stderr.flush()
            sys.exit(0)
    finally:
        logging.shutdown()
vtr_plugin.py 文件源码 项目:Vector-Tiles-Reader-QGIS-Plugin 作者: geometalab 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def unload(self):
        """Release the current reader and detach the plugin from the host UI.

        Closes the active reader's connection (if any), then disconnects
        the signal handlers, stops the debouncer and removes the plugin's
        toolbar/menu entries. That cleanup is best-effort: an ordinary
        exception is logged at debug level and otherwise ignored, while
        ``KeyboardInterrupt``/``SystemExit`` are allowed to propagate.
        Logging is shut down whether or not the cleanup succeeded.
        """
        if self._current_reader:
            self._current_reader.get_source().close_connection()
            self._current_reader = None

        try:
            self.iface.mapCanvas().xyCoordinates.disconnect(self._handle_mouse_move)
            QgsMapLayerRegistry.instance().layersWillBeRemoved.disconnect(self._on_remove)
            self.iface.newProjectCreated.disconnect(self._on_project_change)
            self.iface.projectRead.disconnect(self._on_project_change)
            self._debouncer.stop()
            self._debouncer.shutdown()
            self.iface.layerToolBar().removeAction(self.toolButtonAction)
            self.iface.removePluginVectorMenu("&Vector Tiles Reader", self.about_action)
            self.iface.removePluginVectorMenu("&Vector Tiles Reader", self.open_connections_action)
            self.iface.removePluginVectorMenu("&Vector Tiles Reader", self.reload_action)
            self.iface.removePluginVectorMenu("&Vector Tiles Reader", self.clear_cache_action)
            self.iface.addLayerMenu().removeAction(self.open_connections_action)
        except Exception:
            # Still best-effort, but no longer a bare ``except`` that would
            # also hide KeyboardInterrupt/SystemExit; keep a debug trace.
            logging.getLogger(__name__).debug(
                "Ignoring error while unloading plugin", exc_info=True)
        finally:
            # Flush and close log handlers even if a cleanup step failed.
            logging.shutdown()
rocket.py 文件源码 项目:Problematica-public 作者: TechMaz 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution and return its future.

        :param fn: callable to run.
        :param args: positional arguments for ``fn``.
        :param kwargs: keyword arguments for ``fn``.
        :returns: the ``WSGIFuture`` created for the work item, or ``False``
            when the shutdown lock could not be acquired.
        :raises RuntimeError: if the pool has already been shut down.
        """
        if not self._shutdown_lock.acquire():
            return False

        # try/finally guarantees the lock is released on every path,
        # including when creating or queueing the work item raises.
        try:
            if self._shutdown:
                raise RuntimeError(
                    'Cannot schedule new futures after shutdown')

            f = WSGIFuture(self.futures)
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
        finally:
            self._shutdown_lock.release()
ici_run.py 文件源码 项目:icing 作者: slipguru 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def init_logger(filename, root, verbose):
    """Initialise logger.

    Resets the root logger (all handlers and filters are detached), then
    configures it to write INFO-and-above records to ``<root>/<filename>.log``
    (truncating the file) and adds a stream handler on top.

    :param filename: base name of the log file, without the ``.log`` suffix.
    :param root: directory in which the log file is created.
    :param verbose: when true the stream handler emits INFO and above,
        otherwise only ERROR and above.
    :returns: path of the log file.
    """
    logfile = os.path.join(root, filename + '.log')
    logging.shutdown()
    root_logger = logging.getLogger()
    for handler in list(root_logger.handlers):
        root_logger.removeHandler(handler)
        handler.flush()
        handler.close()
    # Filters only need detaching: unlike handlers they have no
    # flush()/close(), so calling those raised AttributeError.
    for log_filter in list(root_logger.filters):
        root_logger.removeFilter(log_filter)

    log_format = '%(levelname)s (%(asctime)-15s): %(message)s'
    logging.basicConfig(filename=logfile, level=logging.INFO, filemode='w',
                        format=log_format)
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO if verbose else logging.ERROR)
    stream_handler.setFormatter(logging.Formatter(log_format))

    root_logger.addHandler(stream_handler)
    return logfile
test_logging.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def setUp(self):
        """Hook a recording handler onto the root logger and build an adapter.

        Registers cleanups that remove and close the handler, restore
        ``logging._handlerList`` to its pre-test contents and shut logging
        down (unittest runs cleanups in reverse registration order).
        """
        super(LoggerAdapterTest, self).setUp()
        saved_handlers = list(logging._handlerList)

        recorder = RecordingHandler()
        root = logging.root
        self.recording = recorder
        self.logger = root
        root.addHandler(recorder)
        self.addCleanup(root.removeHandler, recorder)
        self.addCleanup(recorder.close)

        def restore_handler_list():
            # Put back the handler list snapshot taken before the test.
            logging._handlerList[:] = saved_handlers

        self.addCleanup(restore_handler_list)
        self.addCleanup(logging.shutdown)
        self.adapter = logging.LoggerAdapter(logger=root, extra=None)
rocket.py 文件源码 项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution and return its future.

        :param fn: callable to run.
        :param args: positional arguments for ``fn``.
        :param kwargs: keyword arguments for ``fn``.
        :returns: the ``WSGIFuture`` created for the work item, or ``False``
            when the shutdown lock could not be acquired.
        :raises RuntimeError: if the pool has already been shut down.
        """
        if not self._shutdown_lock.acquire():
            return False

        # try/finally guarantees the lock is released on every path,
        # including when creating or queueing the work item raises.
        try:
            if self._shutdown:
                raise RuntimeError(
                    'Cannot schedule new futures after shutdown')

            f = WSGIFuture(self.futures)
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
        finally:
            self._shutdown_lock.release()
rocket.py 文件源码 项目:web3py 作者: web2py 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution and return its future.

        :param fn: callable to run.
        :param args: positional arguments for ``fn``.
        :param kwargs: keyword arguments for ``fn``.
        :returns: the ``WSGIFuture`` created for the work item, or ``False``
            when the shutdown lock could not be acquired.
        :raises RuntimeError: if the pool has already been shut down.
        """
        if not self._shutdown_lock.acquire():
            return False

        # try/finally guarantees the lock is released on every path,
        # including when creating or queueing the work item raises.
        try:
            if self._shutdown:
                raise RuntimeError(
                    'Cannot schedule new futures after shutdown')

            f = WSGIFuture(self.futures)
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
        finally:
            self._shutdown_lock.release()
rocket.py 文件源码 项目:CSC376KnowledgeManagement 作者: WCotterman 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution and return its future.

        :param fn: callable to run.
        :param args: positional arguments for ``fn``.
        :param kwargs: keyword arguments for ``fn``.
        :returns: the ``WSGIFuture`` created for the work item, or ``False``
            when the shutdown lock could not be acquired.
        :raises RuntimeError: if the pool has already been shut down.
        """
        if not self._shutdown_lock.acquire():
            return False

        # try/finally guarantees the lock is released on every path,
        # including when creating or queueing the work item raises.
        try:
            if self._shutdown:
                raise RuntimeError(
                    'Cannot schedule new futures after shutdown')

            f = WSGIFuture(self.futures)
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
        finally:
            self._shutdown_lock.release()
test_logging.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def setUp(self):
        """Hook a recording handler onto the root logger and build an adapter.

        Registers cleanups that remove and close the handler, restore
        ``logging._handlerList`` to its pre-test contents and shut logging
        down (unittest runs cleanups in reverse registration order).
        """
        super(LoggerAdapterTest, self).setUp()
        saved_handlers = list(logging._handlerList)

        recorder = RecordingHandler()
        root = logging.root
        self.recording = recorder
        self.logger = root
        root.addHandler(recorder)
        self.addCleanup(root.removeHandler, recorder)
        self.addCleanup(recorder.close)

        def restore_handler_list():
            # Put back the handler list snapshot taken before the test.
            logging._handlerList[:] = saved_handlers

        self.addCleanup(restore_handler_list)
        self.addCleanup(logging.shutdown)
        self.adapter = logging.LoggerAdapter(logger=root, extra=None)
rocket.py 文件源码 项目:slugiot-client 作者: slugiot 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution and return its future.

        :param fn: callable to run.
        :param args: positional arguments for ``fn``.
        :param kwargs: keyword arguments for ``fn``.
        :returns: the ``WSGIFuture`` created for the work item, or ``False``
            when the shutdown lock could not be acquired.
        :raises RuntimeError: if the pool has already been shut down.
        """
        if not self._shutdown_lock.acquire():
            return False

        # try/finally guarantees the lock is released on every path,
        # including when creating or queueing the work item raises.
        try:
            if self._shutdown:
                raise RuntimeError(
                    'Cannot schedule new futures after shutdown')

            f = WSGIFuture(self.futures)
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
        finally:
            self._shutdown_lock.release()
main.py 文件源码 项目:security-utilities 作者: Eudoxier 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def main(args):
    """Parse the arguments, announce the target and run the fuzzer.

    Logging is shut down on every exit path, including when argument
    parsing or the fuzz run raises.

    TODO: validate ip address input

    :param args: raw argument list handed to ``parse_args``.
    """
    setup_logs()
    _logger.debug("Starting main()")

    try:
        args = parse_args(args)
        print("[*] Fuzzing port {} on IP address {}".format(args.port, args.ip))
        fuzz(args.ip, args.port, args.size, args.increment)

        _logger.debug("All done, shutting down.")
    finally:
        # Flush and close the log handlers even when the run fails.
        logging.shutdown()
main.py 文件源码 项目:security-utilities 作者: Eudoxier 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def main(args):
    """Parse the arguments and launch ``pwn`` in the selected mode.

    The first set flag among ``test``, ``unique``, ``chars`` and
    ``breakpoint`` picks a fixed mode string; when none is set the
    ``payload`` argument is passed through instead.

    :param args: raw argument list handed to ``parse_args``.
    """
    setup_logs()
    _logger.debug("Starting main()")

    args = parse_args(args)
    print("[*] Initiating attack on IP address {}:{}".format(args.ip, args.port))

    # (argument flag, mode handed to pwn), checked in priority order.
    flag_modes = (
        ('test', 'test'),
        ('unique', 'unique'),
        ('chars', 'chars'),
        ('breakpoint', 'break'),
    )
    for flag, mode in flag_modes:
        if getattr(args, flag):
            break
    else:
        # No mode flag given: fall back to the user-supplied payload.
        mode = args.payload
    pwn(args.ip, args.port, mode)

    _logger.debug("All done, shutting down.")
    logging.shutdown()
daemon.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def run(self):
        """Select a setup mode from ``self.options`` and start the reactor.

        Exactly one of single-job, multiprocess, worker or (by default)
        scheduling setup is performed. Signal handlers are installed once
        the reactor is running, and ``self.shutdown`` is registered to run
        after reactor shutdown.
        """
        reactor.callWhenRunning(self.install_sighandlers)

        opts = self.options
        if opts.netbox:
            self.setup_single_job()
        elif opts.multiprocess:
            self.setup_multiprocess(opts.multiprocess, opts.max_jobs)
        elif opts.worker:
            self.setup_worker()
        else:
            self.setup_scheduling()

        reactor.suggestThreadPoolSize(self.options.threadpoolsize)
        reactor.addSystemEventTrigger("after", "shutdown", self.shutdown)
        reactor.run()
test_logging.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setUp(self):
        """Hook a recording handler onto the root logger and build an adapter.

        Registers cleanups that remove and close the handler, restore
        ``logging._handlerList`` to its pre-test contents and shut logging
        down (unittest runs cleanups in reverse registration order).
        """
        super(LoggerAdapterTest, self).setUp()
        saved_handlers = list(logging._handlerList)

        recorder = RecordingHandler()
        root = logging.root
        self.recording = recorder
        self.logger = root
        root.addHandler(recorder)
        self.addCleanup(root.removeHandler, recorder)
        self.addCleanup(recorder.close)

        def restore_handler_list():
            # Put back the handler list snapshot taken before the test.
            logging._handlerList[:] = saved_handlers

        self.addCleanup(restore_handler_list)
        self.addCleanup(logging.shutdown)
        self.adapter = logging.LoggerAdapter(logger=root, extra=None)
comment_notifier.py 文件源码 项目:duoshuo-comment-notifier 作者: LooEv 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def monitor():
    """Run one monitoring pass: fetch content and e-mail it when present.

    Reloads the configuration, calls ``handler()`` and, if it returned
    something truthy, sends it with ``send_email``. Any exception is
    logged and swallowed. On success the module-level
    ``the_number_of_mistakes`` counter is set to -2.
    NOTE(review): the meaning of -2 is not visible here -- confirm.
    """
    global the_number_of_mistakes

    logger.debug(u'??????...')
    get_config()
    try:
        content = handler()
        if content:
            send_email(content, message_type='comment')
    except Exception as err:
        logger.exception(err)
    else:
        # Only reached when nothing above raised.
        the_number_of_mistakes = -2
        if not content:
            logger.debug(u'????????')
    finally:
        # Flush and close every logging handler at the end of each pass.
        logging.shutdown()
webgobbler.py 文件源码 项目:hachoir3 作者: vstinner 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def run(self):
        """Poll the command queue until a shutdown token arrives.

        An empty queue makes the loop sleep for half a second. A token
        whose ``shutdown`` attribute is falsy is reported as unknown and
        ignored. A shutdown token marks the object as closing, shuts down
        and joins the pool, and ends the method.
        """
        while True:
            try:
                token = self.inputCommandQueue.get_nowait()  # Get orders
                if not token.shutdown:
                    # Unknown command, ignore.
                    self._logError("Unknown command token")
                    continue
                self._logInfo("Shutting down")
                self.closing = True
                self.pool.shutdown()
                self.pool.join()
                return  # Exit the thread.
            except Queue.Empty:
                time.sleep(0.5)
webgobbler.py 文件源码 项目:hachoir3 作者: vstinner 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def image_saver(config, imageName='webgobbler.bmp', generateSingleImage=False):
    ''' Keep producing images with an assembler_superpose and save each
        one to a file.

        config (an applicationConfig object) : the program configuration
        imageName (string): file name the image is saved under
                            (eg."toto.jpeg","dudu.png"...)
        generateSingleImage (bool): If True, stop after the first image;
                            otherwise wait config["program.every"] seconds
                            between images and loop forever.

        The assembler is shut down and joined when the loop ends, whether
        normally or through an exception.
    '''
    logger = logging.getLogger('image_saver')
    assembler = assembler_superpose(pool=imagePool(config=config), config=config)
    assembler.start()
    try:
        keep_going = True
        while keep_going:
            logger.info("Generating a new image to %s" % imageName)
            assembler.superposeB()  # Evolve current image
            assembler.saveImageTo(imageName)
            if generateSingleImage:
                keep_going = False
            else:
                logger.info("Will generate a new image in %d seconds." %
                            config["program.every"])
                time.sleep(config["program.every"])
    finally:
        assembler.shutdown()
        assembler.join()
mission.py 文件源码 项目:eesa 作者: EqualExperts 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def autopilot(self):
        """Lock the payload, wait for an altitude reading, then idle until stopped.

        While the relative-frame altitude is falsy and ``self.stop()`` is
        false, a message is logged once per second. After that the method
        loops until ``self.stop()`` is true, then shuts logging down and
        closes the connection.
        """
        # Make sure the payload release is in the closed position
        self.lock_payload()

        # TODO use GPS Fix = 3D before allowing continue?
        # TODO check if safety switch activated?
        while not (self.connection.location.global_relative_frame.alt
                   or self.stop()):
            self.log( "No GPS signal yet" )
            time.sleep(1)

        # NOTE(review): once ``self.released`` is truthy this loop polls
        # ``self.stop()`` without sleeping -- confirm that is intended.
        while True:
            if self.stop():
                break
            if self.released:
                continue
            time.sleep(3)

        logging.shutdown()
        self.connection.close()
rocket.py 文件源码 项目:StuffShare 作者: StuffShare 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution and return its future.

        :param fn: callable to run.
        :param args: positional arguments for ``fn``.
        :param kwargs: keyword arguments for ``fn``.
        :returns: the ``WSGIFuture`` created for the work item, or ``False``
            when the shutdown lock could not be acquired.
        :raises RuntimeError: if the pool has already been shut down.
        """
        if not self._shutdown_lock.acquire():
            return False

        # try/finally guarantees the lock is released on every path,
        # including when creating or queueing the work item raises.
        try:
            if self._shutdown:
                raise RuntimeError(
                    'Cannot schedule new futures after shutdown')

            f = WSGIFuture(self.futures)
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
        finally:
            self._shutdown_lock.release()
mechanic_command.py 文件源码 项目:mechanic 作者: server-mechanic 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run(self):
    """Configure logging, run the requested command and return an exit code.

    The logger level follows the ``verbose`` flag. Unless the configured
    log file is empty, "/dev/stderr" or "stderr", a file handler for it is
    added. The command named on the command line is run, falling back to
    the default command when the name is unknown.

    Returns 0 on success, 2 for ``MigrationFailedException``, 3 for
    ``FollowUpCommandFailedException`` and 1 for ``MechanicException`` or
    any other exception. Logging is shut down on every path.
    """
    args = self.mechanic.commandLine
    self.logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)

    log_file = self.mechanic.config.getLogFile()
    if log_file not in ("", "/dev/stderr", "stderr"):
      makeparentdirs(log_file)
      self.logger.addHandler(logging.FileHandler(log_file))

    command = self.mechanic.commands.get(args.commandName)
    exit_code = 0
    try:
      target = command if command is not None else self.mechanic.defaultCommand
      target.run(args)
    except MigrationFailedException as err:
      self.logger.error(err.message)
      exit_code = 2
    except FollowUpCommandFailedException as err:
      self.logger.error(err.message)
      exit_code = 3
    except MechanicException as err:
      self.logger.error(err.message)
      exit_code = 1
    except Exception as err:
      self.logger.error(err)
      exit_code = 1
    finally:
      logging.shutdown()
    return exit_code
process_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Abort stray child processes; otherwise clear the alarm and tear down.

        A non-``None`` ``task_id()`` means this code is running in a child
        process, probably after an uncaught exception. Returning would let
        both processes carry on with the rest of the suite, so the child
        logs an error, shuts logging down and exits immediately with
        status 1 (there is no clean way to signal failure to the parent
        without it restarting the child).
        """
        in_child_process = task_id() is not None
        if in_child_process:
            logging.error("aborting child process from tearDown")
            logging.shutdown()
            os._exit(1)
        # In the surviving process, clear the alarm we set earlier
        signal.alarm(0)
        super(ProcessTest, self).tearDown()
process_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Abort stray child processes; otherwise clear the alarm and tear down.

        A non-``None`` ``task_id()`` means this code is running in a child
        process, probably after an uncaught exception. Returning would let
        both processes carry on with the rest of the suite, so the child
        logs an error, shuts logging down and exits immediately with
        status 1 (there is no clean way to signal failure to the parent
        without it restarting the child).
        """
        in_child_process = task_id() is not None
        if in_child_process:
            logging.error("aborting child process from tearDown")
            logging.shutdown()
            os._exit(1)
        # In the surviving process, clear the alarm we set earlier
        signal.alarm(0)
        super(ProcessTest, self).tearDown()
process_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def tearDown(self):
        """Abort stray child processes; otherwise clear the alarm and tear down.

        A non-``None`` ``task_id()`` means this code is running in a child
        process, probably after an uncaught exception. Returning would let
        both processes carry on with the rest of the suite, so the child
        logs an error, shuts logging down and exits immediately with
        status 1 (there is no clean way to signal failure to the parent
        without it restarting the child).
        """
        in_child_process = task_id() is not None
        if in_child_process:
            logging.error("aborting child process from tearDown")
            logging.shutdown()
            os._exit(1)
        # In the surviving process, clear the alarm we set earlier
        signal.alarm(0)
        super(ProcessTest, self).tearDown()
server.py 文件源码 项目:health-mosconi 作者: GNUHealth-Mosconi 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def stop(self, exit=True):
        """Stop and join every XML-RPC, JSON-RPC and WebDAV server.

        :param exit: when true (the default), also remove the pid file if
            one is configured, log 'stopped', shut logging down and exit
            the process with status 0.
        """
        server_groups = (self.xmlrpcd, self.jsonrpcd, self.webdavd)
        for group in server_groups:
            for daemon in group:
                daemon.stop()
                daemon.join()

        if not exit:
            return

        pidfile = self.options.pidfile
        if pidfile:
            os.unlink(pidfile)
        logging.getLogger('server').info('stopped')
        logging.shutdown()
        sys.exit(0)
logger.py 文件源码 项目:harbour-sailfinder 作者: DylanVanAssche 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Shut logging down and exit the interpreter.

        The three exception arguments are what the ``with`` statement
        passes to ``__exit__``; they are accepted so the object works as a
        context manager and are otherwise ignored. They default to ``None``
        so existing direct calls without arguments keep working.

        :raises SystemExit: always, via ``sys.exit()``.
        """
        logging.shutdown()
        sys.exit()
smtpHandler.py 文件源码 项目:scrutiny 作者: lshift 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test():
    """Emit 102 INFO records through a ``BufferingSMTPHandler`` and shut down.

    The handler is attached to the root logger, which is set to DEBUG.
    NOTE(review): the trailing ``10`` is presumably the buffer capacity --
    confirm against ``BufferingSMTPHandler``.
    """
    logger = logging.getLogger("")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(BufferingSMTPHandler(MAILHOST, FROM, TO, SUBJECT, 10))
    # ``range`` exists on Python 2 and 3; ``xrange`` is Python 2 only.
    for i in range(102):
        logger.info("Info index = %d", i)
    logging.shutdown()


问题


面经


文章

微信
公众号

扫码关注公众号