python类fnmatch()的实例源码

__init__.py 文件源码 项目:inline-plz 作者: guykisel 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run_per_file(config, ignore_paths=None, path=None, config_dir=None):
    """Run the configured command once for every file matching the language.

    :param config: mapping describing the linter; its ``language`` entry
        selects the glob patterns looked up in ``PATTERNS``.
    :param ignore_paths: paths forwarded to ``all_filenames_in_dir`` to be
        skipped (defaults to an empty list).
    :param path: directory to scan (defaults to the current directory).
    :param config_dir: forwarded to ``run_config`` when building the command.
    :return: list of ``(filepath, output)`` tuples, one per command that ran.
    """
    ignore_paths = ignore_paths or []
    path = path or os.getcwd()
    cmd = run_config(config, config_dir)
    print(cmd)
    # A language without registered patterns simply has nothing to run,
    # instead of failing with "NoneType is not iterable".
    patterns = PATTERNS.get(config.get('language')) or []
    paths = all_filenames_in_dir(path=path, ignore_paths=ignore_paths)
    run_cmds = [
        cmd + [filepath]
        for pattern in patterns
        for filepath in fnmatch.filter(paths, pattern)
    ]

    def result(run_cmd):
        # The file path is always the last element of the command line.
        _, out = run_command(run_cmd)
        return run_cmd[-1], out

    pool = Pool()
    try:
        return pool.map(result, run_cmds)
    finally:
        # Release the workers even when a command raises.
        pool.close()
        pool.join()
base.py 文件源码 项目:jawaf 作者: danpozmanter 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _render(self, target_name, read_path, write_path):
    """Render a given template or directory for the target.

    A directory is recreated at ``write_path`` and each of its entries is
    rendered recursively. A directory literally named ``project_name`` is
    written under the name stored in ``self.variables['project_name']``.
    Entries matching ``test_*`` get the target name inserted right after
    their ``test_`` prefix. Anything else is treated as a template: it is
    rendered with ``self.variables`` and written to ``write_path`` with its
    last extension stripped.

    :param target_name: String. Project or App name to render.
    :param read_path: String. Path to template or directory to render.
    :param write_path: String. Path to write to (or create directory).
    """
    if not os.path.isdir(read_path):
        tpl = Template(filename=read_path)
        with open(os.path.splitext(write_path)[0], 'w') as f:
            f.write(tpl.render(**self.variables))
        return

    if os.path.split(read_path)[1] == 'project_name':
        write_path = os.path.join(os.path.split(write_path)[0], self.variables['project_name'])
    os.mkdir(write_path)
    for filename in os.listdir(read_path):
        if fnmatch(filename, 'test_*'):
            # Only the leading prefix is rewritten; a later "test_" inside
            # the name (e.g. test_my_test_case.py) must stay untouched.
            write_filename = filename.replace('test_', f'test_{target_name}_', 1)
        else:
            write_filename = filename
        self._render(target_name, os.path.join(read_path, filename), os.path.join(write_path, write_filename))
__init__.py 文件源码 项目:badwolf 作者: bosondata 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def match_file(self, filename):
    """Decide whether this linter can handle ``filename``.

    The pattern comes from the ``pattern`` option, falling back to
    ``self.default_pattern``. With no pattern every file is accepted.
    Otherwise the pattern is tried first as whitespace-separated globs and
    then, as a whole, as a case-insensitive regular expression.
    """
    pattern = self.options.get('pattern') or self.default_pattern
    if not pattern:
        return True

    # First interpretation: a whitespace-separated list of globs.
    if any(fnmatch.fnmatch(filename, candidate) for candidate in pattern.split()):
        return True

    # Second interpretation: the whole pattern as a regular expression.
    try:
        regex_hit = re.match(pattern, filename, re.I)
    except re.error:
        # Not a valid regular expression, so it cannot match.
        return False
    return bool(regex_hit)
