python类search()的实例源码

database.py 文件源码 项目:ln2sql 作者: FerreroJeremy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_table(self, table_string):
    """Build a ``Table`` from the text of one ``CREATE TABLE`` statement.

    The statement is scanned line by line:

    * a line containing ``TABLE`` supplies the table name,
    * a line containing ``PRIMARY KEY`` supplies the primary-key columns,
    * any other line holding a back-quoted identifier is treated as a
      column definition whose type comes from ``self.predict_type``.

    When ``self.thesaurus_object`` is set, synonyms of the table and
    column names are stored as equivalences.

    :param table_string: statement text, one clause per line.
    :returns: the populated ``Table`` instance.
    """
    # Raw string: "\w" inside a plain literal is an invalid escape sequence.
    identifier = re.compile(r"`(\w+)`")
    table = Table()
    for line in table_string.split("\n"):
        if 'TABLE' in line:
            table_name = identifier.search(line)
            if table_name is None:
                # A TABLE line without a back-quoted name carries nothing
                # to record; skip it instead of failing on None.group().
                continue
            table.name = table_name.group(1)
            if self.thesaurus_object is not None:
                table.equivalences = self.thesaurus_object.get_synonyms_of_a_word(table.name)
        elif 'PRIMARY KEY' in line:
            for primary_key_column in identifier.findall(line):
                table.add_primary_key(primary_key_column)
        else:
            column_name = identifier.search(line)
            if column_name is None:
                continue
            column_type = self.predict_type(line)
            if self.thesaurus_object is not None:
                equivalences = self.thesaurus_object.get_synonyms_of_a_word(column_name.group(1))
            else:
                equivalences = []
            table.add_column(column_name.group(1), column_type, equivalences)
    return table
urlcategory.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self):
    """Report the FortiGuard web-filter category of a domain or URL.

    For ``domain`` and ``url`` observables the FortiGuard lookup page is
    fetched and the text following ``Category: `` is reported under the
    ``category`` key.  Any other data type is rejected through
    ``self.notSupported()``.  A response without a category, like any other
    ``ValueError``, is routed to ``self.unexpectedError``.
    """
    Analyzer.run(self)

    if self.data_type not in ('domain', 'url'):
        self.notSupported()
        return

    try:
        pattern = re.compile(r"(?:Category: )([\w\s]+)")
        baseurl = 'http://www.fortiguard.com/webfilter?q='
        url = baseurl + self.getData()
        req = requests.get(url)
        # Search the decoded text: a str pattern cannot be applied to the
        # raw bytes of req.content on Python 3.
        category_match = pattern.search(req.text)
        if category_match is None:
            # Surface a missing category through the normal error path
            # instead of an AttributeError on None.group().
            raise ValueError('Category not found in FortiGuard response')
        self.report({
            'category': category_match.group(1)
        })
    except ValueError as e:
        self.unexpectedError(e)
loopback.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def loopback_devices():
    '''
    Parse through 'losetup -a' output to determine currently mapped
    loopback devices. Output is expected to look like:

        /dev/loop0: [0807]:961814 (/tmp/my.img)

    :returns: dict: a dict mapping {loopback_dev: backing_file}
    '''
    output = check_output(['losetup', '-a'])
    # On Python 3 check_output returns bytes; the str-based comparison,
    # split and regex below need text.
    if not isinstance(output, str):
        output = output.decode('utf-8')

    loopbacks = {}
    for line in output.splitlines():
        if line == '':
            continue
        # Exactly three space-separated fields are expected per line:
        # "<dev>:", "[dev-id]:inode" and "(<backing file>)".
        dev, _, f = line.strip().split(' ')
        loopbacks[dev.replace(':', '')] = re.search(r'\((\S+)\)', f).groups()[0]
    return loopbacks
