def refresh_posts(posts):
    """Re-fetch the given posts from Twitter and sync the local session.

    All posts are looked up with a single ``statuses.lookup`` call, using the
    object returned by ``get_twitter_for_acc`` for the first post's author.
    Posts found in the response are merged into ``db.session`` from the fresh
    tweet data; posts missing from the response are deleted from the session.

    Args:
        posts: sequence of post objects exposing ``author`` and
            ``twitter_id``.

    Returns:
        ``posts`` itself when it is empty, ``None`` when
        ``get_twitter_for_acc`` yields nothing usable, otherwise the list of
        merged posts that were found in the lookup response.
    """
    if not posts:
        return posts

    t = get_twitter_for_acc(posts[0].author)
    if not t:
        return

    try:
        tweets = t.statuses.lookup(
            _id=",".join(post.twitter_id for post in posts),
            trim_user=True, tweet_mode='extended')
    except (URLError, TwitterError) as e:
        # NOTE(review): presumably handle_error always raises; if it can
        # return normally, `tweets` is unbound below -- confirm.
        handle_error(e)

    # Index the response once instead of scanning it for every post.
    # setdefault keeps the first tweet seen for an id, which is the tweet a
    # front-to-back scan would have picked.
    tweets_by_id = {}
    for tweet in tweets:
        tweets_by_id.setdefault(tweet['id_str'], tweet)

    refreshed_posts = []
    for post in posts:
        tweet = tweets_by_id.get(post.twitter_id)
        if not tweet:
            # Not returned by the lookup: drop the local copy.
            db.session.delete(post)
        else:
            refreshed_posts.append(
                db.session.merge(post_from_api_tweet_object(tweet)))
    return refreshed_posts
python类TwitterError()的实例源码
def test_follow_congresspeople(self, logging_mock):
    # Three usable screen names spread over both profile columns, plus
    # empty cells that must be skipped.
    self.subject._profiles = pd.DataFrame(
        [
            ['DepEduardoCunha', 'DepEduardoCunha2'],
            ['DepRodrigomaia', None],
            [None, None],
        ],
        columns=['twitter_profile', 'secondary_twitter_profile'])
    expected_names = (
        'DepEduardoCunha',
        'DepEduardoCunha2',
        'DepRodrigomaia',
    )
    expected_calls = [
        mock.call.CreateFriendship(screen_name=screen_name)
        for screen_name in expected_names
    ]

    # Happy path: one CreateFriendship call per non-null profile.
    self.subject.follow_congresspeople()
    self.api.assert_has_calls(expected_calls, any_order=True)
    self.assertEqual(3, self.api.CreateFriendship.call_count)

    # Failure path: every attempt raises, so every attempt logs a warning.
    self.api.CreateFriendship.side_effect = TwitterError('Not found')
    self.subject.follow_congresspeople()
    logging_mock.warning.assert_called()
    self.assertEqual(3, logging_mock.warning.call_count)
def enf_type(field, _type, val):
    """ Checks to see if a given val for a field (i.e., the name of the field)
    can be converted to the proper _type. If it cannot, raises a TwitterError
    with a brief explanation.

    Args:
        field:
            Name of the field you are checking.
        _type:
            Type that the value should be returned as.
        val:
            Value to convert to _type.

    Returns:
        val converted to type _type.

    Raises:
        TwitterError: if ``_type(val)`` raises ValueError or TypeError.
    """
    try:
        return _type(val)
    except (TypeError, ValueError):
        # TypeError covers inputs such as int(None) or int([1]); without it
        # the caller would get a raw TypeError instead of the documented
        # TwitterError.
        raise TwitterError({
            'message': '"{0}" must be type {1}'.format(field, _type.__name__)
        })
