python类critical()的实例源码

PMUtoCSV_VerA01.py 文件源码 项目:PMUtoCSVwriter 作者: PaulBrogan 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def OpenTCPcomms(self):
        """Connect the PMU socket, retrying up to 15 times.

        ``self.sock`` is aliased as ``self.serversocket`` and connected to
        ``(self.PMUip, self.PMUport)``. After every failed attempt the error
        is logged, the method sleeps 0.25 s and ``self.CloseTCPcomms()`` is
        called before the next try.

        Nothing is returned and nothing is raised when all attempts fail;
        the outcome is only visible in the log.
        """
        self.serversocket = self.sock
        connected = False
        attempts = 0
        while not connected and attempts < 15:
            try:
                self.serversocket.connect((self.PMUip, self.PMUport))
            except Exception:
                # Not a bare ``except``: KeyboardInterrupt and SystemExit
                # must be able to break out of the retry loop.
                e = sys.exc_info()[0:2]
                logging.critical('TCP connection failed with ' + str(e) + ' Attempt ' + str(attempts))
                time.sleep(0.25)
                attempts += 1
                self.CloseTCPcomms()
            else:
                connected = True
                logging.critical('Connected to PMU')
PMUtoCSV_VerA01.py 文件源码 项目:PMUtoCSVwriter 作者: PaulBrogan 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def Get_CF2_and_initialise(self):
        """Reconnect, request configuration frame 2 and process the reply.

        The current connection is closed on a best-effort basis, a new one is
        opened with ``self.OpenTCPcomms()`` and the bytes returned by
        ``self.SendCFG2()`` are sent. The reply is read in 1024-byte chunks
        until ``recv`` returns ``b''`` or raises; everything received so far
        is then handed to ``self.C37dataEnter``.
        """
        try:
            self.CloseTCPcomms()
        except Exception:
            # Best effort: a failure to close must not prevent reconnecting.
            pass

        self.OpenTCPcomms()
        self.serversocket.send(self.SendCFG2())

        CF2 = b''
        chunk = self.serversocket.recv(1024)
        while True:
            CF2 += chunk
            try:
                chunk = self.serversocket.recv(1024)
            except Exception:
                # A receive error ends the read; KeyboardInterrupt and
                # SystemExit are deliberately allowed to propagate.
                break
            if chunk == b'':
                break

        self.C37dataEnter(CF2)
        logging.critical('Connected, Command Frame 2 received and processed')
main.py 文件源码 项目:attention-sum-reader 作者: cairoHy 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def ensemble_test(test_data, models):
    """Test an ensemble in which each model handles one document-length bucket.

    Samples are taken from ``zip(*test_data)`` as
    ``(document, question, answer, candidate)`` tuples and sorted into one
    list per entry of the module-level ``d_bucket`` by document length:

    * length <= lower bound of the first bucket -> bucket 0
    * length >= upper bound of the last bucket  -> bucket ``len(models) - 1``
    * otherwise the first bucket with ``d_min <= length < d_max``

    ``models[i].test(zip(*data[i]), batch_size=1)`` is then called for every
    model; the first element of each result is summed, divided by the total
    number of samples and logged.

    NOTE(review): the division implies ``test`` returns a count of correct
    answers rather than a ratio - confirm against the model class.

    :param test_data: parallel sequences (documents, questions, answers,
        candidates).
    :param models: one model per bucket, indexed like ``d_bucket``.
    """
    data = [[] for _ in d_bucket]
    for test_document, test_question, test_answer, test_candidate in zip(*test_data):
        sample = (test_document, test_question, test_answer, test_candidate)
        doc_len = len(test_document)
        if doc_len <= d_bucket[0][0]:
            data[0].append(sample)
            continue
        if doc_len >= d_bucket[-1][-1]:
            data[len(models) - 1].append(sample)
            continue
        for bucket_id, (d_min, d_max) in enumerate(d_bucket):
            # The lower bound is inclusive so that a length sitting exactly
            # on a boundary between two buckets is not silently dropped.
            if d_min <= doc_len < d_max:
                data[bucket_id].append(sample)
                # One bucket per sample: stop at the first match.
                break

    acc, num = 0, 0
    for i in range(len(models)):
        num += len(data[i])
        logging.info("Start testing.\nTesting in {} samples.".format(len(data[i])))
        acc_i, _ = models[i].test(zip(*data[i]), batch_size=1)
        acc += acc_i
    # Avoid ZeroDivisionError when there was nothing to test.
    accuracy = acc / num if num else 0.0
    logging.critical("Ensemble test done.\nAccuracy is {}".format(accuracy))