linux-soft-exploit-suggester.py 文件源码 项目:linux-soft-exploit-suggester 作者: belane 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def searchExploit(exploit_list, soft_name, soft_version):
    """ Search affected packages in exploit_list

    Each exploit row is indexed positionally: ``[2]`` is the title that is
    filtered and matched, ``[5]`` the platform and ``[6]`` the exploit type.
    A row is reported when it passes the platform / DoS / filter / type
    options in ``args``, its title matches ``soft_name``, and one of the
    versions extracted from the title is not older than the searched one
    (or ``args.level`` is 5).

    When ``args.duplicates`` is False a reported row is removed from
    ``exploit_list`` so that it is not reported again for another package.

    Returns a list of ``[exploit, soft_name, soft_version]`` entries.
    """
    result = []
    version_search = versionVartions(soft_version, args.level)
    # The pattern does not depend on the exploit, so build it once.
    query = r"(^(\w*\s){0,%s}|/\s?)%s(\s|\s.*\s|\/).* -" % (args.level, soft_name.replace('+', r'\+'))

    # Iterate over a snapshot: rows may be removed from exploit_list below,
    # and removing from a list while iterating it skips the following row.
    for exploit in list(exploit_list):
        # Platform and DoS
        if exploit[5] not in valid_platforms:
            continue
        if not (args.dos or exploit[6] != 'dos' or args.type == 'dos'):
            continue
        # Filter
        if args.filter is not None and args.filter.lower() not in exploit[2].lower():
            continue
        # Type
        if args.type is not None and args.type != exploit[6]:
            continue
        if not re.search(query, exploit[2], re.IGNORECASE):
            continue

        for affected_version in extractVersions(exploit[2]):
            if args.level == 5 or LooseVersion(version_search) <= LooseVersion(affected_version):
                # Duplicates
                if args.duplicates == False:
                    exploit_list.remove(exploit)
                printOutput(exploit, soft_name, soft_version)
                result.append([exploit, soft_name, soft_version])
                break
    return result
linux-soft-exploit-suggester.py 文件源码 项目:linux-soft-exploit-suggester 作者: belane 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def extractVersions(title_string):
    """ Extract all version numbers from a string

    The title is first cut at the first ``" -"``, ``"("`` or ``"&"``.
    Every remaining whitespace-separated token that starts with a digit is
    considered a version:

    * ``a/b/c`` is split into alternatives; alternatives for which
      ``purgeVersionString`` returns a falsy value are dropped,
    * ``x.y.z-w`` is collapsed to ``x.y.w`` before purging,
    * anything else is purged as is.

    Returns the list of purged version strings.
    """
    def collapse_range(token):
        # "1.2.3-7" -> "1.2.7": keep the leading components of the part
        # before the first dash and append the part after the last dash.
        pieces = token.split('-')
        return '.'.join(pieces[0].split('.')[:-1]) + '.' + pieces[-1]

    cut = re.search(r'\s-|\(|\&', title_string)
    if cut:
        title_string = title_string[:cut.start()]

    result = []
    for possible_version in title_string.split():
        if not possible_version[0].isdigit():
            continue
        if '/' in possible_version:
            for multiversion in possible_version.split('/'):
                if '-' in multiversion:
                    multiversion = collapse_range(multiversion)
                # Purge once and reuse the value for both test and append.
                purged = purgeVersionString(multiversion)
                if purged:
                    result.append(purged)
        elif '-' in possible_version:
            result.append(purgeVersionString(collapse_range(possible_version)))
        else:
            result.append(purgeVersionString(possible_version))
    return result
service_tests.py 文件源码 项目:heerkat 作者: Roche 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_oozie_workflow(self):
    """Run the sample Oozie streaming workflow and expect it to succeed.

    Stages the sample data and the workflow definition in HDFS, submits
    the job, polls it, and asserts that the reported status is
    ``SUCCEEDED``.
    """
    # given
    input_data_dir = 'test/availability/input-data'
    apps_streaming_dir = 'test/availability/apps/streaming'

    cmd('hdfs dfs -rm -r %s' % input_data_dir)
    cmd('hdfs dfs -mkdir -p %s' % input_data_dir)
    cmd('hdfs dfs -put %s %s' % (sample_data, input_data_dir))

    cmd('hdfs dfs -rm -r %s' % apps_streaming_dir)
    cmd('hdfs dfs -mkdir -p %s' % apps_streaming_dir)
    cmd('hdfs dfs -put resources/oozie/workflow.xml %s' % apps_streaming_dir)

    # when
    result = cmd('oozie job -oozie %s -config resources/oozie/job.properties -run' % oozie_host)
    job_id = result.stdout.replace('job: ', '')
    cmd('oozie job -oozie %s -poll %s -interval 1' % (oozie_host, job_id))
    result = cmd('oozie job -oozie %s -info %s' % (oozie_host, job_id))

    # then
    status_line = re.search(r'Status\s+:\s+(.+)', result.stdout)
    # Fail with the captured output rather than an AttributeError when the
    # job info contains no Status line at all.
    self.assertIsNotNone(status_line, result.stdout)
    self.assertEqual('SUCCEEDED', status_line.group(1), result.stderr)
