def analyzetweets(self, access_token, access_token_secret, mytweets=False, q=None):
    """Average the TextBlob polarity and subjectivity of a batch of tweets.

    Authenticates with the module-level consumer credentials and the given
    access token pair. When ``mytweets`` is true the authenticated user's
    timeline is iterated with no item limit; otherwise at most
    ``NUMBER_OF_TWEETS`` search results for ``q`` are analysed. After every
    tweet the progress is published through ``self.update_state``.

    Returns:
        dict with the keys "current", "total", "subjectivityavg" and
        "sentimentavg". Both averages are 0.0 when no tweet was seen.
    """
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    handler.set_access_token(access_token, access_token_secret)
    client = tweepy.API(handler)

    total = NUMBER_OF_TWEETS
    if mytweets:
        statuses = tweepy.Cursor(client.user_timeline).items()
    else:
        statuses = tweepy.Cursor(client.search, q=q).items(total)

    polarities = []
    subjectivities = []
    for done, status in enumerate(statuses, start=1):
        scores = TextBlob(status.text).sentiment
        polarities.append(scores.polarity)
        subjectivities.append(scores.subjectivity)
        self.update_state(state="RUNNING", meta={"current": done, "total": total})

    # max(..., 1) keeps the division safe when no tweets were returned.
    mean_polarity = float(sum(polarities) / max(len(polarities), 1))
    mean_subjectivity = float(sum(subjectivities) / max(len(subjectivities), 1))
    return {
        "current": total,
        "total": total,
        "subjectivityavg": mean_subjectivity,
        "sentimentavg": mean_polarity,
    }
# Example source code using the Python class tweepy.API (python类API的实例源码)
def __init__(self):
    """Configure logging, read configuration.txt and build the tweepy client.

    The log file is named after the current UTC timestamp. The configuration
    file is looked up next to this source file; its "settings" section
    supplies the retweet parameters and its "twitter" section the four OAuth
    values.
    """
    log_stamp = datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S")
    logging.basicConfig(filename=log_stamp + '.log', level=logging.INFO)

    # Directory that contains the file this code was loaded from.
    this_file = inspect.getfile(inspect.currentframe())
    base_dir = os.path.dirname(os.path.abspath(this_file))

    self.config = configparser.ConfigParser()
    self.config.read(os.path.join(base_dir, "configuration.txt"))

    self.sleep_time = int(self.config.get("settings", "time_between_retweets"))
    self.search_term = self.config.get("settings", "search_query")
    self.tweet_language = self.config.get("settings", "tweet_language")
    self.max_age_in_minutes = int(self.config.get("settings", "max_age_in_minutes"))

    self.last_id_file = self.build_save_point()
    self.savepoint = self.retrieve_save_point(self.last_id_file)

    consumer_key = self.config.get("twitter", "consumer_key")
    consumer_secret = self.config.get("twitter", "consumer_secret")
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)

    access_token = self.config.get("twitter", "access_token")
    access_token_secret = self.config.get("twitter", "access_token_secret")
    handler.set_access_token(access_token, access_token_secret)

    self.api = tweepy.API(handler)
def init_app(self, app):
    """Build a tweepy API client from ``app.config`` and register it.

    The client is stored under ``app.extensions['tweepy']``.

    Raises:
        RuntimeError: if any of the four TWITTER_* configuration values is
            missing or falsy.
    """
    setting_names = (
        'TWITTER_CONSUMER_KEY',
        'TWITTER_CONSUMER_SECRET',
        'TWITTER_ACCESS_TOKEN',
        'TWITTER_ACCESS_TOKEN_SECRET',
    )
    values = [app.config.get(setting) for setting in setting_names]
    if not all(values):
        raise RuntimeError(
            'At least one of the following configuration values was not '
            'set: TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, '
            'TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET.'
        )

    consumer_key, consumer_secret, access_token, access_token_secret = values
    handler = tweepy.auth.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)
    app.extensions['tweepy'] = tweepy.API(handler)
