def testGettersAndSetters(self):
    '''Round-trip a value through each twitter.User Set*/Get* accessor pair.'''
    avatar_url = 'https://twitter.com/system/user/profile_image/673483/normal/me.jpg'
    # (accessor suffix, value) pairs, exercised in this order.
    accessor_cases = (
        ('Id', 673483),
        ('Name', 'DeWitt'),
        ('ScreenName', 'dewitt'),
        ('Description', 'Indeterminate things'),
        ('Location', 'San Francisco, CA'),
        ('ProfileImageUrl', avatar_url),
    )
    account = twitter.User()
    for suffix, value in accessor_cases:
        getattr(account, 'Set' + suffix)(value)
        self.assertEqual(value, getattr(account, 'Get' + suffix)())
    # The status accessor is checked through the id of the stored status.
    account.SetStatus(self._GetSampleStatus())
    self.assertEqual(4212713, account.GetStatus().id)
# Example source code for the Python class com()
def testProperties(self):
    '''Test all of the twitter.User properties.

    Each property is assigned and then read back from the same User
    instance, including ``status``.
    '''
    user = twitter.User()
    user.id = 673483
    self.assertEqual(673483, user.id)
    user.name = 'DeWitt'
    self.assertEqual('DeWitt', user.name)
    user.screen_name = 'dewitt'
    self.assertEqual('dewitt', user.screen_name)
    user.description = 'Indeterminate things'
    self.assertEqual('Indeterminate things', user.description)
    user.location = 'San Francisco, CA'
    self.assertEqual('San Francisco, CA', user.location)
    user.profile_image_url = ('https://twitter.com/system/user/profile_i'
                              'mage/673483/normal/me.jpg')
    self.assertEqual('https://twitter.com/system/user/profile_image/6734'
                     '83/normal/me.jpg', user.profile_image_url)
    # The status must be set on the User under test; assigning it to the
    # test case itself (``self.status``) would leave the User.status
    # property unexercised.
    user.status = self._GetSampleStatus()
    self.assertEqual(4212713, user.status.id)
def testGetUserTimeline(self):
    '''Fetch one status for screen name kesuke via twitter.Api GetUserTimeline.'''
    timeline_url = 'https://api.twitter.com/1.1/statuses/user_timeline.json?count=1&screen_name=kesuke'
    fixture_opener = curry(self._OpenTestData, 'user_timeline-kesuke.json')
    self._AddHandler(timeline_url, fixture_opener)
    timeline = self._api.GetUserTimeline(screen_name='kesuke', count=1)
    # Spot-check two ids instead of comparing the whole payload.
    self.assertEqual(89512102, timeline[0].id)
    self.assertEqual(718443, timeline[0].user.id)
#def testGetFriendsTimeline(self):
# '''Test the twitter.Api GetFriendsTimeline method'''
# self._AddHandler('https://api.twitter.com/1.1/statuses/friends_timeline/kesuke.json',
# curry(self._OpenTestData, 'friends_timeline-kesuke.json'))
# statuses = self._api.GetFriendsTimeline('kesuke')
# # This is rather arbitrary, but spot checking is better than nothing
# self.assertEqual(20, len(statuses))
# self.assertEqual(718443, statuses[0].user.id)
def Query():
    """Stream tweets matching a fixed keyword into a newline-delimited JSON file.

    Authenticates with the module-level OAuth credentials, opens a
    Streaming API connection filtered on the keyword, then writes every
    received message as one JSON document per line to ``<keyword>.json``.
    The text of each tweet is echoed to stdout as it arrives.
    """
    QUERY = 'big'
    # The file to write output as newline-delimited JSON documents
    OUT_FILE = QUERY + ".json"
    # Authenticate to Twitter with OAuth
    auth = twitter.oauth.OAuth(OAUTH_TOKEN, OAUTH_TOKEN_SECRET,
                               CONSUMER_KEY, CONSUMER_SECRET)
    # Create a connection to the Streaming API
    twitter_stream = twitter.TwitterStream(auth=auth)
    print('Filtering the public timeline for "{0}"'.format(QUERY))
    # See https://dev.twitter.com/docs/streaming-apis on keyword parameters
    stream = twitter_stream.statuses.filter(track=QUERY)
    # Write one tweet per line as a JSON document.
    with io.open(OUT_FILE, 'w', encoding='utf-8', buffering=1) as f:
        for tweet in stream:
            # The unicode format string already yields text, so no extra
            # conversion is needed before writing to the text-mode file.
            f.write(u'{0}\n'.format(json.dumps(tweet, ensure_ascii=False)))
            # Not every streamed message is guaranteed to carry a 'text'
            # key; skip the echo for those instead of dying on KeyError.
            if 'text' in tweet:
                print(tweet['text'])
