def create_table(self, table_string):
    """Build a Table object from a CREATE TABLE statement string.

    Each line is classified: the TABLE line supplies the table name,
    PRIMARY KEY lines register key columns, and any other line containing
    a back-quoted identifier becomes a column with a predicted type.
    Synonyms are attached when a thesaurus is configured.
    """
    table = Table()
    for raw_line in table_string.split("\n"):
        if 'TABLE' in raw_line:
            # The first back-quoted identifier on this line is the table name.
            table.name = re.search("`(\w+)`", raw_line).group(1)
            if self.thesaurus_object is not None:
                table.equivalences = self.thesaurus_object.get_synonyms_of_a_word(table.name)
        elif 'PRIMARY KEY' in raw_line:
            # Composite keys list several back-quoted columns on one line.
            for key_column in re.findall("`(\w+)`", raw_line):
                table.add_primary_key(key_column)
        else:
            column_match = re.search("`(\w+)`", raw_line)
            if column_match is not None:
                column_type = self.predict_type(raw_line)
                if self.thesaurus_object is not None:
                    synonyms = self.thesaurus_object.get_synonyms_of_a_word(column_match.group(1))
                else:
                    synonyms = []
                table.add_column(column_match.group(1), column_type, synonyms)
    return table
Example source code of Python's search() method usage
def run(self):
    """Analyzer entry point: look up the FortiGuard web-filter category
    for the observable when it is a domain or URL; otherwise report the
    data type as unsupported.
    """
    Analyzer.run(self)
    if self.data_type != 'domain' and self.data_type != 'url':
        self.notSupported()
        return
    try:
        # Scrape the category out of the FortiGuard web-filter page.
        pattern = re.compile("(?:Category: )([\w\s]+)")
        baseurl = 'http://www.fortiguard.com/webfilter?q='
        req = requests.get(baseurl + self.getData())
        category_match = re.search(pattern, req.content, flags=0)
        self.report({
            'category': category_match.group(1)
        })
    except ValueError as e:
        self.unexpectedError(e)
def loopback_devices():
    '''
    Parse through 'losetup -a' output to determine currently mapped
    loopback devices. Output is expected to look like:
    /dev/loop0: [0807]:961814 (/tmp/my.img)
    :returns: dict: a dict mapping {loopback_dev: backing_file}
    '''
    mapping = {}
    output = check_output(['losetup', '-a'])
    for entry in output.splitlines():
        if entry == '':
            continue
        # Each line is "<dev>: <inode-info> (<backing-file>)".
        device, _, backing = entry.strip().split(' ')
        mapping[device.replace(':', '')] = re.search('\((\S+)\)', backing).groups()[0]
    return mapping
linux-soft-exploit-suggester.py — file source code
Project: linux-soft-exploit-suggester
Author: belane
Project source code
File source code
Reads: 34
Favorites: 0
Likes: 0
Comments: 0
def searchExploit(exploit_list, soft_name, soft_version):
    """ Search affected packages in exploit_list.

    Filters exploits by platform, DoS policy, user filter and type, then
    matches the software name/version against each exploit title.  Matching
    exploits are printed and collected; when duplicate suppression is on,
    a matched exploit is also removed from exploit_list in place.
    Returns a list of [exploit, soft_name, soft_version] entries.
    """
    result = []
    version_search = versionVartions(soft_version, args.level)
    # Bug fix: the loop may remove entries from exploit_list (duplicate
    # suppression below); removing from a list while iterating it skips
    # the element after each removal, so iterate over a snapshot copy.
    for exploit in list(exploit_list):
        if exploit[5] in valid_platforms and (args.dos or exploit[6] != 'dos' or args.type == 'dos'):  # Platform and DoS
            if args.filter == None or args.filter.lower() in exploit[2].lower():  # Filter
                if args.type == None or args.type == exploit[6]:  # Type
                    # Match the package name near the start of the title
                    # (up to `level` leading words) or after a slash.
                    query = "(^(\w*\s){0,%s}|/\s?)%s(\s|\s.*\s|\/).* -" % (args.level, soft_name.replace('+', '\+'))
                    if re.search(query, exploit[2], re.IGNORECASE):
                        affected_versions = extractVersions(exploit[2])
                        for affected_version in affected_versions:
                            if args.level == 5 or LooseVersion(version_search) <= LooseVersion(affected_version):
                                if args.duplicates == False:
                                    exploit_list.remove(exploit)  # Duplicates
                                printOutput(exploit, soft_name, soft_version)
                                result.append([exploit, soft_name, soft_version])
                                break
    return result