def send_notification(message):
    """Send ``message`` to Twitter as a direct message or a status update.

    The credentials are checked first by asking tweepy for the account's
    username. ``config.TWITTER_USE_DM`` selects a direct message to
    ``config.TWITTER_DM_USER`` instead of a public status update.

    Returns:
        True when the API call returned a truthy status object, False when
        the credential check failed or no status came back.
    """
    # NOTE(review): the consumer key/secret are hard-coded in source; they
    # should be moved into configuration next to the access token.
    auth = tweepy.OAuthHandler("T4NRPcEtUrCEU58FesRmRtkdW", "zmpbytgPpSbro6RZcXsKgYQoz24zLH3vYZHOHAAs5j33P4eoRg")
    auth.set_access_token(config.TWITTER_ACCESS_TOKEN, config.TWITTER_ACCESS_TOKEN_SECRET)
    api = tweepy.API(auth)

    try:
        api.auth.get_username()
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit; any ordinary failure still counts as bad credentials.
        logger.error(u"check your twitter credits!")
        return False

    logger.info(u"sending notification to twitter: %s" % message)
    if config.TWITTER_USE_DM:
        status = api.send_direct_message(user=config.TWITTER_DM_USER, text=message)
    else:
        status = api.update_status(status=message)

    if status:
        logger.info(u"Notification to twitter successfully send: %s" % status.text)
        return True

    logger.error(u"unable to send twitter notification: %s" % status)
    return False
def tweet_listener():
    """Run the Twitter user stream forever, restarting it after any error.

    OAuth values are read from the environment variables ``consumer_key``,
    ``consumer_secret``, ``access_token`` and ``access_token_secret``. Any
    exception raised while streaming is printed (with the exception class
    docstring) and the stream is started again.
    """
    handler = tweepy.OAuthHandler(
        os.getenv("consumer_key"),
        os.getenv("consumer_secret"),
    )
    handler.set_access_token(
        os.getenv("access_token"),
        os.getenv("access_token_secret"),
    )
    api = tweepy.API(handler)

    while True:
        try:
            stream = tweepy.Stream(auth=api.auth, listener=StreamListener(api))
            print("listener starting...")
            stream.userstream()
        except Exception as error:
            print(error)
            print(error.__doc__)
def _initialize_api(self):
    """
    Build ``self.api`` from the credentials stored in a JSON file.

    The file path is produced by ``self.loc.format('../config/twitter_creds.json')``
    and the file must provide these keys:
        "access_token"
        "access_secret"
        "consumer_token"
        "consumer_secret"
    See the Twitter OAuth docs and the Tweepy docs for details:
    http://docs.tweepy.org/en/v3.5.0/auth_tutorial.html
    :return: None
    """
    creds_path = self.loc.format('../config/twitter_creds.json')
    with open(creds_path) as creds_file:
        creds = json.load(creds_file)

    handler = tweepy.OAuthHandler(creds['consumer_token'], creds['consumer_secret'])
    handler.set_access_token(creds['access_token'], creds['access_secret'])
    # The message is logged once the handler is configured; no request has
    # been made to Twitter at this point.
    self.logger.info('Successfully Authenticated to Twitter API')
    self.api = tweepy.API(handler)
def main():
    """Post a chain of Quijote tweets forever, one every 900 seconds.

    Each tweet is sent as a reply to the status id stored in
    ``last_status_id.txt``; afterwards the id of the newest status on the
    account's timeline is written back to that file.
    """
    # Credentials are intentionally blank placeholders.
    consumer_key = consumer_secret = ""
    access_token = access_token_secret = ""

    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)
    api = tweepy.API(handler, wait_on_rate_limit=True)

    quijote = QuijoteTweet("quijote.txt", "kek.txt")
    status_file = GettingId("last_status_id.txt")

    while True:
        reply_to = status_file.get_id_from_file()
        text = quijote.get_quijote_tweet()
        api.update_status(text, in_reply_to_status_id=reply_to)

        my_id = api.me().id
        newest = api.user_timeline(my_id)[0]
        status_file.save_id_to_file(newest.id)
        sleep(900)
def gymkhana_main():
    """Start a filtered Twitter stream that feeds a GymkhanaListener.

    OAuth values are read from ``tokens.json`` in the current directory
    (keys: consumer_key, consumer_secret, access_token,
    access_token_secret). The stream tracks mentions of the bot's names.
    """
    # ``with`` guarantees the file is closed even if json.load raises.
    with open('tokens.json', 'r') as json_config:
        tokens = json.load(json_config)

    consumer_key = tokens['consumer_key']
    consumer_secret = tokens['consumer_secret']
    access_token = tokens['access_token']
    access_token_secret = tokens['access_token_secret']

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)

    listener = GymkhanaListener(api)
    stream = tweepy.Stream(api.auth, listener)
    filtro = ['@pytwe_bot', 'pytwe_bot', 'pytwe']
    stream.filter(track=filtro)