def _GetSampleUser(self):
    '''Build the twitter.User fixture for the "kesuke" account.'''
    avatar_url = 'https://twitter.com/system/user/profile_image/718443/normal/kesuke.png'
    fields = {
        'id': 718443,
        'name': 'Kesuke Miyagi',
        'screen_name': 'kesuke',
        'description': u'Canvas. JC Penny. Three ninety-eight.',
        'location': 'Okinawa, Japan',
        'url': 'https://twitter.com/kesuke',
        'profile_image_url': avatar_url,
    }
    return twitter.User(**fields)
def _GetSampleUser(self):
    '''Build the twitter.User fixture for the "dewitt" account, status included.'''
    avatar_url = 'https://twitter.com/system/user/profile_image/673483/normal/me.jpg'
    latest_status = self._GetSampleStatus()
    fields = {
        'id': 673483,
        'name': 'DeWitt',
        'screen_name': 'dewitt',
        'description': u'Indeterminate things',
        'location': 'San Francisco, CA',
        'url': 'http://unto.net/',
        'profile_image_url': avatar_url,
        'status': latest_status,
    }
    return twitter.User(**fields)
def testInit(self):
    '''Construct a twitter.User from keyword arguments.

    No assertions are made; the test only checks that construction
    completes without raising.
    '''
    avatar_url = 'https://twitter.com/system/user/profile_image/673483/normal/me.jpg'
    sample_status = self._GetSampleStatus()
    twitter.User(
        id=673483,
        name='DeWitt',
        screen_name='dewitt',
        description=u'Indeterminate things',
        url='https://twitter.com/dewitt',
        profile_image_url=avatar_url,
        status=sample_status,
    )
def testEq(self):
    '''Compare a property-populated twitter.User against the sample fixture.'''
    avatar_url = 'https://twitter.com/system/user/profile_image/673483/normal/me.jpg'
    # Attributes are assigned one by one, in this order, on an empty User.
    assignments = (
        ('id', 673483),
        ('name', 'DeWitt'),
        ('screen_name', 'dewitt'),
        ('description', 'Indeterminate things'),
        ('location', 'San Francisco, CA'),
        ('profile_image_url', avatar_url),
        ('url', 'http://unto.net/'),
    )
    candidate = twitter.User()
    for attribute, value in assignments:
        setattr(candidate, attribute, value)
    candidate.status = self._GetSampleStatus()
    self.assertEqual(candidate, self._GetSampleUser())
def testTwitterError(self):
    '''Test that twitter responses containing an error message are wrapped.

    The user_timeline endpoint is wired to the 'public_timeline_error.json'
    fixture; the call is expected to raise twitter.TwitterError whose
    message is 'test error'.
    '''
    self._AddHandler('https://api.twitter.com/1.1/statuses/user_timeline.json',
                     curry(self._OpenTestData, 'public_timeline_error.json'))
    # Manually try/catch so we can check the exception's value
    try:
        self._api.GetUserTimeline()
    except twitter.TwitterError as error:
        # If the error message matches, the test passes
        self.assertEqual('test error', error.message)
    else:
        self.fail('TwitterError expected')
