def process_request(self, request, spider):
if spider.name == "jobbole":
self.browser.get(request.url)
import time
time.sleep(3)
print ("??:{0}".format(request.url))
return HtmlResponse(url=self.browser.current_url, body=self.browser.page_source, encoding="utf-8", request=request)
        # On Linux, run the browser inside a virtual display:
# from pyvirtualdisplay import Display
# display = Display(visible=0, size=(800, 600))
# display.start()
#
# browser = webdriver.Chrome()
# browser.get()
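A minimal sketch of how a Selenium middleware like the one above is usually switched on in a Scrapy project (the dotted path and the class name JSPageMiddleware are assumptions, not taken from the snippet):

# settings.py (illustrative)
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.JSPageMiddleware': 543,
}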
Example source code for the Python HtmlResponse() class
import glob
import io
from timeit import default_timer as timer

import click
from scrapy.http import HtmlResponse
from scrapy.linkextractors import LinkExtractor


def main():
start = timer()
url = 'http://scrapinghub.com/'
link_extractor = LinkExtractor()
total = 0
    for path in glob.glob('sites/*'):
        with io.open(path, "r", encoding="utf-8") as f:
            html = f.read()
r3 = HtmlResponse(url=url, body=html, encoding='utf8')
links = link_extractor.extract_links(r3)
total = total + len(links)
end = timer()
print("\nTotal number of links extracted = {0}".format(total))
print("Time taken = {0}".format(end - start))
click.secho("Rate of link extraction : {0} links/second\n".format(
float(total / (end - start))), bold=True)
with open("Benchmark.txt", 'w') as g:
g.write(" {0}".format((float(total / (end - start)))))
def process_request(self, request, spider):
    if 'PhantomJS' in request.meta:
log.debug('PhantomJS Requesting: %s' % request.url)
ua = None
try:
ua = UserAgent().random
except:
ua = 'Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11'
webdriver.DesiredCapabilities.PHANTOMJS['phantomjs.page.settings.userAgent'] = ua
try:
self.driver.get(request.url)
content = self.driver.page_source.encode('utf-8')
url = self.driver.current_url.encode('utf-8')
except:
return HtmlResponse(request.url, encoding='utf-8', status=503, body='')
if content == '<html><head></head><body></body></html>':
return HtmlResponse(request.url, encoding ='utf-8', status=503, body='')
else:
return HtmlResponse(url, encoding='utf-8', status=200, body=content)
else:
log.debug('Common Requesting: %s' % request.url)
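For reference, a minimal sketch (spider name, URLs and callbacks are all made up) of how a spider opts a request into PhantomJS rendering for a middleware keyed on request.meta['PhantomJS']:

import scrapy

class PhantomDemoSpider(scrapy.Spider):
    # Hypothetical spider, only to show the meta flag the middleware checks
    name = 'phantom_demo'
    start_urls = ['http://example.com/']

    def parse(self, response):
        # Ask the downloader middleware to render this page with PhantomJS
        yield scrapy.Request('http://example.com/js-page',
                             callback=self.parse_rendered,
                             meta={'PhantomJS': True})

    def parse_rendered(self, response):
        self.logger.info('rendered page length: %d', len(response.body))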
def goodsUrlList(home_url):
    '''
    Collect the detail-page URL of every goods item under a category page.
    :param home_url: http://www.vipmro.com/search/?&categoryId=501110
    :return: list of goods detail URLs
    '''
    # Build every combination of optional filter values as URLs
    all_group_list = parseOptional(home_url)
    # Extract the detail URL of each goods item from every combination page
url_list = []
for url in all_group_list:
# url = 'http://www.vipmro.com/search/?ram=0.9551325197768372&categoryId=501110&attrValueIds=509805,509801,509806,509807'
        # Fetch the rendered HTML for this combination URL
home_page = getHtmlFromJs(url)['content'].encode('utf-8')
html = HtmlResponse(url=url,body=str(home_page))
urls = html.selector.xpath('/html/body/div[7]/div[1]/ul/li/div[2]/a/@href').extract()
url_list.extend(urls)
# print(len(urls))
# print(urls)
# exit()
# print(len(url_list))
# print(url_list)
return url_list
def parseOptional(url):
    '''
    Build the URL of every combination of optional filter values for a category page.
    :param url: http://www.vipmro.com/search/?&categoryId=501110
    :return:['http://www.vipmro.com/search/?categoryId=501110&attrValueIds=509801,512680,509807,509823']
    '''
    # Fetch the rendered HTML
home_page = getHtmlFromJs(url)['content'].encode('utf-8')
html = HtmlResponse(url=url,body=str(home_page))
    # Series (xi_lie) filter values
    xi_lie = html.selector.xpath('/html/body/div[5]/div[6]/ul/li/a/@href').re(r'ValueIds=(\d+)')
    # Breaking-capacity (fen_duan) filter values
    fen_duan = html.selector.xpath('/html/body/div[5]/div[10]/ul/li/a/@href').re(r'ValueIds=(\d+)')
    # Trip-unit (tuo_kou_qi) filter values
    tuo_kou_qi = html.selector.xpath('/html/body/div[5]/div[14]/ul/li/a/@href').re(r'ValueIds=(\d+)')
    # Installation-type (an_zhuang) filter values
    an_zhuang = html.selector.xpath('/html/body/div[5]/div[12]/ul/li/a/@href').re(r'ValueIds=(\d+)')
    # Cartesian product of all filter value groups
    all_group = list(itertools.product(xi_lie,fen_duan,tuo_kou_qi,an_zhuang))
_url = url + '&attrValueIds='
url_list = map(lambda x:_url+','.join(list(x)),all_group)
return url_list
def process_request(self, request, spider):
try:
        driver = webdriver.PhantomJS()  # create a headless PhantomJS driver
# driver = webdriver.Firefox()
print "---"+str(request.meta["page"])+"-----js url start-------"
print datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
driver.get(self.pc_index_url+"&page="+str(request.meta["page"]) )
# time.sleep(1)
tmp=driver.find_element_by_id('sf-item-list-data').get_attribute("innerHTML")
print "---"+str(request.meta["page"])+"-----js url end-------"
print datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
body = tmp
return HtmlResponse(driver.current_url, body=body, encoding='utf-8', request=request)
    except Exception as e:
        print("-------------------")
        print(e.__doc__)
        print(e)
        print("-------------------")
def intohotel(self,Links):
url = "http://hotels.ctrip.com/" + Links
self.driver.get(url)
self.driver.maximize_window()
self.driver.implicitly_wait(80)
time.sleep(3)
response = HtmlResponse(url="my HTML string",body=self.driver.page_source,encoding="utf-8")
    # Crawl the comment information (currently disabled)
    # self.crawlcommentinfo(commentnum)
    # Crawl the hotel information
try:
items = self.crawlhotelinfo(response,url)
except:
items = self.crawlhotelinfo2(response,url)
    # Save the crawled hotel data
self.xiechengDao.savehotelComment(items)
# Crawl the hotel links on each list page
def __crawllianjie(self,page_sourse):
response = HtmlResponse(url="my HTML string",body=page_sourse,encoding="utf-8")
hotel_list = response.xpath("//div[@class='searchresult_list ']/ul")
for hotel in hotel_list:
url = hotel.xpath("li[@class='searchresult_info_name']/h2/a/@href").extract()[0]
address = hotel.xpath("li[@class='searchresult_info_name']/p[@class='searchresult_htladdress']/text()").extract()[0]
commnum = hotel.xpath("li[@class='searchresult_info_judge ']/div/a/span[@class='hotel_judgement']/text()").extract()
if len(commnum):
commnum = re.sub('\D','',commnum[0])
commnum = commnum if len(commnum)>0 else 0
else:
commnum = 0
name = hotel.xpath("li[@class='searchresult_info_name']/h2/a/text()").extract()[0]
self.listPageInfo.append({
"guid": uuid.uuid1(),
"url": url,
"hotel_name": name,
"OTA": self.__ota_info,
"comm_num": int(commnum),
"address": address
})
def __parseUrls(self, page_source):
response = HtmlResponse(url="my HTML string",body=page_source,encoding="utf-8")
    # Collect each hotel's URL, name, comment count and address into listPageInfo
url_list = response.xpath("//a[@class='name']/@href").extract()
comment_number_list = response.xpath("//div[@class='comment']/a/span/text()").extract()
name_list = response.xpath("//a[@class='name']/text()").extract()
address_list = response.xpath("//span[@class='address']/text()").extract()
if len(url_list) == len(comment_number_list) == len(name_list) == len(address_list):
for i in range(0, len(url_list)):
self.listPageInfo.append({
"guid": uuid.uuid1(),
"url": url_list[i],
"hotel_name": name_list[i],
"OTA": "??",
"comm_num": int(comment_number_list[i]),
"address": address_list[i]
})
def __parseUrls(self,page_source):
response = HtmlResponse(url="My HTML String",body=page_source,encoding="utf-8")
hotel_list = response.xpath("//div[@class='h_list']/div[@class='h_item']")
for hotel in hotel_list:
url = hotel.xpath(".//p[@class='h_info_b1']/a/@href").extract()[0]
name = hotel.xpath(".//p[@class='h_info_b1']/a/@title").extract()[0]
address = hotel.xpath(".//p[@class='h_info_b2']/text()").extract()[1]
commnum = hotel.xpath(".//div[@class='h_info_comt']/a/span[@class='c555 block mt5']/b/text()").extract()
if len(commnum)==0:
commnum = 0
else:commnum = commnum[0]
self.listPageInfo.append({
"guid": uuid.uuid1(),
"url": url,
"hotel_name": name,
"OTA": self.__ota_info,
"comm_num": commnum,
"address": address
})
pass
def process_request(self, request, spider):
try:
selenium_enable = request.meta.get('selenium')
except Exception as e:
log.info(e)
selenium_enable = False
if selenium_enable:
self.driver.get(request.url)
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located(
(By.CSS_SELECTOR,
"#js-fans-rank > div > div.f-con > div.f-cn.cur > ul > li> a"))
)
body = self.driver.page_source
response = HtmlResponse(url=self.driver.current_url, body=body, request=request, encoding='utf8')
return response
else:
request.headers[
'User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
request.headers[
'Accept'] = '*/*'
request.headers['Accept-Encoding'] = 'gzip, deflate, sdch, br'
request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
request.headers['Connection'] = 'keep-alive'
request.headers['Host'] = 'www.douyu.com'
request.headers['Upgrade-Insecure-Requests'] = 1
try:
cookies_enable = request.meta.get('cookies')
except Exception as e:
log.info(e)
cookies_enable = False
if cookies_enable:
del request.headers['Upgrade-Insecure-Requests']
request.headers['DNT'] = '1'
request.headers['X-Requested-With'] = 'XMLHttpRequest'
request.headers['referer'] = request.meta['referer']
self.cookies['_dys_lastPageCode'] = request.meta.get('_dys_lastPageCode')
self.cookies['_dys_refer_action_code'] = request.meta.get('_dys_refer_action_code')
request.cookies = self.cookies
def process_request(self, request, spider):
if self.use_selenium(request.url):
if self.use_proxy():
if self._count > 20:
self.update_driver()
self._count = 0
log.info('update driver')
        # process_request must return a Response, not yield one (a generator is not a valid return value here)
        return HtmlResponse(request.url, encoding='utf-8', body=self.driver.page_source.encode('utf8'))
def process_request(self, request, spider):
if request.url[26] == 'c':
ua = random.choice(self.user_agent_list)
dcap = dict(DesiredCapabilities.PHANTOMJS)
dcap["phantomjs.page.settings.userAgent"] = ua
dcap["phantomjs.page.settings.loadImages"] = False
        driver = webdriver.PhantomJS(executable_path=r'E:\Webdriver\phantomjs-2.1.1-windows\bin\phantomjs.exe',
                                     desired_capabilities=dcap)
driver.get(request.url)
sleep_time = random.randint(15, 22)
time.sleep(sleep_time)
try:
detail = driver.find_element_by_xpath('//a[@ng-click="showDetail = btnOnClick(showDetail)"]')
detail.click()
except:
pass
body = driver.page_source
url = driver.current_url
driver.quit()
return HtmlResponse(url=url, body=body, request=request, encoding='utf-8')
def process_request(self, request, spider):
if spider.name == "gsxt":
# print("PhantomJS is starting...")
# driver = webdriver.PhantomJS(r"/home/lxw/Downloads/phantomjs/phantomjs-2.1.1-linux-x86_64/bin/phantomjs") # OK
driver = webdriver.Chrome(r"/home/lxw/Software/chromedirver_selenium/chromedriver") # OK
"""
# Using IP Proxies:
        # Chrome needs the proxy configured in its capabilities before the session starts, so the proxy
        # is written into DesiredCapabilities and start_session() opens a new session (new sessionId);
        # requests made after that go out through the configured proxy.
proxy = webdriver.Proxy()
proxy.proxy_type = ProxyType.MANUAL
req = requests.get("http://datazhiyuan.com:60001/plain", timeout=10)
print("Get an IP proxy:", req.text)
if req.text:
proxy.http_proxy = req.text # "1.9.171.51:800"
        # Write the proxy settings into webdriver.DesiredCapabilities.PHANTOMJS
proxy.add_to_capabilities(webdriver.DesiredCapabilities.PHANTOMJS)
driver.start_session(webdriver.DesiredCapabilities.PHANTOMJS)
"""
        driver.get(request.url)  # Load the page that needs JS rendering, e.g. http://roll.news.qq.com/
time.sleep(2)
js = "var q=document.documentElement.scrollTop=10000"
        driver.execute_script(js)  # Scroll to the bottom via JS so lazily-loaded content gets rendered
time.sleep(3)
body = driver.page_source
print("??" + request.url)
return HtmlResponse(driver.current_url, body=body, encoding='utf-8', request=request)
else:
return
def do_test(self, meta_object,
text, expected_raw, expected_requests):
request = Request(url='http://www.drudgereport.com',
meta=meta_object)
response = HtmlResponse('drudge.url', body=text, request=request)
raw_item_count = 0
request_count = 0
for x in self.spider.parse(response):
if isinstance(x, RawResponseItem):
raw_item_count = raw_item_count + 1
elif isinstance(x, Request):
request_count = request_count + 1
self.assertEqual(raw_item_count, expected_raw)
self.assertEqual(request_count, expected_requests)
def detail_translate_note(self, all_url, itemi):
for url in all_url:
url = self.site_domain + url
print('detail_translate_note url %s' % url)
html_requests = requests.get(url).text.encode('utf-8')
html_response = HtmlResponse(url=url, body=html_requests, headers={'Connection': 'close'})
html_all = Selector(html_response)
itemi['detail_translate_note_text_title'] = html_all.xpath(
'//div[@class="main3"]/div[@class="shileft"]/div[@class="son1"]/h1/text()').extract()
itemi['detail_translate_text'] = html_all.xpath(
'//div[@class="main3"]/div[@class="shileft"]/div[@class="shangxicont"]/p[not(@style)]/descendant-or-self::text()').extract()
item_list_temp = []
for item_list in itemi['detail_translate_text']:
temp = item_list.encode('utf-8')
temp = re.sub(r'\"', "“", temp)
item_list_temp.append(temp)
itemi['detail_translate_text'] = item_list_temp
pass
def test_parse_drug_details_or_overview_generates_new_request_if_redirected_to_search_page(self):
url = 'http://www.accessdata.fda.gov/scripts/cder/drugsatfda/index.cfm?fuseaction=Search.Search_Drug_Name'
meta = {
'original_url': 'http://www.accessdata.fda.gov/somewhere.cfm',
'original_cookies': {
'foo': 'bar',
},
}
mock_response = HtmlResponse(url=url)
mock_response.request = Request(url, meta=meta)
with mock.patch('random.random', return_value='random_cookiejar'):
spider = Spider()
request = spider.parse_drug_details_or_overview(mock_response)
assert request.url == meta['original_url']
assert request.cookies == meta['original_cookies']
assert request.dont_filter
assert request.callback == spider.parse_drug_details_or_overview
assert request.meta['cookiejar'] == 'random_cookiejar'
def get_url(betamax_session):
def _get_url(url, request_kwargs={}):
'''Returns a scrapy.html.HtmlResponse with the contents of the received
url.
Note that the session is kept intact among multiple calls to this
method (i.e. cookies are passed over).
We also don't verify SSL certificates, because Takeda's certificate is
invalid. If they become valid, we can resume verifying the
certificates.
'''
response = betamax_session.get(url, verify=False)
scrapy_response = HtmlResponse(
url=str(response.url),
body=response.content,
)
scrapy_response.request = Request(url, **request_kwargs)
return scrapy_response
return _get_url
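A short usage sketch for this fixture (the test name and URL are hypothetical; it assumes get_url is registered as a pytest fixture):

def test_front_page_has_a_title(get_url):
    # Hypothetical test: fetch a page through betamax and parse it with Scrapy selectors
    response = get_url('http://www.takeda.us/')
    assert response.css('title::text').extract_first()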
def test_form_request_from_response():
# Copied from scrapy tests (test_from_response_submit_not_first_clickable)
def _buildresponse(body, **kwargs):
kwargs.setdefault('body', body)
kwargs.setdefault('url', 'http://example.com')
kwargs.setdefault('encoding', 'utf-8')
return HtmlResponse(**kwargs)
response = _buildresponse(
"""<form action="get.php" method="GET">
<input type="submit" name="clickable1" value="clicked1">
<input type="hidden" name="one" value="1">
<input type="hidden" name="two" value="3">
<input type="submit" name="clickable2" value="clicked2">
</form>""")
req = SplashFormRequest.from_response(
response, formdata={'two': '2'}, clickdata={'name': 'clickable2'})
assert req.method == 'GET'
assert req.meta['splash']['args']['url'] == req.url
fs = cgi.parse_qs(req.url.partition('?')[2], True)
assert fs['clickable2'] == ['clicked2']
assert 'clickable1' not in fs
assert fs['one'] == ['1']
assert fs['two'] == ['2']
def extractLinks(self, response):
retv = []
link_extractor = LinkExtractor()
if isinstance(response, HtmlResponse):
links = link_extractor.extract_links(response)
for link in links:
if self.postfix in link.url:
retv.append(link.url)
return retv
def goodsDetail(detail_url):
    '''
    Extract the goods details from the detail page with XPath.
    :param detail_url: goods detail-page URL
    :return: dict of goods data
    '''
    goods_data = defaultdict()
    # Source URL
    goods_data['source_url'] = detail_url
    # Fetch the HTML body and convert it to str
body = getHtmlFromJs(detail_url)['content'].encode('utf-8')
html = HtmlResponse(url=detail_url,body=str(body))
    # Name
    goods_data['name'] = html.xpath('/html/body/div[7]/div[2]/h1/text()').extract()[0]
    # Price
    goods_data['price'] = html.selector.xpath('/html/body/div[7]/div[2]/div[2]/ul/li[1]/label[1]/text()').extract()[0]
    # Model/type
    goods_data['type'] = html.selector.xpath('/html/body/div[7]/div[2]/div[2]/ul/li[3]/label/text()').extract()[0]
    # Detail table
    goods_data['detail'] = html.selector.xpath('/html/body/div[9]/div[2]/div[2]/table').extract()[0]
    # Images
    pics = []
    for pic in html.selector.xpath('/html/body/div[7]/div[1]/div[2]/div[2]/ul/li/img'):
        # Strip the thumbnail suffix to get the full-size image
pics.append(pic.xpath('@src').extract()[0].replace('!240240',''))
goods_data['pics'] = '|'.join(pics)
goods_data['storage'] = ''
goods_data['lack_period'] = ''
goods_data['created'] = int(time.time())
goods_data['updated'] = int(time.time())
# print(goods_data['detail'])
return goods_data
OpenPensionSeleniumCrawler.py (project: open-pension-crawler, author: nirgn975)
def process_request(self, request, spider):
# driver = webdriver.Firefox(executable_path="/Users/roysegall/geckodriver")
driver = webdriver.PhantomJS(executable_path='/Users/roysegall/phantomjs')
driver.get(request.url)
return HtmlResponse(request.url, encoding='utf-8', body=driver.page_source.encode('utf-8'))
def process_request(self, request, spider):
if request.meta.get('nojs'):
        # disable js rendering on a per-request basis
return
self.driver.get(request.url)
content = self.driver.page_source
return HtmlResponse(request.url, body=content, encoding='utf-8')
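Conversely, a minimal sketch (spider name, URLs and callbacks are invented) of disabling JS rendering for a single request when running behind the middleware above:

import scrapy

class NoJsDemoSpider(scrapy.Spider):
    # Hypothetical spider showing the per-request 'nojs' opt-out
    name = 'nojs_demo'
    start_urls = ['http://example.com/']

    def parse(self, response):
        # This request bypasses the Selenium driver and is fetched by Scrapy directly
        yield scrapy.Request('http://example.com/plain-api',
                             callback=self.parse_plain,
                             meta={'nojs': True})

    def parse_plain(self, response):
        self.logger.info('got %d bytes without JS rendering', len(response.body))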
def pageHandler_comment(self,page_source,pageNum,userID,weiboID):
response = HtmlResponse(url="my HTML string",body=page_source,encoding="utf-8")
if pageNum==1:
pass
items = self.__getCommentItems(response,pageNum,userID,weiboID)
if len(items)>0:
self.weiboDao.saveWeiboComment(items)
# Parse the hotel comments contained in a rendered page
def __parseHotelComment(self, page_source, hotel_id, comm_type):
response = HtmlResponse(url="My HTML String", body=page_source, encoding="utf-8")
remarkDom = response.xpath("//div[@class='user_remark_datail']")
remarkDomLen = len(response.xpath("//div[@class='user_remark_datail']/div"))
    # Count comments on this page that already exist in the stored list; if all of them are duplicates, stop paging
same_num = 0
for i in range(1, remarkDomLen+1):
id = uuid.uuid1()
        # Reviewer name
username = remarkDom.xpath("div[%d]/div[@class='a1']/div[@class='b2']/text()"%i).extract()
username = username[0] if len(username) > 0 else ""
        # Review text
        remarkText = remarkDom.xpath("div[%d]/div[@class='a2']/div[@class='b2']/p/text()"%i).extract()
        remark = ""
        for text in remarkText:
            remark = remark + re.sub(r"\s+", "", text)
        # Review time
        comm_time = remarkDom.xpath("div[%d]/div[@class='a2']/div[@class='b4']/div[@style='float: right;']/text()"%i).extract()[0]
        # Reviewer type, sentiment score and viewpoints
user_type = ""
senti_value = None
viewpoint = None
try:
user_type = remarkDom.xpath("div[%d]/div[@class='a1']/div[@class='b3']/text()"%i).extract()[0]
senti_value = self.hotelNLP.sentiment(remark.encode("utf-8"))
viewpoint = json.dumps(self.hotelNLP.viewpoint(remark.encode("utf-8"),decoding="utf-8"))
except:
traceback.print_exc()
comm = {"guid":id, "username":username, "remark":remark, "comm_time":comm_time, "user_type":user_type, "hotel_id":hotel_id, "comm_type":comm_type, "senti_value":senti_value, "viewpoint":viewpoint}
if self.__is_exist_in_comment_list(comm):
same_num += 1
else:
self.commList.append(comm)
if same_num == remarkDomLen:
return False
else:
return True
def _extract_requests(self, response):
r = []
if isinstance(response, HtmlResponse):
links = self.link_extractor.extract_links(response)
r.extend(Request(x.url, callback=self.parse) for x in links)
return r
def parse(self, response):
# Wiener Linien returns HTML with an XML content type which creates an
# XmlResponse.
response = HtmlResponse(url=response.url, body=response.body)
for item in response.css('.block-news-item'):
il = FeedEntryItemLoader(response=response,
timezone=self._timezone,
base_url='http://{}'.format(self.name))
link = response.urljoin(item.css('a::attr(href)').extract_first())
il.add_value('link', link)
il.add_value('title', item.css('h3::text').extract_first())
il.add_value('updated', item.css('.date::text').extract_first())
yield scrapy.Request(link, self.parse_item, meta={'il': il})
def process_request(self, request, spider):
if JAVASCRIPT in request.meta and request.meta[JAVASCRIPT] is True:
driver = self.phantomjs_opened()
try:
driver.get(request.url)
body = driver.page_source
return HtmlResponse(request.url, body=body, encoding='utf-8', request=request)
finally:
self.phantomjs_closed(driver)