def sendTweet(link, site, name):
    """Tweet an "IN STOCK" alert for a product and set the global restock flag.

    The tweet consists of an emoji banner, the product name, the site, the
    link and the current UTC time. The text is printed before it is posted.
    """
    global restock

    # setup twitter (placeholder credentials)
    consumer_key, consumer_secret = "C_KEY", "C_SECRET"
    access_token, access_secret = "A_TOKEN", "A_TOKEN_SECRET"
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_secret)
    client = tweepy.API(handler)

    # send tweet
    alert, sos, flag = "\U0001f6a8 ", "\U0001f198 ", "\U0001f6a9 "
    banner = alert + sos + flag + " IN STOCK " + flag + sos + alert
    text = "".join([
        banner,
        "\n", name,
        "\nSite: ", site, "\n",
        link, "\n",
        strftime("%Y-%m-%d %H:%M:%S", gmtime()),
    ])
    print(text)
    client.update_status(text.encode('utf-8'))
    restock = 1
def sendTweet(link, site, name):
    """Post an "IN STOCK" alert tweet and mark the global ``restock`` flag.

    The message is an emoji banner followed by the product name, the site,
    the link and a UTC timestamp; it is printed and then sent UTF-8 encoded.
    """
    global restock

    # setup twitter (placeholder credentials)
    consumer_key, consumer_secret = "C_KEY", "C_SECRET"
    access_token, access_secret = "A_TOKEN", "A_TOKEN_SECRET"
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_secret)
    client = tweepy.API(handler)

    # send tweet
    alert, sos, flag = "\U0001f6a8 ", "\U0001f198 ", "\U0001f6a9 "
    pieces = [
        alert + sos + flag + " IN STOCK " + flag + sos + alert,
        "\n", name,
        "\nSite: ", site, "\n",
        link, "\n",
        strftime("%Y-%m-%d %H:%M:%S", gmtime()),
    ]
    text = "".join(pieces)
    print(text)
    client.update_status(text.encode('utf-8'))
    restock = 1
def sendTweet(link, site, name, price):
    """Tweet an "IN STOCK" alert including the price; set the restock flag.

    The message is an emoji banner followed by the product name, the price
    (prefixed with "$"), the site, the link and a UTC timestamp. ``price``
    is concatenated as text, so it must already be a string.
    """
    global restock

    # setup twitter (placeholder credentials)
    consumer_key = consumer_secret = "KEYS"
    access_token = access_secret = "KEYS"
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_secret)
    client = tweepy.API(handler)

    # send tweet
    alert, sos, flag = "\U0001f6a8 ", "\U0001f198 ", "\U0001f6a9 "
    banner = alert + sos + flag + " IN STOCK " + flag + sos + alert
    text = "".join([
        banner,
        "\n", name,
        "\n$", price,
        "\nSite: ", site, "\n",
        link, "\n",
        strftime("%Y-%m-%d %H:%M:%S", gmtime()),
    ])
    print(text)
    client.update_status(text.encode('utf-8'))
    restock = 1
def __init__(self, CK=CK,CS=CS,AT=AT,AS=AS, byGetF=True):
    """Store the tuning thresholds and create the authenticated API client.

    CK/CS are the consumer key and secret, AT/AS the access token and
    secret; all default to the module-level values of the same names. When
    ``byGetF`` is true the follower ids of the authenticated user are
    fetched right away.
    """
    self.SAMPLE_NUM = 50

    # Threshold on the friends ratio. Examples carried over from the
    # original notes (presumably follow / (follow + follower) - confirm):
    #   1.0  -> follow=100, follower=0
    #   0.5  -> follow=100, follower=100
    #   0.25 -> follow=33,  follower=100
    self.MIN_FRIENDS_RATIO = 0.35

    # Four weeks (about one month), kept as a float.
    self.MAX_DAY_SPAN = float(7 * 4)

    # 1.0 would mean every tweet carries a URL or a hashtag.
    self.MIN_HASHURL_RATIO = 0.55

    handler = tweepy.OAuthHandler(CK, CS)
    handler.set_access_token(AT, AS)
    self.API = tweepy.API(handler, api_root='/1.1', wait_on_rate_limit=True)
    self.ME = self._getMe()

    if not byGetF:
        return

    # self.friends is not populated for now (getFriendIds is not called).
    self.friends = []
    self.followers = self.getFollowerIds(self.ME)