def twitter_login_step1():
    """Redirect the visitor to the Twitter login URL.

    The login URL is requested with ``twitter_login_step2`` as the external
    callback and every ``TWITTER_``-prefixed config entry as keyword
    arguments. If that raises TwitterError or URLError, the exception is
    reported to sentry (when configured) and the visitor is redirected to the
    ``about`` page with an empty ``twitter_login_error`` parameter instead.
    """
    try:
        callback_url = url_for('twitter_login_step2', _external=True)
        twitter_settings = app.config.get_namespace("TWITTER_")
        login_url = libforget.twitter.get_login_url(
            callback=callback_url, **twitter_settings)
        return redirect(login_url)
    except (TwitterError, URLError):
        # Report while the exception is still the active one.
        if sentry:
            sentry.captureException()

    fallback_url = url_for('about', twitter_login_error='', _anchor='log_in')
    return redirect(fallback_url)
def autopost_twitter(self):
    """Post up to MAX_POSTS_PER_CALL Twitter feed posts.

    Each post is sent via ``api.twitter.post``. A TwitterError is logged and
    the post is left untouched; on success the post is logged, flagged as no
    longer scheduled, stamped with the current time and saved.
    """
    twitter_posts = FeedPost.objects.filter(for_network=conf.NETWORK_TWITTER)
    for post in twitter_posts[:MAX_POSTS_PER_CALL]:
        try:
            api.twitter.post(post.text, url=post.url)
        except twitter.TwitterError as e:
            logger.error("Twitter Autopost: error on #{0.pk}: {1.args}".format(post, e))
            continue

        # Only reached when the tweet went out.
        logger.info("Twitter Autopost: posted #{0.pk} ('{0}')".format(post))
        post.scheduled = False
        post.posted = now()
        post.save()
def tweet_incident(sender, instance, created=False, **kwargs):
    """Tweet an incident when it has just been created.

    NOTE(review): the signature looks like a Django ``post_save`` receiver --
    confirm against where it is connected.

    Args:
        sender: unused.
        instance: object exposing a ``tweet()`` method.
        created: only tweet when this is true.
        **kwargs: ignored.

    A TwitterError raised by ``instance.tweet()`` is printed, not propagated.
    """
    if not created:
        return
    try:
        instance.tweet()
    except TwitterError as exc:
        # Parenthesised single-argument print outputs the same text on
        # Python 2 and Python 3.
        print('Exception while tweeting incident: {}'.format(str(exc)))
def follow_congresspeople(self):
    """
    Friend every congressperson account listed in the profiles table.

    Screen names come from the ``twitter_profile`` and
    ``secondary_twitter_profile`` columns; null entries are skipped. A
    TwitterError for a screen name is logged as a warning and the loop moves
    on to the next one.
    """
    primary = self.profiles()['twitter_profile']
    secondary = self.profiles()['secondary_twitter_profile']
    screen_names = pd.concat([primary, secondary])
    has_name = screen_names.notnull()
    for screen_name in screen_names[has_name].values:
        try:
            self.api.CreateFriendship(screen_name=screen_name)
        except twitter.TwitterError:
            logging.warning('{} profile not found'.format(screen_name))
