def main():
parser = argparse.ArgumentParser()
parser.add_argument('source_file', type = argparse.FileType('a'))
parser.add_argument('target_file', type = argparse.FileType('a'))
parser.add_argument('--languages', nargs = '+', default = ['ja'])
args = parser.parse_args()
while True:
try:
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
reply_stream_listener = ReplyStreamListener(api, args.target_file, args.source_file)
reply_stream = tweepy.Stream(auth = api.auth, listener = reply_stream_listener)
reply_stream.sample(languages = args.languages)
except:
traceback.print_exc(limit = 10, file = sys.stderr, chain = False)
time.sleep(10)
continue
python类API的实例源码
def tweet_listener():
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
while True:
try:
stream = tweepy.Stream(auth=api.auth,
listener=StreamListener(api))
print("listener starting...")
stream.userstream()
except Exception as e:
print(e)
print(e.__doc__)
def authenticate(self):
''' Push the keys and tokens to OAuth to get the
access to the API
'''
# set OAuth
self.auth = tweepy.OAuthHandler(
self.appinfo['consumer'],
self.appinfo['consumer_secret'])
self.auth.set_access_token(
self.appinfo['token'],
self.appinfo['token_secret'])
# TODO : Encrypt the contents of appinfo.json
# API access
self.api = tweepy.API(
self.auth,
wait_on_rate_limit = self.wait_ratelimit
) # TODO : Bypass the rate limit of Twitter API
def upload_images(twitter_api, images, image_format='jpeg', quality=90):
'''Upload Image(s) to twitter using the given settings
Args:
twitter_api (:class:`tweepy.API`): the Twitter API instance to use
images (list of :class:`PIL.Image.Image`): images to upload
image_format (string): file format to upload as (e.g. 'jpeg', 'png')
quality (int): quality percentage used for (Jpeg) compression
Returns:
list of media_ids
'''
media_ids = []
for image in images:
img_io = BytesIO()
image.save(img_io, image_format, quality=quality)
filename = "temp." + image_format
img_io.seek(0)
upload_res = twitter_api.media_upload(filename, file=img_io)
media_ids.append(upload_res.media_id)
return media_ids
def post_tweet(twitter_api, tweet_text, reply_id=None, media_ids=None):
'''post a tweet to the timeline, trims tweet if appropriate
Args:
twitter_api (:class:`tweepy.API`): the Twitter API instance to use
tweet_text (string): the status text to tweet
reply_id (int): optional, nests the tweet as reply to that tweet (use id_str element from a tweet)
media_ids (list of media_ids): optional, media to attach to the tweet
Returns:
bool: True if posted successfully, False otherwise
'''
tweet_text = trim_tweet_text(tweet_text)
try:
twitter_api.update_status(tweet_text, reply_id, media_ids=media_ids)
return True
except tweepy.error.TweepError as e:
cozmo.logger.error("post_tweet Error: " + str(e))
return False
oauth_handler.py 文件源码
项目:twitter_LDA_topic_modeling
作者: kenneth-orton
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def get_access_creds():
'''
Twitter API authentication credentials are stored in a file as:
consumer_key \t consumer_secret \t access_token \t access_secret
'''
oauths = []
print('Building list of developer access credentials...')
credentials = pd.read_csv('twitter_dev_accounts', sep='\t', header=None, names=['consumer_key', 'consumer_secret', 'access_token', 'access_secret'])
for index, row in credentials.iterrows():
auth = tweepy.auth.OAuthHandler(str(row['consumer_key']), str(row['consumer_secret']))
auth.set_access_token(str(row['access_token']), str(row['access_secret']))
oauth_api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
if(verify_working_credentials(oauth_api)):
oauths.append(oauth_api)
return oauths
def run(self):
while screen_names.count() > users.count():
s_name = screen_names.find_one_and_update({"collected": False},
{"$set": {"collected": True}})
logging.info("User {} in thread {}.".format(s_name["_id"], self.getName()))
try:
users.insert_one(data_user(s_name["_id"], self.api))
screen_names.update_one({"_id": s_name["_id"]},
{"$set": {"completed": True}})
except TweepError as e:
logging.error("Ups, Arrived to API limit in thread {}. Exception: {}".format(self.getName(), e))
sleep(60 * 15)
except Exception as e:
logging.error("Could not store user {}, the Exceception was {}.".format(s_name["user"], e))
# Create a new process for each twitter authorization
def upload_images(twitter_api, images, image_format='jpeg', quality=90):
'''Upload Image(s) to twitter using the given settings
Args:
twitter_api (:class:`tweepy.API`): the Twitter API instance to use
images (list of :class:`PIL.Image.Image`): images to upload
image_format (string): file format to upload as (e.g. 'jpeg', 'png')
quality (int): quality percentage used for (Jpeg) compression
Returns:
list of media_ids
'''
media_ids = []
for image in images:
img_io = BytesIO()
image.save(img_io, image_format, quality=quality)
filename = "temp." + image_format
img_io.seek(0)
upload_res = twitter_api.media_upload(filename, file=img_io)
media_ids.append(upload_res.media_id)
return media_ids
def post_tweet(twitter_api, tweet_text, reply_id=None, media_ids=None):
'''post a tweet to the timeline, trims tweet if appropriate
Args:
twitter_api (:class:`tweepy.API`): the Twitter API instance to use
tweet_text (string): the status text to tweet
reply_id (int): optional, nests the tweet as reply to that tweet (use id_str element from a tweet)
media_ids (list of media_ids): optional, media to attach to the tweet
Returns:
bool: True if posted successfully, False otherwise
'''
tweet_text = trim_tweet_text(tweet_text)
try:
twitter_api.update_status(tweet_text, reply_id, media_ids=media_ids)
return True
except tweepy.error.TweepError as e:
cozmo.logger.error("post_tweet Error: " + str(e))
return False
def auth(config):
"""
Perform authentication with Twitter and return a client instance to communicate with Twitter
:param config: ResponseBot config
:type config: :class:`~responsebot.utils.config_utils.ResponseBotConfig`
:return: client instance to execute twitter action
:rtype: :class:`~responsebot.responsebot_client.ResponseBotClient`
:raises: :class:`~responsebot.common.exceptions.AuthenticationError`: If failed to authenticate
:raises: :class:`~responsebot.common.exceptions.APIQuotaError`: If API call rate reached limit
"""
auth = tweepy.OAuthHandler(config.get('consumer_key'), config.get('consumer_secret'))
auth.set_access_token(config.get('token_key'), config.get('token_secret'))
api = tweepy.API(auth)
try:
api.verify_credentials()
except RateLimitError as e:
raise APIQuotaError(e.args[0][0]['message'])
except TweepError as e:
raise AuthenticationError(e.args[0][0]['message'])
else:
logging.info('Successfully authenticated as %s' % api.me().screen_name)
return ResponseBotClient(config=config, client=api)
def __init__(self, twitter_username):
# TODO: Login to twitter for corpus generation using end user's credentials
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
# Connect to Twitter - raise TweepError if we brick out on this
try:
api = tweepy.API(auth)
except tweepy.TweepError:
# TODO: make sure this error bubbles up and gets handled gracefully
raise PermissionError("Twitter Auth failed")
usr = api.get_user(twitter_username)
self.username = twitter_username
self.image = usr.profile_image_url
# Exposes entire api - for debugging only
# self.api = usr
self.description = usr.description
self.screen_name = usr.screen_name
self.name = usr.name
def __init__(self, twitter_username):
# TODO: Login to twitter for corpus generation using end user's credentials
auth = tweepy.OAuthHandler(CONSUMER_KEY,CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
# Connect to Twitter - raise TweepError if we brick out on this
try:
api = tweepy.API(auth)
except tweepy.TweepError:
# TODO: make sure this error bubbles up and gets handled gracefully
raise PermissionError("Twitter Auth failed")
usr = api.get_user(twitter_username)
self.username = twitter_username
self.image = usr.profile_image_url
# Exposes entire api - for debugging only
# self.api = usr
self.description = usr.description
self.screen_name = usr.screen_name
self.name = usr.name
def __init__(self):
'''
Class constructor or initialization method.
'''
consumer_key = '18QHFMz0zvycM2KLrTMfrafI1'
consumer_secret = 'WNwYGKBXmbfsY7ysZXxPJ4Voa7rgtLxGocuDHbIJ1TZLShtBVF'
access_token = '843094924299976704-GNJyLjovEGFAiOWLswFBagKxlebRQUq'
access_token_secret = 'L39Wz6lXKSavutPqhopmNwK7egJiSrwRxVohjbqVqbQvM'
try:
self.auth = OAuthHandler(consumer_key, consumer_secret)
self.auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(self.auth)
except:
print("Error: Authentication Failed")
def __init__(self):
'''
Class constructor or initialization method.
'''
consumer_key = '18QHFMz0zvycM2KLrTMfrafI1'
consumer_secret = 'WNwYGKBXmbfsY7ysZXxPJ4Voa7rgtLxGocuDHbIJ1TZLShtBVF'
access_token = '843094924299976704-GNJyLjovEGFAiOWLswFBagKxlebRQUq'
access_token_secret = 'L39Wz6lXKSavutPqhopmNwK7egJiSrwRxVohjbqVqbQvM'
try:
self.auth = OAuthHandler(consumer_key, consumer_secret)
self.auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(self.auth)
except:
print("Error: Authentication Failed")
def __init__(self):
'''
Class constructor or initialization method.
'''
consumer_key = '18QHFMz0zvycM2KLrTMfrafI1'
consumer_secret = 'WNwYGKBXmbfsY7ysZXxPJ4Voa7rgtLxGocuDHbIJ1TZLShtBVF'
access_token = '843094924299976704-GNJyLjovEGFAiOWLswFBagKxlebRQUq'
access_token_secret = 'L39Wz6lXKSavutPqhopmNwK7egJiSrwRxVohjbqVqbQvM'
try:
self.auth = OAuthHandler(consumer_key, consumer_secret)
self.auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(self.auth)
except:
print("Error: Authentication Failed")
def __init__(self):
'''
Class constructor or initialization method.
'''
consumer_key = '18QHFMz0zvycM2KLrTMfrafI1'
consumer_secret = 'WNwYGKBXmbfsY7ysZXxPJ4Voa7rgtLxGocuDHbIJ1TZLShtBVF'
access_token = '843094924299976704-GNJyLjovEGFAiOWLswFBagKxlebRQUq'
access_token_secret = 'L39Wz6lXKSavutPqhopmNwK7egJiSrwRxVohjbqVqbQvM'
try:
self.auth = OAuthHandler(consumer_key, consumer_secret)
self.auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(self.auth)
except:
print("Error: Authentication Failed")
def __init__(self):
'''
Class constructor or initialization method.
'''
consumer_key = '18QHFMz0zvycM2KLrTMfrafI1'
consumer_secret = 'WNwYGKBXmbfsY7ysZXxPJ4Voa7rgtLxGocuDHbIJ1TZLShtBVF'
access_token = '843094924299976704-GNJyLjovEGFAiOWLswFBagKxlebRQUq'
access_token_secret = 'L39Wz6lXKSavutPqhopmNwK7egJiSrwRxVohjbqVqbQvM'
try:
self.auth = OAuthHandler(consumer_key, consumer_secret)
self.auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(self.auth)
except:
print("Error: Authentication Failed")
def __init__(self):
'''
Class constructor or initialization method.
'''
consumer_key = '18QHFMz0zvycM2KLrTMfrafI1'
consumer_secret = 'WNwYGKBXmbfsY7ysZXxPJ4Voa7rgtLxGocuDHbIJ1TZLShtBVF'
access_token = '843094924299976704-GNJyLjovEGFAiOWLswFBagKxlebRQUq'
access_token_secret = 'L39Wz6lXKSavutPqhopmNwK7egJiSrwRxVohjbqVqbQvM'
try:
self.auth = OAuthHandler(consumer_key, consumer_secret)
self.auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(self.auth)
except:
print("Error: Authentication Failed")
def __init__(self):
'''
Class constructor or initialization method.
'''
consumer_key = '18QHFMz0zvycM2KLrTMfrafI1'
consumer_secret = 'WNwYGKBXmbfsY7ysZXxPJ4Voa7rgtLxGocuDHbIJ1TZLShtBVF'
access_token = '843094924299976704-GNJyLjovEGFAiOWLswFBagKxlebRQUq'
access_token_secret = 'L39Wz6lXKSavutPqhopmNwK7egJiSrwRxVohjbqVqbQvM'
try:
self.auth = OAuthHandler(consumer_key, consumer_secret)
self.auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(self.auth)
except:
print("Error: Authentication Failed")
def __init__(self):
'''
Class constructor or initialization method.
'''
consumer_key = '18QHFMz0zvycM2KLrTMfrafI1'
consumer_secret = 'WNwYGKBXmbfsY7ysZXxPJ4Voa7rgtLxGocuDHbIJ1TZLShtBVF'
access_token = '843094924299976704-GNJyLjovEGFAiOWLswFBagKxlebRQUq'
access_token_secret = 'L39Wz6lXKSavutPqhopmNwK7egJiSrwRxVohjbqVqbQvM'
try:
self.auth = OAuthHandler(consumer_key, consumer_secret)
self.auth.set_access_token(access_token, access_token_secret)
self.api = tweepy.API(self.auth)
except:
print("Error: Authentication Failed")