scrape.py 文件源码 项目:xr-telemetry-m2m-web 作者: cisco 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def commit_history(cli):
    """
    Parse output of "show configuration history commit reverse detail"

    Each matching line carries two ``Key: value`` pairs.  Pairs accumulate
    into one ordered record until a line whose second key is ``Comment``
    closes the record.  Lines that do not match are ignored, and a trailing
    record that never reaches a ``Comment`` line is discarded.

    Returns a list of ``OrderedDict`` records in input order.
    """
    pair_line = re.compile(' ([A-Z][a-z]+(?: ID)?): (.*?) +([A-Z][a-z]+): (.*)')
    commits = []
    current = OrderedDict()
    for text in cli.splitlines():
        hit = pair_line.search(text)
        if hit is None:
            continue
        left_key, left_value, right_key, right_value = hit.groups()
        current[left_key] = left_value
        current[right_key] = right_value
        if right_key == 'Comment':
            # "Comment" is the last field of a commit entry.
            commits.append(current)
            current = OrderedDict()
    return commits
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _getmember(self, name, tarinfo=None, normalize=False):
        """Find an archive member by name from bottom to top.
           If tarinfo is given, it is used as the starting point.
        """
        # Ensure that all members have been loaded.
        members = self.getmembers()

        # Limit the member search list up to tarinfo.
        if tarinfo is not None:
            members = members[:members.index(tarinfo)]

        if normalize:
            name = os.path.normpath(name)

        for member in reversed(members):
            if normalize:
                member_name = os.path.normpath(member.name)
            else:
                member_name = member.name

            if name == member_name:
                return member
package_index.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def find_external_links(url, page):
    """Find rel="homepage" and rel="download" links in `page`, yielding URLs

    Two sources are scanned in order: tags matched by REL whose rel value
    lists "homepage" or "download", then the first HREF found after each
    of the "<th>Home Page" and "<th>Download URL" markers.  Every yielded
    URL is resolved against `url`.
    """
    wanted = ('homepage', 'download')

    for rel_match in REL.finditer(page):
        tag, rel = rel_match.groups()
        rels = {part.strip() for part in rel.lower().split(',')}
        if rels.isdisjoint(wanted):
            continue
        for href_match in HREF.finditer(tag):
            yield urllib.parse.urljoin(url, htmldecode(href_match.group(1)))

    for marker in ("<th>Home Page", "<th>Download URL"):
        pos = page.find(marker)
        if pos == -1:
            continue
        href_match = HREF.search(page, pos)
        if href_match:
            yield urllib.parse.urljoin(url, htmldecode(href_match.group(1)))
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _getmember(self, name, tarinfo=None, normalize=False):
        """Find an archive member by name from bottom to top.
           If tarinfo is given, it is used as the starting point.
        """
        # Ensure that all members have been loaded.
        members = self.getmembers()

        # Limit the member search list up to tarinfo.
        if tarinfo is not None:
            members = members[:members.index(tarinfo)]

        if normalize:
            name = os.path.normpath(name)

        for member in reversed(members):
            if normalize:
                member_name = os.path.normpath(member.name)
            else:
                member_name = member.name

            if name == member_name:
                return member
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _find_link_target(self, tarinfo):
        """Find the target member of a symlink or hardlink member in the
           archive.
        """
        if tarinfo.issym():
            # Always search the entire archive.
            linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name), tarinfo.linkname)))
            limit = None
        else:
            # Search the archive before the link, because a hard link is
            # just a reference to an already archived file.
            linkname = tarinfo.linkname
            limit = tarinfo

        member = self._getmember(linkname, tarinfo=limit, normalize=True)
        if member is None:
            raise KeyError("linkname %r not found" % linkname)
        return member