charm_helpers_sync.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_filter(opts=None):
    """Build a callback that lists the directory entries to leave out.

    Returns ``None`` when ``opts`` contains ``'inc=*'`` (nothing is filtered).
    Otherwise returns a function taking a directory and its entry names and
    returning the names to skip: non-``.py`` files that match none of the
    ``inc=`` globs, and directories without an ``__init__.py``.
    """
    options = opts or []
    if 'inc=*' in options:
        # do not filter any files, include everything
        return None

    def _filter(dir, ls):
        include_globs = [opt.split('=')[-1] for opt in options if 'inc=' in opt]
        skipped = []
        for entry in ls:
            full = os.path.join(dir, entry)
            is_dir = os.path.isdir(full)
            is_python = full.endswith('.py')

            if include_globs and not is_dir and not is_python:
                # Extra files are kept only when an include glob matches.
                if any(fnmatch(full, glob) for glob in include_globs):
                    logging.debug('Including file, which matches include '
                                  'filters (%s): %s' % (include_globs, full))
                else:
                    logging.debug('Not syncing %s, does not match include '
                                  'filters (%s)' % (full, include_globs))
                    skipped.append(entry)
                continue

            if is_dir:
                # Only directories carrying an __init__.py are kept.
                if not os.path.isfile(os.path.join(full, '__init__.py')):
                    logging.debug('Not syncing directory: %s' % entry)
                    skipped.append(entry)
            elif not is_python and os.path.isfile(full):
                logging.debug('Not syncing file: %s' % entry)
                skipped.append(entry)
        return skipped
    return _filter
command.py 文件源码 项目:nova 作者: hubblestack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _get_tags(data):
    '''
    Retrieve all the tags for this distro from the yaml

    Walks every audit under ``data['command']``, picks the tag block whose
    comma-separated osfinger globs match the ``osfinger`` grain (falling back
    to the ``'*'`` block), and groups the flattened audit entries by tag.

    :param data: mapping with an optional ``command`` list of
        ``{audit_id: audit_data}`` dicts.
    :return: dict mapping each tag to a list of dicts made of the audit data
        merged with the selected tag block, minus the ``data`` key.
    '''
    ret = {}
    distro = __grains__.get('osfinger')
    for audit_dict in data.get('command', []):
        # command:0
        # .items() works on both Python 2 and 3 (iteritems is Python 2 only)
        for audit_id, audit_data in audit_dict.items():
            # command:0:nodev
            tags_dict = audit_data.get('data', {})
            # command:0:nodev:data
            tags = None
            for osfinger in tags_dict:
                if osfinger == '*':
                    continue
                osfinger_globs = [finger.strip() for finger in osfinger.split(',')]
                if any(fnmatch.fnmatch(distro, glob) for glob in osfinger_globs):
                    tags = tags_dict.get(osfinger)
                    if tags is not None:
                        break
            # If we didn't find a match, check for a '*'
            if tags is None:
                tags = tags_dict.get('*', {})
            # command:0:nodev:data:Debian-8
            # Work on a copy so the default tag is not written back into the
            # caller's parsed yaml.
            tags = dict(tags or {})
            tag = tags.setdefault('tag', '')
            formatted_data = {'tag': tag,
                              'module': 'command'}
            formatted_data.update(audit_data)
            formatted_data.update(tags)
            # 'data' is optional above, so it must be optional here too.
            formatted_data.pop('data', None)
            ret.setdefault(tag, []).append(formatted_data)
    return ret
__init__.py 文件源码 项目:cpe_utils 作者: ExodusIntelligence 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def matches(self, cpe):
    """Tell whether ``cpe`` satisfies this CPE, treating fields as globs.

    Each non-empty field of this object is used as a wildcard pattern for
    the same field of ``cpe``. The first field that fails is reported on
    stdout and ends the comparison.

    :param cpe: The cpe to compare against
    :return: True when every non-empty field matches, False otherwise
    """
    # TODO see issue #3

    # Checked in this order; the message is printed for the first mismatch.
    field_checks = (
        ('vendor', "vendor was false"),
        ('product', "product was false"),
        ('version', "version was false"),
        ('update', "update was false"),
        ('edition', "edition was false"),
        ('part', "part was false"),
    )
    for field, failure_message in field_checks:
        wanted = getattr(self, field)
        # Empty fields on this object act as "match anything".
        if wanted and not fnmatch.fnmatch(getattr(cpe, field), wanted):
            print(failure_message)
            return False
    return True
loader.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _match_path(self, path, full_path, pattern):
    """Report whether ``path`` matches the glob ``pattern``.

    ``full_path`` is not consulted here; override this method to use an
    alternative matching strategy.
    """
    is_match = fnmatch(path, pattern)
    return is_match
bdb.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_skipped_module(self, module_name):
    """Return True if ``module_name`` matches any glob in ``self.skip``.

    :param module_name: dotted module name, or None when it is unknown.
    :return: True when a skip pattern matches; False otherwise, including
        when ``module_name`` is None.
    """
    if module_name is None:
        # A missing name cannot match any pattern, and fnmatch would raise
        # TypeError on None.
        return False
    return any(fnmatch.fnmatch(module_name, pattern) for pattern in self.skip)