tasks.py 文件源码 项目:freeipa-pr-ci 作者: freeipa 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _before(self):
        """Run the parent ``_before`` hook, then render the Ansible vars file.

        The template path is built from ``constants.ANSIBLE_VARS_TEMPLATE``
        and ``self.action_name``; the result is written to ``vars.yml`` inside
        ``self.data_dir`` with the repo file URL and the ``update_packages``
        flag as template variables.

        Raises:
            OSError, IOError: re-raised after being logged when the file
                cannot be created.
        """
        super(RunPytest, self)._before()

        # Prepare test config files
        try:
            template_path = constants.ANSIBLE_VARS_TEMPLATE.format(
                action_name=self.action_name)
            vars_path = os.path.join(self.data_dir, 'vars.yml')
            repofile_url = urllib.parse.urljoin(
                self.build_url, 'rpms/freeipa-prci.repo')
            template_vars = dict(repofile_url=repofile_url,
                                 update_packages=self.update_packages)
            create_file_from_template(template_path, vars_path, template_vars)
        except (OSError, IOError) as exc:
            # Full traceback goes to the debug log; operators get one line.
            logging.debug(exc, exc_info=True)
            logging.critical("Failed to prepare test config files")
            raise exc
builtin.py 文件源码 项目:pcbot 作者: pckv 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
async def setowner(message: discord.Message):
    """ Set the bot owner. Only works in private messages.

    A three-digit code is written to the log; the author of `message` has
    60 seconds to repeat it in the same channel. On success the author's id
    is stored in `plugins.owner_cfg` and saved.

    Declared `async` because the body awaits the client calls; a plain `def`
    containing `await` does not compile.

    NOTE(review): failures are signalled with `assert`, presumably handled by
    the command framework - confirm. Asserts are stripped under `python -O`.
    """
    if not message.channel.is_private:
        return

    assert not plugins.owner_cfg.data, "An owner is already set."

    # The code gates ownership of the bot, so draw it from the OS entropy
    # source instead of the predictable default generator.
    owner_code = str(random.SystemRandom().randint(100, 999))
    logging.critical("Owner code for assignment: {}".format(owner_code))

    await client.say(message,
                     "A code has been printed in the console for you to repeat within 60 seconds.")
    user_code = await client.wait_for_message(timeout=60, channel=message.channel, content=owner_code)

    assert user_code, "You failed to send the desired code."

    # Kept in addition to the assert so the check survives `python -O`.
    if user_code:
        await client.say(message, "You have been assigned bot owner.")
        plugins.owner_cfg.data = message.author.id
        plugins.owner_cfg.save()
launcher.py 文件源码 项目:millilauncher 作者: fhfuih 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, mc_dir=default_minecraft_directory, java_dir=default_java_directory):
        """Validate the game and Java locations and set up launcher state.

        Both paths are stored, the ``libraries`` and ``assets`` directories
        are derived from the Minecraft directory, the process working
        directory is changed to the Minecraft directory and the version list
        is loaded with ``MCVersionsList``.

        Raises:
            FileNotFoundError: if either path is empty or does not exist.
        """
        self.minecraft_directory = mc_dir
        self.java_directory = java_dir

        minecraft_found = mc_dir and os.path.exists(mc_dir)
        if not minecraft_found:
            logging.critical('Invalid /.minecraft/ directory.')
            raise FileNotFoundError('Invalid /.minecraft/ directory {0}'.format(mc_dir))

        java_found = java_dir and os.path.exists(java_dir)
        if not java_found:
            logging.critical('Invalid javaw.exe directory.')
            raise FileNotFoundError('Invalid javaw.exe directory {0}'.format(java_dir))

        game_root = self.minecraft_directory
        self.libraries_directory = os.path.join(game_root, 'libraries')
        self.assets_directory = os.path.join(game_root, 'assets')

        # Filled in later; nothing is selected at construction time.
        self.version_directory = self.natives_directory = self.libraries = None

        os.chdir(self.minecraft_directory)
        self.versions = MCVersionsList(mc_dir)