base64.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def b16decode(s, casefold=False):
    """Decode the Base16 encoded bytes-like object or ASCII string s.

    When casefold is true the input is upper-cased first, so a lowercase
    alphabet is accepted as well.  For security purposes, the default is
    False.

    Returns the decoded data as a bytes object.  binascii.Error is raised
    if any character outside 0-9 and A-F is present, or if
    binascii.unhexlify rejects the input.
    """
    data = _bytes_from_decode_data(s)
    data = data.upper() if casefold else data
    if re.search(b'[^0-9A-F]', data) is not None:
        raise binascii.Error('Non-base16 digit found')
    return binascii.unhexlify(data)

#
# Ascii85 encoding/decoding
#
setup.py 文件源码 项目:py_find_1st 作者: roebel 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def compiler_is_clang(comp) :
    """Tell whether the compiler command `comp` identifies itself as clang.

    `comp` is the compiler command as a list of arguments; it is run with
    ``--version`` appended and the combined stdout/stderr is searched for
    ``clang``.  Progress is printed as a "check for ... yes/no" line.

    Returns True when ``clang`` appears in the output, False otherwise or
    when the command cannot be started (OSError).
    """
    print("check for clang compiler ...", end=' ')
    version_cmd = comp + ['--version']
    try:
        cc_output = subprocess.check_output(version_cmd,
                                            stderr=subprocess.STDOUT,
                                            shell=False)
    except OSError as ex:
        print("compiler test call failed with error {0:d} msg: {1}".format(ex.errno, ex.strerror))
        print("no")
        return False

    found = re.search(b'clang', cc_output) is not None
    print("yes" if found else "no")
    return found
text.py 文件源码 项目:AVSR-Deep-Speech 作者: pandeydivesh15 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def validate_label(label):
    """Normalise a transcript label, or return None if it is unusable.

    A label containing any of ``( < [ ] & * {`` or an ASCII digit is
    rejected.  Otherwise ``- _ . , ?`` are removed, surrounding whitespace
    is stripped and the result is lower-cased.
    """
    # For now we can only handle [a-z ']
    rejected = ("(", "<", "[", "]", "&", "*", "{")
    if any(symbol in label for symbol in rejected) or re.search(r"[0-9]", label) is not None:
        return None

    for noise in ("-", "_", ".", ",", "?"):
        label = label.replace(noise, "")

    return label.strip().lower()
text_RHL.py 文件源码 项目:AVSR-Deep-Speech 作者: pandeydivesh15 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def validate_label(label):
    """Normalise a transcript label, or return None if it is unusable.

    A label containing any of ``( < [ ] & * {`` or an ASCII digit is
    rejected.  Otherwise ``- _ . , ?`` are removed, surrounding whitespace
    is stripped and the result is lower-cased.
    """
    # For now we can only handle [a-z ']
    rejected = ("(", "<", "[", "]", "&", "*", "{")
    if any(symbol in label for symbol in rejected) or re.search(r"[0-9]", label) is not None:
        return None

    for noise in ("-", "_", ".", ",", "?"):
        label = label.replace(noise, "")

    return label.strip().lower()
markers.py 文件源码 项目:Blender-power-sequencer 作者: GDquest 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def string_find_id(string, regex):
    """Find a marker's ID using a regular expression
    returns the ID as int if found, otherwise returns None
    Args:
        -string, any string
        -regex, the regex pattern to match. Its first capture group must
         hold the ID.
    Raises:
        AttributeError if either string or regex is empty or None."""

    # Both arguments are required.  The former test `not string and regex`
    # parsed as `(not string) and regex`, so an empty regex slipped through
    # and later failed with IndexError on group(1).
    if not string or not regex:
        raise AttributeError("Missing name or regex attribute")

    import re
    match = re.search(regex, string)
    if not match:
        return None
    found_id = match.group(1)
    return int(found_id) if found_id else None
