def __init__(self, conf=None, conn=None, date_from=None, date_to=None):
# Save conf/conn
self.conf = conf
self.conn = conn
# Make start urls
self.start_urls = _make_start_urls(
prefix='http://www.isrctn.com/search',
date_from=date_from, date_to=date_to)
# Make rules
self.rules = [
Rule(LinkExtractor(
allow=r'ISRCTN\d+',
), callback=parse_record),
Rule(LinkExtractor(
allow=r'page=\d+',
)),
]
# Inherit parent
super(Spider, self).__init__()
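The constructor above calls a module-level `_make_start_urls` helper and registers a `parse_record` callback, neither of which appears in this excerpt. A minimal sketch of what the helper could look like, assuming the search endpoint takes a single `q` parameter filtered on an edit-date window (the query syntax is an assumption, not the documented ISRCTN API):

try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2


def _make_start_urls(prefix, date_from=None, date_to=None):
    # Hypothetical helper: build a single search URL, optionally filtered
    # by an edit-date window. Parameter name and query syntax are assumed.
    query = {'q': ''}
    if date_from and date_to:
        query['q'] = ('GT lastEdited:"%s" AND LE lastEdited:"%s"'
                      % (date_from, date_to))
    return ['%s?%s' % (prefix, urlencode(query))]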
def parse(self, response):
le = LinkExtractor()
for link in le.extract_links(response):
yield SplashRequest(
link.url,
self.parse_link,
endpoint='render.json',
args={
'har': 1,
'html': 1,
}
)
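The `parse` method above hands each rendered page to a `parse_link` callback that is not part of the excerpt. A minimal sketch of such a callback, assuming the response body is the JSON document returned by Splash's `render.json` endpoint with the `html` and `har` keys requested in `args` (the yielded item fields are hypothetical):

import json


def parse_link(self, response):
    # Hypothetical callback: decode the render.json payload and keep the
    # rendered HTML plus the HAR network log for later processing.
    data = json.loads(response.text)
    yield {
        'url': response.url,
        'html': data.get('html'),
        'har': data.get('har'),
    }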
def __init__(self, domains, urls, *args, **kwargs):
"""Constructor for PageSpider.
Parameters
----------
domains : list
A list of domains for the site.
urls : list
A list of URLs of the site.
href_xpaths : list
A list of XPath expressions indicating the ancestors of the `<a>`
elements from which links are extracted.
url_regex : string
A regular expression pattern for matching URLs.
If you use this spider to store items into a database, the following
additional keyword arguments are required:
platform_id : int
The id of a platform instance.
session : object
An instance of SQLAlchemy session.
"""
self.session = kwargs.pop('session', None)
self.platform_id = kwargs.pop('platform_id', None)
self.href_xpaths = kwargs.pop('href_xpaths', ())
self.url_regex = kwargs.pop('url_regex', None)
self.start_urls = urls
self.allowed_domains = domains
self.link_extractor = LinkExtractor(
allow_domains=self.allowed_domains,
restrict_xpaths=self.href_xpaths,
unique=True)
super(PageSpider, self).__init__(*args, **kwargs)
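The constructor only wires up the `LinkExtractor`; the callback that uses it is not shown. A sketch of a `parse` method consistent with these attributes, assuming the spider simply follows every extracted link whose URL matches `url_regex` (that filtering behaviour is an assumption):

import re

from scrapy.http import Request


def parse(self, response):
    # Hypothetical callback: follow every link the configured extractor
    # finds, optionally filtered by the url_regex passed to __init__.
    for link in self.link_extractor.extract_links(response):
        if self.url_regex and not re.search(self.url_regex, link.url):
            continue
        yield Request(link.url, callback=self.parse)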
def __init__(self, domains, urls, *args, **kwargs):
"""Constructor for SiteSpider.
Parameters
----------
domains : list
A list of domains for the site.
urls : list
A list of sitemap URLs for the site.
href_xpaths : list
A list of XPath expressions indicating the ancestors of the `<a>`
elements from which links are extracted.
url_regex : string
A regular expression pattern for matching URLs.
If you use this spider to store items into a database, the following
additional keyword arguments are required:
platform_id : int
The id of a platform instance.
session : object
An instance of SQLAlchemy session.
"""
self.session = kwargs.pop('session', None)
self.platform_id = kwargs.pop('platform_id', None)
self.url_regex = kwargs.pop('url_regex', None)
self.href_xpaths = kwargs.pop('href_xpaths', ())
self.start_urls = urls
self.allowed_domains = domains
self.rules = (Rule(
LinkExtractor(
allow_domains=self.allowed_domains,
restrict_xpaths=self.href_xpaths,
unique=True),
callback="parse_item",
follow=True),)
super(SiteSpider, self).__init__(*args, **kwargs)
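Both constructors accept the database-related keyword arguments described in their docstrings. A usage sketch, assuming the spider is launched with `CrawlerProcess`; every concrete value below (domains, URLs, XPaths, regex, platform id, user agent) is illustrative rather than taken from the original project:

from scrapy.crawler import CrawlerProcess

process = CrawlerProcess({'USER_AGENT': 'example-bot (+http://example.org)'})
process.crawl(
    SiteSpider,
    domains=['example.org'],
    urls=['http://example.org/sitemap.xml'],
    href_xpaths=['//div[@id="content"]'],
    url_regex=r'/articles/\d+',
    platform_id=1,       # id of a platform row, if items are stored
    session=None,        # e.g. a SQLAlchemy session, if items are stored
)
process.start()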
import tarfile
from timeit import default_timer as timer

import click
from scrapy.http import HtmlResponse
from scrapy.linkextractors import LinkExtractor


def main():
    # Benchmark LinkExtractor over a tarball of saved HTML pages and
    # report the extraction rate in links per second.
    url = 'http://scrapinghub.com/'
    link_extractor = LinkExtractor()
    total = 0
    time = 0
    tar = tarfile.open("sites.tar.gz")
    for member in tar.getmembers():
        f = tar.extractfile(member)
        if f is None:  # skip directories and other non-file members
            continue
        html = f.read()
        start = timer()
        response = HtmlResponse(url=url, body=html, encoding='utf8')
        links = link_extractor.extract_links(response)
        end = timer()
        total = total + len(links)
        time = time + end - start
    print("\nTotal number of links extracted = {0}".format(total))
    print("Time taken = {0}".format(time))
    click.secho("Rate of link extraction : {0} links/second\n".format(
        float(total / time)), bold=True)
    with open("Benchmark.txt", 'w') as g:
        g.write(" {0}".format(float(total / time)))
def __init__(self, **kw):
super(BroadBenchSpider, self).__init__(**kw)
self.link_extractor = LinkExtractor()
self.cookies_seen = set()
self.previtem = 0
self.items = 0
self.timesec = datetime.datetime.utcnow()
def __init__(self, **kw):
super(FollowAllSpider, self).__init__(**kw)
url = 'http://localhost/books.toscrape.com/index.html'
if not url.startswith('http://') and not url.startswith('https://'):
url = 'http://%s/' % url
self.url = url
self.allowed_domains = [re.sub(r'^www\.', '', urlparse(url).hostname)]
self.link_extractor = LinkExtractor()
self.cookies_seen = set()
self.previtem = 0
self.items = 0
self.timesec = datetime.datetime.utcnow()
def parse(self, response):
"""
Scrapy parse callback
"""
# Get current nesting level
curr_depth = response.meta.get('depth', 1)
if self.config['login']['enabled']:
curr_depth = curr_depth - 1 # Do not count the login page as nesting depth
# Store to disk?
if self.config['store']['enabled']:
path = response.url.replace(os.sep, '--') # Replace directory separator
path = self.config['store']['path'] + os.sep + path
with open(path, 'wb') as fpointer:
fpointer.write(response.body)
# Yield current url item
item = CrawlpyItem()
item['url'] = response.url
item['status'] = response.status
item['depth'] = curr_depth
item['referer'] = response.meta.get('referer', '')
yield item
# Get all links from the current page
links = LinkExtractor().extract_links(response)
# Iterate all found links and crawl them
for link in links:
deny = False
# Check requests to be ignored
for ignore in self.config['ignores']:
if (ignore in link.url) or (ignore.lower() in link.url.lower()):
# Ignore pattern found, stop looking into other patterns
deny = True
break
# [NO] Max depth exceeded
if curr_depth >= self.max_depth:
logging.info('[Not Crawling] Current depth (%s) exceeds max depth (%s)', curr_depth, self.max_depth)
pass
# [NO] Duplicate URL
elif link.url in self.duplicates:
logging.info('[Not Crawling] Url already crawled: ' + link.url)
pass
# [NO] URL denied
elif deny:
logging.info('[Not Crawling] Url denied (pattern: "' + ignore + '"): ' + link.url)
pass
# [OK] Crawl!
else:
self.duplicates.append(link.url)
yield Request(link.url, meta={'depth': curr_depth+1, 'referer': response.url})
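This callback relies on several attributes that are initialized elsewhere in the spider: a `config` dictionary with `login`, `store`, and `ignores` sections, plus `max_depth` and a `duplicates` list. A sketch of the shape that state might take; the keys are inferred from the code above, the values are purely illustrative:

config = {
    'login': {'enabled': False},
    'store': {'enabled': True, 'path': '/tmp/crawlpy'},
    'ignores': ['logout', '.pdf'],
}
max_depth = 3        # stop scheduling requests beyond this nesting level
duplicates = []      # URLs already scheduled, used to skip re-crawling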
def parse(self, response):
for li_item in response.css('#content div.entry-content ul.lcp_catlist li'):
title = li_item.css('h3.lcp_post a::text').extract_first().strip()
text_date = li_item.css('::text').extract_first().strip()
try:
date_obj = datetime.datetime.strptime(text_date, '%d %B %Y')
date = date_obj.date().isoformat()
except ValueError:
date_obj = None  # unparseable date; skip the feedback-days computation
date = None
paragraphs = li_item.xpath('p').xpath("string()").extract()
description = '\n'.join(paragraphs)
feedback_days = None
feedback_date = self.get_feedback_date(description)
if feedback_date and date_obj:  # need a parsed date to compute the difference
days_diff = feedback_date - date_obj
feedback_days = days_diff.days
links = li_item.css('a')
documents = self.get_documents_from_links(links)
item = JustPublication(
title=title,
type=self.get_type(title),
identifier=self.slugify(title)[0:127],
date=date,
institution='justitie',
description=description,
documents=documents,
contact=self.get_contacts(description),
feedback_days=feedback_days
)
yield item
paginationLinkEx = LinkExtractor(restrict_css='ul.lcp_paginator')
pages = paginationLinkEx.extract_links(response)
for page in pages:
yield scrapy.Request(page.url, callback=self.parse)
def parse_item(self, response):
loader = ItemLoader(ChsiDaxueItem(), response)
loader.add_value('id', response.url, re=ur'schId-(\w+)\.dhtml')
loader.add_value('url', response.url)
loader.add_css('logo', u'.r_c_sch_logo>img::attr(src)', MapCompose(lambda url: urljoin('http://gaokao.chsi.com.cn/', url)))
loader.add_css('name', u'.topImg::text')
loader.add_css('badges', u'.r_c_sch_attr .r_c_sch_icon::attr(title)')
data_clean = MapCompose(lambda x: re.sub(r'\s+', ' ', x), unicode.strip)
loader.add_xpath('type', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
loader.add_xpath('membership', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
loader.add_xpath('province', u'//span[@class="f_bold" and span]/following-sibling::text()', data_clean)
loader.add_xpath('address', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
loader.add_xpath('phone', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
loader.add_xpath('website', u'//span[@class="f_bold" and .="?????"]/following-sibling::a/@href', data_clean)
loader.add_xpath('backdoor', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
def parse_votes():
xpath = u'//td[@class="tdMydT" and .="{}"]/following-sibling::td/div[@class="rank"]/@rank'
get_vote = lambda what: float(response.xpath(xpath.format(what)).extract_first() or 0)
return {
'overall': get_vote(u'?????'),
'environment': get_vote(u'???????'),
'life': get_vote(u'?????'),
}
loader.add_value('votes', parse_votes())
def parse_trending():
css = u'{}>table tr:not(:first-child)'
def get_trending(what):
majors = []
for e in response.css(css.format(what)):
majors.append({
'id': e.css(u'.tdZytjTDiv>a::attr(href)').re_first(r'specId=(\w+)'),
'name': e.css(u'.tdZytjTDiv::attr(title)').extract_first(),
'vote': float(e.css(u'.avg_rank::text').extract_first()),
'count': int(e.css(u'.c_f00::text, .red::text').extract_first()),
})
return majors
return {
'count': get_trending(u'#topNoofPTable'),
'index': get_trending(u'#topIndexTable'),
'like': get_trending(u'.r_r_box_zymyd'),
}
loader.add_value('trending', parse_trending())
item = loader.load_item()
for link in LinkExtractor(restrict_xpaths=u'//ul[@id="topNav"]//a[.="????"]').extract_links(response):
yield Request(link.url, meta={'item': item}, callback=self.parse_jianjie)
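The final loop forwards the partially populated item to a `parse_jianjie` callback that is not included in the excerpt. A minimal sketch of the usual meta-passing pattern, assuming the callback merely enriches the item carried in `response.meta` and yields it (the field name and selector are hypothetical):

def parse_jianjie(self, response):
    # Hypothetical continuation: take the item passed along in meta, add
    # the introduction text of the linked page (placeholder selector),
    # and emit the finished item.
    item = response.meta['item']
    item['jianjie'] = ' '.join(response.css('article ::text').extract())
    yield item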