od_main.py 文件源码 项目:onedrived-dev 作者: xybu 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_repo_table(ctx):
    """
    Map every linked account id to the local repositories of its configured
    drives. Accounts without any configured drive are logged and left out.
    Exits the process with status 1 when no account is linked at all.

    :param onedrived.od_context.UserContext ctx:
    :return dict[str, [onedrived.od_repo.OneDriveLocalRepository]]:
    """
    account_ids = ctx.all_accounts()
    if len(account_ids) == 0:
        logging.critical('onedrived is not linked with any OneDrive account. Please configure onedrived first.')
        sys.exit(1)

    repo_table = {}
    for account_id in account_ids:
        authenticator, drives = get_authenticator_and_drives(ctx, account_id)

        # Keep only the drives that appear in the user's configuration.
        local_repos = []
        for drive in drives:
            if drive.id in ctx.config['drives']:
                local_repos.append(
                    od_repo.OneDriveLocalRepository(ctx, authenticator, drive, ctx.get_drive(drive.id)))

        if len(local_repos) == 0:
            profile = ctx.get_account(account_id)
            logging.info('No Drive associated with account "%s" (%s).', profile.account_email, account_id)
            continue
        repo_table[account_id] = local_repos
    return repo_table
main.py 文件源码 项目:VocaBot 作者: bomjacob 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def main():
    """Start the bot: read the token, wire up handlers and poll for updates.

    The token is read from the ``VOCABOT_TOKEN`` environment variable. When
    it is missing or empty the process exits with status 1 so that shells and
    supervisors see the failure.
    """
    token = os.getenv('VOCABOT_TOKEN')
    if not token:
        logging.critical('NO TOKEN FOUND!')
        # Non-zero status: a bare sys.exit() would report success.
        sys.exit(1)

    updater = Updater(token)

    # Now we know bot name, set the user-agent of vocadb api session
    voca_db.set_name(updater.bot.name)

    dp = updater.dispatcher

    # Add main handlers
    dp = add_update_handlers(dp)

    # Also add our "log everything" error handler
    dp.add_error_handler(error)

    # Start fetching updates, we might wanna use webhooks instead at some points.
    updater.start_polling()

    # Loop till we quit
    updater.idle()
plugin_settings.py 文件源码 项目:TtcnComplete 作者: HuiMi24 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def is_valid(self):
        """Report whether all required settings are present.

        The settings are checked in a fixed order; the first one that is
        None is logged and makes the whole object invalid.

        Returns:
            bool: validity of settings
        """
        required = (
            ("sublime_settings", " no sublime_settings found"),
            ("debug_mode", " no debug_mode found"),
            ("triggers", " no triggers found"),
        )
        for attribute, complaint in required:
            # Attributes are read lazily, so later ones are never touched
            # once an earlier one turns out to be missing.
            if getattr(self, attribute) is None:
                logging.critical(complaint)
                return False
        return True
spameater.py 文件源码 项目:erine.email 作者: mdavranche 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ee2f_getReplyAddress(fromAddress, toAddress):
  """Return the address to use as sender for a reply sent through a reply address.

  The reply address is looked up by ``getAddress(toAddress)`` to find the
  disposable address it stands for. The owner of that disposable address is
  then compared with the bare sender address; when no owner row is found the
  sender is assumed to be allowed and that is logged.

  Returns the disposable address, prefixed with the sender's label as
  ``label <address>`` when ``getLabel(fromAddress)`` yields one.

  Raises:
    BounceException: if ``toAddress`` is not a known reply address, or the
      sender does not own the disposable address.
  """
  execQuery("SELECT `disposableMailAddress` FROM `replyAddress` WHERE `mailAddress` = %s", getAddress(toAddress))
  replyAddress = dbCursor.fetchone()
  if not replyAddress:
    raise BounceException('Invalid email address: "{}"'.format(toAddress))

  disposableAddress = replyAddress[0]
  execQuery("SELECT `user`.`mailAddress` FROM `user` JOIN `disposableMailAddress` ON `user`.`ID` = `disposableMailAddress`.`userID` WHERE `disposableMailAddress`.`mailAddress` = %s", disposableAddress)
  allowedEmail = dbCursor.fetchone()
  senderAddress = getAddress(fromAddress)
  if not allowedEmail:
    logging.critical("Can not check if " + senderAddress + " is allowed to send an email as " + disposableAddress + ". Assuming yes.")
  elif allowedEmail[0] != senderAddress:
    # Format the message first: calling .format() on the exception object
    # raised AttributeError instead of the intended bounce.
    raise BounceException(
      '"{}" is not allowed to send an email as "{}"'.format(senderAddress, disposableAddress)
    )

  label = getLabel(fromAddress)
  if label:
    return label + " <" + disposableAddress + ">"
  return disposableAddress