cb-powershell-decode.py 文件源码 项目:cbapi-examples 作者: cbcommunity 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def main(cb, args):
    """Print the decoded form of base64-encoded PowerShell command lines.

    Iterates over the results of the ``process_name:powershell.exe`` query
    on ``cb``.  For every process whose command line contains an
    ``-enc...`` style switch followed by a base64 token, the token is
    padded if needed, decoded, and printed together with a link built from
    ``args['server_url']`` and the process id.

    NOTE: this block uses Python 2 print statements.
    """
    powershells = cb.process_search_iter('process_name:powershell.exe')
    for s in powershells:
        if s['cmdline']:
            # Case-insensitive "-enc" followed by any mix of the letters of
            # "odecmman", one whitespace character, then the base64 payload.
            encoded = re.search('\-[eE][nN][cC][oOdDeEcCmMaAnN]*\s([A-Za-z0-9\+/=]+)', s['cmdline'])
            if encoded != None:
                i = encoded.group(1)
                # Payload does not end in '=' padding: add what the length
                # (mod 4) calls for so that the decoder accepts it.
                if not re.search('[a-zA-Z0-9\+/]+={1,2}$', i):
                    trailingBytes = len(i) % 4
                    if trailingBytes == 3:
                        i = i + '='
                    elif trailingBytes == 2:
                        i = i + '=='
                decodedCommand = base64.standard_b64decode(i)
                try:
                    a = decodedCommand.encode('ascii', 'replace')
                    # NUL characters are stripped before printing --
                    # presumably the payload is UTF-16 text; TODO confirm.
                    print "Powershell Decoded Command\n%s/#analyze/%s/1\n%s\n\n" % (
                    args['server_url'], s['id'], a.replace('\0', ""))
                except UnicodeError:
                    # Fall back to printing the original command line.
                    print "Powershell Decoded Command\n%s/#analyze/%s/1\nNon-ASCII decoding, encoded form printed to assist more research\n%s\n" % (
                    args['server_url'], s['id'], s['cmdline'])
                    pass
move_policy_from_mbus.py 文件源码 项目:cbapi-examples 作者: cbcommunity 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def check_triggers(message):
    """Find the first policy in ``action_triggers`` whose rules all match.

    Each policy's ``rules`` entry is a flat sequence of alternating message
    keys and regular expressions.  A policy matches when every expression
    is found in the lower-cased message value of its key; evaluation of a
    policy stops at its first failing rule.  A policy without rules never
    matches.

    Returns ``(True, targetpolicy)`` for the first matching policy, or
    ``(False, "")`` when none matches.
    """
    for policy in action_triggers:
        trigger = action_triggers[policy]
        # Work on a private list so the shared rule definition is untouched.
        rules = list(trigger['rules'])
        matched = False
        for pos in range(0, len(rules), 2):
            criteria = rules[pos]
            expression = rules[pos + 1]
            matched = bool(re.search(expression, message[criteria].lower()))
            if not matched:
                break

        if matched:
            return True, trigger['targetpolicy']

    return False, ""
move_policy_from_mbus.py 文件源码 项目:cbapi-examples 作者: cbcommunity 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def move_policy(sensor, targetPolicy):
    """Move every computer registered for a sensor into another policy.

    Connects to the Bit9 server named by the module-level ``epserver``
    using ``eptoken``, looks up the policy called ``targetPolicy`` and the
    computers whose ``cbSensorId`` equals ``sensor``, then updates each
    computer's ``policyId`` to the first matching policy.

    Raises:
        ValueError: if the policy or the computer cannot be found.
    """
    global eptoken
    global epserver

    bit9 = bit9api.bit9Api(
        "https://"+epserver,  # Replace with actual Bit9 server URL
        token=eptoken,
        # NOTE(security): certificate validation is disabled. Set to True
        # unless using self-signed cert on IIS.
        ssl_verify=False
    )

    # policy to send the naughty host to
    targetPolicyName = targetPolicy
    destPolicies = bit9.search('v1/policy', ['name:'+targetPolicyName])
    if len(destPolicies)==0:
        raise ValueError("Cannot find destination policy "+targetPolicyName)

    # find the computer id
    destComputer = bit9.search('v1/computer', ['cbSensorId:'+str(sensor)])
    if len(destComputer)==0:
        # The lookup is by sensor id; the name `hostname` used here before
        # was never defined and raised NameError instead of this error.
        raise ValueError("Cannot find computer with sensor id "+str(sensor))

    for c in destComputer:
        # A parenthesised single argument prints identically on Python 2 and 3.
        print("Moving computer %s from policy %s to policy %s" % (c['name'], c['policyName'], targetPolicyName))
        c['policyId'] = destPolicies[0]['id']
        bit9.update('v1/computer', c)
