def get_torrent(self, response):
    sel = Selector(response)
    cl_title = sel.xpath('//td[@class="h"]/text()[2]').extract_first()
    cl_bankuai = sel.xpath('//div[@class="t3"]/table/tr/td/b/a[2]/text()').extract_first()
    cl_url = response.url
    # The torrent link points to rmdown.com; the match ends with "</a>", which is stripped below.
    torrent = re.search('rmdown\.com(.+?)</a>', response.body)
    torrent_url = 'http://www.' + torrent.group()[:-4]
    # The posting date sits in the second text node of div.tipad; trim the surrounding label text.
    posted = sel.xpath('//div[@class="tipad"]/text()').extract()[1]
    posted = posted.encode('utf-8')[9:-7]
    yield Request(
        url=torrent_url,
        meta={
            'cl_title': cl_title,
            'cl_bankuai': cl_bankuai,
            'cl_url': cl_url,
            'posted': posted,
        },
        callback=self.parse_item,
        dont_filter=True)
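The parse_item callback used above is not part of this listing. A rough sketch of what it could look like, assuming a hypothetical ClItem whose fields mirror the meta keys passed in by get_torrent:

def parse_item(self, response):
    # ClItem is a hypothetical item class; its fields mirror the meta keys set in get_torrent.
    item = ClItem()
    item['cl_title'] = response.meta['cl_title']
    item['cl_bankuai'] = response.meta['cl_bankuai']
    item['cl_url'] = response.meta['cl_url']
    item['posted'] = response.meta['posted']
    item['torrent_page'] = response.url  # the rmdown.com page reached via torrent_url
    yield item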
def get_first_page(self, response):
    request_state = self.if_too_many_request(response.body, 'first_page')
    registrant = response.meta['registrant']
    if request_state == False:
        s = Selector(text=response.body)
        # Each row of the sf-grid table holds one domain; the link wrapping the "..." image
        # carries the relative URL of that domain's detail page.
        rows = s.xpath(u'//table[@class="sf-grid" and @id = "sf-grid"]/tr')
        domain_url_list = []
        for row in rows:
            url2 = row.xpath('td[@class = "lf"]/a/img[@alt="..."]/../@href').extract()[0]
            domain_url_list.append(url2)
        for url in domain_url_list:
            cookie = get_cookie()
            url = "https://www.benmi.com" + url
            item = RwhoisRegistrantItem()
            item['registrant'] = registrant
            yield scrapy.Request(url, headers=self.head, meta={'cookie': cookie, 'item': item},
                                 cookies={"__cfduid": cookie[1], "cf_clearance": cookie[2],
                                          "BenmiUserInfo2": "Benmi-UN=hahaha321",
                                          "SITEINFO": "66b/UN0Nvf1MujwHhivXoluFewMFC48CdOZ9YpNXKEg=; "},
                                 callback=self.get_domain_name, dont_filter=True)
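The get_cookie helper is defined elsewhere in that project; the code above only relies on its shape. A minimal sketch under that assumption, with placeholder values:

def get_cookie():
    # Assumed helper (not shown in this listing): returns a previously harvested
    # Cloudflare cookie tuple. Only indexes 1 (__cfduid) and 2 (cf_clearance) are
    # read by get_first_page above; the values below are placeholders.
    return (None, "<__cfduid value>", "<cf_clearance value>")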
def parse(self, response):
    item = DoubanspiderItem()
    selector = Selector(response)
    Movies = selector.xpath('//div[@class="info"]')
    for eachMovie in Movies:
        title = eachMovie.xpath('div[@class="hd"]/a/span[@class="title"]/text()').extract()
        movieInfo = eachMovie.xpath('div[@class="bd"]/p/text()').extract()
        star = eachMovie.xpath('div[@class="bd"]/div[@class="star"]/span[@class="rating_num"]/text()').extract()
        quote = eachMovie.xpath('div[@class="bd"]/p[@class="quote"]/span/text()').extract()
        item['title'] = title
        item['movieInfo'] = ';'.join(movieInfo)
        item['star'] = star
        item['quote'] = quote
        # hand the populated item over to the pipeline
        yield item
    nextLink = selector.xpath('//span[@class="next"]/link/@href').extract()
    if nextLink:
        nextLink = nextLink[0]
        print(nextLink)
        yield Request(self.url + nextLink, callback=self.parse)
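DoubanspiderItem is presumably defined in the project's items.py, which is not shown here. A plausible minimal definition, assuming it declares exactly the four fields populated above:

import scrapy

class DoubanspiderItem(scrapy.Item):
    # Assumed field set, mirroring the keys assigned in parse() above.
    title = scrapy.Field()
    movieInfo = scrapy.Field()
    star = scrapy.Field()
    quote = scrapy.Field()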
def response_parse(response):
    global pending_requests
    # using scrapy selector to extract data from the html
    selector = Selector(text=response['body'])
    # get the url of repositories
    for href in selector.css("#subcategories-div > section > div > div.cat-item > a::attr('href')"):
        # we count the number of requests using this var
        pending_requests += 1
        # open a new request
        write_line('''
        {
            "type": "request",
            "id": "category",
            "url": "http://www.dmoz.org%s"
        }
        ''' % href.extract())
def response_category(response):
    global pending_requests
    # this response is no longer pending
    pending_requests -= 1
    # using scrapy selector
    selector = Selector(text=response['body'])
    # get div with link and title
    divs = selector.css('div.title-and-desc')
    for div in divs:
        url = div.css("a::attr('href')").extract_first()
        title = div.css("a > div.site-title::text").extract_first()
        result[title] = url
    # if finished all requests, we can close the spider
    if pending_requests == 0:
        # serialize the extracted data and close the spider
        open('outputs/dmoz_data.json', 'w').write(json.dumps(result))
        write_line('{"type": "close"}')
def parse(self, response):
    selector = Selector(response=response)
    articles = selector.xpath('//*[@id="main"]/*/div[@class="post-box"]')
    timeline = db.get_collection('timeline')
    for item in articles:
        try:
            title = item.xpath('div[@class="post-header"]/p/a/text()').extract()[0]
            # link URL
            url = item.xpath('div[@class="post-header"]/p/a/@href').extract()[0]
            description = item.xpath('*/div[@class="post-expert"]/text()').extract()[0]
            description = self._join_text(description)
            # image URL
            img = item.xpath('*/div[@class="post-info"]/a/img/@data-original').extract()[0]
            # YYYY-MM-DD
            #date = item.xpath('*/div[@class="post-date"]/text()').extract()[0].strip()
            date = item.xpath('div[@class="post-content"]/div[@class="post-footer"]/div[@class="post-date"]/text()').extract()[0]
            date = datetime.strptime(date, '%Y-%m-%d')
            self.save(title=title, url=url, description=description,
                      img=img, date=date)
        except IndexError:
            continue
    # Follow the "next page" link in the page navigator.
    next_page = selector.xpath(u'//*/div[@class="page-navigator"]/li/a[text()="下一页 »"]/@href').extract()[0]
    yield Request(response.urljoin(next_page), self.parse)
def parse_ph_key(self, response):
    selector = Selector(response)
    logging.debug('request url:------>' + response.url)
    # logging.info(selector)
    divs = selector.xpath('//div[@class="phimage"]')
    for div in divs:
        viewkey = re.findall('viewkey=(.*?)"', div.extract())
        # logging.debug(viewkey)
        yield Request(url='https://www.pornhub.com/embed/%s' % viewkey[0],
                      callback=self.parse_ph_info)
    url_next = selector.xpath(
        '//a[@class="orangeButton" and text()="Next "]/@href').extract()
    logging.debug(url_next)
    if url_next:
        # if self.test:
        logging.debug(' next page:---------->' + self.host + url_next[0])
        yield Request(url=self.host + url_next[0],
                      callback=self.parse_ph_key)
        # self.test = False
def parse_ph_info(self, response):
    phItem = PornVideoItem()
    selector = Selector(response)
    # The embed page keeps the video metadata in a flashvars_* JSON blob.
    _ph_info = re.findall('flashvars_.*?=(.*?);\n', selector.extract())
    logging.debug('PH video info JSON:')
    logging.debug(_ph_info)
    _ph_info_json = json.loads(_ph_info[0])
    duration = _ph_info_json.get('video_duration')
    phItem['video_duration'] = duration
    title = _ph_info_json.get('video_title')
    phItem['video_title'] = title
    image_url = _ph_info_json.get('image_url')
    phItem['image_url'] = image_url
    link_url = _ph_info_json.get('link_url')
    phItem['link_url'] = link_url
    quality_480p = _ph_info_json.get('quality_480p')
    phItem['quality_480p'] = quality_480p
    logging.info('duration:' + duration + ' title:' + title + ' image_url:'
                 + image_url + ' link_url:' + link_url)
    yield phItem
def parse_downurl(self, response):
    try:
        antivirus = Selector(response).css("#static_antivirus").extract()[0]
        # Look at the Static Analysis -> Antivirus table: keep only samples where
        # Microsoft, Kaspersky or ESET-NOD32 shows a "text-error" cell.
        antiresult = re.findall("((Microsoft|Kaspersky|ESET\-NOD32)</td>\n\s*<td>\n\s*<span class=\"text\-error\")", antivirus.encode("utf-8"), re.S)
        # If none of those engines matched, skip this sample.
        if antiresult == []:
            return
        # Extract the sample's download link and queue it for the files pipeline.
        url = response.xpath("//a[contains(@class,'btn-primary')]/@href").extract()[0].encode('utf-8')
        url = urlparse.urljoin("https://malwr.com", url)
        item = MalwrItem()
        item['file_urls'] = [url]
        return item
    except Exception:
        # Any parsing failure just means this sample is skipped.
        return
# android_apps_spider.py (project: Android-Repackaged-App-Detection-System, author: M157q)
def parse_xpath(self, response, xpath):
    appItemList = []
    sel = Selector(response)
    for url in sel.xpath(xpath).extract():
        url = urljoin(response.url, url)
        log.msg("Catch an application: %s" % url, level=log.INFO)
        appItem = AppItem()
        appItem['url'] = url
        appItemList.append(appItem)
    return appItemList

#def parse_anzhi(self, response, xpath):
#    appItemList = []
#    hxs = HtmlXPathSelector(response)
#    for script in hxs.select(xpath).extract():
#        id = re.search(r"\d+", script).group()
#        url = "http://www.anzhi.com/dl_app.php?s=%s&n=5" % (id,)
#        appItem = AppItem()
#        appItem['url'] = url
#        appItemList.append(appItem)
#    return appItemList
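parse_xpath is a generic helper; the callbacks that feed it are not shown here. A rough sketch of a per-market parse callback that might call it, where both the method name and the XPath expression are made up for illustration:

def parse_market_listing(self, response):
    # Hypothetical caller: hand a market-specific XPath over to the generic helper above.
    xpath = '//a[@class="download-button"]/@href'  # assumed expression, for illustration only
    return self.parse_xpath(response, xpath)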
def parse_articles(self, response):
    article_ptn = "http://www.theglobeandmail.com/opinion/(.*?)/article(\d+)/"
    resp_url = response.url
    article_m = re.match(article_ptn, resp_url)
    article_id = ''
    if article_m is not None:
        article_id = article_m.group(2)

    if article_id == '32753320':
        print('***URL***', resp_url)
        soup = BeautifulSoup(response.text, 'html.parser')
        text = Selector(text=response.text).xpath('//*[@id="content"]/div[1]/article/div/div[3]/div[2]').extract()
        if text:
            print("*****in Spider text*****", soup.title.string)
            yield {article_id: {"title": soup.title.string, "link": resp_url, "article_text": text}}
        comments_link = response.url + r'comments/'
        if comments_link == 'http://www.theglobeandmail.com/opinion/a-fascists-win-americas-moral-loss/article32753320/comments/':
            yield Request(comments_link, callback=self.parse_comments)
def parse(self, response):
    sel = Selector(response)
    #items = []
    # Extract the article's URL and title.
    item = CSDNBlogItem()
    article_url = str(response.url)
    article_name = sel.xpath('//div[@id="article_details"]/div/h1/span/a/text()').extract()
    item['article_name'] = [n.encode('utf-8') for n in article_name]
    item['article_url'] = article_url.encode('utf-8')
    yield item
    # Follow the link to the next article.
    urls = sel.xpath('//li[@class="next_article"]/a/@href').extract()
    for url in urls:
        print url
        url = "http://blog.csdn.net" + url
        print url
        yield Request(url, callback=self.parse)
def parse(self, response):
    try:
        products = Selector(response).xpath('//div[@class="grid-uniform grid--center wide--grid--middle"]//div[contains(@class,"grid__item")]')
        for product in products:
            item = KithItem()
            item['name'] = product.xpath('div/div/a[1]/img/@alt').extract()[0]
            item['link'] = "https://kith.com" + product.xpath('div/div/a[1]/@href').extract()[0]
            # item['image'] = "https:" + product.xpath('div/div/a[1]/img/@src').extract()[0]
            item['size'] = "https://kith.com/cart/add.js?id=" + product.xpath('div/div/a[2]/div/*/div[1]/@data-value').extract()[0] + "&quantity=1"
            yield item
        # Re-queue the listing page so the spider keeps polling for restocks.
        yield Request(KithURL, callback=self.parse, dont_filter=True, priority=0)
    except:
        pass
def crawl_ips():
    # Crawl the free proxy list on xicidaili.com and store it in MySQL.
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0"}
    for i in range(1568):
        resp = requests.get("http://www.xicidaili.com/nn/{0}".format(i), headers=headers)
        selector = Selector(text=resp.text)
        all_trs = selector.css("#ip_list tr")
        ip_list = []
        for tr in all_trs[1:]:
            speed = 0
            speed_str = tr.css(".bar::attr(title)").extract()[0]
            if speed_str:
                # The title attribute looks like "0.5秒" (seconds).
                speed = float(speed_str.split("秒")[0])
            all_texts = tr.css("td::text").extract()
            ip = all_texts[0]
            port = all_texts[1]
            proxy_type = all_texts[5]
            ip_list.append((ip, port, proxy_type, speed))
        for ip_info in ip_list:
            cursor.execute(
                "insert proxy_ip(ip, port, speed, proxy_type) VALUES('{0}', '{1}', {2}, 'HTTP')".format(
                    ip_info[0], ip_info[1], ip_info[3]
                )
            )
            conn.commit()
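crawl_ips writes into a proxy_ip MySQL table through cursor/conn objects created elsewhere in the script. A hedged sketch of a companion helper that pulls one random proxy back out, assuming the same globals and table layout:

def get_random_ip():
    # Assumed companion helper: fetch one random proxy from the table populated above.
    sql = "SELECT ip, port FROM proxy_ip ORDER BY RAND() LIMIT 1"
    cursor.execute(sql)
    ip, port = cursor.fetchone()
    return "http://{0}:{1}".format(ip, port)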
def finishtext(self, response):
    finishitem = response.meta['fisnishitem']
    finishitem['contenido'] = []
    text = Selector(response).xpath('//div[@class="texto_completo"]').extract()[0]
    text = self.extractbyref(text=text, ref=finishitem['ref'])
    if text == "":
        try:
            text += Selector(response).xpath('//div[@class="texto_completo"]').extract()[0]
        except:
            CheckSystem.systemlog("No tiene texto para 'TEXTOFINAL' " + response.url + "ITEM URL " + finishitem['url'])
    finishitem['contenido'].append(Utils.removeHTMLtags(text))
    yield finishitem
def searchDS(self, response, number=None, ref=None, name=None):
    try:
        text = Selector(response).xpath('//div[@class="texto_completo"]').extract()
        return Utils.removeForDS(text[0])
    except:
        # "URL rota" means broken URL; returned when the page has no full-text div.
        return "URL rota"
def extracttext(self, response, number, ref):
    textfragment = self.fragmenttxt(response, number)
    res = ""
    # This is the whole text and there is no need to split it into pages.
    if not Utils.checkownRef(textfragment, ref):
        return Utils.removeHTMLtags(textfragment)
    texto = self.extractbyref(textfragment, ref, number)
    pages = Selector(response).xpath('//a/@name').extract()
    # Start from the current page index and keep looking for more text.
    hasfirsttext = False
    if Utils.isDiferentFirstTime(textfragment, ref):
        hasfirsttext = True
    if not hasfirsttext:
        pages = Utils.convertPagToNum(pages)
        try:
            index = pages.index(number)
        except:
            index = 0
        for page in pages[index:]:
            if int(page) > int(number):
                textfragment = self.fragmenttxt(response, page)
                texto += self.extractother(textfragment, ref)
                # Stop as soon as a fragment containing another reference is found.
                if Utils.checkotherRefandnotOwn(textfragment, ref):
                    break
    res = Utils.removeHTMLtags(texto)
    return res
def fragmenttxt(self, response, number):
    pages = Selector(response).xpath('//p/a/@name').extract()
    text = Selector(response).xpath('//div[@class="texto_completo"]').extract()
    result = []
    control = False
    try:
        firstopage = Utils.getnumber(pages[0])
    except:
        firstopage = "1"
        control = True
    # Keep only the slice of the full text that belongs to the requested page.
    splittext = text[0].split("<br><br>")
    for i in splittext:
        if Utils.checkPage(i, number):
            control = True
            continue
        elif int(number) < int(firstopage):
            control = True
        if control and Utils.checkPage(i, str(int(number) + 1)):
            break
        if control:
            result.append(i)
    return Utils.concatlist(result)