python类ERROR的实例源码

exif_log.py 文件源码 项目:llk 作者: Tycx2ry 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def format(self, record):
        """Format ``record``, colouring the level name when colour debugging is on.

        When both ``self.debug`` and ``self.color`` are truthy the level name
        is wrapped in an ANSI escape sequence chosen by severity; otherwise
        the record is formatted unchanged.

        The coloured level name is applied only for the duration of this call.
        ``record`` is shared by every handler attached to the logger, so
        leaving the escape codes on it would leak them into other handlers'
        output and nest them if the record were formatted again.

        Args:
            record: The ``logging.LogRecord`` to format.

        Returns:
            The formatted log line produced by ``logging.Formatter.format``.
        """
        if not (self.debug and self.color):
            return logging.Formatter.format(self, record)

        # CRITICAL and ERROR share one colour, so a single threshold covers both.
        if record.levelno >= logging.ERROR:
            color = TEXT_RED
        elif record.levelno >= logging.WARNING:
            color = TEXT_YELLOW
        elif record.levelno >= logging.INFO:
            color = TEXT_GREEN
        elif record.levelno >= logging.DEBUG:
            color = TEXT_CYAN
        else:
            color = TEXT_NORMAL

        plain_levelname = record.levelname
        record.levelname = "\x1b[%sm%s\x1b[%sm" % (color, plain_levelname, TEXT_NORMAL)
        try:
            return logging.Formatter.format(self, record)
        finally:
            # Restore the shared record so other handlers see the plain name.
            record.levelname = plain_levelname