analyze.py 文件源码 项目:git-of-theseus 作者: erikbern 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_entries(commit):
    """Collect the blob entries of ``commit`` selected by the CLI filters.

    An entry is kept when its type is ``'blob'``, its path matches every
    glob in ``args.only`` and none of the globs in ``args.ignore``.
    """
    selected = []
    for entry in commit.tree.traverse():
        if entry.type != 'blob':
            continue
        if not all(fnmatch.fnmatch(entry.path, pattern) for pattern in args.only):
            continue
        if any(fnmatch.fnmatch(entry.path, pattern) for pattern in args.ignore):
            continue
        selected.append(entry)
    return selected
reactor.py 文件源码 项目:eventdriventalk 作者: cachedout 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def process_event(self, event):
    '''
    Take an event and attempt to match it
    in the list of keys.

    If found, schedule the requested action.

    Every key of ``self.opts`` is a glob matched against ``event['tag']``.
    For a matching key each entry under ``reactions`` is run through
    ``self.react`` in its own thread, then the optional ``rules`` chain is
    evaluated in order until a rule yields no actions.
    '''
    if not self.opts:
        return
    for tag in self.opts:
        if not fnmatch.fnmatch(event['tag'], tag):
            continue
        handlers = self.opts[tag]
        for action in handlers.get('reactions', []):
            # Super-simple non-blocking approach
            # Threading won't scale as much as a true event loop
            # would. It will, however, handle cases where single-threaded
            # loop would be blocked. Do you trust your reactions to be co-op?!
            # Of course, the other side of this is thread-safety. Either way, be smart!
            t = threading.Thread(target=self.react, args=(action, event))
            t.start()
        # Rules are optional: a key with only reactions must not raise.
        for rule in handlers.get('rules', []):
            # NOTE(review): tracking_id is not defined in this function;
            # presumably a module-level name - confirm.
            rule_actions = process_rule(rule, event, tracking_id)
            if not rule_actions:
                # Rule chaining ends when a rule does not match
                break
            for action in rule_actions:
                # First key plus the list of values; written so it behaves
                # the same on Python 2 and 3.
                self.react(next(iter(action)), list(action.values()))
salt_event.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def _process(self, event):
    """Wrap a raw event and hand it to every listener.

    The event tag is compared against a fixed list of globs; the first one
    that matches decides which wrapper class is built and which specific
    listener hook is called after the generic ``handle_salt_event``.
    Events whose tag matches nothing are ignored.

    Args:
        event (dict): the raw event data
    """
    logger.debug("Process event -> %s", event)
    # (tag glob, wrapper class, listener hook) - order matters, first match wins.
    dispatch = (
        ('salt/job/*/new', NewJobEvent, 'handle_new_job_event'),
        ('salt/run/*/new', NewRunnerEvent, 'handle_new_runner_event'),
        ('salt/job/*/ret/*', RetJobEvent, 'handle_ret_job_event'),
        ('salt/run/*/ret', RetRunnerEvent, 'handle_ret_runner_event'),
        ('salt/state_result/*', StateResultEvent, 'handle_state_result_event'),
    )
    tag = event['tag']
    for tag_glob, wrapper_cls, hook_name in dispatch:
        if not fnmatch.fnmatch(tag, tag_glob):
            continue
        wrapper = wrapper_cls(event)
        for listener in self.listeners:
            listener.handle_salt_event(wrapper)
            getattr(listener, hook_name)(wrapper)
        return
label.py 文件源码 项目:Proxy46 作者: Macronut 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def matchGlob(self,pattern):
    """Case-insensitively glob-match this label against ``pattern``.

    ``pattern`` is converted to a ``DNSLabel`` first unless it already is
    exactly one; both sides are then compared as lower-cased strings.
    """
    if type(pattern) != DNSLabel:
        pattern = DNSLabel(pattern)
    own_text = str(self).lower()
    glob_text = str(pattern).lower()
    return fnmatch.fnmatch(own_text, glob_text)
recipe-527746.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def Walk(root='.', recurse=True, pattern='*'):
    """
        Lazily yield the paths of files below ``root`` whose name matches
        the glob ``pattern``. With ``recurse`` false only the files
        directly inside ``root`` are considered.
    """
    for path, subdirs, files in os.walk(root):
        for name in fnmatch.filter(files, pattern):
            yield os.path.join(path, name)
        if not recurse:
            # os.walk is top-down, so the first iteration is root itself.
            return
