def parseNotFirstPage(self, response):
sipo = response.meta['sipo']
soup = BeautifulSoup(response.body_as_unicode(), 'lxml')
itemList = soup.find_all(attrs={"class": "item"})
for item in itemList:
sipocrawler = SipoCrawlerItem()
itemSoup = BeautifulSoup(item.prettify(), 'lxml')
patentid = itemSoup.find(attrs={'name': 'idHidden'}).get('value')
nrdAn = itemSoup.find(attrs={'name': 'nrdAnHidden'}).get('value')
nrdPn = itemSoup.find(attrs={'name': 'nrdPnHidden'}).get('value')
sipocrawler['patent_id'] = str(patentid)
        formdata = url_config.detailSearch.get('formdata')
        formdata['nrdAn'] = str(patentid).split('.')[0]
        formdata['cid'] = str(patentid)
        formdata['sid'] = str(patentid)
yield FormRequest(
url=url_config.detailSearch.get('url'),
formdata=formdata,
callback=self.parsePatentDetail,
meta={'sipo': sipo, 'sipocrawler': sipocrawler, 'lawinfo': {'nrdAn': nrdAn, 'nrdPn': nrdPn}}
)
def login_after_captcha(self, response):
    with open("captcha.jpg", "wb") as f:
        f.write(response.body)  # the response body is the captcha image itself
    from PIL import Image
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except Exception:
        pass
    captcha = input("Please enter the captcha shown in the image\n>")
post_data = response.meta.get("post_data", {})
post_url = "https://www.zhihu.com/login/phone_num"
post_data["captcha"] = captcha
return [scrapy.FormRequest(
url=post_url,
formdata=post_data,
headers=self.headers,
callback=self.check_login
)]
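# Hedged sketch (not from the original spider) of the step that usually feeds
# login_after_captcha: build the login form data, request the captcha image,
# and pass the form data along in meta. The captcha URL and field names are
# assumptions based on the old Zhihu login flow.
import time

import scrapy

def request_captcha(self, response):
    post_data = {
        "_xsrf": response.css('input[name="_xsrf"]::attr(value)').extract_first(''),
        "phone_num": "<phone number>",   # placeholder credentials
        "password": "<password>",
    }
    captcha_url = "https://www.zhihu.com/captcha.gif?r={}&type=login".format(int(time.time() * 1000))
    yield scrapy.Request(captcha_url, headers=self.headers,
                         callback=self.login_after_captcha,
                         meta={"post_data": post_data})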
def parse_dates(self, response):
"""
The data is organized by dates, the spider will
get the entire year relative data
"""
for date in response.css('select[name="mesano"] option'):
mesano = date.css('::attr(value)').extract_first()
if re.search(r"(\d{4})", mesano).group(1) == time.strftime("%Y"):
request = scrapy.FormRequest(
url=BASE_URL + 'holerite/index.html',
formdata={
'acao': '',
'grupo': GRUPO,
'mesano': mesano,
'tipo': '1'
},
callback=self.parse_entities
)
request.meta['mesano'] = mesano
yield request
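# Illustrative sketch of how the mesano value attached above is read back in
# the callback; parse_entities is not shown in the original, so the selector
# and yielded fields here are assumptions.
def parse_entities(self, response):
    mesano = response.meta['mesano']        # value set on the request above
    for row in response.css('table tr'):    # assumed payroll table layout
        yield {
            'mesano': mesano,
            'row_text': ' '.join(row.css('::text').extract()).strip(),
        }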
def parseAfterSetting(self, response):
print(response.body_as_unicode())
for sipo in self.sipoList:
mainSearch = url_config.mainSearch
headers = mainSearch.get('headers')
searchExpCn = sipo.search_exp_cn
        print('Search expression --- ', searchExpCn)
        formData = mainSearch.get('formdata')
        formData['searchCondition.searchExp'] = searchExpCn
yield FormRequest(
url=url_config.mainSearch.get('url'),
callback=self.parseFirstPage,
method="POST",
headers=headers,
formdata=formData,
meta={'sipo': sipo}
)
# Parse the patent detail page for each search result
def parsePatentDetail(self, response):
sipo = response.meta['sipo']
sipocrawler = response.meta['sipocrawler']
detail = json.loads(response.body_as_unicode())
sipocrawler['abstract'] = BeautifulSoup(detail.get('abstractInfoDTO').get('abIndexList')[0].get('value'),
'lxml').text.replace('\n', '').strip()
sipocrawler['invention_name'] = detail.get('abstractInfoDTO').get('tioIndex').get('value')
for abitem in detail.get('abstractInfoDTO').get('abstractItemList'):
ItemCollection.resolveData(sipocrawler, abitem.get('indexCnName'), abitem.get('value'))
lawinfo = response.meta.get('lawinfo')
formdata = url_config.relatedInfo.get('formdata')
    formdata['literaInfo.nrdAn'] = lawinfo.get('nrdAn')
    formdata['literaInfo.nrdPn'] = lawinfo.get('nrdPn')
yield FormRequest(
url=url_config.relatedInfo.get('url'),
method='POST',
        dont_filter=True,  # keep this request even if the dupe filter has seen the URL before
formdata=formdata,
callback=self.parseRelatedInfo,
meta={'sipo': sipo, 'sipocrawler': sipocrawler}
)
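# Hypothetical sketch of the next callback in this chain: parseRelatedInfo is
# assumed to merge the related-info response into the item carried in meta and
# finally yield it; the field name used here is illustrative only.
import json

def parseRelatedInfo(self, response):
    sipocrawler = response.meta['sipocrawler']
    sipocrawler['related_info'] = json.loads(response.body_as_unicode())  # assumed item field
    yield sipocrawler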
# Build the start requests
def start_requests(self):
settings = get_project_settings()
city_list = settings["CITY_LIST"]
if self.city:
city_cn_name = city_list.get(self.city)
yield scrapy.FormRequest(
url=self.base_url + self.city + "_gongyu",
formdata={"startDate": self.start_date, "endDate": self.end_date},
callback=self.parse,
meta={'city_en_name': self.city, "city_cn_name": city_cn_name}
)
else:
for city_en_name, city_cn_name in city_list.items():
yield scrapy.FormRequest(
url=self.base_url + city_en_name + "_gongyu",
formdata={"startDate": self.start_date, "endDate": self.end_date},
callback=self.parse,
meta={'city_en_name': city_en_name, "city_cn_name": city_cn_name}
)
def start_requests(self):
if self.FIRST_TIME_RUNNING:
self.FIRST_TIME_RUNNING = False
for sid in (list(range(2014020000, 2014040000))
+ list(range(2015020000, 2015040000))
+ list(range(2016020000, 2016040000))):
yield scrapy.FormRequest(self.domain + self.login_url,
formdata={'zjh': str(sid), 'mm': '1'},
callback=self.parse,
meta={'sid': sid, 'password': '1', 'cookiejar': sid},
dont_filter=True)
else:
for password in self.load_passwords():
for sid in self.get_sids():
yield scrapy.FormRequest(self.domain + self.login_url,
formdata={'zjh': str(sid), 'mm': password},
callback=self.parse,
meta={'sid': sid, 'password': password, 'cookiejar': sid},
dont_filter=True)
def start_requests(self):
for start_url in self.database_urls:
url, body = start_url.split("?POST_BODY=", 1)
yield scrapy.FormRequest(
url,
method="POST",
headers={
'Content-Type': "application/x-www-form-urlencoded"
},
body=body,
meta={
'source_url': url,
'source_anchor': body
},
callback=self.parse
)
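# The snippet above sends an already URL-encoded body verbatim via body=.
# When the parameters are still a plain dict, letting FormRequest encode them
# is simpler; the two requests below produce the same POST (endpoint and
# fields are made up for illustration).
import scrapy

raw = scrapy.FormRequest(
    "https://example.com/search",
    method="POST",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    body="q=scrapy&page=1",                      # sent exactly as given
)
encoded = scrapy.FormRequest(
    "https://example.com/search",
    formdata={"q": "scrapy", "page": "1"},       # FormRequest URL-encodes this and sets the header
)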
def contruct_request(self, response, post_data=None, next_page=False, other_info=None):
if post_data is not None:
encryptor = MeituanEncryptor(post_data, response.url)
else:
encryptor = response.meta["encryptor"]
post_data = encryptor.data
if next_page:
post_data["page_index"] = str(int(post_data["page_index"]) + 1)
encryptor.data = post_data
token = encryptor.get_token()
url = self.base_url2 + token
meta = {
"encryptor": encryptor,
"cookiejar": response.meta["cookiejar"],
"geo_point": response.meta["geo_point"],
"other_info": other_info if other_info is not None else {}
}
return scrapy.FormRequest(
url,
meta=meta,
formdata=post_data,
callback=self.parse_restaurant
)
def contruct_request(self, response, post_data=None, other_info=None):
if post_data is not None:
encryptor = MeituanEncryptor(post_data, response.url)
else:
encryptor = response.meta["encryptor"]
post_data = encryptor.data
token = encryptor.get_token(100010)
url = self.base_url2 + token
meta = {
"encryptor": encryptor,
"cookiejar": response.meta["cookiejar"],
"other_info": other_info if other_info is not None else {}
}
return scrapy.FormRequest(
url,
meta=meta,
formdata=post_data,
callback=self.parse_menu
)
def parse_page(self, response):
next_page = response.meta.get('page') + 1
json_data = json.loads(response.text)
if json_data.get('type') != 'success':
return
articles = scrapy.Selector(text=json_data.get('html')).css('article')
for article in articles:
yield {
'author': article.css('div.author-meta a ::text').extract_first(),
'date': article.css('div.clock-meta a ::text').extract_first(),
'title': article.css('h1.entry-title ::text').extract_first()
}
yield scrapy.FormRequest(
self.scrolling_url, formdata={'action': 'infinite_scroll', 'page': str(next_page), 'order': 'DESC'},
callback=self.parse_page, meta={'page': next_page}
)
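# The infinite-scroll pattern above relies on meta['page'] already being set,
# so the very first request has to seed it. A hedged sketch of such a
# start_requests (the starting page number is an assumption):
import scrapy

def start_requests(self):
    yield scrapy.FormRequest(
        self.scrolling_url,
        formdata={'action': 'infinite_scroll', 'page': '1', 'order': 'DESC'},
        callback=self.parse_page,
        meta={'page': 1},
    )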
def parse(self, response):
topic_xpath_rule = '//li[@class="zm-topic-cat-item"]/a/text()'
topic_names = response.selector.xpath(topic_xpath_rule).extract()
topic_xpath_rule = '//li[@class="zm-topic-cat-item"]/@data-id'
topic_ids = response.selector.xpath(topic_xpath_rule).extract()
    # iterate over every topic category found on the topics page
    print("Found {} topic categories".format(len(topic_ids)))
    for i in range(len(topic_ids)):
        params = {"topic_id": int(topic_ids[i]), "offset": 0, "hash_id": "d17ff3d503b2ebce086d2f3e98944d54"}
yield FormRequest(
url='https://www.zhihu.com/node/TopicsPlazzaListV2',
method='POST',
# headers=self.set_headers2('https://www.zhihu.com/topics'),
headers=self.set_headers('https://www.zhihu.com/topics'),
cookies=cookielib.LWPCookieJar(filename='cookies'),
# formdata={'method': 'next', 'params': '{"topic_id":988,"offset":0,"hash_id":"d17ff3d503b2ebce086d2f3e98944d54"}'},
formdata={'method': 'next', 'params': str(params).replace("\'", "\"").replace(" ", "")},
callback=self.topic_parse,
meta={'topic_name': topic_names[i]}
)
def start_requests(self):
username = self.spider_settings.get('username')
password = self.spider_settings.get('password')
if username and password:
yield scrapy.FormRequest(
url='https://{}/login'.format(self.name),
formdata={'Username': username,
'Password': password,
'target': '/MyAccount/',
'submit': 'Log+in'},
callback=self._after_login,
meta={'dont_cache': True},
)
else:
# Username, password or section not found in feeds.cfg.
self.logger.info('Login failed: No username or password given. '
'Only free articles are available in full text.')
yield self._start_requests()
def start_requests(self):
abonr = self.spider_settings.get('abonr')
password = self.spider_settings.get('password')
if abonr and password:
yield scrapy.FormRequest(
url='https://www.{}/falter/e-paper/login'.format(self.name),
formdata={'login[abonr]': abonr,
'login[password]': password,
'redirect_url': '/archiv/'},
callback=self.parse_archive,
meta={'dont_cache': True},
)
else:
# Username, password or section falter.at not found in feeds.cfg.
self.logger.info('Login failed: No username or password given. '
'Only free articles are available in full text.')
yield scrapy.Request('https://www.{}/archiv/'.format(
self.name), self.parse_archive, meta={'dont_cache': True})
def parse_room_first(self, response):
id = re.findall(r'\d{3,10}', response.url)[0]
name = response.css('#listing_name::text').extract_first()
# equipment = response.css(
# 'div.row.row-condensed.text-muted.text-center.hide-sm > div > div.col-sm-3.icon--small-margin > span.text-small::text').extract()
# img = response.css('.cover-img::attr(style)').extract_first().replace('ackground-image:url', '')[1:-1]
# description = response.css('div.simple-format-container > p > span::text').extract()
# comment_num = response.css('div.col-md-8.review-header > div > h4 > span > span::text').extract_first()
owner = response.css(
'div.host-info.pull-left > div > span > a.link-reset::attr(href)').extract_first().split('?')[-1]
owner_id = response.css(
'div.host-info.pull-left > div > span > a.link-reset > span::text').extract_first()
f = furl(response.url)
f.path.add('personalization.json')
try:
del f.args['location']
except KeyError:
pass
f.args.addlist('review_ids[]',
['144474925', '141633062', '140450604', '139913674', '138701100', '138102086', '137690239'])
url = f.url
path = str(f.path) + str(f.query)
return scrapy.FormRequest(url=url, callback=self.parse_room_second,
meta={'room_id': id, 'name': name, 'owner': owner, 'owner_id': owner_id,
'parse': True})
def login(self, response):
response_text = response.text
match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
xsrf = ''
if match_obj:
xsrf = (match_obj.group(1))
if xsrf:
post_url = "https://www.zhihu.com/login/phone_num"
post_data = {
"_xsrf": xsrf,
"phone_num": "18782902568",
"password": "admin123"
}
return [scrapy.FormRequest(
url = post_url,
formdata = post_data,
headers=self.headers,
callback=self.check_login
)]
def yield_formrequest(self, param, index, code, category):
"""
:param param: "POST" parameters
:param index: page number
:param code: company code
    :param category: "abbr" if company_code is the company's abbreviated name; "full" if it is the full registered name
:return:
"""
post_data = {
# "Param": "????:????,????:????",
"Param": param,
"Index": repr(index),
"Page": repr(self.cases_per_page),
"Order": "????",
"Direction": "asc",
}
data = copy.deepcopy(post_data)
data["case_parties"] = code # parties: ???
data["abbr_full_category"] = category # ????????, ???
return scrapy.FormRequest(url=self.url, formdata=post_data, callback=lambda response: self.parse(response, data), dont_filter=True) # ??URL??(??url??????????yield?????URL??, ?????????)
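# Note on the snippet above: a lambda callback cannot be serialized if requests
# are persisted to disk (e.g. with JOBDIR). A hedged alternative is to carry
# the extra data in meta and read it back in the callback instead of passing a
# second argument:
import scrapy

def yield_formrequest_via_meta(self, param, index, code, category):
    post_data = {
        "Param": param,
        "Index": repr(index),
        "Page": repr(self.cases_per_page),
        "Direction": "asc",
    }
    # parse() would then read response.meta["case_info"] instead of a data argument
    return scrapy.FormRequest(url=self.url, formdata=post_data,
                              meta={"case_info": {"case_parties": code,
                                                  "abbr_full_category": category}},
                              callback=self.parse, dont_filter=True)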
def login(self, response):
captcha = "captcha.jpg"
with open(captcha, "wb") as f:
f.write(response.body)
try:
Image.open(captcha).show()
    except Exception:
pass
post_data = response.meta.get("post_data", {})
post_url = "https://www.zhihu.com/login/{}".format(self.user_type)
post_data["captcha"] = input("Please input the captcha: ")
return [scrapy.FormRequest(
url=post_url,
formdata=post_data,
headers=self.headers,
callback=self.check_login
)]
# By default, Scrapy hands every URL in start_urls to the parse() callback below
def parse(self, response):
    # collect the "see more" links for each category
for url in response.xpath('//a[@class="see-more play-button small id-track-click apps id-responsive-see-more"]'):
targetURL = "https://play.google.com" + url.xpath('@href')[0].extract()
        # POST the form to fetch up to 100 apps at a time
yield scrapy.FormRequest(
targetURL,
formdata = {'start':'0',
'num':'100',
'numChildren':'0',
'cctcss':'square-cover',
'cllayout':'NORMAL',
'ipf':'1',
'xhr':'1',
'token':'zNTXc17yBEzmbkMlpt4eKj14YOo:1458833715345'},
callback = self.parse_data
)
def parse_login(self, response):
self._check_login_params()
self._login = False
form_data = {
self.username_field: self.username,
self.password_field: self.password
}
if hasattr(self, 'form_xpath'):
return scrapy.FormRequest.from_response(
response,
formxpath=self.form_xpath,
formdata=form_data,
callback=self.parse_after_login
)
elif hasattr(self, 'form_url'):
return scrapy.FormRequest(
self.form_url,
formdata=form_data,
callback=self.parse_after_login
)
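# FormRequest.from_response (the form_xpath branch above) parses the HTML form
# out of the response, keeps its hidden fields (CSRF tokens and the like), and
# only overrides the fields passed in formdata. A self-contained illustrative
# call; the XPath and field names are assumptions:
import scrapy

def parse_login_page(self, response):
    return scrapy.FormRequest.from_response(
        response,
        formxpath='//form[@id="login"]',                  # select the login form explicitly
        formdata={'username': 'user', 'password': 'secret'},
        callback=self.parse_after_login,
    )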
def parse_video(self, response):
item = response.meta['item']
# item['info']['play_count'] = response.xpath(xpath).extract_first(default='')
# if (item['info']['play_count'] == '') and (not re.findall(r'????', response.body)):
# item['info']['play_count'] = (response.xpath('//em[@id="mod_cover_playnum"]/text()')
# .extract_first(default=''))
if not self.__get_json(response):
return
if not self.__get_media_urls(item):
return
item['media_urls'] = self.media_urls
item['file_name'] = self.file_name
url = 'https://v.qq.com/x/page/{}.html'.format(self.kwargs['vid'])
meta = {
'item': item,
'vid': self.kwargs['vid'],
}
yield scrapy.FormRequest(url, method='GET', meta=meta, callback=self.parse_play_count)
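# As in the request above, FormRequest with method='GET' plus formdata simply
# appends the fields to the URL as a query string, which avoids manual
# urlencoding. The endpoint and parameters below are made up for illustration.
import scrapy

req = scrapy.FormRequest(
    'https://example.com/api/play_count',
    method='GET',
    formdata={'vid': 'abc123', 'format': 'json'},
)
# req.url is now 'https://example.com/api/play_count?vid=abc123&format=json'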
def parse_video(self, response):
item = response.meta['item']
url = 'https://interface.bilibili.com/playurl'
if not self.__get_json(response):
return
try:
item['info']['play_count'] = self.json_data['play']
item['info']['intro'] = self.json_data['description']
item['info']['date'] = self.json_data['created_at']
item['info']['author'] = self.json_data['author']
    except Exception:
        pass  # some of these fields may be missing from the API response
try:
cid = self.json_data['list'][0]['cid']
except Exception as err:
self.logger.error('url: {}, error: {}'.format(self.page_url, str(err)))
return
params = self.bilibili_common.get_params(cid)
yield scrapy.FormRequest(url=url, method='GET', meta={'item': item},
formdata=params, callback=self.parse_video_urls)
def parse_video_custom(self, response):
item = response.meta['item']
json_data = json.loads(response.body[response.body.find('{'): response.body.rfind('}') + 1])
vid = self.url.split('/')[-1]
url = 'https://ups.youku.com/ups/get.json'
params = {
'vid': vid,
'ccode': '0590',
'client_ip': '0.0.0.0',
'client_ts': str(int(time.time())),
'utid': 'aKCuEcCdq38CAbaWLjWeW3TI',
'r': json_data['stealsign'],
'callback': 'json' + str(int(time.time() * 1000)),
}
yield scrapy.FormRequest(url=url, method='GET', meta={'item': item},
formdata=params, callback=self.parse_video_urls)
def parse_item(self, response):
item = MultimediaCrawlerItem()
item['host'] = 'baozoumanhua'
item['media_type'] = 'video'
item['stack'] = []
item['download'] = 0
item['extract'] = 0
item['file_dir'] = os.path.join(settings['FILES_STORE'], item['media_type'], self.name)
item['url'] = response.url
item['info'] = {
'link': item['url'],
'title': (response.xpath(r'//h1[@class="v-title"]/text()').extract_first(default='').strip()),
'intro': '',
'author': 'baozoumanhua',
}
player = self.__get_player(item['url'], response)
if player is None:
self.logger.error('url: {}, error: does not match any player'.format(item['url']))
return
yield scrapy.FormRequest(url=player.url, method=player.method, meta={'item': item},
formdata=player.params, callback=player.parse_video)
def parse(self, response):
item = MultimediaCrawlerItem()
item['host'] = 'ergeng'
item['media_type'] = 'video'
item['stack'] = []
item['download'] = 0
item['extract'] = 0
item['file_dir'] = os.path.join(settings['FILES_STORE'], item['media_type'], self.name)
item['url'] = response.url
timestamp = re.search(r'"create_at"\s*:\s*(\d+),|$', response.body).group(1)
item['info'] = {
'link': item['url'],
'title': (response.xpath(r'//div[contains(@class, "new-video-info")]/h3/text()').
extract_first(default='').strip()),
'intro': response.xpath(r'//div[contains(@class, "tj")]/text()').extract_first(default='').strip(),
'date': time.strftime('%Y-%m-%d', time.localtime(int(timestamp))) if timestamp is not None else '',
'author': re.search(r'"user_nickname"\s*:\s*"(.*?)"|$', response.body).group(1),
}
player = self.__get_player(item['url'], response)
if player is None:
self.logger.error('url: {}, error: does not match any player'.format(item['url']))
return
yield scrapy.FormRequest(url=player.url, method=player.method, meta={'item': item},
formdata=player.params, callback=player.parse_video)
def parse_video_url(self, response):
item = response.meta['item']
vid = re.search(r'id_(.*?).html|$', response.url).group(1)
if vid is None:
self.logger.error('url: {}, error: failed to find vid'.format(response.url))
return
params = {
'vid': vid,
'ccode': '0401',
'client_ip': '192.168.1.1',
'utid': 'tB2PEWHIKgECAbaWLjUeiFyE',
'client_ts': str(round(time.time())),
}
url = 'https://ups.youku.com/ups/get.json'
yield scrapy.FormRequest(url, method='GET', meta={'item': item}, formdata=params,
callback=self.parse_download_url)
def parse(self, response):
page_size = 30
user = response.meta['user']
url = 'https://space.bilibili.com/ajax/member/getSubmitVideos'
json_data = json.loads(response.body)
total = json_data['data']['video']
pages = total // page_size if not (total % page_size) else (total // page_size + 1)
for page in range(1, pages + 1):
params = {
'mid': user.id,
'pagesize': str(page_size),
'tid': '0',
'page': str(page),
'keyword': '',
'order': 'pubdate',
}
yield scrapy.FormRequest(url=url, method='GET', meta={'user': user},
formdata=params, callback=self.parse_items)
def parse(self, response):
user = response.meta['user']
num = 24
count = response.xpath('//div[@id="headBgMod"]//ul[@class="user_count"]/li[3]/span[2]/text()').extract()[0]
for page in range(1, int(math.ceil(int(count) / num)) + 1):
aa = "1.9.1"
callback = ''.join(['jQuery', re.sub(r'\D', '', aa + str(random.random())),
'_', str(int(time.time() * 1000))])
params = {
'otype': 'json',
'pagenum': str(page),
'callback': callback,
'qm': '1',
'num': str(num),
'sorttype': '0',
'orderflag': '0',
'low_login': '1',
'uin': re.search(r'data-vuin="(.*?)"', response.body).group(1),
'_': str(int(time.time() * 1000)),
}
url = 'http://c.v.qq.com/vchannelinfo'
yield scrapy.FormRequest(url, method='GET', meta={'user': user}, formdata=params, callback=self.parse_page)