def getTimeline(self, limit=50000, resultType="recent"):
    """Collect up to ``limit`` tweets from the home timeline.

    A tqdm progress bar is shown while the tweets are fetched. API errors
    are printed and whatever was collected so far is returned.

    Args:
        limit: maximum number of tweets to collect.
        resultType: forwarded to the API call as ``result_type``.

    Returns:
        list of the tweet objects yielded by the cursor.
    """
    tweets = []
    try:
        tweetsObj = tweepy.Cursor(self.API.home_timeline,
                                  result_type=resultType,
                                  exclude_replies=False).items(limit)
        pBar = tqdm(tweetsObj, ascii=True, total=limit, desc="Getting Tweets!")
        # Iterating the tqdm wrapper already advances the bar by one per
        # item; the former manual pBar.update(1) counted every tweet twice.
        for cnt, tweet in enumerate(pBar):
            # Explicit guard kept so a non-positive limit still yields [].
            if not cnt < limit:
                break
            tweets.append(tweet)
    except tweepy.error.TweepError as et:
        print(et)
    except Exception as e:
        print(e)
    return tweets
def getFriendIds(self, userId, limit=100000):
    """Return up to ``limit`` friend ids of ``userId``.

    An empty list is returned when ``self._byProtected(userId)`` is truthy
    or when the API raises a TweepError (the error is printed and any ids
    gathered so far are discarded).
    """
    if self._byProtected(userId):
        return []

    collected = []
    try:
        cursor = tweepy.Cursor(self.API.friends_ids, user_id=userId, cursor=-1)
        for friend_id in cursor.items():
            if len(collected) >= limit:
                break
            collected.append(friend_id)
    except tweepy.error.TweepError as error:
        print(error)
        return []
    return collected
def getTweets(self, userId, limit=50):
    """Return up to ``limit`` tweets (replies excluded) from a user's timeline.

    A TweepError is printed and the tweets collected before the error are
    still returned.
    """
    collected = []
    try:
        timeline = tweepy.Cursor(self.API.user_timeline,
                                 user_id=userId,
                                 exclude_replies=True)
        for status in timeline.items(limit):
            if len(collected) >= limit:
                break
            collected.append(status)
    except tweepy.error.TweepError as error:
        print(error)
    return collected
def favoriteTweet(self, tweetId=None, tweet=None):
    """Favorite a tweet given either its id or the tweet object.

    Exactly one of ``tweetId`` and ``tweet`` must be supplied.

    Returns:
        False when both or neither argument is given; otherwise a
        ``(code, message)`` tuple: ``1`` on success, ``429`` or ``139`` when
        that number appears in the TweepError reason, ``-1`` for any other
        TweepError.
    """
    if tweetId is not None and tweet is not None:
        return False

    if tweet is not None:
        tId = self._getTweetId(tweet)
    elif tweetId is not None:
        tId = tweetId
    else:
        print("please input a tweet id")
        # Previously execution fell through and crashed on the unbound tId.
        return False

    try:
        self.API.create_favorite(tId)
        return 1, "Succeed in favoritting this tweet! %d" % tId
    except tweepy.error.TweepError as tp:
        if "429" in tp.reason:
            return 429, "Favo restriction! %d" % tId
        if "139" in tp.reason:
            return 139, "You have already favorite it! %d" % tId
        return -1, "Exception! %s" % str(tp.reason)
def is_target(screen_name, disable_targeting, model_file='cluster.pkl'):
    """
    Decide whether a user should be selected.

    With ``disable_targeting`` set, every user is selected and True is
    returned. Otherwise the user's profile is fetched (parsed as JSON),
    wrapped in a one-element numpy array and passed to the pretrained
    clustering model loaded from ``model_file``; the result is the model's
    prediction compared against label 1.
    """
    if disable_targeting:
        return True

    handler = tweepy.OAuthHandler(credentials.consumer_key,
                                  credentials.consumer_secret)
    handler.set_access_token(credentials.access_token,
                             credentials.access_token_secret)
    client = tweepy.API(handler, parser=tweepy.parsers.JSONParser())

    profile = client.get_user(screen_name=screen_name)
    features = numpy.array([profile])
    classifier = joblib.load(model_file)
    return classifier.predict(features) == 1