linux-soft-exploit-suggester.py — file source code
Project: linux-soft-exploit-suggester
Author: belane
Project source code
File source code
Reads: 38
Favorites: 0
Likes: 0
Comments: 0
def extractVersions(title_string):
    """ Extract all version numbers from a string """
    # Everything after " -", "(" or "&" is trailing noise, not a version.
    cut = re.search(r'\s-|\(|\&', title_string)
    if cut:
        title_string = title_string[:cut.span()[0]]
    versions = []
    for token in title_string.split():
        if not token[0].isdigit():
            continue
        if '/' in token:
            # "1.2/1.3" style: treat each slash-separated part separately.
            for sub_token in token.split('/'):
                if '-' in sub_token:
                    # "1.2.x-1.4" style: rebuild as "<prefix>.<upper-bound>".
                    sub_token = '.'.join(sub_token.split('-')[0].split('.')[:-1]) + '.' + sub_token.split('-')[-1]
                if purgeVersionString(sub_token):
                    versions.append(purgeVersionString(sub_token))
        elif '-' in token:
            versions.append(purgeVersionString('.'.join(token.split('-')[0].split('.')[:-1]) + '.' + token.split('-')[-1]))
        else:
            versions.append(purgeVersionString(token))
    return versions
def test_oozie_workflow(self):
    """Submit a streaming Oozie workflow end-to-end and assert it SUCCEEDED."""
    # given: recreate the HDFS input and workflow-app directories from scratch
    input_data_dir = 'test/availability/input-data'
    apps_streaming_dir = 'test/availability/apps/streaming'
    cmd('hdfs dfs -rm -r %s' % input_data_dir)
    cmd('hdfs dfs -mkdir -p %s' % input_data_dir)
    cmd('hdfs dfs -put %s %s' % (sample_data, input_data_dir))
    cmd('hdfs dfs -rm -r %s' % apps_streaming_dir)
    cmd('hdfs dfs -mkdir -p %s' % apps_streaming_dir)
    cmd('hdfs dfs -put resources/oozie/workflow.xml %s' % apps_streaming_dir)
    # when: submit the job, then poll once a second until it completes
    result = cmd('oozie job -oozie %s -config resources/oozie/job.properties -run' % oozie_host)
    # The run command prints "job: <id>"; strip the prefix to get the id.
    job_id = result.stdout.replace('job: ', '')
    cmd('oozie job -oozie %s -poll %s -interval 1' % (oozie_host, job_id))
    result = cmd('oozie job -oozie %s -info %s' % (oozie_host, job_id))
    # then: parse "Status    : <value>" out of the job-info output
    status = re.search('Status\s+:\s+(.+)', result.stdout).group(1)
    self.assertEqual('SUCCEEDED', status, result.stderr)
def commit_history(cli):
    """
    Parse output of "show configuration history commit reverse detail"
    """
    entries = []
    current = OrderedDict()
    for raw_line in cli.splitlines():
        match = re.search(' ([A-Z][a-z]+(?: ID)?): (.*?) +([A-Z][a-z]+): (.*)', raw_line)
        if match is None:
            continue
        # Each matching line carries two "Key: value" fields.
        first_key, first_value, second_key, second_value = match.groups()
        current[first_key] = first_value
        current[second_key] = second_value
        # A 'Comment' field terminates one commit record.
        if second_key == 'Comment':
            entries.append(current)
            current = OrderedDict()
    return entries