core.py 文件源码 项目:rltk 作者: usc-isi-i2 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update_logging_settings(self, file_path=None, level=None, format=None):
        """
        Update global logging. Arguments left as None keep the previous setting.

        The file handler installed by an earlier call is detached and closed
        before the new one is attached, so repeated calls neither duplicate
        log lines nor leak open file handles.

        Args:
            file_path (str): It is Initialized to 'log.log'.
            level (str): It can be 'error', 'warning' or 'info'. It is Initialized to 'error'.
            format (str): It is Initialized to '%(asctime)s %(levelname)s %(message)s'.

        Raises:
            KeyError: If the configured level is not one of the names above.
        """

        LOGGING_STRING_MAP = {'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR}
        if file_path is not None:
            self._logger_config['file_path'] = self._get_abs_path(file_path)
        if level is not None:
            self._logger_config['level'] = level
        if format is not None:
            self._logger_config['format'] = format

        # Resolve the level first so an unknown name fails before the logger
        # is touched.
        numeric_level = LOGGING_STRING_MAP[self._logger_config['level']]

        logger = logging.getLogger(Configuration.LOGGER_NAME)

        # Build the replacement handler before removing the old one: if the
        # file cannot be opened, the existing handler stays in place.
        log_file = logging.FileHandler(self._logger_config['file_path'])
        log_file.setFormatter(logging.Formatter(self._logger_config['format']))

        previous_handler = getattr(self, '_log_file_handler', None)
        if previous_handler is not None:
            logger.removeHandler(previous_handler)
            previous_handler.close()

        logger.addHandler(log_file)
        logger.setLevel(numeric_level)
        self._log_file_handler = log_file
        self._logger = logger
utils_mnist.py 文件源码 项目:TFExperiments 作者: gnperdue 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_logging_level(log_level):
    """Translate an upper-case level name into its ``logging`` constant.

    Recognised names are 'DEBUG', 'INFO', 'WARNING', 'ERROR' and 'CRITICAL'.
    Any other value prints a notice and falls back to ``logging.INFO``.
    """
    known_levels = (
        ('DEBUG', logging.DEBUG),
        ('INFO', logging.INFO),
        ('WARNING', logging.WARNING),
        ('ERROR', logging.ERROR),
        ('CRITICAL', logging.CRITICAL),
    )
    for level_name, level_value in known_levels:
        if log_level == level_name:
            return level_value

    print('Unknown or unset logging level. Using INFO')
    return logging.INFO
LogWrapper.py 文件源码 项目:tagberry 作者: csailer 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _logWriter(self,level,message,exception=None):
        """Set the logger and handler thresholds to ``level`` and emit ``message``.

        The logger ``self._logger`` and the handlers ``self._fh`` and
        ``self._ch`` are all set to ``level``. The message is then emitted
        through the module-level ``logging`` functions, i.e. the root logger.

        Args:
            level: One of logging.DEBUG, INFO, WARNING, ERROR or CRITICAL.
                Any other value updates the thresholds but emits nothing.
            message: Text to log.
            exception: Optional exception. When given, the result of
                ``self._formatException(exception)`` is appended to the message.
        """
        self._logger.setLevel(level)
        self._fh.setLevel(level)
        self._ch.setLevel(level)

        # Default to an empty suffix so calls without an exception do not
        # hit an unbound local when the message is built.
        exFormatted = ""
        if exception is not None:
            exFormatted = self._formatException(exception)

        msg = "%s%s" % (message, exFormatted)

        if level == logging.DEBUG:
            logging.debug(msg)
        elif level == logging.INFO:
            logging.info(msg)
        elif level == logging.WARNING:
            logging.warning(msg)
        elif level == logging.ERROR:
            logging.error(msg)
        elif level == logging.CRITICAL:
            logging.critical(msg)
EmailService.py 文件源码 项目:ZhihuSpider 作者: KEN-LJQ 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send_message(self, email_content):
        """Send ``email_content`` as a plain-text UTF-8 e-mail.

        The subject is ``self.smtp_email_header`` followed by the current
        local time in ``[month-day hour:minute:second]`` form (not
        zero-padded). Failures are logged through ``log`` and not re-raised.

        Args:
            email_content: Body text of the message.
        """
        # Build the message and its timestamped subject line.
        now = datetime.datetime.now()
        timestamp = '%d-%d %d:%d:%d' % (now.month, now.day, now.hour, now.minute, now.second)
        header = self.smtp_email_header + '[' + timestamp + ']'
        msg = MIMEText(email_content, 'plain', 'utf-8')
        msg['from'] = self.smtp_from_addr
        msg['to'] = self.smtp_to_addr
        msg['Subject'] = Header(header, 'utf-8').encode()

        # Send it.
        try:
            smtp_server = smtplib.SMTP(self.smtp_server_host, self.smtp_server_port)
            try:
                smtp_server.login(self.smtp_from_addr, self.smtp_server_password)
                smtp_server.sendmail(self.smtp_from_addr, [self.smtp_to_addr], msg.as_string())
                smtp_server.quit()
            finally:
                # Release the socket even when login or sendmail raised;
                # close() is a no-op after a successful quit().
                smtp_server.close()
        except Exception as e:
            if log.isEnabledFor(logging.ERROR):
                log.error("??????")
                log.exception(e)
Processor.py 文件源码 项目:ZhihuSpider 作者: KEN-LJQ 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def check_and_restart(self):
        """Replace every processor thread whose status is 'error'.

        Each failed thread is removed from ``self.processor_list`` and a new
        ``ProcessThread`` with the same thread id is appended and started.
        """
        # Iterate over a snapshot: the body removes from and appends to
        # self.processor_list, and mutating a list while looping over it
        # skips the entry after each removal and revisits the new threads.
        for process_thread in list(self.processor_list):
            if process_thread.thread_status != 'error':
                continue

            thread_id = process_thread.thread_id
            self.processor_list.remove(process_thread)
            new_thread = ProcessThread(thread_id, self.redis_connection, self.token_filter,
                                       self.response_buffer, self.is_parser_following_list,
                                       self.is_parser_follower_list, self.is_parser_follow_relation)
            self.processor_list.append(new_thread)
            new_thread.start()

            if log.isEnabledFor(logging.ERROR):
                # str() keeps the message intact when thread_id is not a string.
                log.error('???????[' + str(thread_id) + ']????')

# URL ??
proxyCore.py 文件源码 项目:ZhihuSpider 作者: KEN-LJQ 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        """Loop forever, moving validated proxies into ``proxy_pool``.

        While ``is_scanning`` is truthy the loop idles in 3-second steps.
        Otherwise, if the pool is below ``PROXY_POOL_SIZE`` and unchecked
        proxies are queued, one is taken and validated, and added to the pool
        when validation returns ``True``. Any exception is logged and
        ``self.status`` is set to 'error'.
        """
        try:
            while True:
                # Wait while the scan flag is set.
                while is_scanning:
                    time.sleep(3)

                pool_has_room = proxy_pool.qsize() < PROXY_POOL_SIZE
                if not (pool_has_room and unchecked_proxy_list.qsize() > 0):
                    time.sleep(5)
                    continue

                candidate = unchecked_proxy_list.get()
                if self.dataValidateModule.validate_proxy_ip(candidate) is True:
                    proxy_pool.put(candidate)
                time.sleep(1)
        except Exception as e:
            if log.isEnabledFor(logging.ERROR):
                log.exception(e)
            self.status = 'error'


# ????????????????????
proxyCore.py 文件源码 项目:ZhihuSpider 作者: KEN-LJQ 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run(self):
        """Loop forever, refilling or scanning the proxy pool.

        When both the pool and the unchecked queue are below
        ``PROXY_POOL_SIZE``, more proxies are fetched. When the pool holds
        exactly ``PROXY_POOL_SIZE`` entries it is scanned, followed by a sleep
        of ``PROXY_POOL_SCAN_INTERVAL``. In every other case the loop sleeps
        for 60 seconds. Any exception is logged and ``self.status`` is set to
        'error'.
        """
        try:
            while True:
                if proxy_pool.qsize() < PROXY_POOL_SIZE and unchecked_proxy_list.qsize() < PROXY_POOL_SIZE:
                    self.fetch_and_parse_proxy()
                    continue

                if proxy_pool.qsize() != PROXY_POOL_SIZE:
                    time.sleep(60)
                    continue

                if log.isEnabledFor(logging.DEBUG):
                    log.debug('?????')
                self.scan_proxy_pool()
                time.sleep(PROXY_POOL_SCAN_INTERVAL)
        except Exception as e:
            if log.isEnabledFor(logging.ERROR):
                log.exception(e)
            self.status = 'error'

    # ?????????
config.py 文件源码 项目:myproject 作者: dengliangshi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def init_app(cls, app):
        """Run the base ``Config.init_app`` and attach an e-mail error handler.

        An ``SMTPHandler`` at level ``logging.ERROR`` is added to
        ``app.logger``. Credentials are supplied only when ``MAIL_USERNAME``
        is set on ``cls``; ``secure`` is an empty tuple only when, in
        addition, ``MAIL_USE_TLS`` is truthy.
        """
        Config.init_app(app)

        # Work out the optional authentication settings.
        credentials = None
        secure = None
        has_username = getattr(cls, 'MAIL_USERNAME', None) is not None
        if has_username:
            credentials = (cls.MAIL_USERNAME, cls.MAIL_PASSWORD)
            if getattr(cls, 'MAIL_USE_TLS', None):
                secure = ()

        # E-mail errors to the administrator.
        handler_options = dict(
            mailhost=(cls.MAIL_SERVER, cls.MAIL_PORT),
            fromaddr=cls.MAIL_SENDER,
            toaddrs=[cls.FLASK_ADMIN],
            subject=cls.MAIL_SUBJECT_PREFIX + ' Application Error',
            credentials=credentials,
            secure=secure,
        )
        mail_handler = SMTPHandler(**handler_options)
        mail_handler.setLevel(logging.ERROR)
        app.logger.addHandler(mail_handler)
test_amset.py 文件源码 项目:amset 作者: hackingmaterials 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_poly_bands(self):
        """Run AMSET with a single poly band and compare two mobility values.

        Asserts that ``amset.mobility['n']['ACD']`` and
        ``egrid['n']['mobility']['SPB_ACD']`` (doping -2e15, temperature 300)
        differ by at most 1% of their mean.
        """
        print('\ntesting test_poly_bands...')
        mass = 0.25
        self.model_params['poly_bands'] = [[[[0.0, 0.0, 0.0], [0.0, mass]]]]

        amset_options = dict(
            calc_dir=self.GaAs_path,
            material_params=self.GaAs_params,
            model_params=self.model_params,
            performance_params=self.performance_params,
            dopings=[-2e15],
            temperatures=[300],
            k_integration=True,
            e_integration=True,
            fermi_type='k',
            loglevel=logging.ERROR,
        )
        amset = AMSET(**amset_options)
        amset.run(self.GaAs_cube, kgrid_tp='coarse', write_outputs=False)

        doping, temperature = -2e15, 300
        egrid = amset.egrid
        computed = amset.mobility['n']['ACD'][doping][temperature]
        reference = egrid['n']['mobility']['SPB_ACD'][doping][temperature]

        diff = abs(np.array(computed) - np.array(reference))
        avg = (computed + reference) / 2
        self.assertTrue((diff / avg <= 0.01).all())
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __init__(self, log_level=ERROR):
        """Initialize the deployment environment.

        Args:
            log_level: Passed unchanged to the parent class ``__init__``;
                defaults to the ``ERROR`` name in scope at definition time.
        """
        super(OpenStackAmuletUtils, self).__init__(log_level)
utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, log_level=logging.ERROR):
        """Create the instance logger and store the Ubuntu release list.

        Args:
            log_level: Level handed to ``self.get_logger``; defaults to
                ``logging.ERROR``.
        """
        # Logger first, then the result of get_ubuntu_releases().
        self.log = self.get_logger(level=log_level)
        self.ubuntu_releases = self.get_ubuntu_releases()