recipe-414771.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def Walk( root, recurse=0, pattern='*', return_folders=0 ):
    """Return the paths under ``root`` whose names match ``pattern``.

    :param root: directory to list; an unreadable or missing directory
        yields an empty list.
    :param recurse: when true, descend into sub-directories (symlinked
        directories are not followed).
    :param pattern: one or more globs separated by ``;``; empty means ``*``.
    :param return_folders: when true, matching directories are returned as
        well as matching files.
    :return: list of normalized paths, each entry listed once even if its
        name matches several of the globs.
    """
    import fnmatch
    import os

    # must have at least root folder
    try:
        names = os.listdir(root)
    except OSError:
        return []

    # expand pattern (str.split replaces string.splitfields, which no
    # longer exists in Python 3)
    pat_list = (pattern or '*').split(';')

    result = []
    for name in names:
        fullname = os.path.normpath(os.path.join(root, name))
        is_dir = os.path.isdir(fullname)

        # grab if it matches any pattern and is of a wanted entry type;
        # any() stops at the first hit so the entry is added only once
        if any(fnmatch.fnmatch(name, pat) for pat in pat_list):
            if os.path.isfile(fullname) or (return_folders and is_dir):
                result.append(fullname)

        # recursively scan other folders, appending results
        if recurse and is_dir and not os.path.islink(fullname):
            result.extend(Walk(fullname, recurse, pattern, return_folders))

    return result
recipe-120991.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def callback(arg, directory, files):
    """Rewrite, in place, every listed file whose name matches ``arg``.

    Each occurrence of ``theunderdogs`` in a matching file is replaced by
    ``the-underdogs``; all other text is written back unchanged.

    :param arg: glob the file name must match.
    :param directory: directory that contains ``files``.
    :param files: names of the entries found in ``directory``.
    """
    import sys

    for name in files:
        if not fnmatch.fnmatch(name, arg):
            continue
        target = os.path.abspath(os.path.join(directory, name))
        stream = fileinput.input(target, inplace=1)
        try:
            for line in stream:
                # With inplace=1 stdout is redirected into the file, so each
                # line has to be written back; replace() is a no-op on lines
                # without the old string, so no pre-check is needed.
                sys.stdout.write(line.replace('theunderdogs', 'the-underdogs'))  # old string , new string
        finally:
            # Always restore the real stdout, even if the rewrite fails.
            stream.close()
runbooks.py 文件源码 项目:automatron 作者: madflojo 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def apply_to_targets(runbooks, config, dbc):
    ''' Match hosts with runbooks

    For every target returned by ``dbc.get_target()``, each key of
    ``runbooks`` is used as a glob against the target's ``hostname``. Any
    runbook under a matching key that the target does not have yet is
    rendered with the target's ``facts``, the target is saved and a
    ``runbook_add`` message is sent on the ``monitors`` channel.

    :param runbooks: mapping of hostname glob -> {runbook name: definition}.
    :param config: not used by this function.
    :param dbc: datastore object providing ``get_target``, ``save_target``
        and ``notify``.
    :return: always True.
    '''
    targets = dbc.get_target()
    logger.debug("Found targets: {0}".format(json.dumps(targets)))
    for target in targets.keys():
        target_config = targets[target]
        # Create runbook dictionary if it doesn't exist
        if "runbooks" not in target_config:
            logger.debug("Creating runbook dictionary in target config")
            target_config['runbooks'] = {}
        logger.debug("Identifying runbooks for target {0}".format(target))
        for matcher in runbooks.keys():
            if not fnmatch.fnmatch(target_config['hostname'], matcher):
                continue
            for runbook in runbooks[matcher].keys():
                logger.debug("Checking if {0} is already applied".format(runbook))
                if runbook in target_config['runbooks']:
                    logger.debug("{0} is already applied to target {1}".format(runbook, target))
                    continue
                try:
                    target_config['runbooks'][runbook] = render_runbooks(
                        runbooks[matcher][runbook],
                        target_config['facts'])
                except Exception as e:
                    # Format the exception itself: ``e.message`` does not
                    # exist on Python 3 exceptions, and Logger.warn is
                    # deprecated in favour of Logger.warning.
                    logger.warning("Could not apply runbook {0} to target {1}: {2}".format(
                        runbook,
                        target_config['hostname'],
                        e
                    ))
                # NOTE(review): the save and notification below also run when
                # rendering failed - confirm that this is intended.
                dbc.save_target(target=target_config)
                msg = {
                    'msg_type' : 'runbook_add',
                    'runbook' : runbook,
                    'target' : target}
                logger.debug("Adding runbook policy {0} to target {1}".format(
                    runbook, target))
                count = dbc.notify("monitors", msg)
                logger.info("Notified {0} of runbook changes to target {1}".format(
                    count, target))
    return True
