def crawl_feed(self, feed_url: str = None) -> List[str]:
    urls = []
    if not feed_url:
        feed_url = constants.rss_url
    feed = feedparser.parse(
        feed_url,
        handlers=ProxyHandler,
        request_headers=self.settings.requests_headers
    )
    for item in feed['items']:
        if any(item['title'].startswith(category) for category in self.own_settings.accepted_rss_categories):
            urls.append(item['link'])
    return urls
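The method above hands a proxy handler and extra request headers straight through to feedparser.parse(). As a rough standalone sketch of those two keyword arguments (the proxy address, headers, and feed URL below are placeholders, not values from the project):

import urllib.request

import feedparser

# Placeholder proxy and headers, purely for illustration.
proxy = urllib.request.ProxyHandler({'http': 'http://127.0.0.1:8080'})
feed = feedparser.parse(
    'https://example.com/feed.rss',
    handlers=[proxy],
    request_headers={'User-Agent': 'my-crawler/1.0'},
)
links = [entry.link for entry in feed.entries]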
# Source: generatefeedvector.py, from the Programming-Collective-Intelligence project (author: clyyuanzi).
def getwordcounts(url):
    """Return the title of an RSS feed plus a dictionary of word counts over its entries."""
    d = feedparser.parse(url)
    wc = {}
    # Loop over all the entries in the feed (d.entries)
    for e in d.entries:
        if 'summary' in e:
            summary = e.summary
        else:
            summary = e.description
        # Extract a list of words from the title and the summary/description
        words = getwords(e.title + ' ' + summary)
        for word in words:
            wc.setdefault(word, 0)
            wc[word] += 1
    print(d.feed.title)
    return d.feed.title, wc
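getwordcounts() relies on a getwords() helper that is not shown in the snippet. In the Programming Collective Intelligence version it is roughly the following (a sketch, assuming that implementation): strip HTML tags, split on non-letter characters, and lowercase the result.

import re

def getwords(html):
    # Remove all HTML tags
    txt = re.compile(r'<[^>]+>').sub('', html)
    # Split on anything that is not a letter
    words = re.compile(r'[^A-Z^a-z]+').split(txt)
    # Keep only non-empty words, lowercased
    return [word.lower() for word in words if word != '']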
def news():
    """Get news from different ATOM RSS feeds."""
    import feedparser
    from pybossa.core import sentinel
    from pybossa.news import get_news, notify_news_admins, FEED_KEY
    try:
        import cPickle as pickle
    except ImportError:  # pragma: no cover
        import pickle
    urls = ['https://github.com/pybossa/pybossa/releases.atom',
            'http://scifabric.com/blog/all.atom.xml']
    score = 0
    notify = False
    if current_app.config.get('NEWS_URL'):
        urls += current_app.config.get('NEWS_URL')
    for url in urls:
        d = feedparser.parse(url)
        tmp = get_news(score)
        if (len(tmp) == 0) or (tmp[0]['updated'] != d.entries[0]['updated']):
            sentinel.master.zadd(FEED_KEY, float(score),
                                 pickle.dumps(d.entries[0]))
            notify = True
        score += 1
    if notify:
        notify_news_admins()
def newscaster(p, l):
    """Dictate the latest news (which are essentially entries in the RSS feed)."""
    respond("fetching news", prepend_positive_response=True)
    feeds = [feedparser.parse(url) for url in preferences.get_news_feed_urls()]
    counter = 1
    for feed in feeds:
        for entry in feed.entries:
            data = []
            parser = NewsFeedParser(data)
            try:
                description = entry.description
            except AttributeError:
                description = "None given"
            parser.feed(description)
            news = "News #" + str(counter) + ": title: " + entry.title + ". description: " + " ".join(data)
            respond(news, override_subtitle=True)
            counter += 1
def parse_job_list_page(self, response):
    self.get_connector().log(self.name, self.ACTION_CRAWL_LIST, response.url)
    feed_parser = feedparser.parse(response.body)
    for job_entry in feed_parser.entries:
        job_url = job_entry.link
        job_publication_date = datetime.fromtimestamp(mktime(job_entry.published_parsed))
        job_publication_time = mktime(job_publication_date.timetuple())
        last_job_publication_time = mktime(self._last_job_date.timetuple())
        if job_publication_time <= last_job_publication_time:
            self.get_connector().log(self.name,
                                     self.ACTION_MARKER_FOUND,
                                     "%s <= %s" % (job_publication_time, last_job_publication_time))
            return
        prepared_job = JobItem()
        request = Request(job_url, self.parse_job_page)
        request.meta['item'] = prepared_job
        prepared_job['title'] = job_entry.title
        prepared_job['description'] = job_entry.description
        prepared_job['publication_datetime'] = job_publication_date
        yield request
def fetch(self):
    fetch_time = int(time.time())
    feed = feedparser.parse(self.url, etag=self.status.last_result)
    last_updated = self.status.updated
    self.status = ChoreStatus(fetch_time, feed.get('etag'))
    for e in feed.entries:
        evt_time = int(calendar.timegm(e.updated_parsed))
        if last_updated and evt_time > last_updated:
            evturl = e.link
            match = RE_BADURL.match(evturl)
            if match:
                evturl = urllib.parse.urljoin(self.url, match.group(1))
            else:
                evturl = urllib.parse.urljoin(self.url, evturl)
            if not self.title_regex or self.title_regex.search(e.title):
                yield Event(self.name, self.category,
                            evt_time, e.title, e.summary, evturl)
def fetch(self):
    if self.category == 'release':
        url = 'https://github.com/%s/releases.atom' % self.repo
    elif self.category == 'tag':
        url = 'https://github.com/%s/tags.atom' % self.repo
    elif self.category == 'commit':
        url = 'https://github.com/%s/commits/%s.atom' % \
            (self.repo, self.branch or 'master')
    else:
        raise ValueError('unknown category: %s' % self.category)
    fetch_time = int(time.time())
    feed = feedparser.parse(url, etag=self.status.last_result)
    last_updated = self.status.updated
    self.status = ChoreStatus(fetch_time, feed.get('etag'))
    for e in feed.entries:
        evt_time = calendar.timegm(e.updated_parsed)
        if last_updated and evt_time > last_updated:
            yield Event(self.name, self.category,
                        evt_time, e.title, e.summary, e.link)
def detect(cls, name, url, **kwargs):
    urlp = urllib.parse.urlparse(url)
    if urlp.netloc != 'github.com':
        return
    pathseg = urlp.path.lstrip('/').split('/')
    if pathseg[0] == 'downloads':
        pathseg.pop(0)
    repo = '/'.join(pathseg[:2])
    if repo.endswith('.git'):
        repo = repo[:-4]
    if len(pathseg) > 2:
        if pathseg[2] == 'releases':
            return cls(name, repo, 'release')
        elif pathseg[2] == 'tags':
            return cls(name, repo, 'tag')
        elif pathseg[2] == 'commits':
            return cls(name, repo, 'commit', pathseg[3])
    for category, url in (
            ('release', 'https://github.com/%s/releases.atom' % repo),
            ('tag', 'https://github.com/%s/tags.atom' % repo),
            ('commit', 'https://github.com/%s/commits/master.atom' % repo)):
        feed = feedparser.parse(url)
        if feed.entries:
            return cls(name, repo, category)
def detect(cls, name, url, **kwargs):
    urlp = urllib.parse.urlparse(url)
    if urlp.netloc != 'bitbucket.org':
        return
    pathseg = urlp.path.lstrip('/').split('/')
    repo = '/'.join(pathseg[:2])
    if repo.endswith('.git'):
        repo = repo[:-4]
    if len(pathseg) > 2:
        if pathseg[2] == 'downloads':
            return cls(name, repo, 'release')
        elif pathseg[2] == 'get':
            return cls(name, repo, 'tag')
    for category, url in (
            ('release', 'https://api.bitbucket.org/2.0/repositories/%s/downloads' % repo),
            ('tag', 'https://api.bitbucket.org/2.0/repositories/%s/refs/tags' % repo)):
        req = HSESSION.get(url, timeout=30)
        if req.status_code == 200:
            d = req.json()
            if d.get('values'):
                return cls(name, repo, category)
def detect_name(url, title):
    urlp = urllib.parse.urlparse(url)
    if urlp.netloc == 'github.com':
        return urlp.path.strip('/').split('/')[1].lower()
    else:
        urlpath = os.path.splitext(urlp.path.strip('/'))[0].lower().split('/')
        urlkwd = [x for x in urlpath if x not in URL_FILTERED and not RE_IGN.match(x)]
        titlel = title.lower()
        candidates = []
        for k in urlkwd:
            if k in titlel:
                candidates.append(k)
        if candidates:
            return candidates[-1]
        else:
            host = urlp.hostname.split('.')
            cand2 = [x for x in host if x not in URL_FILTERED]
            if cand2:
                return cand2[0]
            else:
                return host[-2]
def getNewsFeed(self):
    # Parse the feed and get the result in res
    res = feedparser.parse(self.rssFeedUrl)
    # Get the total number of entries returned
    resCount = len(res.entries)
    # Exit out if empty
    if resCount == 0:
        return ""
    # If resCount is less than the feedCount specified, cap feedCount to resCount
    if resCount < self.feedCount:
        self.feedCount = resCount
    # Loop from 0 to feedCount so we append the right number of entries to the return list
    resultList = []
    for x in range(0, self.feedCount):
        resultList.append(res.entries[x])
    return resultList
def get_arxiv_mail(title_words, abstract_words,
                   author_words, feed_url, my_mail):
    # Assumes module-level helpers not shown in this snippet: a filter(entry)
    # predicate that matches the keyword/author lists, a strip_html() helper,
    # and a date_str string for the heading.
    feed = feedparser.parse(feed_url)
    filtered_entries = [entry for entry in feed.entries if filter(entry)]
    msg = ["<h1>arXiv results for {}</h1>".format(date_str)]
    for entry in filtered_entries:
        msg.append('<h2>{}</h2>'.format(entry.title))
        msg.append('<h3>{}</h3>'.format(strip_html(entry.author)))
        msg.append('<p>{}</p>'.format(strip_html(entry.description)))
        num = 'arXiv:' + entry['id'].split('/')[-1]
        link = '<a href="{}">{}</a>'.format(entry['id'], num)
        pdf_link = '[<a href="{}">pdf</a>]'.format(entry.id.replace('abs', 'pdf'))
        msg.append(link + " " + pdf_link)
    keywords = ', '.join(title_words + abstract_words)
    authors = ', '.join(author_words)
    footer = ("<p><em>Selected keywords: {}. Selected authors: {}. " +
              "From feed: {}</em></p>")
    msg.append(footer.format(keywords, authors, feed_url))
    msg = "".join(msg)
    return msg
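The strip_html() helper assumed above is not included in the snippet. A minimal, purely illustrative sketch (the real project may strip markup differently):

import re

# Hypothetical stand-in for the strip_html() helper used by get_arxiv_mail().
def strip_html(text):
    return re.sub(r'<[^>]+>', ' ', text).strip()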
def test_latest_feeds(self):
    packages = Project.objects.all().order_by('-created')[:15]
    for feed_type in ('rss', 'atom'):
        url = reverse('feeds_latest_packages_%s' % feed_type)
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)
        feed = feedparser.parse(response.content)
        expect_titles = [p.title for p in packages]
        actual_titles = [e['title'] for e in feed.entries]
        for expected_title, actual_title in zip(expect_titles, actual_titles):
            self.assertEqual(expected_title, actual_title)
        expect_summaries = [p.repo_description for p in packages]
        actual_summaries = [e['summary'] for e in feed.entries]
        for expected_summary, actual_summary in zip(expect_summaries, actual_summaries):
            self.assertEqual(expected_summary, actual_summary)
def _parse_episodes_from_feed(self):
    feed = feedparser.parse(settings.RSS_FEED)
    if not feed.entries:
        logging.error('No episodes found in RSS feed, please check URL')
    episodes = []
    for feed_item in feed.entries:
        show = self._get_matching_show(feed_item)
        if show:
            episode = self._get_episode_data_from_item(feed_item, show)
            quality_check = episode.quality is not None and \
                episode.quality >= show.minimum_quality
            follow_check = episode.season > show.follow_from_season or \
                (episode.season == show.follow_from_season and
                 episode.episode >= show.follow_from_episode)
            is_downloaded = self._is_episode_downloaded(episode)
            if quality_check and follow_check and not is_downloaded:
                episodes.append(episode)
    return episodes
def read_RSS_feed(assistant, player_vlc, instance_vlc, rss_dic, number_records_to_read):
    assistant.speak("Tell me the name of the rss feed")
    msg = assistant.active_listen()
    if msg in rss_dic.keys():
        rss = rss_dic[msg]
    else:
        rss = DEFAULT_RSS
    assistant.speak("ok! I am calling my assistant, she will read the RSS feed.")
    res = feedparser.parse(rss)
    number_records_in_feed = len(res.entries)
    if number_records_in_feed < number_records_to_read:
        number_records_to_read = number_records_in_feed
    entries_to_read = [entry.title_detail.value for entry in res.entries[0:number_records_to_read]]
    txt = ". ".join(entries_to_read)
    read_nicely_text(txt, instance_vlc, player_vlc)
    '''
    for entry in entries_to_read:
        assistant.speak(entry.title_detail.value)
        time.sleep(1)
    '''
def play_podcast(assistant, player_vlc, instance_vlc, podcast_dic, podcast_index=None):
    assistant.speak("Tell me the name of the podcast")
    msg = assistant.active_listen()
    if msg in podcast_dic.keys():
        rss = podcast_dic[msg]
    else:
        rss = DEFAULT_PODCAST
    assistant.speak("There you go!")
    res = feedparser.parse(rss)
    number_records_in_feed = len(res.entries)
    if podcast_index is None:
        podcast_index = random.randint(0, number_records_in_feed - 1)
    # Clamp a requested index that falls outside the feed to the last entry.
    if podcast_index >= number_records_in_feed:
        podcast_index = number_records_in_feed - 1
    href = ""
    for link in res.entries[podcast_index].links:
        if ".mp3" in link.href:
            href = link.href
            break
    if href != "":
        media = instance_vlc.media_new(href)
        player_vlc.set_media(media)
        player_vlc.play()
    else:
        assistant.speak("I am sorry, but the podcast requested is not available!")
def handle(msg):
    """
    This function handles all messages incoming from users.
    """
    content_type, chat_type, chat_id = telepot.glance(msg)
    command_input = msg['text']
    if command_input == '/start':
        # Check if already registered
        if register_user(chat_id):
            bot.sendMessage(chat_id, start_msg)
            feed = feedparser.parse(feed_url)
            # Send all news from oldest to newest
            for entry in reversed(feed.entries):
                msg = entry.title + '\n' + entry.link
                bot.sendMessage(chat_id, msg)
    if command_input == '/stop':
        bot.sendMessage(chat_id, stop_msg)
        remove_user(chat_id)
def get_data_from_feed(feed, posts, loop):
    try:
        data = parse(feed)
        if data.bozo == 0:
            category = data['feed']['title']
            if len(category) > 0:
                gather(*[parse_item(posts=posts, data=data, feed=feed,
                                    category=category, i=i, loop=loop)
                         for i in range(0, len(data.entries))],
                       return_exceptions=True)
        else:
            err = data.bozo_exception
            print(colored.red("Feed {0} is malformed: {1}".format(feed, err)))
            source_obj = Sources.objects.get(feed=feed)
            if source_obj.failures < 5:
                source_obj.failures = source_obj.failures + 1
            else:
                source_obj.failures = source_obj.failures + 1
                source_obj.active = False
            source_obj.save()
    except Exception as err:
        print(colored.red("At get_data_from_feed {}".format(err)))
def get(query='', lang='en'):
    d = feedparser.parse('https://news.google.it/news?cf=all&hl={l}&query={q}&pz=1&ned={l}&output=rss'
                         .format(l=lang, q=query))
    text = d.feed.title
    for e in d.entries:
        soup = bs4.BeautifulSoup(e.description, 'html.parser')
        news = soup.find_all('font', size="-1")[1].get_text()
        title = e.title.rsplit('-')[0]
        author = e.title.rsplit('-')[1]
        title, author = title.rstrip().lstrip(), author.rstrip().lstrip()
        link = e.link
        text += (
            '\n?? <b>{title}</b> • <a href="{link}">{author}</a>'
            '\n{news}\n'.format(title=title, news=news, link=link, author=author)
        )
    return text
def update(feed):
    last_etag = feed.etag
    last_modified = feed.modified
    # Re-fetch using the cached validators; this assumes the previously parsed
    # feed object exposes its source URL as feed.href (feedparser sets .href
    # when a feed is parsed from a URL).
    feed_update = feedparser.parse(feed.href, etag=last_etag, modified=last_modified)
    o = feed['entries']
    o = o[0]  # first entry of the cached feed (not used below)
    if feed_update.status == 304:
        return "304"
    else:
        return "200"
async def alog(self, *, username):
    """Gets a user's recent adventure log."""
    username = username.replace(" ", "_")
    if feedparser is None:
        await self.bot.say("You'll need to run `pip3 install feedparser` "
                           "before you can get a user's adventure log.")
        return
    url = self.alog_url + username
    try:
        page = await aiohttp.get(url)
        text = await page.text()
        text = text.replace("\r", "")
    except:
        await self.bot.say("No user found.")
    feed = feedparser.parse(text)
    titles = [post.title for post in feed.entries]
    await self.bot.say(self._fmt_alog(username, titles))
def get_entries(feed):
    # REQUEST_LIMIT and split_content_by_dot() are assumed to be defined at
    # module level: the maximum length of one chunk, and a helper that splits
    # the post body into chunks below that limit.
    NEW_POST = u"""New post, author {author}, title {title} {content}"""
    for entry in feed.entries:
        if "http" in entry.id:
            # Derive a stable short id from the entry URL (md5 needs bytes on Python 3).
            nid = hashlib.md5(str(entry.id).encode("utf-8"))
            entry.id = nid.hexdigest()
        entry_content = entry.content[0].value
        soup = BeautifulSoup(entry_content, 'html.parser')
        chunks = split_content_by_dot(soup, REQUEST_LIMIT - len(NEW_POST))
        chunks = list(chunks)
        published = dateutil.parser.parse(entry.published)
        for i, chunk in enumerate(chunks):
            if i == 0:
                chunk = NEW_POST.format(
                    author=entry.author,
                    title=entry.title,
                    content=chunk)
            yield dict(
                content=chunk,
                id="%s_%d" % (entry.id, i),
                title=entry.title,
                published=published - datetime.timedelta(0, i),
            )
            remaining = chunk
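A hypothetical sketch of the split_content_by_dot() helper assumed above (the real one may differ): it walks the extracted text sentence by sentence and yields chunks that stay under the given limit.

def split_content_by_dot(soup, limit):
    # Yield sentence-aligned chunks of the soup's text, each shorter than `limit`.
    chunk = ''
    for sentence in soup.get_text().split('.'):
        piece = sentence.strip()
        if not piece:
            continue
        piece += '.'
        if chunk and len(chunk) + len(piece) + 1 > limit:
            yield chunk
            chunk = piece
        else:
            chunk = (chunk + ' ' + piece) if chunk else piece
    if chunk:
        yield chunk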
def make_rss_dictionary():
    """Grab the RSS data and make a list of dictionaries out of the wanted information."""
    print('*** Al Jazeera ***')
    print('\nFetching Al Jazeera feed...')
    feed = feedparser.parse(url)
    rss_dict = []
    for article in feed['entries']:
        rss_dict.append({
            "title": article.title,
            "description": article.summary,
            "url": article.link,
        })
    print('Done\n')
    return rss_dict
def topics_id_rss(self):
    logging.debug('fetch rss feeds')
    topic_ids = list()
    for v2ex_rss_url in self.v2ex_rss_url_list:
        feed = feedparser.parse(v2ex_rss_url)
        logging.debug('fetch rss feed: %s' % v2ex_rss_url)
        items = feed["items"]
        for item in items:
            author = item["author"]
            title = item["title"]
            link = item["link"]
            published = item["date"]
            summary = item["summary"]
            topic_id = int(re.findall(r't\/(\d+)#?', link)[0])
            topic_ids.append(topic_id)
    topic_ids = set(topic_ids)
    return topic_ids
def fetch_feed_if_updated(url, date):
    """
    Fetches an RSS feed if it has been updated since a given date.

    Args:
        url: URL to the RSS feed.
        date: Date as a time_struct.

    Returns:
        FeedParser object representing the feed if the feed has been
        updated, None otherwise.
    """
    feed = feedparser.parse(url)
    if feed_updated(feed, date):
        return feed
    else:
        return None
def news():
    url = 'https://www.bunq.com/en/news/feed.rss'
    feed = feedparser.parse(url)
    data = []
    for item in feed['items']:
        s = MLStripper()
        s.feed(item['summary'])
        obj = {
            'title': item['title'],
            'date': item['published'],
            'summary': s.get_data(),
            'link': item['link'],
            'author': item['author']
        }
        data.append(obj)
    with open('bunq_bot/responses/commands/news.md', 'r') as f:
        return TRender(f.read()).render({'data': data[:5]})
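news() depends on an MLStripper class that is not part of the snippet; it is presumably the classic HTMLParser-based tag stripper. A minimal sketch, assuming that recipe:

from html.parser import HTMLParser

class MLStripper(HTMLParser):
    """Collects only the text nodes fed to it, dropping all markup."""
    def __init__(self):
        super().__init__(convert_charrefs=True)
        self.fed = []

    def handle_data(self, data):
        self.fed.append(data)

    def get_data(self):
        return ''.join(self.fed)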
def crawl(url, username, full_articles=True):
    articles = list()
    d = feedparser.parse(url)
    for entry in d["entries"]:
        if 'published_parsed' in entry:
            pubdate = pytz.utc.localize(datetime.fromtimestamp(mktime(entry['published_parsed'])))
        else:
            pubdate = pytz.utc.localize(datetime.fromtimestamp(mktime(entry['updated_parsed'])))
        articles.append(Article(
            title=entry['title'],
            url=entry['link'],
            body=entry["content"][0]["value"] if 'content' in entry else entry["summary"],
            username=username,
            pubdate=pubdate,
        ))
    return articles
def get_feed_entries(self, url):
    parse = feedparser.parse(url)
    num = len(parse.entries)
    if num > 0:
        for entry in parse.entries:
            title = getattr(entry, 'title', None)
            url = getattr(entry, 'link', None)
            desc = getattr(entry, 'description', None)
            image = parse.get('image', '')
            if not desc:
                desc = getattr(entry, 'summary', None)
            description = BeautifulSoup(desc).get_text()
            item, created = Article.objects.get_or_create(
                title=title, url=url, desc=desc)
            pubdate = getattr(entry, 'published', None)
            if pubdate:
                item.created = tparser.parse(pubdate, ignoretz=True)
            udate = getattr(entry, 'updated', None)
            if udate:
                item.updated = tparser.parse(udate, ignoretz=True)
            item.save()
            print(item.title)
def get_feed_entries_from_url(url):
    """
    Gets feed entries from a URL that should be an RSS or Atom feed.

    >>> get_feed_entries_from_url("http://delhomme.org/notfound.html")
    Error 404 while fetching "http://delhomme.org/notfound.html".
    >>> feed = get_feed_entries_from_url("http://blog.delhomme.org/index.php?feed/atom")
    >>> feed.status
    200
    """
    feed = feedparser.parse(url)
    if 'status' in feed:
        feed = manage_http_status(feed, url)
    else:
        # An error happened such that the feed does not contain an HTTP response
        manage_non_http_errors(feed, url)
        feed = None
    return feed
# End of get_feed_entries_from_url() function
def get(self, url):
    """ Wrapper for API requests. Take a URL, return a json array.
    >>> url = 'http://rss.denverpost.com/mngi/rss/CustomRssServlet/36/213601.xml'
    >>> parser = build_parser()
    >>> args = parser.parse_args([url])
    >>> rf = RecentFeed(args)
    >>> rf.get(url)
    True
    >>> rf.parse()
    #>>> articles = rf.recently()
    """
    h = httplib2.Http('.tmp')
    (response, xml) = h.request(url, "GET")
    if response['status'] != '200':
        if 'verbose' in self.args and self.args.verbose:
            print("URL: %s" % url)
        raise ValueError("URL %s response: %s" % (url, response.status))
    self.xml = xml
    return True