Logging.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def set_verbosity(verbose):
    """
    Set the level of each application logger from a verbosity count.

    Every logger is first reset to ERROR. Each rule in the table below is
    then applied, in order, when ``verbose`` exceeds its threshold, so
    higher verbosity progressively lowers the levels.
    """
    # Default
    all_loggers = (
        SQLALCHEMY_LOGGER,
        SQLALCHEMY_ENGINE,
        NEWSREAP_LOGGER,
        NEWSREAP_CLI,
        NEWSREAP_CODEC,
        NEWSREAP_HOOKS,
        NEWSREAP_ENGINE,
    )
    for logger_name in all_loggers:
        logging.getLogger(logger_name).setLevel(logging.ERROR)

    # Handle Verbosity: (threshold, logger name, level applied above it)
    verbosity_rules = (
        (0, NEWSREAP_CLI, logging.INFO),
        (0, NEWSREAP_HOOKS, logging.INFO),
        (0, NEWSREAP_ENGINE, logging.INFO),
        (1, NEWSREAP_CLI, logging.DEBUG),
        (1, NEWSREAP_HOOKS, logging.DEBUG),
        (1, NEWSREAP_ENGINE, logging.DEBUG),
        (2, SQLALCHEMY_ENGINE, logging.INFO),
        (2, NEWSREAP_CODEC, logging.INFO),
        (3, NEWSREAP_CODEC, logging.DEBUG),
        (4, SQLALCHEMY_ENGINE, logging.DEBUG),
    )
    for threshold, logger_name, level in verbosity_rules:
        if verbose > threshold:
            logging.getLogger(logger_name).setLevel(level)