dbc_parser.py 文件源码 项目:CANpy 作者: stefanhoelzl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _parse_attribute(self, attribute_str):
    """Parses a attribute string and updates the CANBus

    The ``BA_`` entry names an attribute definition and, optionally, the
    object it belongs to: a node (``BU_``), a message (``BO_``) or a
    signal (``SG_``).  Without an object marker the attribute is attached
    to the network itself.

    Args:
        attribute_str: String with attribute

    Raises:
        AttributeError: if attribute_str does not match the expected
            ``BA_`` syntax (the failed match is None).
    """
    # Raw strings: "\s", "\S" and "\d" are invalid escape sequences in a
    # plain string literal.
    pattern = r'BA_\s+"(?P<attr_name>\S+)"\s*(?P<node>BU_)?(?P<msg>BO_)?(?P<sig>SG_)?\s*'
    pattern += r'(?P<can_id>\d*)?\s*(?P<name>\S*)?\s+(?P<value>\S+)\s*;'
    reg = re.search(pattern, attribute_str)

    # Default target is the network; an object marker overrides it.
    can_object = self._can_network
    if reg.group('node'):
        can_object = self._can_network.nodes[reg.group('name')]
    elif reg.group('msg'):
        can_object = self._can_network.get_message(int(reg.group('can_id')))
    elif reg.group('sig'):
        can_object = self._can_network.get_signal(int(reg.group('can_id')), reg.group('name'))

    cad = self._can_network.attributes.definitions[reg.group('attr_name')]
    can_object.attributes.add(CANAttribute(cad, value=self._parse_attribute_value(cad, reg.group('value'))))
libkas.py 文件源码 项目:kas 作者: siemens 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def ssh_setup_agent(config, envkeys=None):
    """
        Starts the ssh-agent

        The ``NAME=value;`` assignments printed by ``ssh-agent -s`` are
        copied into ``config.environ``.  Afterwards each environment
        variable named in ``envkeys`` (default: ``SSH_PRIVATE_KEY``) is
        read and, when set, handed to ``ssh_add_key``; a warning is logged
        for every variable that is missing.
    """
    envkeys = envkeys or ['SSH_PRIVATE_KEY']
    # Close the pipe deterministically instead of leaving it to the
    # garbage collector.
    with os.popen('ssh-agent -s') as agent_output:
        output = agent_output.readlines()

    for line in output:
        matches = re.search(r"(\S+)\=(\S+)\;", line)
        if matches:
            config.environ[matches.group(1)] = matches.group(2)

    for envkey in envkeys:
        key = os.environ.get(envkey)
        if key:
            ssh_add_key(config.environ, key)
        else:
            logging.warning('%s is missing', envkey)
tarfile.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _getmember(self, name, tarinfo=None, normalize=False):
        """Find an archive member by name from bottom to top.
           If tarinfo is given, it is used as the starting point.
        """
        # Ensure that all members have been loaded.
        members = self.getmembers()

        # Limit the member search list up to tarinfo.
        if tarinfo is not None:
            members = members[:members.index(tarinfo)]

        if normalize:
            name = os.path.normpath(name)

        for member in reversed(members):
            if normalize:
                member_name = os.path.normpath(member.name)
            else:
                member_name = member.name

            if name == member_name:
                return member
