python类match()的实例源码

ubuntu.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _add_cloud_pocket(pocket):
    """Write the cloud archive deb spec for ``pocket``.

    The result of filter_installed_packages(['ubuntu-cloud-keyring']) is
    handed to apt_install with fatal=True before anything else happens.
    The simple pocket name is then translated through the
    CLOUD_ARCHIVE_POCKETS mapping, formatted into CLOUD_ARCHIVE and written
    to /etc/apt/sources.list.d/cloud-archive.list.

    Note that the file is opened in 'w' mode, so an existing file is
    overwritten.

    :param pocket: string representing the pocket to add a deb spec for.
    :raises: SourceConfigError if pocket is not a key of
        CLOUD_ARCHIVE_POCKETS.
    """
    keyring_packages = filter_installed_packages(['ubuntu-cloud-keyring'])
    apt_install(keyring_packages, fatal=True)

    if pocket not in CLOUD_ARCHIVE_POCKETS:
        message = 'Unsupported cloud: source option %s' % pocket
        raise SourceConfigError(message)

    with open('/etc/apt/sources.list.d/cloud-archive.list', 'w') as sources:
        sources.write(CLOUD_ARCHIVE.format(CLOUD_ARCHIVE_POCKETS[pocket]))
ubuntu.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _add_cloud_staging(cloud_archive_release, openstack_release):
    """Add the cloud staging repository which is in
    ppa:ubuntu-cloud-archive/<openstack_release>-staging

    Both arguments are first passed to _verify_is_ubuntu_rel; the PPA is
    only added if that call returns without raising.

    :param cloud_archive_release: string, codename for the release.
    :param openstack_release: String, codename for the openstack release.
    :raises: whatever _verify_is_ubuntu_rel raises; this is expected to be
        SourceConfigError when cloud_archive_release doesn't match the
        current version of the os.
    """
    _verify_is_ubuntu_rel(cloud_archive_release, openstack_release)
    ppa = 'ppa:ubuntu-cloud-archive/{}-staging'.format(openstack_release)
    # Build the argument list directly instead of formatting a command
    # string and splitting it on spaces, so the PPA is always passed as
    # exactly one argument whatever openstack_release contains.
    _run_with_retries(['add-apt-repository', '-y', ppa])
ubuntu.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _add_cloud_staging(cloud_archive_release, openstack_release):
    """Add the cloud staging repository which is in
    ppa:ubuntu-cloud-archive/<openstack_release>-staging

    Both arguments are first passed to _verify_is_ubuntu_rel; the PPA is
    only added if that call returns without raising.

    :param cloud_archive_release: string, codename for the release.
    :param openstack_release: String, codename for the openstack release.
    :raises: whatever _verify_is_ubuntu_rel raises; this is expected to be
        SourceConfigError when cloud_archive_release doesn't match the
        current version of the os.
    """
    _verify_is_ubuntu_rel(cloud_archive_release, openstack_release)
    ppa = 'ppa:ubuntu-cloud-archive/{}-staging'.format(openstack_release)
    # Build the argument list directly instead of formatting a command
    # string and splitting it on spaces, so the PPA is always passed as
    # exactly one argument whatever openstack_release contains.
    _run_with_retries(['add-apt-repository', '-y', ppa])
dontwi.py 文件源码 项目:dontwi 作者: vocalodon 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def summaries_to_be_listed_in_waiting_list(
            result_log, status_pr, statuses, trigger_str):
        """Build a "Waiting" summary for every status without a logged result.

        A status is skipped when ``result_log.has_result_of_status`` reports
        a "Succeed", "Start" or "Failed" result for it. For every other
        status the summary from
        ``result_log.make_result_and_others_summary`` (result "Waiting") is
        updated with the "inbound" status summary and collected.

        Returns the list of collected summaries, in the order of
        ``statuses``.
        """
        waiting = []
        for status in statuses:
            # "Start" is part of the check purely as a fail-safe.
            already_logged = result_log.has_result_of_status(
                status=status, results=["Succeed", "Start", "Failed"])
            if already_logged:
                continue
            tweet_text = status_pr.make_tweet_string_from_toot(
                status, hashtag=trigger_str)
            summary = result_log.make_result_and_others_summary(
                status_string=tweet_text, hashtag=trigger_str,
                result="Waiting")
            summary.update(result_log.make_status_summary("inbound", status))
            waiting.append(summary)
        return waiting
