# Common imports assumed by the snippets below (each snippet is an
# independent excerpt; several also rely on lxml.etree, six, or requests):
from bs4 import BeautifulSoup
from bs4.element import Tag


def ensure_soup(value, parser=None):
"""Coerce a value (or list of values) to Tag (or list of Tag).
:param value: String, BeautifulSoup, Tag, or list of the above
:param str parser: Parser to use; defaults to BeautifulSoup default
:return: Tag or list of Tags
"""
if isinstance(value, BeautifulSoup):
return value.find()
if isinstance(value, Tag):
return value
if isinstance(value, list):
return [
ensure_soup(item, parser=parser)
for item in value
]
parsed = BeautifulSoup(value, features=parser)
return parsed.find()
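# Usage sketch (assumes bs4 is installed and ensure_soup above is in scope):
tag = ensure_soup('<a href="/x">link</a>')
assert isinstance(tag, Tag) and tag.name == 'a'
tags = ensure_soup(['<p>one</p>', '<p>two</p>'])
assert [t.text for t in tags] == ['one', 'two']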
def lowercase_attr_names(tag):
"""Lower-case all attribute names of the provided BeautifulSoup tag.
Note: this mutates the tag's attribute names and does not return a new
tag.
    :param tag: BeautifulSoup tag
"""
# Use list comprehension instead of dict comprehension for 2.6 support
tag.attrs = dict([
(key.lower(), value)
for key, value in iteritems(tag.attrs)
])
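# Usage sketch (the 'xml' parser preserves attribute case and requires lxml;
# lowercase_attr_names itself needs iteritems, e.g. from six import iteritems):
soup = BeautifulSoup('<item Data-ID="1"/>', 'xml')
tag = soup.find('item')
lowercase_attr_names(tag)
assert 'data-id' in tag.attrs and 'Data-ID' not in tag.attrs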
def get_zf_wb(self, z_jx=None):
if not isinstance(z_jx, Tag):
raise exception.NotFoudZfweibo()
div_attrs = {'node-type': 'feed_list_forwardContent'}
z_jx = z_jx.findChild(name='div', attrs=div_attrs)
self.z_jx = ZDetail(z_jx, self)
def get(self):
if not self.dev.connected:
logger.error("{0}: Firewall timed out or incorrect device credentials.".format(self.firewall_config['name']))
return {'error' : 'Could not connect to device.'}, 504
else:
logger.info("{0}: Connected successfully.".format(self.firewall_config['name']))
rpc = etree.tostring(self.dev.rpc.get_commit_information(), encoding='unicode')
soup = BS(rpc,'xml')
entries = list()
logger.debug("soup: {0}".format(str(soup)))
for entry in soup.find('commit-information').children:
if type(entry) != Tag:
continue
            entries.append({
                'user': entry.user.text,
                'sequence': entry.find('sequence-number').text,
                'date': entry.find('date-time').text,
                'comment': entry.log.text if entry.log else None,
            })
return {'len' : len(entries), 'commit' : entries}
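# Standalone sketch of the child-filtering pattern above (requires lxml for
# the 'xml' parser; the XML shape is an assumption inferred from the fields
# read, not a captured device reply):
from bs4 import BeautifulSoup
from bs4.element import Tag

xml = ('<commit-information><commit-history>'
       '<user>admin</user><sequence-number>0</sequence-number>'
       '<date-time>2023-01-01 00:00:00 UTC</date-time>'
       '</commit-history></commit-information>')
soup = BeautifulSoup(xml, 'xml')
for entry in soup.find('commit-information').children:
    if not isinstance(entry, Tag):
        continue
    print(entry.user.text, entry.find('sequence-number').text)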
def get(self):
if not self.dev.connected:
logger.error("{0}: Firewall timed out or incorrect device credentials.".format(self.firewall_config['name']))
return {'error' : 'Could not connect to device.'}, 504
else:
logger.info("{0}: Connected successfully.".format(self.firewall_config['name']))
try:
            rpc = etree.tostring(self.dev.rpc.get_security_policies_hit_count(), encoding='unicode')
except Exception as e:
logger.error("Error parsing rpc: {0}".format(str(e)))
return {'error' : 'Error parsing soup.'}, 500
finally:
self.dev.close()
soup = BS(rpc,'xml')
entries = list()
for hitcount in soup.find('policy-hit-count').children:
if type(hitcount) != Tag or hitcount.name != 'policy-hit-count-entry':
continue
aux = {
'count' : int(hitcount.find('policy-hit-count-count').text),
'from' : hitcount.find('policy-hit-count-from-zone').text,
'to' : hitcount.find('policy-hit-count-to-zone').text,
'policy' : hitcount.find('policy-hit-count-policy-name').text
}
entries.append(aux)
return {'len' : len(entries), 'hitcount' : entries}
def __init__(self, element: Tag, curse: CurseAPI):
self.el = element
self.curse = curse
self.name = self.get_content("dt > a")
# Shhh it's OK
self.title = self.name
self.imgUrl = ""
self.likes = "N/A"
self.monthly = "N/A"
self.author = self.get_content("a", 1)
self.url = self.get_tag("dt > a", "href")
self.id = self.url.split("/")[-1]
    try:
        self.id = int(self.id.split("-")[0])
        self.id = str(self.id)
    except ValueError:
        pass
self.type = self.url.split("/")[1]
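# Illustration of the id extraction above: the trailing URL segment may look
# like '12345-some-project', so the numeric prefix becomes the id (the URL
# value here is made up):
url = '/projects/12345-some-project'
pid = url.split('/')[-1]
try:
    pid = str(int(pid.split('-')[0]))
except ValueError:
    pass
assert pid == '12345'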
def __init__(self, element: Tag, baseUrl: str):
self.el = element
# FTB Official Packs redirect to a different domain
dat = urlparse(baseUrl)
self.host = dat.scheme + "://" + dat.netloc
self.name = self.get_content(".project-file-name-container > a")
self.releaseType = self.get_tag(".project-file-release-type > div", "title")
self.uploaded = self.get_content(".standard-datetime")
self.url = self.get_tag(".project-file-name-container > a", "href")+"/download"
self.size = float(self.get_content(".project-file-size")[14:-13].replace(',', ''))
self.version = self.get_content(".version-label")
self.downloads = int(self.get_content(".project-file-downloads")[14:-10].replace(',', ''))
self.filename = ""
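# The slices above assume fixed label text around the numbers (the exact page
# text is an assumption); splitting on whitespace is less brittle:
text = 'Filesize 1,234 Kilobytes'   # hypothetical label
size = float(text.split()[1].replace(',', ''))
assert size == 1234.0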
def _censorTagCandidateWithTemplate(self, candi_tag, template_tag, template_var_cache):
if not type(candi_tag) == element.Tag or not type(template_tag) == element.Tag:
return False
if not candi_tag.name == template_tag.name:
self.logger.debug('tag name inequality: \'%s\' is not equal to \'%s\'',
candi_tag.name, template_tag.name)
return False
for tmpAttrKey, tmpAttrValue in getDictIterItems(template_tag.attrs):
if tmpAttrValue == '%%':
# this means an empty variable,
# indicating that it is expected to be ignored.
continue
if not candi_tag.has_attr(tmpAttrKey):
self.logger.debug(candi_tag)
            self.logger.debug('tag attr not exists: no attr \'%s\' in \'%s\'',
tmpAttrKey, candi_tag.name)
return False
candiAttrValue = candi_tag[tmpAttrKey]
if tmpAttrKey == 'class':
tmpAttrValue = ' '.join(tmpAttrValue)
candiAttrValue = ' '.join(candiAttrValue)
matchObj = self.RegPattern.search(tmpAttrValue)
if matchObj is not None:
varName = matchObj.group(1)
varValue = candiAttrValue
self._procTemplateVariable(varName, varValue, template_var_cache)
elif not tmpAttrValue == candiAttrValue:
self.logger.debug(candi_tag)
self.logger.debug('tag attr inequality: \'%s\' is not equal to \'%s\' in \'%s\'',
tmpAttrValue, candiAttrValue, candi_tag.name)
return False
return True
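# Note on the 'class' special case above: bs4 exposes multi-valued attributes
# such as class as lists, so both sides are joined before comparing:
from bs4 import BeautifulSoup

div = BeautifulSoup('<div class="a b">x</div>', 'html.parser').div
assert div['class'] == ['a', 'b']
assert ' '.join(div['class']) == 'a b'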
def _parseTagRecursive(self, candi_tag, template_tag, template_var_cache):
for idx, tmpChild in enumerate(template_tag.contents):
if tmpChild.name == 'lisp_pass':
            # a <lisp_pass> element in the template means that
            # anything at this child position is expected to be ignored.
continue
if len(candi_tag.contents) <= idx:
return False
candiChild = candi_tag.contents[idx]
typeCandi = type(candiChild)
typeTmp = type(tmpChild)
valid = False
if typeCandi == typeTmp == element.Tag:
if self._censorTagCandidateWithTemplate(candiChild, tmpChild, template_var_cache):
valid = self._parseTagRecursive(candiChild, tmpChild, template_var_cache)
elif typeCandi == typeTmp == element.NavigableString:
valid = self._censorNaviStrCandidateWithTemplate(
candiChild, tmpChild, template_var_cache)
if valid is False and len(template_var_cache) > 0:
self.logger.warning(template_tag)
self.logger.warning(candi_tag)
self.logger.warning('censor not passed. cache will be cleared')
template_var_cache.clear()
return False
return True
def ParseHtmlContent(self, html_content):
def _searching_helper_func(tag):
templateVarsCache = {}
ret = self._censorTagCandidateWithTemplate(tag, templateRootTag, templateVarsCache)
if ret is True:
self._mergeTemplateVariablesWithCache(templateVarsCache)
return ret
hitTemplateElems = self.Config.HitTemplate['Elements']
for elem in hitTemplateElems:
elem = self._stripWhitespaceAndReturnBeforeParsing(elem)
templateSoup = BeautifulSoup(elem, self.bs4Parser)
if self.bs4Parser == 'html5lib':
templateRootTag = templateSoup.body.contents[0]
else:
templateRootTag = templateSoup.contents[0]
if not type(templateRootTag) == element.Tag:
# TODO: what do we do for this ?
pass
htmlContent = self._stripWhitespaceAndReturnBeforeParsing(html_content)
htmlSoup = BeautifulSoup(htmlContent, self.bs4Parser)
tagCandidates = htmlSoup.find_all(_searching_helper_func)
for candiTag in tagCandidates:
templateVarsCache = {}
self._parseTagRecursive(candiTag, templateRootTag, templateVarsCache)
if not len(templateVarsCache) == 0:
self._mergeTemplateVariablesWithCache(templateVarsCache)
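# find_all with a callable, as used above: bs4 calls the function on every
# Tag in the tree and keeps those for which it returns True.
from bs4 import BeautifulSoup

soup = BeautifulSoup('<a id="x">1</a><b>2</b>', 'html.parser')
assert [t.text for t in soup.find_all(lambda t: t.name == 'a')] == ['1']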
def img_tags(self, prefix=None) -> List[Tag]:
tags = self.soup.select('img[{}]'.format(self.src_attr))
if prefix:
return [t for t in tags if t[self.src_attr].startswith(prefix)]
return tags
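# Standalone sketch of the same prefix filter (src_attr is typically 'src';
# the class context above is omitted):
from bs4 import BeautifulSoup

soup = BeautifulSoup(
    '<img src="/static/a.png"><img src="http://cdn/b.png">', 'html.parser')
local = [t for t in soup.select('img[src]')
         if t['src'].startswith('/static/')]
assert [t['src'] for t in local] == ['/static/a.png']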
def is_tag(self):
    ''' Check if this element is a normal tag
'''
return isinstance(self.context, Tag)
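# Illustration: text nodes are NavigableStrings, not Tags, which is exactly
# what a check like is_tag() distinguishes:
from bs4 import BeautifulSoup
from bs4.element import Tag

p = BeautifulSoup('<p>text</p>', 'html.parser').p
assert isinstance(p, Tag)
assert not isinstance(p.string, Tag)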
def explore_children(node, soup, args):
if type(node) in (Tag, BeautifulSoup):
if DEBUG_MODE:
print("NODE: {}".format(node.name))
print("VALUE: {}".format(node.string))
print("ATTRIBUTES: {}".format(node.attrs))
if node.string is not None:
fuzz_node(node,
soup,
do_inject_file = args.inject_file_xxe,
do_inject_expect = args.inject_expect_xxe)
        # list() copies the keys for Python 3 (dict.iterkeys() is Python 2 only)
        for cur_attr in list(node.attrs):
fuzz_attr(node,
cur_attr,
soup,
do_inject_file = args.inject_file_xxe,
do_inject_expect = args.inject_expect_xxe)
for child in node.children:
explore_children(child, soup, args)
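# Minimal standalone version of the traversal above (without the fuzzing
# hooks): visit every element, its string, and each of its attributes.
from bs4 import BeautifulSoup
from bs4.element import Tag

def walk(node):
    if isinstance(node, (Tag, BeautifulSoup)):
        if node.string is not None:
            print(node.name, '->', node.string)
        for attr in list(node.attrs):
            print(node.name, 'attr', attr, '=', node.attrs[attr])
        for child in node.children:
            walk(child)

walk(BeautifulSoup('<a href="/x">hi</a>', 'html.parser'))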
def checkRequirement(self, rule):
# unusable rules
    if not rule or type(rule) != element.Tag \
            or rule['ruletype'] not in allowed_rule_type \
            or rule['per_complete'] in disallowed_per_complete:
        return 10000  # return an impossible number
if rule.requirement and rule.requirement.has_attr('numgroups'):
n = int(rule.requirement['numgroups'])
shortlist = list()
for child_rule in rule.find_all('rule'):
# in case there are multiple subrules
if child_rule.has_attr('per_complete') and child_rule.get('per_complete') not in disallowed_per_complete \
and child_rule['ruletype'] in allowed_rule_type:
shortlist.append(self.checkRequirement(child_rule))
# sort the list and choose the first n (smallest) subrules
return sum(sorted(shortlist)[:n])
else:
return int(rule.requirement['classes_begin']) - int(rule.classes_applied.text)
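# Worked illustration of the numgroups branch above: when n subgroups are
# required, the n smallest missing-course counts are summed (values made up):
shortlist = [3, 0, 2, 5]
n = 2
assert sum(sorted(shortlist)[:n]) == 2  # 0 + 2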
def get(self,args):
logger.debug("class rules(JUNOS).get({0})".format(str(args)))
if not self.dev.connected:
logger.error("{0}: Firewall timed out or incorrect device credentials.".format(self.firewall_config['name']))
return {'error' : 'Could not connect to device.'}, 504
else:
logger.info("{0}: Connected successfully.".format(self.firewall_config['name']))
try:
soup = BS(str(etree.tostring(self.dev.rpc.get_firewall_policies(), encoding='unicode')),'xml')
logger.debug("soup: " + str(soup))
except Exception as e:
logger.error("Error parsing soup: {0}".format(str(e)))
return {'error' : 'Error parsing soup.'}, 500
finally:
logger.debug("Closing device...")
self.dev.close()
entries = list()
for context in soup.find("security-policies").children:
if type(context) != Tag:
continue
elif context.name == "default-policy":
continue
else:
logger.debug("context: {0}".format(str(context)))
src_zone = context.find("context-information").find("source-zone-name").text
dst_zone = context.find("context-information").find("destination-zone-name").text
logger.debug("src_zone: {0}\ndst_zone: {1}\n".format(src_zone,dst_zone))
for rule in context.children:
logger.debug("Rule: {0}".format(str(rule)))
if rule.name == "context-information" or type(rule) != Tag:
continue
                    aux = {
                        "enabled": rule.find('policy-state').text == 'enabled',
                        "id": int(rule.find('policy-identifier').text),
                        "action": rule.find('policy-information').find('policy-action').find('action-type').text,
                        "destination": list(),
                        "from": src_zone,
                        # logging is on when the policy action carries a <log> element
                        "logging": rule.find('policy-information').find('policy-action').find('log') is not None,
                        "name": rule.find('policy-information').find('policy-name').text,
                        "application": list(),
                        "source": list(),
                        "to": dst_zone
                    }
for addr in rule.find('source-addresses').children:
if type(addr) != Tag:
continue
aux['source'].append(addr.find('address-name').text)
for addr in rule.find('destination-addresses').children:
if type(addr) != Tag:
continue
aux['destination'].append(addr.find('address-name').text)
for addr in rule.find('applications').children:
if type(addr) != Tag:
continue
aux['application'].append(addr.find('application-name').text)
entries.append(aux)
#entries = self.filter(args,entries)
return {'len' : len(entries), 'rules' : entries}
def __init__(self, element: Tag, detailed=False):
self.el = element
self.detailed = detailed
if detailed:
self.title = self.get_content(".project-title > a > span")
self.likes = 0
self.imgUrl = self.get_tag(".e-avatar64", "href")
self.el = self.el.select(".project-details")[0]
self.id = int(self.get_content(".info-data"))
self.updated = self.get_content(".standard-date", 1)
self.created = self.get_content(".standard-date")
self.total = int(self.get_content(".info-data", 3).replace(',', ''))
self.latestVersion = ""
return
self.title = self.get_content("h4 > a")
self.id = self.get_tag("h4 > a", "href").split("/")[-1]
try:
self.id = int(self.id.split("-")[0])
self.id = str(self.id)
    except ValueError:
        pass
try:
self.likes = int(self.get_content(".grats")[:-6].replace(',', ''))
except ValueError:
self.likes = 0
self.updated = self.get_content(".updated")[8:]
self.created = self.get_content(".updated", 1)[8:]
self.monthly = int(self.get_content(".average-downloads")[:-8].replace(',', ''))
self.total = int(self.get_content(".download-total")[:-6].replace(',', ''))
self.latestVersion = self.get_content(".version")[10:]
self.imgUrl = self.get_tag(".content-image > img", "src")
def check_html(runner, html, key=None, app=None, check_html=True, check_classes=True):
caller = stack()[1]
filepos = '{}:{:d}'.format(caller.filename.rpartition('/')[2], caller.lineno)
app = app or filepos.partition('_')[2].partition('.')[0]
if key:
filepos += '-{}'.format(key)
store = []
soup = BeautifulSoup(html, 'html.parser')
for desc in soup.descendants:
if isinstance(desc, Tag):
name = desc.name
attrs = desc.attrs
store.append(name)
for attr in sorted(attrs):
tag = str(attrs.get('name'))
if name == 'input' and tag == 'csrfmiddlewaretoken' and attr == 'value':
continue
store.append(attr)
val = attrs[attr]
if check_classes and attr == 'class':
for cls in val:
if cls:
runner.assertIn(cls, CLASS_ARRAY[app], msg=filepos)
if isinstance(val, list):
store.extend(sorted(val))
elif (isinstance(val, str)
and not (val.startswith(STATIC_URL) or ('date' in tag and attr == 'value'))):
if '?' in val:
part = val.rpartition('?')
store.append(part[0])
for arg in sorted(part[2].split('&')):
store.append(arg)
else:
store.append(val)
elif isinstance(desc, NavigableString):
store.append(str(desc))
string = ' '.join(' '.join(store).split())
hsh = md5(string.encode()).hexdigest()[:HASH_LEN]
if check_html:
if WRITE_CHECKFILE:
print(filepos, hsh, file=CHECKFILE)
elif CHECK_HTML:
runner.assertIn(filepos, CHECK_ARRAY, msg=filepos)
runner.assertEqual(CHECK_ARRAY[filepos][:HASH_LEN], hsh, msg=filepos)
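# The core idea above, reduced to a standalone sketch: flatten tag names,
# attributes, and text into one whitespace-normalized string and hash it
# (html_fingerprint is a hypothetical helper, not from the test suite):
from hashlib import md5
from bs4 import BeautifulSoup
from bs4.element import NavigableString, Tag

def html_fingerprint(html, length=12):
    store = []
    for desc in BeautifulSoup(html, 'html.parser').descendants:
        if isinstance(desc, Tag):
            store.append(desc.name)
            store.extend(sorted(desc.attrs))
        elif isinstance(desc, NavigableString):
            store.append(str(desc))
    normalized = ' '.join(' '.join(store).split())
    return md5(normalized.encode()).hexdigest()[:length]

print(html_fingerprint('<p class="a">hi</p>'))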
def _fetch_courses(self):
body = "SERVICE=SCRIPTER&REPORT=WEB31&SCRIPT=SD2GETAUD%%26ContentType%%3Dxml&ACTION=REVAUDIT&ContentType=xml&STUID=%s&DEBUG=OFF" % (self.studentID)
r = requests.post(self.url, cookies=self.cookies, data=body)
soup = BeautifulSoup(r.text, 'lxml')
block = soup.find('block')
self.units_applied = float(block['credits_applied'])
for goal in soup.find('deginfo').findAll('goal'):
if goal['code'].lower() == 'major':
self.major.append(goal['valuelit'])
elif goal['code'].lower() == 'minor':
self.minor.append(goal['valuelit'])
elif goal['code'].lower() == 'spec':
self.spec.append(goal['valuelit'])
classes = soup.find("clsinfo")
for cls in classes.findAll("class"):
disc, num = '', ''
if len(cls.get('disc', '')) > 0:
disc = cls['disc']
elif len(cls.get('discipline', '')) > 0:
disc = cls['discipline']
if len(cls.get('num', '')) > 0:
num = cls['num']
elif len(cls.get('number', '')) > 0:
num = cls['number']
if len(disc) > 0 and len(num) > 0:
self.classes.add(disc + ' ' + num)
# check for each requirement
for rule in soup.find_all('rule', attrs={'indentlevel':'1'}):
if rule and type(rule) == element.Tag \
and rule['ruletype'] in allowed_rule_type and rule['per_complete'] not in disallowed_per_complete:
ge = re.match(ge_filter, rule.get('label', ''))
if not ge:
continue
self.ge_table['GE'+ge.group(1)] = self.checkRequirement(rule)
            # for development purposes, print how many classes are missing for each requirement
            print('@@@', 'GE'+ge.group(1), 'missing', self.ge_table['GE'+ge.group(1)], 'courses')
# return total missing courses for this rule