# set initial level to WARN.
raven.py 文件源码 项目:djaio 作者: Sberned 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setup(app, version:str='undefined'):
    """Create a Sentry client for ``app`` and route ERROR logs through it.

    The environment comes from ``app.settings.ROLE`` or, failing that, the
    ``ROLE`` environment variable. ``SENTRY_DSN`` and ``SENTRY_TAGS`` are read
    from ``app.settings`` and default to ``None``. The client is stored on
    ``app.raven``.
    """
    settings = app.settings
    environment = getattr(settings, 'ROLE', None) or os.environ.get('ROLE')

    client_options = {
        'dsn': getattr(settings, 'SENTRY_DSN', None),
        'transport': AioHttpTransport,
        'version': version,
        'environment': environment,
        'tags': getattr(settings, 'SENTRY_TAGS', None),
    }
    client = Client(**client_options)

    sentry_handler = DjaioSentryHandler(client=client)
    sentry_handler.setLevel(logging.ERROR)
    setup_logging(sentry_handler)

    app.raven = client
logger.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def setup_logs(args):
    """
    Initialize the api loggers.

    The root level follows ``args["verbose"]``: 0 gives WARNING, 1 (the
    default) gives INFO, and 2 or more gives DEBUG. Handlers for internal
    errors and stats are always attached to the root logger; the severe-error
    handler is attached only when e-mail is enabled in the api settings.

    Args:
        args: dict containing the configuration options.
    """

    flask_logging.create_logger = lambda app: use(app.logger_name)

    if not args.get("debug", True):
        set_level("werkzeug", logging.ERROR)

    verbosity_levels = [logging.WARNING, logging.INFO, logging.DEBUG]
    # Clamp at both ends: a negative verbosity would otherwise index from the
    # end of the list and silently select DEBUG.
    verbosity = max(0, min(args.get("verbose", 1), len(verbosity_levels) - 1))
    level = verbosity_levels[verbosity]

    internal_error_log = ExceptionHandler()
    internal_error_log.setLevel(logging.ERROR)

    log.root.setLevel(level)
    log.root.addHandler(internal_error_log)

    if api.config.get_settings()["email"]["enable_email"]:
        severe_error_log = SevereHandler()
        severe_error_log.setLevel(logging.CRITICAL)
        log.root.addHandler(severe_error_log)

    stats_log = StatsHandler()
    stats_log.setLevel(logging.INFO)

    log.root.addHandler(stats_log)