model.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def cares_about(self, delta):
        """Return True if this observer wants to be called for ``delta``.

        Each configured filter must pass; a filter whose attribute is falsy
        is skipped:

        * ``entity_id`` is used as a regular expression and matched with
          ``re.match`` (anchored at the start only) against
          ``str(delta.get_id())``; the check is skipped when the delta has
          no id.
        * ``entity_type`` must equal ``delta.entity``.
        * ``action`` must equal ``delta.type``.
        * ``predicate(delta)`` must be truthy.

        """
        # The id filter only applies when both sides actually have an id.
        if self.entity_id and delta.get_id():
            if re.match(self.entity_id, str(delta.get_id())) is None:
                return False

        if self.entity_type and self.entity_type != delta.entity:
            return False

        if self.action and self.action != delta.type:
            return False

        # Without a predicate the observer cares; otherwise the predicate
        # has the final say.
        return not (self.predicate and not self.predicate(delta))
libbuild.py 文件源码 项目:libbuild 作者: appscode 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _ungroup_go_imports(fname):
    """Rewrite ``fname`` in place with blank lines removed from import blocks.

    A block starts on a line whose stripped text matches BEGIN_IMPORT_REGEX
    and ends on a line whose stripped text matches END_IMPORT_REGEX. Blank
    lines between the two are dropped; every other line is written back
    unchanged.
    """
    with open(fname, 'r+') as src:
        kept = []
        inside_imports = False
        for raw in src.readlines():
            stripped = raw.strip()
            if not inside_imports:
                if re.match(BEGIN_IMPORT_REGEX, stripped) is not None:
                    inside_imports = True
            elif stripped == '':
                # Blank line inside an import block: drop it.
                continue
            elif re.match(END_IMPORT_REGEX, stripped) is not None:
                inside_imports = False
            kept.append(raw)
        # Overwrite from the start and cut off whatever is left of the old,
        # longer content.
        src.seek(0)
        src.writelines(kept)
        src.truncate()
setup.py 文件源码 项目:zipline-chinese 作者: zhanghan1990 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def module_requirements(requirements_path, module_names, strict_bounds,
                        conda_format=False):
    """Return the requirement lines belonging to ``module_names``.

    Lines come from ``read_requirements(requirements_path,
    strict_bounds=strict_bounds)``; the first group of REQ_PATTERN is taken
    as the requirement's name. Selected lines are passed through
    ``_conda_format`` when ``conda_format`` is true.

    Raises AssertionError if a line does not match REQ_PATTERN or if any of
    ``module_names`` has no matching line.
    """
    wanted = set(module_names)
    seen = set()
    selected = []
    for requirement in read_requirements(requirements_path,
                                         strict_bounds=strict_bounds):
        parsed = REQ_PATTERN.match(requirement)
        if parsed is None:
            raise AssertionError(
                "Could not parse requirement: '%s'" % requirement)

        package = parsed.group(1)
        if package not in wanted:
            continue
        seen.add(package)
        selected.append(
            _conda_format(requirement) if conda_format else requirement)

    if seen != wanted:
        raise AssertionError(
            "No requirements found for %s." % (wanted - seen)
        )
    return selected
zhihu.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parse(self, response):
        """Follow every https link on the page, treating question pages apart.

        All ``href`` values found on the page are resolved against
        ``response.url`` and only those starting with "https" are kept.
        A URL matching ``zhihu.com/question/<digits>`` is cut down to that
        question URL and requested with ``self.parse_question`` as callback;
        every other URL is requested with this method as callback, so the
        crawl keeps following links.

        Yields one ``scrapy.Request`` per kept URL.
        """
        hrefs = response.css("a::attr(href)").extract()
        # ``parse`` here is the module-level name providing ``urljoin``,
        # not this method.
        all_urls = [parse.urljoin(response.url, url) for url in hrefs]
        # Keep only https links.
        all_urls = [url for url in all_urls if url.startswith("https")]
        for url in all_urls:
            # Raw string: "\d" inside a plain literal is an invalid escape
            # sequence that newer Python versions warn about.
            match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", url)
            if match_obj:
                # Question page: request the bare question URL (group 1).
                request_url = match_obj.group(1)
                yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
            else:
                # Any other page: keep crawling it with this method.
                yield scrapy.Request(url, headers=self.headers, callback=self.parse)
