def load(self):
self.oauth = OAuth(
self.config.get(c.ACCESS_TOKEN_KEY),
self.config.get(c.ACCESS_TOKEN_SECRET_KEY),
self.config.get(c.CONSUMER_KEY_KEY),
self.config.get(c.CONSUMER_SECRET_KEY),
)
self.client = TwitterAPI(auth=self.oauth)
context = dict(plugin=self, bot=self.bot)
self.add_web_handlers(
(r'/twitter/oauth_token', OAuthTokenHandler, context),
(r'/twitter/authorize', OAuthAuthorizeHandler, context),
(r'/twitter/status', TwitterStatusHandler, context),
(r'/twitter/disconnect', TwitterDisconnectHandler, context),
)
self.monitored = set()
self.restore_monitored()
self.request_tokens = dict()
self.last_tweets = dict()
self.interactive_tweets = dict()
python类OAuth()的实例源码
def create_api():
"""create a twitter api"""
from twitter import OAuth, Twitter
from app import APP_INSTANCE as app
access_token = app.get_config('api.twitter.access_token')
access_secret = app.get_config('api.twitter.access_secret')
consumer_key = app.get_config('api.twitter.consumer_key')
consumer_secret = app.get_config('api.twitter.consumer_secret')
# temporary fix for certificate error
ssl._create_default_https_context = ssl._create_unverified_context
oauth = OAuth(access_token, access_secret, consumer_key, consumer_secret)
# Initiate the connection to Twitter API
return Twitter(auth=oauth)
def __init__(self, consumer_key, consumer_secret, user_credentials):
credentials = json.loads(user_credentials)
auth = OAuth(credentials['oauth_token'],
credentials['oauth_token_secret'],
consumer_key,
consumer_secret)
self.api = Twitter(auth=auth)
def connect(self):
self.__client = Twitter(
auth=OAuth(self.__token, self.__token_key, self.__con_secret, self.__con_secret_key))
# Send a start up tweet
def save_user(self, user, oauth_token, oauth_verifier):
oauth = OAuth(
oauth_token,
self.request_tokens[oauth_token],
self.config.get(c.CONSUMER_KEY_KEY),
self.config.get(c.CONSUMER_SECRET_KEY),
)
tweaked_twitter_client = TwitterAPI(
auth = oauth,
format = '',
api_version = None
)
try:
response = tweaked_twitter_client.oauth.access_token(
oauth_verifier = oauth_verifier
)
except Exception as e:
self.error(e)
pass
else:
params = dict(
(key, value) for key, value in (
component.split('=')
for component in response.split('&')
)
)
with self.transaction() as trans:
trans.execute(q.add_user, dict(
id = user.id,
token = params['oauth_token'],
secret = params['oauth_token_secret']
))
def get_user_api(self, oauth_user):
oauth = OAuth(
oauth_user.token,
oauth_user.secret,
self.config.get(c.CONSUMER_KEY_KEY),
self.config.get(c.CONSUMER_SECRET_KEY),
)
return TwitterAPI(auth = oauth)
def get_twitter_client():
return twitter.Twitter(
auth=twitter.OAuth(
os.environ.get('ACCESS_TOKEN'),
os.environ.get('ACCESS_TOKEN_SECRET'),
os.environ.get('CONSUMER_KEY'),
os.environ.get('CONSUMER_SECRET'),
)
)
def get_login_url(callback='oob', consumer_key=None, consumer_secret=None):
twitter = Twitter(
auth=OAuth('', '', consumer_key, consumer_secret),
format='', api_version=None)
resp = url_decode(twitter.oauth.request_token(oauth_callback=callback))
oauth_token = resp['oauth_token']
oauth_token_secret = resp['oauth_token_secret']
token = OAuthToken(token=oauth_token, token_secret=oauth_token_secret)
db.session.merge(token)
db.session.commit()
return (
"https://api.twitter.com/oauth/authenticate?oauth_token=%s"
% (oauth_token,))
def receive_verifier(oauth_token, oauth_verifier,
consumer_key=None, consumer_secret=None):
temp_token = OAuthToken.query.get(oauth_token)
if not temp_token:
raise Exception("OAuth token has expired")
twitter = Twitter(
auth=OAuth(temp_token.token, temp_token.token_secret,
consumer_key, consumer_secret),
format='', api_version=None)
resp = url_decode(
twitter.oauth.access_token(oauth_verifier=oauth_verifier))
db.session.delete(temp_token)
new_token = OAuthToken(token=resp['oauth_token'],
token_secret=resp['oauth_token_secret'])
new_token = db.session.merge(new_token)
new_twitter = Twitter(
auth=OAuth(new_token.token, new_token.token_secret,
consumer_key, consumer_secret))
remote_acct = new_twitter.account.verify_credentials()
acct = account_from_api_user_object(remote_acct)
acct = db.session.merge(acct)
new_token.account = acct
db.session.commit()
return new_token
def get_twitter_for_acc(account):
consumer_key = app.config['TWITTER_CONSUMER_KEY']
consumer_secret = app.config['TWITTER_CONSUMER_SECRET']
tokens = (OAuthToken.query.with_parent(account)
.order_by(db.desc(OAuthToken.created_at)).all())
for token in tokens:
t = Twitter(
auth=OAuth(token.token, token.token_secret,
consumer_key, consumer_secret))
try:
t.account.verify_credentials()
return t
except TwitterHTTPError as e:
if e.e.code == 401:
# token revoked
if sentry:
sentry.captureMessage(
'Twitter auth revoked', extra=locals())
db.session.delete(token)
db.session.commit()
else:
raise TemporaryError(e)
except URLError as e:
raise TemporaryError(e)
return None
def twitter_login(self, cons_key, cons_secret, access_token, \
access_token_secret):
"""Logs in to Twitter, using the provided access keys. You can get
these for your own Twitter account at apps.twitter.com
Arguments
cons_key - String of your Consumer Key (API Key)
cons_secret - String of your Consumer Secret (API Secret)
access_token - String of your Access Token
access_token_secret
- String of your Access Token Secret
"""
# Raise an Exception if the twitter library wasn't imported
if not IMPTWITTER:
self._error(u'twitter_login', u"The 'twitter' library could not be imported. Check whether it is installed correctly.")
# Log in to a Twitter account
self._oauth = twitter.OAuth(access_token, access_token_secret, \
cons_key, cons_secret)
self._t = twitter.Twitter(auth=self._oauth)
self._ts = twitter.TwitterStream(auth=self._oauth)
self._loggedin = True
# Get the bot's own user credentials
self._credentials = self._t.account.verify_credentials()
def _twitter_reconnect(self):
"""Logs in to Twitter, using the stored OAuth. This function is
intended for internal use, and should ONLY be called after
twitter_login has been called.
"""
# Report the reconnection attempt.
self._message(u'_twitter_reconnect', \
u"Attempting to reconnect to Twitter.")
# Raise an Exception if the twitter library wasn't imported
if not IMPTWITTER:
self._error(u'_twitter_reconnect', u"The 'twitter' library could not be imported. Check whether it is installed correctly.")
# Log in to a Twitter account
self._t = twitter.Twitter(auth=self._oauth)
self._ts = twitter.TwitterStream(auth=self._oauth)
self._loggedin = True
# Get the bot's own user credentials
self._credentials = self._t.account.verify_credentials()
# Report the reconnection success.
self._message(u'_twitter_reconnect', \
u"Successfully reconnected to Twitter!")
def twitter_login(self, cons_key, cons_secret, access_token, \
access_token_secret):
"""Logs in to Twitter, using the provided access keys. You can get
these for your own Twitter account at apps.twitter.com
Arguments
cons_key - String of your Consumer Key (API Key)
cons_secret - String of your Consumer Secret (API Secret)
access_token - String of your Access Token
access_token_secret
- String of your Access Token Secret
"""
# Raise an Exception if the twitter library wasn't imported
if not IMPTWITTER:
self._error(u'twitter_login', u"The 'twitter' library could not be imported. Check whether it is installed correctly.")
# Log in to a Twitter account
self._oauth = twitter.OAuth(access_token, access_token_secret, \
cons_key, cons_secret)
self._t = twitter.Twitter(auth=self._oauth)
self._ts = twitter.TwitterStream(auth=self._oauth)
self._loggedin = True
# Get the bot's own user credentials
self._credentials = self._t.account.verify_credentials()
def _twitter_reconnect(self):
"""Logs in to Twitter, using the stored OAuth. This function is
intended for internal use, and should ONLY be called after
twitter_login has been called.
"""
# Report the reconnection attempt.
self._message(u'_twitter_reconnect', \
u"Attempting to reconnect to Twitter.")
# Raise an Exception if the twitter library wasn't imported
if not IMPTWITTER:
self._error(u'_twitter_reconnect', u"The 'twitter' library could not be imported. Check whether it is installed correctly.")
# Log in to a Twitter account
self._t = twitter.Twitter(auth=self._oauth)
self._ts = twitter.TwitterStream(auth=self._oauth)
self._loggedin = True
# Get the bot's own user credentials
self._credentials = self._t.account.verify_credentials()
# Report the reconnection success.
self._message(u'_twitter_reconnect', \
u"Successfully reconnected to Twitter!")
def __init__(self):
ACCESS_TOKEN = self.access_token
ACCESS_SECRET = self.access_secret
CONSUMER_KEY = self.consumer_key
CONSUMER_SECRET = self.consumer_secret
print ACCESS_TOKEN, ACCESS_SECRET, CONSUMER_KEY, CONSUMER_SECRET
oauth = OAuth(ACCESS_TOKEN, ACCESS_SECRET, CONSUMER_KEY, CONSUMER_SECRET)
self.twitter = Twitter(auth=oauth)
def post_tweet(text, image_path=None,
user_token=None, user_secret=None,
consumer_key=None, consumer_secret=None):
"""
Post a tweet, optionally with an image attached.
"""
if len(text) > 140:
raise ValueError('tweet is too long')
auth = twitter.OAuth(
user_token or os.environ['TWITTER_USER_TOKEN'],
user_secret or os.environ['TWITTER_USER_SECRET'],
consumer_key or os.environ['TWITTER_CONSUMER_KEY'],
consumer_secret or os.environ['TWITTER_CONSUMER_SECRET'])
image_data, image_id = None, None
if image_path:
with open(image_path, 'rb') as image_file:
image_data = image_file.read()
t_up = twitter.Twitter(domain='upload.twitter.com', auth=auth)
image_id = t_up.media.upload(media=image_data)['media_id_string']
if not (text.strip() or image_id):
raise ValueError('no text or images to tweet')
t_api = twitter.Twitter(auth=auth)
params = {
'status': text,
'trim_user': True,
}
if image_id:
params.update({
'media[]': base64.b64encode(image_data),
'_base64': True,
})
return t_api.statuses.update_with_media(**params)
else:
return t_api.statuses.update(**params)
def __init__(self, bot):
super(TweetPlugin, self).__init__(bot)
self.command = "!tweet"
self.twitter = twitter.Twitter(
auth=twitter.OAuth(
config["PLUGINS"]["TWITTER_ATOKEN"],
config["PLUGINS"]["TWITTER_ASECRET"],
config["PLUGINS"]["TWITTER_CKEY"],
config["PLUGINS"]["TWITTER_CSECRET"]
)
)
def connect(self):
self.client = Twitter(
auth=OAuth(self.token, self.token_key, self.con_secret, self.con_secret_key))
#Set the appropriate settings for each alert
def __init__(self):
self.t = Twitter(auth=OAuth( \
config.TOKEN,
config.TOKEN_KEY,
config.CON_SECRET,
config.CON_SECRET_KEY)
)