def run_per_file(config, ignore_paths=None, path=None, config_dir=None):
    """Run the configured lint command once per matching file, in parallel.

    :param config: dict with at least a 'language' key, used to look up
        filename patterns in the module-level ``PATTERNS`` mapping.
    :param ignore_paths: optional list of paths to exclude from the scan.
    :param path: directory to scan; defaults to the current working directory.
    :param config_dir: forwarded to ``run_config()`` when building the command.
    :return: list of ``(filepath, output)`` tuples, one per matched file.
    """
    ignore_paths = ignore_paths or []
    path = path or os.getcwd()
    cmd = run_config(config, config_dir)
    print(cmd)
    run_cmds = []
    # PATTERNS.get() returns None for an unknown language; fall back to an
    # empty pattern list instead of raising TypeError in the loop below.
    patterns = PATTERNS.get(config.get('language')) or []
    paths = all_filenames_in_dir(path=path, ignore_paths=ignore_paths)
    for pattern in patterns:
        for filepath in fnmatch.filter(paths, pattern):
            run_cmds.append(cmd + [filepath])
    # NOTE(review): `result` is a closure handed to Pool.map; this only works
    # with a thread-based Pool (e.g. multiprocessing.dummy) -- a process Pool
    # cannot pickle a nested function. Confirm which Pool is imported.
    pool = Pool()

    def result(run_cmd):
        _, out = run_command(run_cmd)
        return run_cmd[-1], out

    try:
        output = pool.map(result, run_cmds)
    finally:
        # Always release the worker pool, even if a lint command raises.
        pool.close()
        pool.join()
    return output
# Python fnmatch() usage examples (collected source snippets)
def _render(self, target_name, read_path, write_path):
    """Render a given template or directory for the target.

    Recursively mirrors the directory tree at *read_path* into *write_path*,
    rendering each leaf file as a template with ``self.variables``.

    :param target_name: String. Project or App name to render.
    :param read_path: String. Path to template or directory to render.
    :param write_path: String. Path to write to (or create directory).
    """
    if os.path.isdir(read_path):
        # A template directory literally named 'project_name' is written out
        # under the real project name taken from the template variables.
        if os.path.split(read_path)[1] == 'project_name':
            write_path = os.path.join(os.path.split(write_path)[0], self.variables['project_name'])
        os.mkdir(write_path)
        for filename in os.listdir(read_path):
            # Prefix test files with the target name, e.g.
            # test_views.py -> test_<target_name>_views.py.
            if fnmatch(filename, 'test_*'):
                write_filename = filename.replace('test_', f'test_{target_name}_')
            else:
                write_filename = filename
            self._render(target_name, os.path.join(read_path, filename), os.path.join(write_path, write_filename))
    else:
        # Leaf file: render it as a template (Template is imported elsewhere;
        # presumably Mako, given the filename= keyword -- confirm) and write
        # it with its final extension stripped (e.g. foo.py.tpl -> foo.py).
        tpl = Template(filename=read_path)
        with open(os.path.splitext(write_path)[0], 'w') as f:
            f.write(tpl.render(**self.variables))
def match_file(self, filename):
    """Used to check if files can be handled by this linter,
    Often this will just file extension checks."""
    # Prefer the user-configured pattern; fall back to the linter default.
    pattern = self.options.get('pattern') or self.default_pattern
    if not pattern:
        # No pattern configured at all: accept every file.
        return True
    globs = pattern.split()
    for glob in globs:
        if fnmatch.fnmatch(filename, glob):
            # Matched one of the whitespace-separated glob patterns.
            return True
    try:
        if re.match(pattern, filename, re.I):
            # Fall back to treating the whole pattern string as a
            # case-insensitive regular expression.
            return True
    except re.error:
        # Pattern is not a valid regex; the glob check above was the only
        # applicable test.
        pass
    return False
