def parse1(self, response):
selector = Selector(response)
infoItem = response.meta["item"]
ID = response.meta["ID"]
text1 = ";".join(selector.xpath('body/div[@class="c"]/text()').extract())  # join every text() node under div.c
nickname = re.findall(u'\u6635\u79f0[:|\uff1a](.*?);', text1)  # nickname ("昵称")
gender = re.findall(u'\u6027\u522b[:|\uff1a](.*?);', text1)  # gender ("性别")
place = re.findall(u'\u5730\u533a[:|\uff1a](.*?);', text1)  # location ("地区"), province and city
signature = re.findall(u'\u7b80\u4ecb[:|\uff1a](.*?);', text1)  # bio ("简介")
birthday = re.findall(u'\u751f\u65e5[:|\uff1a](.*?);', text1)  # birthday ("生日")
sexorientation = re.findall(u'\u6027\u53d6\u5411[:|\uff1a](.*?);', text1)  # sexual orientation ("性取向")
marriage = re.findall(u'\u611f\u60c5\u72b6\u51b5[:|\uff1a](.*?);', text1)  # relationship status ("感情状况")
url = re.findall(u'\u4e92\u8054\u7f51[:|\uff1a](.*?);', text1)  # personal website ("互联网")
if nickname:
infoItem['nickname'] = nickname[0]
if gender:
infoItem['gender'] = gender[0]
if place:
place = place[0].split(" ")
infoItem["province"] = place[0]
if len(place) > 1:
infoItem["city"] = place[1]
if signature:
infoItem["signature"] = signature[0]
if birthday:
try:
birthday = datetime.datetime.strptime(birthday[0], "%Y-%m-%d")
infoItem["birthday"] = birthday - datetime.timedelta(hours=8)
except Exception:
pass
if sexorientation and gender:
if sexorientation[0] == gender[0]:
infoItem["sexorientation"] = "gay"
else:
infoItem["sexorientation"] = "Heterosexual"
if marriage:
infoItem["marriage"] = marriage[0]
if url:
infoItem["url"] = url[0]
infoItem["user_id"] = ID
yield infoItem
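The profile fields above are pulled out with regular expressions over the ';'-joined text; the escape sequences are the Chinese field labels (昵称, 地区, ...). A small standalone check of the same pattern, using a made-up sample string, might look like this:

# Minimal sketch; the sample profile text below is invented for illustration only.
import re

sample = u"昵称:小明;性别:男;地区:北京 海淀;"
nickname = re.findall(u'\u6635\u79f0[:|\uff1a](.*?);', sample)   # -> ['小明']
place = re.findall(u'\u5730\u533a[:|\uff1a](.*?);', sample)      # -> ['北京 海淀']
print(nickname[0], place[0].split(" "))                          # 小明 ['北京', '海淀']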
def parse_page(self, response):
self.write(response.body)
sel = Selector(response)
infos = sel.xpath('//tr[@class="cells"]').extract()
for i, info in enumerate(infos):
val = Selector(text = info)
ip = val.xpath('//td[2]/text()').extract_first()
port = val.xpath('//td[3]/text()').extract_first()
country = val.xpath('//td[5]/text()').extract_first()
anonymity = val.xpath('//td[4]/text()').extract_first()
proxy = Proxy()
proxy.set_value(
ip = ip,
port = port,
country = country,
anonymity = anonymity,
source = self.name,
)
self.add_proxy(proxy = proxy)
def parse_page(self, response):
self.write(response.body)
sel = Selector(response)
infos = sel.xpath('//ul[@class="l2"]').extract()
for i, info in enumerate(infos):
val = Selector(text = info)
ip = val.xpath('//ul[@class="l2"]/span[1]/li/text()').extract_first()
port = val.xpath('//ul[@class="l2"]/span[2]/li/text()').extract_first()
anonymity = val.xpath('//ul[@class="l2"]/span[3]/li/text()').extract_first()
https = val.xpath('//ul[@class="l2"]/span[4]/li/text()').extract_first()
country = val.xpath('//ul[@class="l2"]/span[5]/li/a/text()').extract_first()
proxy = Proxy()
proxy.set_value(
ip = ip,
port = port,
country = country,
anonymity = anonymity,
source = self.name,
)
self.add_proxy(proxy = proxy)
def parse_page(self, response):
self.write(response.body)
sel = Selector(response)
infos = sel.xpath('//tbody/tr').extract()
for i, info in enumerate(infos):
if i == 0:
continue
val = Selector(text = info)
ip = val.xpath('//td[1]/text()').extract_first()
port = val.xpath('//td[2]/text()').extract_first()
country = val.xpath('//td[3]/div/text()').extract_first()
anonymity = val.xpath('//td[6]/text()').extract_first()
proxy = Proxy()
proxy.set_value(
ip = ip,
port = port,
country = country,
anonymity = anonymity,
source = self.name,
)
self.add_proxy(proxy = proxy)
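The three parse_page handlers above re-wrap each extracted HTML fragment in a fresh Selector(text=info). An equivalent and slightly leaner approach, sketched below with made-up table markup and assuming Scrapy's Selector is available, is to iterate the SelectorList directly and use relative XPaths:

# Minimal sketch, not taken from the original spiders; the HTML is a toy example.
from scrapy.selector import Selector

html = """
<table><tbody>
  <tr><td>IP</td><td>Port</td></tr>
  <tr><td>1.2.3.4</td><td>8080</td></tr>
</tbody></table>
"""
sel = Selector(text=html)
for i, row in enumerate(sel.xpath('//tbody/tr')):
    if i == 0:
        continue  # skip the header row, as the original parser does
    ip = row.xpath('./td[1]/text()').extract_first()
    port = row.xpath('./td[2]/text()').extract_first()
    print(ip, port)  # -> 1.2.3.4 8080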
def parse(self, response):
sel=scrapy.Selector(response)
links_in_a_page = sel.xpath('//a[@href]')
for link_sel in links_in_a_page:
item=OschinaItem()
link=str(link_sel.re('href="(.*?)"')[0])
if link:
if not link.startswith('http'):
link=response.url+link
yield scrapy.Request(link,callback=self.parse)
item['link']=link
link_text=link_sel.xpath('text()').extract()
if link_text:
item['link_text']=str(link_text[0].encode('utf-8').strip())
else:
item['link_text']=None
yield item
def parse_user_0(self, response):
""" Crawl the user's summary page: number of weibo posts, follows and fans. """
user_item = UserItem()
selector = Selector(response)
text0 = selector.xpath('body/div[@class="u"]/div[@class="tip2"]').extract_first()
if text0:
num_tweets = re.findall(u'\u5fae\u535a\[(\d+)\]', text0)  # number of weibo posts ("微博")
num_follows = re.findall(u'\u5173\u6ce8\[(\d+)\]', text0)  # number of follows ("关注")
num_fans = re.findall(u'\u7c89\u4e1d\[(\d+)\]', text0)  # number of fans ("粉丝")
if num_tweets:
user_item["ctweets"] = int(num_tweets[0])
if num_follows:
user_item["cfollows"] = int(num_follows[0])
if num_fans:
user_item["cfans"] = int(num_fans[0])
user_item["_id"] = response.meta["user_id"]
url_information1 = "http://weibo.cn/%s/info" % response.meta["user_id"]
yield Request(url=url_information1, meta={"item": user_item}, callback=self.parse_user_1)
def parse_user_1(self, response):
""" Crawl the user's detailed profile (step 2). """
user_item = response.meta["item"]
selector = Selector(response)
text1 = ";".join(selector.xpath('body/div[@class="c"]/text()').extract())  # join every text() node under div.c
nickname = re.findall(u'\u6635\u79f0[:|\uff1a](.*?);', text1)  # nickname ("昵称")
intro = re.findall(u'\u7b80\u4ecb[:|\uff1a](.*?);', text1)  # bio ("简介")
auth = re.findall(u'\u8ba4\u8bc1[:|\uff1a](.*?);', text1)  # verification ("认证")
gender = re.findall(u'\u6027\u522b[:|\uff1a](.*?);', text1)  # gender ("性别")
place = re.findall(u'\u5730\u533a[:|\uff1a](.*?);', text1)  # location ("地区"), province and city
birthday = re.findall(u'\u751f\u65e5[:|\uff1a](.*?);', text1)  # birthday ("生日")
sexorientation = re.findall(u'\u6027\u53d6\u5411[:|\uff1a](.*?);', text1)  # sexual orientation ("性取向")
marriage = re.findall(u'\u611f\u60c5\u72b6\u51b5[:|\uff1a](.*?);', text1)  # relationship status ("感情状况")
url = re.findall(u'\u4e92\u8054\u7f51[:|\uff1a](.*?);', text1)  # personal website ("互联网")
if nickname:
user_item["nickname"] = nickname[0]
if auth:
user_item["auth"] = auth[0]
if intro:
user_item["intro"] = intro[0]
user_item['t'] = time.strftime('%Y-%m-%d', time.localtime(time.time()))
yield user_item
def get_xicidaili():
url = "http://www.xicidaili.com/nn/%s"
for i in range(1, 2):
page_url = url % str(i)
print(page_url)
s = requests.session()
req = s.get(page_url, headers=headers)
selector = Selector(text=req.text)
ip_nodes = selector.xpath("//table//tr")
for each in ip_nodes[1:]:
ip = each.xpath("./td[2]/text()").extract()[0]
port = each.xpath("./td[3]/text()").extract()[0]
http_type = each.xpath("./td[6]/text()").extract()[0]
if http_type == "HTTP":
proxies = {
"http": "%s://%s:%s" % ("http", ip, port),
"https": "%s://%s:%s" % ("http", ip, port),
}
try:
r = requests.get('http://www.ip138.com/', proxies=proxies, timeout=5)
if r.status_code == 200:
print("%s:%s is valid" % (ip, port))
except:
print("%s:%s is not valid" % (ip, port))
def parse(self,response):
sel = scrapy.Selector(response)
article_info = sel.xpath("//a")
for info in article_info:
item = GovcrawlItem()
link = info.xpath('@href').extract()
if not link:
continue
position = link[0].find("/")
if position < 0 or "?" not in link[0]:
continue
elif "http" not in link[0]:
url = response.url + link[0][position:]
else:
url = link[0]
yield scrapy.Request(url,callback=self.parse)
item['link'] = url
title = info.xpath('text()').extract()
if title:
item['title'] = title[0]
else:
item['title'] = None
#print item['link']
yield item
def parse_page(self, response):
next_page = response.meta.get('page') + 1
json_data = json.loads(response.text)
if json_data.get('type') != 'success':
return
articles = scrapy.Selector(text=json_data.get('html')).css('article')
for article in articles:
yield {
'author': article.css('div.author-meta a ::text').extract_first(),
'date': article.css('div.clock-meta a ::text').extract_first(),
'title': article.css('h1.entry-title ::text').extract_first()
}
yield scrapy.FormRequest(
self.scrolling_url, formdata={'action': 'infinite_scroll', 'page': str(next_page), 'order': 'DESC'},
callback=self.parse_page, meta={'page': next_page}
)
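This handler parses HTML that arrives embedded in a JSON response. The same Selector(text=...) pattern works on any string, as in this self-contained sketch with a fabricated payload:

# Minimal sketch with an invented JSON payload; only the parsing pattern matters.
import json
from scrapy.selector import Selector

payload = '{"type": "success", "html": "<article><h1 class=\\"entry-title\\">Hello</h1></article>"}'
data = json.loads(payload)
for article in Selector(text=data["html"]).css("article"):
    print(article.css("h1.entry-title ::text").extract_first())  # -> Hello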
def parse_item(self, response):
item = CrawlmeizituItem()
selector = scrapy.Selector(response)
image_title = selector.xpath('//h2/a/text()').extract()
image_url = selector.xpath('//h2/a/@href').extract()
image_tags = selector.xpath('//div[@class="metaRight"]/p/text()').extract()
if selector.xpath('//*[@id="picture"]/p/img/@src').extract():
image_src = selector.xpath('//*[@id="picture"]/p/img/@src').extract()
else:
image_src = selector.xpath('//*[@id="maincontent"]/div/p/img/@src').extract()
if selector.xpath('//*[@id="picture"]/p/img/@alt').extract():
pic_name = selector.xpath('//*[@id="picture"]/p/img/@alt').extract()
else:
pic_name = selector.xpath('//*[@id="maincontent"]/div/p/img/@alt').extract()
#//*[@id="maincontent"]/div/p/img/@alt
item['title'] = image_title
item['url'] = image_url
item['tags'] = image_tags
item['src'] = image_src
item['alt'] = pic_name
print(item)
time.sleep(1)
yield item
def parse(self,response):
sel = Selector(response)
keys = sel.xpath('//*[@class="menu_main job_hopping"]/h2/text()').extract()
i = 1
item = defaultdict(list)
for key in keys:
if key.strip() != '':
print("test")
print(key.strip())
try:
print(i)
item[key.strip()].append(sel.xpath('//*[@class="menu_box"][{}]/div[2]/dl/dd/a/text()'.format(i)).extract())
i = i + 1
# item["key"].append(key)
except Exception as e:
print(e)
else:
continue
yield item
def fas_browse_suppliers_using_every_sector_filter(
context: Context, actor_alias: str):
actor = context.get_actor(actor_alias)
session = actor.session
response = fas_ui_find_supplier.go_to(session, term="")
context.response = response
sector_filters_selector = "#id_sectors input::attr(value)"
content = response.content.decode("utf-8")
sector_filters = Selector(text=content).css(
sector_filters_selector).extract()
results = {}
for sector in sector_filters:
logging.debug(
"%s will browse Suppliers by Industry sector filter '%s'",
actor_alias, sector
)
response = fas_ui_find_supplier.go_to(session, sectors=[sector])
results[sector] = {
"url": response.request.url,
"sectors": [sector],
"response": response
}
context.results = results
def fas_browse_suppliers_by_invalid_sectors(
context: Context, actor_alias: str):
actor = context.get_actor(actor_alias)
session = actor.session
response = fas_ui_find_supplier.go_to(session, term="")
context.response = response
sector_selector = "#id_sectors input::attr(value)"
content = response.content.decode("utf-8")
filters = Selector(text=content).css(sector_selector).extract()
sectors = list(set(choice(filters)
for _ in range(randrange(1, len(filters)))))
sectors.append("this_is_an_invalid_sector_filter")
logging.debug(
"%s will browse Suppliers by multiple Industry sector filters and will"
" inject an invalid filter: '%s'",
actor_alias, ", ".join(sectors)
)
context.response = fas_ui_find_supplier.go_to(session, sectors=sectors)
def fas_should_see_filtered_search_results(context, actor_alias):
results = context.results
sector_filters_selector = "#id_sectors input"
for industry, result in results.items():
context.response = result["response"]
content = result["response"].content.decode("utf-8")
filters = Selector(text=content).css(sector_filters_selector).extract()
for fil in filters:
sector = Selector(text=fil).css("input::attr(value)").extract()[0]
checked = True if Selector(text=fil).css("input::attr(checked)").extract() else False
if sector in result["sectors"]:
with assertion_msg(
"Expected search results to be filtered by '%s' sector"
" but this filter was not checked!", sector):
assert checked
else:
with assertion_msg(
"Expected search results to be filtered only by "
"following sectors '%s', but they are also filtered "
"by '%s'!", ", ".join(result['sectors']), sector):
assert not checked
logging.debug(
"%s was presented with '%s' industry search results correctly "
"filtered by following sectors: '%s'", actor_alias, industry,
", ".join(result['sectors']))
def fas_should_see_highlighted_search_term(context, actor_alias, search_term):
response = context.response
content = response.content.decode("utf-8")
search_summaries_selector = ".ed-company-search-summary"
summaries = Selector(text=content).css(search_summaries_selector).extract()
tag = "em"
keywords = [surround(keyword, tag) for keyword in search_term.split()]
founds = []
for summary in summaries:
founds += [(keyword in summary) for keyword in keywords]
with assertion_msg(
"Expected to see at least 1 search result with highlighted search "
"term: '%s'", ", ".join(keywords)):
assert any(founds)
logging.debug(
"{alias} found highlighted search {term}: '{keywords}' {founds} {times}"
" in {results} search results".format(
alias=actor_alias, term="terms" if len(keywords) > 1 else "term",
keywords=", ".join(keywords), founds=len([f for f in founds if f]),
times="times" if len([f for f in founds if f]) > 1 else "time",
results=len(summaries)))
def parse_url_list(self,response):
sel = scrapy.Selector(response)
wait_text = sel.xpath("//p[@id='loading']//text()").extract()
if wait_text:
# the page is still loading / being verified
meta = response.meta
meta['isscreen'] = 1
# Scrapy de-duplicates request URLs by default, so dont_filter=True is needed to re-request the same URL
yield scrapy.Request(response.url, meta=meta, callback=self.parse_validate,dont_filter=True)
else:
# a normal HTML page: collect the article URLs
url_list = sel.xpath("//h4[@class='weui_media_title']/@hrefs").extract()
for li in url_list:
href = li.strip()
url = 'http://mp.weixin.qq.com%s' % href
#print(url)
yield scrapy.Request(url, meta=self.meta, callback=self.parse_item)
def enrich_wrapper(func):
"""
An item_loader that still carries a response/selector cannot be pickled, so the
selector is not stored on it permanently. This decorator builds a fresh Selector
from the response, attaches it to the item_loader for the duration of each enrich
call, and detaches it again afterwards.
:param func:
:return:
"""
@wraps(func)
def wrapper(*args, **kwargs):
item_loader = args[1]
response = args[2]
selector = Selector(text=response.text)
item_loader.selector = selector
result = func(*args, **kwargs)
item_loader.selector = None
return result
return wrapper
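A minimal usage sketch for the decorator above (DemoEnricher and FakeResponse are hypothetical; a SimpleNamespace stands in for the real item_loader, since only its .selector attribute is touched here):

# Hypothetical example, not part of the original project.
from types import SimpleNamespace

class FakeResponse:
    text = "<html><body><h1>Hello</h1></body></html>"

class DemoEnricher:
    @enrich_wrapper
    def enrich_title(self, item_loader, response):
        # enrich_wrapper (defined above) attached a temporary Selector here
        return item_loader.selector.xpath("//h1/text()").extract_first()

loader = SimpleNamespace()
print(DemoEnricher().enrich_title(loader, FakeResponse()))  # -> Hello
print(loader.selector)  # -> None, detached again after the call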
def parse_page(self, response):
self.write(response.body)
sel = Selector(response)
infos = sel.xpath('//tr[@class="cells"]').extract()
for i, info in enumerate(infos):
self.log(info)
val = Selector(text = info)
ip = val.xpath('//td[2]/text()').extract_first()
port = val.xpath('//td[3]/text()').extract_first()
country = val.xpath('//td[5]/text()').extract_first()
anonymity = val.xpath('//td[4]/text()').extract_first()
proxy = Proxy()
proxy.set_value(
ip = ip,
port = port,
country = country,
anonymity = anonymity,
source = self.name,
)
self.add_proxy(proxy = proxy)
def parse_url_list(self, response):
sel = scrapy.Selector(response)
print(sel)
# first_url_list = sel.xpath('//title[1]//text()').extract()
# print(first_url_list)
article_xpath = ".//*[@id='news']/ul/li/div/a[1]/@href"
article_url_list = sel.xpath(article_xpath).extract()
for article_url in article_url_list:
print(article_url)
yield scrapy.Request(article_url,self.parse_article)
#yield self.parse_article(url)
#content = selenium_request(article_url_list)
#print(content)
def parse_info(self, response):
selector = scrapy.Selector(response)
item = WeiboWebInfoItem()
info = selector.xpath("body/div[@class='u']/div[@class='tip2']")
info_text = info.extract_first()
try:
item['ID'] = re.findall("uid=(.*?)\">", info_text)[0]
item['TweetsNum'] = re.findall(u"\u5fae\u535a\[(.*?)\]</span>", info_text)[0]  # weibo posts ("微博")
item['FollowerNum'] = re.findall(u"\u5173\u6ce8\[(.*?)\]</span>", info_text)[0]  # follows ("关注")
item['FanNum'] = re.findall(u"\u7c89\u4e1d\[(.*?)\]</span>", info_text)[0]  # fans ("粉丝")
tweet_url, follower_url = url_generator_for_id(item['ID'])
item['URL'] = tweet_url
except:
pass
basic_info_url = 'http://weibo.cn/%s/info' % item['ID']
yield scrapy.Request(basic_info_url, meta={"item": item}, callback=self.parse_basic_info)
def parse(self, response):
sel = scrapy.Selector(response)
dataList = sel.xpath("//div[@class='m-fly-item s-oneway']")
items = []
for index,each in enumerate(dataList):
flight_each = "//div[@id='list-box']/div["+str(index+1)+"]"
detail_span = "//div[@class='fl-detail-nav']/ul/li[1]/span[@class='nav-label']"
f_route_div = "//div[@class='m-fl-info-bd']/div"
airports = sel.xpath(flight_each + f_route_div + '/p[3]//text()').extract()
company = sel.xpath(flight_each + f_route_div + '/p[1]//text()').extract()
flight_time = sel.xpath(flight_each + f_route_div + '/p[2]//text()').extract()
passtime = sel.xpath(flight_each + f_route_div + '/p[4]//text()').extract()
price = sel.xpath(flight_each + "//div[@class='fl-price-box']//em//text()").extract()
item = FindtripItem()
item['site'] = 'Qua'
item['company'] = company
item['flight_time'] = flight_time
item['airports'] = airports
item['passtime'] = passtime
item['price'] = price
items.append(item)
return items
def parse_detail(self, response):
res_dir = response.meta["RESDIR"]
print('res_dir:', res_dir)
rensel = scrapy.Selector(response)
text = rensel.xpath('//script/text()').extract()
tmp1 = re.findall(r'"url":\"(.*?)\"', str(text))
if len(tmp1) > 0:
uid_p_list = []
for i in tmp1:
uid_p_list.append(i.strip().replace('\\', ''))
for i in uid_p_list[1:]:
pid = i.split('/')[-3]
print(i)
r = Redis(host='192.168.5.24', port='6379')
print(r.llen(self.MCOUNTRY))
r.lpush(self.MCOUNTRY,i)
def parse_job(self, response):
"""Parse a joblink into a JobItem.
"""
s = Selector(response)
item = JobItem()
item['url'] = response.url
item['site'] = 'Remote.co'
item['title'] = s.css('h1::text').extract_first()
item['company'] = s.xpath(
'//strong[@itemprop="name"]/text()').extract_first()
job = s.css('.job-description')
job.xpath('p[1]')
item['text'] = s.xpath(
'//div[@class="job_description"]//text()').extract()
try:
posted = s.xpath('//time//text()').extract_first()
item['date_posted'] = utilities.naturaltime(
posted.replace('Posted ', '')).isoformat()
except Exception as e:
self.logger.error(e)
yield item
def parse_job(self, response):
"""Parse a joblink into a JobItem.
"""
s = Selector(response)
item = JobItem()
item['url'] = response.url
item['site'] = 'RemoteWorking'
item['title'] = s.css('h1::text').extract_first()
item['text'] = s.xpath(
'//div[@itemprop="description"]//text()').extract()
try:
posted = s.xpath('//li[@class="date-posted"]//text()').extract_first()
item['date_posted'] = utilities.naturaltime(
posted.replace('Posted ', '')).isoformat()
except Exception as e:
self.logger.error(e)
yield item
def parse_job(self, response):
"""Parse a joblink into a JobItem.
"""
s = Selector(response)
item = JobItem()
item['url'] = response.url
item['site'] = 'Jobspresso'
item['title'] = s.xpath(
'//h2[@class="page-title"]//text()').extract_first()
item['text'] = s.xpath(
'//div[@itemprop="description"]//text()').extract()
try:
posted = s.xpath('//date/text()').extract_first()
item['date_posted'] = parse_time(posted).isoformat()
except Exception as e:
self.logger.error(e)
yield item