# A foreign address is writing to an erine.email user (f2ee as Foreign To Erine.Email)
# Forge or retrieve reply email address
# Bounce email on invalid toAddress
install_emulator_deps.py 文件源码 项目:gn_build 作者: realcome 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def InstallKVM():
  """Installs KVM packages.

  Runs three steps in order - install the package, load the kernel module,
  verify that acceleration is usable - and logs a critical message for every
  step that fails. A failure does not stop the remaining steps.
  """
  steps = (
      (lambda: cmd_helper.RunCmd(['sudo', 'apt-get', 'install', 'kvm']),
       'ERROR: Did not install KVM. Make sure hardware '
       'virtualization is enabled in BIOS (i.e. Intel VT-x or '
       'AMD SVM).'),
      # TODO(navabi): Use modprobe kvm-amd on AMD processors.
      (lambda: cmd_helper.RunCmd(['sudo', 'modprobe', 'kvm-intel']),
       'ERROR: Did not add KVM module to Linux Kernel. Make sure '
       'hardware virtualization is enabled in BIOS.'),
      # Now check to ensure KVM acceleration can be used.
      (lambda: not RunKvmOk(),
       'ERROR: Can not use KVM acceleration. Make sure hardware '
       'virtualization is enabled in BIOS (i.e. Intel VT-x or '
       'AMD SVM).'),
  )
  for step_failed, message in steps:
    if step_failed():
      logging.critical(message)
logging_utils.py 文件源码 项目:gn_build 作者: realcome 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def SuppressLogging(level=logging.ERROR):
  """Momentarily suppress logging events from all loggers.

  TODO(jbudorick): This is not thread safe. Log events from other threads might
  also inadvertently disappear.

  Example:

    with logging_utils.SuppressLogging():
      # all but CRITICAL logging messages are suppressed
      logging.info('just doing some thing') # not shown
      logging.critical('something really bad happened') # still shown

  NOTE(review): the example requires this generator to be wrapped by
  contextlib.contextmanager; no decorator is visible on this definition -
  confirm it is applied where the function is defined.

  Args:
    level: logging events with this or lower levels are suppressed.
  """
  logging.disable(level)
  try:
    yield
  finally:
    # Re-enable logging even when the suppressed body raises; otherwise one
    # exception would leave logging disabled for the rest of the process.
    logging.disable(logging.NOTSET)
data_helper.py 文件源码 项目:text-classification-cnn-rnn 作者: fudannlp16 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def pad_sentences(sentences, padding_word="<PAD/>", forced_sequence_length=None):
    """Pad or truncate every sentence to one common length.

    When ``forced_sequence_length`` is None (training) the target length is
    the length of the longest sentence, or 0 for empty input. Otherwise
    (prediction) the given length is used and longer sentences are cut off.

    :param sentences: sequence of token lists.
    :param padding_word: token appended to sentences that are too short.
    :param forced_sequence_length: fixed target length, or None to derive it.
    :return: a new list of new token lists, all of the target length; the
        input lists are not modified.
    """
    if forced_sequence_length is None:  # Train
        # ``or [0]`` keeps max() from raising ValueError on empty input.
        sequence_length = max([len(x) for x in sentences] or [0])
    else:  # Prediction
        logging.critical('This is prediction, reading the trained sequence length')
        sequence_length = forced_sequence_length
    logging.critical('The maximum length is {}'.format(sequence_length))

    padded_sentences = []
    for sentence in sentences:
        num_padding = sequence_length - len(sentence)

        if num_padding < 0:  # Prediction: cut off the sentence if it is longer than the sequence length
            logging.info('This sentence has to be cut off because it is longer than trained sequence length')
            padded_sentence = sentence[0:sequence_length]
        else:
            padded_sentence = sentence + [padding_word] * num_padding
        padded_sentences.append(padded_sentence)
    return padded_sentences