def log_tweep_error(logger, tweep_error):
    """Log a TweepError exception.

    A known ``api_code`` is logged with a fixed description, an unknown
    code with the exception itself, and an exception without a code as a
    generic Twitter error.
    """
    code = tweep_error.api_code
    if not code:
        logger.error("error with Twitter: %s", tweep_error)
        return

    descriptions = {
        32: "invalid API authentication tokens",
        34: "requested object (user, Tweet, etc) not found",
        64: "your account is suspended and is not permitted",
        130: "Twitter is currently in over capacity",
        131: "internal Twitter error occurred",
        135: "could not authenticate your API tokens",
        136: "you have been blocked to perform this action",
        179: "you are not authorized to see this Tweet",
    }
    description = descriptions.get(code)
    if description is None:
        logger.error("error while using the REST API: %s", tweep_error)
    else:
        logger.error(description)
def twitter_url(match, bot=None):
    """Format the tweet referenced by a matched URL as a one-line summary.

    Returns None when no API client is available or the tweet cannot be
    fetched; otherwise a string with the author (prefixed with a check mark
    when verified), the HTML-unescaped text and the tweet's age.
    """
    # The first capture group of the match holds the tweet id.
    status_id = match.group(1)

    api = get_api(bot)
    if not api:
        return

    try:
        status = api.get_status(status_id)
        author = status.user
    except tweepy.error.TweepError:
        return

    # Collapse all runs of whitespace (including newlines) to single spaces.
    flat_text = " ".join(status.text.split())
    badge = u"\u2713" if author.verified else ""
    age = timesince.timesince(status.created_at, datetime.utcnow())
    parser = HTMLParser()
    return u"{}@\x02{}\x02 ({}): {} ({} ago)".format(
        badge, author.screen_name, author.name, parser.unescape(flat_text), age)
def twitter_poster(tcreds):
    """Build a posting function bound to one Twitter account.

    Args:
        tcreds: dict with the keys "consumer_key", "consumer_secret",
            "access_token" and "access_secret".

    Returns:
        a function that posts the given text and returns a dict with the
        keys "text", "id", "date", "account" and "url" for the new status.
    """
    handler = tweepy.OAuthHandler(tcreds["consumer_key"], tcreds["consumer_secret"])
    handler.set_access_token(tcreds["access_token"], tcreds["access_secret"])
    client = tweepy.API(handler)
    print("created credentials")

    def post_tweet(text):
        status = client.update_status(text)
        print("posted tweet")
        account = status.user.screen_name
        url = "https://twitter.com/" + account + "/status/" + str(status.id)
        return {
            "text": status.text,
            "id": status.id,
            "date": status.created_at.isoformat(),
            "account": account,
            "url": url,
        }

    return post_tweet
def handle(self, *args, **options):
    """Fetch a Twitter user by screen name and create a portrait model for it.

    Prints 'no user' and returns when the ``screen_name`` option is missing
    or empty. The raw user JSON is printed before the model is created.
    """
    # Not used below; kept so settings.PORTRAIT_FOLDER is still evaluated.
    path = '{0}/users'.format(settings.PORTRAIT_FOLDER)

    screen_name = options.get('screen_name')
    if not screen_name:
        print('no user')
        return

    keys = settings.TWITTER_USER_KEYS
    handler = tweepy.OAuthHandler(keys['consumer_key'], keys['consumer_secret'])
    handler.set_access_token(keys['access_token_key'], keys['access_token_secret'])
    client = tweepy.API(handler)

    user_json = client.get_user(screen_name)._json
    print(user_json)
    create_portrait_model(user_json, has_auth=False)
