def testPostUpdateLatLon(self):
    '''Check twitter.Api.PostUpdate when latitude and longitude are supplied.'''
    endpoint = 'https://api.twitter.com/1.1/statuses/update.json'
    self._AddHandler(endpoint, curry(self._OpenTestData, 'update_latlong.json'))
    # Post a geotagged update. The assertions are spot checks only.
    # NOTE(review): the expected coordinates differ from the ones sent;
    # presumably they come from update_latlong.json -- confirm.
    message = u'??? ????? ?? ????????? ??????? ????? ?????'
    status = self._api.PostUpdate(message.encode('utf8'), latitude=54.2, longitude=-2)
    self.assertEqual(message, status.text)
    self.assertEqual(u'Point', status.GetGeo()['type'])
    for index, expected in enumerate((26.2, 127.5)):
        self.assertEqual(expected, status.GetGeo()['coordinates'][index])
python类Api()的实例源码
def testGetReplies(self):
    '''Check that twitter.Api.GetReplies yields the expected first status id.'''
    replies_handler = curry(self._OpenTestData, 'replies.json')
    self._AddHandler(
        'https://api.twitter.com/1.1/statuses/user_timeline.json',
        replies_handler)
    replies = self._api.GetReplies()
    first_reply = replies[0]
    self.assertEqual(36657062, first_reply.id)
def testGetFriends(self):
    '''Check twitter.Api.GetFriends by inspecting the status of user "buzz".'''
    friends_handler = curry(self._OpenTestData, 'friends.json')
    self._AddHandler(
        'https://api.twitter.com/1.1/friends/list.json?cursor=123',
        friends_handler)
    friends = self._api.GetFriends(cursor=123)
    # Collect the status of every returned user whose screen name is 'buzz'.
    buzz_statuses = []
    for friend in friends:
        if friend.screen_name == 'buzz':
            buzz_statuses.append(friend.status)
    self.assertEqual(89543882, buzz_statuses[0].id)
def testGetDirectMessages(self):
    '''Check the text of the first message from twitter.Api.GetDirectMessages.'''
    messages_handler = curry(self._OpenTestData, 'direct_messages.json')
    self._AddHandler(
        'https://api.twitter.com/1.1/direct_messages.json',
        messages_handler)
    messages = self._api.GetDirectMessages()
    first_message = messages[0]
    self.assertEqual(u'A légpárnás hajóm tele van angolnákkal.', first_message.text)
def testPostDirectMessage(self):
    '''Check that twitter.Api.PostDirectMessage echoes back the sent text.'''
    new_message_handler = curry(self._OpenTestData, 'direct_messages-new.json')
    self._AddHandler(
        'https://api.twitter.com/1.1/direct_messages/new.json',
        new_message_handler)
    body = u'??? ????? ?? ????????? ??????? ????? ?????'
    sent = self._api.PostDirectMessage('test', body.encode('utf8'))
    # Only the text is verified; this is a spot check, not a full comparison.
    self.assertEqual(body, sent.text)
def testDestroyDirectMessage(self):
    '''Check the sender id returned by twitter.Api.DestroyDirectMessage.'''
    destroy_handler = curry(self._OpenTestData, 'direct_message-destroy.json')
    self._AddHandler(
        'https://api.twitter.com/1.1/direct_messages/destroy.json',
        destroy_handler)
    destroyed = self._api.DestroyDirectMessage(3496342)
    # Only the sender id is verified; this is a spot check.
    self.assertEqual(673483, destroyed.sender_id)
def testCreateFriendship(self):
    '''Check the user id returned by twitter.Api.CreateFriendship.'''
    create_handler = curry(self._OpenTestData, 'friendship-create.json')
    self._AddHandler(
        'https://api.twitter.com/1.1/friendships/create.json',
        create_handler)
    befriended = self._api.CreateFriendship('dewitt')
    # Only the user id is verified; this is a spot check.
    self.assertEqual(673483, befriended.id)
def testDestroyFriendship(self):
    '''Check the user id returned by twitter.Api.DestroyFriendship.'''
    destroy_handler = curry(self._OpenTestData, 'friendship-destroy.json')
    self._AddHandler(
        'https://api.twitter.com/1.1/friendships/destroy.json',
        destroy_handler)
    unfriended = self._api.DestroyFriendship('dewitt')
    # Only the user id is verified; this is a spot check.
    self.assertEqual(673483, unfriended.id)
