from yattag import Doc, indent

def generate_html_gallery(person_suite):
    doc, tag, text = Doc().tagtext()
    doc.asis('<!DOCTYPE html>')
    with tag('html', lang="en"):
        with tag('head'):
            doc.asis('<meta charset="utf-8">')
            doc.asis('<meta name="viewport" content="width=device-width, initial-scale=1">')
            doc.asis('<link rel="stylesheet" href="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.6/css/bootstrap.min.css">')
            with tag('script', src="https://ajax.googleapis.com/ajax/libs/jquery/1.12.0/jquery.min.js"):
                pass
            with tag('script', src="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.6/js/bootstrap.min.js"):
                pass
        with tag('body'):
            with tag('div', klass="container-fluid"):
                for person in person_suite:
                    print("Adding photos for user {0}".format(person.uid))
                    with tag('div', klass='row'):
                        for photo in person.photos:
                            with tag('div', klass="col-xs-1", style="padding-left: 5px; padding-right: 5px; padding-top: 5px; padding-bottom: 5px;"):
                                with tag('p'):
                                    with tag('a', href=person.profile_url, target="_blank"):
                                        doc.stag('img', src=photo, height="175", width="175")
    return indent(doc.getvalue())
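A minimal usage sketch, assuming generate_html_gallery is in scope and that each person object exposes the uid, profile_url and photos attributes the function reads; the namedtuple and URLs below are purely illustrative.

from collections import namedtuple

# Hypothetical stand-in for the real person objects consumed by generate_html_gallery.
Person = namedtuple('Person', ['uid', 'profile_url', 'photos'])

people = [
    Person(uid='alice',
           profile_url='https://example.com/alice',
           photos=['https://example.com/alice/1.jpg', 'https://example.com/alice/2.jpg']),
]

html_doc = generate_html_gallery(people)
with open('gallery.html', 'w', encoding='utf-8') as fh:
    fh.write(html_doc)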
def __init__(self, id, title, data, export_dir, authors=[], modifiedTime=None, theme=None,
             editable_by_anyone=False, template='document', appliances=None, config={}):
    log.info('Process document %s %s', id, title)
    if theme is None:
        self._theme = Theme(export_dir)
    else:
        self._theme = theme
    self._template = template
    self._config = config
    self._export_dir = export_dir
    self._authors = authors
    self._modifiedTime = modifiedTime
    self._data = data
    self._title = title
    self._appliances = appliances
    self._id = id
    self._html = lxml.html.fromstring(self._data)
    text = html_to_text(self._data)
    text = re.sub('\n\n+', '\n\n', text)
    self._text = text.replace('\n', '<br/>')
    self._editable_by_anyone = editable_by_anyone
import urllib.request
from urllib.parse import urljoin

import lxml.html

def get_file_urls(mainUrl, extension):
    uniFileUrls = []
    if not mainUrl.lower().startswith('http://') and not mainUrl.lower().startswith('https://'):
        mainUrl = 'http://%s' % mainUrl
    print('Downloading from %s...' % mainUrl)
    if extension.startswith('*'):
        extension = extension[1:]
    if not extension.startswith('.'):
        extension = '.' + extension
    req = urllib.request.Request(
        mainUrl,
        data=None,
        headers={
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
        }
    )
    urlContent = urllib.request.urlopen(req).read().decode('utf-8')
    html = lxml.html.fromstring(urlContent)
    urls = html.xpath('//a/@href')
    for url in urls:
        if url.endswith(extension):
            url = urljoin(mainUrl, url)
            if url not in uniFileUrls:
                uniFileUrls.append(url)
    return uniFileUrls
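A hedged usage example; the URL is a placeholder, and the function itself normalises both the missing scheme and the leading '*' or '.' of the extension.

# Collect all PDF links from a page (illustrative URL).
pdf_urls = get_file_urls('example.com/downloads', '*.pdf')
for file_url in pdf_urls:
    print(file_url)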
def get_list(self, search_url):
    data = {}
    # keylist = [0] * 5
    data['table_name'] = 'dailyKeyword'
    html = requests.get(search_url, headers=self.headers, verify=False).content
    selector = etree.HTML(html)
    # extract the hot-news keyword links and titles from the sidebar
    keyurl = selector.xpath('//div[@class="aside"]/ol[@class="hot-news"]/li/a/@href')
    keyword = selector.xpath('//div[@class="aside"]/ol[@class="hot-news"]/li/a/text()')
    res = {}
    res['keyurl'] = keyurl
    res['keyword'] = keyword
    for x in range(0, 10):
        data['keyword'] = keyword[x]
        data['keyurl'] = keyurl[x]
        data['id'] = (x + 1)
        self.save(data)
    return res
def login(self, username, password):
    """
    logs the user in and returns a bool value
    stores the username in self.username.
    """
    get_response = self.uva_session.get(UvaSession.UVA_HOST)
    login_text = lxml.html.fromstring(get_response.text)
    hidden_inputs = login_text.xpath(r'//form//input[@type="hidden"]')
    # print hidden_inputs
    form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs if x.attrib['name'] not in ["cx", "ie"]}
    form["username"] = username
    form["passwd"] = password
    form["remember"] = "yes"
    login_response = self.uva_session.post(UvaSession.UVA_HOST + "index.php?option=com_comprofiler&task=login",
                                           data=form, headers={"referer": UvaSession.UVA_HOST})
    self.logged_in = login_response.url == UvaSession.UVA_HOST
    if self.logged_in:
        self.username = username
    return self.logged_in
import lxml.html
from lxml.html.clean import Cleaner

def get_clean_html(etree, text_only=False):
    _is_etree(etree)
    # enable filters to remove Javascript and CSS from HTML document
    cleaner = Cleaner()
    cleaner.javascript = True
    cleaner.style = True
    cleaner.html = True
    cleaner.page_structure = False
    cleaner.meta = False
    cleaner.safe_attrs_only = False
    cleaner.links = False
    html = cleaner.clean_html(etree)
    if text_only:
        return html.text_content()
    return lxml.html.tostring(html)
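A small sketch of calling get_clean_html on a parsed fragment; it assumes the module-level _is_etree check accepts an lxml HTML element, and the input markup is illustrative.

import lxml.html

raw = ('<html><head><style>p {color: red}</style></head>'
       '<body><p>Hello</p><script>alert(1)</script></body></html>')
tree = lxml.html.fromstring(raw)

print(get_clean_html(tree))                  # markup with <script>/<style> stripped
print(get_clean_html(tree, text_only=True))  # plain text with scripts and styles removed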
def parse_that(url):
    resp = requests.get(url)
    raw = resp.text
    tree = get_etree(raw)
    title = doctitle(tree)
    links = get_links(tree, url)
    keywords = get_url_keywords(url)
    meta_description = meta_name_description(tree)
    html = get_clean_html(tree)
    text_content = get_clean_html(tree, text_only=True)
    return {'rank': 0,
            'title': title,
            'url': url,
            'description': meta_description,
            'keywords': keywords,
            'raw': raw,
            'text': text_content,
            'internal_links': links['internal'],
            'external_links': links['external']}
def open(self, url, timeout=60):
    """Wait for download to complete and return result"""
    loop = QEventLoop()
    timer = QTimer()
    timer.setSingleShot(True)
    timer.timeout.connect(loop.quit)
    self.loadFinished.connect(loop.quit)
    self.load(QUrl(url))
    timer.start(timeout * 1000)
    loop.exec_()  # delay here until download finished
    if timer.isActive():
        # downloaded successfully
        timer.stop()
        return self.html()
    else:
        # timed out
        print('Request timed out:', url)
def register(first_name, last_name, email, password, captcha_fn):
    cj = cookielib.CookieJar()
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
    html = opener.open(REGISTER_URL).read()
    form = parse_form(html)
    form['first_name'] = first_name
    form['last_name'] = last_name
    form['email'] = email
    form['password'] = form['password_two'] = password
    img = extract_image(html)
    captcha = captcha_fn(img)
    form['recaptcha_response_field'] = captcha
    encoded_data = urllib.urlencode(form)
    request = urllib2.Request(REGISTER_URL, encoded_data)
    response = opener.open(request)
    success = '/user/register' not in response.geturl()
    return success
def remove_html_encode_errors(self, headers, error):
    """
    Use this method to remove HTML special characters (e.g. &nbsp;), encoding errors or other unicode text.
    Simply pass the header rows to the method, along with the error (as a unicode string) you want to correct.
    :param headers: list of header rows
    :param error: unicode string you want to delete from the header cells
    :return: nothing
    """
    # Iterate over header rows
    for row in headers:
        # Iterate over header cells
        for header in row:
            # Replace 'error' with u'' in the text of this header cell
            header['th'] = header['th'].replace(error, u'')
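An illustrative call, assuming headers is a list of rows whose cells are dicts with a 'th' key, as the loop above expects; 'parser' is a hypothetical name for whatever object exposes this method, and here the stray u'\xa0' (the character behind &nbsp;) is stripped in place.

headers = [
    [{'th': u'Population\xa0(2016)'}, {'th': u'Area\xa0(km\u00b2)'}],
]
# 'parser' stands in for the real object that defines remove_html_encode_errors.
parser.remove_html_encode_errors(headers, u'\xa0')
# headers is modified in place:
# [[{'th': u'Population(2016)'}, {'th': u'Area(km\u00b2)'}]]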
def url_composer(self, query, service):
    """
    This function is used to compose a url to call some web services, such as sparql endpoints.
    :param query: the string used in some rest calls.
    :param service: type of service you request (e.g. dbpedia sparql endpoint)
    :return url: the url composed
    """
    # use the quote_plus method from urllib to encode special characters (required by the web service)
    query = urllib.quote_plus(query)
    """
    The following if clauses are differentiated by the service requested, e.g. 'dbpedia',
    but in all cases the url is composed from a pre-formatted string along with the query.
    """
    if service == 'dbpedia':
        url = self.dbpedia_sparql_url + query + self.call_format_sparql
    elif service == 'html':
        url = self.html_format + query
    else:
        url = "ERROR"
    return url
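For reference, this is what the quote_plus step does to a SPARQL-style query before it is appended to the endpoint URL; the try/except import makes the sketch run on both Python 2 (as in the snippet above) and Python 3.

try:
    from urllib import quote_plus          # Python 2, as used above
except ImportError:
    from urllib.parse import quote_plus    # Python 3

print(quote_plus('SELECT ?s WHERE { ?s a dbo:City }'))
# SELECT+%3Fs+WHERE+%7B+%3Fs+a+dbo%3ACity+%7D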
def mk_plaintext(self):
    try:
        h = html2text.HTML2Text()
        h.ignore_images = True
        h.inline_links = False
        h.wrap_links = False
        h.unicode_snob = True  # Prevents accents removing
        h.skip_internal_links = True
        h.ignore_anchors = True
        h.body_width = 0
        h.use_automatic_links = True
        h.ignore_tables = True
    except html.parser.HTMLParseError as e:
        raise WrongHTML(e)
    return h.handle(self.mk_html())
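A standalone sketch of the html2text conversion this method relies on; the options mirror a few of the ones set above and the input string is purely illustrative.

import html2text

h = html2text.HTML2Text()
h.ignore_images = True
h.ignore_tables = True
h.body_width = 0  # disable hard line wrapping

html_source = '<h1>Hello</h1><p>Visit <a href="https://example.com">our site</a>.</p>'
print(h.handle(html_source))  # prints a plain-text / Markdown-style rendering of the HTML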
def mk_html(self):
    """Simply calls configured html template filters
    See settings.CAMPAIGNS['HTML_TEMPLATE_FILTERS']
    """
    # Doctype gets frequently removed by content filters, so we save
    # it...
    doc = lxml.etree.HTML(self.html)
    doctype = ''
    if doc is not None:
        doctype = doc.getroottree().docinfo.doctype
    # ... we process content...
    mangled_content = post_template_html_generation.process(
        self.html,
        detach_images=self.detach_images,
        organization=self.author.organization)
    # And we re-inject it
    return '{}\n{}'.format(doctype, mangled_content)
def handle_images(html, detach_images=False, organization=None, **kwargs):
    """ Detach base64 images and others if detach_images is enabled
    """
    tree = lxml.html.fromstring(html)
    for img in tree.cssselect('img'):
        try:
            src = img.attrib['src']
        except KeyError:
            raise WrongHTML('<img> devrait avoir un attribut "src"')
        if src.startswith('data:image/'):
            # TODO: handle ValueError
            image = InlineImage(src, organization=organization)
            url = image.store()
            img.set('src', url)
        else:
            if detach_images and organization:
                image = HostedImage(src, organization=organization)
                url = image.store()
                img.set('src', url)
    return lxml.html.tostring(tree).decode()
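An illustrative call; with detach_images left at False and no data: URIs in the input, neither InlineImage nor HostedImage is touched, so the function simply re-serialises the markup.

snippet = '<p><img src="https://example.com/logo.png" alt="logo"></p>'
print(handle_images(snippet))  # prints the markup back essentially unchanged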
def crawl(self, url, base_url):
    """Crawl an .html page and extract all URLs we think are part of the application.
    Downloads are parallelized using threads.
    """
    resp = requests.get(url)
    # See through redirects
    final_base_url = resp.url
    tree = lxml.html.fromstring(resp.content)
    elems = tree.cssselect("a")
    links = [urljoin(final_base_url, elem.attrib.get("href", "")) for elem in elems]
    links = [link for link in links if is_likely_app_part(link, base_url)]
    # Load all links in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(self.fetch_file, link, base_url): link for link in links}
        for future in concurrent.futures.as_completed(future_to_url):
            future.result()  # Raise exception in main thread if bad stuff happened
def test_create_content_good(self, html_mock):
    url = 'https://talkpython.fm.mock/episodes/all'
    responses.add(responses.GET, url,
                  body=html_mock, status=200,
                  content_type='text/html')
    request = Request(
        [1, 2],
        'Mozilla/5.0 (Windows NT 10.0; WOW64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/53.0.2785.116 Safari/537.36 OPR/40.0.2308.81',
    )
    html = request.receive_html(url)
    content = Content()
    content.set_content(html)
    assert isinstance(content.get_content(), lxml.html.HtmlElement)
def get_sessions():
    """
    Fetch and parse the schedule HTML from the NICAR webpage.
    """
    html = fix_encoding(requests.get(SCHEDULE_URL).content)
    dom = lxml.html.fromstring(html)
    day_els = dom.cssselect("ul.listview.pane")
    days_zipped = zip(day_els, DATES)
    sessions_nested = [parse_day(el, date) for el, date in days_zipped]
    sessions = itertools.chain.from_iterable(sessions_nested)
    return list(sorted(sessions, key=itemgetter(
        "date",
        "time_start",
        "time_end",
        "title"
    )))
Source: report_generator.py, from the project New-vulnerable-report-scraper- (author: shamlikt).
def scrape_kb_crt(self, url):
    '''This method is used for parsing www.kb.cert.org'''
    data = self.get_html_data(url)
    lists = data.find(id="list-of-vuls").find_all("li")  # select the list of vulns from https://www.kb.cert.org/vuls/
    for li in lists:
        temp_data = deepcopy(self.value)  # create a copy of self.value
        temp_data['val_name'] = li.find("span", class_="vul-title truncate").text  # parse the name from the span's class
        date = li.find("span", class_="vul-date").text  # parse the published date from the span's class
        temp_data['date'] = datetime.strptime(date, '%d %b %Y').date()
        page_link = "{}{}".format(url.strip('/vuls/'), li.a['href'])  # build the link address
        temp_data['link'] = page_link
        new_data = self.get_html_data(page_link).find(id="vulnerability-note-content")  # fetch the linked page and select a specific div by id
        temp_data['description'] = new_data.p.text
        temp_data['solution'] = new_data.find_all("table")[2].find("tr").text  # select the solution part of the page via its 'tr' tags
        temp_data['severity'] = "Medium"
        temp_data['affected'] = "Please find description"
        self.data.append(temp_data)  # append the collected info to the class variable self.data
def scrape_fortinet(self, url):
    '''This method is used for parsing http://www.fortiguard.com/psirt'''
    data_fn = self.get_html_data(url)  # souping
    advisory_fn = data_fn.find('div', class_="results")  # identify the required tag set
    section_fn = advisory_fn.find_all('div', class_="title")
    for entry in section_fn:
        temp_data_fn = deepcopy(self.value)
        temp_data_fn['val_name'] = entry.text.strip()
        page_link_fn = "{}{}".format(url.strip('/psirt/'), entry.a['href'])
        temp_data_fn['link'] = page_link_fn
        new_data_fn = self.get_html_data(page_link_fn)
        temp_data_fn['description'] = new_data_fn.find_all('div', class_="detail-item")[1].html.body.p.text.strip()
        new_table_fn = new_data_fn.find('table', class_="table table-responsive table-borderless")
        date = new_table_fn.find_all('tr')[1].find_all('td')[1].text.strip()
        temp_data_fn['date'] = datetime.strptime(date, '%b %d, %Y').date()
        temp_data_fn['severity'] = "Medium"
        temp_data_fn['affected'] = "Please find description"
        temp_data_fn['solution'] = "Information not available in website"
        self.data.append(temp_data_fn)  # append the collected info to the class variable self.data
def scrape_cisco(self, url):
    # Scrape the Ajax page (identified the underlying JSON call)
    ajax_data = get("https://tools.cisco.com/security/center/publicationService.x?criteria=exact&cves=&keyword=&last_published_date=&limit=30&offset=0&publicationTypeIDs=1,3&securityImpactRatings=&sort=-day_sir&title=").text
    json_data = json.loads(ajax_data)  # convert to json (type: list of dicts)
    for dictionary in json_data[:9]:
        temp_data_ci = deepcopy(self.value)
        temp_data_ci['val_name'] = dictionary['title']
        temp_data_ci['severity'] = dictionary['severity']
        temp_data_ci['date'] = self.convert_cisco_date(dictionary['firstPublished'])  # skip all updates and include only new advisories
        page_link_ci = dictionary['url']
        temp_data_ci['link'] = page_link_ci
        # Scrape the advisory page itself with a CSS selector
        css_data = get(page_link_ci)
        css_tree = lxml.html.fromstring(css_data.text)  # build the DOM tree
        sel = CSSSelector('meta')  # construct a CSS selector
        results = sel(css_tree)  # apply the selector to the DOM tree
        match = results[38]  # pick the 39th <meta> element (index 38)
        temp_data_ci['description'] = match.get('content')  # read its content attribute
        new_data_ci = self.get_html_data(page_link_ci)
        temp_data_ci['affected'] = new_data_ci.find('div', class_="ud-innercontent-area", id="vulnerableproducts").text.strip()
        temp_data_ci['solution'] = new_data_ci.find('div', class_="ud-innercontent-area", id="workaroundsfield").text.strip()
        # temp_data_ci['solution'] = new_data_ci.find('div', class_="ud-innercontent-area", id="fixedsoftfield").text.strip()  # alternate
        self.data.append(temp_data_ci)  # append the collected info to the class variable self.data
def childNodesWithText(self, node):
    root = node
    # create the first text node
    # if we have some text in the node
    if root.text:
        t = lxml.html.HtmlElement()
        t.text = root.text
        t.tag = 'text'
        root.text = None
        root.insert(0, t)
    # loop over child nodes
    for c, n in enumerate(list(root)):
        idx = root.index(n)
        # don't process text nodes
        if n.tag == 'text':
            continue
        # create a text node for the tail
        if n.tail:
            t = self.createElement(tag='text', text=n.tail, tail=None)
            root.insert(idx + 1, t)
    return list(root)
def get_related_document_ids(kamervraag_url):
    logger.info('get related antwoord id for url: ' + kamervraag_url)
    page = requests.get(kamervraag_url, timeout=60)
    tree = lxml.html.fromstring(page.content)
    relations_titles = tree.xpath('//div[@id="main-column"]//h2[@class="divisiekop1"]')
    overheidnl_document_ids = []
    for title_element in relations_titles:
        if title_element.text_content() == "Relaties":
            column_elements = title_element.getparent().xpath('//tr/td/p')
            next_is_antwoord_url = False
            for column_element in column_elements:
                if next_is_antwoord_url:
                    overheidnl_document_ids.append(column_element.text_content())
                    next_is_antwoord_url = False
                if column_element.text_content() == 'is beantwoord in':
                    next_is_antwoord_url = True
    return overheidnl_document_ids
def get_kamervraag_document_id_and_content(url):
    logger.info('get kamervraag document id and content for url: ' + url)
    page = requests.get(url, timeout=60)
    tree = lxml.html.fromstring(page.content)
    elements = tree.xpath('//ul/li/a[@id="technischeInfoHyperlink"]')
    if elements:
        document_id = elements[0].get('href').split('/')[-1]
    else:
        elements = tree.xpath('/html/head/meta[@name="dcterms.identifier"]')
        if not elements:
            return None, '', ''
        document_id = elements[0].get('content')
    logger.info('document id: ' + document_id)
    content_html = ''
    if tree.xpath('//div[@id="main-column"]'):
        content_html = lxml.etree.tostring(tree.xpath('//div[@id="main-column"]')[0])
    titles = tree.xpath('//h1[@class="kamervraagomschrijving_kop no-toc"]')
    title = ''
    if titles:
        title = titles[0].text_content()
        title = re.sub(r'\s{2,}', ' ', title).strip()
    return document_id, content_html, title
def Main():
    output_path = "lol.html"
    config_file = "config.ini"
    config = POFSession.Config(config_file)
    testSession = POFSession(config)
    testSession.login(config.username, config.password)
    galleryData = list()
    users = testSession.searchUsers(config, 100, online_only=True)
    print("Search complete.")
    for user in users:
        photos = testSession.getPhotos(user)
        galleryDataEntry = UserGalleyDataEntry(user, photos)
        galleryData.append(galleryDataEntry)
    html_doc = generate_html_gallery(galleryData)
    save_gallery_to_file(output_path, html_doc)
    open_gallery(output_path)
def items(self, task, response):
    items = []
    document = lxml.html.document_fromstring(html=response.text)
    products = document.xpath("//div[@class='product']")
    for product in products:
        iid = int(product.xpath(".//@product-id")[0])
        name = product.xpath(".//h2/text()")[0]
        desc = product.xpath(".//p/text()")[0]
        category = product.xpath(".//span/text()")[0]
        price = float(product.xpath(".//em/text()")[0])
        images = product.xpath(".//div//img/@src")
        item = Product(
            iid=iid,
            url=response.url,
            name=name,
            category=category,
            desc=desc,
            price=price,
            images=images,
        )
        items.append(item)
    return items
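A reduced, standalone sketch of the same XPath extraction on inline markup; the product HTML below is an assumption about the shape the selectors above expect, and neither Product nor the response object is needed here.

import lxml.html

sample = '''
<div class="product" product-id="7">
  <h2>Kettle</h2>
  <p>1.7 litre electric kettle</p>
  <span>kitchen</span>
  <em>24.99</em>
  <div><img src="/img/kettle.jpg"></div>
</div>
'''
doc = lxml.html.document_fromstring(sample)
for product in doc.xpath("//div[@class='product']"):
    print(int(product.xpath(".//@product-id")[0]),
          product.xpath(".//h2/text()")[0],
          float(product.xpath(".//em/text()")[0]),
          product.xpath(".//div//img/@src"))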
def get_my_content(r):
    """
    The response from the vk server is not standard HTML.
    This is why we must cut it up and cannot use the regular
    'get_real_content' helper.
    """
    assert r.status_code == 200
    # str_content=r.content.decode(errors='ignore')
    try:
        content = r.content  # type: bytes
        str_content = content.decode(errors='ignore')
    except Exception as e:
        print(e)
        print('could not decode')
        print(r.content)
        sys.exit(1)
    str_content = str_content[str_content.find('<input'):]
    c = str.encode('<html><body>') + str.encode(str_content) + str.encode('</body></html>')
    root = lxml.html.fromstring(c)
    return root
def get_full_answer(url):
    print(url)
    page = lxml.html.document_fromstring(urllib.request.urlopen(url).read().decode("gbk"))
    best = page.xpath("//pre[contains(@class, 'best-text mb-10')]")
    common = page.xpath("//meta[contains(@name, 'description')]")
    if len(best) >= 1:
        best = best[0].text_content()
    else:
        if len(common) >= 1:
            best = common[0].text_content()
        else:
            best = "no answer found"
    return best
#############################################################
### web server