def get_filter(opts=None):
    """Build a directory-listing filter honouring ``inc=<glob>`` options.

    Returns ``None`` (meaning "filter nothing") when ``inc=*`` is present;
    otherwise returns a callable that, given a directory and its listing,
    returns the entry names that should be skipped: non-``.py`` files that
    match no include glob, and directories lacking an ``__init__.py``.
    """
    opts = opts if opts is not None else []
    if 'inc=*' in opts:
        # do not filter any files, include everything
        return None

    def _filter(dir, ls):
        includes = [opt.split('=').pop() for opt in opts if 'inc=' in opt]
        skipped = []
        for name in ls:
            full = os.path.join(dir, name)
            is_dir = os.path.isdir(full)
            if not is_dir and not full.endswith('.py') and includes:
                if any(fnmatch(full, inc) for inc in includes):
                    logging.debug('Including file, which matches include '
                                  'filters (%s): %s' % (includes, full))
                else:
                    logging.debug('Not syncing %s, does not match include '
                                  'filters (%s)' % (full, includes))
                    skipped.append(name)
            elif os.path.isfile(full) and not full.endswith('.py'):
                logging.debug('Not syncing file: %s' % name)
                skipped.append(name)
            elif is_dir and not os.path.isfile(os.path.join(full, '__init__.py')):
                logging.debug('Not syncing directory: %s' % name)
                skipped.append(name)
        return skipped

    return _filter
def _get_tags(data):
    '''
    Retrieve all the tags for this distro from the yaml.

    :param data: parsed yaml dict; audits live under the 'command' key as a
        list of {audit_id: audit_data} dicts.
    :return: dict mapping tag -> list of formatted audit entries whose
        osfinger globs match this minion's ``osfinger`` grain.
    '''
    ret = {}
    distro = __grains__.get('osfinger')
    for audit_dict in data.get('command', []):
        # command:0
        # .items() works on both Python 2 and 3; the original
        # dict.iteritems() no longer exists on Python 3.
        for audit_id, audit_data in audit_dict.items():
            # command:0:nodev
            tags_dict = audit_data.get('data', {})
            # command:0:nodev:data
            tags = None
            for osfinger in tags_dict:
                if osfinger == '*':
                    continue
                # Each key may list several comma-separated osfinger globs.
                osfinger_list = [finger.strip() for finger in osfinger.split(',')]
                for osfinger_glob in osfinger_list:
                    if fnmatch.fnmatch(distro, osfinger_glob):
                        tags = tags_dict.get(osfinger)
                        break
                if tags is not None:
                    break
            # If we didn't find a match, check for a '*'
            if tags is None:
                tags = tags_dict.get('*', {})
            # command:0:nodev:data:Debian-8
            if 'tag' not in tags:
                tags['tag'] = ''
            tag = tags['tag']
            if tag not in ret:
                ret[tag] = []
            formatted_data = {'tag': tag,
                              'module': 'command'}
            formatted_data.update(audit_data)
            formatted_data.update(tags)
            formatted_data.pop('data')
            ret[tag].append(formatted_data)
    return ret
def matches(self, cpe):
    """Return True if this CPE object matches the provided cpe, using wildcards.

    Each populated field on ``self`` is treated as an fnmatch-style glob and
    compared against the corresponding field of *cpe*; falsy fields on
    ``self`` match anything. The leftover debug ``print`` calls that fired on
    every mismatch have been removed.

    :param cpe: The cpe to compare against
    :return: bool
    """
    # TODO see issue #3
    for field in ('vendor', 'product', 'version', 'update', 'edition', 'part'):
        own = getattr(self, field)
        # getattr(cpe, field) is only evaluated when `own` is truthy,
        # preserving the original elif chain's short-circuit behaviour.
        if own and not fnmatch.fnmatch(getattr(cpe, field), own):
            return False
    return True
def _match_path(self, path, full_path, pattern):
# override this method to use alternative matching strategy
return fnmatch(path, pattern)
def is_skipped_module(self, module_name):
    """Return True when *module_name* matches any glob in ``self.skip``."""
    return any(fnmatch.fnmatch(module_name, skip_glob) for skip_glob in self.skip)