def api_from_config(config):
    '''Build a twitter.Api client from the 'twitter_api' section of config.

    The four OAuth values are read with config.get('twitter_api', <name>)
    and the client is created with tweet_mode='extended'.
    '''
    section = 'twitter_api'
    consumer_key = config.get(section, 'consumer_key')
    consumer_secret = config.get(section, 'consumer_secret')
    access_token_key = config.get(section, 'access_token_key')
    access_token_secret = config.get(section, 'access_token_secret')
    return twitter.Api(
        consumer_key=consumer_key,
        consumer_secret=consumer_secret,
        access_token_key=access_token_key,
        access_token_secret=access_token_secret,
        tweet_mode='extended')
def getAPI():
    '''Return a twitter.Api client created with empty-string credentials.'''
    credential_names = (
        'consumer_key',
        'consumer_secret',
        'access_token_key',
        'access_token_secret',
    )
    # Every credential is the empty string, exactly as in the literal call.
    blank_credentials = dict.fromkeys(credential_names, '')
    return twitter.Api(**blank_credentials)
def __init__(self, credentials):
    '''Create the underlying twitter.Api client from a credentials mapping.

    Args:
        credentials: mapping providing 'consumer_key', 'consumer_secret',
            'access_token_key' and 'access_token_secret'.

    Raises:
        twitter.TwitterError: raised by twitter.Api; logged and re-raised.
        KeyError: a required key is missing from credentials; logged and
            re-raised.
        Exception: any other failure is logged and re-raised unchanged.
    '''
    self._logger = logging.getLogger(__name__)
    try:
        self._twitter_api = twitter.Api(
            consumer_key=credentials['consumer_key'],
            consumer_secret=credentials['consumer_secret'],
            access_token_key=credentials['access_token_key'],
            access_token_secret=credentials['access_token_secret']
        )
    except twitter.TwitterError as exc:
        # python-twitter's TwitterError exposes its own `message` attribute.
        self._logger.error(
            'The following error: %s, occurred while connecting '
            'to the twitter API.', exc.message)
        # Bare raise keeps the original traceback on Python 2 and 3.
        raise
    except KeyError as exc:
        # Built-in exceptions have no `.message` on Python 3, so reading it
        # here would raise AttributeError and hide the real problem.
        # args[0] is the missing key, which is what `.message` held on
        # Python 2.
        missing_key = exc.args[0] if exc.args else ''
        self._logger.error('Malformed credentials dictionary: %s.', missing_key)
        raise
    except Exception as exc:
        self._logger.error(
            'An unhandled error: %s, occurred while connecting '
            'to the twitter API.', exc)
        raise
Fetch_Data_Media_Twitter.py 文件源码
项目:StockRecommendSystem
作者: doncat99
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def getTwitterApi(root_path):
    '''Return the shared twitter.Api client, creating it on first use.

    On the first call the credentials ('Twitter' section) and proxy URLs
    ('Proxy' section) are read from <root_path>/config.ini, and the client
    is stored in the global gloabl_api. Later calls return that stored
    client without re-reading the file.
    '''
    global gloabl_api
    if gloabl_api is not None:
        return gloabl_api

    import twitter
    parser = configparser.ConfigParser()
    parser.read(root_path + "/" + "config.ini")
    # Positional order expected below: key, secret, token key, token secret.
    credentials = [
        parser.get('Twitter', option)
        for option in ('KEY', 'SECRET', 'TOKEN_KEY', 'TOKEN_SECRET')
    ]
    proxy_settings = {
        'http': parser.get('Proxy', 'HTTP'),
        'https': parser.get('Proxy', 'HTTPS'),
    }
    gloabl_api = twitter.Api(*credentials, timeout=15, proxies=proxy_settings)
    return gloabl_api
def __init__(self, storage, **kwargs):
    '''Initialise the trainer and create its python-twitter client.

    Reads 'random_seed_word' (default 'random') and the four
    'twitter_*' credential entries from kwargs; missing credentials are
    passed on as None.
    '''
    super(TwitterTrainer, self).__init__(storage, **kwargs)
    from twitter import Api as TwitterApi

    # First search term used when searching for tweets.
    self.random_seed_word = kwargs.get('random_seed_word', 'random')

    consumer_key = kwargs.get('twitter_consumer_key')
    consumer_secret = kwargs.get('twitter_consumer_secret')
    access_token_key = kwargs.get('twitter_access_token_key')
    access_token_secret = kwargs.get('twitter_access_token_secret')
    self.api = TwitterApi(
        consumer_key=consumer_key,
        consumer_secret=consumer_secret,
        access_token_key=access_token_key,
        access_token_secret=access_token_secret,
    )