def _getmember(self, name, tarinfo=None, normalize=False):
"""Find an archive member by name from bottom to top.
If tarinfo is given, it is used as the starting point.
"""
# Ensure that all members have been loaded.
members = self.getmembers()
# Limit the member search list up to tarinfo.
if tarinfo is not None:
members = members[:members.index(tarinfo)]
if normalize:
name = os.path.normpath(name)
for member in reversed(members):
if normalize:
member_name = os.path.normpath(member.name)
else:
member_name = member.name
if name == member_name:
return member
def find_external_links(url, page):
    """Find rel="homepage" and rel="download" links in `page`, yielding URLs"""
    for rel_match in REL.finditer(page):
        tag, rel = rel_match.groups()
        rel_values = {value.strip() for value in rel.lower().split(',')}
        if rel_values & {'homepage', 'download'}:
            for href_match in HREF.finditer(tag):
                yield urllib.parse.urljoin(url, htmldecode(href_match.group(1)))
    # PyPI-style table headings also carry external links.
    for heading in ("<th>Home Page", "<th>Download URL"):
        pos = page.find(heading)
        if pos == -1:
            continue
        href_match = HREF.search(page, pos)
        if href_match:
            yield urllib.parse.urljoin(url, htmldecode(href_match.group(1)))
def _getmember(self, name, tarinfo=None, normalize=False):
"""Find an archive member by name from bottom to top.
If tarinfo is given, it is used as the starting point.
"""
# Ensure that all members have been loaded.
members = self.getmembers()
# Limit the member search list up to tarinfo.
if tarinfo is not None:
members = members[:members.index(tarinfo)]
if normalize:
name = os.path.normpath(name)
for member in reversed(members):
if normalize:
member_name = os.path.normpath(member.name)
else:
member_name = member.name
if name == member_name:
return member
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name), tarinfo.linkname)))
limit = None
else:
# Search the archive before the link, because a hard link is
# just a reference to an already archived file.
linkname = tarinfo.linkname
limit = tarinfo
member = self._getmember(linkname, tarinfo=limit, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def b16decode(s, casefold=False):
    """Decode the Base16 encoded bytes-like object or ASCII string s.
    Optional casefold is a flag specifying whether a lowercase alphabet is
    acceptable as input. For security purposes, the default is False.
    The result is returned as a bytes object. A binascii.Error is raised if
    s is incorrectly padded or if there are non-alphabet characters present
    in the input.
    """
    data = _bytes_from_decode_data(s)
    if casefold:
        data = data.upper()
    # Reject anything outside the strict upper-case hex alphabet.
    if re.search(b'[^0-9A-F]', data) is not None:
        raise binascii.Error('Non-base16 digit found')
    return binascii.unhexlify(data)
#
# Ascii85 encoding/decoding
#
def compiler_is_clang(comp):
    """Return True if the compiler command identifies itself as clang.

    Args:
        comp: compiler command as a list of argv parts (e.g. ['cc']).

    Runs `comp --version` and searches the output for "clang", printing
    configure-style progress messages along the way.
    """
    print("check for clang compiler ...", end=' ')
    try:
        cc_output = subprocess.check_output(comp + ['--version'],
                                            stderr=subprocess.STDOUT, shell=False)
    except OSError as ex:
        print("compiler test call failed with error {0:d} msg: {1}".format(ex.errno, ex.strerror))
        print("no")
        return False
    except subprocess.CalledProcessError as ex:
        # Bug fix: check_output raises CalledProcessError (not OSError) when
        # the compiler exists but exits non-zero; previously that escaped
        # this function and crashed the caller. Treat it as "not clang".
        print("compiler test call failed with exit code {0:d}".format(ex.returncode))
        print("no")
        return False
    ret = re.search(b'clang', cc_output) is not None
    if ret:
        print("yes")
    else:
        print("no")
    return ret
def validate_label(label):
    # For now we can only handle [a-z ']
    # Reject labels containing structural characters or any digit.
    if any(ch in label for ch in ("(", "<", "[", "]", "&", "*", "{")):
        return None
    if re.search(r"[0-9]", label) is not None:
        return None
    # Strip punctuation we silently tolerate, then normalize case.
    for punctuation in ("-", "_", ".", ",", "?"):
        label = label.replace(punctuation, "")
    return label.strip().lower()
def validate_label(label):
    # For now we can only handle [a-z ']
    # Labels with markup-like characters cannot be represented.
    for forbidden in ("(", "<", "[", "]", "&", "*", "{"):
        if forbidden in label:
            return None
    # Digits are likewise unsupported.
    if re.search(r"[0-9]", label) is not None:
        return None
    cleaned = label.replace("-", "").replace("_", "").replace(".", "").replace(",", "").replace("?", "")
    return cleaned.strip().lower()
def string_find_id(string, regex):
    """Find a marker's ID using a regular expression.

    Returns the ID as int if found, otherwise returns None.

    Args:
        string: any string to search in.
        regex: the regex pattern to match; group 1 must capture the ID.

    Raises:
        AttributeError: if either `string` or `regex` is missing/empty.
    """
    # Bug fix: the original guard `if not string and regex:` binds as
    # `(not string) and regex`, so a missing/empty regex with a non-empty
    # string slipped through and crashed inside re.search. Reject when
    # either argument is falsy.
    if not (string and regex):
        raise AttributeError("Missing name or regex attribute")
    import re
    index = re.search(regex, string)
    if index:
        found_id = index.group(1)
        return int(found_id) if found_id else None
    return None
def main(cb, args):
    """Scan Carbon Black for powershell.exe processes carrying an encoded
    command argument and print each base64-decoded command with a link to
    the process analysis page.

    NOTE(review): Python 2 code (print statements, str/bytes conflation).

    Args:
        cb: Carbon Black API client exposing process_search_iter().
        args: dict-like config; 'server_url' is used to build result links.
    """
    powershells = cb.process_search_iter('process_name:powershell.exe')
    for s in powershells:
        if s['cmdline']:
            # Match the -enc / -encodedcommand flag (any letter casing)
            # followed by a base64 blob.
            encoded = re.search('\-[eE][nN][cC][oOdDeEcCmMaAnN]*\s([A-Za-z0-9\+/=]+)', s['cmdline'])
            if encoded != None:
                i = encoded.group(1)
                # If the trailing '=' padding was stripped from the command
                # line, re-pad to a multiple of 4 before decoding.
                if not re.search('[a-zA-Z0-9\+/]+={1,2}$', i):
                    trailingBytes = len(i) % 4
                    if trailingBytes == 3:
                        i = i + '='
                    elif trailingBytes == 2:
                        i = i + '=='
                decodedCommand = base64.standard_b64decode(i)
                try:
                    # PowerShell encodes UTF-16LE; strip NULs for display.
                    a = decodedCommand.encode('ascii', 'replace')
                    print "Powershell Decoded Command\n%s/#analyze/%s/1\n%s\n\n" % (
                        args['server_url'], s['id'], a.replace('\0', ""))
                except UnicodeError:
                    # Non-ASCII payload: fall back to printing the encoded form.
                    print "Powershell Decoded Command\n%s/#analyze/%s/1\nNon-ASCII decoding, encoded form printed to assist more research\n%s\n" % (
                        args['server_url'], s['id'], s['cmdline'])
                    pass
def check_triggers(message):
    """Evaluate each policy's (criteria, expression) rule pairs against
    `message`; return (matched, target_policy_name).

    A policy matches only if every one of its rules matches; the first
    matching policy wins.
    """
    global action_triggers
    matched = False
    target_policy = ""
    for policy in action_triggers:
        # Work on a copy: the pop() calls below must not mutate the
        # globally shared rule list.
        rules = list(action_triggers[policy]['rules'])
        while rules:
            criteria = rules.pop(0)
            expression = rules.pop(0)
            if re.search(expression, message[criteria].lower()):
                matched = True
            else:
                # One failed rule vetoes this policy; stop checking it.
                matched = False
                rules = []
        if matched:
            target_policy = action_triggers[policy]['targetpolicy']
            break
    return matched, target_policy
def move_policy(sensor, targetPolicy):
global eptoken
global epserver
bit9 = bit9api.bit9Api(
"https://"+epserver, # Replace with actual Bit9 server URL
token=eptoken,
ssl_verify=False # Don't validate server's SSL certificate. Set to True unless using self-signed cert on IIS
)
# policy to send the naughty host to
targetPolicyName = targetPolicy
destPolicies = bit9.search('v1/policy', ['name:'+targetPolicyName])
if len(destPolicies)==0:
raise ValueError("Cannot find destination policy "+targetPolicyName)
# find the computer id
destComputer = bit9.search('v1/computer', ['cbSensorId:'+str(sensor)])
if len(destComputer)==0:
raise ValueError("Cannot find computer named "+hostname)
for c in destComputer:
print "Moving computer %s from policy %s to policy %s" % (c['name'], c['policyName'], targetPolicyName)
c['policyId'] = destPolicies[0]['id']
bit9.update('v1/computer', c)
def _parse_attribute(self, attribute_str):
    """Parses a attribute string and updates the CANBus
    Args:
        attribute_str: String with attribute
    """
    # DBC 'BA_' line: an optional BU_/BO_/SG_ marker selects whether the
    # attribute applies to a node, a message, a signal, or (when absent)
    # the network itself.
    pattern = 'BA_\s+"(?P<attr_name>\S+)"\s*(?P<node>BU_)?(?P<msg>BO_)?(?P<sig>SG_)?\s*'
    pattern += '(?P<can_id>\d*)?\s*(?P<name>\S*)?\s+(?P<value>\S+)\s*;'
    reg = re.search(pattern, attribute_str)
    # NOTE(review): `reg` is None for malformed input and the group()
    # calls below would raise AttributeError — confirm callers only pass
    # pre-validated BA_ lines.
    can_object = self._can_network
    if reg.group('node'):
        # BU_: attribute attached to a named node.
        can_object = self._can_network.nodes[reg.group('name')]
    elif reg.group('msg'):
        # BO_: attribute attached to a message, looked up by CAN id.
        can_object = self._can_network.get_message(int(reg.group('can_id')))
    elif reg.group('sig'):
        # SG_: attribute attached to a signal within a message.
        can_object = self._can_network.get_signal(int(reg.group('can_id')), reg.group('name'))
    cad = self._can_network.attributes.definitions[reg.group('attr_name')]
    can_object.attributes.add(CANAttribute(cad, value=self._parse_attribute_value(cad, reg.group('value'))))
def ssh_setup_agent(config, envkeys=None):
    """
    Starts the ssh-agent
    """
    envkeys = envkeys or ['SSH_PRIVATE_KEY']
    # ssh-agent prints lines like "SSH_AUTH_SOCK=/tmp/...; export ...";
    # harvest each NAME=VALUE pair into the config environment.
    for line in os.popen('ssh-agent -s').readlines():
        matches = re.search(r"(\S+)\=(\S+)\;", line)
        if matches:
            config.environ[matches.group(1)] = matches.group(2)
    # Register every private key found in the requested env variables.
    for envkey in envkeys:
        key = os.environ.get(envkey)
        if key:
            ssh_add_key(config.environ, key)
        else:
            logging.warning('%s is missing', envkey)
def _getmember(self, name, tarinfo=None, normalize=False):
"""Find an archive member by name from bottom to top.
If tarinfo is given, it is used as the starting point.
"""
# Ensure that all members have been loaded.
members = self.getmembers()
# Limit the member search list up to tarinfo.
if tarinfo is not None:
members = members[:members.index(tarinfo)]
if normalize:
name = os.path.normpath(name)
for member in reversed(members):
if normalize:
member_name = os.path.normpath(member.name)
else:
member_name = member.name
if name == member_name:
return member
def find_external_links(url, page):
    """Find rel="homepage" and rel="download" links in `page`, yielding URLs"""
    # First pass: anchor tags whose rel attribute names homepage/download.
    for anchor_match in REL.finditer(page):
        tag, rel = anchor_match.groups()
        rel_set = set(map(str.strip, rel.lower().split(',')))
        if 'homepage' in rel_set or 'download' in rel_set:
            for link in HREF.finditer(tag):
                yield urllib.parse.urljoin(url, htmldecode(link.group(1)))
    # Second pass: links following PyPI-style table headings.
    for marker in ("<th>Home Page", "<th>Download URL"):
        position = page.find(marker)
        if position != -1:
            link = HREF.search(page, position)
            if link:
                yield urllib.parse.urljoin(url, htmldecode(link.group(1)))
def next_unbalanced_tag(view, search=None, search_args=None, restart_at=None, tags=None):
    """Walk tags via `search`, returning the first unbalanced end tag.

    Args:
        view: opaque view object forwarded to `search`.
        search: callable(view, **search_args) -> (region, tag, is_end_tag).
        search_args: kwargs for `search`; refreshed via restart_at(region).
        restart_at: callable(region) -> dict of new search kwargs.
        tags: stack of currently open tag names (internal recursion state).

    Returns:
        (region, tag) of the unbalanced end tag, or (None, None) when the
        search is exhausted with everything balanced.
    """
    # Bug fix: `search_args={}` and `tags=[]` were mutable default
    # arguments shared across top-level calls, so search position and open
    # tags leaked from one invocation into the next. Create fresh objects
    # per call instead.
    if search_args is None:
        search_args = {}
    if tags is None:
        tags = []
    assert search and restart_at, 'wrong call'
    region, tag, is_end_tag = search(view, **search_args)
    if not region:
        return None, None
    if not is_end_tag:
        # Opening tag: push it and continue scanning after its region.
        tags.append(tag)
        search_args.update(restart_at(region))
        return next_unbalanced_tag(view, search, search_args, restart_at, tags)
    if not tags or (tag not in tags):
        # End tag with no matching opener: this is the unbalanced one.
        return region, tag
    # Balanced end tag: unwind the stack to its opener and keep going.
    while tag != tags.pop():
        continue
    search_args.update(restart_at(region))
    return next_unbalanced_tag(view, search, search_args, restart_at, tags)
def _try_config_test(self, logcfg, epattern, foundTest=None ):
    """Apply the log4py configuration string `logcfg`, capture what the
    configuration step prints to stdout, and assert that the expected
    patterns appear.

    Args:
        logcfg: log4py configuration text fed to strConfig.
        epattern: one regex string, or a list of regex strings, expected
            to match lines of the captured stdout.
        foundTest: expected list of match flags; defaults to one True per
            pattern.

    NOTE(review): Python 2 code (cStringIO); stdout_redirect is a
    project-local context manager.
    """
    import ossie.utils.log4py.config
    # Capture everything the configuration step writes to stdout.
    with stdout_redirect(cStringIO.StringIO()) as new_stdout:
        ossie.utils.log4py.config.strConfig(logcfg,None)
    new_stdout.seek(0)
    found = []
    # Normalize a single pattern into a list of patterns.
    epats=[]
    if type(epattern) == str:
        epats.append(epattern)
    else:
        epats = epattern
    if foundTest == None:
        foundTest = len(epats)*[True]
    # Record a True for every (line, pattern) hit in the captured output.
    for x in new_stdout.readlines():
        for epat in epats:
            m=re.search( epat, x )
            if m :
                found.append( True )
    self.assertEqual(found, foundTest )
def get_human(self, attr):
    """Return a human-friendly display string for attribute `attr`."""
    value = getattr(self, attr).replace("_", " ")
    # Known product abbreviations that expand to a full display name.
    product_mapping = {
        "ie": "Internet Explorer"
    }
    if attr == "product":
        value = product_mapping.get(value, value)
    # Title-case only when lowercase letters are present, so all-caps
    # tokens like "SP3" are left untouched.
    if re.search('[a-z]', value) is not None:
        value = value.title()
    # Service-pack labels are conventionally upper-case ...
    if value.upper() in ("SP0", "SP1", "SP2", "SP3", "SP4", "SP5", "SP6"):
        value = value.upper()
    # ... while architecture labels are lower-case.
    if value.lower() in ("x86", "x64"):
        value = value.lower()
    return value
async def handle(self, player, action, values, **kwargs):  # pragma: no cover
    """Close the view for `player`, resolve the response future with the
    clicked button id, and forward the event to the optional target.

    Bug fix: the body awaits coroutines, but the function was declared with
    a plain `def`, which is a SyntaxError in Python 3 — it must be
    `async def`.
    """
    await self.close(player)
    # Try to parse the numeric button id out of the action string; fall
    # back to the raw action when it doesn't end in "button_<n>".
    button = action
    match = re.search('button_([0-9]+)$', action)
    if match is not None:
        button = match.group(1)
    if not self.response_future.done():
        self.response_future.set_result(button)
    if self.target:
        await self.target(player, action, values, **kwargs)
def loopback_devices():
    '''
    Parse through 'losetup -a' output to determine currently mapped
    loopback devices. Output is expected to look like:
    /dev/loop0: [0807]:961814 (/tmp/my.img)
    :returns: dict: a dict mapping {loopback_dev: backing_file}
    '''
    result = {}
    for line in check_output(['losetup', '-a']).splitlines():
        if line == '':
            continue
        # "<dev>: <inode-info> (<backing-file>)" — three space-separated fields.
        device, _, backing = line.strip().split(' ')
        result[device.replace(':', '')] = re.search('\((\S+)\)', backing).groups()[0]
    return result
def __call__(self):
    """Build the template context describing the host CPU.

    Reads /proc/cpuinfo to determine the CPU vendor and combines it with
    the processor architecture and the configured desktop_enable setting.
    """
    settings = utils.get_settings('os')
    # Bug fix: `vendor` was only assigned when the regex matched, so a
    # cpuinfo with no vendor_id line raised UnboundLocalError when the
    # context dict below was built. Default to an empty string.
    vendor = ''
    with open('/proc/cpuinfo', 'r') as fd:
        cpuinfo = fd.readlines()
    for line in cpuinfo:
        match = re.search(r"^vendor_id\s+:\s+(.+)", line)
        if match:
            vendor = match.group(1)
    # Normalize the well-known vendor strings to short template values.
    if vendor == "GenuineIntel":
        vendor = "intel"
    elif vendor == "AuthenticAMD":
        vendor = "amd"
    ctxt = {'arch': platform.processor(),
            'cpuVendor': vendor,
            'desktop_enable': settings['general']['desktop_enable']}
    return ctxt
def loopback_devices():
    '''
    Parse through 'losetup -a' output to determine currently mapped
    loopback devices. Output is expected to look like:
    /dev/loop0: [0807]:961814 (/tmp/my.img)
    :returns: dict: a dict mapping {loopback_dev: backing_file}
    '''
    devices = {}
    listing = check_output(['losetup', '-a']).splitlines()
    # Keep only non-empty lines; each has "<dev>: <inode-info> (<file>)".
    for row in (l.strip().split(' ') for l in listing if l != ''):
        dev_name, _, file_field = row
        backing_file = re.search('\((\S+)\)', file_field).groups()[0]
        devices[dev_name.replace(':', '')] = backing_file
    return devices