utils.py 文件源码 项目:zmirror 作者: aploium 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def is_domain_match_glob_whitelist(domain):
    """
    Tell whether the domain matches at least one glob pattern in
    `domains_whitelist_auto_add_glob_list`
    :type domain: str
    :rtype: bool
    """
    return any(
        fnmatch(domain, domain_glob)
        for domain_glob in domains_whitelist_auto_add_glob_list
    )
pep8.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def filename_match(filename, patterns, default=True):
    """Tell whether any glob in ``patterns`` matches ``filename``.

    When ``patterns`` is empty or unspecified the answer is ``default``.
    """
    if patterns:
        for candidate in patterns:
            if fnmatch(filename, candidate):
                return True
        return False
    return default
gui.py 文件源码 项目:dottorrent-gui 作者: kz26 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self):
    """Create one ``.torrent`` file per entry of ``self.path``.

    Entries matching any glob in ``self.exclude`` and entries reported as
    hidden by ``dottorrent.is_hidden_file`` are skipped. Progress is
    emitted before each torrent is generated; the result is saved into
    ``self.save_dir`` when generation succeeds. Empty inputs are ignored,
    any other error is emitted through ``onError`` and stops the run, as
    does an interruption request.
    """
    def callback(*args):
        # Handed to generate(); reports whether an interruption was requested.
        return self.isInterruptionRequested()

    entries = os.listdir(self.path)
    for i, p in enumerate(entries):
        if any(fnmatch(p, ex) for ex in self.exclude):
            continue
        p = os.path.join(self.path, p)
        if dottorrent.is_hidden_file(p):
            continue

        sfn = os.path.split(p)[1] + '.torrent'
        self.progress_update.emit(sfn, i, len(entries))
        t = dottorrent.Torrent(
            p,
            exclude=self.exclude,
            trackers=self.trackers,
            web_seeds=self.web_seeds,
            private=self.private,
            source=self.source,
            comment=self.comment,
            include_md5=self.include_md5,
            creation_date=datetime.now(),
            created_by=CREATOR
        )
        try:
            self.success = t.generate(callback=callback)
        except dottorrent.exceptions.EmptyInputException:
            # ignore empty inputs
            continue
        except Exception as exc:
            self.onError.emit(str(exc))
            return
        if self.isInterruptionRequested():
            return
        if not self.success:
            continue
        with open(os.path.join(self.save_dir, sfn), 'wb') as f:
            t.save(f)
command.py 文件源码 项目:hubble-salt 作者: hubblestack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _get_tags(data):
    '''
    Retrieve all the tags for this distro from the yaml

    Walks every audit under ``data['command']``, picks the tag block whose
    comma-separated osfinger globs match the ``osfinger`` grain (falling back
    to the ``'*'`` block), and groups the flattened audit entries by tag.

    :param data: mapping with an optional ``command`` list of
        ``{audit_id: audit_data}`` dicts.
    :return: dict mapping each tag to a list of dicts made of the audit data
        merged with the selected tag block, minus the ``data`` key.
    '''
    ret = {}
    distro = __grains__.get('osfinger')
    for audit_dict in data.get('command', []):
        # command:0
        # .items() works on both Python 2 and 3 (iteritems is Python 2 only)
        for audit_id, audit_data in audit_dict.items():
            # command:0:nodev
            tags_dict = audit_data.get('data', {})
            # command:0:nodev:data
            tags = None
            for osfinger in tags_dict:
                if osfinger == '*':
                    continue
                osfinger_globs = [finger.strip() for finger in osfinger.split(',')]
                if any(fnmatch.fnmatch(distro, glob) for glob in osfinger_globs):
                    tags = tags_dict.get(osfinger)
                    if tags is not None:
                        break
            # If we didn't find a match, check for a '*'
            if tags is None:
                tags = tags_dict.get('*', {})
            # command:0:nodev:data:Debian-8
            # Work on a copy so the default tag is not written back into the
            # caller's parsed yaml.
            tags = dict(tags or {})
            tag = tags.setdefault('tag', '')
            formatted_data = {'tag': tag,
                              'module': 'command'}
            formatted_data.update(audit_data)
            formatted_data.update(tags)
            # 'data' is optional above, so it must be optional here too.
            formatted_data.pop('data', None)
            ret.setdefault(tag, []).append(formatted_data)
    return ret


问题


面经


文章

微信
公众号

扫码关注公众号