def _extract_item(self, response):
# Debug the response in the scrapy shell:
#inspect_response(response, self)
# Or open the response Scrapy actually received in a browser (handy for debugging):
#open_in_browser(response)
# Populate the item with an ItemLoader
l = ItemLoader(response=response, item=MyspiderItem(), type='html')
l.add_xpath('movie_name', '//h1/span[@property="v:itemreviewed"]/text()')
l.add_xpath('movie_year', '//span[@property="v:initialReleaseDate"]/text()')
l.add_xpath('movie_type', '//span[@property="v:genre"]/text()')
l.add_xpath('movie_rate', '//strong[@class="ll rating_num"]/text()')
l.add_value('url', response.url)
# Note: return a plain dict here rather than the scrapy.Item produced by load_item(),
# because scrapy-redis serializes items to JSON before pushing them into Redis,
# and JSON cannot serialize custom Python classes such as Item subclasses.
return dict(l.load_item())
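# The loader above assumes an item class that declares every field passed to
# add_xpath/add_value. A minimal sketch of what MyspiderItem might look like
# (field names taken from the calls above; illustrative, not the original project's file):
import scrapy

class MyspiderItem(scrapy.Item):
    movie_name = scrapy.Field()
    movie_year = scrapy.Field()
    movie_type = scrapy.Field()
    movie_rate = scrapy.Field()
    url = scrapy.Field()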
def parse_node(self, response, node):
i = MyxmlItem()
# Populate the Item fields with XPath
i['title'] = node.xpath("/rss/channel/item/title/text()").extract()
i['link'] = node.xpath("/rss/channel/item/link/text()").extract()
i['author'] = node.xpath("/rss/channel/item/author/text()").extract()
# Print out the items with a for loop
for j in range(len(i['title'])):
print("Item " + str(j+1) + ":")
print("Title:")
print(i['title'][j])
print("Link:")
print(i['link'][j])
print("Author:")
print(i['author'][j])
print("----------------------")
return i
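# parse_node above is the per-node callback of a Scrapy XMLFeedSpider. A sketch of the
# wiring it assumes is below: itertag is set to the feed root so that the absolute
# /rss/channel/item/... paths used above resolve. The class name and feed URL are made up.
from scrapy.spiders import XMLFeedSpider

class MyXmlSpider(XMLFeedSpider):
    name = 'myxml'
    start_urls = ['http://example.com/feed.xml']  # placeholder feed URL
    itertag = 'rss'  # iterate over the root node, one call to parse_node per feed

    def parse_node(self, response, node):
        ...  # the parse_node shown above goes here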
def parse_details(self, response):
# response = get(response.url)
institution = response.xpath('//h2/text()').extract()[0].strip()
logging.warning("scraping: %s - %s" % (response.url, institution))
for tr in response.xpath('//table[@class="fancy"]/tr'):
if tr.xpath('td[1]'):
item = Item()
titlu = xtract(tr, 'td[1]//div/text()')
type_ = xtract(tr, 'td[2]//div//strong/text()')
consult = xtract(tr, 'td[3]//div/text()')
avizare = xtract(tr, 'td[4]//div/text()')
avizori = xtract(tr, 'td[5]//div/text()')
termen_avize = xtract(tr, 'td[6]//div/text()')
mfp_mj = xtract(tr, 'td[7]//div/text()')
reavizare = xtract(tr, 'td[8]//div/text()')
init_1 = xtract(tr, 'td[9]//a/@href')
init_2 = xtract(tr, 'td[10]//a/@href')
final_1 = xtract(tr, 'td[11]//a/@href')
final_2 = xtract(tr, 'td[12]//a/@href')
docs = [{"type": "nota", "url": response.urljoin(f)} for f in [init_1, init_2, final_1, final_2] if f]
item['identifier'] = identify(institution, titlu)
item['title'] = titlu
item['type'] = type_
item['institution'] = "sgg"
item['date'] = consult
item['description'] = ""
item['feedback_days'] = None
item['contact'] = None
item['documents'] = docs
yield item
def parse_item(self, response):
self.logger.info('Hi, this is an item page! %s', response.url)
item = scrapy.Item()
item['id'] = response.xpath('//td[@id="item_id"]/text()').re(r'ID: (\d+)')
item['name'] = response.xpath('//td[@id="item_name"]/text()').extract()
item['description'] = response.xpath('//td[@id="item_description"]/text()').extract()
return item
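# Note that a bare scrapy.Item() declares no fields, so the item['id'] assignments
# above would raise KeyError at runtime; in practice the spider uses an Item subclass
# along these lines (a sketch inferred from the keys above, not the original code):
import scrapy

class DemoItem(scrapy.Item):  # hypothetical name
    id = scrapy.Field()
    name = scrapy.Field()
    description = scrapy.Field()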
def default(self, obj):
if isinstance(obj, Item):
return dict(obj)
# Let the base class default method raise the TypeError
return json.JSONEncoder.default(self, obj)
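# The default() above belongs on a json.JSONEncoder subclass; passing that subclass to
# json.dumps via cls= lets Scrapy Items be serialized as plain dicts. A self-contained
# sketch (ItemJSONEncoder and ExampleItem are made-up names):
import json
from scrapy.item import Item, Field

class ItemJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Item):
            return dict(obj)
        # Let the base class default method raise the TypeError
        return json.JSONEncoder.default(self, obj)

class ExampleItem(Item):
    title = Field()

print(json.dumps(ExampleItem(title='hello'), cls=ItemJSONEncoder))  # {"title": "hello"}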
def _bogus_item(self, item):
max_style = len('advanced intermediate hip hop with something else mixed in')
max_teacher = len('someones longish-teacher and-last-name sub for crazy-long foreign-teacher different-name')
if len(item['style']) > max_style or len(item['teacher']) > max_teacher:
logging.error("Item contained too long properties: %s", item)
return True
return False
def page_item(self, response: HtmlResponse) -> Item:
media_urls = []
get_urls = lambda le: (link.url for link in le.extract_links(response))
if self.settings.get('FILES_STORE'):
media_urls.extend(get_urls(self.images_le))
media_urls.extend(
set(get_urls(self.files_le)) - set(get_urls(self.le)))
metadata = {
'id': _url_hash(response.url, as_bytes=False),
'parent': _url_hash_as_str(response.meta.get('parent')),
'depth': response.meta.get('depth'),
'priority': response.request.priority,
}
if (self.settings.get('AUTOLOGIN_ENABLED') and
not self.queue.has_login_form(response.url)):
for form_el, form_meta in extract_forms(
response.text, fields=False):
if form_meta.get('form') == 'login':
self.queue.add_login_form(response.url)
metadata['has_login_form'] = True
return text_cdr_item(
response,
crawler_name=self.settings.get('CDR_CRAWLER'),
team_name=self.settings.get('CDR_TEAM'),
objects=media_urls,
metadata=metadata,
)
def page_item(self, response: HtmlResponse) -> Item:
item = super().page_item(response)
if self.page_clf:
item['metadata']['page_score'] = self.page_score(response)
return item
def test_process_item(self):
normal_item = Item()
class DummyDocument(Document):
pass
DummyDocument.save = MagicMock()
document_item = document_to_item(DummyDocument)()
after = self.pipe.process_item(normal_item, None)
self.assertEqual(normal_item, after)
after = self.pipe.process_item(document_item, None)
self.assertIsInstance(after, DummyDocument)
def document_to_item(document_class):
class DocumentAsItemClass(Item):
def concrete(self):
return document_class(**self)
exclude_fields = dir(EmptyDocument)
document_fields = [field for field in dir(document_class) if field not in exclude_fields]
for field in document_fields + ['id']:
DocumentAsItemClass.fields[field] = Field()
return DocumentAsItemClass
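# A usage sketch for document_to_item, assuming a mongoengine-style Document
# (ArticleDocument and its field are made-up names for illustration): the generated
# Item class mirrors the Document's fields, and concrete() converts it back.
from mongoengine import Document, StringField

class ArticleDocument(Document):
    title = StringField()

ArticleItem = document_to_item(ArticleDocument)  # Item class mirroring the Document

item = ArticleItem()         # filled like any other scrapy Item in a spider
item['title'] = 'hello'
doc = item.concrete()        # converted back to an ArticleDocument, e.g. in a pipeline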
def get_scrapy_item_classes():
"""
Get a list of tuples containing (1) the class name and (2) the class for all of the Scrapy item
classes defined in the crawling module.
:return: A list of tuples containing (1) the class name and (2) the class for all of the Scrapy item
classes defined in the crawling module.
"""
import lib.inspection.web.crawling.item
import scrapy
return list(set(IntrospectionHelper.get_all_classes_of_type(
to_find=scrapy.Item,
path="lib/inspection/web/crawling",
)))
def process_spider_output(self, response, result, spider):
for i in result:
if isinstance(i, scrapy.Item) and (i['info'].get('player', '') == 'iqiyi'):
key = i['url']
if key not in self.items.keys():
self.items[key] = copy.deepcopy(i)
else:
self.items[key]['media_urls'].append(i['media_urls'][0])
if i['info']['count'] == len(self.items[key]['media_urls']):
yield self.__sort_item(key)
else:
yield i
def __sort_item(self, key):
item = self.items.pop(key)
item['media_urls'].sort(key=lambda url: int(re.findall(r'qd_index=(\d+)&', url)[0]))
item['info'].pop('index', None)
item['info'].pop('count', None)
item['info'].pop('player', None)
return item
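# The sort above orders the collected segment URLs by their qd_index query parameter.
# A small illustration with placeholder URLs:
import re

urls = ['http://example.com/seg?qd_index=2&rate=1', 'http://example.com/seg?qd_index=1&rate=1']
urls.sort(key=lambda url: int(re.findall(r'qd_index=(\d+)&', url)[0]))
# urls now starts with the qd_index=1 segment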
# class MultimediaCrawlerMiddleware(object):
# @classmethod
# def from_crawler(cls, crawler):
# # This method is used by Scrapy to create your spiders.
# s = cls()
# crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
# return s
#
# def process_spider_input(self, response, spider):
# # Called for each response that goes through the spider
# # middleware and into the spider.
#
# # Should return None or raise an exception.
# return None
#
# def process_spider_output(self, response, result, spider):
# # Called with the results returned from the Spider, after
# # it has processed the response.
# # Must return an iterable of Request, dict or Item objects.
# for i in result:
# yield i
#
# def process_spider_exception(self, response, exception, spider):
# # Called when a spider or process_spider_input() method
# # (from other spider middleware) raises an exception.
#
# # Should return either None or an iterable of Response, dict
# # or Item objects.
# pass
#
# def process_start_requests(self, start_requests, spider):
# # Called with the start requests of the spider, and works
# # similarly to the process_spider_output() method, except
# # that it doesn’t have a response associated.
#
# # Must return only requests (not items).
# for r in start_requests:
# yield r
#
# def spider_opened(self, spider):
# spider.logger.info('Spider opened: %s' % spider.name)
def main():
"""Rutina principal para la ejecución del Spider"""
# set up signal to catch items scraped
def catch_item(sender, item, **kwargs):
print "Item extracted:", item
dispatcher.connect(catch_item, signal=signals.item_passed)
settings = Settings()
settings.set("USER_AGENT", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36")
settings.set("LOG_ENABLED",False)
# setup crawler
from scrapy.crawler import CrawlerProcess
crawler = CrawlerProcess(settings)
# register the spider with the crawler
crawler.crawl(EuropythonSpyder())
# start scrapy
print("STARTING ENGINE")
crawler.start()  # start the crawler by running the spider defined above
print("ENGINE STOPPED")
def main():
"""Main routine for running the Spider"""
from scrapy.xlib.pydispatch import dispatcher
# set up signal to catch items scraped
def catch_item(sender, item, **kwargs):
print "Item extracted:", item
dispatcher.connect(catch_item, signal=signals.item_passed)
settings = Settings()
settings.set("USER_AGENT", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36")
settings.set("LOG_ENABLED",False)
# setup crawler
from scrapy.crawler import CrawlerProcess
crawler = CrawlerProcess(settings)
# define the spider for the crawler
crawler.crawl(PydataSpiderDetails())
print "STARTING ENGINE"
crawler.start() #start the crawler
print "ENGINE STOPPED"
def __setattr__(self, name, value):
if name in self.fields:
raise AttributeError("Use item[{!r}] = {!r} to set field value".format(name, value))
super(BaseItem, self).__setattr__(name, value)
def process_response(self, response):
item = EuropythonItem()
print(response)
item['title'] = response.xpath("//div[contains(@class, 'grid-100')]//h1/text()").extract()
item['author'] = response.xpath("//div[contains(@class, 'talk-speakers')]//a[1]/text()").extract()
item['description'] = response.xpath("//div[contains(@class, 'cms')]//p//text()").extract()
item['date'] = response.xpath("//section[contains(@class, 'talk when')]/strong/text()").extract()
item['tags'] = response.xpath("//div[contains(@class, 'all-tags')]/span/text()").extract()
return item
def parse_details(self, response):
print('parsed link: %s' % response.url)
hxs = scrapy.Selector(response)
item = PydatascheduleItem()
item['speaker'] = hxs.select('//div[@class="col-md-8"]/h4/a/text()').extract()[0].strip()
item['url'] = response.url
item['talk'] = hxs.select('//div[@class="col-md-8"]/h2/text()').extract()[0].strip()
item['time'] = hxs.select('//div[@class="col-md-8"]/h4/text()').extract()[0].replace("\n","").strip()
item['description'] = hxs.select('//div[@class="description"]/p/text()').extract()[0]
return item