python类error()的实例源码

cache.py 文件源码 项目:PowerSpikeGG 作者: PowerSpikeGG 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _connect(self, address, lazy_connection=False):
        """Create a MongoDB client for ``address``.

        Parameters:
            address: MongoDB server address.
            lazy_connection: when true, hand back the client right away
                without checking that the server answers.

        Returns:
            The ``pymongo.MongoClient`` instance, or ``None`` when the
            connectivity check times out.
        """
        mongo_client = pymongo.MongoClient(
            address,
            serverSelectionTimeoutMS=FLAGS.mongodb_connection_timeout)

        if not lazy_connection:
            # Query the server once so a dead connection is detected here.
            try:
                mongo_client.server_info()
            except pymongo.errors.ServerSelectionTimeoutError:
                logging.error("Unable to connect to %s.", address)
                return None

        return mongo_client
test_utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def load_config():
    '''
    Walk backwards from __file__ looking for config.yaml, then load it and
    return its 'options' section.

    Raises:
        Exception: if no config.yaml exists in the directory of this file or
            in any of its parent directories.
    '''
    config = None
    current = os.path.dirname(__file__)
    while True:
        candidate = os.path.join(current, 'config.yaml')
        if os.path.isfile(candidate):
            config = candidate
            break
        parent = os.path.dirname(current)
        # dirname() stops changing at the filesystem root (and for an empty
        # relative path); bail out there instead of looping forever.
        if parent == current:
            break
        current = parent

    if not config:
        message = ('Could not find config.yaml in any parent directory '
                   'of %s. ' % __file__)
        logging.error(message)
        raise Exception(message)

    # Close the file deterministically instead of leaking the handle.
    with open(config) as handle:
        return yaml.safe_load(handle.read())['options']
resource.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def createOrb():
    '''
    Run the omniorbpy CORBA.ORB_init() call through
    callOmniorbpyWithTimeout so that the call can be abandoned after a
    timeout; ORB_init() never returns if the system has run out of threads.

    Returns the ORB on success. On failure (no ORB was produced) an error is
    logged and the process exits via sys.exit(-1); this function does not
    return None to its caller.
    '''
    # Single-slot queue used to hand the ORB back from the creator callback.
    queue = Queue(maxsize=1)

    def orbCreator():
        """
        Callback passed to callOmniorbpyWithTimeout: initialise the ORB and
        publish it on the queue.
        """
        queue.put(CORBA.ORB_init())

    orb = callOmniorbpyWithTimeout(orbCreator, queue)
    # Identity test: a CORBA object reference may define its own equality.
    if orb is None:
        logging.error("omniorbpy failed to return from ORB_init.  This is often a result of an insufficient amount of threads available on the system.")
        sys.exit(-1)
    return orb
OldMunkiPackages.py 文件源码 项目:OldMunkiPackages 作者: aysiu 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def trash_old_stuff(trashlist, trashpath, newpath):
    """Move each path in ``trashlist`` into ``newpath``.

    Every item keeps its position relative to ``trashpath``; missing
    sub-folders are created under ``newpath`` first. Items that no longer
    exist (or a ``newpath`` that is not a directory) are logged as errors
    and skipped. A ``trashlist`` that is not a list is logged and ignored.
    """
    if not isinstance(trashlist, list):
        logging.error("%s is not a valid list\n" % trashlist)
        return

    for source in trashlist:
        # Mirror the item's location relative to trashpath inside newpath.
        relative = os.path.relpath(source, trashpath)
        destination = os.path.join(newpath, relative)

        # Create the destination sub-folders when they are missing.
        parent_dir = os.path.dirname(destination)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir)

        # One last sanity check before actually moving anything.
        if not (os.path.exists(source) and os.path.isdir(newpath)):
            logging.error("One of %s or %s does not exist\n" % (source, destination))
            continue

        os.rename(source, destination)
        logging.info("Moving %s to %s\n" % (source, destination))