def test_sending_images(self):
    """send_image downloads from S3, tweets the image and removes the local copy."""
    local_copy = '/tmp/image.jpg'
    # ensure there is an image as the mock object will not do anything
    shutil.copy('./image.jpg', local_copy)

    client = boto3.client('s3')
    client.download_file = MagicMock(return_value=None)

    auth = tweepy.OAuthHandler('foo', 'bar')
    api = tweepy.API(auth)
    api.update_with_media = MagicMock(return_value=Status())

    tweet_images = TweetS3Images(api, client)
    tweet_images.send_image('test_bucket', 'image.jpg', cleanup=True)

    client.download_file.assert_called_with('test_bucket', 'image.jpg', local_copy)
    api.update_with_media.assert_called_with(
        filename='image.jpg',
        status='New image image.jpg brought to you by lambda-tweet',
        file=tweet_images.get_file())
    # The old assertion looked at '/tmp/image-test.jpg', a path this test
    # never creates, so it passed whether or not cleanup happened.
    self.assertFalse(os.path.exists(local_copy), 'The image was not cleaned up correctly.')
def get_tweets(self, since_id):
    """Return the raw JSON of every timeline tweet from ``since_id`` onward.

    The tweets are read from the timeline of ``TRUMP_USER_ID`` in extended
    mode so that the full text is available.
    """
    # since_id is exclusive in the API call; step back by one so the tweet
    # with the requested id is included as well.
    earlier_id = str(int(since_id) - 1)

    timeline = Cursor(self.twitter_api.user_timeline,
                      user_id=TRUMP_USER_ID,
                      since_id=earlier_id,
                      tweet_mode="extended")
    # Keep the raw JSON, the same shape the streaming API delivers.
    tweets = [status._json for status in timeline.items()]

    self.logs.debug("Got tweets: %s" % tweets)
    return tweets
def get_tweet_text(self, tweet):
    """Return the full text of a tweet, or None if the tweet is malformed.

    The location of the full text depends on where the tweet came from:
    Streaming API tweets nest it under "extended_tweet", extended REST API
    tweets carry "full_text", and short tweets only have "text". See
    https://dev.twitter.com/overview/api/upcoming-changes-to-tweets
    """
    try:
        if "extended_tweet" in tweet:
            self.logs.debug("Decoding extended tweet from Streaming API.")
            return tweet["extended_tweet"]["full_text"]

        if "full_text" in tweet:
            self.logs.debug("Decoding extended tweet from REST API.")
            return tweet["full_text"]

        self.logs.debug("Decoding short tweet.")
        return tweet["text"]
    except KeyError:
        self.logs.error("Malformed tweet: %s" % tweet)
        return None
def load_credentials(path=VAULT_PATH):
    '''
    Load credentials from vault.

    The vault is a JSON file with a "twitter" object (consumer-key,
    consumer-secret, access-token, access-token-secret) and a "github"
    entry.

    Returns a ``(gist, api)`` tuple: the "github" entry and a tweepy API
    client. A value that could not be loaded is None; a message is printed
    when the file cannot be read or parsed.
    '''
    gist, api = None, None
    # open() now sits inside the try block: previously a missing or
    # unreadable vault file raised before the IOError handler could apply.
    try:
        with open(path, 'r') as vault_file:
            vault = json.loads(vault_file.read())
        auth = tweepy.OAuthHandler(vault['twitter']['consumer-key'],
                                   vault['twitter']['consumer-secret'])
        auth.set_access_token(vault['twitter']['access-token'],
                              vault['twitter']['access-token-secret'])
        api = tweepy.API(auth)
        gist = vault['github']
    except IOError:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('Unable to read vault-file: {0}.'.format(path))
    except (KeyError, ValueError):
        print('Unable to parse the vault-file.')
    return gist, api
def load_credentials(path=VAULT_PATH):
    '''
    Load credentials from vault.

    The vault is a JSON file with a "twitter" object holding consumer-key,
    consumer-secret, access-token and access-token-secret.

    Returns a tweepy API client, or None when the file cannot be read or
    parsed (a message is printed in that case).
    '''
    api = None
    # open() now sits inside the try block: previously a missing or
    # unreadable vault file raised before the IOError handler could apply.
    try:
        with open(path, 'r') as vault_file:
            vault = json.loads(vault_file.read())
        auth = tweepy.OAuthHandler(vault['twitter']['consumer-key'],
                                   vault['twitter']['consumer-secret'])
        auth.set_access_token(vault['twitter']['access-token'],
                              vault['twitter']['access-token-secret'])
        api = tweepy.API(auth)
    except IOError:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('Unable to read vault-file: {0}.'.format(path))
    except (KeyError, ValueError):
        print('Unable to parse the vault-file.')
    return api