def __init__(self, config):
    '''Create the twitter.Api client and record the configured screen name.

    Credentials are read from config["auth"] and the screen name from
    config["user"]["screen_name"].
    '''
    auth = config["auth"]
    consumer_key = auth["api_key"]
    consumer_secret = auth["api_secret"]
    access_token_key = auth["token"]
    access_token_secret = auth["token_secret"]
    self.api = twitter.Api(
        consumer_key=consumer_key,
        consumer_secret=consumer_secret,
        access_token_key=access_token_key,
        access_token_secret=access_token_secret,
        tweet_mode="extended",
    )
    user_settings = config["user"]
    self.screen_name = user_settings["screen_name"]
def __init__(self):
    '''Create the twitter.Api client from credentials defined outside this method.'''
    credentials = dict(
        consumer_key=twitter_consumer_key,
        consumer_secret=twitter_consumer_secret,
        access_token_key=twitter_access_token,
        access_token_secret=twitter_access_secret,
    )
    self.client = twitter.Api(**credentials)
# setup
def get_api(consumer_key, consumer_secret, access_key, access_secret):
    '''Return a twitter.Api client for the given OAuth credentials.

    The client is created with sleep_on_rate_limit=True.
    '''
    return twitter.Api(
        consumer_key=consumer_key,
        consumer_secret=consumer_secret,
        access_token_key=access_key,
        access_token_secret=access_secret,
        sleep_on_rate_limit=True,
    )
Twitter Tweets - User ID.py 文件源码
项目:Web-Scraping-and-Python-Codes
作者: dushyantRathore
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def get_tweets(user_id):
    '''Fetch up to 25 timeline tweets for user_id, print and return their texts.

    Args:
        user_id: passed to GetUserTimeline as user_id.

    Returns:
        A list with the 'text' entry of each returned status. Earlier
        versions only printed the list; it is now returned as well.
    '''
    api = twitter.Api(consumer_key=c_key, consumer_secret=c_secret,
                      access_token_key=a_key, access_token_secret=a_secret)
    timeline = api.GetUserTimeline(user_id=user_id, count=25)
    texts = [status.AsDict()['text'] for status in timeline]
    # The call form with a single argument prints the same thing on
    # Python 2 and Python 3; the statement form `print l` is a syntax
    # error on Python 3.
    print(texts)
    return texts
def get_posts(screenname, count=15):
    '''Return recent timeline posts for screenname, excluding replies.

    Args:
        screenname: passed to GetUserTimeline as screen_name.
        count: number of statuses to request. Defaults to 15, the value
            that was previously hard-coded "temporarily" for testing.

    Returns:
        Whatever twitter.Api.GetUserTimeline returns for the request.
    '''
    # SECURITY NOTE(review): these credentials are hard-coded in source.
    # They should be treated as leaked, revoked or rotated, and loaded
    # from configuration or the environment. They are left in place here
    # only so that existing callers keep working.
    api = twitter.Api(
        consumer_key='ikJIFDBN0h1LZomIkCRKbQXUE',
        consumer_secret='NS411RIiYBO2Gzj67pqcgFnD0AY3mPV5hWjkujCklZjeLurwDa',
        access_token_key='280279077-y9OmVTnveqglTJTy2ND2uPCy9wLWj9PnHDA0Kl0j',
        access_token_secret='HYW79wDKilDY44QX5mGupWx0001V06jz8mWCLt9aYihXe')
    return api.GetUserTimeline(
        screen_name=screenname, count=count, exclude_replies=True)
def tweet(self):
    '''Post self.as_tweet() using the username/password from settings.

    Raises:
        ImproperlyConfigured: when can_tweet() returns a false value.
    '''
    if not can_tweet():
        raise ImproperlyConfigured(
            "Unable to send tweet due to either "
            "missing python-twitter or required settings."
        )
    client = twitter.Api(
        username=settings.TWITTER_USERNAME,
        password=settings.TWITTER_PASSWORD,
    )
    client.PostUpdate(self.as_tweet())
def __init__(self, **kwargs):
    '''Initialise the adapter and create its twitter.Api client.

    The four 'twitter_*' credential entries are required in kwargs; a
    missing one raises KeyError.
    '''
    super(TwitterAdapter, self).__init__(**kwargs)
    consumer_key = kwargs["twitter_consumer_key"]
    consumer_secret = kwargs["twitter_consumer_secret"]
    access_token_key = kwargs["twitter_access_token_key"]
    access_token_secret = kwargs["twitter_access_token_secret"]
    self.api = twitter.Api(
        consumer_key=consumer_key,
        consumer_secret=consumer_secret,
        access_token_key=access_token_key,
        access_token_secret=access_token_secret,
    )