viui.py 文件源码 项目:spyglass 作者: Crypta-Eve 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def changeTheme(self, newTheme=None):
        """Apply the checked theme, optionally selecting ``newTheme`` first.

        When ``newTheme`` is given, every action of ``self.themeGroup`` whose
        ``theme`` equals it is checked. The theme of the group's checked
        action is then turned into a style sheet through ``Styles``, applied
        to this widget and stored in the cache under ``"theme"``. Finally the
        map is set up again, the intel chat is cleared and, if auto-rescan is
        enabled, the intel is rescanned.
        """
        if newTheme is not None:
            for candidate in self.themeGroup.actions():
                if candidate.theme == newTheme:
                    candidate.setChecked(True)

        selected = self.themeGroup.checkedAction()
        styles = Styles()
        styles.setStyle(selected.theme)
        self.setStyleSheet(styles.getStyle())
        logging.critical("Setting new theme: {}".format(selected.theme))

        # Cache lifetime argument: 60 * 60 * 24 * 365.
        one_year = 60 * 60 * 24 * 365
        self.cache.putIntoCache("theme", selected.theme, one_year)

        self.setupMap()
        self.clearIntelChat()
        if self.autoRescanIntelEnabled:
            self.rescanIntel()
check_completion_location.py 文件源码 项目:workforce-scripts 作者: Esri 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def get_worker_id(org_url, token, projectId, worker):
    """
    Look up the OBJECTID of the worker whose userId equals the given name
    :param org_url: The organizational url to use
    :param token: The token to authenticate with
    :param projectId: The projectId to use
    :param worker: The name of the worker to get the id of
    :return: The OBJECTID of the first matching feature, or None (after a
        critical log entry) when the query returns no features
    """
    log = logging.getLogger()
    log.debug("Getting dispatcher id for: {}...".format(worker))

    layer_url = workforcehelpers.get_workers_feature_layer_url(org_url, token, projectId)
    where_clause = "userId='{}'".format(worker)
    result = workforcehelpers.query_feature_layer(layer_url, token, where=where_clause)

    features = result["features"]
    if not features:
        log.critical("{} is not a worker".format(worker))
        return None
    return features[0]["attributes"]["OBJECTID"]
check_completion_location.py 文件源码 项目:workforce-scripts 作者: Esri 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_worker_id(shh, projectId, worker):
    """
    Look up the OBJECTID of the worker whose userId equals the given name
    :param shh: The ArcREST security handler helper
    :param projectId: The projectId to use
    :param worker: The name of the worker to get the id of
    :return: The OBJECTID of the first matching feature, or None (after a
        critical log entry) when the query returns no features
    """
    log = logging.getLogger()
    log.debug("Getting dispatcher id for: {}...".format(worker))

    workers_layer = workforcehelpers.get_workers_feature_layer(shh, projectId)
    where_clause = "userId='{}'".format(worker)
    matches = workers_layer.query(where=where_clause)

    if not matches.features:
        log.critical("{} is not a worker".format(worker))
        return None
    first_match = matches.features[0]
    return first_match.asDictionary["attributes"]["OBJECTID"]
visualmetrics.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_decimate_filter():
    """Return the name of ffmpeg's near-duplicate-frame filter, if any.

    Runs ``ffmpeg -filters`` and scans its output for a video-to-video
    filter whose name ends in ``decimate`` and whose description mentions
    removing near-duplicate frames.

    :return: the filter name, or None when ffmpeg cannot be run or no such
        filter is listed (a critical message is logged on errors).
    """
    decimate = None
    try:
        filters = subprocess.check_output(
            ['ffmpeg', '-filters'], stderr=subprocess.STDOUT)
        # check_output returns bytes on Python 3; splitting bytes with a str
        # separator raises TypeError, which made this always return None.
        if isinstance(filters, bytes):
            filters = filters.decode('utf-8', 'replace')
        match = re.compile(
            r'(?P<filter>[\w]*decimate).*V->V.*Remove near-duplicate frames')
        for line in filters.split("\n"):
            m = match.search(line)
            if m is not None:
                decimate = m.group('filter')
                break
    except Exception:
        # Exception rather than BaseException so that KeyboardInterrupt and
        # SystemExit are not swallowed.
        logging.critical('Error checking ffmpeg filters for decimate')
        decimate = None
    return decimate