def testGetStatus(self):
    '''Look up status 89512102 through the twitter.Api GetStatus method.'''
    show_url = 'https://api.twitter.com/1.1/statuses/show.json?include_my_retweet=1&id=89512102'
    fixture_opener = curry(self._OpenTestData, 'show-89512102.json')
    self._AddHandler(show_url, fixture_opener)
    fetched = self._api.GetStatus(89512102)
    self.assertEqual(89512102, fetched.id)
    self.assertEqual(718443, fetched.user.id)
def testDestroyStatus(self):
    '''Delete status 103208352 through the twitter.Api DestroyStatus method.'''
    destroy_url = 'https://api.twitter.com/1.1/statuses/destroy/103208352.json'
    fixture_opener = curry(self._OpenTestData, 'status-destroy.json')
    self._AddHandler(destroy_url, fixture_opener)
    removed = self._api.DestroyStatus(103208352)
    self.assertEqual(103208352, removed.id)
def testPostRetweet(self):
    '''Retweet status 89512102 through the twitter.Api PostRetweet method.'''
    retweet_url = 'https://api.twitter.com/1.1/statuses/retweet/89512102.json'
    fixture_opener = curry(self._OpenTestData, 'retweet.json')
    self._AddHandler(retweet_url, fixture_opener)
    retweeted = self._api.PostRetweet(89512102)
    self.assertEqual(89512102, retweeted.id)
def testPostUpdateLatLon(self):
    '''Post an update with latitude/longitude via the twitter.Api PostUpdate method.'''
    update_url = 'https://api.twitter.com/1.1/statuses/update.json'
    self._AddHandler(update_url,
                     curry(self._OpenTestData, 'update_latlong.json'))
    # The update text is sent as UTF-8 encoded bytes, with geo parameters.
    encoded_text = u'??? ????? ?? ????????? ??????? ????? ?????'.encode('utf8')
    posted = self._api.PostUpdate(encoded_text, latitude=54.2, longitude=-2)
    self.assertEqual(u'??? ????? ?? ????????? ??????? ????? ?????', posted.text)
    # NOTE(review): the expected coordinates differ from the arguments
    # passed above; presumably they come from the canned fixture - confirm.
    self.assertEqual(u'Point', posted.GetGeo()['type'])
    self.assertEqual(26.2, posted.GetGeo()['coordinates'][0])
    self.assertEqual(127.5, posted.GetGeo()['coordinates'][1])
def testGetReplies(self):
    '''Fetch replies through the twitter.Api GetReplies method.'''
    timeline_url = 'https://api.twitter.com/1.1/statuses/user_timeline.json'
    fixture_opener = curry(self._OpenTestData, 'replies.json')
    self._AddHandler(timeline_url, fixture_opener)
    replies = self._api.GetReplies()
    self.assertEqual(36657062, replies[0].id)
def testGetRetweetsOfMe(self):
    '''Fetch retweets of the user through the twitter.Api GetRetweetsOfMe method.'''
    retweets_url = 'https://api.twitter.com/1.1/statuses/retweets_of_me.json'
    fixture_opener = curry(self._OpenTestData, 'retweets_of_me.json')
    self._AddHandler(retweets_url, fixture_opener)
    retweeted = self._api.GetRetweetsOfMe()
    self.assertEqual(253650670274637824, retweeted[0].id)
def testGetFriends(self):
    '''Fetch the friends list (cursor=123) through the twitter.Api GetFriends method.'''
    friends_url = 'https://api.twitter.com/1.1/friends/list.json?cursor=123'
    self._AddHandler(friends_url, curry(self._OpenTestData, 'friends.json'))
    friends = self._api.GetFriends(cursor=123)
    # Collect the status of every friend whose screen name is 'buzz'.
    buzz_statuses = []
    for friend in friends:
        if friend.screen_name == 'buzz':
            buzz_statuses.append(friend.status)
    self.assertEqual(89543882, buzz_statuses[0].id)
