def __init__(self):
    '''
    Class constructor: authenticate against the Twitter API.

    The four OAuth credentials are read from the environment variables
    TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN and
    TWITTER_ACCESS_SECRET so that no secret is stored in the source file.

    On success ``self.auth`` holds the OAuth handler and ``self.api`` the
    tweepy API client. On any failure (including a missing environment
    variable) "Error: Authentication Failed" is printed and the attributes
    that were not yet assigned stay unset.
    '''
    # Local import: the file's import block is not visible from this block.
    import os

    try:
        consumer_key = os.environ['TWITTER_CONSUMER_KEY']
        consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
        access_token = os.environ['TWITTER_ACCESS_TOKEN']
        access_token_secret = os.environ['TWITTER_ACCESS_SECRET']
        self.auth = OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(access_token, access_token_secret)
        self.api = tweepy.API(self.auth)
    except Exception:
        # Narrower than a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit still propagate.
        print("Error: Authentication Failed")
python类OAuthHandler()的实例源码
def __init__(self):
    '''
    Class constructor: authenticate against the Twitter API.

    The four OAuth credentials are read from the environment variables
    TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN and
    TWITTER_ACCESS_SECRET so that no secret is stored in the source file.

    On success ``self.auth`` holds the OAuth handler and ``self.api`` the
    tweepy API client. On any failure (including a missing environment
    variable) "Error: Authentication Failed" is printed and the attributes
    that were not yet assigned stay unset.
    '''
    # Local import: the file's import block is not visible from this block.
    import os

    try:
        consumer_key = os.environ['TWITTER_CONSUMER_KEY']
        consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
        access_token = os.environ['TWITTER_ACCESS_TOKEN']
        access_token_secret = os.environ['TWITTER_ACCESS_SECRET']
        self.auth = OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(access_token, access_token_secret)
        self.api = tweepy.API(self.auth)
    except Exception:
        # Narrower than a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit still propagate.
        print("Error: Authentication Failed")
def __init__(self):
    '''
    Class constructor: authenticate against the Twitter API.

    The four OAuth credentials are read from the environment variables
    TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN and
    TWITTER_ACCESS_SECRET so that no secret is stored in the source file.

    On success ``self.auth`` holds the OAuth handler and ``self.api`` the
    tweepy API client. On any failure (including a missing environment
    variable) "Error: Authentication Failed" is printed and the attributes
    that were not yet assigned stay unset.
    '''
    # Local import: the file's import block is not visible from this block.
    import os

    try:
        consumer_key = os.environ['TWITTER_CONSUMER_KEY']
        consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
        access_token = os.environ['TWITTER_ACCESS_TOKEN']
        access_token_secret = os.environ['TWITTER_ACCESS_SECRET']
        self.auth = OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(access_token, access_token_secret)
        self.api = tweepy.API(self.auth)
    except Exception:
        # Narrower than a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit still propagate.
        print("Error: Authentication Failed")
def __init__(self):
    '''
    Class constructor: authenticate against the Twitter API.

    The four OAuth credentials are read from the environment variables
    TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN and
    TWITTER_ACCESS_SECRET so that no secret is stored in the source file.

    On success ``self.auth`` holds the OAuth handler and ``self.api`` the
    tweepy API client. On any failure (including a missing environment
    variable) "Error: Authentication Failed" is printed and the attributes
    that were not yet assigned stay unset.
    '''
    # Local import: the file's import block is not visible from this block.
    import os

    try:
        consumer_key = os.environ['TWITTER_CONSUMER_KEY']
        consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
        access_token = os.environ['TWITTER_ACCESS_TOKEN']
        access_token_secret = os.environ['TWITTER_ACCESS_SECRET']
        self.auth = OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(access_token, access_token_secret)
        self.api = tweepy.API(self.auth)
    except Exception:
        # Narrower than a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit still propagate.
        print("Error: Authentication Failed")