__init__.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ConvertLogLevel( oldstyle_level ):
    """Map an old-style numeric level (0-5) to a ``CF.LogLevels`` member.

    0 is FATAL, 1 ERROR, 2 WARN, 3 INFO, 4 DEBUG and 5 ALL. Any other value
    yields ``CF.LogLevels.INFO``.
    """
    # Position in this tuple is the old-style level number.
    names_by_level = ("FATAL", "ERROR", "WARN", "INFO", "DEBUG", "ALL")
    for number, member_name in enumerate(names_by_level):
        if oldstyle_level == number:
            return getattr(CF.LogLevels, member_name)
    return CF.LogLevels.INFO
__init__.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ConvertLevelNameToDebugLevel( level_name ):
    """Map a level name to the old-style numeric debug level.

    "OFF" and "FATAL" give 0, "ERROR" 1, "WARN" 2, "INFO" 3, "DEBUG" 4, and
    "TRACE" and "ALL" give 5. Unrecognised names give 3.
    """
    name_to_number = (
        ("OFF", 0),
        ("FATAL", 0),
        ("ERROR", 1),
        ("WARN", 2),
        ("INFO", 3),
        ("DEBUG", 4),
        ("TRACE", 5),
        ("ALL", 5),
    )
    for known_name, number in name_to_number:
        if level_name == known_name:
            return number
    return 3
__init__.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ConvertLevelNameToCFLevel( level_name ):
    """Map a level name to the ``CF.LogLevels`` member of the same name.

    Recognised names are "OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG",
    "TRACE" and "ALL". Any other value yields ``CF.LogLevels.INFO``.
    """
    known_names = ("OFF", "FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE", "ALL")
    for known_name in known_names:
        if level_name == known_name:
            return getattr(CF.LogLevels, known_name)
    return CF.LogLevels.INFO
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, log_level=ERROR):
        """Initialize the deployment environment.

        Args:
            log_level: Passed unchanged to the parent class ``__init__``;
                defaults to the ``ERROR`` name in scope at definition time.
        """
        super(OpenStackAmuletUtils, self).__init__(log_level)
utils.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, log_level=logging.ERROR):
        """Create the instance logger and store the Ubuntu release list.

        Args:
            log_level: Level handed to ``self.get_logger``; defaults to
                ``logging.ERROR``.
        """
        # Logger first, then the result of get_ubuntu_releases().
        self.log = self.get_logger(level=log_level)
        self.ubuntu_releases = self.get_ubuntu_releases()


问题


面经


文章

微信
公众号

扫码关注公众号