def testGetDirectMessages(self):
    '''Fetch direct messages through the twitter.Api GetDirectMessages method.'''
    inbox_url = 'https://api.twitter.com/1.1/direct_messages.json'
    fixture_opener = curry(self._OpenTestData, 'direct_messages.json')
    self._AddHandler(inbox_url, fixture_opener)
    messages = self._api.GetDirectMessages()
    expected_text = u'A légpárnás hajóm tele van angolnákkal.'
    self.assertEqual(expected_text, messages[0].text)
def testPostDirectMessage(self):
    '''Send a direct message through the twitter.Api PostDirectMessage method.'''
    new_message_url = 'https://api.twitter.com/1.1/direct_messages/new.json'
    self._AddHandler(new_message_url,
                     curry(self._OpenTestData, 'direct_messages-new.json'))
    # The message body is sent as UTF-8 encoded bytes.
    encoded_body = u'??? ????? ?? ????????? ??????? ????? ?????'.encode('utf8')
    sent = self._api.PostDirectMessage('test', encoded_body)
    # Only the text of the returned message is spot-checked.
    self.assertEqual(u'??? ????? ?? ????????? ??????? ????? ?????', sent.text)
def testDestroyDirectMessage(self):
    '''Delete message 3496342 through the twitter.Api DestroyDirectMessage method.'''
    destroy_url = 'https://api.twitter.com/1.1/direct_messages/destroy.json'
    fixture_opener = curry(self._OpenTestData, 'direct_message-destroy.json')
    self._AddHandler(destroy_url, fixture_opener)
    removed = self._api.DestroyDirectMessage(3496342)
    # Only the sender id of the returned message is spot-checked.
    self.assertEqual(673483, removed.sender_id)
def testCreateFriendship(self):
    '''Befriend 'dewitt' through the twitter.Api CreateFriendship method.'''
    create_url = 'https://api.twitter.com/1.1/friendships/create.json'
    fixture_opener = curry(self._OpenTestData, 'friendship-create.json')
    self._AddHandler(create_url, fixture_opener)
    befriended = self._api.CreateFriendship('dewitt')
    # Only the id of the returned user is spot-checked.
    self.assertEqual(673483, befriended.id)
def testDestroyFriendship(self):
    '''Unfriend 'dewitt' through the twitter.Api DestroyFriendship method.'''
    destroy_url = 'https://api.twitter.com/1.1/friendships/destroy.json'
    fixture_opener = curry(self._OpenTestData, 'friendship-destroy.json')
    self._AddHandler(destroy_url, fixture_opener)
    unfriended = self._api.DestroyFriendship('dewitt')
    # Only the id of the returned user is spot-checked.
    self.assertEqual(673483, unfriended.id)
def convert_truncated(tweet):
    """Converts a tweet in extended compatibility mode to a fully extended tweet.

    These come from the Streaming API, and python-twitter will only extract a
    legacy style tweet. See
    https://dev.twitter.com/overview/api/upcoming-changes-to-tweets for details
    and
    https://github.com/twitterdev/tweet-updates/blob/master/samples/initial/compatibilityplus_extended_13997.json
    for an example.

    This hasn't been tested extensively, so may break in some cases, but seems
    to work so far.

    Args:
        tweet: an object exposing its raw JSON dictionary as ``_json``.

    Returns:
        The result of ``Status.NewFromJsonDict`` on the raw dictionary after
        every key of its ``extended_tweet`` entry (when present) has been
        copied to the top level, overriding same-named keys.

    Note:
        The raw dictionary is modified in place, so ``tweet._json`` also
        reflects the promoted keys afterwards.
    """
    raw_tweet = tweet._json
    # `in` replaces dict.has_key(), which no longer exists in Python 3.
    if 'extended_tweet' in raw_tweet:
        raw_tweet.update(raw_tweet['extended_tweet'])
    return Status.NewFromJsonDict(raw_tweet)
def long_desc(self):
    """Return the long description text of the Twitter interface."""
    fragments = (
        "Interface to Twitter. The bot's account is at",
        " https://twitter.com/relay_bot . Every tweet will be signed",
        " with the nickname of the user who sent it. ",
    )
    return "".join(fragments)