def get_entries(commit):
    """Return the blob entries of *commit* that pass the CLI path filters.

    An entry is kept when its path matches every glob in ``args.only`` and
    none of the globs in ``args.ignore`` (both read from the module-level
    ``args``).
    """
    def _wanted(entry):
        # One predicate per filter stage; short-circuits like the original
        # chained comprehension condition.
        if entry.type != 'blob':
            return False
        if not all(fnmatch.fnmatch(entry.path, pattern) for pattern in args.only):
            return False
        return not any(fnmatch.fnmatch(entry.path, pattern) for pattern in args.ignore)

    return [entry for entry in commit.tree.traverse() if _wanted(entry)]
def process_event(self, event):
    '''
    Take an event and attempt to match it
    in the list of keys.
    If found, schedule the requested action.
    '''
    # NOTE(review): this was written for Python 2 -- `action.keys()[0]` and
    # `action.values()` below are not subscriptable / ordered-list calls on
    # Python 3.
    if not self.opts:
        return
    for tag in self.opts:
        if fnmatch.fnmatch(event['tag'], tag):
            for action in self.opts[tag]['reactions']:
                # Super-simple non-blocking approach.
                # Threading won't scale as much as a true event loop
                # would. It will, however, handle cases where single-threaded
                # loop would be blocked. Do you trust your reactions to be co-op?!
                # Of course, the other side of this is thread-safety. Either way, be smart!
                t = threading.Thread(target=self.react, args=(action, event))
                t.start()
            if 'rules' in self.opts[tag]:
                rule_actions = []
                for rule in self.opts[tag]['rules']:
                    # NOTE(review): `tracking_id` is not defined anywhere in
                    # this view -- presumably a module-level global; confirm.
                    rule_actions = process_rule(rule, event, tracking_id)
                    if rule_actions:
                        for action in rule_actions:
                            self.react(action.keys()[0], action.values())
                    else:
                        # Rule chaining ends when a rule does not match
                        break
def _process(self, event):
    """Processes a raw event

    Creates the proper salt event class wrapper and notifies listeners

    Args:
        event (dict): the raw event data
    """
    logger.debug("Process event -> %s", event)
    # (tag glob, wrapper class, specific listener hook), checked in order;
    # only the first matching entry is dispatched, like the original
    # if/elif chain.
    dispatch = (
        ('salt/job/*/new', NewJobEvent, 'handle_new_job_event'),
        ('salt/run/*/new', NewRunnerEvent, 'handle_new_runner_event'),
        ('salt/job/*/ret/*', RetJobEvent, 'handle_ret_job_event'),
        ('salt/run/*/ret', RetRunnerEvent, 'handle_ret_runner_event'),
        ('salt/state_result/*', StateResultEvent, 'handle_state_result_event'),
    )
    for tag_glob, wrapper_cls, hook_name in dispatch:
        if fnmatch.fnmatch(event['tag'], tag_glob):
            wrapper = wrapper_cls(event)
            for listener in self.listeners:
                listener.handle_salt_event(wrapper)
                getattr(listener, hook_name)(wrapper)
            break
def matchGlob(self,pattern):
    """Return True if this label matches *pattern*, case-insensitively,
    using fnmatch glob semantics.

    :param pattern: a DNSLabel, or anything DNSLabel() can wrap.
    """
    # isinstance (rather than an exact type() comparison) also accepts
    # DNSLabel subclasses without needlessly re-wrapping them.
    if not isinstance(pattern, DNSLabel):
        pattern = DNSLabel(pattern)
    return fnmatch.fnmatch(str(self).lower(), str(pattern).lower())
def Walk(root='.', recurse=True, pattern='*'):
    """
    Generator for walking a directory tree.

    Starts at the specified root folder and yields the full path of every
    file whose name matches *pattern*; optionally recurses into sub-folders.
    """
    for dirpath, _subdirs, filenames in os.walk(root):
        # fnmatch.filter applies the same normalised match as per-name
        # fnmatch.fnmatch calls.
        for matched in fnmatch.filter(filenames, pattern):
            yield os.path.join(dirpath, matched)
        if not recurse:
            # Stop after the root directory's own files.
            break