def __init__(self):
    '''
    Class constructor: authenticate against the Twitter API.

    The four OAuth credentials are read from the environment variables
    TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN and
    TWITTER_ACCESS_SECRET so that no secret is stored in the source file.

    On success ``self.auth`` holds the OAuth handler and ``self.api`` the
    tweepy API client. On any failure (including a missing environment
    variable) "Error: Authentication Failed" is printed and the attributes
    that were not yet assigned stay unset.
    '''
    # Local import: the file's import block is not visible from this block.
    import os

    try:
        consumer_key = os.environ['TWITTER_CONSUMER_KEY']
        consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
        access_token = os.environ['TWITTER_ACCESS_TOKEN']
        access_token_secret = os.environ['TWITTER_ACCESS_SECRET']
        self.auth = OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(access_token, access_token_secret)
        self.api = tweepy.API(self.auth)
    except Exception:
        # Narrower than a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit still propagate.
        print("Error: Authentication Failed")
def __init__(self):
    '''
    Class constructor: authenticate against the Twitter API.

    The four OAuth credentials are read from the environment variables
    TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN and
    TWITTER_ACCESS_SECRET so that no secret is stored in the source file.

    On success ``self.auth`` holds the OAuth handler and ``self.api`` the
    tweepy API client. On any failure (including a missing environment
    variable) "Error: Authentication Failed" is printed and the attributes
    that were not yet assigned stay unset.
    '''
    # Local import: the file's import block is not visible from this block.
    import os

    try:
        consumer_key = os.environ['TWITTER_CONSUMER_KEY']
        consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
        access_token = os.environ['TWITTER_ACCESS_TOKEN']
        access_token_secret = os.environ['TWITTER_ACCESS_SECRET']
        self.auth = OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(access_token, access_token_secret)
        self.api = tweepy.API(self.auth)
    except Exception:
        # Narrower than a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit still propagate.
        print("Error: Authentication Failed")
def __init__(self):
    '''
    Class constructor: authenticate against the Twitter API.

    The four OAuth credentials are read from the environment variables
    TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN and
    TWITTER_ACCESS_SECRET so that no secret is stored in the source file.

    On success ``self.auth`` holds the OAuth handler and ``self.api`` the
    tweepy API client. On any failure (including a missing environment
    variable) "Error: Authentication Failed" is printed and the attributes
    that were not yet assigned stay unset.
    '''
    # Local import: the file's import block is not visible from this block.
    import os

    try:
        consumer_key = os.environ['TWITTER_CONSUMER_KEY']
        consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
        access_token = os.environ['TWITTER_ACCESS_TOKEN']
        access_token_secret = os.environ['TWITTER_ACCESS_SECRET']
        self.auth = OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(access_token, access_token_secret)
        self.api = tweepy.API(self.auth)
    except Exception:
        # Narrower than a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit still propagate.
        print("Error: Authentication Failed")
TwitterClient.py 文件源码
项目:fashion-revolution
作者: womenhackfornonprofits
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def _get_twitter_api(self):
    """
    Build a tweepy API client from the credentials stored in ``secrets.json``.

    The file is opened by relative path and must contain a JSON object with
    the keys ``consumer_key``, ``consumer_secret``, ``access_token`` and
    ``access_secret``. All four values are used: the consumer pair creates
    the OAuth handler and the access pair is set on it.

    Side effect: the OAuth handler is stored on ``self.auth``.

    Returns the ``tweepy.API`` instance built from that handler. A missing
    key in the JSON object raises ``KeyError``; errors from opening or
    parsing the file propagate unchanged.
    """
    # Keep the file open only for as long as it takes to parse it.
    with open('secrets.json') as secrets_file:
        secrets = json.load(secrets_file)

    consumer_key = secrets['consumer_key']
    consumer_secret = secrets['consumer_secret']
    access_token = secrets['access_token']
    access_secret = secrets['access_secret']

    self.auth = OAuthHandler(consumer_key, consumer_secret)
    self.auth.set_access_token(access_token, access_secret)
    return tweepy.API(self.auth)
