def login(self, response):
    # flatten the cookies set by the previous response into a plain dict
    cookie_jar = CookieJar()
    cookie_jar.extract_cookies(response, response.request)
    for k, v in cookie_jar._cookies.items():
        for i, j in v.items():
            for m, n in j.items():
                self.cookie_dict[m] = n.value
    req = Request(
        url='http://dig.chouti.com/login',
        method='POST',
        headers={'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'},
        body='phone=13331167937&password=zds819918&oneMonth=1',
        cookies=self.cookie_dict,
        callback=self.check_login
    )
    yield req
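The check_login callback referenced above is not part of the listing. A minimal sketch of what such a callback might look like, assuming the login endpoint answers with a text/JSON body containing a success marker (the "9999" string and the follow-up URL are assumptions, not taken from the original project):

def check_login(self, response):
    # Hypothetical success check: the exact response format is an assumption.
    if '"code":"9999"' in response.text:
        self.logger.info('login succeeded, cookies: %s', self.cookie_dict)
        # continue crawling with the captured cookies
        yield Request('http://dig.chouti.com/', cookies=self.cookie_dict, callback=self.parse)
    else:
        self.logger.error('login failed: %s', response.text)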
def parse(self, response):
    selector = Selector(response)
    articles = selector.xpath('//ul[@class="article-list thumbnails"]/li')
    for article in articles:
        item = Jianshu2Item()
        url = article.xpath('div/h4/a/@href').extract()
        likeNum = article.xpath('div/div/span[2]/text()').extract()
        posturl = 'http://www.jianshu.com' + url[0]
        if len(likeNum) == 0:
            item['likeNum'] = 0
        else:
            item['likeNum'] = int(likeNum[0].split(' ')[-1])
        request = Request(posturl, callback=self.parse_donate)
        request.meta['item'] = item
        yield request
    next_link = selector.xpath('//*[@id="list-container"]/div[@class="load-more"]/button/@data-url').extract()[0]
    if next_link:
        next_link = self.url + str(next_link)
        yield Request(next_link, callback=self.parse)
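parse_donate is not included in the listing. A minimal compatible sketch, assuming it only needs to pick up the item passed through request.meta and finish filling it (the field named in the comment is illustrative, not from the project's items.py):

def parse_donate(self, response):
    # Pick up the partially-filled item that parse() attached to the request.
    item = response.meta['item']
    # Illustrative only: the real callback would extract support/donation data here,
    # e.g. item['donateNum'] = ...
    yield item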
def post_login(self, response):
    self.logger.info('---- login start ----')
    # The login form carries a hidden formhash field that changes per request;
    # read it from the page first, otherwise the POST will be rejected.
    formhash = response.xpath('//input[@name="formhash"]/@value').extract()[0]
    self.logger.info('formhash: ' + formhash)
    # FormRequest.from_response is Scrapy's helper for submitting a form via POST;
    # after the login request completes, after_login is called on the response.
    return [scrapy.FormRequest.from_response(
        response,
        formdata={
            'formhash': formhash,
            'referer': 'http://www.mayattt.com/index.php',
            'loginfield': 'username',
            'username': 'mayajeiker',
            'password': 'friendship',
            'questionid': '0',
            'cookietime': '12592000',
        },
        callback=self.after_login
    )]
# parse the thread list page
def parse(self, response):
    # select the rows of the thread table via XPath
    items = response.xpath('//form[@name="moderate"]/*/div[@class="spaceborder"]/table/tr')
    for item in items:
        url_str = 'http://www.mayattt.com/' + item.xpath('./td[@class="f_title"]/a/@href').extract()[0]
        title_str = ''
        date_str = ''
        try:
            title_str = item.xpath('./td[@class="f_title"]/a/text()').extract()[0]
            date_str = item.xpath('./td[@class="f_last"]/span/a/text()').extract()[0]
        except Exception:
            self.logger.error('get list page failure!')
        yield Request(url_str, headers=self.headers, callback=self.parseImage,
                      meta={'title': title_str, 'date': date_str})
# parseImage then extracts the image URLs from each thread page and stores them in the item
pictureSpider_demo.py, from the project PythonCrawler-Scrapy-Mysql-File-Template (author: lawlite19)
def parse(self, response):
    se = Selector(response)  # build a Selector (the old HtmlXPathSelector role)
    if re.match("http://desk.zol.com.cn/fengjing/\d+x\d+/\d+.html", response.url):  # only handle wallpaper list pages
        src = se.xpath("//ul[@class='pic-list2 clearfix']/li")  # every <li> under the thumbnail list
        for i in range(1, len(src) + 1):  # XPath positions are 1-based
            imgURLs = se.xpath("//ul[@class='pic-list2 clearfix']/li[%d]/a/img/@src" % i).extract()  # thumbnail src
            titles = se.xpath("//ul[@class='pic-list2 clearfix']/li[%d]/a/img/@title" % i).extract()
            if imgURLs:
                realUrl = imgURLs[0].replace("t_s208x130c5", "t_s2560x1600c5")  # swap the thumbnail size token for the 2560x1600 original
                file_name = u"%s.jpg" % titles[0]  # name the file after the picture title
                path = os.path.join("D:\pics", file_name)  # local save path
                type = sys.getfilesystemencoding()
                print file_name.encode(type)
                item = WebcrawlerScrapyItem()  # fill the item fields defined in items.py
                item['name'] = file_name
                item['url'] = realUrl
                print item["name"], item["url"]
                yield item  # hand the item to the pipeline
                urllib.urlretrieve(realUrl, path)  # also download the image straight to disk
    all_urls = se.xpath("//a/@href").extract()  # collect every link on the page
    for url in all_urls:
        if url.startswith("/fengjing/1920x1080/"):  # only follow 1920x1080 wallpaper list links
            yield Request("http://desk.zol.com.cn" + url, callback=self.parse)
def parse_detail(self, response):
    content = response.css('#work span::text').extract()
    # match lines that look like reference URLs (dots escaped so ".com" etc. are literal)
    reg = r"^(http|https|ftp)://.*\.(com|cn|html|htm|asp|jsp)"
    url = response.url
    reg_url_name = r".*?(\d+)"
    get_url = re.match(reg_url_name, url)
    if get_url:
        self.get_name = get_url.group(1)
    reference_url_list = []
    for each_line in content:
        get_reference_url = re.match(reg, each_line)
        if get_reference_url:
            reference_url_list.append(get_reference_url.group(0))
    self.count = 0
    if reference_url_list:
        for each_url in reference_url_list:
            yield Request(url=each_url, dont_filter=True, callback=self.parse_reference)
            self.count += 1
def extract_links(self, response):
    """Generate (url, source_anchor) tuples extracted from the page"""
    for link in response.css('a'):
        # extract the href & urljoin it to the current response
        url = response.urljoin(link.xpath('@href').extract_first())
        # Only follow http(s) URLs (i.e., no `javascript:` or `mailto:`).
        if url.startswith('http'):
            # merge text content of all child nodes of the link
            anchor = " ".join(s.strip() for s in link.css('*::text').extract() if s.strip())
            yield (url, anchor)
    for frame in (response.css("frame") + response.css("iframe")):
        relative_url = frame.css("::attr(src)").extract_first()
        url = response.urljoin(relative_url)
        if url.startswith("http"):
            anchor = frame.css("::attr(name)").extract_first()
            yield (url, anchor)
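As a usage sketch, a parse callback on the same spider could feed the (url, anchor) pairs from extract_links back into the crawl; the callback below is illustrative and assumes scrapy is imported:

def parse(self, response):
    # Follow every extracted link, carrying the anchor text along in meta.
    for url, anchor in self.extract_links(response):
        yield scrapy.Request(url, callback=self.parse,
                             meta={'source_anchor': anchor})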
def post_get_playlist(self, response):
    collection = self.db.playlist
    result = json.loads(response.body, encoding='utf-8')['result']
    # inserted = collection.update({'id': result['id']}, result, upsert=True)  # upsert=True gives insert-or-update behaviour
    # logger.info('Update or Insert to playlist database[%s]' % (str(inserted),))
    if result['id'] not in self.playlist_id_buffer:
        collection.insert(result)
    for song in result['tracks']:
        artists = []
        for detail in song['artists']:
            artists.append(detail['name'])
        comment_url = 'http://music.163.com/weapi/v1/resource/comments/%s/?csrf_token=' % (song['commentThreadId'],)
        # POST the prepared form data with FormRequest; an equivalent alternative is
        # Request(url, method='POST', body=json.dumps(data))
        yield FormRequest(comment_url, formdata=self.post_data, callback=self.parse,
                          meta={'m_id': song['id'], 'm_name': song['name'], 'artists': artists})
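To make the comment above concrete, the two ways of issuing the POST look like this. The helper name is hypothetical, post_data is whatever the spider prepared, and the raw-body variant is only appropriate when the endpoint expects JSON rather than form encoding:

import json
from scrapy import Request, FormRequest

def build_comment_requests(comment_url, post_data, callback):
    # Form-encoded POST (what the spider above uses).
    form_req = FormRequest(comment_url, formdata=post_data, callback=callback)
    # Raw POST body, e.g. for a JSON endpoint.
    json_req = Request(comment_url, method='POST',
                       body=json.dumps(post_data),
                       headers={'Content-Type': 'application/json'},
                       callback=callback)
    return form_req, json_req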
def parse_list(self, response):
    url = response.meta['splash']['args']['url']
    pattern = re.compile(r'http://www.mogujie.com/book/\w+/\d+/')
    if pattern.match(url):
        page = int(pattern.split(url)[1])
        url = pattern.findall(url)[0]
        page += 1
        url = url + str(page)
    else:
        url = url + '/2'
    print '+++++++++++++++++++++++++ Next url:', url
    req = SplashRequest(url=url, callback=self.parse_list)
    yield req
    pattern_detail = re.compile(r'http://shop.mogujie.com/detail/.{7}')
    for item_url in pattern_detail.findall(response.body):
        req = Request(url=item_url, callback=self.parse_item)
        yield req
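The pagination trick above splits the current list URL on the book-prefix pattern to recover the page number, then rebuilds the URL with page + 1. A small self-contained illustration of that logic (the sample category id is illustrative):

import re

pattern = re.compile(r'http://www.mogujie.com/book/\w+/\d+/')

def next_list_url(url):
    # If the URL already ends in ".../<page>", bump the page number;
    # otherwise start from page 2.
    if pattern.match(url):
        page = int(pattern.split(url)[1])
        base = pattern.findall(url)[0]
        return base + str(page + 1)
    return url + '/2'

print(next_list_url('http://www.mogujie.com/book/clothing/50249/3'))
# -> http://www.mogujie.com/book/clothing/50249/4
print(next_list_url('http://www.mogujie.com/book/clothing/50249'))
# -> http://www.mogujie.com/book/clothing/50249/2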
def parse(self, response):
    # print '=========================', response.url
    pattern_list = re.compile(r'http://www.mogujie.com/book/\w+/\d+')
    # print '+++++++++++++++++++++++++', pattern_list.findall(response.body)
    '''
    for item_list in pattern_list.findall(response.body):
        req = Request(url = item_list, callback = self.parse_list)
        yield req
    '''
    '''
    req = Request(url = 'http://www.mogujie.com/book/clothing/50249/', callback = self.parse_list, meta={
        'splash': {
            'endpoint': 'render.html'
        },
        # 'dont_send_headers': True,
    })
    '''
    for item_list in pattern_list.findall(response.body):
        # req = SplashRequest(url = 'http://www.mogujie.com/book/clothing/50249/', callback = self.parse_list)
        req = SplashRequest(url=item_list, callback=self.parse_list)
        yield req
def __init__(self, **kw):
    super(FollowAllSpider, self).__init__(**kw)
    url = 'http://localhost/books.toscrape.com/index.html'
    if not url.startswith('http://') and not url.startswith('https://'):
        url = 'http://%s/' % url
    self.url = url
    self.allowed_domains = [re.sub(r'^www\.', '', urlparse(url).hostname)]
    self.link_extractor = LinkExtractor()
    self.cookies_seen = set()
    self.previtem = 0
    self.items = 0
    self.timesec = datetime.datetime.utcnow()
def start_requests(self):
    #####################################################################################
    # topic_dict = {'1': [u'??', u'??'], '2': [u'??', u'??']}
    topic_dict = {'1': [u'??'], '2': [u'??'], '3': [u'????'], '4': [u'??']}
    index = 0
    for id, kws_list in topic_dict.iteritems():
        for kw in kws_list:
            print kw
            wd_code = urllib.quote(kw.encode('gbk'))
            search_url = 'http://tieba.baidu.com/f/search/res?isnew=1&kw=&qw=' + wd_code + '&un=&rn=10&pn=0&sd=&ed=&sm=1&only_thread=1'
            # http://tieba.baidu.com/f/search/res?isnew=1&kw=&qw=%B1%B1%BE%A9&un=&rn=10&pn=0&sd=&ed=&sm=1&only_thread=1
            # http://tieba.baidu.com/f/search/res?isnew=1&kw=&qw=%B1%B1%BE%A9&un=&rn=10&pn=0&sd=&ed=&sm=1
            # print search_url
            self.Flag_List.append(True)
            self.Maxpage_List.append(self.MAX_PAGE_NUM)
            print search_url
            yield scrapy.Request(search_url, meta={'topic_id': id, 'index': index, 'kw': kw},)
            index += 1
    #####################################################################################
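The qw parameter is the search keyword percent-encoded as GBK bytes, which is why the sample URLs in the comments contain %B1%B1%BE%A9 (the GBK encoding of 北京). A small Python 3 check of that encoding step:

from urllib.parse import quote, unquote

keyword = u'北京'
wd_code = quote(keyword.encode('gbk'))
print(wd_code)                                    # -> '%B1%B1%BE%A9'
print(unquote('%B1%B1%BE%A9', encoding='gbk'))    # -> '北京'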
def parse(self, response):
    # print response.request.headers
    # print u'~~~~', ("pp3288" in response.body)
    # print u'~~~~', unicode(response.body, "utf8").encode("utf8")
    # walk the album list on this page and hand each album URL to parse_album
    for box in response.xpath(self.config["xpathAlbumList"]):
        url = box.xpath(self.config["xpathAlbumURL"]).extract()[0]
        title = box.xpath(self.config["xpathAlbumTitle"]).extract()[0]
        if not self.config.has_key("specificAlbums") or url in self.config["specificAlbums"]:
            if not url.startswith("http") and self.config.has_key("baseAddress"):
                url = self.config["baseAddress"] + url
            # print title, url
            request = scrapy.Request(url, headers=self.headers, callback=self.parse_album, cookies={'title': title})
            yield request
            # break
    # TODO: follow the album-list pagination (parse_album_list)
    pass
# build the request URL for a given category level and key
def get_url(self, level, key):
    base_url = 'http://gaokao.chsi.com.cn/zyk/zybk/'
    if level == 0:
        page = 'ccCategory.action'
    elif level == 1:
        page = 'mlCategory.action'
    elif level == 2:
        page = 'xkCategory.action'
    elif level == 3:
        page = 'specialityesByCategory.action'
    else:
        raise Exception('invalid level')
    return '{}{}?key={}'.format(base_url, page, key)
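A table-driven equivalent of the level dispatch above, shown as a standalone sketch; the action names come from the method itself, while the key value in the usage line is illustrative:

BASE_URL = 'http://gaokao.chsi.com.cn/zyk/zybk/'
PAGES = {
    0: 'ccCategory.action',
    1: 'mlCategory.action',
    2: 'xkCategory.action',
    3: 'specialityesByCategory.action',
}

def get_url(level, key):
    # Same mapping as the method above, driven by a lookup table.
    try:
        return '{}{}?key={}'.format(BASE_URL, PAGES[level], key)
    except KeyError:
        raise Exception('invalid level')

print(get_url(3, '74'))  # key '74' is a made-up example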
def start_requests(self):
    # with open(getattr(self, "file", "company.csv"), "rU") as f:
    #     reader = csv.reader(f)
    #     for line in reader:
    #         request = Request('http://www.qichacha.com/search?key=' + line[0].decode('gbk').encode('utf-8'), headers=self.headers)
    #         # request.meta['fields'] = line
    #         yield request
    with open("company.csv", "rU") as f:
        reader = csv.reader(f)
        for line in reader:
            request = Request('http://www.qichacha.com/search?key=' + line[0], headers=self.headers)
            # request.meta['fields'] = line
            yield request

# def start_requests(self):
#     yield Request('http://www.qichacha.com/search?key=%E5%89%8D%E6%B5%B7%E4%BA%BA%E5%AF%BF%E4%BF%9D%E9%99%A9%E8%82%A1%E4%BB%BD%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8', headers=self.headers)
def parse(self, response):
    item = AutopjtItem()
    # extract each field with XPath; every field ends up as a list covering the whole page
    item["name"] = response.xpath("//a[@class='pic']/@title").extract()
    item["price"] = response.xpath("//span[@class='price_n']/text()").extract()
    item["link"] = response.xpath("//a[@class='pic']/@href").extract()
    item["comnum"] = response.xpath("//a[@name='P_pl']/text()").extract()
    # hand the item to the pipeline
    yield item
    # the category has 75 pages in total
    for i in range(1, 76):
        # build the URL of each listing page
        url = "http://category.dangdang.com/pg" + str(i) + "-cid4002203.html"
        # request it, reusing parse as the callback
        yield Request(url, callback=self.parse)
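Because every field above is a whole-page list, downstream code has to zip the lists back together. A hedged alternative sketch that emits one item per product instead, reusing the same AutopjtItem fields (this is a restructuring, not the original project's approach, and its pipeline would need to match):

def parse(self, response):
    names = response.xpath("//a[@class='pic']/@title").extract()
    prices = response.xpath("//span[@class='price_n']/text()").extract()
    links = response.xpath("//a[@class='pic']/@href").extract()
    comnums = response.xpath("//a[@name='P_pl']/text()").extract()
    for name, price, link, comnum in zip(names, prices, links, comnums):
        item = AutopjtItem()
        item["name"] = name
        item["price"] = price
        item["link"] = link
        item["comnum"] = comnum
        yield item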
def test_nosplash():
    mw = _get_mw()
    cookie_mw = _get_cookie_mw()
    req = scrapy.Request("http://example.com")
    old_meta = copy.deepcopy(req.meta)
    assert cookie_mw.process_request(req, None) is None
    assert mw.process_request(req, None) is None
    assert old_meta == req.meta

    # response is not changed
    response = Response("http://example.com", request=req)
    response2 = mw.process_response(req, response, None)
    response3 = cookie_mw.process_response(req, response, None)
    assert response2 is response
    assert response3 is response
    assert response3.url == "http://example.com"


def test_magic_response2():
    # check 'body' handling and another 'headers' format
    mw = _get_mw()
    req = SplashRequest('http://example.com/', magic_response=True,
                        headers={'foo': 'bar'}, dont_send_headers=True)
    req = mw.process_request(req, None)
    assert 'headers' not in req.meta['splash']['args']

    resp_data = {
        'body': base64.b64encode(b"binary data").decode('ascii'),
        'headers': {'Content-Type': 'text/plain'},
    }
    resp = TextResponse("http://mysplash.example.com/execute",
                        headers={b'Content-Type': b'application/json'},
                        body=json.dumps(resp_data).encode('utf8'))
    resp2 = mw.process_response(req, resp, None)
    assert resp2.data == resp_data
    assert resp2.body == b'binary data'
    assert resp2.headers == {b'Content-Type': [b'text/plain']}
    assert resp2.status == 200
    assert resp2.url == "http://example.com/"


def test_magic_response_http_error():
    mw = _get_mw()
    req = SplashRequest('http://example.com/foo')
    req = mw.process_request(req, None)

    resp_data = {
        "info": {
            "error": "http404",
            "message": "Lua error: [string \"function main(splash)\r...\"]:3: http404",
            "line_number": 3,
            "type": "LUA_ERROR",
            "source": "[string \"function main(splash)\r...\"]"
        },
        "description": "Error happened while executing Lua script",
        "error": 400,
        "type": "ScriptError"
    }
    resp = TextResponse("http://mysplash.example.com/execute",
                        headers={b'Content-Type': b'application/json'},
                        body=json.dumps(resp_data).encode('utf8'))
    resp = mw.process_response(req, resp, None)
    assert resp.data == resp_data
    assert resp.status == 404
    assert resp.url == "http://example.com/foo"


def test_slot_policy_per_domain():
    mw = _get_mw()
    meta = {'splash': {
        'slot_policy': scrapy_splash.SlotPolicy.PER_DOMAIN
    }}

    req1 = scrapy.Request("http://example.com/path?key=value", meta=meta)
    req1 = mw.process_request(req1, None)
    req2 = scrapy.Request("http://example.com/path2", meta=meta)
    req2 = mw.process_request(req2, None)
    req3 = scrapy.Request("http://fooexample.com/path?key=value", meta=meta)
    req3 = mw.process_request(req3, None)

    assert req1.meta.get('download_slot')
    assert req3.meta.get('download_slot')
    assert req1.meta['download_slot'] == req2.meta['download_slot']
    assert req1.meta['download_slot'] != req3.meta['download_slot']


def test_adjust_timeout():
    mw = _get_mw()
    req1 = scrapy.Request("http://example.com", meta={
        'splash': {'args': {'timeout': 60, 'html': 1}},
        # download_timeout is always present,
        # it is set by DownloadTimeoutMiddleware
        'download_timeout': 30,
    })
    req1 = mw.process_request(req1, None)
    assert req1.meta['download_timeout'] > 60

    req2 = scrapy.Request("http://example.com", meta={
        'splash': {'args': {'html': 1}},
        'download_timeout': 30,
    })
    req2 = mw.process_request(req2, None)
    assert req2.meta['download_timeout'] == 30