def Walk( root, recurse=0, pattern='*', return_folders=0 ):
    """Recursively collect paths under *root* matching *pattern*.

    :param root: directory to scan; unreadable roots yield an empty list.
    :param recurse: truthy to descend into sub-directories (symlinks skipped).
    :param pattern: one or more globs separated by ';' (empty means '*').
    :param return_folders: truthy to also include matching directories.
    :return: list of normalized full paths.
    """
    import fnmatch, os
    # initialize
    result = []
    # must have at least root folder
    try:
        names = os.listdir(root)
    except OSError:
        return result
    # expand pattern; str.split replaces string.splitfields, which was
    # removed along with the rest of the string-module functions in Python 3
    pattern = pattern or '*'
    pat_list = pattern.split(';')
    # check each file
    for name in names:
        fullname = os.path.normpath(os.path.join(root, name))
        # grab if it matches our pattern and entry type
        for pat in pat_list:
            if fnmatch.fnmatch(name, pat):
                if os.path.isfile(fullname) or (return_folders and os.path.isdir(fullname)):
                    result.append(fullname)
                # break (the original 'continue' appended an entry once per
                # matching pattern, producing duplicates)
                break
        # recursively scan other folders, appending results
        if recurse:
            if os.path.isdir(fullname) and not os.path.islink(fullname):
                result = result + Walk( fullname, recurse, pattern, return_folders )
    return result
def callback(arg, directory, files):
    """Rewrite 'theunderdogs' to 'the-underdogs' in files matching *arg*.

    os.path.walk-style visitor: *arg* is the filename glob, *directory* the
    directory being visited, *files* the names it contains.
    """
    for name in files:
        if fnmatch.fnmatch(name, arg):
            # fileinput with inplace=1 redirects stdout into the file, so
            # print() below writes each (possibly edited) line back.
            for line in fileinput.input(os.path.abspath(os.path.join(directory, name)), inplace=1):
                if re.search('.*theunderdogs.*', line):
                    # str.replace replaces the Python-2-only string.replace
                    line = line.replace('theunderdogs', 'the-underdogs')  # old string , new string
                # end='' mirrors the Python 2 trailing-comma print; the line
                # keeps its own newline
                print(line, end='')
def apply_to_targets(runbooks, config, dbc):
    ''' Match hosts with runbooks.

    For every target from the datastore, render and attach each runbook whose
    hostname glob matches, persist the target, and notify the "monitors"
    channel about the addition.

    :param runbooks: mapping of hostname glob -> {runbook name: definition}
    :param config: unused here; kept for interface compatibility
    :param dbc: datastore client providing get_target/save_target/notify
    :return: True
    '''
    targets = dbc.get_target()
    logger.debug("Found targets: {0}".format(json.dumps(targets)))
    for target in targets.keys():
        # Create runbook dictionary if it doesn't exist
        if "runbooks" not in targets[target].keys():
            logger.debug("Creating runbook dictionary in target config")
            targets[target]['runbooks'] = {}
        logger.debug("Identifying runbooks for target {0}".format(target))
        for matcher in runbooks.keys():
            if fnmatch.fnmatch(targets[target]['hostname'], matcher):
                for runbook in runbooks[matcher].keys():
                    logger.debug("Checking if {0} is already applied".format(runbook))
                    if runbook not in targets[target]['runbooks'].keys():
                        try:
                            targets[target]['runbooks'][runbook] = render_runbooks(
                                runbooks[matcher][runbook],
                                targets[target]['facts'])
                        except Exception as e:
                            # str(e): the Python-2-only e.message attribute
                            # raises AttributeError on Python 3; warning()
                            # replaces the deprecated warn() alias.
                            logger.warning("Could not apply runbook {0} to target {1}: {2}".format(
                                runbook,
                                targets[target]['hostname'],
                                str(e)
                            ))
                        dbc.save_target(target=targets[target])
                        msg = {
                            'msg_type' : 'runbook_add',
                            'runbook' : runbook,
                            'target' : target}
                        logger.debug("Adding runbook policy {0} to target {1}".format(
                            runbook, target))
                        count = dbc.notify("monitors", msg)
                        logger.info("Notified {0} of runbook changes to target {1}".format(
                            count, target))
                    else:
                        logger.debug("{0} is already applied to target {1}".format(runbook, target))
    return True