zhihu.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def login(self, response):
        """Extract the ``_xsrf`` token and request the login captcha image.

        When the response text contains no ``name="_xsrf" value="..."``
        field (or its value is empty) nothing is yielded. Otherwise a single
        request for the captcha image is yielded, carrying the prepared
        login form in ``meta["post_data"]`` with
        ``self.login_after_captcha`` as callback.
        """
        token_match = re.match('.*name="_xsrf" value="(.*?)"', response.text, re.DOTALL)
        xsrf = token_match.group(1) if token_match else ''
        if not xsrf:
            return

        # Not used within this method.
        post_url = "https://www.zhihu.com/login/phone_num"
        # NOTE(review): account credentials are hard-coded here; they
        # should come from configuration rather than source code.
        form = {
            "_xsrf": xsrf,
            "phone_num": "18487255487",
            "password": "ty158917",
            "captcha": ""
        }

        import time
        # Current time in milliseconds, sent as the ``r`` query parameter.
        millis = str(int(time.time() * 1000))
        captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(millis)
        yield scrapy.Request(captcha_url, headers=self.headers, meta={"post_data": form}, callback=self.login_after_captcha)
zhihu_login_requests.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_xsrf():
    """Fetch https://www.zhihu.com and extract the ``_xsrf`` token.

    :return: the value captured from ``name="_xsrf" value="..."`` in the
        response text, or an empty string when no such field is found.
    """
    response = session.get("https://www.zhihu.com", headers=header)
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response.text, re.DOTALL)
    if match_obj:
        return match_obj.group(1)
    # Previously the function fell off the end here and returned None even
    # though '' had been prepared as the fallback value.
    return ''
zhihu_login_requests.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 95 收藏 0 点赞 0 评论 0
def zhihu_login(account, password):
    """Log in to zhihu with a phone number or an e-mail address.

    An account starting with "1" followed by ten digits is posted to the
    phone-number login endpoint (together with a captcha); an account
    containing "@" is posted to the e-mail login endpoint. After the POST
    the session cookies are saved.

    :param account: phone number or e-mail address.
    :param password: account password.
    :raises ValueError: if ``account`` is neither of the two forms.
        Previously this case failed later with an unbound-variable error.
    """
    if re.match(r"^1\d{10}", account):
        print ("??????")
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data = {
            "_xsrf": get_xsrf(),
            "phone_num": account,
            "password": password,
            "captcha":get_captcha()
        }
    elif "@" in account:
        print("??????")
        post_url = "https://www.zhihu.com/login/email"
        post_data = {
            "_xsrf": get_xsrf(),
            "email": account,
            "password": password
        }
    else:
        # Fail before any request is made instead of reaching the POST with
        # post_url/post_data undefined.
        raise ValueError(
            "account must be a phone number or an e-mail address")

    session.post(post_url, data=post_data, headers=header)
    session.cookies.save()