devtools_browser.py 文件源码 项目:wptagent 作者: WPO-Foundation 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self, task):
        """Create the DevTools helper and connect it to the browser.

        For a lighthouse run the result of ``wait_for_available`` is returned
        as is. Otherwise True is returned on a successful connection; on
        failure the error text is stored in ``task['error']``, logged,
        ``self.devtools`` is reset to None and False is returned.
        """
        from internal.devtools import DevTools
        self.devtools = DevTools(self.options, self.job, task, self.use_devtools_video)

        if task['running_lighthouse']:
            return self.devtools.wait_for_available(self.CONNECT_TIME_LIMIT)

        if self.devtools.connect(self.CONNECT_TIME_LIMIT):
            logging.debug("Devtools connected")
            return True

        task['error'] = "Error connecting to dev tools interface"
        logging.critical(task['error'])
        self.devtools = None
        return False
peeringdb_mysql.py 文件源码 项目:peerme 作者: cooperlees 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def get_pool(self):
        """Create the aiomysql connection pool and store it in ``self.pool``.

        Connection settings are read from the ``peeringdb_mysql`` section of
        ``self.global_config``; the port is converted to ``int``. If the
        connection fails with ``OperationalError`` the error is logged and
        the process exits with status 1.

        Declared ``async`` because the body awaits ``create_pool``; a plain
        ``def`` containing ``await`` does not compile.
        """
        db_config = self.global_config['peeringdb_mysql']
        HOST = db_config['host']
        USER = db_config['user']
        PASS = db_config['pass']
        PORT = int(db_config['port'])
        DATABASE = db_config['database']
        try:
            self.pool = await aiomysql.create_pool(
                host=HOST,
                port=PORT,
                user=USER,
                password=PASS,
                db=DATABASE,
                loop=self.loop,
            )
        except pymysql_err.OperationalError as pmye:
            logging.critical("DB Connect Error: {}".format(pmye))
            sys.exit(1)

        logging.debug("Obtained DB connection pool to {}".format(HOST))
learner.py 文件源码 项目:icing 作者: slipguru 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def distributions(self, records=None):
        """Group intra-donor distance results by the second element of each result.

        The maximum of the ``MUT`` column and the number of samples come from
        ``records`` when it is a DataFrame, otherwise from
        ``io.get_max_mut(self.database)``. The range ``[0, max_mut]`` is
        split into consecutive intervals and ``self.intra_donor_distance`` is
        called once per interval (plus the leading ``(0, 0)``).

        NOTE(review): ``zip(sets, sets)`` pairs every interval with itself,
        so ``i`` and ``j`` are always equal - confirm this is intended.

        :param records: optional pandas DataFrame with a ``MUT`` column.
        :return: None when there are too few samples to form an interval;
            otherwise a dict mapping the second element of each result to
            the list of first elements. Empty if an error was logged.
        """
        logging.info("Analysing %s ...", self.database)
        try:
            if records is not None and isinstance(records, pd.DataFrame):
                max_mut = np.max(records['MUT'])
                self.n_samples = records.shape[0]
            else:
                # load from file
                max_mut, self.n_samples = io.get_max_mut(self.database)

            # np.linspace requires an integer number of points.
            n_points = int(min(self.n_samples / 15., 12))
            lin = np.linspace(0, max_mut, n_points)
            # zip() is lazy on Python 3; materialise it before concatenating.
            sets = [(0, 0)] + list(zip(lin[:-1], lin[1:]))
            if len(sets) == 1:
                # no correction needs to be applied
                return None
            out_muts = [self.intra_donor_distance(
                records, i, j) for i, j in zip(sets, sets)]
        except Exception as msg:
            # StandardError does not exist on Python 3; Exception works on
            # both major versions.
            logging.critical(msg)
            out_muts = []

        my_dict = dict()
        for f, m in out_muts:
            my_dict.setdefault(m, []).append(f)
        return my_dict


问题


面经


文章

微信
公众号

扫码关注公众号