def is_domain_match_glob_whitelist(domain):
    """Return True if *domain* matches any glob in the module-level
    `domains_whitelist_auto_add_glob_list`.

    :type domain: str
    :rtype: bool
    """
    return any(fnmatch(domain, domain_glob)
               for domain_glob in domains_whitelist_auto_add_glob_list)
def filename_match(filename, patterns, default=True):
    """Check if patterns contains a pattern that matches filename.

    If patterns is unspecified (empty or None), this returns *default*.
    """
    if not patterns:
        return default
    for pattern in patterns:
        if fnmatch(filename, pattern):
            return True
    return False
def run(self):
    """Worker entry point: build one .torrent per top-level entry in
    ``self.path``, saving each into ``self.save_dir``.

    Emits ``progress_update`` for each entry, ``onError`` (then returns) on
    unexpected failures, and honours thread-interruption requests both
    between torrents and, via *callback*, during generation.
    """
    def callback(*args):
        # dottorrent polls this during generate(); a truthy return aborts.
        return self.isInterruptionRequested()
    entries = os.listdir(self.path)
    for i, p in enumerate(entries):
        # Skip entries matching any user-supplied exclusion glob.
        if any(fnmatch(p, ex) for ex in self.exclude):
            continue
        p = os.path.join(self.path, p)
        if not dottorrent.is_hidden_file(p):
            sfn = os.path.split(p)[1] + '.torrent'
            self.progress_update.emit(sfn, i, len(entries))
            t = dottorrent.Torrent(
                p,
                exclude=self.exclude,
                trackers=self.trackers,
                web_seeds=self.web_seeds,
                private=self.private,
                source=self.source,
                comment=self.comment,
                include_md5=self.include_md5,
                creation_date=datetime.now(),
                created_by=CREATOR
            )
            try:
                self.success = t.generate(callback=callback)
            # ignore empty inputs
            except dottorrent.exceptions.EmptyInputException:
                continue
            except Exception as exc:
                self.onError.emit(str(exc))
                return
            if self.isInterruptionRequested():
                return
            if self.success:
                with open(os.path.join(self.save_dir, sfn), 'wb') as f:
                    t.save(f)
def _get_tags(data):
    '''
    Retrieve all the tags for this distro from the yaml.

    :param data: parsed yaml dict; audits live under the 'command' key as a
        list of {audit_id: audit_data} dicts.
    :return: dict mapping tag -> list of formatted audit entries whose
        osfinger globs match this minion's ``osfinger`` grain.
    '''
    ret = {}
    distro = __grains__.get('osfinger')
    for audit_dict in data.get('command', []):
        # command:0
        # .items() works on both Python 2 and 3; the original
        # dict.iteritems() no longer exists on Python 3.
        for audit_id, audit_data in audit_dict.items():
            # command:0:nodev
            tags_dict = audit_data.get('data', {})
            # command:0:nodev:data
            tags = None
            for osfinger in tags_dict:
                if osfinger == '*':
                    continue
                # Each key may list several comma-separated osfinger globs.
                osfinger_list = [finger.strip() for finger in osfinger.split(',')]
                for osfinger_glob in osfinger_list:
                    if fnmatch.fnmatch(distro, osfinger_glob):
                        tags = tags_dict.get(osfinger)
                        break
                if tags is not None:
                    break
            # If we didn't find a match, check for a '*'
            if tags is None:
                tags = tags_dict.get('*', {})
            # command:0:nodev:data:Debian-8
            if 'tag' not in tags:
                tags['tag'] = ''
            tag = tags['tag']
            if tag not in ret:
                ret[tag] = []
            formatted_data = {'tag': tag,
                              'module': 'command'}
            formatted_data.update(audit_data)
            formatted_data.update(tags)
            formatted_data.pop('data')
            ret[tag].append(formatted_data)
    return ret