# get_index()
# is_login()
# get_captcha()
items.py 文件源码 项目:ArticleSpider 作者: mtianyan 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_nums(value):
    """Return the first run of digits in ``value`` as an int, or 0.

    The pattern is applied with ``re.match`` and "." does not match a
    newline, so only digits on the first line of ``value`` are found.
    """
    # Raw string: "\d" inside a plain literal is an invalid escape sequence.
    match_re = re.match(r".*?(\d+).*", value)
    return int(match_re.group(1)) if match_re else 0
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        """Decide how many CPUs of a discovered node this allocation uses.

        The scheduler calls this with the node's IP address, name, platform
        and its available CPUs, memory and disk. ``name`` is not consulted
        here.

        Returns a negative number (-1) when ``ip_addr`` does not match
        ``self.ip_rex`` or ``platform`` does not match ``self.platform``,
        meaning this allocation is ignored and the next one in
        'node_allocations', if any, is applied. Returns 0 (node not used)
        when the node has less memory or disk than required, or not enough
        CPUs. Otherwise returns the number of CPUs to use: ``self.cpus``
        when it is positive, or ``cpus + self.cpus`` when ``self.cpus`` is
        negative (i.e. that many CPUs are left unused).
        """
        if re.match(self.ip_rex, ip_addr) is None:
            return -1
        if self.platform and re.search(self.platform, platform) is None:
            return -1

        # A requirement is only checked when both the requirement and the
        # node's reported value are set.
        short_on_memory = self.memory and memory and self.memory > memory
        if short_on_memory or (self.disk and disk and self.disk > disk):
            return 0

        if self.cpus > 0:
            return 0 if self.cpus > cpus else self.cpus
        if self.cpus == 0:
            return 0

        # Negative value: use all CPUs except abs(self.cpus) of them.
        remaining = cpus + self.cpus
        return 0 if remaining < 0 else remaining
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, host, tcp_port):
        """Store the address for ``host`` and the integer ``tcp_port``.

        A ``host`` made only of digits and dots, or only of hex digits and
        colons, is stored as given; anything else is resolved with
        ``socket.getaddrinfo`` and the first returned address is stored.
        """
        # NOTE(review): a host name consisting solely of hex letters (e.g.
        # 'cafe') also matches the second pattern and is stored unresolved.
        dotted_digits = re.match(r'^\d+[\.\d]+$', host)
        if dotted_digits or re.match(r'^[0-9a-fA-F:]+$', host):
            address = host
        else:
            candidates = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)
            address = candidates[0][4][0]
        self.addr = address
        self.port = int(tcp_port)
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def allocate(self, ip_addr, name, platform, cpus, memory, disk):
        """Decide how many CPUs of a discovered node this allocation uses.

        The scheduler calls this with the node's IP address, name, platform
        and its available CPUs, memory and disk. ``name`` is not consulted
        here.

        Returns a negative number (-1) when ``ip_addr`` does not match
        ``self.ip_rex`` or ``platform`` does not match ``self.platform``,
        meaning this allocation is ignored and the next one in
        'node_allocations', if any, is applied. Returns 0 (node not used)
        when the node has less memory or disk than required, or not enough
        CPUs. Otherwise returns the number of CPUs to use: ``self.cpus``
        when it is positive, or ``cpus + self.cpus`` when ``self.cpus`` is
        negative (i.e. that many CPUs are left unused).
        """
        if re.match(self.ip_rex, ip_addr) is None:
            return -1
        if self.platform and re.search(self.platform, platform) is None:
            return -1

        # A requirement is only checked when both the requirement and the
        # node's reported value are set.
        short_on_memory = self.memory and memory and self.memory > memory
        if short_on_memory or (self.disk and disk and self.disk > disk):
            return 0

        if self.cpus > 0:
            return 0 if self.cpus > cpus else self.cpus
        if self.cpus == 0:
            return 0

        # Negative value: use all CPUs except abs(self.cpus) of them.
        remaining = cpus + self.cpus
        return 0 if remaining < 0 else remaining
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, host, tcp_port):
        """Store the address for ``host`` and the integer ``tcp_port``.

        A ``host`` made only of digits and dots, or only of hex digits and
        colons, is stored as given; anything else is resolved with
        ``socket.getaddrinfo`` and the first returned address is stored.
        """
        # NOTE(review): a host name consisting solely of hex letters (e.g.
        # 'cafe') also matches the second pattern and is stored unresolved.
        dotted_digits = re.match(r'^\d+[\.\d]+$', host)
        if dotted_digits or re.match(r'^[0-9a-fA-F:]+$', host):
            address = host
        else:
            candidates = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)
            address = candidates[0][4][0]
        self.addr = address
        self.port = int(tcp_port)