publish_playlist_twitter.py 文件源码
项目:homemadescripts
作者: helioloureiro
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def TwitterPost(video_info):
    """Tweet a video with its thumbnail, then tweet a proposal reminder.

    Args:
        video_info: dict whose first key is the video title; the value under
            that key is a dict with ``'thumb'`` and ``'url'`` entries.

    The thumbnail is fetched into a temporary file via ``GetImage`` and posted
    with ``PostMedia``. If that post raises TwitterError, the error is
    printed, another video is obtained from ``GetVideo()`` and this function
    calls itself with it; the reminder tweet is then left to that call.
    The temporary file is removed on every path.
    """
    # App python-tweeter
    # https://dev.twitter.com/apps/815176
    tw = twitter.Api(
        consumer_key=cons_key,
        consumer_secret=cons_sec,
        access_token_key=acc_key,
        access_token_secret=acc_sec
    )
    # next(iter(...)) yields the same first key on Python 2 and 3, whereas
    # dict.keys()[0] only works on Python 2.
    title = next(iter(video_info))
    print("Title: %s" % title)
    image = video_info[title]['thumb']
    print("Image: %s" % image)
    url = video_info[title]['url']
    print("Video: %s" % url)

    # NOTE(review): the descriptor from mkstemp is handed to GetImage; whether
    # it gets closed there is not visible from this function -- confirm.
    fd, file_image = tempfile.mkstemp()
    media_failed = False
    try:
        GetImage(image, fd)
        msg = "Watch again: %s\n%s #pyconse" % (title, url)
        print("Posting: %s" % msg)
        print("Image: %s" % file_image)
        try:
            tw.PostMedia(status=msg, media=file_image)
        except twitter.TwitterError as err:
            # twitter.TwitterError: [{u'message': u'Status is a duplicate.', u'code': 187}]
            print("ERROR: %s" % err.args[0][0]['message'])
            print("Post on Twitter failed. Trying again...")
            media_failed = True
    finally:
        # Remove the temporary image even when an unexpected exception
        # escapes, so failed runs do not leave files behind.
        if os.path.exists(file_image):
            os.unlink(file_image)

    if media_failed:
        # NOTE(review): the retry recursion has no upper bound.
        video = GetVideo()
        TwitterPost(video)
        return

    try:
        msg = "Don't forget to send you talk proposal for #pyconse (September 6th in Stockholm). key=%d " % random.randint(1000,5000) + \
            "https://goo.gl/RjHpoS"
        print("Posting: %s" % msg)
        tw.PostUpdate(msg)
    except twitter.TwitterError as err:
        error = err.args[0][0]
        print("ERROR: %s" % error['message'])
def SendingList(api, tag, list_name):
    """
    Post the names in FFS[list_name] as @-mentions prefixed with `tag`.

    Mentions are appended to one status until the next one would push it past
    SIZE characters; the status is then posted and a new one is started.
    Names already recorded in CONTROLE are skipped, and every processed name
    is recorded there. The global `counter` is incremented per processed
    name. With `dryrun` set, statuses are only printed. Each real post is
    followed by a random sleep of 1 to 29 minutes.

    Args:
        api: object exposing PostUpdate(status).
        tag: text placed at the start of every status.
        list_name: key into the global FFS mapping.
    """
    global FFS, CONTROLE, counter, dryrun
    print("Running for %s: %d" % (tag, len(FFS[list_name])))
    MSG = tag + " "
    for name in FFS[list_name]:
        # `in` replaces dict.has_key(), which no longer exists on Python 3.
        if name in CONTROLE:
            continue
        CONTROLE[name] = 1
        print("%s %s" % (counter, name))
        name = '@' + name
        if len(MSG + " " + name) > SIZE:
            # Status is full: flush it and start the next one with this name.
            print(MSG)
            if not dryrun:
                api.PostUpdate(MSG)
                sleep(randrange(1, 30) * 60)
            MSG = tag + " " + name
        else:
            MSG += " " + name
        counter += 1
    print(MSG)
    if dryrun:
        return
    if "@" not in MSG:
        # empty
        return
    try:
        api.PostUpdate(MSG)
    except twitter.TwitterError as e:
        # A duplicate status is expected on re-runs and stays silent. The
        # check uses str(e) because the error payload is not always a plain
        # string. Anything else is reported, but the run continues.
        if 'Status is a duplicate.' not in str(e):
            print("ERROR: %s" % e)
    sleep(randrange(1, 30) * 60)
def testTwitterError(self):
    '''Test that twitter responses containing an error message are wrapped.'''
    self._AddHandler('https://api.twitter.com/1.1/statuses/user_timeline.json',
                     curry(self._OpenTestData, 'public_timeline_error.json'))
    # Manually try/catch so we can check the exception's value
    try:
        # The return value is irrelevant; the call is expected to raise.
        self._api.GetUserTimeline()
    except twitter.TwitterError as error:
        # `except ... as` is valid on Python 2.6+ and 3; the comma form is
        # Python 2 only.
        # If the error message matches, the test passes
        self.assertEqual('test error', error.message)
    else:
        self.fail('TwitterError expected')
