def parse(self, response):
    """
    1. Extract the article urls from the post list page and hand them to scrapy to download and parse.
    2. Extract the url of the next page and hand it to scrapy; the downloaded page is parsed by parse again.
    """
    # record urls that failed to download and bump a counter in the crawl stats
    if response.status == 404:
        self.fail_urls.append(response.url)
        self.crawler.stats.inc_value("failed_url")
    # each extracted node is the <a> element wrapping a post thumbnail
    post_nodes = response.css("#archive .floated-thumb .post-thumb a")
    for post_node in post_nodes:
        # url of the cover image
        image_url = post_node.css("img::attr(src)").extract_first("")
        post_url = post_node.css("::attr(href)").extract_first("")
        # once the request is downloaded, parse_detail is called to parse the article page
        # Request(url=post_url,callback=self.parse_detail)
        yield Request(url=parse.urljoin(response.url, post_url), meta={"front_image_url": image_url}, callback=self.parse_detail)
        # the href may be relative, so it is joined with the current page url:
        # response.url + post_url
        print(post_url)
    # extract the next page url and hand it back to scrapy
    next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
    if next_url:
        yield Request(url=parse.urljoin(response.url, next_url), callback=self.parse)
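parse_detail is not shown in this snippet; as a minimal sketch of the other end of the meta hand-off (only the "front_image_url" key comes from the code above, the rest is an assumption), the callback can read the value back from response.meta:

def parse_detail(self, response):
    # read back the value that parse() attached to the Request's meta dict
    front_image_url = response.meta.get("front_image_url", "")
    # ... extract the article fields here ...
    yield {"url": response.url, "front_image_url": front_image_url}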
def process_spider_output(self, response, result, spider):
"""record this page
"""
mongo_uri=spider.crawler.settings.get('MONGO_URI')
mongo_db=spider.crawler.settings.get('MONGO_DB')
client = pymongo.MongoClient(mongo_uri)
db = client[mongo_db]
def add_field(request, response):
if isinstance(request, Request):
db[self.collection_name].update_one(
{},
{'$set': {'page_url': response.request.url}},
upsert=True)
return True
ret = [req for req in result if add_field(req, response)]
client.close()
return ret
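One detail worth noting about the middleware above: it opens a new MongoClient for every response it processes. A sketch of the same idea with the client created once per spider via from_crawler and the spider_opened/spider_closed signals (the class name and collection name are assumptions; only MONGO_URI and MONGO_DB come from the snippet):

import pymongo
from scrapy import signals

class RecordPageMiddleware(object):
    collection_name = 'pages'

    @classmethod
    def from_crawler(cls, crawler):
        mw = cls()
        mw.mongo_uri = crawler.settings.get('MONGO_URI')
        mw.mongo_db = crawler.settings.get('MONGO_DB')
        crawler.signals.connect(mw.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(mw.spider_closed, signal=signals.spider_closed)
        return mw

    def spider_opened(self, spider):
        # one client for the whole crawl instead of one per response
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def spider_closed(self, spider):
        self.client.close()

    def process_spider_output(self, response, result, spider):
        # record the page url, then pass every request/item through unchanged
        self.db[self.collection_name].update_one(
            {}, {'$set': {'page_url': response.request.url}}, upsert=True)
        return result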
def start_requests(self):
url = 'https://www.assetstore.unity3d.com/login'
yield Request(
url = url,
headers = {
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Connection': 'keep-alive',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'Host': 'www.assetstore.unity3d.com',
'Referer': 'https://www.assetstore.unity3d.com/en/',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:50.0) Gecko/20100101 '
'Firefox/50.0',
'X-Kharma-Version': '0',
'X-Requested-With': 'UnityAssetStore',
'X-Unity-Session': '26c4202eb475d02864b40827dfff11a14657aa41',
},
meta = {
},
dont_filter = True,
callback = self.get_unity_version,
errback = self.error_parse,
)
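error_parse is referenced as the errback but not included in this snippet; a minimal sketch of what such an errback could look like (the logging behaviour is an assumption):

def error_parse(self, failure):
    # errbacks receive a twisted Failure; the request that failed is attached to it
    self.logger.error('request failed: %s (%s)', failure.request.url, repr(failure.value))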
def login(self,response):
cookie_jar = CookieJar()
cookie_jar.extract_cookies(response,response.request)
for k,v in cookie_jar._cookies.items():
for i,j in v.items():
for m,n in j.items():
self.cookie_dict[m] = n.value
req = Request(
url='http://dig.chouti.com/login',
method='POST',
headers={'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'},
body='phone=13331167937&password=zds819918&oneMonth=1',
cookies=self.cookie_dict,
callback=self.check_login
)
yield req
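The hand-rolled POST body and manual cookie copying above can also be expressed with FormRequest, which url-encodes the form fields and lets Scrapy's built-in cookie middleware carry the session cookies. A sketch only (credentials elided; whether the site accepts this shorter form is untested):

from scrapy import FormRequest

def login(self, response):
    yield FormRequest(
        url='http://dig.chouti.com/login',
        formdata={'phone': '...', 'password': '...', 'oneMonth': '1'},
        callback=self.check_login,
    )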
bova11_chrome_spider.py (project: spread-knowledge-repository, author: danieldev13)
def parse(self, response):
"""
Parses the first request and request the click event on the confirmation button
"""
self.driver.get(settings.request_url)
while True:
try:
next_req = self.driver.find_element_by_class_name('submit')
yield Request(settings.confirmation_url, callback=self.parse_callback)
next_req.click()
break
except Exception as err:
logging.error(err)
break
# Waiting to close browser... This gives enough time to download the file.
time.sleep(settings.sleep_time)
downloaded_file = get_download_folder() + '\\' + settings.downloaded_file_name
moved_file = settings.destination_path + settings.new_file_name
move_file(downloaded_file, moved_file)
delete_file(downloaded_file)
bova11_firefox_spider.py (project: spread-knowledge-repository, author: danieldev13)
def parse(self, response):
"""
Parses the first request and request the click event on the confirmation button
"""
self.driver.get(settings.request_url)
while True:
try:
next_req = self.driver.find_element_by_class_name('submit')
yield Request(settings.confirmation_url, callback=self.parse_callback)
next_req.click()
break
except Exception as err:
logging.error(err)
break
self.driver.close()
# Waiting to close browser... This gives enough time to download the file.
time.sleep(settings.sleep_time)
downloaded_file = get_download_folder() + '\\' + settings.downloaded_file_name
moved_file = settings.destination_path + settings.new_file_name
move_file(downloaded_file, moved_file)
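get_download_folder, move_file and delete_file are helpers imported by both selenium spiders but not shown on this page; one possible implementation, given only as a sketch (the chrome variant calls move_file and then delete_file on the source, so move_file is written as a copy rather than a rename; the Downloads path is an assumption):

import os
import shutil

def get_download_folder():
    # assume the browser saves into the user's default Downloads directory
    return os.path.join(os.path.expanduser('~'), 'Downloads')

def move_file(src, dst):
    # copy instead of rename so the source can be deleted separately and cross-drive moves work
    shutil.copy2(src, dst)

def delete_file(path):
    if os.path.exists(path):
        os.remove(path)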
def start_requests(self):
for cityid, cityname in cityids.items():
url = 'http://wthrcdn.etouch.cn/weather_mini?citykey=%s' % cityid
yield Request(
url = url,
method = 'GET',
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
'Host': 'wthrcdn.etouch.cn',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:50.0) Gecko/20100101 '
'Firefox/50.0',
},
meta = {
'cityid': cityid,
'cityname': cityname,
},
callback = self.get_sk_2d_weather,
)
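get_sk_2d_weather is not included here; a minimal sketch of the callback, assuming the wthrcdn.etouch.cn mini API returns a JSON object with a top-level "data" key (the yielded field names are likewise assumptions):

import json

def get_sk_2d_weather(self, response):
    # gzip is handled by Scrapy's HttpCompressionMiddleware, so the body is plain JSON text
    data = json.loads(response.text).get('data', {})
    yield {
        'cityid': response.meta['cityid'],
        'cityname': response.meta['cityname'],
        'weather': data,
    }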
def parse(self, response):
    '''
    1. Extract all article urls from the list page and hand them to scrapy for download and parsing.
    2. Extract the url of the next page and hand it to scrapy; the downloaded page is parsed by parse again.
    :param response:
    :return:
    '''
    # parse every article url on the list page and hand it to scrapy to download and parse
    post_nodes = response.css("#archive .floated-thumb .post-thumb a")
    for post_node in post_nodes:
        # image_url is the cover image of the post
        image_url = post_node.css("img::attr(src)").extract_first("")
        post_url = post_node.css("::attr(href)").extract_first("")
        # pass the cover image url along through meta; parse.urljoin leaves post_url unchanged when it
        # already carries a domain, and joins it with response.url when it is a relative path
        yield Request(url=parse.urljoin(response.url,post_url),meta={"front_image_url":parse.urljoin(response.url,image_url)},callback=self.parse_detail)
    # extract the next page and hand it to scrapy
    next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
    if next_url:
        yield Request(url=next_url,callback=self.parse)
def relations(self,response):
self.obj.get(response.url)
followees_a = self.obj.find_elements_by_xpath('//a[@class="UserLink-link"]')
#pdb.set_trace()
#followees_a = response.xpath('//a[@class="UserLink-link"]/@href').extract()
followees = []
for one in followees_a:
try:
one = one.get_attribute('href')
followees.append(one.replace('https://www.zhihu.com/people/',''))
        except Exception:
            # ignore elements that have gone stale or have no href attribute
            pass
followees = list(set(followees))
#pdb.set_trace()
response.meta['item']['relations_id']+=followees
nextpage_button = response.xpath('//button[@class="Button PaginationButton PaginationButton-next Button--plain"]').extract()
if nextpage_button:
#pdb.set_trace()
nextpage_url = response.url.replace('?page='+str(response.meta['page']),'') + "?page=" + str(response.meta['page']+1)
yield Request(nextpage_url,callback=self.relations,meta={'page':response.meta['page']+1,'item':response.meta['item']})
else:
yield response.meta['item']
for user in followees:
yield Request('https://www.zhihu.com/people/'+user+'/answers',callback=self.parse)
def parse_relation(self,response):
json_result = str(response.body,encoding="utf8").replace('false','0').replace('true','1')
dict_result = eval(json_result)
relations_id = []
for one in dict_result['data']:
relations_id.append(one['url_token'])
response.meta['item']['relations_id'] = relations_id
if response.meta['offset'] == 0:
response.meta['item']['relation_type'] = response.meta['relation_type']
else:
response.meta['item']['relation_type'] = 'next:' + response.meta['relation_type']
#pdb.set_trace()
yield response.meta['item']
for one in response.meta['item']['relations_id']:
yield Request('https://www.zhihu.com/api/v4/members/'+one+'?include=locations,employments,industry_category,gender,educations,business,follower_count,following_count,description,badge[?(type=best_answerer)].topics',meta={'user_id':one},callback=self.parse)
#pdb.set_trace()
if dict_result['paging']['is_end'] == 0:
#pdb.set_trace()
offset = response.meta['offset'] + 20
next_page = re.findall('(.*offset=)\d+',response.url)[0]
#pdb.set_trace()
yield Request(next_page + str(offset),callback=self.parse_relation,meta={'item':response.meta['item'],'offset':offset,'relation_type':response.meta['relation_type']})
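The replace('false','0') / replace('true','1') plus eval trick used here (and again in parse_answer and parse_article below) executes the response body as Python, which is fragile and unsafe; a sketch of the same parsing through the standard json module, where booleans stay real booleans:

import json

def parse_relation(self, response):
    # parse the API response as JSON instead of eval-ing the raw body
    dict_result = json.loads(response.text)
    relations_id = [one['url_token'] for one in dict_result['data']]
    # 'is_end' is now True/False rather than 1/0, so the paging check becomes:
    if not dict_result['paging']['is_end']:
        pass  # request the next offset exactly as in the original code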
def parse_answer(self,response):
json_result = str(response.body,encoding="utf8").replace('false','0').replace('true','1')
dict_result = eval(json_result)
for one in dict_result['data']:
item = AnswerItem()
item['answer_user_id'] = response.meta['answer_user_id']
item['answer_id'] = one['id']
item['question_id'] = one['question']['id']
#pdb.set_trace()
item['cretated_time'] = one['created_time']
item['updated_time'] = one['updated_time']
item['voteup_count'] = one['voteup_count']
item['comment_count'] = one['comment_count']
item['content'] = one['content']
yield item
if dict_result['paging']['is_end'] == 0:
offset = response.meta['offset'] + 20
next_page = re.findall('(.*offset=)\d+',response.url)[0]
yield Request(next_page + str(offset),callback=self.parse_answer,meta={'answer_user_id':response.meta['answer_user_id'],'offset':offset})
def parse_question(self,response):
list_item = response.xpath('//div[@class="List-item"]')
for one in list_item:
item = QuestionItem()
item['ask_user_id'] = response.meta['ask_user_id']
title = one.xpath('.//div[@class="QuestionItem-title"]')
item['title'] = title.xpath('./a/text()').extract()[0]
item['question_id'] = title.xpath('./a/@href').extract()[0].replace('/question/','')
content_item = one.xpath('.//div[@class="ContentItem-status"]//span/text()').extract()
item['ask_time'] = content_item[0]
item['answer_count'] = content_item[1]
item['followees_count'] = content_item[2]
yield item
next_page = response.xpath('//button[@class="Button PaginationButton PaginationButton-next Button--plain"]/text()').extract()
if next_page:
response.meta['page'] += 1
next_url = re.findall('(.*page=)\d+',response.url)[0] + str(response.meta['page'])
yield Request(next_url,callback=self.parse_question,meta={'ask_user_id':response.meta['ask_user_id'],'page':response.meta['page']})
def parse_article(self,response):
json_result = str(response.body,encoding="utf8").replace('false','0').replace('true','1')
dict_result = eval(json_result)
for one in dict_result['data']:
item = ArticleItem()
item['author_id'] = response.meta['author_id']
item['title'] = one['title']
item['article_id'] = one['id']
item['content'] = one['content']
#pdb.set_trace()
item['cretated_time'] = one['created']
item['updated_time'] = one['updated']
item['voteup_count'] = one['voteup_count']
item['comment_count'] = one['comment_count']
yield item
if dict_result['paging']['is_end'] == 0:
offset = response.meta['offset'] + 20
next_page = re.findall('(.*offset=)\d+',response.url)[0]
yield Request(next_page + str(offset),callback=self.parse_article,meta={'author_id':response.meta['author_id'],'offset':offset})
def _crawl(self, start_file_path, fake_url, items=None, connector=None):
"""
:param start_file_path: file path of start file
:param fake_url: The fake url for Request
:param connector: Connector instance
:param items: List of jobs item to use as "job database". Default is empty list
:return: list of job items
"""
if items is None:
items = []
if connector is None:
connector = SpiderTestConnector(items)
request = Request(url=fake_url)
start_response = fake_response_from_file(
start_file_path,
request=request,
response_class=HtmlResponse
)
self._spider = self._get_prepared_spider()()
self._spider.set_connector(connector)
return list(self._parse_spider_response(self._spider.parse(start_response)))
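fake_response_from_file and SpiderTestConnector come from the surrounding test harness and are not shown here; a common shape for the response helper, purely as a sketch inferred from the call above:

def fake_response_from_file(file_path, request, response_class):
    """Build a Scrapy response from a local HTML fixture so spiders can be tested offline."""
    with open(file_path, 'rb') as f:
        body = f.read()
    return response_class(url=request.url, request=request, body=body, encoding='utf-8')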
def login_verify(self, response):
if response.url == self.login_verify_url:
self.is_login = True
self.login_time = time.mktime(time.strptime(\
response.headers['Date'], \
'%a, %d %b %Y %H:%M:%S %Z')) + (8 * 60 * 60)
time.sleep(1)
return [FormRequest(self.submit_url,
formdata = {
'problem_id': self.problem_id,
'language': LANGUAGE.get(self.language, '0'),
'source': self.source,
'submit': 'Submit',
'encoded': '1'
},
callback = self.after_submit,
dont_filter = True
)]
else:
return Request(self.start_urls[0], callback=self.parse_start_url)
def parse(self, response):
sel = Selector(response)
self.item = AccountItem()
self.item['oj'] = 'poj'
self.item['username'] = self.username
if self.is_login:
try:
self.item['rank'] = sel.xpath('//center/table/tr')[1].\
xpath('.//td/font/text()').extract()[0]
self.item['accept'] = sel.xpath('//center/table/tr')[2].\
xpath('.//td/a/text()').extract()[0]
self.item['submit'] = sel.xpath('//center/table/tr')[3].\
xpath('.//td/a/text()').extract()[0]
yield Request(self.accepted_url % self.username,
callback = self.accepted
)
self.item['status'] = 'Authentication Success'
        except Exception:
self.item['status'] = 'Unknown Error'
else:
self.item['status'] = 'Authentication Failed'
yield self.item
def accepted(self, response):
sel = Selector(response)
next_url = sel.xpath('//p/a/@href')[2].extract()
table_tr = sel.xpath('//table')[-1].xpath('.//tr')[1:]
for tr in table_tr:
name = tr.xpath('.//td/a/text()').extract()[0]
problem_id = tr.xpath('.//td[3]/a/text()').extract()[0].strip()
submit_time = tr.xpath('.//td/text()').extract()[-1]
self.solved[problem_id] = submit_time
self.item['solved'] = self.solved
if table_tr:
yield Request('http://' + self.allowed_domains[0] + '/' + next_url,
callback = self.accepted
)
yield self.item
def parse_search_page(self, response):
# handle current page
for item in self.parse_tweets_block(response.body):
yield item
# get next page
tmp = self.reScrollCursor.search(response.body)
if tmp:
query = urlparse.parse_qs(urlparse.urlparse(response.request.url).query)['q'][0]
scroll_cursor = tmp.group(1)
url = 'https://twitter.com/i/search/timeline?q=%s&' \
'include_available_features=1&include_entities=1&max_position=%s' % \
(urllib.quote_plus(query), scroll_cursor)
yield http.Request(url, callback=self.parse_more_page)
# TODO: # get refresh page
# tmp = self.reRefreshCursor.search(response.body)
# if tmp:
# query = urlparse.parse_qs(urlparse.urlparse(response.request.url).query)['q'][0]
# refresh_cursor=tmp.group(1)
def parse_user_0(self, response):
""" ??????-???????????????? """
user_item = UserItem()
selector = Selector(response)
text0 = selector.xpath('body/div[@class="u"]/div[@class="tip2"]').extract_first()
if text0:
        num_tweets = re.findall(u'\u5fae\u535a\[(\d+)\]', text0)  # number of weibo posts
        num_follows = re.findall(u'\u5173\u6ce8\[(\d+)\]', text0)  # number of follows
        num_fans = re.findall(u'\u7c89\u4e1d\[(\d+)\]', text0)  # number of fans
if num_tweets:
user_item["ctweets"] = int(num_tweets[0])
if num_follows:
user_item["cfollows"] = int(num_follows[0])
if num_fans:
user_item["cfans"] = int(num_fans[0])
user_item["_id"] = response.meta["user_id"]
url_information1 = "http://weibo.cn/%s/info" % response.meta["user_id"]
yield Request(url=url_information1, meta={"item": user_item}, callback=self.parse_user_1)
def parse(self, response):
selector = Selector(response)
articles = selector.xpath('//ul[@class="article-list thumbnails"]/li')
for article in articles:
item = Jianshu2Item()
url = article.xpath('div/h4/a/@href').extract()
likeNum = article.xpath('div/div/span[2]/text()').extract()
posturl = 'http://www.jianshu.com'+url[0]
if len(likeNum) == 0:
item['likeNum'] = 0
else:
item['likeNum'] = int(likeNum[0].split(' ')[-1])
request = Request(posturl,callback=self.parse_donate)
request.meta['item'] = item
yield request
next_link = selector.xpath('//*[@id="list-container"]/div[@class="load-more"]/button/@data-url').extract()[0]
if next_link:
next_link = self.url + str(next_link)
yield Request(next_link,callback=self.parse)
def get_chapterurl(self, response):
resp = BeautifulSoup(response.text, 'lxml')
item = DingdianItem()
tds = resp.find('table').find_all('td')
category = resp.find('table').find('a').get_text()
author = tds[1].get_text()
base_url = resp.find(
'p', class_='btnlinks').find(
'a', class_='read')['href']
novel_id = str(base_url)[-6:-1].replace('/', '')
serialstatus = tds[2].get_text()
serialnumber = tds[4].get_text()
item['name'] = str(response.meta['name']).replace('\xa0', '')
item['novelurl'] = response.meta['url']
item['category'] = str(category).replace('/', '')
item['author'] = str(author).replace('\xa0', '')
item['novel_id'] = novel_id
item['serialstatus'] = str(serialstatus).replace('\xa0', '')
item['serialnumber'] = str(serialnumber).replace('\xa0', '')
yield item
yield Request(url=base_url, callback=self.get_chapter, meta={'novel_id': novel_id})
def parse_article(self,response):
hxs = Selector(response)
keyword = response.meta['keyword']
movie_name = hxs.xpath('//*[@id="content"]/h1/span[1]/text()').extract()
movie_roles_paths = hxs.xpath('//*[@id="info"]/span[3]/span[2]')
movie_roles = []
for movie_roles_path in movie_roles_paths:
movie_roles = movie_roles_path.select('.//*[@rel="v:starring"]/text()').extract()
movie_classification= hxs.xpath('//span[@property="v:genre"]/text()').extract()
douban_item = DoubanItem()
douban_item['movie_keyword'] = keyword
douban_item['movie_name'] = ''.join(movie_name).strip().replace(',',';').replace('\'','\\\'').replace('\"','\\\"').replace(':',';').replace(' ','')
douban_item['movie_roles'] = ';'.join(movie_roles).strip().replace(',',';').replace('\'','\\\'').replace('\"','\\\"').replace(':',';')
douban_item['movie_classification'] = ';'.join(movie_classification).strip().replace(',',';').replace('\'','\\\'').replace('\"','\\\"').replace(':',';')
article_link = hxs.xpath('//*[@id="review_section"]/div/div/div/h3/a/@href').extract()
tmp = "https://movie.douban.com/review/"
for item in article_link:
if tmp in item:
yield Request(item,meta={'item': douban_item},callback=self.parse_item,cookies=[{'name': 'COOKIE_NAME','value': 'VALUE','domain': '.douban.com','path': '/'},])
def parse(self, response):
    # the response content is parsed into elements that can be selected with xpath
items = response.xpath('//form[@name="moderate"]/*/div[@class="spaceborder"]/table/tr')
for item in items:
url_str = 'http://www.mayattt.com/'+item.xpath('./td[@class="f_title"]/a/@href').extract()[0]
title_str = ''
date_str = ''
try:
title_str = item.xpath('./td[@class="f_title"]/a/text()').extract()[0]
date_str = item.xpath('./td[@class="f_last"]/span/a/text()').extract()[0]
        except Exception:
self.logger.error('get list page failure!')
pass
yield Request(url_str, headers=self.headers, callback=self.parseImage, meta={'title': title_str,
'date': date_str})
    # parseImage parses each detail page, extracts the image urls and fills the item
def get_all_category(self, response):
self.write_file('%s/category.html' % self.log_dir, response.body)
tags = response.xpath('//table/tbody/tr/td/a/@href').extract()
for tag in tags:
res = tag.split('/')
res = res[len(res) - 1]
utils.log('tag:%s' % tag)
url = response.urljoin(tag)
yield Request(
url = url,
headers = self.headers,
dont_filter = True,
meta = {
'tag': res,
'download_timeout': 20,
# 'is_proxy': False,
},
callback = self.get_page_count,
errback = self.error_parse
)
def get_page_count(self, response):
pages = response.xpath('//div[@class="paginator"]/a/text()').extract()
page_count = int(pages[len(pages) - 1])
tag = response.meta.get('tag')
for i in range(page_count):
url = 'https://movie.douban.com/tag/%s?start=%s&type=T' % (tag, i * 20)
yield Request(
url = url,
headers = self.headers,
dont_filter = True,
meta = {
'tag': tag,
'page': i + 1,
'download_timeout': 20,
},
callback = self.get_page,
errback = self.error_parse
)
def get_page_count(self, response):
pages = response.xpath('//div[@class="paginator"]/a/text()').extract()
page_count = int(pages[len(pages) - 1])
tag = response.meta.get('tag')
for i in range(page_count):
url = 'https://book.douban.com/tag/%s?start=%s&type=T' % (tag, i * 20)
yield Request(
url = url,
headers = self.headers,
dont_filter = True,
meta = {
'tag': tag,
'page': i + 1,
'download_timeout': 20,
},
callback = self.get_page,
errback = self.error_parse
)
pictureSpider_demo.py (project: PythonCrawler-Scrapy-Mysql-File-Template, author: lawlite19)
def parse(self, response):
    se = Selector(response)  # build a Selector (the old HtmlXPathSelector) from the response
    if re.match(r"http://desk.zol.com.cn/fengjing/\d+x\d+/\d+.html", response.url):  # only handle wallpaper list pages whose url matches this pattern
        src = se.xpath("//ul[@class='pic-list2 clearfix']/li")  # every li under the picture-list ul
        for i in range(len(src)):  # iterate over the li elements (xpath positions are 1-based)
            imgURLs = se.xpath("//ul[@class='pic-list2 clearfix']/li[%d]/a/img/@src" % (i + 1)).extract()  # thumbnail url
            titles = se.xpath("//ul[@class='pic-list2 clearfix']/li[%d]/a/img/@title" % (i + 1)).extract()
            if imgURLs:
                realUrl = imgURLs[0].replace("t_s208x130c5", "t_s2560x1600c5")  # swap the thumbnail size for the full-resolution size
                file_name = u"%s.jpg" % titles[0]  # file name built from the picture title
                path = os.path.join(r"D:\pics", file_name)  # save into the local pics directory
                type = sys.getfilesystemencoding()
                print file_name.encode(type)
                item = WebcrawlerScrapyItem()  # fill the item so the pipeline can store name and url
                item['name'] = file_name
                item['url'] = realUrl
                print item["name"], item["url"]
                yield item  # hand the item to the pipeline
                urllib.urlretrieve(realUrl, path)  # download the full-size picture directly to disk
    all_urls = se.xpath("//a/@href").extract()  # collect every link on the page
    for url in all_urls:
        if url.startswith("/fengjing/1920x1080/"):  # follow only the 1920x1080 landscape category links
            yield Request("http://desk.zol.com.cn" + url, callback=self.parse)
def start_requests(self):
    # pop the user IDs to crawl from the Aims collection,
    # skipping any ID that is already in the finished set
    while self.db.Aims.find_one() is not None:
        ID_item = self.db.Aims.find_one()
        self.db.Aims.delete_one({'ID': ID_item['ID']})
        print '-----------------------------------------'
        print ID_item['ID']
        print '-----------------------------------------'
        ID = str(ID_item['ID'])
        # self.finish_ID.add(ID)
        # check whether this ID has already been finished
        if self.db.findin_finished(ID_item):
            print '-----------------------------------------'
            print 'WARNING: ', ID, ' already finished'
            print '-----------------------------------------'
            self.db.Aims.delete_one(ID_item)
            continue
        else:
            # start crawling: request the user's profile information first
            url_information0 = "https://m.weibo.cn/api/container/getIndex?type=uid&value=%s" % ID
            print url_information0
            yield Request(url=url_information0, meta={"ID": ID_item['ID']}, callback=self.parseInformation)
def parseHome(self,response):
if len(response.body) > 50:
print "###########################"
print "Fetch Home Success"
print "###########################"
infos = json.loads(response.body)
if infos.get('cards', ''):
cards = infos['cards']
for card in cards:
if card['card_type'] == 6:
print '========================================='
                    # pull the containerid of the user's original weibo out of the card's actionlog oid
ori_ID = re.findall(r'\d+',card['actionlog']['oid'])[0]
ori_url = 'https://m.weibo.cn/api/container/getIndex?containerid={ori_id}_-_WEIBO_SECOND_PROFILE_WEIBO_ORI&type=uid&page_type=03&value={value}'.format(
ori_id = ori_ID,value=response.meta['ID']
)
print 'ori_ID:',ori_ID
yield Request(url=ori_url, meta={'ID': response.meta["ID"],'ori_id': ori_ID, 'owner':response.meta['owner']},
callback=self.parseTweets, dont_filter=True)
def parse_index(self, response):
post_nodes = response.css('#warp .list15 li')
for post_node in post_nodes:
post_url = post_node.css('::attr(href)').extract_first("")
url_get = parse.urljoin(response.url, post_url)
yield Request(url=url_get, dont_filter=True, callback=self.parse_detail)
print(parse.urljoin(response.url, post_url))
next_urls = response.css('#warp .list15 .list_sort > a:nth-child(3) ::attr(href)').extract_first("")
if next_urls:
next_url = parse.urljoin(response.url, next_urls)
last_second_url = response.css('#warp .list15 .list_sort > a:nth-child(2) ::attr(href)').extract_first("")
if last_second_url != 'index248.htm':
yield Request(url=next_url, dont_filter=True, callback=self.parse_index)
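Both Requests in parse_index are created with dont_filter=True, which bypasses Scrapy's duplicate filter, so stopping relies entirely on the 'index248.htm' comparison. For contrast, a sketch of the same pagination without the flag, letting the default RFPDupeFilter suppress re-visits (selectors copied from the snippet above):

def parse_index(self, response):
    for post_node in response.css('#warp .list15 li'):
        post_url = post_node.css('::attr(href)').extract_first("")
        yield Request(url=parse.urljoin(response.url, post_url), callback=self.parse_detail)
    next_urls = response.css('#warp .list15 .list_sort > a:nth-child(3) ::attr(href)').extract_first("")
    if next_urls:
        # without dont_filter, an already-seen next url is dropped by the dupe filter,
        # which also stops the crawl once the pagination wraps around
        yield Request(url=parse.urljoin(response.url, next_urls), callback=self.parse_index)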