def main():
    """Relay streamed Twitter activity of the followed accounts to a chat.

    Iterates over ``api.GetStreamFilter(follow=AQOURS)`` and, for each
    message:

    * deletion notices are announced through ``bot.send_message``;
    * messages whose author id is not in ``AQOURS`` are ignored;
    * everything else is appended to ``output.log`` as one JSON document
      per line and forwarded to ``cfg.CHAT_ID`` with formatting that depends
      on whether it is a quote, a retweet, a reply, a long (truncated)
      tweet or a plain post. The matching ``getMediaFor*`` helper is called
      with the stream message and the sent chat message.

    NOTE(review): the boolean fields (``is_quote_status``, ``truncated``) are
    assumed to be real JSON booleans, so plain truthiness replaces the
    original ``== True`` / ``== False`` comparisons - confirm.
    """
    with open('output.log', 'a') as f:
        # api.GetStreamFilter will return a generator that yields one status
        # message (i.e., Tweet) at a time as a JSON dictionary.
        for line in api.GetStreamFilter(follow=AQOURS):
            if 'delete' in line:  # If deleting a tweet
                bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>???????</b>', parse_mode="HTML")
                bot.send_message(chat_id=cfg.CHAT_ID, text=u'?????? ID ??' + '<b>' + line['delete']['status']['id_str'] + '</b>', parse_mode="HTML")
                continue

            # Not a deletion, so an update can be confirmed; only updates
            # authored by one of the followed accounts are relayed.
            if line['user']['id_str'] not in AQOURS:
                continue

            print(u'???? Aqours ??????')
            f.write(json.dumps(line) + '\n')  # log output
            bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>????????</b><a href="https://twitter.com/{0}">{1}</a><b>??????</b>'.format(line['user']['screen_name'], line['user']['name']), parse_mode="HTML", disable_web_page_preview=True)
            tweet_url = 'https://twitter.com/{0}/status/{1}'.format(line['user']['screen_name'], line['id_str'])
            button_list = [
                [InlineKeyboardButton(u"???", url=tweet_url)]
            ]
            reply_markup = InlineKeyboardMarkup(button_list)

            is_quote = line['is_quote_status']
            is_retweet = 'retweeted_status' in line
            is_reply = line['in_reply_to_status_id'] is not None

            if is_quote:
                print(u'???? Aqours ????????')
                quoted = line['quoted_status']
                if not quoted['truncated']:
                    # Quoted tweet fits in the legacy 'text' field.
                    print(u'????????')
                    r = bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>?</b><a href="https://twitter.com/{0}">{1}</a><b>???????????</b>\n{2}'.format(quoted['user']['screen_name'], quoted['user']['name'], quoted['text']), parse_mode="HTML", disable_web_page_preview=True)
                    getMediaForQuotedTweet(line, r)
                    bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>??????</b>\n' + line['text'], parse_mode="HTML", disable_web_page_preview=True, reply_markup=reply_markup)
                else:
                    # Quoted tweet is truncated: use its extended full text.
                    print(u'????????')
                    r = bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>?</b><a href="https://twitter.com/{0}">{1}</a><b>???????????</b>\n{2}'.format(quoted['user']['screen_name'], quoted['user']['name'], quoted['extended_tweet']['full_text']), parse_mode="HTML", disable_web_page_preview=True)
                    getMediaForQuotedLongTweet(line, r)
                    bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>??????</b>\n' + line['text'], parse_mode="HTML", disable_web_page_preview=True, reply_markup=reply_markup)

            if is_retweet and not is_quote:
                print(u'???? Aqours ???????')
                retweeted = line['retweeted_status']
                if not retweeted['truncated']:
                    r = bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>?</b><a href="https://twitter.com/{0}">{1}</a><b>????????</b>\n{2}'.format(retweeted['user']['screen_name'], retweeted['user']['name'], retweeted['text']), parse_mode="HTML", disable_web_page_preview=True, reply_markup=reply_markup)
                    getMediaForRetweet(line, r)
                else:
                    # Retweeted tweet is truncated: use its extended full text.
                    r = bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>?</b><a href="https://twitter.com/{0}">{1}</a><b>????????</b>\n{2}'.format(retweeted['user']['screen_name'], retweeted['user']['name'], retweeted['extended_tweet']['full_text']), parse_mode="HTML", disable_web_page_preview=True, reply_markup=reply_markup)
                    getMediaForRetweetLongTweet(line, r)

            if is_reply:
                print(u'???????')
                # Look the replied-to user up once; both the screen name
                # and the display name come from the same record.
                replied_user = api.GetUser(user_id=line['in_reply_to_user_id_str'])
                r = bot.send_message(chat_id=cfg.CHAT_ID, text=u'<a href="https://twitter.com/{0}">{1}</a><b>???</b><a href="https://twitter.com/{2}">{3}</a>'.format(line['user']['screen_name'], line['user']['name'], replied_user.screen_name, replied_user.name), parse_mode="HTML", disable_web_page_preview=True)
                bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>???</b>\n{0}\n<b>??????</b>\n{1}'.format(line['text'], api.GetStatus(line['in_reply_to_status_id_str']).text), parse_mode="HTML", disable_web_page_preview=True, reply_markup=reply_markup)
                getMediaForRepling(line, r)

            if line['truncated']:
                print(u'???????')
                r = bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>???????</b>\n' + line['extended_tweet']['full_text'], parse_mode="HTML", disable_web_page_preview=True, reply_markup=reply_markup)
                getMediaForLongTweet(line, r)
            elif not is_reply and not is_retweet and not is_quote:
                # Plain post: not a reply, retweet, quote or truncated tweet.
                print(u'????????')
                r = bot.send_message(chat_id=cfg.CHAT_ID, text=u'<b>???????</b>\n' + line['text'], parse_mode="HTML", disable_web_page_preview=True, reply_markup=reply_markup)
                getMediaForPost(line, r)