def __init__(self, credentials):
    """Create the Twitter API client from a credentials mapping.

    Args:
        credentials: mapping with the keys ``consumer_key``,
            ``consumer_secret``, ``access_token_key`` and
            ``access_token_secret``.

    Raises:
        twitter.TwitterError: raised while building the client.
        KeyError: a required key is missing from ``credentials``.
        Exception: any other failure while building the client.

    Every failure is logged at ERROR level and then re-raised unchanged.
    """
    self._logger = logging.getLogger(__name__)
    try:
        self._twitter_api = twitter.Api(
            consumer_key=credentials['consumer_key'],
            consumer_secret=credentials['consumer_secret'],
            access_token_key=credentials['access_token_key'],
            access_token_secret=credentials['access_token_secret']
        )
    except twitter.TwitterError as exc:
        self._logger.log(
            logging.ERROR,
            'The following error: %s, occurred while connecting '
            'to the twitter API.' % exc.message)
        # A bare raise keeps the original traceback on Python 2 and 3.
        raise
    except KeyError as exc:
        # args[0] is the missing key. Built-in exceptions have no `.message`
        # attribute on Python 3, so reading it would mask the real error.
        self._logger.log(
            logging.ERROR,
            'Malformed credentials dictionary: %s.' % exc.args[0])
        raise
    except Exception as exc:
        self._logger.log(
            logging.ERROR,
            'An unhandled error: %s, occurred while connecting '
            'to the twitter API.' % exc)
        raise
def _push_job_offers_to_twitter(self, num_tweets_to_push):
    """Tweet unpublished job offers, at most MAX_TWEETS_TO_PUSH per call.

    Job offers are not pushed all at once because the Twitter API won't
    allow it; a request above the cap is logged as a warning and reduced to
    the cap.

    Args:
        num_tweets_to_push: how many unpublished job offers to fetch and
            tweet.

    A failure to tweet one offer is logged (WARNING for TwitterError, ERROR
    for anything else) and does not stop the remaining offers.
    """
    if num_tweets_to_push > self.MAX_TWEETS_TO_PUSH:
        self._logging(
            logging.WARNING,
            'Cannot push %s tweets at once, pushing %s tweets '
            'instead.' % (num_tweets_to_push, self.MAX_TWEETS_TO_PUSH))
        num_tweets_to_push = self.MAX_TWEETS_TO_PUSH

    self._logging(logging.INFO, 'Acquiring unpublished job offers.')
    for job_offer in JobAlchemy.get_not_pushed_on_twitter(num_tweets_to_push):
        tweet = self._format_tweet(job_offer.id, job_offer.title)
        try:
            self._logging(logging.INFO, 'Publishing to Twitter.')
            self._twitter_api.PostUpdate(tweet)
        except twitter.TwitterError as exc:
            self._logging(
                logging.WARNING,
                '[Job offer id: %s] The following error: %s, '
                'occurred while pushing the following tweet: %s.'
                % (job_offer.id, exc.message, tweet))
            continue
        except Exception as exc:
            self._logging(
                logging.ERROR,
                '[Job offer id: %s] An unhandled error: %s, '
                'occurred while pushing the following tweet: %s.'
                % (job_offer.id, exc, tweet))
            continue

        # The tweet went out. Flag the job offer as pushed so that it is not
        # tweeted again on a later run.
        self._logging(logging.INFO, 'Marking as published on Twitter.')
        JobAlchemy.set_pushed_on_twitter(job_offer.id, True)
