def process_request(self, request, spider):
    if 'how' in request.meta:
        if 'isscreen' in request.meta:
            # render via Selenium and also capture a screenshot
            true_page = selenium_request(request.url, True)
        else:
            true_page = selenium_request(request.url)
        return HtmlResponse(request.url, body=true_page, encoding='utf-8', request=request)
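The `selenium_request` helper is defined elsewhere in the project; a minimal sketch of what it presumably does, assuming a headless Chrome driver and that the second argument toggles a screenshot:

from selenium import webdriver

# Hedged sketch of the selenium_request helper used above; the helper
# name is real, but this body and the screenshot path are assumptions.
def selenium_request(url, screenshot=False):
    options = webdriver.ChromeOptions()
    options.add_argument("--headless")
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        if screenshot:
            driver.save_screenshot("page.png")  # hypothetical output path
        return driver.page_source
    finally:
        driver.quit()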
def process_request(self, request, spider):
if spider.name == "jobbole":
spider.browser.get(request.url)
        import time
        # time.sleep(3)
        print("Accessing: {0}".format(request.url))
        return HtmlResponse(url=spider.browser.current_url, body=spider.browser.page_source,
                            encoding="utf-8", request=request)
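This middleware expects the spider to own a shared `browser`; a sketch of the usual wiring, where the spider creates the driver itself and the middleware is enabled in settings (module and class names here are assumptions):

from scrapy import Spider
from selenium import webdriver

class JobboleSpider(Spider):
    name = "jobbole"

    def __init__(self, *args, **kwargs):
        super(JobboleSpider, self).__init__(*args, **kwargs)
        # one shared driver, reused by the middleware for every request
        self.browser = webdriver.Chrome()

# settings.py -- the middleware path is hypothetical:
# DOWNLOADER_MIDDLEWARES = {"myproject.middlewares.SeleniumMiddleware": 543}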
def parse_kb(self, response):
# initial html tokenization to find regions segmented by e.g. "======"
# or "------"
filtered = response.xpath(
"//div[@class='sfdc_richtext']").extract()[0].split("=-")
for entry in [x and x.strip() for x in filtered]:
resp = HtmlResponse(url=response.url, body=entry,
encoding=response.encoding)
for link in resp.xpath("//a"):
href = link.xpath("@href").extract()[0]
if "cache-www" in href:
text = resp.xpath("//text()").extract()
text_next = link.xpath("following::text()").extract()
item = FirmwareLoader(item=FirmwareImage(),
response=response,
date_fmt=["%b %d, %Y", "%B %d, %Y",
"%m/%d/%Y"])
version = FirmwareLoader.find_version_period(text_next)
if not version:
version = FirmwareLoader.find_version_period(text)
item.add_value("version", version)
item.add_value("date", item.find_date(text))
item.add_value("url", href)
item.add_value("product", response.meta["product"])
item.add_value("vendor", self.name)
yield item.load_item()
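The trick both `parse_kb` methods rely on is re-wrapping a plain string fragment in `HtmlResponse` so it can be queried with XPath; a standalone illustration (URL and markup are made up):

from scrapy.http import HtmlResponse

fragment = '<p>Firmware <a href="http://cache-www.example.com/fw.bin">v1.2</a></p>'
resp = HtmlResponse(url="http://example.com/kb", body=fragment, encoding="utf-8")
print(resp.xpath("//a/@href").extract())  # ['http://cache-www.example.com/fw.bin']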
def parse_kb(self, response):
mib = None
# need to perform some nasty segmentation because different firmware versions are not clearly separated
# reverse order to get MIB before firmware items
for entry in reversed(response.xpath(
"//div[@id='support-article-downloads']/div/p")):
for segment in reversed(entry.extract().split("<br><br>")):
resp = HtmlResponse(
url=response.url, body=segment, encoding=response.encoding)
for href in resp.xpath("//a/@href").extract():
text = resp.xpath("//text()").extract()
if "MIBs" in href:
mib = href
elif "firmware" in href:
text = resp.xpath("//text()").extract()
item = FirmwareLoader(
item=FirmwareImage(), response=resp, date_fmt=["%m/%d/%Y"])
item.add_value("date", item.find_date(text))
item.add_xpath("url", "//a/@href")
item.add_value("mib", mib)
item.add_value("product", response.meta["product"])
item.add_value("vendor", self.name)
item.add_value(
"version", FirmwareLoader.find_version_period(text))
yield item.load_item()
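`FirmwareLoader.find_version_period` is not shown in this listing; judging by how it is called with lists of text fragments, it likely scans them for the first dotted version string. A hedged sketch (function body and regex are assumptions):

import re

def find_version_period(texts):
    # Scan text fragments for the first period-separated version number.
    for text in texts or []:
        match = re.search(r"\b\d+(?:\.\d+)+\b", text)
        if match:
            return match.group(0)
    return None

print(find_version_period(["Firmware v2.0.06 released"]))  # 2.0.06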
def process_request(self, request, spider):
print("Using process_request")
true_page = selenium_request(request.url)
return HtmlResponse(request.url, body=true_page, encoding='utf-8', request=request)
def process_request(self, request, spider):
if spider.name == 'jobbole':
spider.browser.get(request.url)
time.sleep(3)
print("??: {0}".format(request.url))
return HtmlResponse(
url=spider.browser.current_url,
body=spider.browser.page_source,
encoding="utf-8",
request=request
)
def read(self, source):
source_filename = os.path.basename(source)
with zipfile.ZipFile(source) as zf:
filenames = sorted(set([zipinfo.filename[:10] for zipinfo in zf.infolist()]))
for filename in filenames:
source_path = u'{0}/{1}'.format(source_filename, filename)
# Read info
            desc = zf.read(self.INFO_FORMAT.format(filename))
            info = json.loads(desc)
            url = info.pop('url')
            headers = info.pop('headers')
            status = info.pop('status')
            info_meta = info['meta']
info_meta['source_path'] = source_path
# Read content
content = zf.read(self.BODY_FORMAT.format(filename))
request = Request(
url=url,
meta=info_meta
)
response = HtmlResponse(
url=url,
headers=headers,
status=status,
body=content,
request=request,
)
yield response
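`INFO_FORMAT` and `BODY_FORMAT` are class attributes defined elsewhere; the reader implies an archive pairing a JSON info entry with a raw body entry per ten-character prefix. A sketch of a matching writer under that assumption (the `.info`/`.body` templates are guesses, not the project's actual values):

import json
import zipfile

def write_archive(path, pages):
    # One JSON sidecar plus one raw body per page, sharing a 10-char prefix.
    with zipfile.ZipFile(path, "w") as zf:
        for i, page in enumerate(pages):
            prefix = "{0:010d}".format(i)
            info = {"url": page["url"], "headers": {}, "status": 200, "meta": {}}
            zf.writestr("{0}.info".format(prefix), json.dumps(info))
            zf.writestr("{0}.body".format(prefix), page["body"])

write_archive("pages.zip", [{"url": "http://example.com/", "body": "<html></html>"}])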
def handle_detail(self, response, itemi):
    # NB: despite its name, "response" here is a detail-page URL string.
    print(response)
response = response.strip()
# requests.adapters.DEFAULT_RETRIES = 10
# s = requests.session()
# s.config['keep_alive'] = False
html_requests_item = requests.get(response)
html_requests = html_requests_item.text.encode('utf-8')
# html_requests_item.connection.close()
html_response = HtmlResponse(url=response, body=html_requests, headers={'Connection': 'close'})
html_all = Selector(html_response)
html = html_all.xpath('//div[@class="main3"]/div[@class="shileft"]')
    itemi['detail_dynasty'] = html.xpath(
        u'div[@class="son2"]/p/span[contains(text(),"朝代：")]/parent::p/text()').extract()[0]
    itemi['detail_translate_note_url'] = html.xpath(
        u'div[@class="son5"]//u[contains(text(),"译文及注释")]/parent::a/@href').extract()
    itemi['detail_appreciation_url'] = html.xpath(
        u'div[@class="son5"]//u[contains(text(),"赏")]/parent::a/@href').extract()
    itemi['detail_background_url'] = html.xpath(
        u'div[@class="son5"]//u[contains(text(),"创作背景") or contains(text(),"写作背景")]/parent::a/@href').extract()
    itemi['detail_author'] = html.xpath(
        u'div[@class="son2"]/p/span[contains(text(),"作者：")]/parent::p/a/text()').extract()
    itemi['detail_text'] = "".join(html.xpath('div[@class="son2"]/text()').extract()).strip().encode('utf-8')
    # itemi['detail_text'] = re.sub(r'\"', "“", itemi['detail_text'])
    # itemi['detail_text'] = re.sub(r'\(.*?\)', "", itemi['detail_text'])
    itemi['detail_text'] = re.sub(r'\r?\n\t?.*?\)', "", itemi['detail_text'])
if itemi['detail_background_url']:
self.detail_background(itemi['detail_background_url'], itemi)
pass
else:
pass
self.detail_translate_note(itemi['detail_translate_note_url'], itemi)
self.detail_appreciation(itemi['detail_appreciation_url'], itemi)
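The commented-out lines above hint at connection churn from issuing a fresh `requests.get` per page; the usual remedy is one shared `Session` with retries. A sketch (retry counts and status codes are assumptions):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session(retries=10):
    # One pooled session keeps connections alive across pages and
    # retries transient server errors with exponential backoff.
    session = requests.Session()
    retry = Retry(total=retries, backoff_factor=0.5,
                  status_forcelist=(500, 502, 503, 504))
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session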
# Fetch and clean the creation-background pages
def detail_background(self, all_url, itemi):
detail_appreciation_container = []
for url in all_url:
url = self.site_domain + url
print('detail_background_text url : %s' % url)
html_requests = requests.get(url).text.encode('utf-8')
html_response = HtmlResponse(url=url, body=html_requests, headers={'Connection': 'close'})
html_all = Selector(html_response)
temp = ''.join(html_all.xpath(
u'//div[@class="main3"]/div[@class="shileft"]/div[@class="shangxicont"]/p[not(@style or contains(text(),"?????"))]').extract())
temp = temp.encode('utf-8')
temp = re.sub(r'<p>', '', temp)
temp = re.sub(r'</p>', '', temp)
temp = re.sub(r'</a>', '', temp)
temp = re.sub(r'(<a\s+href=\s*\".*?\">)', '', temp)
alt = re.search(r'\s+alt=\s*\"(.*?)\"\s+', temp)
# print(alt.group(1))
if alt is not None:
temp = re.sub(r'<img.*\s*>', alt.group(1), temp)
else:
            print('%s has no img alt text' % url)
temp = re.sub(r'\"', "“", temp)
detail_appreciation_container.append(temp)
itemi['detail_background_text'] = detail_appreciation_container
# Fetch and clean the appreciation pages
def detail_appreciation(self, all_url, itemi):
detail_appreciation_container = []
for url in all_url:
url = self.site_domain + url
print('detail_appreciation url : %s' % url)
html_requests = requests.get(url).text.encode('utf-8')
html_response = HtmlResponse(url=url, body=html_requests, headers={'Connection': 'close'})
html_all = Selector(html_response)
temp = ''.join(html_all.xpath(
u'//div[@class="main3"]/div[@class="shileft"]/div[@class="shangxicont"]/p[not(@style or contains(text(),"?????"))]').extract())
temp = temp.encode('utf-8')
temp = re.sub(r'<p>', '', temp)
temp = re.sub(r'</p>', '', temp)
temp = re.sub(r'</a>', '', temp)
temp = re.sub(r'(<a\s+href=\s*\".*?\">)', '', temp)
alt = re.search(r'\s+alt=\s*\"(.*?)\"\s+', temp)
# print(alt.group(1))
if alt is not None:
temp = re.sub(r'<img.*\s*>', alt.group(1), temp)
else:
            print('%s has no img alt text in appreciation' % url)
temp = re.sub(r'\"', "“", temp)
# if self.site_domain + '/shangxi_4618.aspx' == url:
# print(temp)
detail_appreciation_container.append(temp)
itemi['detail_appreciation_text'] = detail_appreciation_container
pass
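`detail_background` and `detail_appreciation` are nearly identical; both could delegate to one helper parameterised by a log label. A hedged refactor sketch (the helper name is mine; the `not(@style)` filter is kept, but the original's additional text filter is omitted):

import re
import requests
from scrapy.http import HtmlResponse
from scrapy.selector import Selector

def extract_paragraphs(site_domain, all_url, label):
    # Fetch each page, pull the shangxicont paragraphs, strip markup.
    results = []
    for url in all_url:
        url = site_domain + url
        print('%s url : %s' % (label, url))
        body = requests.get(url).text
        response = HtmlResponse(url=url, body=body, encoding='utf-8')
        joined = ''.join(Selector(response).xpath(
            u'//div[@class="main3"]/div[@class="shileft"]'
            u'/div[@class="shangxicont"]/p[not(@style)]').extract())
        results.append(re.sub(r'</?p>|</a>|<a\s+href=\s*".*?">', '', joined))
    return results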
def process_request(self, request, spider):
if spider.name == "jobbole":
# browser = webdriver.Chrome(executable_path="D:/Temp/chromedriver.exe")
spider.browser.get(request.url)
import time
time.sleep(3)
print ("??:{0}".format(request.url))
return HtmlResponse(url=spider.browser.current_url, body=spider.browser.page_source, encoding="utf-8", request=request)
def test_parse_drug_details_or_overview_delegates_to_parse_drug_details_when_response_in_drug_details(self):
url = 'http://www.accessdata.fda.gov/scripts/cder/drugsatfda/index.cfm?fuseaction=Search.DrugDetails'
mock_response = HtmlResponse(url=url)
expected_result = 'expected_result'
with mock.patch.object(Spider,
'parse_drug_details',
return_value=expected_result) as mock_method:
spider = Spider()
result = spider.parse_drug_details_or_overview(mock_response)
mock_method.assert_called_once_with(mock_response)
assert result == expected_result
def test_parse_drug_details_or_overview_delegates_to_parse_drug_overview_when_response_in_drug_overview(self):
url = 'http://www.accessdata.fda.gov/scripts/cder/drugsatfda/index.cfm?fuseaction=Search.Overview&DrugName=E-BASE'
mock_response = HtmlResponse(url=url)
expected_result = 'expected_result'
with mock.patch.object(Spider,
'parse_drug_overview',
return_value=expected_result) as mock_method:
spider = Spider()
result = spider.parse_drug_details_or_overview(mock_response)
mock_method.assert_called_once_with(mock_response)
assert result == expected_result
def test_parse_drug_details_or_overview_raises_exception_for_unknown_pages(self):
url = 'http://www.accessdata.fda.gov/'
mock_response = HtmlResponse(url=url)
with pytest.raises(Exception):
spider = Spider()
spider.parse_drug_details_or_overview(mock_response)
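Taken together, the three tests pin down the dispatcher's contract: route on the `fuseaction` in the URL and raise on anything else. A minimal body consistent with them (inferred from the tests, not copied from the spider):

def parse_drug_details_or_overview(self, response):
    # Route by fuseaction; raise on unrecognised pages (per the tests).
    if 'Search.DrugDetails' in response.url:
        return self.parse_drug_details(response)
    if 'Search.Overview' in response.url:
        return self.parse_drug_overview(response)
    raise Exception('Unknown page type: {0}'.format(response.url))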
def parse(self, response):
    marker_txt = re.findall(re.compile(r"markerData.*\}", re.MULTILINE), response.body_as_unicode())
    if not marker_txt:
        return
    markers_json = "{\"" + marker_txt[0]
    markers = list(json.loads(markers_json).values())[0]
    if not markers:
        return
for marker in markers:
marker_response = HtmlResponse(url="", body=marker["info"].encode("utf-8"))
hours = re.findall(r"\{\"label.*\}", marker["info"])
hours = hours[0]
parsed_hours = json.loads(hours)
addr_parts = marker_response.css(".address span:not(.phone)::text").extract()
url = marker_response.css("header a").xpath("@href").extract_first()
city, state = addr_parts[-1].split(",")
yield GeojsonPointItem(lat=marker.get("lat"), lon=marker.get("lng"),
name=marker_response.css("header a::text").extract_first(default=None),
addr_full=", ".join(addr_parts),
city=city.strip(),
state=state.strip(),
country="United States",
phone=marker_response.css(".phone::text").extract_first(),
website=url,
opening_hours=get_hours(parsed_hours["days"]),
ref=url.split("/")[-1].split(".")[0])
def parse(self, response):
data = json.loads(response.body_as_unicode())
stores = data['markers']
for store in stores:
html = HtmlResponse(
url="",
body=store['info'].encode('UTF-8')
)
unp = {}
unp['lat'] = store['lat']
unp['lon'] = store['lng']
if unp['lat']: unp['lat'] = float(unp['lat'])
if unp['lon']: unp['lon'] = float(unp['lon'])
unp['ref'] = store['locationId']
unp['addr_full'] = html.xpath('//div[contains(@class, "addr")]/text()').extract_first()
unp['phone'] = html.xpath('//div[contains(@class, "phone")]/text()').extract_first()
unp['name'] = html.xpath('//div[@class="loc-name"]/text()').extract_first()
addr2 = html.xpath('//div[contains(@class, "csz")]/text()').extract_first()
if addr2:
addr2 = addr2.strip()
three_pieces = self.addr2regex.search(addr2)
if three_pieces:
city, state, zipcode = three_pieces.groups()
unp['city'] = city
unp['state'] = state
unp['postcode'] = zipcode
properties = {}
for key in unp:
if unp[key]:
properties[key] = unp[key]
yield GeojsonPointItem(**properties)
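`self.addr2regex` is defined elsewhere; given that its three groups unpack into city, state, and zipcode, it is presumably something like the following (the pattern is an assumption):

import re

# Hypothetical addr2regex: splits "City, ST 12345" into three groups.
addr2regex = re.compile(r"^(.*),\s*([A-Z]{2})\s+(\d{5}(?:-\d{4})?)$")

match = addr2regex.search("Portland, OR 97201")
if match:
    city, state, zipcode = match.groups()
    print(city, state, zipcode)  # Portland OR 97201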
def main():
total = 0
time = 0
tar = tarfile.open("bookfiles.tar.gz")
for member in tar.getmembers():
f = tar.extractfile(member)
html = f.read()
response = HtmlResponse(url="local", body=html, encoding='utf8')
        start = timer()
        rating = response.xpath(
            "//*[@id='content_inner']/article/div[1]/div[2]/p[3]/i[1]").extract()
        title = response.xpath(
            "//*[@id='content_inner']/article/div[1]/div[2]/h1").extract()
        price = response.xpath(
            "//*[@id='content_inner']/article/div[1]/div[2]/p[1]").extract()
        stock = ''.join(response.xpath(
            "//*[@id='content_inner']/article/div[1]/div[2]/p[2]").re(r'(\d+)'))
        end = timer()
page = [rating, title, price, stock]
total = total + 1
time = time + end - start
print("\nTotal number of pages extracted = {0}".format(total))
print("Time taken = {0}".format(time))
click.secho("Rate of link extraction : {0} pages/second\n".format(
float(total / time)), bold=True)
with open("Benchmark.txt", 'w') as g:
g.write(" {0}".format((float(total / time))))
def main():
url = 'http://scrapinghub.com/'
link_extractor = LinkExtractor()
total = 0
time = 0
tar = tarfile.open("sites.tar.gz")
for member in tar.getmembers():
f = tar.extractfile(member)
html = f.read()
start = timer()
response = HtmlResponse(url=url, body=html, encoding='utf8')
links = link_extractor.extract_links(response)
end = timer()
total = total + len(links)
time = time + end - start
print("\nTotal number of links extracted = {0}".format(total))
print("Time taken = {0}".format(time))
click.secho("Rate of link extraction : {0} links/second\n".format(
float(total / time)), bold=True)
with open("Benchmark.txt", 'w') as g:
g.write(" {0}".format((float(total / time))))
def _extract_requests(self, response):
r = []
if isinstance(response, HtmlResponse):
links = self.link_extractor.extract_links(response)
r.extend(Request(x.url, callback=self.parse) for x in links)
return r
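`_extract_requests` only yields follow-ups for HTML responses; a sketch of the spider skeleton it typically lives in (class and names are illustrative):

from scrapy import Request, Spider
from scrapy.http import HtmlResponse
from scrapy.linkextractors import LinkExtractor

class FollowAllSpider(Spider):
    name = "followall"                      # hypothetical
    start_urls = ["http://example.com/"]
    link_extractor = LinkExtractor()

    def parse(self, response):
        # Re-crawl every extracted link with the same callback.
        for request in self._extract_requests(response):
            yield request

    def _extract_requests(self, response):
        if isinstance(response, HtmlResponse):
            return [Request(link.url, callback=self.parse)
                    for link in self.link_extractor.extract_links(response)]
        return []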
def main():
total = 0
time = 0
tar = tarfile.open("bookfiles.tar.gz")
for member in tar.getmembers():
f = tar.extractfile(member)
html = f.read()
response = HtmlResponse(url="local", body=html, encoding='utf8')
start = timer()
rating = response.css(
'p.star-rating::attr(class)').extract_first().split(' ')[-1]
title = response.css('.product_main h1::text').extract_first()
price = response.css(
'.product_main p.price_color::text').re_first('£(.*)')
        stock = ''.join(
            response.css('.product_main .instock.availability ::text').re(r'(\d+)'))
category = ''.join(
response.css('ul.breadcrumb li:nth-last-child(2) ::text').extract()).strip()
end = timer()
page = [rating, title, price, stock, category]
total = total + 1
time = time + end - start
print("\nTotal number of pages extracted = {0}".format(total))
print("Time taken = {0}".format(time))
click.secho("Rate of link extraction : {0} pages/second\n".format(
float(total / time)), bold=True)
with open("Benchmark.txt", 'w') as g:
g.write(" {0}".format((float(total / time))))