def parse_answer(self, response):
    # parse the answers returned for a question
    ans_json = json.loads(response.text)
    is_end = ans_json["paging"]["is_end"]
    next_url = ans_json["paging"]["next"]
    # extract the fields of each answer
    for answer in ans_json["data"]:
        answer_item = ZhihuAnswerItem()
        answer_item["zhihu_id"] = answer["id"]
        answer_item["url"] = answer["url"]
        answer_item["question_id"] = answer["question"]["id"]
        answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
        answer_item["content"] = answer["content"] if "content" in answer else None
        answer_item["parise_num"] = answer["voteup_count"]
        answer_item["comments_num"] = answer["comment_count"]
        answer_item["create_time"] = answer["created_time"]
        answer_item["update_time"] = answer["updated_time"]
        answer_item["crawl_time"] = datetime.datetime.now()
        yield answer_item
    if not is_end:
        yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
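# A minimal sketch (an assumption, not the project's actual code) of the ZhihuAnswerItem
# that the snippet above fills; field names, including the "parise_num" spelling, are
# copied from the assignments above, and the real definition may differ.
import scrapy

class ZhihuAnswerItem(scrapy.Item):
    zhihu_id = scrapy.Field()
    url = scrapy.Field()
    question_id = scrapy.Field()
    author_id = scrapy.Field()
    content = scrapy.Field()
    parise_num = scrapy.Field()    # upvote count, spelled as in the snippet
    comments_num = scrapy.Field()
    create_time = scrapy.Field()
    update_time = scrapy.Field()
    crawl_time = scrapy.Field()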
def parse_book(self, response):
    item = BookItem()
    sel = Selector(response)
    e = sel.xpath("//div[@id='wrapper']")
    item['name'] = e.xpath("./descendant::h1/descendant::span/text()").extract()
    item['author'] = e.xpath("//*[@id='info']/span[1]/a/text()").extract()
    item['bookinfo'] = e.xpath("//*[@id='info']/text()").extract()
    item['score'] = e.xpath('//*[@id="interest_sectl"]/div/div[2]/strong/text()').extract()
    item['commentNum'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@property = "v:votes"]/text()').extract()
    item['fivestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][1]/text()').extract()
    item['fourstar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][2]/text()').extract()
    item['threestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][3]/text()').extract()
    item['twostar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][4]/text()').extract()
    item['onestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][5]/text()').extract()
    item['tag'] = response.xpath("//*[@id = 'db-tags-section']/descendant::a/text()").extract()
    request = scrapy.Request(response.url + "/comments/hot", callback=self.parse_review)  # follow the book's hot-comments page
    request.meta['item'] = item
    return request
def parse(self, response):
    item = BookItem()
    sel = Selector(response)
    e = sel.xpath("//div[@id='wrapper']")
    item['name'] = e.xpath("./descendant::h1/descendant::span/text()").extract()
    item['author'] = e.xpath("//*[@id='info']/span[1]/a/text()").extract()
    item['bookinfo'] = e.xpath("//*[@id='info']/text()").extract()
    item['score'] = e.xpath('//*[@id="interest_sectl"]/div/div[2]/strong/text()').extract()
    item['commentNum'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@property = "v:votes"]/text()').extract()
    item['fivestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][1]/text()').extract()
    item['fourstar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][2]/text()').extract()
    item['threestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][3]/text()').extract()
    item['twostar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][4]/text()').extract()
    item['onestar'] = e.xpath('//*[@id="interest_sectl"]/descendant::span[@class = "rating_per"][5]/text()').extract()
    item['tag'] = response.xpath("//*[@id = 'db-tags-section']/descendant::a/text()").extract()
    request = scrapy.Request(response.url + "/comments/hot", callback=self.parse_review)  # follow the book's hot-comments page
    request.meta['item'] = item
    return request
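# Both book parsers above fill a BookItem; a minimal sketch of the item they assume
# is given below. Field names are taken from the snippets; the project's actual
# BookItem may define more fields (for example, for the review data parsed later).
import scrapy

class BookItem(scrapy.Item):
    name = scrapy.Field()
    author = scrapy.Field()
    bookinfo = scrapy.Field()
    score = scrapy.Field()
    commentNum = scrapy.Field()
    fivestar = scrapy.Field()
    fourstar = scrapy.Field()
    threestar = scrapy.Field()
    twostar = scrapy.Field()
    onestar = scrapy.Field()
    tag = scrapy.Field()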
def generate_productlist(self, response):
    product_list = response.xpath("//a[@class='sellPoint']/@href").extract()
    for product_url in product_list:
        yield scrapy.Request(
            'http:' + product_url,
            callback=self.generate_product_detail
        )
    # next page
    # next_page = response.xpath("//a[@class='cur']/following-sibling::*[1]/@href").extract()[0]
    page_key = int(response.meta['page_key'])
    if page_key < 100:
        yield scrapy.Request(
            response.url.replace('-' + str(page_key) + '.html', '-' + str(page_key + 1) + '.html'),
            meta={"page_key": page_key + 1},
            callback=self.generate_productlist
        )
def start_requests(self):
    yield scrapy.Request(
        'http://bbs.zhiyoo.com/',
        meta={"page_key": 1, "proxy": MongoClient.get_random_proxy()},
        callback=self.generate_forum
    )
    for index in self.forum_arr:
        yield scrapy.Request(
            'http://bbs.zhiyoo.com/source/module/forum/tab_ajax.php?index=nav_' + str(index),
            meta={"page_key": 1, "proxy": MongoClient.get_random_proxy()},
            callback=self.generate_forum
        )
    # yield scrapy.Request(
    #     'http://bbs.zhiyoo.com/forum-401-1.html',
    #     callback=self.generate_forum_page_list
    # )
def generate_forum_url_list(self, response):
    all_a_tags = response.xpath('//a/@href').extract()
    forum_dict = {}
    for a_tag in all_a_tags:
        if a_tag.find("forum") != -1:
            if a_tag in forum_dict:
                forum_dict[a_tag] += 1
            else:
                forum_dict[a_tag] = 1
    for a_href in forum_dict:
        yield scrapy.Request(
            a_href,
            meta={"page_key": 1},
            dont_filter=True,
            callback=self.get_record_list
        )
    # also request the preconfigured forum URLs
    for a_href in self.forum_url:
        yield scrapy.Request(
            a_href,
            meta={"page_key": 1},
            dont_filter=True,
            callback=self.get_record_list
        )
def generate_firm_content(self, response):
    qitem = YQichachaItem()
    qitem._id = re.search(u'firm_(.*)(\.html)$', response.url).group(1)
    qitem.name = response.xpath("//div[contains(@class, 'company-top-name')]/text()").extract()[0]
    base_info = list()
    base_info.append({"????": self.clean_content(response.xpath(
        "//span[contains(@class, 'm_comInfo')]").extract()[0])})
    qitem.base_info = base_info
    qitem.save()
    chacha_url_pre = self.url_qichacha_pre + '/company_getinfos?unique=' + qitem._id + '&companyname=' + qitem.name
    yield scrapy.Request(
        chacha_url_pre + '&tab=base',
        callback=self.generate_firm_base,
        cookies=self.qicha_cookie,
        encoding='utf-8',
        meta={"item": qitem, "chacha_url_pre": chacha_url_pre}
    )
def generate_article_url(self, response):
    as_id = ''.join(random.sample(string.ascii_letters + string.digits, 15))
    cp_id = ''.join(random.sample(string.ascii_letters + string.digits, 15))
    yield scrapy.Request(
        "http://www.toutiao.com/api/pc/feed/?category=news_tech&utm_source=toutiao&widen=1&max_behot_time=0" +
        "&max_behot_time_tmp=" + str(int(time.time())) +
        "&tadrequire=true&as=" + as_id + "&cp=" + cp_id + "&t=" + str(time.time()),
        callback=self.generate_article_url
    )
    article_list = json.loads(response.body)
    if article_list.get("message") != "success":
        return
    for article_detail in article_list.get('data'):
        # skip wenda, gallery, ad and similar entries;
        # keep only news_tech and news_finance articles
        tag_url = article_detail.get('tag_url')
        if article_detail.get('article_genre') == 'article'\
                and (tag_url == 'news_tech' or tag_url == 'news_finance'):
            yield scrapy.Request(
                self.toutiao_url_pre + article_detail.get('source_url'),
                callback=self.generate_article_content
            )
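# The hand-assembled query string above is easy to get wrong (the snippet originally
# lacked two '&' separators). Below is a sketch of building the same feed URL with
# urllib.parse.urlencode; build_toutiao_feed_url is a hypothetical helper name, and the
# parameter names are taken from the snippet.
import time
from urllib.parse import urlencode

def build_toutiao_feed_url(as_id, cp_id):
    params = {
        "category": "news_tech",
        "utm_source": "toutiao",
        "widen": 1,
        "max_behot_time": 0,
        "max_behot_time_tmp": int(time.time()),
        "tadrequire": "true",
        "as": as_id,
        "cp": cp_id,
        "t": time.time(),
    }
    # urlencode inserts the '&' and '=' separators and escapes the values
    return "http://www.toutiao.com/api/pc/feed/?" + urlencode(params)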
def generate_articlelist(self, response):
    if response.body.find("list") == -1:
        return
    articlelist = json.loads(response.body)
    page_key = int(response.meta['page_key'])
    # if 1 == 1:
    if page_key == 1 or self.check_rep_time(response.body):
        yield scrapy.Request(
            response.url.replace(re.search(u'index=[\d]+', response.url).group(0), 'index=' + str(page_key + 1)),
            callback=self.generate_articlelist,
            meta={"page_key": str(page_key + 1)}
        )
    # scrape every article in the list
    for artUrl in articlelist['list']:
        yield scrapy.Request(
            artUrl['ArtUrl'],
            callback=self.generate_article_detail
        )
def get_changyan_topic_id(self, response):
    article_item = YPcpopItem()
    article_item._id = response.meta['article_id']
    comment_all = json.loads(response.body)
    if 'cmt_sum' in comment_all:
        article_item.replies = str(comment_all['cmt_sum'])
    if 'participation_sum' in comment_all:
        article_item.views = str(comment_all['participation_sum'])
    MongoClient.save_forum_views(article_item, YPcpopItem)
    MongoClient.save_forum_replies(article_item, YPcpopItem)
    if 'topic_id' in comment_all:
        yield scrapy.Request(
            'http://changyan.sohu.com/api/2/topic/comments?&client_id=cyrYYYfxG&page_size=100&page_no=1&topic_id=' +
            str(comment_all['topic_id']),
            meta={"article_id": article_item._id, "page_no": 1, "topic_id": str(comment_all['topic_id'])},
            callback=self.get_changyan_comment
        )
def generate_forum_url(self, response):
    # page_key = int(response.meta['page_key']) + 1
    # check the last reply time and stop paging once the threads are too old
    # rep_time = response.xpath('//div[@class="Forumhome_listbox"]//dl//dd//p/text()').extract()
    # if self.check_rep_date(rep_time):
    #     url = 'http://club.lenovo.com.cn/forum-all-reply_time-0-' + str(page_key)
    #     yield scrapy.Request(
    #         url,
    #         meta={"page_key": page_key, "proxy": MongoClient.get_random_proxy()},
    #         callback=self.generate_forum_url
    #     )
    for h1a_forum_url in response.xpath('//div[@class="Forumhome_listbox"]//dd//h1//a//@href').extract():
        yield scrapy.Request(
            h1a_forum_url,
            meta={"proxy": MongoClient.get_random_proxy()},
            callback=self.generate_forum_content
        )
# parse forum content and store
def generate_forum_url(self, response):
    url_xpath = response.xpath(
        '//div[@class="threadlist"]//div[@class="threadlist_title"]//a[@onclick="atarget(this)"]/@href').extract()
    rep_time_path = response.xpath(
        '//div[@class="threadlist_info"]//div[@class="lastreply"]//span/@title').extract()
    page_key = int(response.meta['page_key']) + 1
    if len(rep_time_path) > 0:
        if self.check_rep_date(rep_time_path[0]) or page_key == 2:
            # request the next page of the thread list
            forum_key = response.meta['forum_key']
            yield scrapy.Request(
                "http://bbs.lenovomobile.cn/" + forum_key + "/" + str(page_key) + "/",
                meta={"page_key": page_key, "forum_key": forum_key},
                callback=self.generate_forum_url
            )
    logging.error(len(url_xpath))
    # crawl each thread
    for forum_url in url_xpath:
        yield scrapy.Request(
            # e.g. /zui/t778232/
            "http://bbs.lenovomobile.cn" + forum_url + '1/',
            callback=self.generate_forum_content
        )
def generate_article_comment_sum(self, response):
    com_sum_script = response.xpath("//html//script[1]//text()").extract()
    com_sum = 0
    if len(com_sum_script) > 1:
        com_sum_script = re.search(u'[\d]+', com_sum_script[1])
        try:
            com_sum = com_sum_script.group(0)
        except:
            com_sum = ''
    ithome_item = YIthome2Item()
    ithome_item._id = re.search(u'[\d]+', response.url).group(0)
    ithome_item.replies = str(com_sum)
    MongoClient.save_ithome_com_sum(ithome_item)
    hash_key = response.xpath('//input[@id="hash"]/@value').extract()
    if len(hash_key) > 0:
        com_url = \
            "http://dyn.ithome.com/ithome/getajaxdata.aspx?newsID=" + response.meta['article_id']
        com_url += "&type=commentpage&order=false&hash=" + hash_key[0] + "&page="
        yield scrapy.Request(
            com_url + str(1),
            dont_filter=True,
            callback=self.generate_article_comment
        )
def start_requests(self):
    # enter the forums
    yield scrapy.Request(
        'http://jiyouhui.it168.com/forum.php',
        meta={"page_key": 1},
        callback=self.generate_forum_url_list
    )
    yield scrapy.Request(
        'http://benyouhui.it168.com/forum.php',
        meta={"page_key": 1},
        callback=self.generate_forum_url_list
    )
    # yield scrapy.Request(
    #     'http://benyouhui.it168.com/forum-962-1.html',
    #     meta={"page_key": 1},
    #     callback=self.generate_forum_page_list
    # )
def parse(self, response):
    list_types = Selector(response).xpath('//div[@class="listado_1"]//ul/li/a')
    for types in list_types:
        href = types.xpath("./@href").extract()
        text = types.xpath("./text()").extract()
        if Terms.filterBytype(text[0]):
            type = Terms.getType(text[0])
            initiative_url = Utils.createUrl(response.url, href[0])
            yield scrapy.Request(initiative_url, errback=self.errback_httpbin, callback=self.initiatives, meta={'type': type})
    """
    urlsa = ""
    urlsa = "http://www.congreso.es/portal/page/portal/Congreso/Congreso/Iniciativas/Indice%20de%20Iniciativas?_piref73_1335503_73_1335500_1335500.next_page=/wc/servidorCGI&CMD=VERLST&BASE=IW12&PIECE=IWC2&FMT=INITXD1S.fmt&FORM1=INITXLUS.fmt&DOCS=100-100&QUERY=%28I%29.ACIN1.+%26+%28161%29.SINI."
    yield scrapy.Request(urlsa, errback=self.errback_httpbin, callback=self.oneinitiative,
                         meta={'type': u"Proposición no de Ley en Comisión"})
    """
def initiatives(self, response):
    type = response.meta['type']
    first_url = Selector(response).xpath('//div[@class="resultados_encontrados"]/p/a/@href').extract()[0]
    num_inis = Selector(response).xpath('//div[@class="SUBTITULO_CONTENIDO"]/span/text()').extract()
    split = first_url.partition("&DOCS=1-1")
    for i in range(1, int(num_inis[0]) + 1):
        new_url = split[0] + "&DOCS=" + str(i) + "-" + str(i) + split[2]
        initiative_url = Utils.createUrl(response.url, new_url)
        CheckItems.addElement(initiative_url)
        if Blacklist.getElement(initiative_url):
            if not Blacklist.getElement(initiative_url):
                yield scrapy.Request(initiative_url, errback=self.errback_httpbin,
                                     callback=self.oneinitiative, meta={'type': type})
        else:
            yield scrapy.Request(initiative_url, errback=self.errback_httpbin,
                                 callback=self.oneinitiative, meta={'type': type})
def recursiveDS(self, response):
    text = response.meta['texto']
    item = response.meta['item']
    links = response.meta['allDS']
    text += self.searchDS(response, ref=item["ref"], name=item["url"])
    if not links:
        item["contenido"].append(text)
        yield item
    else:
        first_url = links[0]
        Utils.delfirstelement(links)
        yield scrapy.Request(Utils.createUrl(response.url, first_url), callback=self.recursiveDS,
                             dont_filter=True, meta={'item': item, 'allDS': links, "texto": text})
def parse(self, response):
    item = response.css('div.listBox ul li ')
    hrefs = item.css('div.listimg a::attr(href)').extract()
    # titles = item.css('div.listInfo h3 p::text').extract()
    # logging.log(logging.INFO, "parse " + len(hrefs))
    # follow every movie detail link and hand it to parse_movie
    for href in hrefs:
        # logging.log(logging.INFO, "hrefs[" + index + "]=" + href)
        try:
            yield scrapy.Request(response.urljoin(href),
                                 callback=self.parse_movie)
        except Exception as e:
            continue
    # handle the next page
    next_page_str = u'下一页'
    rex = '//div[@class="pagebox"]/a[contains(text(), "%s")]/@href' % next_page_str
    next_page = response.xpath(rex).extract_first()
    # if a "next page" link exists, build the absolute URL and parse it with this same method
    if next_page is not None:
        next_page = response.urljoin(next_page)
        yield scrapy.Request(next_page, callback=self.parse)
def test_clear(self):
    self.assertEqual(len(self.q), 0)
    for i in range(10):
        # XXX: can't use same url for all requests as SpiderPriorityQueue
        # uses redis' set implementation and we will end with only one
        # request in the set and thus failing the test. It should be noted
        # that when using SpiderPriorityQueue it acts as a request
        # duplication filter whenever the serialized requests are the same.
        # This might be unwanted on repetitive requests to the same page
        # even with dont_filter=True flag.
        req = Request('http://example.com/?page=%s' % i)
        self.q.push(req)
    self.assertEqual(len(self.q), 10)
    self.q.clear()
    self.assertEqual(len(self.q), 0)
def test_queue(self):
    req1 = Request('http://example.com/page1', priority=100)
    req2 = Request('http://example.com/page2', priority=50)
    req3 = Request('http://example.com/page2', priority=200)
    self.q.push(req1)
    self.q.push(req2)
    self.q.push(req3)
    out1 = self.q.pop()
    out2 = self.q.pop()
    out3 = self.q.pop()
    self.assertEqual(out1.url, req3.url)
    self.assertEqual(out2.url, req1.url)
    self.assertEqual(out3.url, req2.url)
def test_scheduler_persistent(self):
    # TODO: Improve this test to avoid the need to check for log messages.
    self.spider.log = mock.Mock(spec=self.spider.log)
    self.scheduler.persist = True
    self.scheduler.open(self.spider)
    self.assertEqual(self.spider.log.call_count, 0)
    self.scheduler.enqueue_request(Request('http://example.com/page1'))
    self.scheduler.enqueue_request(Request('http://example.com/page2'))
    self.assertTrue(self.scheduler.has_pending_requests())
    self.scheduler.close('finish')
    self.scheduler.open(self.spider)
    self.spider.log.assert_has_calls([
        mock.call("Resuming crawl (2 requests scheduled)"),
    ])
    self.assertEqual(len(self.scheduler), 2)
    self.scheduler.persist = False
    self.scheduler.close('finish')
    self.assertEqual(len(self.scheduler), 0)
def parse(self, response):
    origin_url = response.url
    if "index" not in origin_url:
        soup = BeautifulSoup(response.body, "lxml")
        catalogue = soup.find("a", class_="blue CurrChnlCls").get("title").strip()
        news_list = soup.find("div", class_="lie_main_m").find_all("li")
        for news in news_list:
            title = news.find("a").text.strip()
            news_url = "http://www.cnta.gov.cn/xxfb" + news.find("a").get("href")[2:]
            news_no = news_url.rsplit("/", 1)[-1].split(".")[0]
            item = NewsItem(
                news_url=news_url,
                title=title,
                news_no=news_no,
                catalogue=catalogue,
            )
            yield scrapy.Request(item["news_url"], callback=self.parse_news, meta={'item': item})
    else:
        topic_url = origin_url.rsplit(".", 1)[0]
        self.flag.setdefault(topic_url, 0)
        yield scrapy.Request(origin_url, callback=self.parse_topic)
def parse(self, response):
    origin_url = response.url
    # e.g. http://money.163.com/special/002526O5/transport_02.html
    search_result = re.search(r"_(\d)*?\.", origin_url)
    # current page index
    pageindex = search_result.group(1) if search_result else 1
    soup = BeautifulSoup(response.body, "lxml")
    news_list = soup("div", class_="list_item clearfix")
    for news in news_list:
        news_date = news.find("span", class_="time").text if news.find("span", class_="time") else None
        title = news.find("h2").text if news.find("h2") else None
        news_url = news.find("h2").a.get("href", None) if news.find("h2") else None
        abstract = news.find("p").contents[0] if news.find("p") else None
        item = NewsItem(title=title, news_url=news_url, abstract=abstract, news_date=news_date)
        item = judge_news_crawl(item)  # drop items that fall outside the crawl window
        if item:
            request = scrapy.Request(news_url, callback=self.parse_news, meta={"item": item})
            yield request
        else:
            self.flag = int(pageindex)
    if not self.flag:
        next_url = self.next_url % (int(pageindex) + 1)
        yield scrapy.Request(next_url)
def next_page_parse(self, response):
    html = response.body
    url = response.url
    np_soup = BeautifulSoup(html, "lxml")
    # e.g. <div id="last2" lastTime="1467972702826" pageIndex="2" style="display:none;"></div>
    res = np_soup.find(name="div", attrs={"lasttime": True})
    lasttime = res.get("lasttime", None) if res else None
    pageindex = res.get("pageindex", None) if res else None
    for i in self.fetch_newslist(np_soup):
        request = scrapy.Request(i['news_url'], callback=self.parse_news)
        request.meta['item'] = i
        request.meta["pageindex"] = pageindex
        yield request
    # next page
    if not self.flag and lasttime:
        pageindex = str(int(pageindex) + 1)
        new_url = re.sub(r'pageidx=.*?&lastTime=.*', "pageidx=%s&lastTime=%s" % (pageindex, lasttime), url, 1)
        yield scrapy.Request(new_url, callback=self.next_page_parse)
    # else:
    #     log.msg("can't find lasttime or pageindex", level=log.INFO)
def parse(self, response):
    origin_url = response.url
    soup = BeautifulSoup(response.body, "lxml")
    temp_soup = soup.find('div', id="ess_ctr10789_ModuleContent") if soup.find('div', id="ess_ctr10789_ModuleContent") else None
    if temp_soup:
        news_list = temp_soup.find_all("a", href=re.compile("http://www.toptour.cn/tab"))
        for news in news_list:
            news_url = news.get("href")
            title = news.text.strip()
            item = NewsItem(
                news_url=news_url,
                title=title,
                catalogue=u"???"
            )
            yield scrapy.Request(item["news_url"], callback=self.parse_news, meta={'item': item})
    else:
        logger.warning("%s can't find news_list " % origin_url)
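# The news spiders above all construct a NewsItem with keyword arguments; a minimal
# sketch covering the fields seen in these snippets follows. This is an assumption
# about the project's item class, which may define additional fields (content,
# crawl time, and so on).
import scrapy

class NewsItem(scrapy.Item):
    news_url = scrapy.Field()
    title = scrapy.Field()
    news_no = scrapy.Field()
    catalogue = scrapy.Field()
    abstract = scrapy.Field()
    news_date = scrapy.Field()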
def parse(self, response):
    """parse crawl page
    :response: TODO
    :returns: None
    """
    # debug
    # from scrapy.shell import inspect_response
    # inspect_response(response, self)
    for i in range(1, self.page + 1):
        yield scrapy.Request(
            response.request.url + '%s' % (i),
            self.parse_ip,
            dont_filter=True,
        )
def logged_in(self, response):
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    sql = 'select * from section'
    cursor.execute(sql)
    for row in cursor.fetchall():
        item = ByrbbsArticleItem()
        item['section_url'] = row[1]
        yield scrapy.Request(response.urljoin(row[1]), meta={'cookiejar': response.meta['cookiejar'], 'item': item}, headers=HEADERS,
                             callback=self.parse_article_list)
    # for debugging, crawl a single board instead:
    # self.start_urls = ['https://bbs.byr.cn/board/BM_Market']
    # item = ByrbbsArticleItem()
    # item['section_url'] = 'board/BM_Market'
    # return scrapy.Request(self.start_urls[0], meta={'cookiejar': response.meta['cookiejar'], 'item': item},
    #                       headers=HEADERS, callback=self.parse_article_list)
    # when debugging, comment out the database loop and return the single request above
def logged_in(self, response):
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    sql = 'select * from section'
    cursor.execute(sql)
    for row in cursor.fetchall():
        item = ByrbbsArticleItem()
        item['section_url'] = row[1]
        yield scrapy.Request(response.urljoin(row[1]), meta={'cookiejar': response.meta['cookiejar'], 'item': item}, headers=HEADERS,
                             callback=self.parse_article_list_pre)
    # for debugging, crawl a single board instead:
    # self.start_urls = ['https://bbs.byr.cn/board/BUPTPost']
    # item = ByrbbsArticleItem()
    # item['section_url'] = 'BUPTPost'
    # return scrapy.Request(self.start_urls[0], meta={'cookiejar': response.meta['cookiejar'], 'item': item},
    #                       headers=HEADERS, callback=self.parse_article_list)
    # the commented-out request above is only for testing a single board
def parse(self, response):
    soup = BeautifulSoup(response.body, 'html.parser')
    # listing blocks
    infos = soup.findAll(attrs={'class': 'item-mod'})
    # pagination block
    pagesUrl = soup.find(attrs={'class': 'list-page'})
    print("reading total listing count")
    # total number of listings
    number = int(pagesUrl.find(attrs={'class': 'total'}).em.string)
    # 50 listings per page; compute the page count
    pages = number // 50
    if (number % 50 > 0):
        pages = pages + 1
    print("total pages: " + str(pages))
    purl = pagesUrl.find(attrs={'class': 'pagination'}).a['href']
    purl = purl[0:-3]
    for i in range(1, pages + 1):
        temp = purl + "p" + str(i) + "/"
        print("built list page url: " + temp)
        print("requesting: " + temp)
        yield scrapy.Request(temp, callback=self.parse_item)
    print("all list pages queued")
def parse_item(self, response):
    soup = BeautifulSoup(response.body, 'html.parser')
    # listing blocks
    infos = soup.findAll(attrs={'class': 'item-mod'})
    for q in infos:
        if 'data-link' in str(q):
            item = AjkItem()
            item['title'] = q.h3.a.string
            print(q.h3.a.string)
            item['detailUrl'] = q.h3.a.get('href')
            print(q.h3.a.get('href'))
            print(q.find(attrs={'class': 'address'}).a.string)
            if q.find(attrs={'class': 'price'}) != None:
                item['price'] = q.find(attrs={'class': 'price'}).span.string
                print(q.find(attrs={'class': 'price'}).span.string)
            else:
                item['price'] = q.find(attrs={'class': 'favor-tag around-price'}).span.string + 'around'
                print(q.find(attrs={'class': 'favor-tag around-price'}).span.string + 'around')
            # item['telephone'] = q.find(attrs={'class': 'tel'}).contents[1]
            # print(q.find(attrs={'class': 'tel'}).string)
            yield scrapy.Request(url=q.h3.a.get('href'), callback=self.parse_item2)
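# parse_item above fills an AjkItem; a minimal sketch of the item it assumes is given
# below. Field names are taken from the snippet; the telephone field appears only in
# commented-out code, so including it here is a guess at the project's real definition.
import scrapy

class AjkItem(scrapy.Item):
    title = scrapy.Field()
    detailUrl = scrapy.Field()
    price = scrapy.Field()
    # telephone = scrapy.Field()    # hypothetical: only seen commented out in the snippet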