def get_statements(self):
    """
    Return statements built from tweets that are replies.

    Searches for 50 tweets containing a random word. For every result that
    is a reply, the tweet it replies to is fetched and attached as the
    statement's response. Tweets that are not replies, or whose parent
    lookup raises TwitterError (logged as a warning), are left out.
    """
    from twitter import TwitterError

    search_word = self.random_word(self.random_seed_word)
    self.logger.info(u'Requesting 50 random tweets containing the word {}'.format(search_word))
    search_results = self.api.GetSearch(term=search_word, count=50)

    collected = []
    for tweet in search_results:
        statement = Statement(tweet.text)
        if not tweet.in_reply_to_status_id:
            continue
        try:
            parent = self.api.GetStatus(tweet.in_reply_to_status_id)
            statement.add_response(Response(parent.text))
            collected.append(statement)
        except TwitterError as error:
            self.logger.warning(str(error))

    self.logger.info('Adding {} tweets with responses'.format(len(collected)))
    return collected
def fetch_acc(account, cursor):
    """Fetch one page of an account's timeline into the database session.

    Args:
        account: account object exposing ``twitter_id`` and ``id``.
        cursor: optional dict of extra ``user_timeline`` arguments (for
            example the value returned by a previous call); falsy to start
            from scratch.

    Returns:
        ``None`` when no Twitter access is available or when the page was
        empty; otherwise the request arguments with ``max_id`` set just
        below the lowest tweet id seen.
    """
    t = get_twitter_for_acc(account)
    if not t:
        print("no twitter access, aborting")
        return

    try:
        user = t.account.verify_credentials()
        db.session.merge(account_from_api_user_object(user))

        kwargs = dict(
            user_id=account.twitter_id,
            count=200,
            trim_user=True,
            tweet_mode='extended',
        )
        if cursor:
            kwargs.update(cursor)

        # Without a max_id from the cursor, only ask for tweets newer than
        # the most recent stored post.
        if 'max_id' not in kwargs:
            newest_post = (
                Post.query.order_by(db.desc(Post.created_at))
                .filter(Post.author_id == account.id).first())
            if newest_post:
                kwargs['since_id'] = newest_post.twitter_id

        tweets = t.statuses.user_timeline(**kwargs)
    except (TwitterError, URLError) as e:
        # NOTE(review): presumably handle_error always raises; if it can
        # return normally, `tweets` and `kwargs` may be unbound below --
        # confirm.
        handle_error(e)

    print("processing {} tweets for {acc}".format(len(tweets), acc=account))

    if len(tweets) > 0:
        next_max_id = +inf
        for tweet in tweets:
            db.session.merge(post_from_api_tweet_object(tweet))
            next_max_id = min(tweet['id'] - 1, next_max_id)
        kwargs['max_id'] = next_max_id
    else:
        kwargs = None

    db.session.commit()
    return kwargs
def parse_media_file(passed_media):
    """ Parses a media file and attempts to return a file-like object and
    information about the media file.

    Args:
        passed_media: media file which to parse. Either a string (a URL
            starting with 'http', or a local path) or an open file object
            with a read() method.

    Returns:
        file-like object, the filename of the media file, the file size, and
        the type of media.

    Raises:
        TwitterError: if a passed file object is not opened in an accepted
            binary mode, if the media exceeds the size limit for its type,
            or if its guessed type is neither a known image nor video format.
    """
    img_formats = ['image/jpeg',
                   'image/png',
                   'image/gif',
                   'image/bmp',
                   'image/webp']
    video_formats = ['video/mp4',
                     'video/quicktime']

    # Tracks whether this function opened a local file itself, so that it can
    # be closed again if validation fails.
    opened_here = False

    # If passed_media is a string, check if it points to a URL, otherwise,
    # it should point to local file. Create a reference to a file obj for
    # each case such that data_file ends up with a read() method.
    if not hasattr(passed_media, 'read'):
        if passed_media.startswith('http'):
            data_file = http_to_file(passed_media)
        else:
            data_file = open(os.path.realpath(passed_media), 'rb')
            opened_here = True
        filename = os.path.basename(passed_media)
    # Otherwise, if a file object was passed in the first place,
    # create the standard reference to media_file (i.e., rename it to fp).
    else:
        if passed_media.mode not in ['rb', 'rb+', 'w+b']:
            raise TwitterError('File mode must be "rb" or "rb+"')
        filename = os.path.basename(passed_media.name)
        data_file = passed_media

    try:
        # Seek to the end to measure the size, then rewind.
        data_file.seek(0, 2)
        file_size = data_file.tell()

        try:
            data_file.seek(0)
        except Exception:
            # Best-effort rewind; a failure here is deliberately ignored.
            pass

        media_type = mimetypes.guess_type(os.path.basename(filename))[0]
        if media_type is not None:
            if media_type in img_formats and file_size > 5 * 1048576:
                raise TwitterError({'message': 'Images must be less than 5MB.'})
            elif media_type in video_formats and file_size > 15 * 1048576:
                raise TwitterError({'message': 'Videos must be less than 15MB.'})
            elif media_type not in img_formats and media_type not in video_formats:
                raise TwitterError({'message': 'Media type could not be determined.'})
    except Exception:
        # Do not leak the handle opened above when the media is rejected;
        # a caller-supplied file object is left open.
        if opened_here:
            data_file.close()
        raise

    return data_file, filename, file_size, media_type
