def parse_question(self, response):
# Handle the question page and extract a concrete question item from it
if "QuestionHeader-title" in response.text:
# Handle the new-version page layout
match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
item_loader.add_css("title", "h1.QuestionHeader-title::text")
item_loader.add_css("content", ".QuestionHeader-detail")
item_loader.add_value("url", response.url)
item_loader.add_value("zhihu_id", question_id)
item_loader.add_css("answer_num", ".List-headerText span::text")
item_loader.add_css("comments_num", ".QuestionHeader-Comment button::text")
item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")
question_item = item_loader.load_item()
else:
# Handle item extraction for the old-version page layout
match_obj = re.match(r"(.*zhihu.com/question/(\d+))(/|$).*", response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
# item_loader.add_css("title", ".zh-question-title h2 a::text")
item_loader.add_xpath("title",
"//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
item_loader.add_css("content", "#zh-question-detail")
item_loader.add_value("url", response.url)
item_loader.add_value("zhihu_id", question_id)
item_loader.add_css("answer_num", "#zh-question-answer-num::text")
item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
# item_loader.add_css("watch_user_num", "#zh-question-side-header-wrap::text")
item_loader.add_xpath("watch_user_num",
"//*[@id='zh-question-side-header-wrap']/text()|//*[@class='zh-question-followers-sidebar']/div/a/strong/text()")
item_loader.add_css("topics", ".zm-tag-editor-labels a::text")
question_item = item_loader.load_item()
yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers,
callback=self.parse_answer)
yield question_item
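The loader above leaves all value cleaning to ZhihuQuestionItem, whose definition is not part of this listing. Below is a minimal sketch of what that item could look like: the field names come from the add_css/add_value calls, while the extract_num helper and the processor choices are assumptions, not the original code.

import re
import scrapy
from scrapy.loader.processors import MapCompose, TakeFirst, Join

def extract_num(value):
    # Keep the first run of digits, e.g. "1,234" -> 1234 (assumed input shape).
    match = re.search(r'(\d+)', value.replace(',', ''))
    return int(match.group(1)) if match else 0

class ZhihuQuestionItem(scrapy.Item):
    zhihu_id = scrapy.Field(output_processor=TakeFirst())
    url = scrapy.Field(output_processor=TakeFirst())
    title = scrapy.Field(output_processor=TakeFirst())
    content = scrapy.Field(output_processor=TakeFirst())
    answer_num = scrapy.Field(input_processor=MapCompose(extract_num),
                              output_processor=TakeFirst())
    comments_num = scrapy.Field(input_processor=MapCompose(extract_num),
                                output_processor=TakeFirst())
    watch_user_num = scrapy.Field(input_processor=MapCompose(extract_num),
                                  output_processor=TakeFirst())
    topics = scrapy.Field(output_processor=Join(','))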
def parse_news(self, response):
self.logger.info('parse_news: %s' % response)
# Initialize item loader
# extract news title, published_at, author, content, url
# Required: title, raw_content, published_at
loader = ItemLoader(item=News(), response=response)
loader.add_value('url', response.url)
title_selectors = response.css('h1.detailtitle::text')
if not title_selectors:
# If error, drop from the item pipeline
return loader.load_item()
title = title_selectors.extract_first().strip()
loader.add_value('title', title)
# Parse date information
date_time = response.css('body > div > div.container > div.page-header > div::text').extract_first().strip()
date_time = date_time.split(',')[-1].strip()
date_time = ' '.join([_(w) for w in date_time.split(' ')]) # Translate month names, e.g. Oktober => October
try:
published_at_wib = datetime.strptime(date_time, '%d %B %Y %H:%M')
except ValueError:
# If error, drop from the item pipeline
return loader.load_item()
published_at = wib_to_utc(published_at_wib)
loader.add_value('published_at', published_at)
# If multipage
multipage_selectors = response.css('.newsPagingWrap > a')
if multipage_selectors:
return self.parse_indices(multipage_selectors, loader)
# Else if not multipage
author_name_selectors = response.css('.newsContent > p > strong::text')
if not author_name_selectors:
loader.add_value('author_name', '')
else:
author_name = author_name_selectors.extract()[-1].strip()
loader.add_value('author_name', author_name)
# Extract the news content
raw_content_selectors = response.css('.newsContent > p')
if not raw_content_selectors:
# Drop from the item pipeline
return loader.load_item()
raw_content = ' '.join(raw_content_selectors.extract())
raw_content = raw_content.strip()
loader.add_value('raw_content', raw_content)
# Move scraped news to pipeline
return loader.load_item()
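The news parsers in this collection lean on two helpers that are defined elsewhere in the project: _() maps Indonesian month names (and abbreviations) to the English forms that datetime.strptime expects, and wib_to_utc() converts a naive WIB timestamp (UTC+7) to UTC. A rough sketch of what they could look like; the originals may differ in detail.

from datetime import timedelta

# Assumed month mapping; April, September and November are spelled the same
# in Indonesian and English, so unknown tokens are passed through unchanged.
MONTH_MAP = {
    'Januari': 'January', 'Februari': 'February', 'Maret': 'March',
    'Mei': 'May', 'Juni': 'June', 'Juli': 'July', 'Agustus': 'August',
    'Oktober': 'October', 'Desember': 'December',
    # Abbreviated forms seen in strings like "12 Okt 2016 - 05:25"
    'Okt': 'Oct', 'Des': 'Dec',
}

def _(word):
    return MONTH_MAP.get(word, word)

def wib_to_utc(wib_datetime):
    # Waktu Indonesia Barat is UTC+7, so shift back by seven hours.
    return wib_datetime - timedelta(hours=7)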
def parse_news(self, response):
self.logger.info('parse_news: %s' % response)
parsed_news = json.loads(response.text)[0]
# Initialize item loader
# extract news title, published_at, author, content, url
loader = ItemLoader(item=News(), response=response)
loader.add_value('url', parsed_news['url'])
if not parsed_news['title']:
# Will be dropped on the item pipeline
return loader.load_item()
loader.add_value('title', parsed_news['title'])
# Convert HTML text to a scrapy response
html_response = HtmlResponse(url=parsed_news['url'],
body=parsed_news['content'].encode('utf-8', 'ignore'))
xpath_query = '''
//body/node()
[not(descendant-or-self::comment()|
descendant-or-self::style|
descendant-or-self::script|
descendant-or-self::div|
descendant-or-self::span|
descendant-or-self::image|
descendant-or-self::img|
descendant-or-self::iframe
)]
'''
raw_content_selectors = html_response.xpath(xpath_query)
if not raw_content_selectors:
# Will be dropped on the item pipeline
return loader.load_item()
raw_content = raw_content_selectors.extract()
raw_content = ' '.join([w.strip() for w in raw_content])
raw_content = raw_content.strip()
loader.add_value('raw_content', raw_content)
if not parsed_news['published']:
# Will be dropped on the item pipeline
return loader.load_item()
# Parse date information
# Example: 12 Oct 2016 - 05:25
date_time_str = ' '.join([_(w) for w in parsed_news['published'].split(',')[1].strip()[:-4].split(' ')])
try:
published_at_wib = datetime.strptime(date_time_str,
'%d %b %Y - %H:%M')
except ValueError:
# Will be dropped on the item pipeline
return loader.load_item()
published_at = wib_to_utc(published_at_wib)
loader.add_value('published_at', published_at)
if not parsed_news['author']:
loader.add_value('author_name', '')
else:
loader.add_value('author_name', parsed_news['author'])
# Move scraped news to pipeline
return loader.load_item()
def parse_news_metro(self, response):
loader = ItemLoader(item=News(), response=response)
loader.add_value('url', response.url)
date_selector = response.css('.artikel > div.block-tanggal::text')
if not date_selector:
return self.parse_news_pilkada(loader, response)
try:
date_time_str = date_selector.extract()[0].split(',')[1].strip()[:-4]
date_time_str = ' '.join([_(x) for x in date_time_str.split(' ')])
published_at_wib = datetime.strptime(date_time_str, '%d %B %Y | %H:%M')
except Exception:
return loader.load_item()
published_at = wib_to_utc(published_at_wib)
if self.media['last_scraped_at'] >= published_at:
is_no_update = True
self.logger.info('Media has no update')
raise CloseSpider('finished')
loader.add_value('published_at', published_at)
title_selector = response.css('.artikel > h1::text')
if not title_selector:
return loader.load_item()
loader.add_value('title', title_selector.extract()[0])
# Select all p which don't have iframe inside it
raw_content_selector = response.xpath('//div[@class="artikel"]//p[not(iframe)]')
if not raw_content_selector:
return loader.load_item()
raw_content = ''
for rsl in raw_content_selector:
raw_content = raw_content + rsl.extract().strip()
# Go to next page while there is next page button
next_page_selector = response.css('.pagination-nb').xpath('.//a[text()="next"]/@href')  # relative XPath so the search stays inside the pagination block
if next_page_selector:
return Request(next_page_selector.extract()[0], callback=lambda x, loader=loader, raw_content=raw_content: self.parse_next_page_metro(x, loader, raw_content))
loader.add_value('raw_content', raw_content)
# The author name is usually placed inside a <strong> tag; however, some articles do not use one.
# NOTE: this block of code may need revision in the future
author_name = ''
for author_name_selector in reversed(raw_content_selector):
author_name_selector = author_name_selector.css('strong::text')
for tmp in reversed(author_name_selector.extract()):
tmp = tmp.strip()
if tmp and all((x.isalpha() and x.isupper()) or x.isspace() or x == '.' or x == '|' for x in tmp):
author_name = tmp
break
if author_name:
break
author_name = ','.join(author_name.split(' | '))
loader.add_value('author_name', author_name)
return loader.load_item()
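When the article spans several pages, parse_news_metro hands the loader and the text collected so far to parse_next_page_metro through the lambda callback above. That method is not included in this listing; the sketch below only illustrates the accumulate-and-recurse pattern implied by the callback, and is not the original implementation.

def parse_next_page_metro(self, response, loader, raw_content):
    # Hypothetical continuation: append this page's paragraphs to the text
    # accumulated so far, then keep following the "next" button.
    for paragraph in response.xpath('//div[@class="artikel"]//p[not(iframe)]'):
        raw_content = raw_content + paragraph.extract().strip()
    next_page = response.css('.pagination-nb').xpath('.//a[text()="next"]/@href')
    if next_page:
        return Request(next_page.extract()[0],
                       callback=lambda x, loader=loader, raw_content=raw_content:
                           self.parse_next_page_metro(x, loader, raw_content))
    loader.add_value('raw_content', raw_content)
    # The original presumably repeats the author-name extraction here as well.
    return loader.load_item()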
def parse_news(self, response):
self.logger.info('parse_news: %s' % response)
# Initialize item loader
# extract news title, published_at, author, content, url
loader = ItemLoader(item=News(), response=response)
loader.add_value('url', response.url)
title_selectors = response.css('h1[itemprop="headline"]::text')
if not title_selectors:
# Will be dropped on the item pipeline
return loader.load_item()
title = title_selectors.extract()[0]
loader.add_value('title', title)
author_name_selectors = response.css('a[rel="author"] > span::text')
if not author_name_selectors:
loader.add_value('author_name', '')
else:
author_name = author_name_selectors.extract()[0]
loader.add_value('author_name', author_name)
raw_content_selectors = response.css('.content')
if not raw_content_selectors:
# Will be dropped on the item pipeline
return loader.load_item()
raw_content = raw_content_selectors.extract()
raw_content = ' '.join([w.strip() for w in raw_content])
raw_content = raw_content.strip()
loader.add_value('raw_content', raw_content)
date_time_str_selectors = response.css('article > div.time::text')
if not date_time_str_selectors:
# Will be dropped on the item pipeline
return loader.load_item()
# Parse date information
# Example: Selasa, 6 Oktober 2015 - 05:23 WIB
date_time_str = date_time_str_selectors.extract()[0]
date_time_str = date_time_str.split(',')[1].strip()[:-4]
date_time_str = ' '.join([_(w) for w in date_time_str.split(' ')])
try:
published_at_wib = datetime.strptime(date_time_str, '%d %B %Y - %H:%M')
except ValueError:
# Will be dropped on the item pipeline
return loader.load_item()
published_at = wib_to_utc(published_at_wib)
loader.add_value('published_at', published_at)
# Move scraped news to pipeline
return loader.load_item()
def parse_news(self, response):
self.logger.info('parse_news: %s' % response)
loader = ItemLoader(item=News(), response=response)
json_response = json.loads(response.body)
try:
url = json_response['NewsML']['NewsItem']['NewsComponent']['NewsComponent']['NewsComponent']['NewsLines']['MoreLink']
except KeyError:
return loader.load_item()
loader.add_value('url', url)
try:
title = json_response['NewsML']['NewsItem']['NewsComponent']['NewsComponent']['NewsComponent']['NewsLines']['HeadLine']
except KeyError:
return loader.load_item()
if not title:
return loader.load_item()
loader.add_value('title', title)
try:
raw_content = json_response['NewsML']['NewsItem']['NewsComponent']['NewsComponent']['NewsComponent']['ContentItem']['DataContent']['nitf']['body']['body.content']['p']
except KeyError:
return loader.load_item()
if not raw_content:
return loader.load_item()
loader.add_value('raw_content', raw_content)
try:
author_name = json_response['NewsML']['NewsItem']['NewsComponent']['NewsComponent']['Author']
except KeyError:
return loader.load_item()
if not author_name:
loader.add_value('author_name', '')
else:
loader.add_value('author_name', author_name)
try:
date_time_str = json_response['NewsML']['NewsItem']['NewsManagement']['FirstCreated']
except KeyError:
return loader.load_item()
if not date_time_str:
return loader.load_item()
date_time_str = date_time_str.split('T')
date_time_str[1] = '0' * (6 - len(date_time_str[1])) + date_time_str[1]
try:
published_at_wib = datetime.strptime(' '.join(date_time_str), '%Y%m%d %H%M%S')
except Exception:
return loader.load_item()
published_at = wib_to_utc(published_at_wib)
loader.add_value('published_at', published_at)
return loader.load_item()
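The NewsML parser above repeats the same deeply nested dictionary lookups inside try/except KeyError blocks. One way to tighten that, not part of the original code, is a small helper that walks the keys and returns None as soon as one is missing.

def dig(data, *keys):
    # Walk nested dicts; stop and return None when a key is absent.
    for key in keys:
        if not isinstance(data, dict) or key not in data:
            return None
        data = data[key]
    return data

# Example, replacing the try/except around the HeadLine lookup:
# title = dig(json_response, 'NewsML', 'NewsItem', 'NewsComponent',
#             'NewsComponent', 'NewsComponent', 'NewsLines', 'HeadLine')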
def parse_item(self, response):
loader = ItemLoader(GaokaopaiZhuanyeItem(), response)
loader.add_value('url', response.url)
loader.add_css('name', u'.majorTitle>h1::text')
loader.add_xpath('code', u'//div[@class="majorBase"]/h3[starts-with(., "?????")]/text()', re=ur'?(.+)')
loader.add_xpath('degree', u'//div[@class="majorBase"]/h3[starts-with(., "?????")]/text()', re=ur'?(.+)')
loader.add_xpath('period', u'//div[@class="majorBase"]/h3[starts-with(., "?????")]/text()', re=ur'?(.+)')
loader.add_xpath('courses', u'//div[@class="course"]/h3[.="?????"]/following-sibling::p/text()')
def parse_related():
for e in response.xpath(u'//div[@class="course"]/h3[.="?????"]/following-sibling::a'):
yield {
'url': e.css('::attr(href)').extract_first(),
'code': e.css('::attr(href)').re_first(ur'-([^-]+)\.html'),
'name': e.css('::text').extract_first(),
}
loader.add_value('related', list(parse_related()))
def parse_category():
category = []
for i in [u"????", u"????", u"????"]:
x = u'//h3[.="{}"]/following-sibling::ul[1]/li[@class="current"]/a'.format(i)
e = response.xpath(x)
category.append({
'url': e.css('::attr(href)').extract_first(),
'code': e.css('::attr(href)').re_first(ur'/zhuanye([-0-9]*)\.html').strip('-'),
'name': e.css('::text').extract_first(),
})
return category
loader.add_value('category', parse_category())
loader.add_css('detail', u'.majorCon')
item = loader.load_item()
return Request(
url='http://www.gaokaopai.com/zhuanye-jiuye-{}.html'.format(item['code'][0]),
meta={'item': item},
callback=self.parse_jiuye
)
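parse_item hands the half-built item to parse_jiuye through the request meta dict. That callback is not shown here; the sketch below only demonstrates the usual pattern of recovering the item from response.meta and yielding it, and both the CSS selector and the 'jiuye' field name are illustrative placeholders rather than the original spider's.

def parse_jiuye(self, response):
    # Hypothetical follow-up callback for the employment page.
    item = response.meta['item']
    # Placeholder selector and field name; assumed to exist on the item.
    item['jiuye'] = response.css('.employment ::text').extract()
    yield item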
def parse_item(self, response):
loader = ItemLoader(ChsiDaxueItem(), response)
loader.add_value('id', response.url, re=ur'schId-(\w+)\.dhtml')
loader.add_value('url', response.url)
loader.add_css('logo', u'.r_c_sch_logo>img::attr(src)', MapCompose(lambda url: urljoin('http://gaokao.chsi.com.cn/', url)))
loader.add_css('name', u'.topImg::text')
loader.add_css('badges', u'.r_c_sch_attr .r_c_sch_icon::attr(title)')
data_clean = MapCompose(lambda x: re.sub(r'\s+', ' ', x), unicode.strip)
loader.add_xpath('type', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
loader.add_xpath('membership', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
loader.add_xpath('province', u'//span[@class="f_bold" and span]/following-sibling::text()', data_clean)
loader.add_xpath('address', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
loader.add_xpath('phone', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
loader.add_xpath('website', u'//span[@class="f_bold" and .="?????"]/following-sibling::a/@href', data_clean)
loader.add_xpath('backdoor', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
def parse_votes():
xpath = u'//td[@class="tdMydT" and .="{}"]/following-sibling::td/div[@class="rank"]/@rank'
get_vote = lambda what: float(response.xpath(xpath.format(what)).extract_first() or 0)
return {
'overall': get_vote(u'?????'),
'environment': get_vote(u'???????'),
'life': get_vote(u'?????'),
}
loader.add_value('votes', parse_votes())
def parse_trending():
css = u'{}>table tr:not(:first-child)'
def get_trending(what):
majors = []
for e in response.css(css.format(what)):
majors.append({
'id': e.css(u'.tdZytjTDiv>a::attr(href)').re_first(r'specId=(\w+)'),
'name': e.css(u'.tdZytjTDiv::attr(title)').extract_first(),
'vote': float(e.css(u'.avg_rank::text').extract_first()),
'count': int(e.css(u'.c_f00::text, .red::text').extract_first()),
})
return majors
return {
'count': get_trending(u'#topNoofPTable'),
'index': get_trending(u'#topIndexTable'),
'like': get_trending(u'.r_r_box_zymyd'),
}
loader.add_value('trending', parse_trending())
item = loader.load_item()
for link in LinkExtractor(restrict_xpaths=u'//ul[@id="topNav"]//a[.="????"]').extract_links(response):
yield Request(link.url, meta={'item': item}, callback=self.parse_jianjie)
def parse_item(self, response):
""" Parse a response into a DocumentItem. """
doc_loader = ItemLoader(item=DocumentItem(), response=response)
doc_loader.add_value('url', response.url)
doc_loader.add_xpath('meta', '//meta[@name=\'description\']/@content')
doc_loader.add_value('domain', urlparse(response.url).hostname)
doc_loader.add_xpath('title', '//title/text()')
hxs = HtmlXPathSelector(response) # For HTML extractions
# Extract links
# For each link on this page
links = []
a_links = hxs.xpath('//a')
for link in a_links:
link_obj = {}
# Extract the link's URL
link_str = " ".join(link.xpath('@href').extract())
link_obj['link'] = link_str.replace("\n", "")
# Extract the link's anchor text
link_name_str = " ".join(link.xpath('text()').extract())
link_name_str = link_name_str.replace("\n", "")
link_name_str = link_name_str.lstrip()
link_name_str = link_name_str.rstrip()
link_obj['link_name'] = link_name_str
links.append(link_obj)
doc_loader.add_value('links', links)
# Populate text field
title_list = hxs.xpath('//title/text()').extract()
title = ' '.join(title_list)
body_text = self.html2string(response)
text = title + " " + body_text
doc_loader.add_value('content', text)
doc_loader.add_value('raw_text', text)
doc_loader.add_value('raw_title', title)
doc_loader.add_value('raw_url', response.url)
h1_list = hxs.xpath("//h1/text()").extract()
doc_loader.add_value('h1', " ".join(h1_list))
doc_loader.add_value('content_type', response.headers['Content-type'])
doc_loader.add_value('updated_on', datetime.datetime.now().strftime(
"%Y-%m-%dT%H:%M:%S"))
item = doc_loader.load_item()
return item
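parse_item relies on self.html2string(response) to flatten the page into plain text for the content field; that helper is defined elsewhere in the spider. A minimal sketch of the idea using w3lib's tag strippers, which may differ from the original implementation:

from w3lib.html import remove_tags, remove_tags_with_content

def html2string(self, response):
    # Drop <script> and <style> blocks outright, strip the remaining tags,
    # and collapse whitespace into single spaces.
    text = remove_tags_with_content(response.text, which_ones=('script', 'style'))
    text = remove_tags(text)
    return ' '.join(text.split())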
def parse_item(self, response):
"""
Extract fields from the individual email page and load them into the
item.
@url http://lkml.iu.edu/hypermail/linux/kernel/0111.3/0036.html
@returns items 1 1
@scrapes senderName senderEmail timeSent timeReceived subject body
@scrapes replyto url
"""
load = ItemLoader(item=Email(), selector=response)
# Take care of easy fields first
load.add_value('url', response.url)
pattern_replyto = '//ul[1]/li[contains((b|strong), "In reply to:")]'
pattern_replyto += '/a/@href'
link = response.xpath(pattern_replyto).extract()
link = [''] if not link else link
load.add_value('replyto', link[0])
# Sometime in 2003, the archive changes and the email pages
# require specific procedure to extract the following fields:
specific_fields = {
'senderName': None,
'senderEmail': None,
'timeSent': None,
'timeReceived': None,
'subject': None
}
# Detect new archive system with HTML comment
new_system = response.xpath('/comment()[1][contains(., "MHonArc")]')
if len(new_system) >= 1:
# If new archive system is detected...
specific_fields = self.parse_new_system(response, specific_fields)
body_before_comment = '<!--X-Body-of-Message-->'
body_after_comment = '<!--X-Body-of-Message-End-->'
else:
# Otherwise...
specific_fields = self.parse_old_system(response, specific_fields)
body_before_comment = '<!-- body="start" -->'
body_after_comment = '<!-- body="end" -->'
# Load all the values from these specific fields
for key, val in specific_fields.items():
load.add_value(key, val)
if self.get_body:
# Final field, the body of the email
pattern_body = body_before_comment + '\n?(.*)' + body_after_comment
# Ignore invalid bytes when necessary
page_body = response.body.decode('utf-8', 'ignore')
body = re.search(pattern_body, page_body, flags=re.S)
load.add_value('body', body.group(1))
return load.load_item()