def tweets(self):
"""
Generator yielding the last tweets (200 is the API limit) since the
last tweet saved in Jarbas database. If no tweet is saved it yields the
last 200 tweets.
"""
kwargs = dict(
screen_name='RosieDaSerenata',
count=200, # this is the maximum suported by Twitter API
include_rts=False,
exclude_replies=True
)
latest_tweet = Tweet.objects.first()
if latest_tweet:
kwargs['since_id'] = latest_tweet.status
api = twitter.Api(*self.credentials, sleep_on_rate_limit=True)
yield from api.GetUserTimeline(**kwargs)
python类Api()的实例源码
happierfuntokenizing.py 文件源码
项目:user-factor-adaptation
作者: StonyBrookNLP
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def tokenize_random_tweet(self):
"""
If the twitter library is installed and a twitter connection
can be established, then tokenize a random tweet.
"""
try:
import twitter
except ImportError:
print "Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/"
from random import shuffle
api = twitter.Api()
tweets = api.GetPublicTimeline()
if tweets:
for tweet in tweets:
if tweet.user.lang == 'en':
return self.tokenize(tweet.text)
else:
raise Exception("Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")
def get_random_tweet(session):
"""
Grab token from session and get a tweet!
"""
token = session['user']['accessToken'].split(',')
key, secret = token[0], token[1]
api = twitter.Api(consumer_key=TWITTER_CONSUMER_KEY,
consumer_secret=TWITTER_CONSUMER_SECRET,
access_token_key=key, access_token_secret=secret)
timeline = api.GetUserTimeline(screen_name='realDonaldTrump',
include_rts=False,
trim_user=True,
exclude_replies=True,
count=100)
tweet = timeline[random.randint(0, len(timeline))]
return tweet.text
def post(message, url=None):
"""
?????????? ? Twitter
"""
config = SocialConfig.get_solo()
message = format_message(message, url=url)
if not message:
return
import twitter
twitter_api = twitter.Api(
config.twitter_client_id,
config.twitter_client_secret,
config.twitter_access_token,
config.twitter_access_token_secret,
)
twitter_api.PostUpdate(message)
def __init__(self):
## Load the API keys
try:
stream = open("app/models/api_keys.yml", 'r')
keys = yaml.load(stream)
except:
print('Failed to load Twitter API keys from file, falling back on environment variables')
keys = {
'consumer_key': os.environ['consumer_key'],
'consumer_secret': os.environ['consumer_secret'],
'access_token_key': os.environ['access_token_key'],
'access_token_secret': os.environ['access_token_secret'],
}
self.api = twitter.Api(consumer_key=keys['consumer_key'], consumer_secret=keys['consumer_secret'], access_token_key=keys['access_token_key'], access_token_secret=keys['access_token_secret'], sleep_on_rate_limit=False) # NOTE: setting sleep_on_rate_limit to True here means the application will sleep when we hit the API rate limit. It will sleep until we can safely make another API call. Making this False will make the API throw a hard error when the rate limit is hit.
## Request tweets for a given movie
def testGetUserTimeline(self):
'''Test the twitter.Api GetUserTimeline method'''
self._AddHandler('https://api.twitter.com/1.1/statuses/user_timeline.json?count=1&screen_name=kesuke',
curry(self._OpenTestData, 'user_timeline-kesuke.json'))
statuses = self._api.GetUserTimeline(screen_name='kesuke', count=1)
# This is rather arbitrary, but spot checking is better than nothing
self.assertEqual(89512102, statuses[0].id)
self.assertEqual(718443, statuses[0].user.id)
#def testGetFriendsTimeline(self):
# '''Test the twitter.Api GetFriendsTimeline method'''
# self._AddHandler('https://api.twitter.com/1.1/statuses/friends_timeline/kesuke.json',
# curry(self._OpenTestData, 'friends_timeline-kesuke.json'))
# statuses = self._api.GetFriendsTimeline('kesuke')
# # This is rather arbitrary, but spot checking is better than nothing
# self.assertEqual(20, len(statuses))
# self.assertEqual(718443, statuses[0].user.id)
def get_tweets(request):
datas = []
p = ttp.Parser()
try:
api = twitter.Api(
consumer_key=settings.TWITTER_CONSUMER_KEY,
consumer_secret=settings.TWITTER_CONSUMER_SECRET,
access_token_key=settings.TWITTER_ACCESS_TOKEN,
access_token_secret=settings.TWITTER_ACCESS_TOKEN_SECRET
)
tweets = api.GetUserTimeline(screen_name='kodlaco')
for tweet in tweets:
datas.append({
#'text': p.parse(tweet.text).html,
'text': tweet.text,
'id_str': tweet.id_str
})
except:
datas = []
return HttpResponse(json.dumps(datas), content_type="application/json")
def handle(self, *args, **options):
api = twitter.Api(consumer_key=settings.TWITTER_CONSUMER_KEY,
consumer_secret=settings.TWITTER_CONSUMER_SECRET,
access_token_key=settings.TWITTER_ACCESS_TOKEN_KEY,
access_token_secret=settings.TWITTER_ACCESS_TOKEN_SECRET)
for currency_symbol in settings.SOCIAL_NETWORK_SENTIMENT_CONFIG['twitter']:
print(currency_symbol)
results = api.GetSearch("$" + currency_symbol, count=200)
for tweet in results:
if SocialNetworkMention.objects.filter(network_name='twitter', network_id=tweet.id).count() == 0:
snm = SocialNetworkMention.objects.create(
network_name='twitter',
network_id=tweet.id,
network_username=tweet.user.screen_name,
network_created_on=datetime.datetime.fromtimestamp(tweet.GetCreatedAtInSeconds()),
text=tweet.text,
symbol=currency_symbol,
)
snm.set_sentiment()
snm.save()
def tokenize_random_tweet(self):
"""
If the twitter library is installed and a twitter connection
can be established, then tokenize a random tweet.
"""
try:
import twitter
except ImportError:
print "Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/"
from random import shuffle
api = twitter.Api()
tweets = api.GetPublicTimeline()
if tweets:
for tweet in tweets:
if tweet.user.lang == 'en':
return self.tokenize(tweet.text)
else:
raise Exception(
"Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")
def handle(self, *args, **options):
api = twitter.Api(consumer_key=settings.TWITTER_CONSUMER_KEY,
consumer_secret=settings.TWITTER_CONSUMER_SECRET,
access_token_key=settings.TWITTER_ACCESS_TOKEN_KEY,
access_token_secret=settings.TWITTER_ACCESS_TOKEN_SECRET)
for currency_symbol in settings.SOCIAL_NETWORK_SENTIMENT_CONFIG['twitter']:
print(currency_symbol)
results = api.GetSearch("$" + currency_symbol, count=200)
for tweet in results:
if SocialNetworkMention.objects.filter(network_name='twitter', network_id=tweet.id).count() == 0:
snm = SocialNetworkMention.objects.create(
network_name='twitter',
network_id=tweet.id,
network_username=tweet.user.screen_name,
network_created_on=datetime.datetime.fromtimestamp(tweet.GetCreatedAtInSeconds()),
text=tweet.text,
symbol=currency_symbol,
)
snm.set_sentiment()
snm.save()
def tokenize_random_tweet(self):
"""
If the twitter library is installed and a twitter connection
can be established, then tokenize a random tweet.
"""
try:
import twitter
except ImportError:
print("Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/")
from random import shuffle
api = twitter.Api()
tweets = api.GetPublicTimeline()
if tweets:
for tweet in tweets:
if tweet.user.lang == 'en':
return self.tokenize(tweet.text)
else:
raise Exception("Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")
def respond(consumer_key, consumer_secret, token_key, token_secret):
if consumer_key is None:
print 'usage: python album_genre_bot.py <consumer_key> <consumer_secret> <token_key> <token_secret>'
return
api = twitter.Api(consumer_key=consumer_key,
consumer_secret=consumer_secret,
access_token_key=token_key,
access_token_secret=token_secret)
original_timestamp = get_time_stamp()
new_timestamp = original_timestamp
for m in api.GetMentions():
dt = parse_twitter_date(m.created_at)
new_timestamp = max(dt, new_timestamp)
if dt > original_timestamp and m.text.startswith(twitterId):
genre = predict(clean_text(m.text).lower())
try:
status = api.PostUpdate(
in_reply_to_status_id=m.id,
status='@{} {}'.format(m.user.screen_name, genre))
print m.text,
print ' ==> ' + genre
except Exception as e:
print e
save_time_stamp(new_timestamp)
def post_to_twitter(tw_post):
tw_ck, tw_cs, tw_ak, tw_as = lamvery.secret.get('tw_keys').split(',')
tw_api = twitter.Api(consumer_key=tw_ck,
consumer_secret=tw_cs,
access_token_key=tw_ak,
access_token_secret=tw_as)
try:
tw_api.PostUpdate(tw_post)
except:
pass
return True
def __init__(self, consumer_key, consumer_secret, access_token,
access_token_secret, user):
self.client = twitter.Api(
consumer_key=consumer_key,
consumer_secret=consumer_secret,
access_token_key=access_token,
access_token_secret=access_token_secret)
if TwitterFeed.algorithm_choice != AlgorithmChoice.DUMMY:
self.client.VerifyCredentials()
self.user = user
def connect():
api = twitter.Api(consumer_key=settings.SOCIAL_AUTH_TWITTER_KEY, \
consumer_secret=settings.SOCIAL_AUTH_TWITTER_SECRET, \
access_token_key=settings.TWITTER_ACCESS_TOKEN_KEY, \
access_token_secret=settings.TWITTER_ACCESS_TOKEN_SECRET)
return api
def init_twitter():
# load twitter API tokens
with open(creds) as data_file:
data = json.load(data_file)
api = twitter.Api(consumer_key=data["consumer_key"],
consumer_secret=data["consumer_secret"],
access_token_key=data["access_token_key"],
access_token_secret=data["access_token_secret"])
return api
def __init__(self, path):
self.__twitter_keys = misc.GetKeys(path)
self.__api = twitter.Api(consumer_key=str(self.__twitter_keys['consumer_key']),
consumer_secret=str(self.__twitter_keys['consumer_secret']),
access_token_key=str(self.__twitter_keys['access_token']),
access_token_secret=str(self.__twitter_keys['access_secret']))
def connect(self):
self.api = twitter.Api(
consumer_key=settings.get_secret('TWITTER_CONSUMER_KEY'),
consumer_secret=settings.get_secret('TWITTER_CONSUMER_SECRET'),
access_token_key=settings.get_secret('TWITTER_ACCESS_TOKEN_KEY'),
access_token_secret=settings.get_secret(
'TWITTER_ACCESS_TOKEN_SECRET'),
)
def __init__(self, tokens):
"""
:param tokens: Dictionary of all tokens
[consumer_key, consumer_secret, access_token_key,
access_token_secret]
required to initialize the Twitter Api
"""
self.api = twitter.Api(
consumer_key=tokens['consumer_key'],
consumer_secret=tokens['consumer_secret'],
access_token_key=tokens['access_token_key'],
access_token_secret=tokens['access_token_secret']
)
tweets.py 文件源码
项目:Effective-Amazon-Machine-Learning
作者: PacktPublishing
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def __init__(self, raw_query):
self.twitter_api = twitter.Api(consumer_key='fd5TwZfTmZR4Jaaaaaaaaa',
consumer_secret='8aFf6eGf6kKEfrIQjsiQbYyveawaaaaaaaaa',
access_token_key='14698049-rtGaaaaaaaaa',
access_token_secret='Z2qkurxKaaaaaaaaa')
self.raw_query = raw_query
B05028_09_python.py 文件源码
项目:Effective-Amazon-Machine-Learning
作者: PacktPublishing
项目源码
文件源码
阅读 14
收藏 0
点赞 0
评论 0
def __init__(self, raw_query):
self.twitter_api = twitter.Api(consumer_key='your own key',
consumer_secret= 'your own key',
access_token_key='your own key',
access_token_secret='your own key')
self.raw_query = raw_query
# capture the tweets:
# see http://python-twitter.readthedocs.io/en/latest/twitter.html
def __init__(self, stream=True):
self.auth = helpers.twitter_auth()
self.api = twitter.Api(*self.auth)
self.stream = stream
def _get_twitter_client(self, name):
"""Get a twitter client."""
if name not in self._cached_twitter:
conf = self.config['twitter'][name]
self._cached_twitter[name] = twitter.Api(
consumer_key=conf['consumer_key'],
consumer_secret=conf['consumer_secret'],
access_token_key=conf['access_token_key'],
access_token_secret=conf['access_token_secret']
)
return self._cached_twitter[name]
publish_playlist_twitter.py 文件源码
项目:homemadescripts
作者: helioloureiro
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def TwitterPost(video_info):
# App python-tweeter
# https://dev.twitter.com/apps/815176
tw = twitter.Api(
consumer_key = cons_key,
consumer_secret = cons_sec,
access_token_key = acc_key,
access_token_secret = acc_sec
)
title = video_info.keys()[0]
print "Title: %s" % title
image = video_info[title]['thumb']
print "Image: %s" % image
url = video_info[title]['url']
print "Video: %s" % url
fd, file_image = tempfile.mkstemp()
GetImage(image, fd)
msg = "Watch again: %s\n%s #pyconse" % (title, url)
print "Posting: %s" % msg
print "Image: %s" % file_image
try:
tw.PostMedia(status = msg,media = file_image)
os.unlink(file_image)
except twitter.TwitterError as err:
# twitter.TwitterError: [{u'message': u'Status is a duplicate.', u'code': 187}]
print "ERROR: %s" % err.args[0][0]['message']
print "Post on Twitter failed. Trying again..."
if os.path.exists(file_image):
os.unlink(file_image)
video = GetVideo()
TwitterPost(video)
return
try:
msg = "Don't forget to send you talk proposal for #pyconse (September 6th in Stockholm). key=%d " % random.randint(1000,5000) + \
"https://goo.gl/RjHpoS"
print "Posting: %s" % msg
tw.PostUpdate(msg)
except twitter.TwitterError as err:
error = err.args[0][0]
print "ERROR: %s" % error['message']
def analyze(username):
twitter_consumer_key = 'NXXzhmDN1kcu3N7YPxf8y7qvp'
twitter_consumer_secret = 'vViSlyPlHkueZCrEPTnQJ5PP8ET8iVHNmFh6G4RRSkcpDiCFPT'
twitter_access_token = '3260949332-GsQhU60z94XZA012BbcgsJmT9EOFZEfbVW6zboW'
twitter_access_secret = 'NjxFN3dpdvmE4iZ0ci2wV5wfdeEqnmmMJssczSJv61fpf'
twitter_api = twitter.Api(consumer_key=twitter_consumer_key,
consumer_secret=twitter_consumer_secret,
access_token_key=twitter_access_token,
access_token_secret=twitter_access_secret)
statuses = twitter_api.GetUserTimeline(screen_name=handle, count=200, include_rts=False)
text = "" # Store the retrieved info
for status in statuses:
if (status.lang =='en'): # English tweets
text += status.text.encode('utf-8')
# The IBM Bluemix credentials for Personality Insights!
pi_username = '91afa133-d7a9-469f-95e9-a38cb2c500f4'
pi_password = 'EUiG6T4wGG7j'
personality_insights = PersonalityInsights(username=pi_username, password=pi_password)
result = personality_insights.profile(text)
return result
def FetchTwitter(user, output):
assert user
statuses = twitter.Api().GetUserTimeline(id=user, count=1)
s = statuses[0]
xhtml = TEMPLATE % (s.user.screen_name, s.text, s.user.screen_name, s.id, s.relative_created_at)
if output:
Save(xhtml, output)
else:
print xhtml
def setUp(self):
self._urllib = MockUrllib()
api = twitter.Api(consumer_key='CONSUMER_KEY',
consumer_secret='CONSUMER_SECRET',
access_token_key='OAUTH_TOKEN',
access_token_secret='OAUTH_SECRET',
cache=None)
api.SetUrllib(self._urllib)
self._api = api
def testGetStatus(self):
'''Test the twitter.Api GetStatus method'''
self._AddHandler('https://api.twitter.com/1.1/statuses/show.json?include_my_retweet=1&id=89512102',
curry(self._OpenTestData, 'show-89512102.json'))
status = self._api.GetStatus(89512102)
self.assertEqual(89512102, status.id)
self.assertEqual(718443, status.user.id)
def testPostUpdate(self):
'''Test the twitter.Api PostUpdate method'''
self._AddHandler('https://api.twitter.com/1.1/statuses/update.json',
curry(self._OpenTestData, 'update.json'))
status = self._api.PostUpdate(u'??? ????? ?? ????????? ??????? ????? ?????'.encode('utf8'))
# This is rather arbitrary, but spot checking is better than nothing
self.assertEqual(u'??? ????? ?? ????????? ??????? ????? ?????', status.text)
def testPostRetweet(self):
'''Test the twitter.Api PostRetweet method'''
self._AddHandler('https://api.twitter.com/1.1/statuses/retweet/89512102.json',
curry(self._OpenTestData, 'retweet.json'))
status = self._api.PostRetweet(89512102)
self.assertEqual(89512102, status.id)