def create_api_instance(tokens):
    """Return an authenticated tweepy API instance built from ``tokens``.

    Parameters
    ----------
    tokens : dict
        Must provide the keys ``'api_key'``, ``'api_secret'``, ``'access'``
        and ``'access_secret'``. An optional ``'proxy'`` key, when present,
        is forwarded to ``tweepy.API`` as its ``proxy`` keyword argument.

    Returns
    -------
    The ``tweepy.API`` object created with the configured OAuth handler.
    """
    handler = tweepy.OAuthHandler(tokens['api_key'], tokens['api_secret'])
    handler.set_access_token(tokens['access'], tokens['access_secret'])

    # Pass ``proxy`` through only when the caller supplied one, so the
    # no-proxy call is exactly ``tweepy.API(handler)``.
    extra_kwargs = {'proxy': tokens['proxy']} if 'proxy' in tokens else {}
    return tweepy.API(handler, **extra_kwargs)
def tweetit(msg):
    """Post ``msg`` as a status update, honouring the global ``debug`` flag.

    An empty message only prints "Error no message". Otherwise the message
    is echoed and, when the module-level ``debug`` equals ``False``, sent
    with ``update_status``; when it equals ``True`` a notice is printed
    instead of posting.
    """
    import tweepy

    # The four credentials are empty strings in this block.
    consumer_key = ""
    consumer_secret = ""
    access_token = ""
    access_token_secret = ""

    # Guard clause: nothing to post.
    if msg == "":
        print("Error no message")
        return

    print("Message Tweet")
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)
    client = tweepy.API(handler)
    print(msg)

    # Explicit equality checks are kept so that a ``debug`` value that is
    # neither True nor False triggers neither branch.
    if debug == False:
        client.update_status(msg)
    if debug == True:
        print("Post not uploaded due to debug mode being on")
def __init__(self,
             corpus,
             friends=None,
             commentary="None",
             black_list=None,
             local="world",
             hashtag_search=None):
    """Set up the bot state, authenticate and record an initial log row.

    Parameters:
        corpus: stored unchanged on ``self.corpus``.
        friends: stored on ``self.friends``; a fresh empty list when omitted.
        commentary: value written to the "Commentary" column of the log row.
        black_list: stored on ``self.black_list``; a fresh empty list when
            omitted.
        local: stored on ``self.local`` and passed to ``self.get_trends``.
        hashtag_search: optional value that is concatenated with (placed
            before) the result of ``self.get_trends(self.local)``; when
            ``None`` the trends alone are used.

    Side effects: creates ``self.api``, builds the one-row ``self.df`` data
    frame (date, follower count, following count, commentary) and calls
    ``self.log()``.
    """
    # Local import: the file's import block is not visible from this block.
    from collections import OrderedDict

    # ``None`` sentinels replace the former ``[]`` defaults, which were a
    # single list object shared by every instance created without arguments.
    self.black_list = [] if black_list is None else black_list
    self.local = local
    self.friends = [] if friends is None else friends
    self.corpus = corpus

    auth = tweepy.OAuthHandler(ConsumerKey, ConsumerSecret)
    auth.set_access_token(AccessToken, AccessTokenSecret)
    self.api = tweepy.API(auth)

    entry = [("Date", [get_date()]),
             ("Followers", [len(self.api.followers_ids())]),
             ("Following", [len(self.api.friends_ids())]),
             ("Commentary", [commentary])]
    # ``DataFrame.from_items`` was deprecated in pandas 0.23 and removed in
    # 1.0; an OrderedDict keeps the same column order.
    self.df = pd.DataFrame.from_dict(OrderedDict(entry))
    self.log()

    if hashtag_search is None:
        self.hashtag_search = self.get_trends(self.local)
    else:
        self.hashtag_search = hashtag_search + self.get_trends(self.local)