steps.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, fqArchiveUrl, filtersDir, outputPrefix, outputUrl, diskSize, diskType, logsPath, container, scriptUrl, tag, cores, mem, preemptible):
        """Build the "biobloomcategorizer" pipeline step and store it in self._step.

        The contents of ``filtersDir`` are listed with ``gsutil ls``; entries
        matching the ``.bf`` / ``.txt`` patterns below, plus the fastq
        archive, become the step's inputs. If the listing fails, an error is
        printed and the process exits with status -1.

        ``fqArchiveUrl``, ``filtersDir``, ``outputPrefix`` and ``outputUrl``
        are used to build the ``inputs``, ``outputs`` and ``env`` strings;
        all remaining arguments are passed through to ``PipelineSchema``
        unchanged.
        """
        super(PipelineStep, self).__init__()

        # "<archive url>:<archive file name>" entry for the fastq archive.
        fqFileName = os.path.basename(fqArchiveUrl)
        fqInputs = "{fqArchive}:{fqFileName}".format(fqArchive=fqArchiveUrl, fqFileName=fqFileName)

        try:
            filtersDirContents = subprocess.check_output(["gsutil", "ls", filtersDir])

        except subprocess.CalledProcessError as e:
            print "ERROR: couldn't get a listing of filter files!  -- {reason}".format(reason=e)
            exit(-1)

        # Keep listing entries ending in ".bf" or containing ".txt".
        # NOTE(review): the ".txt" pattern has no trailing "$", unlike the
        # ".bf" one -- confirm whether that is intended.
        bfInputs = [x for x in filtersDirContents.split('\n') if re.match('^.*\.bf$', x) or re.match('^.*\.txt', x)]
        bfInputs.append(fqInputs)

        # NOTE(review): fqInputs is already in "url:filename" form when it is
        # formatted again here -- verify the resulting entry is what the
        # pipeline expects.
        inputs = ",".join(["{url}:{filename}".format(url=x, filename=os.path.basename(x)) for x in bfInputs])
        outputs = "{outputPrefix}*:{outDir}".format(outputPrefix=outputPrefix, outDir=outputUrl)
        # FILTERS_LIST carries only the base names of the ".bf" entries.
        env = "INPUT_FILE={fqFileName},OUTPUT_PREFIX={outputPrefix},FILTERS_LIST={filtersList}".format(fqFileName=fqFileName, outputPrefix=outputPrefix, filtersList=','.join([os.path.basename(x) for x in bfInputs if re.match('^.*\.bf$', x)]))

        self._step = PipelineSchema("biobloomcategorizer",
                                                   self._pipelinesConfig,
                                                   logsPath,
                                                   container,
                                                   scriptUrl=scriptUrl,
                                                   cores=cores,
                                                   mem=mem,
                                                   diskSize=diskSize,
                                                   diskType=diskType,
                                                   inputs=inputs,
                                                   outputs=outputs,
                                                   env=env,
                                                   tag=tag,
                                                   preemptible=preemptible)
__init__.py 文件源码 项目:docker-utils 作者: a-ba 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def docker_version():
    """Return the version reported by ``docker -v`` as a tuple of three ints.

    When the output does not start with "Docker version X.Y.Z", ``die`` is
    called with an error message instead.
    """
    banner = subprocess.check_output(["docker", "-v"])
    parsed = re.match(br"Docker version (\d+)\.(\d+)\.(\d+)", banner)
    if parsed is not None:
        return tuple(int(field) for field in parsed.groups())
    die("unable to parse a version number from the output of 'docker -v'")
basic.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _processLength(self, lengthMatch):
        """
        Consumes the length prefix of a netstring.

        The text up to the end of group 1 of C{lengthMatch} is passed to
        C{self._extractLength}; the result plus one (for the trailing
        comma) is stored in C{self._expectedPayloadSize}.  Everything up to
        the end of group 2 is then dropped from C{self._remainingData}.

        Any exception raised by C{self._extractLength} propagates, leaving
        C{self._remainingData} untouched.

        @param lengthMatch: A regular expression match object matching
            a netstring length specification
        @type lengthMatch: C{re.Match}
        """
        numberEnd, payloadStart = lengthMatch.end(1), lengthMatch.end(2)
        declaredLength = self._extractLength(self._remainingData[:numberEnd])
        # One extra byte is expected for the comma that ends the netstring.
        self._expectedPayloadSize = declaredLength + 1
        self._remainingData = self._remainingData[payloadStart:]
