def main(args):
    rf = RecentFeed(args)
    if args:
        articles = []
        for arg in args.urls[0]:
            if args.verbose:
                print arg
            # Fetch and parse each feed, keeping its recent articles
            rf.get(arg)
            rf.parse()
            articles.append(rf.recently())

        for article in articles[0]:
            if args.output == 'html':
                if type(article['title']) is types.UnicodeType:
                    article['title'] = article['title'].encode('utf-8', 'replace')
                print '<li><a href="{0}">{1}</a></li>'.format(article['id'], article['title'])
            elif args.output == 'json':
                print json.dumps({'title': article['title'], 'url': article['id']})
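A minimal sketch of how main() might be driven, assuming an argparse-style namespace with the attributes the function reads (args.urls, args.verbose, args.output); the attribute layout and the URL are illustrative, not the original project's CLI:

# Hypothetical driver for main(); main() iterates args.urls[0], so urls is a
# list whose first element is the list of feed URLs. The URL is a placeholder.
import argparse

args = argparse.Namespace(
    urls=[['http://example.com/feed.xml']],
    verbose=True,
    output='html',
)
main(args)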
def getRss(self, url):
    # Parse the feed and render it to a simple static HTML page
    d = feedparser.parse(url)
    os.system("rm -r /tmp/rss.html")
    with open('/tmp/rss.html', 'a') as the_file:
        the_file.write('<!DOCTYPE html><html><head><meta ')
        the_file.write('charset="utf-8"><meta ')
        the_file.write('name="viewport" content="width=device-width, initial-scale=1"><title>' + d['feed']['title'] + '</')
        the_file.write('title><style type="text/css">body{margin:40px auto;')
        the_file.write('max-width:650px;line-height:1.6;font-size:18px;color:#444;padding:0 ')
        the_file.write('10px}h1,h2,h3{line-height:1.2}a{text-decoration: none; color:black;};</style></head><body><!-- RSS Feed --><header><h1>')
        the_file.write(d['feed']['title'] + '</h1>')
        #the_file.write('<aside>' + '-' + '</aside>')
        the_file.write('</header><hr noshade>')
        the_file.write('<p>')
        for post in d.entries:
            the_file.write('<a href="' + post.link.encode('ascii', 'ignore') + '">' + post.title.encode('ascii', 'ignore') + "</a><br><br>")
        the_file.write('</p>')
        the_file.write('</body>')
    # Point the embedded web view at the generated page
    url = QUrl('file:///tmp/rss.html')
    self.webView.load(url)
def _get_channel_data_from_cache(self, key, config):
    """Fetch channel feed from cache."""
    channel_path = self._get_channel_cache_path(key)
    if os.path.exists(channel_path):
        if "ttl" in config and isinstance(config["ttl"], int):
            ttl = config["ttl"]
        else:
            ttl = self._settings.get_int(["ttl"])
        ttl *= 60
        now = time.time()
        if os.stat(channel_path).st_mtime + ttl > now:
            d = feedparser.parse(channel_path)
            self._logger.debug(u"Loaded channel {} from cache at {}".format(key, channel_path))
            return d
    return None
def _get_channel_data_from_network(self, key, config):
    """Fetch channel feed from network."""
    import requests

    url = config["url"]
    try:
        start = time.time()
        r = requests.get(url)
        self._logger.info(u"Loaded channel {} from {} in {:.2}s".format(key, config["url"], time.time() - start))
    except Exception as e:
        self._logger.exception(
            u"Could not fetch channel {} from {}: {}".format(key, config["url"], str(e)))
        return None

    response = r.text
    channel_path = self._get_channel_cache_path(key)
    with codecs.open(channel_path, mode="w", encoding="utf-8") as f:
        f.write(response)
    return feedparser.parse(response)
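The two methods above suggest a cache-first lookup. A sketch of how a caller might combine them; the wrapper name _get_channel_data is an assumption, not part of the original plugin:

def _get_channel_data(self, key, config):
    """Hypothetical wrapper: prefer the cached copy, fall back to the network."""
    d = self._get_channel_data_from_cache(key, config)
    if d is None:
        d = self._get_channel_data_from_network(key, config)
    return d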
def parse(self, content):
    """Parses feed content of an HTTP response body into multiple
    :class:`news.models.abstract.Readable`s.

    Internally uses the :mod:`~feedparser` library to extract entries from
    the response body.

    :param content: HTTP response body
    :type content: :class:`str`
    :returns: An iterator of parsed readables
    :rtype: An iterator of :class:`news.models.abstract.Readable`

    """
    f = feedparser.parse(content)
    return (Readable(
        author=e.author, title=e.title, content=e.content, url=e.link,
        summary=e.summary, image=f.image) for e in f.entries)
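A hedged usage sketch for the parser above: fetch a feed body and hand it to parse(). The fetch step and the feed_model instance are assumptions; the method itself only needs the response body as a string, and attribute access on Readable is assumed from the constructor arguments:

import requests

# Assumed usage: download the feed body, then iterate the parsed Readables.
body = requests.get('https://example.com/feed.xml').text
for readable in feed_model.parse(body):  # feed_model: hypothetical model instance
    print(readable.title, readable.url)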
def update(self, mark_read=False):
    # Brad Frost's feed starts with a newline, throwing off feedparser.
    try:
        content = requests.get(self.url).content.strip()
    except requests.exceptions.ConnectionError:
        logger.error('Could not sync %s' % self.url)
        return
    data = feedparser.parse(content)
    for entry in data["entries"][:25]:
        obj, created = Entry.objects.get_or_create(
            source=self,
            url=entry["link"],
            defaults={
                "title": entry["title"],
                "author": (entry.get("author") or
                           data["feed"].get("author") or
                           self.name),
                "summary": entry["summary"],
                "sent": mark_read,
            })
    self.last_updated = datetime.datetime.now(pytz.utc)
    self.save()
def parse_non_wp_blogs(blog):
    from wsgi import non_wp_blogs
    feed = feedparser.parse(blog)
    post_table = []
    for item in feed.entries:
        title = item.title
        url = item.link
        post_date = DateTime(item.published).ISO()[:-9]
        try:
            author = item.author
        except AttributeError:
            # Some entries carry no author field
            author = "N/A"
        tags = get_tags(url)
        curr_content = ""  # get_content(non_wp_url=url)
        post_table.append({'title': title, 'author': author, 'post_date': post_date,
                           'tags': tags, 'url': url, 'views': 0, 'content': curr_content})
    return post_table
def remove_feed(chat_id, feed_url):
    '''Function to remove (unsubscribe) a feed from the chat feeds file'''
    # Create TSjson object for feeds of chat file and read the content
    fjson_chat_feeds = TSjson.TSjson('{}/{}.json'.format(CONST['CHATS_DIR'], chat_id))
    subs_feeds = fjson_chat_feeds.read_content()
    subs_feeds = subs_feeds[0]
    # Get the feed and set json data
    feed = {}
    feedpars = parse(feed_url)
    feed['Title'] = feedpars['feed']['title']
    feed['URL'] = feed_url
    feed['SEARCH_TERMS'] = []
    for sub_feed in subs_feeds['Feeds']:
        if sub_feed['URL'] == feed['URL']:
            feed['SEARCH_TERMS'] = sub_feed['SEARCH_TERMS']
            break
    # Remove the specific feed and update json file
    subs_feeds['Feeds'].remove(feed)
    fjson_chat_feeds.update(subs_feeds, 'Chat_id')
def get_context_data(self, **kwargs):
    context = super(FeedReaderNavlet, self).get_context_data(**kwargs)
    blogurl = None
    feed = None
    maxposts = 5

    navlet = AccountNavlet.objects.get(pk=self.navlet_id)
    if navlet.preferences:
        blogurl = navlet.preferences.get('blogurl')
        maxposts = int(navlet.preferences.get('maxposts', maxposts))

    if self.mode == NAVLET_MODE_VIEW and blogurl:
        feed = feedparser.parse(blogurl)
        feed['maxentries'] = feed['entries'][:maxposts]

    context.update({
        'feed': feed,
        'blogurl': blogurl,
        'maxposts': maxposts
    })
    return context
def handle(text, mic, profile):
    if 'INDIA' in text:
        url = 'http://news.google.com/news?pz=1&cf=all&ned=in&hl=en&output=rss'
    elif 'CRICKET' in text:
        url = 'http://www.espncricinfo.com/rss/content/story/feeds/6.xml'
    elif 'TECH' in text:
        url = 'http://www.theregister.co.uk/headlines.atom'
    else:
        url = 'http://news.google.com/news?pz=1&cf=all&ned=us&hl=en&output=rss'

    feed = feedparser.parse(url)
    if not feed:
        mic.say("I'm sorry. I could not get the news for you")
        return

    mic.say("Here is the headline news")
    for post in feed.entries:
        mic.say(post.title)
def get_headlines(self):
    try:
        # Remove all children before redrawing the headlines
        for widget in self.headlinesContainer.winfo_children():
            widget.destroy()

        if news_country_code is None:
            headlines_url = "https://news.google.com/news?ned=us&output=rss"
        else:
            headlines_url = "https://news.google.com/news?ned=%s&output=rss" % news_country_code

        feed = feedparser.parse(headlines_url)
        for post in feed.entries[0:5]:
            headline = NewsHeadline(self.headlinesContainer, post.title)
            headline.pack(side=TOP, anchor=W)
    except Exception as e:
        traceback.print_exc()
        print "Error: %s. Cannot get news." % e

    # Refresh the headlines every ten minutes
    self.after(600000, self.get_headlines)
def getwordcounts(url):
    # Parse the feed
    d = feedparser.parse(url)
    wc = {}

    # Loop over all the entries
    for e in d.entries:
        if 'summary' in e:
            summary = e.summary
        else:
            summary = e.description

        # Extract a list of words
        words = getwords(e.title + ' ' + summary)
        for word in words:
            wc.setdefault(word, 0)
            wc[word] += 1
    return d.feed.title, wc
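A brief sketch of how getwordcounts() is typically driven over a list of feed URLs to build a per-blog word-count table; the URLs below are placeholders:

# Build {blog title: word counts} for a set of feeds; URLs are illustrative.
feedlist = ['http://example.com/blog1/rss', 'http://example.com/blog2/rss']
wordcounts = {}
for feedurl in feedlist:
    try:
        title, wc = getwordcounts(feedurl)
        wordcounts[title] = wc
    except Exception:
        print('Failed to parse feed %s' % feedurl)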
def __init__(self, user, passwd, codec='iso-8859-1', api_request=dlcs_api_request, xml_parser=dlcs_parse_xml):
    """Initialize access to the API with ``user`` and ``passwd``.

    ``codec`` sets the encoding of the arguments.

    The ``api_request`` and ``xml_parser`` parameters by default point to
    functions within this package with standard implementations to
    request and parse a resource. See ``dlcs_api_request()`` and
    ``dlcs_parse_xml()``. Note that ``api_request`` should return a
    file-like instance with an HTTPMessage instance under ``info()``,
    see ``urllib2.openurl`` for more info.
    """
    assert user != ""
    self.user = user
    self.passwd = passwd
    self.codec = codec

    # Implement communication to server and parsing of response messages:
    assert callable(api_request)
    self._api_request = api_request
    assert callable(xml_parser)
    self._parse_response = xml_parser
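A usage sketch for this constructor. The class name DeliciousAPI and the posts_all() call are assumptions based on the pydelicious-style interface the docstring describes, not confirmed by the snippet itself:

# Hypothetical usage; credentials and method name are assumptions.
api = DeliciousAPI('username', 'password')
posts = api.posts_all()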
def read(feed, classifier):
    # Get feed entries and loop over them
    f = feedparser.parse(feed)
    for entry in f['entries']:
        print
        print '-----'
        # Print the contents of the entry
        print 'Title: ' + entry['title'].encode('utf-8')
        print 'Publisher: ' + entry['publisher'].encode('utf-8')
        print
        print entry['summary'].encode('utf-8')

        # Combine all the text to create one item for the classifier
        fulltext = '%s\n%s\n%s' % (entry['title'], entry['publisher'], entry['summary'])

        # Print the best guess at the current category
        print 'Guess: ' + str(classifier.classify(entry))

        # Ask the user to specify the correct category and train on that
        cl = raw_input('Enter category: ')
        classifier.train(entry, cl)
def get_wet():
    # Get the weather data
    print("Updating weather for", postcode)
    d = feedparser.parse(url)
    entries = len(d['entries'])
    val = " " + d['entries'][0]['title']
    val += " " + d['entries'][1]['title']
    val += " " + d['entries'][2]['title']

    # Tidy & shorten the message for the scroll display
    val = val.replace("Maximum", "Max")
    val = val.replace("Minimum", "Min")
    val = val.replace("Temperature: ", "")
    val = val.replace(u"\u00B0", "")
    val = val.replace(",", "")
    val = val.replace("(", "")
    val = val.replace(")", "")
    return val
def tor_search(self, keyword):
    self.mode = ''
    self.sender.sendMessage('Searching torrent..')
    self.navi = feedparser.parse(self.rssUrl + parse.quote(keyword))
    outList = []

    if not self.navi.entries:
        self.sender.sendMessage('Sorry, No results')
        self.mode = self.MENU1_1
        return

    for (i, entry) in enumerate(self.navi.entries):
        if i == 10:
            break
        title = str(i + 1) + ". " + entry.title
        templist = []
        templist.append(title)
        outList.append(templist)

    show_keyboard = {'keyboard': self.put_menu_button(outList)}
    self.sender.sendMessage('Choose one from below',
                            reply_markup=show_keyboard)
    self.mode = self.MENU1_2
def handle_headlines(self, message):
    """Speak the latest headlines from the selected feed."""
    title = message.data['TitleKeyword']
    feed = feedparser.parse(self.feeds[title])
    items = feed.get('items', [])

    # Only read three items
    if len(items) > 3:
        items = items[:3]
    self.cache(title, items)

    self._is_reading_headlines = True
    self.speak('Here\'s the latest headlines from ' +
               message.data['TitleKeyword'])
    for i in items:
        if not self._is_reading_headlines:
            break
        logger.info('Headline: ' + i['title'])
        self.speak(i['title'])
        time.sleep(5)
    self._is_reading_headlines = False
def get_items(self, name):
    """
    Get items from the named feed, if cache exists use cache otherwise
    fetch the feed and update.
    """
    cache_timeout = 10 * 60
    cached_time = float(self.cache_time.get(name, 0))
    if name in self.cached_items \
            and (time.time() - cached_time) < cache_timeout:
        logger.debug('Using cached feed...')
        return self.cached_items[name]
    else:
        logger.debug('Fetching feed and updating cache')
        feed = feedparser.parse(self.feeds[name])
        feed_items = feed.get('items', [])
        self.cache(name, feed_items)
        if len(feed_items) > 5:
            return feed_items[:5]
        else:
            return feed_items
def get_status_fm(service):
    response = feedparser.parse(service["url"])
    for item in response.entries:
        status = item.title.split(" - ")[-1]
        date = datetime(*item.published_parsed[:6])

        icon = ICON_STATUS_GOOD if status == "Up" else None
        icon = ICON_STATUS_MINOR if status == "Warning" else icon
        icon = ICON_STATUS_MAJOR if status == "Down" else icon

        wf.add_item(
            title=status.capitalize(),
            subtitle=date.strftime('%d %B %Y - ') + item.description,
            icon=icon,
            icontype="file"
        )
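A sketch of how this Alfred-Workflow helper might be wired up. The module-level wf instance and the service dict layout are assumptions consistent with the service["url"] and wf.add_item() calls above; the status-page URL is a placeholder:

# Hypothetical wiring: `wf` is the module-level Alfred-Workflow instance the
# function references, and run()/send_feedback() follow the standard pattern.
from workflow import Workflow

def main(_wf):
    get_status_fm({"url": "https://status.example.com/history.rss"})
    _wf.send_feedback()

if __name__ == "__main__":
    wf = Workflow()
    wf.run(main)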