def tweet_status(status, image_path=None):
    """Publish ``status`` and then reply to it with a mention tweet.

    When ``image_path`` is truthy the status is posted with
    ``update_with_media``, otherwise with ``update_status``. A
    ``tweepy.TweepError`` raised while authenticating or posting is
    re-raised as ``MeteoSangueException``.

    The follow-up reply joins the ``twitter_id`` of every entry in
    ``settings.BLOOD_ASSOCIATIONS`` that has one; a ``tweepy.TweepError``
    there is ignored on purpose.
    """
    try:
        handler = tweepy.OAuthHandler(settings.CONSUMER_KEY, settings.CONSUMER_SECRET)
        handler.set_access_token(settings.ACCESS_TOKEN, settings.ACCESS_TOKEN_SECRET)
        api = tweepy.API(handler)
        new_tweet = (
            api.update_with_media(image_path, status=status)
            if image_path
            else api.update_status(status)
        )
    except tweepy.TweepError as ex:
        raise MeteoSangueException(ex)

    try:
        handles = [
            ass['twitter_id']
            for ass in settings.BLOOD_ASSOCIATIONS
            if 'twitter_id' in ass
        ]
        mention = '{0} {1}'.format(' '.join(handles), 'Nuovo bollettino meteo ?')
        api.update_status(mention, in_reply_to_status_id=new_tweet.id)
    except tweepy.TweepError:
        pass  # Mention is allowed to fail silently
def setUp(self):
    """Prepare the exercise payload shared by the tests.

    ``self.data`` gets an empty pre-exercise code entry ("DC_PEC") and the
    same snippet for both the submitted code ("DC_CODE") and the reference
    solution ("DC_SOLUTION"), so the submission matches the solution.
    """
    # Inert placeholder credentials: the snippet only builds an OAuth handler
    # and assigns the values, so real secrets are not needed in the fixture.
    exercise_code = '''
import tweepy
access_token = "ACCESS_TOKEN"
access_token_secret = "ACCESS_TOKEN_SECRET"
consumer_key = "CONSUMER_KEY"
consumer_secret = "CONSUMER_SECRET"
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
'''
    self.data = {
        "DC_PEC": "",
        "DC_CODE": exercise_code,
        "DC_SOLUTION": exercise_code,
    }
def test_Pass(self):
    """Run the submission-correctness test and expect it to pass.

    Stores the test script under "DC_SCT" in ``self.data``, hands the whole
    payload to ``helper.run`` and asserts that the returned payload's
    ``'correct'`` entry is truthy.
    """
    sct_source = '''
# Test: import tweepy
import_msg = "Did you correctly import the required package?"
test_import("tweepy", same_as = True, not_imported_msg = import_msg, incorrect_as_msg = import_msg)
# Test: Predefined code
predef_msg = "You don't have to change any of the predefined code."
test_object("access_token", undefined_msg = predef_msg, incorrect_msg = predef_msg)
test_object("access_token_secret", undefined_msg = predef_msg, incorrect_msg = predef_msg)
test_object("consumer_key", undefined_msg = predef_msg, incorrect_msg = predef_msg)
test_object("consumer_secret", undefined_msg = predef_msg, incorrect_msg = predef_msg)
# Test: call to tweepy.OAuthHandler() and 'auth' variable
test_object("auth", do_eval = False)
test_function("tweepy.OAuthHandler")
# Test: call to auth.set_access_token()
test_function("auth.set_access_token")
success_msg("Awesome!")
'''
    self.data["DC_SCT"] = sct_source
    result = helper.run(self.data)
    self.assertTrue(result['correct'])