paths.py 文件源码 项目:xphyle 作者: jdidion 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _match_to_dict(
            self, match: Match, errors: bool = True) -> Dict[str, Any]:
        """Turn a regular expression match into a (name, value) dict.

        This delegates to the module-level ``match_to_dict`` using this
        object's ``path_vars``.

        Args:
            match: A :class:`re.Match`.
            errors: If True, raise an exception for validation failure,
                otherwise return None.

        Returns:
            A (name, value) dict covering all PathVars.

        Raises:
            ValueError if any values fail validation.
        """
        path_vars = self.path_vars
        return match_to_dict(match, path_vars, errors)
paths.py 文件源码 项目:xphyle 作者: jdidion 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def find(
            self, root: PathLike = None,
            recursive: bool = False) -> Sequence[PathInst]:
        """Search for paths that match this spec.

        Args:
            root: Directory in which to begin the search. When None, the
                value of ``self.default_search_root()`` is used.
            recursive: Whether to search recursively.

        Returns:
            A list with one ``path_inst(path, values)`` result for every
            found path whose ``_match_to_dict`` result is not None.
        """
        if root is None:
            root = self.default_search_root()
        located = cast(
            Sequence[Tuple[str, Match[str]]],
            find(
                root, self.pattern, path_types=[self.path_type],
                recursive=recursive, return_matches=True))
        # errors=False: paths whose values fail validation map to None and
        # are filtered out below.
        values_by_path = {
            found_path: self._match_to_dict(found_match, errors=False)
            for found_path, found_match in located}
        return [
            path_inst(found_path, values)
            for found_path, values in values_by_path.items()
            if values is not None]
cookielib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def unmatched(match):
    """Return the subject string of ``match`` with the matched span removed."""
    subject = match.string
    before = subject[:match.start(0)]
    after = subject[match.end(0):]
    return before + after
cookielib.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def unmatched(match):
    """Return the subject string of ``match`` with the matched span removed."""
    subject = match.string
    before = subject[:match.start(0)]
    after = subject[match.end(0):]
    return before + after
cookielib.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def unmatched(match):
    """Return the subject string of ``match`` with the matched span removed."""
    subject = match.string
    before = subject[:match.start(0)]
    after = subject[match.end(0):]
    return before + after
cookielib.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def unmatched(match):
    """Return the subject string of ``match`` with the matched span removed."""
    subject = match.string
    before = subject[:match.start(0)]
    after = subject[match.end(0):]
    return before + after
cookiejar.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def unmatched(match):
    """Return the subject string of ``match`` with the matched span removed."""
    subject = match.string
    before = subject[:match.start(0)]
    after = subject[match.end(0):]
    return before + after
cookiejar.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def unmatched(match):
    """Return the subject string of ``match`` with the matched span removed."""
    subject = match.string
    before = subject[:match.start(0)]
    after = subject[match.end(0):]
    return before + after
visualization.py 文件源码 项目:qiskit-sdk-py 作者: QISKit 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _truncate_float(matchobj, format_str='0.2g'):
    """Truncate long floats

    Args:
        matchobj (re.Match object): contains original float
        format_str (str): format specifier
    Returns:
        str: the matched text converted to float and formatted with
            ``format_str``, or an empty string when the match is empty.
    """
    matched_text = matchobj.group(0)
    if matched_text:
        return format(float(matched_text), format_str)
    # Return a string for an empty match as well: falling through and
    # returning None breaks callers that use this as an re.sub replacement
    # function, which must return a string.
    return ''
cookielib.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def unmatched(match):
    """Return the subject string of ``match`` with the matched span removed."""
    subject = match.string
    before = subject[:match.start(0)]
    after = subject[match.end(0):]
    return before + after
cookielib.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def unmatched(match):
    """Return the subject string of ``match`` with the matched span removed."""
    subject = match.string
    before = subject[:match.start(0)]
    after = subject[match.end(0):]
    return before + after


问题


面经


文章

微信
公众号

扫码关注公众号