def load_credentials(path=VAULT_PATH):
    '''
    Load credentials from vault.

    The vault is a JSON file with a "twitter" object (consumer-key,
    consumer-secret, access-token, access-token-secret) and a "github"
    entry.

    Returns a ``(gist, api)`` tuple: the "github" entry and a tweepy API
    client. A value that could not be loaded is None; a message is printed
    when the file cannot be read or parsed.
    '''
    gist, api = None, None
    # open() now sits inside the try block: previously a missing or
    # unreadable vault file raised before the IOError handler could apply.
    try:
        with open(path, 'r') as vault_file:
            vault = json.loads(vault_file.read())
        auth = tweepy.OAuthHandler(vault['twitter']['consumer-key'],
                                   vault['twitter']['consumer-secret'])
        auth.set_access_token(vault['twitter']['access-token'],
                              vault['twitter']['access-token-secret'])
        api = tweepy.API(auth)
        gist = vault['github']
    except IOError:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('Unable to read vault-file: {0}.'.format(path))
    except (KeyError, ValueError):
        print('Unable to parse the vault-file.')
    return gist, api
def authenticate():
    """Create the module-level tweepy ``api`` object.

    Calls ``_get_secrets()`` and then reads the consumer key, consumer
    secret, access token and access token secret from the first four
    entries of ``secret_keys``, in that order.
    """
    global api

    # generate auth details and assign appropriately
    _get_secrets()
    consumer_key, consumer_secret = secret_keys[0], secret_keys[1]
    access_token, access_token_secret = secret_keys[2], secret_keys[3]

    # access our twitter app with the appropriate credentials
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)

    # create tweepy object
    api = tweepy.API(handler)
# A test function to check that all the authentication is working
def get_new_tweets(wfm, querytype, query, old_tweets):
    """Fetch one page of tweets and return it as a pandas DataFrame.

    Uses app-only authentication with the consumer key and secret from the
    CJW_TWITTER_CONSUMER_KEY / CJW_TWITTER_CONSUMER_SECRET environment
    variables. ``wfm`` and ``old_tweets`` are accepted but not used here.

    Returns:
        DataFrame with one row per tweet and one column per stored field.
    """
    # Authenticate with "app authentication" mode (high rate limit, read only)
    handler = tweepy.AppAuthHandler(
        os.environ['CJW_TWITTER_CONSUMER_KEY'],
        os.environ['CJW_TWITTER_CONSUMER_SECRET'],
    )
    client = tweepy.API(handler)

    # A single API call only: up to 200 tweets of a user timeline, or up to
    # 100 search results.
    if querytype == Twitter.QUERY_TYPE_USER:
        statuses = client.user_timeline(query, count=200)
    else:
        statuses = client.search(q=query, count=100)

    # Columns to retrieve and store from Twitter. The list also fixes the
    # position of the id field for merging old and new tweets.
    cols = ['id', 'created_at', 'text', 'in_reply_to_screen_name', 'in_reply_to_status_id', 'retweeted',
            'retweet_count', 'favorited', 'favorite_count', 'source']

    rows = []
    for status in statuses:
        rows.append([getattr(status, col) for col in cols])
    return pd.DataFrame(rows, columns=cols)
# Combine this set of tweets with previous set of tweets
def sendTweet(item,color,link, size):
    """Tweet a restock notice for an item.

    The tweet lists the item, color, title-cased size and link, followed by
    "Restock!" and the current UTC time with millisecond precision. The
    text is printed after a successful post; a failure is reported with a
    short message instead of being raised.
    """
    auth = tweepy.OAuthHandler(C_KEY, C_SECRET)
    auth.set_access_token(A_TOKEN, A_TOKEN_SECRET)
    api = tweepy.API(auth)

    tweet = item+"\n"
    tweet += color+'\n'
    tweet += size.title()+'\n'
    tweet += link+'\n'
    tweet += "Restock!"+'\n'
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    tweet += str(datetime.utcnow().strftime('%H:%M:%S.%f')[:-3])

    try:
        api.update_status(tweet)
        print(tweet)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit; posting stays best-effort for ordinary errors.
        print("Error sending tweet!")