def test_adjust_timeout():
    mw = _get_mw()
    req1 = scrapy.Request("http://example.com", meta={
        'splash': {'args': {'timeout': 60, 'html': 1}},
        # download_timeout is always present,
        # it is set by DownloadTimeoutMiddleware
        'download_timeout': 30,
    })
    req1 = mw.process_request(req1, None)
    assert req1.meta['download_timeout'] > 60

    req2 = scrapy.Request("http://example.com", meta={
        'splash': {'args': {'html': 1}},
        'download_timeout': 30,
    })
    req2 = mw.process_request(req2, None)
    assert req2.meta['download_timeout'] == 30
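# A minimal sketch (not the actual scrapy-splash middleware) of the behaviour the
# test above checks: if the Splash 'timeout' argument is larger than the request's
# download_timeout, raise download_timeout so Scrapy does not give up before
# Splash does. The extra margin used here is an illustrative assumption.
def _adjust_download_timeout_sketch(request, margin=10):
    splash_timeout = request.meta.get('splash', {}).get('args', {}).get('timeout')
    if splash_timeout and splash_timeout + margin > request.meta.get('download_timeout', 0):
        request.meta['download_timeout'] = splash_timeout + margin
    return request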
def parse(self, response):
    selector = Selector(response)
    ID = response.meta["ID"]
    text0 = selector.xpath('body/div[@class="u"]/div[@class="tip2"]').extract_first()
    info = InfoItem()
    if text0:
        # the profile header reads "微博[n] 关注[n] 粉丝[n]"
        # (tweet / follow / fan counts)
        num_tweets = re.findall(u'\u5fae\u535a\[(\d+)\]', text0)    # 微博 = tweets
        num_follows = re.findall(u'\u5173\u6ce8\[(\d+)\]', text0)   # 关注 = follows
        num_fans = re.findall(u'\u7c89\u4e1d\[(\d+)\]', text0)      # 粉丝 = fans
        if num_tweets:
            info["num_tweets"] = int(num_tweets[0])
        if num_follows:
            info["num_follows"] = int(num_follows[0])
        if num_fans:
            info["num_fans"] = int(num_fans[0])
    url_information1 = "http://weibo.cn/%s/info" % ID
    yield Request(url=url_information1, meta={"item": info, "ID": ID}, dont_filter=True, callback=self.parse1)
def parse3(self, response):
    """Collect the user IDs linked from this page and pick the next user to crawl."""
    selector = Selector(response)
    text2 = selector.xpath('body//table/tr/td/a/@href').extract()
    next_urls = []
    for elem in text2:
        elem = re.findall(r'uid=(\d+)', elem)
        if elem:
            next_urls.append(int(elem[0]))
    # replace the current tail of next_ID with a randomly chosen candidate
    self.next_ID.pop()
    self.next_ID.append(random.choice(next_urls))
    self.temp = next_urls[0]
    try:
        next_url = "http://weibo.cn/u/%s" % self.next_ID[-1]
        yield Request(url=next_url, dont_filter=True, callback=self.parse)
    except:
        # fall back to the first candidate if anything goes wrong
        self.next_ID.pop()
        self.next_ID.append(self.temp)
        next_url = "http://weibo.cn/u/%s" % self.temp
        yield Request(url=next_url, dont_filter=True, callback=self.parse)
def parse3_fans(self, response):
    """Collect the user IDs on this fans page, then follow its pagination."""
    selector = Selector(response)
    text2 = selector.xpath('body//table/tr/td/a/@href').extract()
    url_main = response.meta["url_main"]
    ID_ = response.meta["ID"]
    for elem in text2:
        elem = re.findall(r'uid=(\d+)', elem)
        if elem:
            ID = int(elem[0])
            if ID not in self.friends_id:  # record each ID only once
                self.friends_id.add(ID)
    # "\u4e0b\u9875" (下页) is the "next page" link of the fans list
    url_next = selector.xpath(
        u'body//div[@class="pa" and @id="pagelist"]/form/div/a[text()="\u4e0b\u9875"]/@href').extract()
    if url_next:
        yield Request(url="http://weibo.cn%s" % url_next[0], meta={"url_main": url_main, "ID": ID_}, callback=self.parse3_fans)
    else:
        self.fans_finish = True
    if self.fans_finish and self.follows_finish:
        yield Request(url=url_main, meta={"ID": ID_}, dont_filter=True, callback=self.parse)
def parse3_follows(self, response):
    """Collect the user IDs on this follows page, then follow its pagination."""
    selector = Selector(response)
    text2 = selector.xpath('body//table/tr/td/a/@href').extract()
    url_main = response.meta["url_main"]
    ID_ = response.meta["ID"]
    for elem in text2:
        elem = re.findall(r'uid=(\d+)', elem)
        if elem:
            ID = int(elem[0])
            if ID not in self.friends_id:  # record each ID only once
                self.friends_id.add(ID)
    # "\u4e0b\u9875" (下页) is the "next page" link of the follows list
    url_next = selector.xpath(
        u'body//div[@class="pa" and @id="pagelist"]/form/div/a[text()="\u4e0b\u9875"]/@href').extract()
    if url_next:
        yield Request(url="http://weibo.cn%s" % url_next[0], meta={"url_main": url_main, "ID": ID_}, callback=self.parse3_follows)
    else:
        self.follows_finish = True
    if self.fans_finish and self.follows_finish:
        yield Request(url=url_main, meta={"ID": ID_}, dont_filter=True, callback=self.parse)
def parse(self, response):
    hxs = scrapy.Selector(response)
    slots_tutorials = hxs.xpath('//td[@class="slot slot-tutorial"]')
    for slot in slots_tutorials:
        # ".//" keeps the queries relative to this slot rather than the whole document
        speakers_tutorials = slot.xpath('.//span[@class="speaker"]/text()').extract()
        urls_tutorials = slot.xpath('.//span[@class="title"]//@href').extract()
        talks_tutorials = slot.xpath('.//span[@class="title"]//a/text()').extract()
        indexSpeaker = 0
        for speaker in speakers_tutorials:
            yield Request(url=''.join(('http://www.pydata.org', urls_tutorials[indexSpeaker])),
                          callback=self.parse_details,
                          meta={'speaker': speaker.strip(), 'url': urls_tutorials[indexSpeaker],
                                'talk': talks_tutorials[indexSpeaker]})
            indexSpeaker = indexSpeaker + 1
def parse(self, response):
    # filename = 'xueshu.html'
    # with open(filename, 'wb') as f:
    #     f.write(response.body)
    for sel in response.xpath('//div[@srcid]'):
        item = XueshuItem()
        for cell in sel.xpath('div[1]'):
            item['title'] = cell.xpath('h3//a//text()').extract()
            item['link'] = cell.xpath('h3/a/@href').extract()
            item['author'] = cell.xpath('div[1]/span[1]//a/text()').extract()
            link = 'http://xueshu.baidu.com' + cell.xpath('h3/a/@href').extract()[0]
            item['publish'] = cell.xpath('div[1]/span[2]/a/@title').extract()
            item['year'] = cell.xpath('div[1]/span[3]/text()').extract()
            item['cite'] = cell.xpath('div[1]/span[4]/a/text()').extract()
            item['abstract'] = self.get_abstract(link)
            # self.log(self.get_abstract(link))
        item['subject'] = sel.xpath('div[2]/div[1]//a/text()').extract()
        yield item
def parse(self, response):
    article_nodes = response.css('#block-content-article .mainer .item a.title')
    for article_node in article_nodes:
        # "http://www.acfun.cn" + str(article_node.css("::attr(href)").extract_first(""))
        article_url = urlparse.urljoin(response.url, str(article_node.css("::attr(href)").extract_first("")))
        yield Request(url=article_url, callback=self.parse_detail, dont_filter=True)

    next_nodes = response.css(".pager")
    next_node = next_nodes[len(next_nodes) - 1]
    next_url = str(next_node.css("::attr(href)").extract_first(""))
    if next_url:
        next_url = urlparse.urljoin(response.url, next_url)
        yield Request(url=next_url, callback=self.parse, dont_filter=True)
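# An aside (a sketch, not part of the original spider): since Scrapy 1.0 the same
# join can be done without the Python 2-only urlparse import, because responses
# expose urljoin() directly.
def _absolute_url_sketch(response, node):
    # resolve a relative href against response.url
    return response.urljoin(node.css("::attr(href)").extract_first(""))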
def check_login(self):
    req = Request(
        url='http://dig.chouti.com/',
        method='GET',
        callback=self.show,
        cookies=self.cookie_dict,
        dont_filter=True
    )
    yield req
def show(self, response):
    # print(response)
    hxs = HtmlXPathSelector(response)
    news_list = hxs.select('//div[@id="content-list"]/div[@class="item"]')
    for new in news_list:
        # temp = new.xpath('div/div[@class="part2"]/@share-linkid').extract()
        link_id = new.xpath('*/div[@class="part2"]/@share-linkid').extract_first()
        yield Request(
            url='http://dig.chouti.com/link/vote?linksId=%s' % (link_id,),
            method='POST',
            cookies=self.cookie_dict,
            callback=self.do_favor
        )

    page_list = hxs.select('//div[@id="dig_lcpage"]//a[re:test(@href, "/all/hot/recent/\d+")]/@href').extract()
    import hashlib
    for page in page_list:
        page_url = 'http://dig.chouti.com%s' % page
        # remember each page URL by its md5 so it is only requested once
        hash = hashlib.md5()
        hash.update(bytes(page_url, encoding='utf-8'))
        key = hash.hexdigest()
        if key in self.has_request_set:
            pass
        else:
            self.has_request_set[key] = page_url
            yield Request(
                url=page_url,
                method='GET',
                callback=self.show
            )
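# An aside (a sketch, not the author's code): when dont_filter is left at its
# default of False, Scrapy's built-in dupefilter already fingerprints requests
# and drops repeats, so the manual md5 bookkeeping above is optional.
def _follow_pages_sketch(self, page_list):
    from scrapy import Request  # import shown here so the sketch is self-contained
    for page in page_list:
        yield Request(url='http://dig.chouti.com%s' % page, callback=self.show)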
def parse_page(self, response):
    item = BroadItem()
    soup = BeautifulSoup(response.text, "lxml")
    title = response.xpath('//title/text()').extract()
    if len(title) > 0:
        item['title'] = ''.join(title[0].replace('|', ',').
                                replace('\"', '').replace('\'', '').
                                replace('(', '[').replace(')', ']').
                                replace('#', '').split())
    else:
        item['title'] = ''
    print item['title']
    print response.url
    item['url'] = response.url
    item['date'] = obtain_d(response)
    print item['date']

    # pick the <div> containing the most <p> tags as the article body
    divs = soup.findAll('div')
    div_dic = {}
    for div in divs:
        ps = div.findAll('p')
        div_dic[len(ps)] = div
    if len(div_dic) == 0:
        item['content'] = "none"
    else:
        div_dic = sorted(div_dic.iteritems(), key=lambda d: d[0], reverse=True)
        ps = div_dic[0][1].findAll('p')
        images = div_dic[0][1].findAll('img')
        item['image_urls'] = ''
        for img in images:
            try:
                if 'http' in img['src']:
                    item['image_urls'] += img['src'] + '\n'
            except Exception as e:
                pass
        text = ""
        for p in ps:
            text += p.text
        item['content'] = text.replace('"', '\'\'')
    return item
def parse_item0(self, response):
    provinceUrlList = re.findall(r'<b><a href="(/w/.*?)" title=".*?">.*?</a></b>', response.body)
    for url in provinceUrlList:
        yield Request(url="http://www.a-hospital.com{}".format(url), callback=self.parse_item)
def parse_item(self, response):
    i = HospitalItem()  # e.g. http://www.a-hospital.com/w/%E5%9B%9B%E5%B7%9D%E7%9C%81%E5%8C%BB%E9%99%A2%E5%88%97%E8%A1%A8
    province = urllib.unquote(response.url[len("http://www.a-hospital.com/w/"):])
    for name, content in re.findall(r'<li><b><a href=".*?" title=".*?">(.*?)</a>.*?</b>[\s\S]*?<ul>([\s\S]*?)</ul>[\s\S]*?</li>', response.body):
        i['hospitalName'] = name.decode('utf-8')
        content = content.decode("utf-8")
        # each field is labelled in Chinese on the page; match the label, capture the value
        hospitalAddress = re.findall(u"<b>医院地址</b>[:|：](.*?)</li>", content)      # address
        hospitalPhoneNumber = re.findall(u"<b>医院电话</b>[:|：](.*?)</li>", content)  # phone
        hospitalLevel = re.findall(u"<b>医院等级</b>[:|：](.*?)</li>", content)        # grade/level
        hospitalType = re.findall(u"<b>医院类型</b>[:|：](.*?)</li>", content)         # type
        hospitalFaxNumber = re.findall(u"<b>传真号码</b>[:|：](.*?)</li>", content)    # fax
        hospitalEmail = re.findall(u"<b>电子邮箱</b>[:|：](.*?)</li>", content)        # e-mail
        hospitalWebsite = re.findall(u'<b>医院网站</b>[:|：]<a href="(.*?)" class="external free" rel="nofollow" target="_blank">.*?</a></li>', content)  # website
        if hospitalAddress:
            i["hospitalAddress"] = hospitalAddress[0]
        if hospitalPhoneNumber:
            i['hospitalPhoneNumber'] = hospitalPhoneNumber[0]
        if hospitalLevel:
            i['hospitalLevel'] = hospitalLevel[0]
        if hospitalType:
            i['hospitalType'] = hospitalType[0]
        if hospitalFaxNumber:
            i['hospitalFaxNumber'] = hospitalFaxNumber[0]
        if hospitalEmail:
            i['hospitalEmail'] = hospitalEmail[0]
        if hospitalWebsite:
            i['hospitalWebsite'] = hospitalWebsite[0]
        i['hospitalProvince'] = province.decode('utf-8')
        yield i
def start_requests(self):
    for i in range(1, 11):
        url = self.base_url + str(i) + '_1' + self.end_Url
        yield Request(url, self.parse)  # the ten category index pages
    yield Request('http://www.23us.com/quanben/1', self.parse)  # the "quanben" (completed novels) index
def parse(self, response):
    max_num = BeautifulSoup(response.text, 'lxml').find(
        'div', class_='pagelink').find_all('a')[-1].get_text()
    baseurl = str(response.url)[:27]
    for num in range(1, int(max_num) + 1):
        if baseurl == 'http://www.23us.com/quanben':
            url = baseurl + '/' + str(num)
        else:
            url = baseurl + '_' + str(num) + self.end_Url
        yield Request(url, callback=self.get_name)
def start_requests(self):
    for page_num in range(1, 10, 1):
        # one listing URL per page number
        url = 'http://www.ximalaya.com/dq/' + str(page_num) + '/'
        yield Request(url=url, headers=self.headers, callback=self.parse)

# collect the sound ids from a page and fetch each track's JSON
def content_parse(self, response):
    logging.info(response.url)
    # all sound ids are stored in the sound_ids attribute of the personal_body div
    sound_ids = response.xpath('//div[@class="personal_body"]/@sound_ids').extract_first().split(',')
    for i in sound_ids:
        sound_json_url = 'http://www.ximalaya.com/tracks/{}.json'.format(i)
        yield Request(url=sound_json_url, headers=self.headers, callback=self.json_parse)
def start_requests(self):
    for page_num in range(1, 33, 1):
        # one forum URL per page number
        url = 'http://www.tuzigh.com/forum/299653{id}171299380/6{tid}' + str(page_num) + '0178299/6897{name}.html'
        yield Request(url=url, headers=self.headers, callback=self.parse)