package_index.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def find_external_links(url, page):
    """Find rel="homepage" and rel="download" links in `page`, yielding URLs

    Two sources are scanned in order: tags matched by REL whose rel value
    lists "homepage" or "download", then the first HREF found after each
    of the "<th>Home Page" and "<th>Download URL" markers.  Every yielded
    URL is resolved against `url`.
    """
    wanted = ('homepage', 'download')

    for rel_match in REL.finditer(page):
        tag, rel = rel_match.groups()
        rels = {part.strip() for part in rel.lower().split(',')}
        if rels.isdisjoint(wanted):
            continue
        for href_match in HREF.finditer(tag):
            yield urllib.parse.urljoin(url, htmldecode(href_match.group(1)))

    for marker in ("<th>Home Page", "<th>Download URL"):
        pos = page.find(marker)
        if pos == -1:
            continue
        href_match = HREF.search(page, pos)
        if href_match:
            yield urllib.parse.urljoin(url, htmldecode(href_match.group(1)))
text_objects.py 文件源码 项目:NeoVintageous 作者: NeoVintageous 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def next_unbalanced_tag(view, search=None, search_args=None, restart_at=None, tags=None):
    """Find the next end tag that has no matching start tag before it.

    ``search(view, **search_args)`` is called repeatedly and must return a
    ``(region, tag, is_end_tag)`` triple.  After each hit
    ``restart_at(region)`` supplies the keyword arguments that move the
    next search forward.  Start tags are pushed onto ``tags``; an end tag
    that closes one of them pops the stack back to that tag.

    Args:
        view: object handed through to ``search``.
        search: callable locating the next tag (required).
        search_args: keyword arguments for ``search``; updated in place
            as the scan advances.  Defaults to a fresh empty dict.
        restart_at: callable mapping a region to updated search arguments
            (required).
        tags: stack of currently open tags.  Defaults to a fresh empty
            list.

    Returns:
        ``(region, tag)`` of the first unbalanced end tag, or
        ``(None, None)`` when the search runs out of tags.
    """
    assert search and restart_at, 'wrong call'

    # Fresh containers per call: mutable defaults in the signature are
    # shared between calls and would carry open tags and search positions
    # over from one scan into the next.
    if search_args is None:
        search_args = {}
    if tags is None:
        tags = []

    while True:
        region, tag, is_end_tag = search(view, **search_args)

        if not region:
            return None, None

        if not is_end_tag:
            tags.append(tag)
        elif not tags or (tag not in tags):
            return region, tag
        else:
            # Discard open tags up to and including the one being closed.
            while tag != tags.pop():
                continue

        search_args.update(restart_at(region))
test_00_Log4py.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _try_config_test(self, logcfg, epattern, foundTest=None ):
    """Apply a log4py configuration string and check what it printed.

    ``logcfg`` is passed to ``strConfig`` while stdout is redirected into
    a buffer.  Every captured line is then searched for every expected
    pattern; each hit contributes one ``True`` to the list of findings,
    which must equal ``foundTest``.

    ``epattern`` is a single pattern string or a sequence of patterns.
    ``foundTest`` defaults to one ``True`` per expected pattern.
    """
    import ossie.utils.log4py.config

    with stdout_redirect(cStringIO.StringIO()) as new_stdout:
        ossie.utils.log4py.config.strConfig(logcfg,None)

    new_stdout.seek(0)
    epats = [epattern] if type(epattern) is str else epattern
    if foundTest is None:
        foundTest = len(epats)*[True]

    found = [True
             for line in new_stdout.readlines()
             for epat in epats
             if re.search(epat, line)]

    self.assertEqual(found, foundTest)
__init__.py 文件源码 项目:cpe_utils 作者: ExodusIntelligence 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_human(self, attr):
    """Return the attribute `attr` of this object in human-readable form.

    Underscores become spaces, known product abbreviations are expanded,
    values containing lowercase letters are title-cased, service-pack
    names (SP0..SP6) are upper-cased and x86/x64 are lower-cased.
    """
    text = getattr(self, attr).replace("_", " ")

    product_mapping = {
        "ie": "Internet Explorer"
    }
    if attr == "product" and text in product_mapping:
        text = product_mapping[text]

    # if there's lowercase letters in the value, make it a title
    # (if there's not, leave it alone - e.g. SP3)
    has_lowercase = re.search('[a-z]', text) is not None
    if has_lowercase:
        text = text.title()

    service_packs = ("SP0", "SP1", "SP2", "SP3", "SP4", "SP5", "SP6")
    if text.upper() in service_packs:
        text = text.upper()
    if text.lower() in ("x86", "x64"):
        text = text.lower()

    return text
