def Query():
    """Stream public statuses matching a keyword into a JSON-lines file.

    Authenticates with OAuth, opens a filtered Streaming API connection for
    the keyword 'big' and writes every message received to "big.json", one
    JSON document per line (the file is truncated on open). The text of each
    status is also echoed to stdout. The function keeps running for as long
    as the stream yields messages.

    Relies on names provided at module level elsewhere in the file:
    twitter, io, json, OAUTH_TOKEN, OAUTH_TOKEN_SECRET, CONSUMER_KEY and
    CONSUMER_SECRET.
    """
    query = 'big'
    # The file to write output as newline-delimited JSON documents
    out_file = query + ".json"

    # Authenticate to Twitter with OAuth
    auth = twitter.oauth.OAuth(OAUTH_TOKEN, OAUTH_TOKEN_SECRET,
                               CONSUMER_KEY, CONSUMER_SECRET)

    # Create a connection to the Streaming API
    twitter_stream = twitter.TwitterStream(auth=auth)
    # A single parenthesised argument prints identically under the Python 2
    # print statement and the Python 3 print function.
    print('Filtering the public timeline for "{0}"'.format(query))

    # See https://dev.twitter.com/docs/streaming-apis on keyword parameters
    stream = twitter_stream.statuses.filter(track=query)

    # Write one message per line as a JSON document. buffering=1 selects
    # line buffering, so each document is flushed once its line is written.
    with io.open(out_file, 'w', encoding='utf-8', buffering=1) as f:
        for tweet in stream:
            # Formatting into a unicode template already yields unicode, so
            # no extra conversion is needed before writing.
            f.write(u'{0}\n'.format(json.dumps(tweet, ensure_ascii=False)))
            # The stream also carries non-status messages (e.g. delete or
            # limit notices) that have no 'text' key; they are still saved
            # above, but only real statuses are echoed to the console.
            if 'text' in tweet:
                print(tweet['text'])
# Example source code using the Python class TwitterStream()
def twitter_login(self, cons_key, cons_secret, access_token,
                  access_token_secret):
    """Log in to Twitter with the given application and access keys.

    Keys for your own account can be obtained at apps.twitter.com

    Arguments

    cons_key            - String of your Consumer Key (API Key)
    cons_secret         - String of your Consumer Secret (API Secret)
    access_token        - String of your Access Token
    access_token_secret - String of your Access Token Secret

    On completion the OAuth object, the REST client, the streaming client
    and the verified account credentials are stored on the instance, and
    the instance is flagged as logged in.
    """

    # Without the twitter library nothing below can work; hand the problem
    # to self._error (defined elsewhere; presumably raises an Exception).
    if not IMPTWITTER:
        self._error(u'twitter_login', u"The 'twitter' library could not be imported. Check whether it is installed correctly.")

    # Build the OAuth handler once and keep it for later reconnects.
    self._oauth = twitter.OAuth(
        access_token,
        access_token_secret,
        cons_key,
        cons_secret,
    )

    # One client for the REST API, one for the Streaming API.
    self._t = twitter.Twitter(auth=self._oauth)
    self._ts = twitter.TwitterStream(auth=self._oauth)
    self._loggedin = True

    # Fetch the credentials of the account that just logged in.
    self._credentials = self._t.account.verify_credentials()
def _twitter_reconnect(self):
    """Re-establish the Twitter connections from the stored OAuth object.

    Internal helper: it reuses self._oauth, so it should ONLY be called
    after twitter_login has been called.
    """

    caller = u'_twitter_reconnect'

    # Announce the attempt before doing anything else.
    self._message(caller, u"Attempting to reconnect to Twitter.")

    # Without the twitter library nothing below can work; hand the problem
    # to self._error (defined elsewhere; presumably raises an Exception).
    if not IMPTWITTER:
        self._error(caller, u"The 'twitter' library could not be imported. Check whether it is installed correctly.")

    # Rebuild the REST and streaming clients with the saved credentials.
    stored_auth = self._oauth
    self._t = twitter.Twitter(auth=stored_auth)
    self._ts = twitter.TwitterStream(auth=stored_auth)
    self._loggedin = True

    # Refresh the account's own user credentials.
    self._credentials = self._t.account.verify_credentials()

    # Announce that the reconnection went through.
    self._message(caller, u"Successfully reconnected to Twitter!")
def twitter_login(self, cons_key, cons_secret, access_token,
                  access_token_secret):
    """Authenticate against Twitter using the supplied access keys.

    The keys belong to a Twitter app; create one for your own account at
    apps.twitter.com

    Arguments

    cons_key            - String of your Consumer Key (API Key)
    cons_secret         - String of your Consumer Secret (API Secret)
    access_token        - String of your Access Token
    access_token_secret - String of your Access Token Secret
    """

    # The twitter library is required; report its absence via self._error
    # (defined elsewhere; presumably raises an Exception).
    if not IMPTWITTER:
        self._error(u'twitter_login', u"The 'twitter' library could not be imported. Check whether it is installed correctly.")

    # Store the OAuth handler so a later reconnect can reuse it.
    self._oauth = twitter.OAuth(access_token, access_token_secret,
                                cons_key, cons_secret)

    # Create the REST client and the streaming client from the same auth.
    rest_client = twitter.Twitter(auth=self._oauth)
    self._t = rest_client
    self._ts = twitter.TwitterStream(auth=self._oauth)
    self._loggedin = True

    # Look up the user credentials of the logged-in account.
    self._credentials = rest_client.account.verify_credentials()
def _twitter_reconnect(self):
    """Reconnect to Twitter using the OAuth object saved at login.

    Intended for internal use only; it depends on self._oauth and must
    therefore ONLY be called after twitter_login has been called.
    """

    # Log that a reconnection is being attempted.
    self._message(u'_twitter_reconnect',
                  u"Attempting to reconnect to Twitter.")

    # The twitter library is required; report its absence via self._error
    # (defined elsewhere; presumably raises an Exception).
    if not IMPTWITTER:
        self._error(u'_twitter_reconnect', u"The 'twitter' library could not be imported. Check whether it is installed correctly.")

    # Recreate both API clients from the stored OAuth object.
    self._t = twitter.Twitter(auth=self._oauth)
    self._ts = twitter.TwitterStream(auth=self._oauth)
    self._loggedin = True

    # Re-read the account's own user credentials.
    rest_client = self._t
    self._credentials = rest_client.account.verify_credentials()

    # Log the successful reconnection.
    self._message(u'_twitter_reconnect',
                  u"Successfully reconnected to Twitter!")