def _append_media_note(status, label, url):
    """Return ``status`` with a ``[label: url]`` note appended on its own line."""
    note = '[%s: %s]' % (label, url)
    if status != '':
        return status + '\n' + note
    return note


def release_tweet(tweet, api):
    """Formats and publishes a Tweet to the account.

    The tweet is rendered to HTML, converted to a PNG and posted through
    ``api.PostUpdate`` together with up to three of the tweet's photos (the
    rendered PNG takes the first media slot). Videos, animated GIFs and any
    further photos are referenced in the status text by a link back to the
    tweet instead.

    Args:
        tweet: the tweet to publish; truncated tweets are first expanded
            with ``convert_truncated``.
        api: client object whose ``PostUpdate(status=..., media=...)`` is
            called to publish.
    """
    if tweet.truncated:
        tweet = convert_truncated(tweet)
    tweet_html = render_tweet_html(tweet)
    image = html_to_png(tweet_html)
    status = get_status_message()
    media = []
    # Max 4 photos, or 1 video or 1 GIF
    for media_item in tweet.media or []:
        extra_media_url = 'https://twitter.com/%s/status/%d' % (tweet.user.screen_name, tweet.id)
        if media_item.type == 'video':
            status = _append_media_note(status, 'Video', extra_media_url)
        elif media_item.type == 'animated_gif':
            status = _append_media_note(status, 'GIF', extra_media_url)
        elif media_item.type == 'photo':
            if len(media) < 3:
                photo_url = media_item.media_url_https
                # Use large photo size if available
                if 'large' in media_item.sizes:
                    photo_url += ':large'
                media.append(photo_url)
            else:
                # No free slot left: link to the tweet instead.
                status = _append_media_note(status, 'Photo', extra_media_url)
    print(status)
    print(media)
    with NamedTemporaryFile(suffix='.png') as png_file:
        image.save(png_file, format='PNG', dpi=(144, 144))
        # Rewind so a consumer reading the file object starts at the
        # beginning of the image rather than at the end of what was written.
        png_file.seek(0)
        media.insert(0, png_file)
        # Post while the temporary file still exists.
        api.PostUpdate(status=status, media=media)