alert.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
async def handle(self, player, action, values, **kwargs):  # pragma: no cover
    """Handle a button press on the alert: close it and publish the answer.

    The view is closed for ``player`` first.  If ``action`` ends in
    ``button_<digits>`` the digits are used as the answer, otherwise the
    whole action string is.  The answer is stored in
    ``self.response_future`` unless that future is already done, and
    ``self.target``, when set, is awaited with the original arguments.

    Declared ``async`` because the body awaits; ``await`` inside a plain
    ``def`` is a syntax error.
    """
    await self.close(player)

    # Try to parse the button id instead of the whole action string.
    button = action
    try:
        match = re.search(r'button_([0-9]+)$', action)
    except TypeError:
        # action is not a string: keep the raw action as the answer.
        match = None
    if match:
        button = match.group(1)

    if not self.response_future.done():
        self.response_future.set_result(button)

    if self.target:
        await self.target(player, action, values, **kwargs)
loopback.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def loopback_devices():
    '''
    Parse through 'losetup -a' output to determine currently mapped
    loopback devices. Output is expected to look like:

        /dev/loop0: [0807]:961814 (/tmp/my.img)

    :returns: dict: a dict mapping {loopback_dev: backing_file}
    '''
    output = check_output(['losetup', '-a'])
    # On Python 3 check_output returns bytes; the str-based comparison,
    # split and regex below need text.
    if not isinstance(output, str):
        output = output.decode('utf-8')

    loopbacks = {}
    for line in output.splitlines():
        if line == '':
            continue
        # Exactly three space-separated fields are expected per line:
        # "<dev>:", "[dev-id]:inode" and "(<backing file>)".
        dev, _, f = line.strip().split(' ')
        loopbacks[dev.replace(':', '')] = re.search(r'\((\S+)\)', f).groups()[0]
    return loopbacks
sysctl.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __call__(self):
    """Build the context dict describing the host CPU.

    Reads the ``os`` settings and ``/proc/cpuinfo``.  The value of the
    last ``vendor_id`` line is used as the CPU vendor, with
    ``GenuineIntel`` and ``AuthenticAMD`` shortened to ``intel`` and
    ``amd``.  When no ``vendor_id`` line exists the vendor is ``None``.

    Returns a dict with the keys ``arch``, ``cpuVendor`` and
    ``desktop_enable``.
    """
    settings = utils.get_settings('os')
    with open('/proc/cpuinfo', 'r') as fd:
        cpuinfo = fd.readlines()

    # Start from None so that a cpuinfo without any vendor_id line does
    # not leave `vendor` unbound below.
    vendor = None
    for line in cpuinfo:
        match = re.search(r"^vendor_id\s+:\s+(.+)", line)
        if match:
            vendor = match.group(1)

    # Map the raw vendor ids to short names; anything else passes through.
    vendor = {"GenuineIntel": "intel", "AuthenticAMD": "amd"}.get(vendor, vendor)

    ctxt = {'arch': platform.processor(),
            'cpuVendor': vendor,
            'desktop_enable': settings['general']['desktop_enable']}

    return ctxt
loopback.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def loopback_devices():
    '''
    Parse through 'losetup -a' output to determine currently mapped
    loopback devices. Output is expected to look like:

        /dev/loop0: [0807]:961814 (/tmp/my.img)

    :returns: dict: a dict mapping {loopback_dev: backing_file}
    '''
    output = check_output(['losetup', '-a'])
    # On Python 3 check_output returns bytes; the str-based comparison,
    # split and regex below need text.
    if not isinstance(output, str):
        output = output.decode('utf-8')

    loopbacks = {}
    for line in output.splitlines():
        if line == '':
            continue
        # Exactly three space-separated fields are expected per line:
        # "<dev>:", "[dev-id]:inode" and "(<backing file>)".
        dev, _, f = line.strip().split(' ')
        loopbacks[dev.replace(':', '')] = re.search(r'\((\S+)\)', f).groups()[0]
    return loopbacks


问题


面经


文章

微信
公众号

扫码关注公众号