def main(args):
    """Authenticate, pick a classifier and start the reply stream.

    ``args`` must provide ``debug``, the four OAuth credentials
    (``consumer_key``, ``consumer_secret``, ``access_token``,
    ``access_token_secret``), ``classifier`` and ``silent``. Depending on
    ``classifier`` it must also provide ``dataset_path`` ('local') or
    ``remote_endpoint`` ('remote').

    Raises:
        ValueError: if ``args.classifier`` is not 'mock', 'local' or
            'remote'.
    """
    if args.debug:
        logger.setLevel(logging.DEBUG)

    auth = tweepy.OAuthHandler(args.consumer_key, args.consumer_secret)
    auth.set_access_token(args.access_token, args.access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)
    screen_name = api.me().screen_name

    if args.classifier == 'mock':
        classifier = classifiers.MockClassifier()
    elif args.classifier == 'local':
        classifier = classifiers.URLClassifier(classifiers.ImageClassifier(args.dataset_path, INPUT_SHAPE))
    elif args.classifier == 'remote':
        classifier = classifiers.RemoteClassifier(args.remote_endpoint)
    else:
        # Previously this fell through and failed below with an
        # UnboundLocalError on ``classifier``.
        raise ValueError('Unknown classifier: {!r}'.format(args.classifier))

    stream = tweepy.Stream(auth=auth, listener=ReplyToTweet(screen_name, classifier, api, args.silent))
    logger.info('Listening as {}'.format(screen_name))
    stream.userstream(track=[screen_name])
def __init__(self):
    """Load the Twitter credentials from ``AppConfig`` and build the client.

    Reads the four ``twitter_*`` entries of
    ``AppConfig.ENVIRONMENT_VARIABLE``, passes each one to
    ``_check_environment_variable_set`` with ``raise_err=True``
    (presumably raising when a value is unset; confirm against that
    helper), then stores them on the instance together with the OAuth
    handler (``self.auth_handler``) and the API client (``self.api``).
    """
    api_key = AppConfig.ENVIRONMENT_VARIABLE['twitter_api_key']
    api_secret = AppConfig.ENVIRONMENT_VARIABLE['twitter_api_secret']
    access_token = AppConfig.ENVIRONMENT_VARIABLE['twitter_access_token']
    access_token_secret = AppConfig.ENVIRONMENT_VARIABLE['twitter_access_token_secret']

    # Validate in the same order as the values were read.
    for value in (api_key, api_secret, access_token, access_token_secret):
        _check_environment_variable_set(value, raise_err=True)

    self.api_key = api_key
    self.api_secret = api_secret
    self.access_token = access_token
    self.access_token_secret = access_token_secret

    handler = tweepy.OAuthHandler(self.api_key, self.api_secret)
    self.auth_handler = handler
    self.auth_handler.set_access_token(self.access_token, self.access_token_secret)
    self.api = tweepy.API(self.auth_handler)
def get_twitter_auth():
    """Create the Twitter OAuth handler from environment variables.

    Reads TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET,
    TWITTER_ACCESS_TOKEN and TWITTER_ACCESS_SECRET. If any of them is
    missing, a message is written to both stdout and stderr and the process
    exits with status 1.

    Return: OAuthHandler object with the access token already set
    """
    variable_names = (
        'TWITTER_CONSUMER_KEY',
        'TWITTER_CONSUMER_SECRET',
        'TWITTER_ACCESS_TOKEN',
        'TWITTER_ACCESS_SECRET',
    )
    try:
        # Looked up in order, so the first missing name raises KeyError.
        consumer_key, consumer_secret, access_token, access_secret = [
            os.environ[name] for name in variable_names
        ]
    except KeyError:
        message = "TWITTER_* environment variables not set\n"
        print(message)
        sys.stderr.write(message)
        sys.exit(1)

    handler = OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_secret)
    return handler
