def files(self):
    if not self._files:
        path = '/ajax_details_filelist.php'
        url = self.url.path(path).query_param('id', self.id)
        request = urllib.request.Request(
            url, headers={'User-Agent': "Magic Browser"})
        response = urllib.request.urlopen(request).read()
        root = html.document_fromstring(response)
        rows = root.findall('.//tr')
        # A single row spanning both columns means the file list is empty.
        if len(rows) == 1 and rows[0].find('td').get('colspan') == '2':
            self._files = {}
        else:
            self._files = {}
            for row in rows:
                name, size = [str(v.text_content())
                              for v in row.findall('.//td')]
                self._files[name] = size.replace('\xa0', ' ')
    return self._files
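# A minimal, self-contained sketch of the row parsing this property relies on;
# the table and file names below are made up for illustration.
from lxml import html

SAMPLE = """
<table>
  <tr><td>ubuntu.iso</td><td>1.4\xa0GiB</td></tr>
  <tr><td>README.txt</td><td>2\xa0KiB</td></tr>
</table>
"""

root = html.document_fromstring(SAMPLE)
files = {}
for row in root.findall('.//tr'):
    name, size = [v.text_content() for v in row.findall('.//td')]
    files[name] = size.replace('\xa0', ' ')

print(files)  # {'ubuntu.iso': '1.4 GiB', 'README.txt': '2 KiB'}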
def all(self):
    url = "http://www.presidency.ucsb.edu/executive_orders.php?year=%d&Submit=DISPLAY" % self.year
    page = requests.get(url)
    tree = html.document_fromstring(page.text)
    table = tree.xpath('//form[@name="executive_orders"]')[0].getnext().xpath('tr')
    output = []
    for i in range(1, len(table)):
        data = table[i].xpath('td')
        output.append({
            "president": data[0].text_content(),
            "date": data[1].text_content(),
            "id": data[2].xpath('a')[0].attrib['href'].split('=')[1],
            "link": "http://www.presidency.ucsb.edu" + data[2].xpath('a')[0].attrib['href'][2:]
        })
    return output
def search_shops_on_forum(force=False):
    # Get member pages
    step = 500
    if force:
        last_page = 1
    else:
        last_page = Member.objects.aggregate(Max('page_number'))['page_number__max'] or 1
    page_number = last_page
    page_url = 'http://www.prestashop.com/forums/members/page__sort_key__members_display_name__sort_order__asc__max_results__%d__st__%d' % (step, (last_page - 1) * step)
    while page_url:
        page = document_fromstring(urllib2.urlopen(page_url).read())
        for member in page.cssselect('ul.members li h3.bar a:first-of-type'):
            # member url
            Member.objects.get_or_create(link=member.get('href'), defaults={'page_number': page_number})
        next_links = page.cssselect('ul.pagination.left li.next a')
        page_url = next_links[0].get('href') if next_links else None
        page_number += 1
    for member in Member.objects.filter(page_number__gte=last_page):
        member_page = document_fromstring(urllib2.urlopen(member.link).read())
        for link in member_page.cssselect('div.general_box div.signature a'):
            ShopLink.objects.get_or_create(link=link.get('href'), member=member)
def search_shops_on_rus_forum(force=False):
    if force:
        last_page = 1
    else:
        last_page = MemberRus.objects.aggregate(Max('page_number'))['page_number__max'] or 1
    for i in range(last_page, 4219):
        page_url = 'http://prestadev.ru/forum/profile.php?u=' + str(i)
        page = document_fromstring(urllib2.urlopen(page_url).read())
        messages = 0
        try:
            messages = int(page.cssselect('div.wttborder td strong')[2].text.strip())
        except Exception:
            pass
        try:
            params = {'title': page.cssselect('#profilename')[0].text.strip(),
                      'messages': messages,
                      'page_number': i,
                      'home_page': page.cssselect('div.wttborder td.row1')[4].text_content().strip()}
        except IndexError:
            continue
        member = MemberRus.objects.get_or_create(**params)[0]
        for link in page.cssselect('div.wgborder td.row1 a'):
            ShopLinkRus.objects.get_or_create(link=link.get('href'), member=member)
def __init__(self, file_name, user_id):
    with open(file_name, 'r') as self.opened_file:
        # Instapaper doesn't close <li> tags, which was causing infinite
        # recursion when handing the export to BS directly. Round-tripping
        # through lxml below closes the <li> tags first (see the standalone
        # sketch after this constructor).
        self.html = html.document_fromstring(self.opened_file.read())
        self.html = html.tostring(self.html)
    self.soup = BeautifulSoup4(self.html)
    self.user = user_id
    self.urls = dict()
    self.check_duplicates = dict()
    self.check_duplicates_query = Bookmark.query.filter(Bookmark.user == self.user,
                                                        Bookmark.deleted == False).all()
    for bmark in self.check_duplicates_query:
        self.check_duplicates[bmark.main_url] = bmark
    self.tags_dict = dict()
    self.tags_set = set()
    self.valid_url = re.compile(
        r'^(?:[a-z0-9\.\-]*)://'
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}(?<!-)\.?)|'
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'
        r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'
        r'(?::\d+)?'
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
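# A standalone sketch of the tag-closing round trip used above: lxml parses the
# sloppy markup and serializes it back with every <li> properly closed, so
# BeautifulSoup no longer recurses forever. The markup below is made up.
from lxml import html

sloppy = '<ul><li>First bookmark<li>Second bookmark</ul>'
fixed = html.tostring(html.document_fromstring(sloppy))
print(fixed)
# b'<html><body><ul><li>First bookmark</li><li>Second bookmark</li></ul></body></html>'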
def get_apikey(self, header=None):
    """
    Retrieve and set a new apikey.

    :param header: a custom header for retrieving the apikey.
    """
    self.header = copy.deepcopy(self.DEFAULTHEADER)
    if header is None:
        header = self.APIKEYHEADER
    response = requests.get('http://www.gutefrage.net/frage_hinzufuegen', headers=header)
    self.apikey = re.search(
        "key: '([^']+)'",
        html.document_fromstring(response.text).xpath('//script[1]')[0].text
    ).group(1)
    self.header['X-Api-Key'] = self.apikey
    return self.apikey
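# A minimal sketch of the same extraction pattern on made-up markup: grab the
# first <script> element with document_fromstring() and pull the key out of its
# text with a regex.
from lxml import html
import re

PAGE = "<html><head><script>var config = { key: 'abc123' };</script></head></html>"
script_text = html.document_fromstring(PAGE).xpath('//script[1]')[0].text
print(re.search(r"key: '([^']+)'", script_text).group(1))  # abc123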
#TODO: rework this function eventually
def items(self):
    """
    Request URL and parse response. Yield a ``Torrent`` for every torrent
    on page.
    """
    request = urllib.request.Request(
        self.url, headers={'User-Agent': "Magic Browser"})
    response = urllib.request.urlopen(request).read()
    root = html.document_fromstring(response)
    items = [self._build_torrent(row) for row in
             self._get_torrent_rows(root)]
    for item in items:
        yield item
def info(self):
    if self._info is None:
        request = urllib.request.Request(
            self.url, headers={'User-Agent': "Magic Browser"})
        response = urllib.request.urlopen(request).read()
        root = html.document_fromstring(response)
        info = root.cssselect('#details .nfo pre')[0].text_content()
        self._info = info
    return self._info
def scrape(self):
    # Return Wikipedia page and turn into a tree.
    base_url = 'https://en.wikipedia.org'
    response = requests.get(base_url + '/wiki/Cabinet_of_the_United_States')
    tree = html.document_fromstring(response.text)
    # Get all of the rows of the Cabinet table.
    rows = tree.xpath('//th[text()="Cabinet"]')[0].getparent().getparent().getchildren()
    obj = []
    # Iterate through all rows.
    for x in rows:
        # Retrieve all of the elements per row.
        data = x.getchildren()
        # Only look at this if we're looking at Cabinet members.
        if len(data) == 3 and data[0].tag == 'td':
            print(data[1].xpath('div/a'))
            # Clean up data with strip.
            obj.append({
                "title": [x for x in data[0].text_content().split('\n') if x != ''][0],
                "seal": 'https:' + data[0].xpath('a/img')[0].attrib['src'],
                "img": 'https:' + data[1].xpath('a/img')[0].attrib['src'],
                "name": [x for x in data[1].text_content().split('\n') if x != ''][0],
                "details": base_url + data[1].xpath('div/a')[0].attrib['href'] if len(data[1].xpath('div/a')) > 0 else None,
                "is_acting": (len([x for x in data[1].text_content().split('\n') if x != '']) > 1 and [x for x in data[1].text_content().split('\n') if x != ''][1] == 'Acting'),
                "date_appointed": data[2].text_content(),
            })
    print(json.dumps(obj))
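# A small, self-contained illustration of the getparent()/getchildren() hops used
# above: find a header cell by its text, then walk up to the surrounding table
# rows. The table below is invented for the example.
from lxml import html

TABLE = """
<table>
  <tr><th>Cabinet</th></tr>
  <tr><td>Secretary of State</td><td>Name</td><td>2017</td></tr>
</table>
"""

tree = html.document_fromstring(TABLE)
rows = tree.xpath('//th[text()="Cabinet"]')[0].getparent().getparent().getchildren()
print(len(rows))  # 2 -- the header row plus one member row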
def __init__(self):
    self._base_url = 'https://en.wikipedia.org'
    self._response = requests.get(self._base_url + '/wiki/Political_appointments_of_Donald_Trump')
    self._tree = html.document_fromstring(self._response.text)
    self._congress_url = '/wiki/List_of_United_States_Senators_in_the_115th_Congress_by_seniority'
    self._senators_scraper = Senators(self._congress_url)
    self._senators = self._senators_scraper.scrape()
def __init__(self, url):
    self._base_url = 'https://en.wikipedia.org'
    self._response = requests.get(self._base_url + url)
    self._tree = html.document_fromstring(self._response.text)
def all(self):
    url = "http://www.presidency.ucsb.edu/debates.php"
    # Retrieve all debates as tree.
    page = requests.get(url)
    tree = html.document_fromstring(page.text)
    # List of all debate and date elements.
    dates = [x for x in tree.xpath('//td[@class="docdate"]') if len(x.text_content()) > 0]
    debates = tree.xpath('//td[@class="doctext"]')
    # Throw error if lengths are off.
    if len(dates) != len(debates):
        raise Exception('Sorry - something went wrong! Please open an issue at https://github.com/jayrav13/presidency/issues and include the following timestamp: %s' % str(time.time()))
    # Curate list of all debates.
    self.data = []
    for i in range(0, len(debates)):
        self.data.append({
            "date": dates[i].text_content(),
            "debate": debates[i].xpath('a')[0].text_content(),
            "link": debates[i].xpath('a')[0].attrib['href'],
            "id": int(debates[i].xpath('a')[0].attrib['href'].split('?')[1].split('=')[1])
        })
    return self.data
def retrieve(self):
    url = 'http://www.presidency.ucsb.edu/ws/index.php?pid='
    page = requests.get(url + str(self.id))
    tree = html.document_fromstring(page.text)
    self.data = {
        "text": tree.xpath('//span[@class="displaytext"]')[0].text_content()
    }
    return self.data
def get(self):
    page = requests.get(self.url)
    self.tree = html.document_fromstring(page.text)
    output = {
        "text": self.tree.xpath('//span[@class="displaytext"]')[0].text_content(),
        "date": self.tree.xpath('//span[@class="docdate"]')[0].text_content(),
        "title": self.tree.xpath('//title')[0].text_content(),
        "id": self.id,
        "url": self.url,
        "president": self.tree.xpath('//title')[0].text_content().split(':')[0]
    }
    return output
def select_rank_rows(html):
    """Return the table rows that are expected to contain rank data."""
    root = document_fromstring(html)
    return root.xpath(
        'body/form/table[@class="sportView"][2]/tr[position() > 1]')
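# A quick usage sketch against invented markup that matches the XPath above:
# the second "sportView" table inside the form, minus its header row.
PAGE = """
<html><body><form>
  <table class="sportView"><tr><td>ignored</td></tr></table>
  <table class="sportView">
    <tr><th>Rank</th><th>Team</th></tr>
    <tr><td>1</td><td>Alpha</td></tr>
    <tr><td>2</td><td>Beta</td></tr>
  </table>
</form></body></html>
"""

rows = select_rank_rows(PAGE)
print([row.text_content().strip() for row in rows])  # ['1Alpha', '2Beta']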
def scrape(site_address):
    page = requests.get(site_address)  # returns raw html
    page = clean_html(page.content)  # removes <script> tags and their contents
    document = html.document_fromstring(page)  # removes all other tags
    return document.text_content()
# Takes a URL as a string and returns a string of all of the words
# that are used on that webpage.
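# clean_html() is not shown in this excerpt; a plausible implementation, assuming
# lxml's Cleaner is acceptable, could look like the hypothetical helper below
# (not the original author's code).
from lxml.html.clean import Cleaner

def clean_html(raw_html):
    # Drop <script>/<style> elements and inline javascript before text extraction.
    cleaner = Cleaner(scripts=True, javascript=True, style=True)
    return cleaner.clean_html(raw_html.decode('utf-8', errors='replace'))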
def fill_login_form(url, body, username, password):
    doc = html.document_fromstring(body, base_url=url)
    form = _pick_form(doc.xpath('//form'))
    userfield, passfield = _pick_fields(form)
    form.fields[userfield] = username
    form.fields[passfield] = password
    hasSubmitBefore, submit_values = submit_value(form)
    form_values = form.form_values()
    if not hasSubmitBefore:
        form_values += submit_values
    return (form.form_values() + submit_values), form_values, form.action or form.base_url, form.method, _pick_fields(form)
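# The helpers above (_pick_form, _pick_fields, submit_value) are not part of this
# excerpt; the core lxml form API they build on can be exercised on its own, as in
# this self-contained sketch on made-up markup.
from lxml import html

BODY = """
<form action="/login" method="post">
  <input type="text" name="user">
  <input type="password" name="pass">
</form>
"""

doc = html.document_fromstring(BODY, base_url='http://example.com/login')
form = doc.forms[0]
form.fields['user'] = 'alice'
form.fields['pass'] = 'secret'
print(form.form_values())        # [('user', 'alice'), ('pass', 'secret')]
print(form.method, form.action)  # POST http://example.com/login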
def get_content(source):
    '''
    Extract the readable content (heading and paragraphs) from the page source.
    :param source: raw HTML source of the page
    :return: None; each extracted piece of text is written out via write_file()
    '''
    selector = html.document_fromstring(source)
    content = selector.xpath('//div[@class="readtext"]')[0]
    num = content.xpath('h4/text()')
    every_content = content.xpath('p/text()')
    write_file(num)
    for each in every_content:
        write_file(each)
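# A compact illustration of the relative text() XPaths used above, run against an
# invented "readtext" block: text() returns plain strings, not elements.
from lxml import html

SOURCE = '<div class="readtext"><h4>Chapter 1</h4><p>First line.</p><p>Second line.</p></div>'
content = html.document_fromstring(SOURCE).xpath('//div[@class="readtext"]')[0]
print(content.xpath('h4/text()'))  # ['Chapter 1']
print(content.xpath('p/text()'))   # ['First line.', 'Second line.']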
def select_url(url, html, fruitline_spider_variable):
    # Skip responses that are too short to contain useful links.
    if len(html) < 10:
        return []
    try:
        html_element = document_fromstring(urllib2.unquote(html))
        html_element.make_links_absolute(url)
        links = [i[2] for i in html_element.iterlinks()]
    except Exception as e:
        spider_logger.error("Function: select_url, Info: %s" % str(e))
        return []
    links_unrepeat = set(links)
    final_links = []
    for i in links_unrepeat:
        full_url = repair_url(i, fruitline_spider_variable)
        if fruitline_spider_variable.filter_rule != "":
            pattern = re.compile(fruitline_spider_variable.filter_rule)
            if not re.match(pattern, full_url):
                continue
        if full_url not in fruitline_spider_variable.crawled_url_queue:
            d = dict()
            d['method'] = "get"
            d['url'] = full_url
            final_links.append(d)
    return final_links
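# A self-contained look at the two lxml calls doing the work above:
# make_links_absolute() rewrites every link against a base URL, and iterlinks()
# yields (element, attribute, link, pos) tuples. The markup is made up.
from lxml.html import document_fromstring

doc = document_fromstring('<a href="/about">About</a> <img src="logo.png">')
doc.make_links_absolute('http://example.com/index.html')
print([link for _, _, link, _ in doc.iterlinks()])
# ['http://example.com/about', 'http://example.com/logo.png']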
def get_wx_article_lists(article_html, id_index):
    # global article_flag
    # List of parsed articles to return.
    wx_article_list = []
    html_tree = html.document_fromstring(article_html)
    html_nodes = html_tree.xpath('//ul[@class="article-ul"]//li')
    for html_node in html_nodes:
        # Dict describing a single article.
        wx_article_object = {}
        html_node_children = html_node.getchildren()
        # Children of the right-hand "wx-ft" div (lxml's find() accepts the attribute predicate here).
        div_wx_ft_children = html_node_children[1].find('div[@class="wx-ft"]').getchildren()
        pub_time = div_wx_ft_children[1].text_content().strip()
        pub_time = pub_time.encode('utf-8').split('?')
        if len(pub_time) < 2:
            print_pass_a_article(id_index, 'time')
        else:
            pub_time = int(time.mktime(time.strptime(pub_time[1], '%Y-%m-%d %H:%M:%S')))
            # Stop once we reach articles published before the last crawl.
            if pub_time <= last_time:
                # article_flag = False
                # print 'out of the time and return'
                return wx_article_list
            wx_article_object['time'] = str(pub_time)
        readnum_and_likenum = re.split(r'\s', div_wx_ft_children[2].text_content().strip())
        length = len(readnum_and_likenum)
        if length < 2:
            print_pass_a_article(id_index, 'readnum_and_likenum')
        readnum = str(readnum_and_likenum[0]).strip()
        wx_article_object['readnum'] = str(int(readnum))
        likenum = str(readnum_and_likenum[length - 1]).strip()
        wx_article_object['likenum'] = str(int(likenum))
        div_wx_ft_h4 = html_node_children[1].find('h4')
        title = div_wx_ft_h4.find('a').text_content()
        if not title:
            print_pass_a_article(id_index, 'title')
        wx_article_object['title'] = title
        content = div_wx_ft_h4.getnext().text_content()
        if not content:
            print_pass_a_article(id_index, 'content')
        wx_article_object['content'] = content
        # Article URL and image data-hash.
        div_wx_img_a = html_node_children[0].find('a')
        url = div_wx_img_a.get('href')
        if not url:
            print_pass_a_article(id_index, 'url')
        wx_article_object['url'] = url
        img_hash = div_wx_img_a.find('img').get('data-hash')
        if not img_hash:
            print_pass_a_article(id_index, 'img-hash')
        wx_article_object['imglink'] = get_img_link(img_hash)
        wx_article_object['id'] = str(int(id_index))
        wx_article_list.append(wx_article_object)
    return wx_article_list