def Cursor(self, *args, **kwargs):
    return tweepy.Cursor(*args, **kwargs)
Example source code for the Python class tweepy.Cursor()
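All of the snippets below share one pattern: tweepy's Cursor wraps a paginated API method and exposes items() and pages() iterators. As a minimal sketch of that pattern (the credentials and screen names here are placeholders, and a configured api object is assumed throughout the page):

import tweepy

# Placeholder credentials; substitute real keys.
auth = tweepy.OAuthHandler("CONSUMER_KEY", "CONSUMER_SECRET")
auth.set_access_token("ACCESS_TOKEN", "ACCESS_TOKEN_SECRET")
api = tweepy.API(auth)

# items() yields individual objects across page boundaries...
for status in tweepy.Cursor(api.user_timeline, screen_name="twitter").items(10):
    print(status.text)

# ...while pages() yields whole result pages.
for page in tweepy.Cursor(api.followers_ids, screen_name="twitter").pages(2):
    print(len(page))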
def get_and_save_friends(user_id):
    global n_calls
    for friend in tweepy.Cursor(api.friends, user_id=user_id, count=200).items():
        save_user(friend)
# The Twitter access keys are defined as environment variables
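A minimal sketch of what that comment describes, assuming these four (hypothetical) variable names are the ones actually exported:

import os
import tweepy

# Hypothetical environment variable names; adjust to match your setup.
auth = tweepy.OAuthHandler(os.environ["TWITTER_CONSUMER_KEY"],
                           os.environ["TWITTER_CONSUMER_SECRET"])
auth.set_access_token(os.environ["TWITTER_ACCESS_TOKEN"],
                      os.environ["TWITTER_ACCESS_TOKEN_SECRET"])
api = tweepy.API(auth)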
def get_followers(self, account_name):
    """Return a list of the IDs of all followers of an account."""
    followers = []
    for page in tweepy.Cursor(self.api.followers_ids, screen_name=str(account_name)).pages():
        followers.extend(page)
    return followers
def get_tweets_from_timeline(self):
    """Return a list of up to 200 tweets from the home timeline."""
    tweets = []
    for status in tweepy.Cursor(self.api.home_timeline).items(200):
        tweets.append(status)
    return tweets
def get_user_tweets(self):
    """Return a list of all tweets from the authenticated API user."""
    tweets = []
    for status in tweepy.Cursor(self.api.user_timeline).items():
        tweets.append(status)
    return tweets
def get_mentions_from_timeline(self):
    """Return the home-timeline tweets that mention pytwe_bot."""
    tweets = []
    for status in tweepy.Cursor(self.api.home_timeline, include_entities=True).items(200):
        # 'user_mentions' is always present in entities; check that it is non-empty.
        if status.entities.get('user_mentions'):
            tweets.append(str(status.user.screen_name) + " " + str(status.created_at) + "\n" + status.text)
    mentions = []
    for item in tweets:
        if 'pytwe_bot' in item:
            mentions.append(item)
    return mentions
def getFollowerIds(self, userId, limit=5000):
    if self._byProtected(userId):
        return []
    followerIds = []
    try:
        followers = tweepy.Cursor(self.API.followers_ids,
                                  user_id=userId,
                                  cursor=-1).items()
        for cnt, follower in enumerate(followers):
            if cnt >= limit:
                break
            followerIds.append(follower)
    except tweepy.error.TweepError as et:
        print(et)
        return []
    return followerIds
def get_nphs_tweets(since=None):
    """Get tweets from the Twitter list of NPHS students newer than `since`
    (default: the last 24 hours)."""
    # A datetime default argument would be evaluated once at definition time,
    # so compute the cutoff inside the function instead.
    if since is None:
        since = datetime.utcnow() - timedelta(hours=24)
    statuses = []
    # Collect all tweets since the provided datetime
    for status in Cursor(api.list_timeline, "1Defenestrator", "NPHS").items():
        if status.created_at < since:
            break
        else:
            statuses.append(status)
    # Filter out retweets and return
    return [s for s in statuses if not s.text.startswith("RT @")]
def get_friends(api, username, limit):
    """Download friends and process them."""
    for friend in tqdm(tweepy.Cursor(api.friends, screen_name=username).items(limit), unit="friends", total=limit):
        process_friend(friend)

def get_tweets(api, username, limit):
    """Download tweets from the username account."""
    for status in tqdm(tweepy.Cursor(api.user_timeline, screen_name=username).items(limit),
                       unit="tw", total=limit):
        process_tweet(status)
def twitterdetails(username):
    # Prepare auth.
    auth = tweepy.OAuthHandler(cfg.twitter_consumer_key, cfg.twitter_consumer_secret)
    auth.set_access_token(cfg.twitter_access_token, cfg.twitter_access_token_secret)
    api = tweepy.API(auth)
    # Write the last 1000 tweets to a temp file.
    with open("temptweets.txt", "w", encoding="utf-8") as f:
        for tweet in tweepy.Cursor(api.user_timeline, id=username).items(1000):
            f.write(tweet.text)
            f.write("\n")
    with open("temptweets.txt", "r", encoding="utf-8") as f:
        q = f.read()
    # Extract hashtags.
    # Regex source: https://marcobonzanini.com/2015/03/09/mining-twitter-data-with-python-part-2/
    strings = re.findall(r'(?:\#+[\w_]+[\w\'_\-]*[\w_]+)', q)
    # Extract users.
    tusers = re.findall(r'(?:@[\w_]+)', q)
    hashlist = []
    userlist = []
    for item in strings:
        hashlist.append(item.strip('#').lower())
    hashlist = hashlist[:10]
    for itm in tusers:
        userlist.append(itm.strip('@').lower())
    userlist = userlist[:10]
    return hashlist, userlist
def getConnection(profile1, profile2):
    followerProfile1 = []
    for user in tweepy.Cursor(api.followers, screen_name=profile1).items():
        followerProfile1.append(user.screen_name)
    followerProfile2 = []
    for user in tweepy.Cursor(api.followers, screen_name=profile2).items():
        followerProfile2.append(user.screen_name)
    # Set intersection avoids a nested loop over indices.
    sharedFollower = sorted(set(followerProfile1) & set(followerProfile2))
    for name in sharedFollower:
        print("[*] " + name)
    print("\n[+] Total shared followers " + str(len(sharedFollower)) + "\n")
    # Accounts a profile follows come from api.friends, not api.followers.
    followingProfile1 = []
    for user in tweepy.Cursor(api.friends, screen_name=profile1).items():
        followingProfile1.append(user.screen_name)
    followingProfile2 = []
    for user in tweepy.Cursor(api.friends, screen_name=profile2).items():
        followingProfile2.append(user.screen_name)
    sharedFollowing = sorted(set(followingProfile1) & set(followingProfile2))
    for name in sharedFollowing:
        print("[*] " + name)
    print("\n[+] Total shared following " + str(len(sharedFollowing)) + "\n")
def get_followers(screen_name):
    timestamp = datetime.now()
    log_doc = {
        'accounts': {
            screen_name: {
                'started_at': timestamp.timestamp()
            }
        }
    }
    db.saveToImportLog(IMPORT_KEY, log_doc)
    if FOLLOWER_LIMIT == 0:
        print("Get all followers for @" + screen_name)
    else:
        print("Get %d followers for @%s" % (FOLLOWER_LIMIT, screen_name))
    print(timestamp.strftime("%d.%m.%Y %H:%M:%S"))
    followers = []
    for user in limit_handled(tweepy.Cursor(TWITTER_API.followers, screen_name="@" + screen_name, count=200).items(FOLLOWER_LIMIT)):
        followers.append(user)
    return followers
# def get_all_retweeters(screen_name):
#     timestamp = time.strftime("%d.%m.%Y %H:%M:%S", time.localtime())
#     print(timestamp)
#     all_retweeters = []
#     for tweet in limit_handled(tweepy.Cursor(api.user_timeline, id=screen_name, count=200).items()):
#         print(tweet.id)
#         retweeters = get_retweets(tweet.id)
#         # somehow get to retweeters
#         # all_retweeters.append(retweeters_per_tweet)
#     return all_retweeters
def get_tweets(pages=1):
    """Return up to (200 * pages) of Trump's tweets."""
    tweets = []
    for page in t.Cursor(
            api.user_timeline,
            screen_name="realDonaldTrump",
            count=200
    ).pages(pages):
        for tweet in page:
            tweets.append(_process_text(tweet.text))
    return [i for i in tweets if i]
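_process_text is defined elsewhere in this project; the final list comprehension drops falsy results, which suggests it returns None or an empty string for unusable tweets. A hypothetical sketch consistent with that contract:

import re

def _process_text(text):
    """Strip URLs and mentions; return None if nothing useful remains (assumed behavior)."""
    text = re.sub(r'https?://\S+', '', text)  # remove links
    text = re.sub(r'@\w+', '', text)          # remove mentions
    text = ' '.join(text.split())             # collapse whitespace
    return text or None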
twitter_search.py (project: Social_Media_Analytics_RutgersU, author: arnavd96)
# TWEEPY SEARCH FUNCTION
def tw_search_json(query, cnt=5):
    authfile = './auth.k'
    api = tw_oauth(authfile)
    results = {}
    meta = {
        'username': 'text',
        'usersince': 'date',
        'followers': 'numeric',
        'friends': 'numeric',
        'authorid': 'text',
        'authorloc': 'geo',
        'geoenable': 'boolean',
        'source': 'text'
    }
    data = []
    for tweet in tweepy.Cursor(api.search, q=query, count=cnt).items():
        dTwt = {}
        dTwt['username'] = tweet.author.name
        dTwt['usersince'] = tweet.author.created_at       # author/user profile creation date
        dTwt['followers'] = tweet.author.followers_count  # number of author/user followers (inlink)
        dTwt['friends'] = tweet.author.friends_count      # number of author/user friends (outlink)
        dTwt['authorid'] = tweet.author.id                # author/user ID
        dTwt['authorloc'] = tweet.author.location         # author/user location
        dTwt['geoenable'] = tweet.author.geo_enabled      # is the author/user account geo-enabled?
        dTwt['source'] = tweet.source                     # platform source for the tweet
        data.append(dTwt)
    results['meta'] = meta
    results['data'] = data
    return results
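tw_oauth is a project helper that builds an authenticated API object from ./auth.k; the file's format is not shown on this page. A hypothetical sketch, assuming the file holds the consumer key, consumer secret, access token, and access token secret one per line in that order:

import tweepy

def tw_oauth(authfile):
    # Assumed format: four credentials, one per line.
    with open(authfile) as f:
        consumer_key, consumer_secret, access_token, access_secret = \
            [line.strip() for line in f][:4]
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    return tweepy.API(auth)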
def read_messages(self, to):
    results = []  # renamed from `exit`, which shadows the builtin
    searched_tweets = [status for status in tweepy.Cursor(self.api.search, q=to, lang=self.language).items(self.max_tweets)]
    for elem in searched_tweets:
        results.append({
            'user_tweets': elem.user.id,
            'screen_name': elem.user.screen_name,
            'description': elem.user.description,
            'tweet_message': elem.text,
            'created_date': str(elem.created_at)
        })
    return results
def handle_mentions(api, responses):
    for status in tweepy.Cursor(api.mentions_timeline).items():
        process_status(status, responses)
get_community_tweets.py (project: twitter_LDA_topic_modeling, author: kenneth-orton)
def get_tweets(user_id, api):
    cursor = tweepy.Cursor(api.user_timeline, user_id=user_id).pages()
    tweets = []
    while True:
        try:
            tweets = [page for page in cursor]
            break  # success; without this the loop would spin forever
        except tweepy.TweepError as e:
            tweets = []
            api_codes = [401, 404, 500]
            if not str(e):
                break
            # In Python 3, filter() returns an iterator, so join the digits first.
            digits = ''.join(filter(str.isdigit, str(e)))
            if digits and int(digits) in api_codes:
                break
            print('get_tweets: ' + str(e))
    return tweets
def get_and_process_tweets(user="realdonaldtrump"):
    """
    A function that uses tweepy to download all the tweets by a given `user`,
    processes the tweets for stopwords & weird internet formatting,
    tokenizes the tweets using the NLTK, and then uses markovify to output a
    reusable JSON file for use in generating future tweets.
    """
    all_tweets = []  # a list in which to store DJT's tweets
    # Get DJT's tweets.
    for tweet in tweepy.Cursor(api.user_timeline, id=user).items():
        if tweet.source == 'Twitter for Android':  # only get tweets from DJT's insecure Android phone
            fishy_tweet = clean_tweet(tweet.text)  # and add them to the list
            all_tweets.append(fishy_tweet)
    # Write his crappy tweets to a text file.
    with open('djt_tweets.txt', 'w') as f:
        for tweet in all_tweets:
            f.write(tweet + ' ')  # need the space so they don't stick together
    # Open the file to POS-tag it and process the results into JSON.
    with open("djt_tweets.txt") as t:
        text = t.read()
    text_model = POSifiedText(input_text=text, state_size=3)
    model_json = text_model.to_json()
    # Save the JSON to disk for future use.
    with open('djt_tweets.json', 'w', encoding='utf-8') as j:
        json.dump(model_json, j, ensure_ascii=False)
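clean_tweet and POSifiedText are both assumed to be defined elsewhere in this project. For POSifiedText, a minimal sketch following the NLTK example in the markovify README:

import re
import markovify
import nltk

class POSifiedText(markovify.Text):
    """Markov text model whose states are word::POS-tag pairs."""
    def word_split(self, sentence):
        words = re.split(self.word_split_pattern, sentence)
        words = ["::".join(tag) for tag in nltk.pos_tag(words)]
        return words

    def word_join(self, words):
        sentence = " ".join(word.split("::")[0] for word in words)
        return sentence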
def collect(self, since_id: str = None) -> Iterable[Dict[str, Any]]:
    """Collect tweets

    :param since_id: TODO
    :returns: TODO
    """
    logger.debug("Collecting tweets")
    data = json.load(open("tweets-5.json", "r"))
    yield from data
    # for page in limit_handled(tweepy.Cursor(self._api.list_timeline, self.account_name,
    #                                         self.source_list).pages(1)):
    #     yield from page