def get_tweets(tweet_ids, consumer_key, consumer_secret, access_token, access_token_secret, nlp):
    """
    Expands tweets from Twitter

    Downloaded tweets are appended to the file 'tweet_temp' as
    "<id><TAB><text>" lines, and that file is re-read on start-up, so an
    interrupted run resumes where it stopped.

    :param tweet_ids: the list of tweet IDs to expand
    :param consumer_key: passed to twitter.Api
    :param consumer_secret: passed to twitter.Api
    :param access_token: passed to twitter.Api as access_token_key
    :param access_token_secret: passed to twitter.Api
    :param nlp: callable applied to the tweet text; iterating its result
        yields tokens with a ``lower_`` attribute
    :return: a dictionary of tweet ID to tweet text
    :raises twitter.TwitterError: re-raised when the error text contains
        'Rate limit exceeded'
    """
    # Save tweets in a temporary file, in case the script stops working and re-starts
    tweets = {}
    if os.path.exists('tweet_temp'):
        with codecs.open('tweet_temp', 'r', 'utf-8') as f_in:
            for line in f_in:
                # Split on the first tab only: an empty text or a tab inside
                # the text must not break reloading the cache.
                tweet_id, sep, text = line.strip('\r\n').partition('\t')
                if not sep:
                    # Blank or malformed line: nothing to restore.
                    continue
                tweets[tweet_id.strip()] = text.strip()

    api = twitter.Api(consumer_key=consumer_key, consumer_secret=consumer_secret, access_token_key=access_token,
                      access_token_secret=access_token_secret)

    sleeptime, resettime = reset_sleep_time(api)

    with codecs.open('tweet_temp', 'a', 'utf-8') as f_out:
        for tweet_id in tweet_ids:
            # Only tweets we didn't download yet need a request
            if tweet_id in tweets:
                continue

            try:
                curr_tweet = api.GetStatus(tweet_id, include_entities=False)
                tweets[tweet_id] = clean_tweet(' '.join([t.lower_ for t in nlp(curr_tweet.text)]))
            except twitter.TwitterError as err:
                error = str(err)

                # If the rate limit exceeded, this script should be stopped and resumed the next day
                if 'Rate limit exceeded' in error:
                    raise

                # Other error - the tweet is not available :(
                print('Error reading tweet id: %s : %s' % (tweet_id, error))
                tweets[tweet_id] = 'TWEET IS NOT AVAILABLE'

            # write() plus an explicit newline emits the same line as the
            # Python 2 `print >> f_out` form and also works on Python 3.
            f_out.write('\t'.join((tweet_id, tweets[tweet_id])) + '\n')
            time.sleep(sleeptime)

            if time.time() >= resettime:
                sleeptime, resettime = reset_sleep_time(api)

    return tweets