def streaming(credentials, coins, queries, refresh, path, realtime=False, logTracker=True, logTweets=True, logSentiment=False, debug=True):
    """Validate the arguments, then stream tweets matching ``queries`` forever.

    ``credentials`` is indexed 0-3: the first two values create the OAuth
    handler and the last two are set as its access token. Returns ``None``
    right after printing an error when ``coins`` or ``queries`` is empty or
    ``refresh`` is not positive; large inputs only print a warning.

    After validation the function loops indefinitely, creating a
    ``CoinListener`` and a ``tweepy.Stream`` filtered on ``queries``. On
    ``Timeout``, ``ConnectionError`` or ``ReadTimeoutError`` it appends a
    line to ``<path>Error_Log.txt`` and sleeps before restarting.
    """
    # Declared up front; the name is only bound when logSentiment is truthy.
    global SentimentIntensityAnalyzer

    # User error checks
    coin_count = len(coins)
    if coin_count <= 0:
        print("Error: You must include at least one coin.")
        return
    if coin_count >= 10:
        print("Warning: Fewer than ten coins recommended.")

    query_count = len(queries)
    if query_count <= 0:
        print("Error: You must include at least one query.")
        return
    if query_count >= 20:
        print("Warning: Fewer than twenty queries recommended.")

    if refresh <= 0:
        print("Error: Refresh rate must be greater than 0")
        return

    auth = tweepy.OAuthHandler(credentials[0], credentials[1])
    auth.set_access_token(credentials[2], credentials[3])

    if logSentiment:
        from nltk.sentiment.vader import SentimentIntensityAnalyzer

    while True:
        # Start streaming
        try:
            print("Streaming Now...")
            listener = CoinListener(auth, coins, queries, refresh, path, realtime, logTracker, logTweets, logSentiment, debug)
            stream = tweepy.Stream(auth, listener)
            stream.filter(track=queries)
        except (Timeout, ConnectionError, ReadTimeoutError):
            print("Reestablishing Connection...")
            timestamp = time.strftime('%m/%d/%Y %H:%M')
            with open("%sError_Log.txt" % path, "a") as outfile:
                outfile.write("%s Error: Connection Dropped\n" % timestamp)
            # NOTE(review): the source's indentation was lost; the sleep is
            # placed in the error handler, as its comment suggests.
            # Wait at least 15 minutes before restarting listener
            time.sleep((15*60)+1)
def fetch_tweets_and_send_emails() -> None:
    """Email every new tweet of each configured list, oldest first.

    For each slug in ``LIST_SLUGS`` the last stored tweet id is loaded (a
    warning is logged when none exists, and processing continues), the
    tweets are fetched with ``get_tweets`` and iterated in reverse. Each
    tweet is parsed into email keyword arguments, its subject is prefixed
    with ``[<list_slug>]``, the email is sent over the shared SMTP
    connection, and the tweet's id is saved as the new last id for the list.
    """
    logging.info('-- starting fetching tweets and sending emails')

    handler = tweepy.OAuthHandler(Settings.TWITTER_CONSUMER_KEY, Settings.TWITTER_CONSUMER_SECRET)
    handler.set_access_token(Settings.TWITTER_ACCESS_TOKEN, Settings.TWITTER_ACCESS_TOKEN_SECRET)
    twitter_api = tweepy.API(handler)

    with smtp_server() as server:
        for list_slug in LIST_SLUGS:
            last_tweet_id = load_last_tweet_id(list_slug)
            if last_tweet_id is None:
                logging.warning('no last tweet id was found for list %s' % list_slug)

            tweets = get_tweets(twitter_api, list_slug=list_slug, last_tweet_id=last_tweet_id)
            for tweet in reversed(tweets):
                email_fields = parse_tweet(tweet)
                prefixed_subject = '[{list_slug}] {subject}'.format(
                    list_slug=list_slug,
                    subject=email_fields['msg_subject'],
                )
                email_fields['msg_subject'] = prefixed_subject
                send_email(server, **email_fields)
                # Saved after every email, not once per list.
                save_last_tweet_id(last_id=tweet.id_str, list_slug=list_slug)
def twittersignin():
    """Start the Twitter sign-in flow and redirect to the authorization URL.

    The request token obtained from the OAuth handler is stored in the
    session under "request_token". If tweepy raises ``TweepError``, a
    "danger" message is flashed and the user is redirected to "bp.home".
    """
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    try:
        authorization_url = handler.get_authorization_url()
        session["request_token"] = handler.request_token
    except tweepy.TweepError:
        flash("Failed to get request token", "danger")
        return redirect(url_for("bp.home"))
    else:
        return redirect(authorization_url)