def main():
    for url in url_list:
        try:
            r = requests.get(url)
        except requests.RequestException:
            continue
tree = html.fromstring(r.text)
script = tree.xpath('//script[@language="javascript"]/text()')[0]
json_string = regex.findall(script)[0]
json_data = json.loads(json_string)
next_page_url = tree.xpath('//footer/a/@href')
links = [domain + x['nodeRef'] for x in json_data]
for link in links:
extract(link)
def slack(text: hug.types.text):
"""Returns JSON containing an attachment with an image url for the Slack integration"""
title = text
if text == 'top250':
top250_res = requests.get(IMDB_URL + '/chart/toptv', headers={'Accept-Language': 'en'})
top250_page = html.fromstring(top250_res.text)
candidates = top250_page.xpath('//*[@data-caller-name="chart-top250tv"]//tr/td[2]/a')
title = random.choice(candidates).text
return dict(
response_type='in_channel',
attachments=[
dict(image_url=GRAPH_URL + f'/graph?title={quote(title)}&uuid={uuid.uuid4()}')
]
)
def sns_notification(body):
json_body = body.decode('utf8')
js = json.loads(json_body.replace('\n', ''))
if js["Type"] == "Notification":
arg_info = js["Message"]
arg_info = json.loads(arg_info)
content = arg_info['content']
subject = arg_info['mail']['commonHeaders']['subject']
html_content = content.partition('Content-Type: text/html; charset=UTF-8')[2]
if 'Content-Transfer-Encoding' in html_content:
html_content = html_content.partition('Content-Transfer-Encoding: quoted-printable')[2]
text = html_content.replace('\r\n', '')
table = html.fromstring(text)
content = ''
for item in table:
if item.text:
content += item.text.strip()
mail_content = str(content)
from_mail = arg_info['mail']['source']
to_mail = arg_info['mail']['destination'][0]
hash_code = arg_info['mail']['destination'][0].split('@')[0]
return subject, from_mail, to_mail, hash_code, mail_content
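# Hedged usage sketch (not part of the original project): builds a fake SNS body with
# only the keys sns_notification() reads, using illustrative addresses/subject, and
# assumes the json and lxml.html imports used by the function above are in scope.
def example_sns_notification():
    payload = {
        "Type": "Notification",
        "Message": json.dumps({
            "content": "Content-Type: text/html; charset=UTF-8\r\n"
                       "Content-Transfer-Encoding: quoted-printable\r\n"
                       "<div><p>Hello from the example.</p></div>",
            "mail": {
                "source": "alice@example.com",
                "destination": ["abc123@example.org"],
                "commonHeaders": {"subject": "Example subject"},
            },
        }),
    }
    # Expected: ('Example subject', 'alice@example.com', 'abc123@example.org',
    #            'abc123', 'Hello from the example.')
    return sns_notification(json.dumps(payload).encode('utf8'))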
def scrape_mtgs_images(url='http://www.mtgsalvation.com/spoilers/183-hour-of-devastation', mtgscardurl='http://www.mtgsalvation.com/cards/hour-of-devastation/', exemptlist=[]):
page = requests.get(url)
tree = html.fromstring(page.content)
cards = {}
cardstree = tree.xpath('//*[contains(@class, "log-card")]')
for child in cardstree:
if child.text in exemptlist:
continue
childurl = mtgscardurl + child.attrib['data-card-id'] + '-' + child.text.replace(
' ', '-').replace("'", "").replace(',', '').replace('-//', '')
cardpage = requests.get(childurl)
tree = html.fromstring(cardpage.content)
cardtree = tree.xpath('//img[contains(@class, "card-spoiler-image")]')
        try:
            cardurl = cardtree[0].attrib['src']
        except (IndexError, KeyError):
            # no spoiler image found for this card
            cardurl = ''
cards[child.text] = {
"url": cardurl
}
time.sleep(.2)
return cards
def scrape_masterpieces(url='http://www.mtgsalvation.com/spoilers/181-amonkhet-invocations', mtgscardurl='http://www.mtgsalvation.com/cards/amonkhet-invocations/'):
page = requests.get(url)
tree = html.fromstring(page.content)
cards = []
cardstree = tree.xpath('//*[contains(@class, "log-card")]')
for child in cardstree:
childurl = mtgscardurl + \
child.attrib['data-card-id'] + '-' + child.text.replace(' ', '-')
cardpage = requests.get(childurl)
tree = html.fromstring(cardpage.content)
cardtree = tree.xpath('//img[contains(@class, "card-spoiler-image")]')
        try:
            cardurl = cardtree[0].attrib['src']
        except (IndexError, KeyError):
            # no spoiler image found for this card
            cardurl = ''
card = {
"name": child.text,
"url": cardurl
}
cards.append(card)
return cards
def parse_lista_diputados(response):
tree = fromstring(response.content)
    # list of deputies
diputados = tree.xpath('//div[@class="listado_1"]/ul/li/a/@href')
for diputado in diputados:
diputado_url = urljoin(response.url, diputado)
response = requests.get(diputado_url)
parse_diputado(response)
    # next page
pagina_siguiente = tree.xpath('//a[contains(., "Página Siguiente")]/@href')
if pagina_siguiente:
pagina_siguiente_url = pagina_siguiente[0]
response = requests.get(pagina_siguiente_url)
parse_lista_diputados(response)
def set_vokrugsveta_wallpaper():
try:
r = requests.get(URL04)
if r.status_code == 200:
doc = fromstring(r.text)
results = doc.cssselect('a.article__pic')
url = 'http://www.vokrugsveta.ru/' + results[0].get('href')
print(url)
r = requests.get(url, stream=True)
if r.status_code == 200:
doc = fromstring(r.text)
results = doc.cssselect('img')
for index, result in enumerate(results):
print(index, result.get('src'))
i_url = 'http://www.vokrugsveta.ru/' + results[2].get('src')
if download(i_url) is True:
set_background(comun.POTD)
print(url)
except Exception as e:
print(e)
def get_searx_version(response_container):
response_html = response_container.content.decode()
try:
dom = html.fromstring(response_html)
except etree.XMLSyntaxError:
# not a valid HTML document
# TODO workaround with regex ?
return ''
searx_full_version = extract_text_from_dom(dom, "/html/head/meta[@name='generator']/@content")
if searx_full_version is None:
searx_version = ''
else:
s = searx_full_version.split('/')
if len(s) == 2:
searx_version = s[1]
else:
searx_version = searx_full_version
return searx_version
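# Hedged sketch (not from the original code base): searx pages advertise their version
# in <meta name="generator" content="searx/X.Y.Z">, and get_searx_version() returns the
# part after the slash. The version string and fake response below are illustrative,
# and the expected value assumes extract_text_from_dom() returns the matched attribute.
def example_get_searx_version():
    class FakeResponse:
        content = b'<html><head><meta name="generator" content="searx/0.14.0"></head></html>'
    return get_searx_version(FakeResponse())  # expected: '0.14.0'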
def extract_news(news_url):
# Fetch html
session_requests = requests.session()
response = session_requests.get(news_url, headers=getHeaders())
news = {}
try:
# Parse html
tree = html.fromstring(response.content)
# Extract information
news = tree.xpath(GET_CNN_NEWS_XPATH)
news = ''.join(news)
except Exception as e:
        print(e)
return {}
return news
def get(self, user_id):
r = requests.get('http://grouple.co/user/%s/bookmarks' % user_id)
tree = html.fromstring(r.text)
tds = tree.xpath('//table')[0].xpath('//tr')[1:]
mangas = list()
for o in tds:
item = o.xpath('.//a')[0]
manga = {
'name': item.xpath('./text()')[0],
'path': item.xpath('./@href')[0],
'summary': item.xpath('./@title')[0].split(': ', 1)[-1]
}
item = item.xpath('../a')[1]
manga.update({
'preview': item.xpath('./@rel')[0],
'id': java_hash_code(manga['path']),
'provider': provider_name(manga['path'])
})
if manga['provider'] is not None:
mangas.append(manga)
return {'all': mangas}
def procura_emprego():
busca = raw_input("[+] - Digite o nome da vaga ou uma palavra-chave: ").replace(' ','+').lower()
url = "http://empregacampinas.com.br/page/1/?s="+busca
#prox_pagina = 0
while True:
try:
r = requests.get(url, timeout=2)
tree = html.fromstring(r.content)
vagas = tree.xpath('//*[@id="article"]/div/div/div/div/a/h2/text()')
link = tree.xpath('//*[@id="article"]/div/div/div/div/a[@title]/@href')
if len(vagas) > 1:
qtd_vagas = len(vagas) - 1
else:
qtd_vagas = len(vagas)
pagina = url.split('/')[4]
info_vaga(qtd_vagas,pagina,vagas,link)
            # fetch the URL of the next results page
            url = tree.xpath('//*[@class="nextpostslink"]/@href')[0]
        except (requests.RequestException, IndexError):
            # request failed or no next page: fall back to the menu
            menu()
def scrap_twitlonger(twitlonger):
'''
Takes a twitlonger post ID, scraps the body of the post
and then returns a string depending on the contents of
the post. If the hour is stated in said post, it's added
If it's not, then it's implied it's current time.
Note to self: Implement GMT - whatever our president
decides to change it to.
'''
page = requests.get('http://www.twitlonger.com/show/%s' %twitlonger)
tree = html.fromstring(page.content)
texto = tree.xpath('/html/body/div[2]/div[1]/div[3]/div/p[1]/text()')
hora = re.search('[0-9]+:[0-9]+',texto[0])
circuitos = texto[0].split(str('detallados a continuación: ').decode('utf-8'))[1].split(str(' #ElNiñoNoEsJuego').decode('utf-8'))[0]
if hora:
return "La luz se ira a las " + hora.group(0) + " en " + circuitos
else:
hora = re.search('En momentos',texto[0])
if hora:
return "La luz se ira a las " + str(datetime.datetime.now().time()) + " en " + circuitos
def lxml_test():
url = "http://www.caixunzz.com"
req = urllib2.Request(url=url)
resp = urllib2.urlopen(req)
#print resp.read()
'''
parse_body=html.fromstring(resp.read())
href=parse_body.xpath('//a[@class="label"]/@href')
print href
#not working from above
'''
tree = etree.HTML(resp.read())
href = tree.xpath('//a[@class="label"]/@href')
#print href.tag
for i in href:
#print html.tostring(i)
#print type(i)
print i
print type(href)
#not working yet
def scrape_url(url):
#url = 'api-ref-compute-v2.1.html'
page = requests.get('http://developer.openstack.org/' + url)
tree = html.fromstring(page.content)
#Create a list of HTTP verbs
verbs = tree.xpath('//a[@class="operation-anchor"]/following::span[1]/text()')
operations = tree.xpath('//a[@class="operation-anchor"]/following::div[1]/text()')
#Match up Verbs and Operations and output a printed list
methods = zip(verbs, operations)
print len(verbs)
print len(operations)
if len(verbs) == len(operations):
        for verb, operation in methods:
            print verb + ' ' + operation
else:
print "Number of verbs doesn't match number of operations for ", page.url
def __init__(self):
self.name = SOURCE_NAME
_file, r = conf.getFeedData(SOURCE_NAME, SOURCE_FILE, unpack=False)
zipobj = zipfile.ZipFile(BytesIO(_file))
self.cves = defaultdict(dict)
for filename in zipobj.namelist():
with zipobj.open(filename) as infile:
page = fromstring(infile.read().decode("utf-8"))
vendor = page.xpath("//table[1]//tr[1]//td[2]")
if vendor: vendor = vendor[0].text.lower()
rows = page.xpath("//table[2]//tr//td")
# CVE - Source ID
IDs = [[rows[i].text, [x.text for x in rows[i+1].iterchildren()]] for i in range(0, len(rows), 2)]
for e in IDs:
vendorID = e[0] if not e[0].startswith(vendor.upper()+':') else e[0][len(vendor)+1:]
for cve in e[1]:
if vendor not in self.cves[cve]: self.cves[cve][vendor] = []
if vendorID not in self.cves[cve][vendor]: self.cves[cve][vendor].append(vendorID)
def get_html_text(url):
response = requests.get(url)
origin_text = response.text
origin_text = re.sub(r'<script.*?>.*?</script>', '', origin_text, flags=re.I | re.M | re.DOTALL)
origin_text = re.sub(r'<style.*?>.*?</style>', '', origin_text, flags=re.I | re.M | re.DOTALL)
doc = html.fromstring(origin_text)
text = doc.xpath('//body//text()')
text = [i.strip() for i in text if i.strip()]
text = ' '.join(text)
seg = jieba.cut(text)
    stopwords = read_stopwords('./utils/stopwords.txt')  # load the stopword list from file
seg = [i.strip() for i in seg if i.strip() and not i.strip().isdigit()
and i.strip() not in stopwords]
seg = ' '.join(seg)
return seg
def _get_quotes(self):
'''Gets book's quote data'''
if self._page_source is None:
return
quotes_page = self._page_source.xpath('//a[@class="actionLink" and contains(., "More quotes")]')
quotes = []
if len(quotes_page) > 0:
resp = open_url(self._connection, quotes_page[0].get('href'))
if not resp:
return
quotes_page = html.fromstring(resp)
if quotes_page is None:
return
for quote in quotes_page.xpath('//div[@class="quoteText"]'):
quotes.append(re.sub(r'\s+', ' ', quote.text).strip().decode('ascii', 'ignore'))
else:
for quote in self._page_source.xpath('//div[@class=" clearFloats bigBox" and contains(., "Quotes from")]//div[@class="bigBoxContent containerWithHeaderContent"]//span[@class="readable"]'):
quotes.append(re.sub(r'\s+', ' ', quote.text).strip().decode('ascii', 'ignore'))
return quotes
def _get_book_info_from_tooltips(self, book_info):
'''Gets books ASIN, title, authors, image url, description, and rating information'''
if isinstance(book_info, tuple):
book_info = [book_info]
books_data = []
link_pattern = 'resources[Book.{0}][type]=Book&resources[Book.{0}][id]={0}'
tooltips_page_url = '/tooltips?' + "&".join([link_pattern.format(book_id) for book_id, image_url in book_info])
tooltips_page_info = json.loads(open_url(self._connection, tooltips_page_url))['tooltips']
for book_id, image_url in book_info:
book_data = tooltips_page_info['Book.{0}'.format(book_id)]
if not book_data:
continue
book_data = html.fromstring(book_data)
parsed_data = self._parse_tooltip_info(book_data, book_id, image_url)
if not parsed_data:
continue
books_data.append(parsed_data)
return books_data
def __call__(self, doc, encoding='UTF-8'):
    if isinstance(doc, (str, bytes)):
        doc = fromstring(bytes(bytearray(doc, encoding=encoding)),
                         parser=HTMLParser(encoding=encoding))
    if not isinstance(doc, HtmlElement):
        return None
for cls in self.EXTRACTORS:
extract = cls()
tags_ = extract(doc)
if tags_:
tags = []
for idx, tag in enumerate(tags_):
if idx < 2 and len(tag) > 16:
break
elif len(tag) < 16:
tags.append(tag)
else:
if tags:
logger.info('TagExtractor got tags %s',
tags)
return tags
def walkListItems(sess, url):
try:
global visited
def replacewhite(text):
return re.sub(r'(\ |\r|\n|\t)+', ' ', text)
resp = sess.get(url=url)
root = html.fromstring(resp.text)
tds = root.xpath(".//*[@class='kboard-list']//tr/td[2]")
for td in tds:
href = td.xpath(".//a")[0].attrib['href']
href = urljoin(url, href)
href = re.sub(r'pageid=\d+', '', href)
if href in visited:
continue
            text = replacewhite(td.text_content())
if '???' not in text:
continue
print(text)
visited[href] = (text)
walkPageItem(sess, href, text)
except BaseException as ex:
traceback.print_exc()
print(ex)
def walkNextPages(sess, url="https://iptime.com/iptime/?page_id=126&dffid=1&dfsid=11"):
try:
from os.path import basename
def get_pageid(url):
from urllib.parse import parse_qsl, urlsplit
qs = dict(parse_qsl(urlsplit(url).query))
return int(qs.get("pageid", "1"))
while True:
pageid = get_pageid(url)
print("pageid=%d" % pageid)
walkListItems(sess, url)
root = html.fromstring(sess.get(url=url).text)
arrows = [basename(_) for _ in root.xpath(".//ul[@class='pages']//img/@src")]
if 'next_1.gif' not in arrows:
break
nexturl = next(_ for _ in root.xpath(".//ul[@class='pages']//img") if
basename(_.attrib['src']) == 'next_1.gif')
url = urljoin(url, nexturl.xpath("../../a/@href")[0])
nextpageid = get_pageid(url)
assert nextpageid == pageid+1
except BaseException as ex:
traceback.print_exc()
print(ex)
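# Minimal driver sketch (not in the original script) for the two crawl helpers above;
# it assumes the module-level `visited` dict and walkPageItem() they reference are
# defined elsewhere in the original module.
def example_walk_iptime_board():
    sess = requests.Session()
    # start from the default board URL and follow the next_1.gif arrows page by page
    walkNextPages(sess)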
def main():
global executor
try:
session = requests.Session()
executor = ThreadPoolExecutor()
os.makedirs(dlDir, exist_ok=True)
url = 'http://www.zyxel.com/us/en/support/download_landing.shtml'
with open('zyxel_us_filelist.csv', 'w') as fout:
cw = csv.writer(fout)
cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
resp = session.get(url=url)
root = html.fromstring(resp.text)
models = get_all_models(root)
for modelName in sorted(models.keys()):
kbid = models[modelName]
resp2 = session.get(url='http://www.zyxel.com/us/en/support/DownloadLandingSR.shtml',
params=dict(c="us", l="en", kbid=kbid, md=modelName))
walkFiles(modelName, session, resp2)
except BaseException as ex:
traceback.print_exc()
finally:
        print('Wait for executor shutdown')
executor.shutdown(True)
def main():
global executor
try:
session = requests.Session()
executor = ThreadPoolExecutor()
os.makedirs(dlDir, exist_ok=True)
url='http://downloadcenter.netgear.com'
with open('netgear_filelist.csv', 'w') as fout:
cw = csv.writer(fout)
cw.writerow(['model', 'fw_ver', 'fileName', 'fw_url', 'fw_date', 'fileSize', 'sha1', 'md5'])
response = session.get(url=url)
root = html.fromstring(response.text)
href = root.xpath(".//a[@id='ctl00_ctl00_ctl00_mainContent_localizedContent_bodyCenter_BasicSearchPanel_btnAdvancedSearch']/@href")
href = strip_js(href[0])
formdata = {"__EVENTTARGET": href}
resp2 = form_submit(session, root, url,
"aspnetForm",
formdata,
{"Referer": url})
walkCategories(session, resp2)
except BaseException as ex:
traceback.print_exc()
finally:
executor.shutdown(True)
def walkCategories(session, response):
try:
root = html.fromstring(response.text)
url = response.url
categories = root.xpath(".//select[@name='ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProductCategory']/option")
global startCat
for iCat, category in enumerate(categories[startCat:], startCat):
startCat=0
rsrc = category.xpath("./@value")[0]
text = category.xpath(".//text()")[0]
print('Category="%s", iCat=%d'%(text, iCat))
            formdata = {"__EVENTTARGET": "ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProductCategory",
                        "ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProductCategory": rsrc,
                        "__ASYNCPOST": "true"}  # ASP.NET async-postback flag
resp2 = form_submit(session, root, url,
"aspnetForm",
formdata,
{"Referer": url})
if not resp2:
continue
walkFamilies(session, resp2)
except BaseException as ex:
print('iCat=%d, cat="%s"'%(iCat, text))
traceback.print_exc()
def walkProducts(session, response):
try:
root = html.fromstring(response.text)
products = root.xpath("//select[@name='ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProduct']/option")
url = response.url
global startProd
for iProd, product in enumerate(products[startProd:], startProd):
startProd=0
rsrc = product.xpath("./@value")[0]
text = product.xpath(".//text()")[0]
print('Product="%s", iProd=%d'%(text, iProd))
            formdata = {"__EVENTTARGET": "ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProduct",
                        "ctl00$ctl00$ctl00$mainContent$localizedContent$bodyCenter$adsPanel$lbProduct": rsrc,
                        "__ASYNCPOST": "true"}  # ASP.NET async-postback flag
resp2 = form_submit(session, root, url,
"aspnetForm",
formdata,
{"Referer": url})
if not resp2:
print('Ignored iProd=%d, product="%s"'%(iProd, text))
continue
walkFirmwares(resp2, product)
except BaseException as ex:
print('Error iProd=%d, product="%s"'%(iProd, text))
traceback.print_exc()
def walkFirmwares(response, product):
try:
root = html.fromstring(response.text)
firmwares = root.xpath("//div[@id='LargeFirmware']//a")
for iFirm, firmware in enumerate(firmwares):
text = firmware.xpath(".//text()")
if "firmware" in " ".join(text).lower():
# print('Firmware="%s", iFirmware=%d'%(text, iFirm))
desc = text[0]
href = firmware.xpath("./@data-durl")
if not href:
href = firmware.xpath("./@href")
url = href[0]
model = product.xpath(".//text()")[0]
print('model="%s", desc="%s", url=%s'%(model, desc, url))
global executor, visited
if url in visited:
continue
visited[url] = (model,desc)
executor.submit(download_file, model, desc, url)
except BaseException as ex:
traceback.print_exc()
def main():
global executor
try:
session = requests.Session()
executor = ThreadPoolExecutor()
os.makedirs(dlDir, exist_ok=True)
url = 'http://support.netgear.cn/'
with open('netgear_cn_filelist.csv', 'w') as fout:
cw = csv.writer(fout)
cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
resp = session.get(url=url)
root = html.fromstring(resp.text)
startProd = 1
prods = root.xpath(".//select[@name='select']/option")
for iProd, prod in enumerate(prods[startProd:], startProd):
# prodText = prod.xpath("./text()")[0].strip()
prodUrl = prod.xpath("./@value")[0].strip()
walkProd(session, urljoin(resp.url, prodUrl))
except BaseException as ex:
traceback.print_exc()
finally:
        print('Wait for executor shutdown')
executor.shutdown(True)
def get_list():
os.system('clear')
print "Liste aliniyor..."
worst_response = requests.get(worst_page)
worst_tree = LH.fromstring(worst_response.content)
for atag in worst_tree.xpath(worst_list):
details_response = requests.get(worst_page + atag.attrib['href'])
details_tree = LH.fromstring(details_response.content)
for vuln in details_tree.xpath(heartbleed):
if vuln.text_content().startswith('Yes'):
print WARNING + worst_page + atag.attrib['href'] + ENDC
elif vuln.text_content().startswith('No'):
print worst_page + atag.attrib['href']
else:
print FAIL + worst_page + atag.attrib['href'] + ENDC
def get_corresponding_author_info(self):
"""Try to get corresponding author information.
Returns (scopus-id, name, email).
"""
resp = requests.get(self.scopus_link)
from lxml import html
parsed_doc = html.fromstring(resp.content)
for div in parsed_doc.body.xpath('.//div'):
for a in div.xpath('a'):
if '/cdn-cgi/l/email-protection' in a.get('href', ''):
encoded_text = a.attrib['href'].replace('/cdn-cgi/l/email-protection#', '')
key = int(encoded_text[0:2], 16)
email = ''.join([chr(int('0x{}'.format(x), 16) ^ key)
for x in
map(''.join, zip(*[iter(encoded_text[2:])]*2))])
for aa in div.xpath('a'):
if 'http://www.scopus.com/authid/detail.url' in aa.get('href', ''):
scopus_url = aa.attrib['href']
name = aa.text
else:
scopus_url, name = None, None
return (scopus_url, name, email)
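# Standalone sketch of the Cloudflare email-protection decoding done inline above
# (not part of the original class): the href fragment is hex, the first byte is an
# XOR key and each following byte is a character XORed with that key. The sample
# string below is synthetic and decodes to 'a@b.c'.
def decode_cfemail(encoded_text):
    key = int(encoded_text[0:2], 16)
    return ''.join(chr(int(encoded_text[i:i + 2], 16) ^ key)
                   for i in range(2, len(encoded_text), 2))

# decode_cfemail('422302206c21') == 'a@b.c'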
def osu(cmd, message, args):
if args:
osu_input = '%20'.join(args)
try:
profile_url = 'https://osu.ppy.sh/u/' + osu_input
async with aiohttp.ClientSession() as session:
async with session.get(profile_url) as data:
page = await data.text()
root = html.fromstring(page)
username = root.cssselect('.profile-username')[0].text[:-1]
user_color = str(message.author.color)[1:]
sig_url = f'https://lemmmy.pw/osusig/sig.php?colour=hex{user_color}&uname={osu_input}'
response = discord.Embed(color=message.author.color)
response.set_image(url=sig_url)
response.set_author(name=f'{username}\'s osu! Profile', url=profile_url, icon_url=osu_logo)
except IndexError:
response = discord.Embed(color=0xBE1931, title='? Unable to retrieve profile.')
else:
response = discord.Embed(color=0xBE1931, title='? Nothing inputted.')
await message.channel.send(None, embed=response)