# Function that checks paths are writable
cache.py 文件源码 项目:PowerSpikeGG 作者: PowerSpikeGG 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _silent_connection_failure(func):
    """Decorator used to avoid raising an exception when the database timeouts

    Parameters:
        func: Function to decorate.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        """Wraps the function to catch timeout exception.
        """
        if not FLAGS.disable_mongodb_exception:
            return func(*args, **kwargs)

        try:
            result = func(*args, **kwargs)
        except pymongo.errors.ServerSelectionTimeoutError as e:
            logging.error("Unable to reach the caching server: %s", e)
            return None
        return result

    return wrapper
manager.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _on_change(self, *args, **kwargs):
        # Making sure we set the settings + variables.
        if len(self._next_settings_update.keys()) > 0:
            logger.debug('Setting mode settings right now!')
            try:
                await self.update_settings(self._next_settings_update)
            except Exception as e:
                logging.error('Can\'t set the script mode settings! Error: {}'.format(str(e)))
            self._next_settings_update = dict()
        if len(self._next_variables_update.keys()) > 0:
            logger.debug('Setting mode variables right now!')
            try:
                await self.update_variables(self._next_variables_update)
            except Exception as e:
                logging.error('Can\'t set the script mode variables! Error: {}'.format(str(e)))
            self._next_variables_update = dict()

        # Make sure we send to the signal when mode is been changed.
        if self._current_script != self._next_script:
            await script_mode_changed.send_robust({
                'unloaded_script': self._current_script, 'loaded_script': self._next_script
            })

        await self.get_current_script(refresh=True)
scheduler.py 文件源码 项目:coursera-scheduled-exports 作者: LU-C4i 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def get_course_id(self):
        """Resolve ``self.course_slug`` to a course id via the Coursera API.

        The id is stored on ``self.course_id``; nothing is returned.

        Raises:
            ApiResolve: when the HTTP response is not OK (the failure is
                also logged if ``self.log`` is set).
        """
        # Endpoint that returns course information as JSON for a slug.
        lookup_url = "https://www.coursera.org/api/onDemandCourses.v1?q=slug&slug=" + self.course_slug
        response = requests.get(lookup_url)

        if not response.ok:
            if self.log:
                logging.error("Cannot fetch course id ({})".format(self.course_slug))
            raise ApiResolve("Server returned {}. Check whether course name is correct.".format(str(response)))

        # The id lives on the first element of the returned payload.
        payload = response.json()
        self.course_id = payload["elements"][0]["id"]
models.py 文件源码 项目:django_pipedrive 作者: MasAval 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def sync_one(cls, external_id, last_error=None):
        """Fetch one instance through the Pipedrive API client and hand its
        data to ``update_or_create_entity_from_api_post``.

        If that step raises an IntegrityError whose message matches
        "Key (<field>)=(<id>)", the model mapped to <field> is synced for
        <id> first and this method is retried once more with the error
        message remembered in ``last_error``.

        Parameters:
            external_id: identifier passed to the API client's get_instance().
            last_error: message of the IntegrityError hit by the previous
                attempt, if any; used to stop on a repeated error.

        Returns:
            Whatever update_or_create_entity_from_api_post() returns.

        Raises:
            UnableToSyncException: the API response has a falsy 'success'.
            SameErrorTwiceSyncException: the IntegrityError message equals
                ``last_error``.
            Exception: the IntegrityError message does not match the
                expected pattern.
        """
        post_data = cls.pipedrive_api_client.get_instance(external_id)

        # Error code from the API
        if not post_data[u'success']:
            logging.error(post_data)
            raise UnableToSyncException(cls, external_id)
        try:
            return cls.update_or_create_entity_from_api_post(post_data[u'data'])
        except IntegrityError as e:
            logging.warning(e)
            # NOTE(review): e.message is a Python 2 exception attribute --
            # confirm the target interpreter before porting.
            if e.message == last_error:
                # Same failure twice in a row: retrying again would loop.
                raise SameErrorTwiceSyncException(cls, external_id, e.message)
            # Pull "<field>" and "<id>" out of "... Key (<field>)=(<id>) ...".
            match = re.search('.*Key \((.*)\)=\((.*)\).*', e.message)
            if match:
                field_name = match.group(1)
                field_id = match.group(2)
                # Sync the record the error points at (presumably a missing
                # related row -- confirm), then retry this one.
                model = cls.field_model_map(field_name)
                model.sync_one(field_id)
                return cls.sync_one(external_id, e.message)
            else:
                raise Exception("Could not handle error message")
runner.py 文件源码 项目:apiTest 作者: wuranxu 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def transfer_data(self, case):
        """Normalise the fields of a test ``case`` in place and return it.

        ``params`` gets the token added and is serialised (form dict when
        the headers mention 'form', JSON text otherwise), ``code`` becomes
        an int, ``headers`` and ``expected`` go through their converters.
        Any exception is logged and the case is returned unchanged.
        """
        try:
            raw_params = self.addToken(case.get('params'))
            enabled = case.get('enable')
            status_code = int(case.get('code'))
            raw_headers = case.get('headers')
            expected_response = self.get_response(case.get('expected'))

            if not (raw_params != '' and isinstance(raw_params, str)):
                # Non-string (or empty) params are serialised as they are.
                body = dumps(raw_params)
            elif 'form' in raw_headers:
                body = self.form_to_dict(raw_params)
            else:
                body = dumps(self.data_to_json(raw_params))

            converted_headers = self.data_to_json(raw_headers)
            case.update({
                'headers': converted_headers,
                'params': body,
                'enable': enabled,
                'code': status_code,
                'expected': expected_response,
            })
        except Exception as err:
            logging.error("{} error: {}".format(Operator.get_current_func(), str(err)))
        return case
utils.py 文件源码 项目:aws-greengrass-mini-fulfillment 作者: awslabs 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def mqtt_connect(mqtt_client, core_info):
    """Try every connectivity endpoint of ``core_info`` until one connects.

    Parameters:
        mqtt_client: client exposing ``configureEndpoint(host, port)`` and
            ``connect()``.
        core_info: object whose ``connectivityInfoList`` items carry
            ``host`` and ``port``.

    Returns:
        True as soon as one ``connect()`` call succeeds, False when every
        endpoint failed (failures are printed, not raised).
    """
    connected = False

    # try connecting to all connectivity info objects in the list
    for connectivity_info in core_info.connectivityInfoList:
        core_host = connectivity_info.host
        core_port = connectivity_info.port
        logging.info("Connecting to Core at {0}:{1}".format(
            core_host, core_port))
        mqtt_client.configureEndpoint(core_host, core_port)
        try:
            mqtt_client.connect()
            connected = True
            break
        except socket.error as se:
            print("SE:{0}".format(se))
        except operationTimeoutException as te:
            # Fall back to the exception itself when it has no .message.
            print("operationTimeoutException:{0}".format(
                getattr(te, 'message', te)))
            # print_tb() needs a traceback object, not an exception;
            # print_exc() reports the exception currently being handled.
            traceback.print_exc(limit=25)
        except Exception as e:
            # Generic exceptions have no .message attribute on Python 3.
            print("Exception caught:{0}".format(getattr(e, 'message', e)))

    return connected
lambda_setup.py 文件源码 项目:aws-greengrass-mini-fulfillment 作者: awslabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _create_lambda(arn, func_name, func_desc, lambda_handler, lambda_main,
                   runtime):
    """Create and publish a Lambda function from the deployment zip.

    The zip at ``temp_deploy_zip`` is uploaded as the function code and the
    handler is set to ``<lambda_main>.<lambda_handler>``.

    Returns:
        The ``create_function`` response, or None when a ClientError was
        raised (a ValidationException is logged as a warning, anything else
        as an error).
    """
    func = dict()
    lamb = boto3.client('lambda')
    # The deployment package is a zip archive: read it as raw bytes, not text.
    with open(temp_deploy_zip, 'rb') as deploy:
        func['ZipFile'] = deploy.read()
    try:
        resp = lamb.create_function(
            FunctionName=func_name, Runtime=runtime, Publish=True,
            Description=func_desc,
            Role=arn, Code=func, Handler='{0}.{1}'.format(
                lambda_main, lambda_handler
            ))
        logging.info("Create Lambda Function resp:{0}".format(
            json.dumps(resp, indent=4, sort_keys=True))
        )
        return resp
    except ClientError as ce:
        if ce.response['Error']['Code'] == 'ValidationException':
            logging.warning("Validation Error {0} creating function '{1}'.".format(
                ce, func_name))
        else:
            logging.error("Unexpected Error: {0}".format(ce))
    return None
lambda_setup.py 文件源码 项目:aws-greengrass-mini-fulfillment 作者: awslabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _create_function_alias(func_alias, func_name, func_version):
    """Create alias ``func_alias`` for ``func_version`` of ``func_name``.

    Returns:
        The ``create_alias`` response, or None when a ClientError was
        raised (logged as a warning for a ValidationException, otherwise as
        an error).
    """
    client = boto3.client('lambda')

    try:
        response = client.create_alias(
            Name=func_alias,
            FunctionName=func_name,
            FunctionVersion=func_version,
        )
    except ClientError as ce:
        error_code = ce.response['Error']['Code']
        if error_code == 'ValidationException':
            logging.warning("Validation Error {0} creating alias '{1}'.".format(
                ce, func_alias))
        else:
            logging.error("Unexpected Error: {0}".format(ce))
    else:
        pretty = json.dumps(response, indent=4, sort_keys=True)
        logging.info("Create Lambda Alias resp:{0}".format(pretty))
        return response
lambda_setup.py 文件源码 项目:aws-greengrass-mini-fulfillment 作者: awslabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _update_lambda_alias(func_alias, func_name, func_version):
    """Point the existing alias ``func_alias`` at ``func_version``.

    Returns:
        The 'AliasArn' of the ``update_alias`` response, or None when a
        ClientError was raised (logged as a warning for a
        ValidationException, otherwise as an error).
    """
    lamb = boto3.client('lambda')
    try:
        resp = lamb.update_alias(
            Name=func_alias,
            FunctionName=func_name,
            FunctionVersion=func_version
        )
        return resp['AliasArn']
    except ClientError as ce:
        if ce.response['Error']['Code'] == 'ValidationException':
            # Report the alias being updated, matching the message text and
            # the warning emitted by _create_function_alias.
            logging.warning(
                "Validation Error {0} updating alias '{1}'.".format(
                    ce, func_alias))
        else:
            logging.error("Unexpected Error: {0}".format(ce))
    return None
utils.py 文件源码 项目:aws-greengrass-mini-fulfillment 作者: awslabs 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def mqtt_connect(mqtt_client, core_info):
    """Try every connectivity endpoint of ``core_info`` until one connects.

    Parameters:
        mqtt_client: client exposing ``configureEndpoint(host, port)`` and
            ``connect()``.
        core_info: object whose ``connectivityInfoList`` items carry
            ``host`` and ``port``.

    Returns:
        True as soon as one ``connect()`` call succeeds, False when every
        endpoint failed (failures are printed, not raised).
    """
    connected = False

    # try connecting to all connectivity info objects in the list
    for connectivity_info in core_info.connectivityInfoList:
        core_host = connectivity_info.host
        core_port = connectivity_info.port
        logging.info("Connecting to Core at {0}:{1}".format(
            core_host, core_port))
        mqtt_client.configureEndpoint(core_host, core_port)
        try:
            mqtt_client.connect()
            connected = True
            break
        except socket.error as se:
            print("SE:{0}".format(se))
        except operationTimeoutException as te:
            # Fall back to the exception itself when it has no .message.
            print("operationTimeoutException:{0}".format(
                getattr(te, 'message', te)))
            # print_tb() needs a traceback object, not an exception;
            # print_exc() reports the exception currently being handled.
            traceback.print_exc(limit=25)
        except Exception as e:
            # Generic exceptions have no .message attribute on Python 3.
            print("Exception caught:{0}".format(getattr(e, 'message', e)))

    return connected
conv2mp4-server.py 文件源码 项目:conv2mp4-py 作者: Kameecoding 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def drive_upload(source):
    """Upload ``source`` to Drive under the matching 'media/...' folder.

    The local MEDIA_DIR (or CONVERTED_DIR) prefix of the path is replaced
    by 'media' to obtain the remote folder; folders not yet known in FILES
    are created from the nearest known ancestor downwards before the upload.

    Raises:
        Exception: when the uploaded file object has no id afterwards.
    """
    local_root = MEDIA_DIR if MEDIA_DIR in source else CONVERTED_DIR
    target = os.path.dirname(source.replace(local_root, 'media'))

    # Climb up until a folder registered in FILES is found, remembering the
    # missing folder names in top-down order.
    known_parent = target
    missing_folders = []
    while not FILES.get(known_parent):
        missing_folders.insert(0, os.path.basename(known_parent))
        known_parent = os.path.dirname(known_parent)

    for folder in missing_folders:
        create_folder(folder, FILES[known_parent], known_parent)
        known_parent = os.path.join(known_parent, folder)

    title = os.path.basename(source)
    Logger.info("Uploading file: {name} to {target}, with Title: {title}".format(name=source, target=target, title=title))
    remote_file = DRIVE.CreateFile({"parents": [{"id": FILES[target]}], "title" : title})
    remote_file.SetContentFile(source)
    remote_file.Upload()

    if not remote_file['id']:
        failure = "Failed to upload file {source}".format(source=source)
        logging.error(failure)
        raise Exception(failure)
dashboard_duty.py 文件源码 项目:dashboard-duty 作者: jrasell 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _get_url(self, payload, endpoint):
        """
        Performs a GET request to the PD API endpoint with the payload.
        If a 200 response is received the response data is returned.

        :param payload: The GET payload to send to the PD API
        :param endpoint: The PagerDuty endpoint, appended to api.pagerduty.com
        :return: The response data from the PD endpoint
        """
        url = 'https://api.pagerduty.com/%s' % endpoint
        try:
            r_data = self._s.get(url, params=payload)
            if r_data.status_code != 200:
                logging.error('PagerDuty API returned a status code of %s'
                              % r_data.status_code)
            return r_data.json()
        except Exception, e:
            logging.error(e)
run_part4.py 文件源码 项目:lichking 作者: melonrun 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def print_time(source_type, source_name, document_item):
    """Save crawl statistics for one source as a YuqingSpiderMonitor record.

    Documents of ``document_item`` are counted between yesterday 00:00:00
    and today 12:00:00, once by ``insert_time`` and once by ``time``. The
    record also stores the span between the first and last insert time
    (via TimeUtil.get_date_diff_seconds) and is keyed by an MD5 of the
    source name plus the upper window bound.
    """
    window_end = datetime.datetime.now().strftime("%Y-%m-%d") + " 12:00:00"
    one_day = datetime.timedelta(days=1)
    window_start = (datetime.date.today() - one_day).strftime('%Y-%m-%d') + " 00:00:00"

    connect('yuqing', host=MONGODB_URI['host'], port=MONGODB_URI['port'],
            username=MONGODB_URI['username'], password=MONGODB_URI['password'])

    inserted = document_item.objects(
        Q(insert_time__gte=window_start) & Q(insert_time__lte=window_end))
    published_count = document_item.objects(
        Q(time__gte=window_start) & Q(time__lte=window_end)).count()

    monitor = YuqingSpiderMonitor()
    monitor.key = source_name
    logging.error(source_name)
    monitor.crawl_pages = str(len(inserted))
    monitor.new_pages = str(published_count)
    monitor.source_type = source_type
    monitor.date_stat = datetime.datetime.now().strftime("%Y-%m-%d") + " 00:00:00"

    if len(inserted) > 0:
        # Span between the earliest and the latest inserted document.
        earliest = inserted.order_by('insert_time')[0].insert_time
        latest = inserted.order_by('-insert_time')[0].insert_time
        monitor.duration = str(TimeUtil.get_date_diff_seconds(earliest, latest))
    else:
        monitor.duration = str(0)

    monitor._id = Md5Util.generate_md5(source_name + window_end)
    monitor.save()
metadata.py 文件源码 项目:manubot 作者: greenelab 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def get_doi_citeproc(doi):
    """
    Retrieve the citeproc JSON citation for a DOI using Content Negotiation
    (http://citation.crosscite.org/docs.html).

    The returned dict gets its 'URL' set to the doi.org link and, when
    get_short_doi_url() yields one, a 'short_url' entry. A response that is
    not valid JSON is logged and the decoding exception re-raised.
    """
    response = requests.get(
        'https://doi.org/' + urllib.request.quote(doi),
        headers={'Accept': 'application/vnd.citationstyles.csl+json'},
    )
    try:
        citeproc = response.json()
    except Exception as error:
        logging.error(f'Error fetching metadata for doi:{doi}.\n'
                      f'Invalid response from {response.url}:\n{response.text}')
        raise error

    citeproc['URL'] = f'https://doi.org/{doi}'
    short_url = get_short_doi_url(doi)
    if short_url:
        citeproc['short_url'] = short_url
    return citeproc
metadata.py 文件源码 项目:manubot 作者: greenelab 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_pubmed_citeproc(pubmed_id):
    """
    Retrieve the citeproc JSON for a PubMed or PubMed Central identifier.

    The returned dict gets its 'URL' set to the pubmed link for the id. A
    response that is not valid JSON is logged and the decoding exception
    re-raised.

    https://github.com/ncbi/citation-exporter
    https://www.ncbi.nlm.nih.gov/pmc/tools/ctxp/
    https://www.ncbi.nlm.nih.gov/pmc/utils/ctxp/samples
    """
    query = {'ids': pubmed_id, 'report': 'citeproc'}
    response = requests.get('https://www.ncbi.nlm.nih.gov/pmc/utils/ctxp', query)
    try:
        citeproc = response.json()
    except Exception as error:
        logging.error(f'Error fetching metadata for pmid:{pubmed_id}.\n'
                      f'Invalid response from {response.url}:\n{response.text}')
        raise error

    citeproc['URL'] = f'https://www.ncbi.nlm.nih.gov/pubmed/{pubmed_id}'
    return citeproc
manubot.py 文件源码 项目:manubot 作者: greenelab 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def main():
    """
    Entry point of the manubot command line script (registered through
    console_scripts in setup.py).

    Sets up logging to stderr, parses the arguments, runs
    prepare_manuscript() and exits with code 1 when the error handler
    reports that it fired.
    """
    # Track if message gets logged with severity of error or greater
    # See https://stackoverflow.com/a/45446664/4651668
    error_handler = errorhandler.ErrorHandler()

    # Route log records to stderr using the "## LEVEL" layout.
    root_logger = logging.getLogger()
    stderr_handler = logging.StreamHandler(stream=sys.stderr)
    layout = logging.Formatter('## {levelname}\n{message}', style='{')
    stderr_handler.setFormatter(layout)
    root_logger.addHandler(stderr_handler)

    args = parse_arguments()
    level = getattr(logging, args.log_level)
    root_logger.setLevel(level)

    prepare_manuscript(args)

    if not error_handler.fired:
        return
    logging.critical('Failure: exiting with code 1 due to logged errors')
    raise SystemExit(1)
helpers.py 文件源码 项目:sat6_scripts 作者: RedHatSatellite 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_org_id(org_name):
    """
    Return the Organisation ID for a given Org Name.

    Exits the process with status 1 (after logging an error) when the API
    answer carries a truthy 'error' entry.
    """
    org = get_json(SAT_API + "organizations/" + org_name)

    # A truthy 'error' entry means the organization could not be found.
    if org.get('error', None):
        log_msg("Organization '%s' does not exist." % org_name, 'ERROR')
        sys.exit(1)

    # The organization exists: record its ID at debug level and return it.
    org_id = org['id']
    log_msg("Organisation '" + org_name + "' found with ID " + str(org['id']),
            'DEBUG')
    return org_id
helpers.py 文件源码 项目:sat6_scripts 作者: RedHatSatellite 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def wait_for_task(task_id, label):
    """
    Wait for the given task ID to complete.

    Prints a 70-column "Waiting for <label> to complete..." message without
    a trailing newline, then polls the task every 30 seconds until it is no
    longer pending, or until it is paused with an error result (which is
    logged as an ERROR). Nothing is returned.
    """
    msg = "  Waiting for " + label + " to complete..."
    # Left-justify to 70 columns; the slice also truncates longer messages.
    colx = "{:<70}".format(msg)
    # Python 2 print statement; the trailing comma suppresses the newline.
    print colx[:70],
    log_msg(msg, 'INFO')
    # Force the status message to be shown to the user
    sys.stdout.flush()
    while True:
        info = get_json(FOREMAN_API + "tasks/" + str(task_id))
        # A paused task with an error result will not finish: report it.
        if info['state'] == 'paused' and info['result'] == 'error':
            msg = "Error with " + label + " " + str(task_id)
            log_msg(msg, 'ERROR')
            break
        # Stop polling once the task is no longer pending.
        if info['pending'] != 1:
            break
        sleep(30)
helpers.py 文件源码 项目:sat6_scripts 作者: RedHatSatellite 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def log_msg(msg, level):
    """Write message to logfile

    msg is the text to record; level is one of 'DEBUG', 'ERROR', 'WARNING',
    and any other value is handled as an informational message.
    """

    # DEBUG messages are only logged and printed when the DEBUG flag is set;
    # they are never written to tf.
    if level == 'DEBUG':
        if DEBUG:
            logging.debug(msg)
            print BOLD + "DEBUG: " + msg + ENDC
    # Errors go to the log, to tf and to stdout.
    elif level == 'ERROR':
        logging.error(msg)
        tf.write('ERROR:' + msg + '\n')
        print ERROR + "ERROR: " + msg + ENDC
    # Warnings go to the log, to tf and to stdout.
    elif level == 'WARNING':
        logging.warning(msg)
        tf.write('WARNING:' + msg + '\n')
        print WARNING + "WARNING: " + msg + ENDC
    # Any other level is logged as info and written to tf, without printing.
    else:
        logging.info(msg)
        tf.write(msg + '\n')
__init__.py 文件源码 项目:facebook-scraper 作者: bacilo 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def extend_token(self):
        """
        Exchange the current access token for an extended one and store it.

        Logs an error and returns None when the API Key or the API Secret is
        missing; otherwise returns the new access token, which also
        replaces ``self.access_token``.

        TODO: Replace also config file once that file is defined
        TODO: Additional checks on the response
        """
        if not (self.api_key and self.api_secret):
            logging.error('No API Key and/or API Secret defined')
            return None

        exchange_request = (
            'oauth/access_token?grant_type=fb_exchange_token&client_id={}'
            '&client_secret={}&fb_exchange_token={}'.format(
                self.api_key, self.api_secret, self.access_token))
        response = self.request(req=exchange_request)
        payload = json.loads(response.read().decode('utf-8'))

        self.access_token = payload['access_token']
        logging.info('Extended Access Token: \n%s', self.access_token)
        return self.access_token
emailer.py 文件源码 项目:bitcoin-arbitrage 作者: ucfyao 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def send_email(subject, msg):
    """Send ``msg`` with ``subject`` to config.EMAIL_RECEIVER over SMTP.

    Connects to config.EMAIL_HOST, upgrades the connection with STARTTLS,
    logs in with the configured credentials and sends one plain-text mail.
    Failures are logged and the traceback printed; nothing is raised to the
    caller and nothing is returned.
    """
    import smtplib

    message = "From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n%s\r\n" % (config.EMAIL_HOST_USER, ", ".join(config.EMAIL_RECEIVER), subject, msg)
    try:
        smtpserver = smtplib.SMTP(config.EMAIL_HOST)
        try:
            smtpserver.set_debuglevel(0)
            smtpserver.ehlo()
            smtpserver.starttls()
            # Re-identify after STARTTLS; this must actually be called.
            smtpserver.ehlo()
            smtpserver.login(config.EMAIL_HOST_USER, config.EMAIL_HOST_PASSWORD)
            smtpserver.sendmail(config.EMAIL_HOST_USER, config.EMAIL_RECEIVER, message)
            smtpserver.quit()
        finally:
            # Release the socket even when login or sending failed.
            smtpserver.close()
        logging.info("send mail success")
    except Exception:
        # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
        logging.error("send mail failed")
        traceback.print_exc()
build.py 文件源码 项目:foxbms-setup 作者: foxBMS 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def start_process(cmd, supress_output=False):
    """Run the build command and exit the interpreter if it fails.

    Args:
        cmd (string): command for the build process.
        supress_output (bool): when False, anything captured from the
            process is forwarded to the logger (stderr as error).

    The process return code is logged; a non-zero code ends the program
    with ``sys.exit(1)``.
    """
    logging.debug(cmd)
    process = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=None)
    captured_out, captured_err = process.communicate()
    exit_code = process.returncode

    if supress_output is False:
        for emit, text in ((logging.info, captured_out),
                           (logging.error, captured_err)):
            if text:
                emit(text)

    if exit_code != 0 and exit_code is not None:
        logging.error('Error: Process return code %s', str(exit_code))
        sys.exit(1)
    logging.info('Success: Process return code %s', str(exit_code))
build.py 文件源码 项目:foxbms-setup 作者: foxBMS 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def build(mcu_switch=None, doxygen=False, supress_output=False):
    """Assemble the toolchain command line and hand it to `start_process`.

    Args:
        mcu_switch (string): specifies what will be built; None builds the
            sphinx target, '-p', '-s' or '-b' select a build target. Any
            other value is logged as an error and ends the program.
        doxygen (bool): specifies if the doxygen documentation to a mcu
            should be built as well.
        supress_output (bool): forwarded unchanged to `start_process`.
    """
    cmd = TOOLCHAIN_BASIC_CONFIGURE + ' '

    if mcu_switch is None:
        cmd += 'sphinx'
    elif mcu_switch in ('-p', '-s', '-b'):
        cmd += 'build' + ' ' + mcu_switch
        if doxygen is True:
            cmd += ' ' + 'doxygen'
    else:
        logging.error('Invalid build argument: \'%s\'', mcu_switch)
        sys.exit(1)

    start_process(cmd, supress_output)
bootstrap.py 文件源码 项目:foxbms-setup 作者: foxBMS 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def get_main_git_path():
    """Gets the remote URL of the setup repository.

    Runs ``git config --get remote.origin.url`` and splits its output at
    the last '/'. If the git command fails, an explanatory message is
    logged and the program exits with status 1.

    Returns:
        tuple: (base path of the remote URL, repository name).
    """
    try:
        # universal_newlines=True makes check_output return text; without
        # it Python 3 returns bytes and rsplit('/') below raises TypeError.
        repository_url = subprocess.check_output(
            ['git', 'config', '--get', 'remote.origin.url'],
            universal_newlines=True)
    except subprocess.CalledProcessError:
        setup_dir_path = os.path.dirname(os.path.realpath(__file__))
        err_msg = '''
\'{}\' is not a git repository.
Did you download a .zip file from GitHub?

Use
    \'git clone https://github.com/foxBMS/foxBMS-setup\'
to download the foxBMS-setup repository.
        '''.format(setup_dir_path)
        logging.error(err_msg)
        sys.exit(1)
    # NOTE(review): the command output is split as-is, so any trailing
    # newline stays on the repository name -- confirm callers strip it.
    repository_basepath, repository_name = repository_url.rsplit('/', 1)
    return repository_basepath, repository_name
cardDB.py 文件源码 项目:hearthscan-bot 作者: d-schmidt 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __load(self):
        """Fill the card database from the card and token JSON files.

        Every card and token name is cleaned with ``CardDB.cleanName`` and
        mapped to its formatted card text; the cleaned token names are kept
        in ``self.tokens``. The temp file is refreshed at the end.

        Raises:
            Exception: when two entries share the same cleaned name.
        """
        # load cards
        with open(self.cardJSON, 'r', encoding='utf8') as card_file:
            cards = json.load(card_file)
        with open(self.tokenJSON, 'r', encoding='utf8') as token_file:
            tokens = json.load(token_file)

        # json to db full of text
        for name, card in itertools.chain(cards.items(), tokens.items()):
            key = CardDB.cleanName(name)
            if key in self.__db:
                log.error("load() duplicate name, already in the db: %s",
                        key)
                raise Exception('duplicate card')

            self.__db[key] = formatter.createCardText(card, self.constants)

        self.tokens = list(map(CardDB.cleanName, tokens))

        # finally load temp file
        self.refreshTemp()
proxy.py 文件源码 项目:Proxy46 作者: Macronut 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def handle(self):
        """Handle one redirected client connection.

        Reads 16 bytes of socket option 80 at SOL_IP level, unpacks a port
        and an IPv4 address from them, maps that address with ``reverse()``
        and hands the socket to the selected ``Proxy`` entry. A socket error
        anywhere in that sequence is logged as a warning and the client
        socket is closed.
        """
        client_ip = self.client_address[0]

        addr = ''
        server = ''
        # Bound before the try so the error handler can always close it.
        sock = self.connection
        try:
            sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)

            # NOTE(review): option 80 is presumably SO_ORIGINAL_DST (the
            # pre-redirect destination) -- confirm.
            odestdata = sock.getsockopt(socket.SOL_IP, 80, 16)
            port, addr_ip = struct.unpack("!xxH4sxxxxxxxx", odestdata)
            addr = socket.inet_ntoa(addr_ip)

            server = reverse(addr)

            print_log('%s connecting %s:%d %d %s' % (client_ip, addr, port, server[0], str(server[1])))

            Proxy[server[0]].proxy(sock, server[1], (addr, port))
        # "except X as e" is valid on Python 2.6+ and Python 3, unlike the
        # comma form; logging.warn is a deprecated alias of logging.warning.
        except socket.error as e:
            logging.warning(addr + ':' + str(server) + ':' + str(e))
            sock.close()


问题


面经


文章

微信
公众号

扫码关注公众号