def start_stream(self):
    """Open a Twitter stream and filter it on the ``#airqualityin`` hashtag."""
    stream = Stream(auth, streamer())
    hashtags = ["#airqualityin"]
    stream.filter(track=hashtags)
python类Stream()的实例源码
def start_polling(self):
    """Start polling the Twitter user stream for new events.

    The loop ends when ``self._polling_should_run`` becomes false
    (set to ``True`` by ``self.run`` and to ``False`` by ``self.stop``).

    A ``MyStreamListener`` bound to this endpoint is attached to a new
    ``tweepy.Stream`` stored in ``self._stream``; the user stream is then
    started in asynchronous mode and ``self._polling_is_running`` is set
    to ``True``.
    """
    stream_listener = MyStreamListener()
    stream_listener.set_endpoint(self)
    self._stream = tweepy.Stream(
        auth=self._api.auth,
        listener=stream_listener,
    )
    # ``async`` is a reserved keyword since Python 3.7, so ``async=True`` no
    # longer parses; tweepy >= 3.7 takes the same flag as ``is_async``.
    self._stream.userstream(is_async=True)
    self._polling_is_running = True
def main():
    """Collect tweets inside a TensorFlow session until interrupted.

    A ``QueueListener`` bound to the session feeds a ``Stream``.  The stream
    is first filtered (language ``ja`` plus a fixed track list) and then
    ``sample()`` is called in a retry loop:

    * ``KeyboardInterrupt`` prints a notice and returns.
    * ``socket.error`` / ``http.client.HTTPException`` sleeps for the current
      module-level ``tcpip_delay`` (capped at ``MAX_TCPIP_TIMEOUT``) and then
      grows the delay by 0.25 seconds.

    The stream is always disconnected on the way out.
    """
    global tcpip_delay

    # NOTE(review): the '?' entries look like characters lost in a
    # transcoding step -- confirm against the original source.
    track_terms = ['?', '?', '?', '?', '???', '??', '??', '?', 'http', 'www', 'co', '@', '#', '?', '?', '?',
                   '.', '!', ',', ':', '?', '?', ')', '...', '??']

    with tf.Session() as sess:
        listener = QueueListener(sess)
        stream = Stream(listener.auth, listener)
        stream.filter(languages=["ja"], track=track_terms)
        try:
            while True:
                try:
                    stream.sample()
                except KeyboardInterrupt:
                    print('KEYBOARD INTERRUPT')
                    return
                except (socket.error, http.client.HTTPException):
                    print('TCP/IP Error: Restarting after %.2f seconds.' % tcpip_delay)
                    time.sleep(min(tcpip_delay, MAX_TCPIP_TIMEOUT))
                    tcpip_delay += 0.25
        finally:
            stream.disconnect()
            print('Exit successful, corpus dumped in %s' % (listener.dumpfile))
def tweet_listener():
    """Keep a Twitter user stream open forever.

    OAuth credentials are read from the environment variables
    ``consumer_key``, ``consumer_secret``, ``access_token`` and
    ``access_token_secret``.  Each loop iteration builds a fresh stream and
    starts ``userstream()``; any ``Exception`` is printed together with the
    docstring of its class and the loop starts over.
    """
    environ = os.environ
    consumer_key = environ.get("consumer_key")
    consumer_secret = environ.get("consumer_secret")
    access_token = environ.get("access_token")
    access_token_secret = environ.get("access_token_secret")

    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)
    client = tweepy.API(handler)

    while True:
        try:
            user_stream = tweepy.Stream(auth=client.auth,
                                        listener=StreamListener(client))
            print("listener starting...")
            user_stream.userstream()
        except Exception as error:
            print(error)
            print(error.__doc__)
def gymkhana_main():
    """Start the gymkhana bot's filtered stream.

    Credentials are loaded from ``tokens.json`` (keys ``consumer_key``,
    ``consumer_secret``, ``access_token``, ``access_token_secret``).  A
    ``GymkhanaListener`` is attached to a stream that tracks the bot's
    handle and name variants.
    """
    # The context manager closes the file even when the JSON is malformed;
    # the previous open()/close() pair leaked the handle in that case.
    with open('tokens.json', 'r') as json_config:
        tokens = json.load(json_config)

    consumer_key = tokens['consumer_key']
    consumer_secret = tokens['consumer_secret']
    access_token = tokens['access_token']
    access_token_secret = tokens['access_token_secret']

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)

    listener = GymkhanaListener(api)
    stream = tweepy.Stream(api.auth, listener)
    filtro = ['@pytwe_bot', 'pytwe_bot', 'pytwe']
    stream.filter(track=filtro)
def main():
    """Sample the public stream forever, restarting after failures.

    Command line:
        source_file   file opened in append mode, handed to the listener
        target_file   file opened in append mode, handed to the listener
        --languages   one or more language codes (default: ``ja``)

    Every iteration authenticates, builds a ``ReplyStreamListener`` and calls
    ``sample()``.  When an ``Exception`` is raised its traceback is written
    to stderr and the loop retries after 10 seconds.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('source_file', type=argparse.FileType('a'))
    parser.add_argument('target_file', type=argparse.FileType('a'))
    parser.add_argument('--languages', nargs='+', default=['ja'])
    args = parser.parse_args()

    while True:
        try:
            auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
            auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
            api = tweepy.API(auth)
            reply_stream_listener = ReplyStreamListener(api, args.target_file, args.source_file)
            reply_stream = tweepy.Stream(auth=api.auth, listener=reply_stream_listener)
            reply_stream.sample(languages=args.languages)
        # Catch ``Exception`` rather than everything: a bare ``except:`` also
        # swallowed KeyboardInterrupt/SystemExit, so Ctrl-C merely triggered
        # another retry instead of stopping the program.
        except Exception:
            traceback.print_exc(limit=10, file=sys.stderr, chain=False)
            time.sleep(10)
def tweet_listener():
    """Run the Twitter user stream in an endless restart loop.

    Authentication uses the module-level ``CONSUMER_KEY``,
    ``CONSUMER_SECRET``, ``ACCESS_TOKEN`` and ``ACCESS_TOKEN_SECRET``.
    Whenever ``userstream()`` returns or raises an ``Exception``, the
    exception (if any) is printed along with its class docstring and a new
    stream is created.
    """
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    handler.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    client = tweepy.API(handler)

    while True:
        try:
            user_stream = tweepy.Stream(
                auth=client.auth,
                listener=StreamListener(client),
            )
            print("listener starting...")
            user_stream.userstream()
        except Exception as failure:
            print(failure)
            print(failure.__doc__)
def main():
    """Stream tweets whose location falls inside the ``nj`` bounding box.

    An ``SListener`` (created with the module-level ``api`` and the prefix
    ``'data'``) receives the tweets.
    """
    # Bounding box handed to the ``locations`` filter.
    # presumably [west, south, east, north] in degrees -- confirm against
    # the streaming API documentation.
    nj = [-76, 38.5, -73.5, 41.5]
    # Boxes that were defined here but never used, kept for reference:
    #   whole world: [-180, -90, 180, 90]
    #   rutgers:     [-74.496245, -40.464329, -74.374364, 40.540052]
    #   NOTE(review): the negative latitude in the rutgers box looks like a
    #   typo -- verify before using it.

    listen = SListener(api, 'data')
    stream = tweepy.Stream(auth, listen)

    # filter() is called without an async flag and only returns once the
    # stream has stopped, so the start-up message must be printed first.
    print("Streaming started...")
    stream.filter(locations=nj)
def start_streaming(self, user_id, callback):
    """
    Stream the tweets of one account and pass them to ``callback``.

    :param user_id: String
    :param callback: method that takes Tweet text (String) as a parameter.
    :raises Exception: when ``filter`` has returned and the listener reports
        an error status.
    """
    self.twitter_listener = TwitterListener(user_id, callback=callback)
    stream = Stream(self._auth, self.twitter_listener)
    print("Starting Twitter stream for account: %s" % user_id)
    stream.filter(follow=[user_id])

    # Reaching this point means streaming stopped; report an API error if
    # the listener (which may have been cleared meanwhile) recorded one.
    if not self.twitter_listener:
        return
    if not self.twitter_listener.get_error_status():
        return
    raise Exception("Twitter API error: %s" %
                    self.twitter_listener.get_error_status())
def process_request(self, obj):
    """
    Run a query against the Twitter Public Streams API.

    ``obj`` is converted to ``filter`` keyword arguments by
    ``self._format_query``.  When the stream ends, the listener's counters
    are logged and a ``Cargo`` carrying the listener's ``status_code`` and
    ``notes`` is returned.
    """
    credentials = self.authenticate()
    listener = CustomStreamListener(faucet=self)
    stream = tweepy.Stream(credentials, listener)
    query = self._format_query(obj)
    stream.filter(**query)

    received = stream.listener.data_count
    saved = stream.listener.saved_data_count
    _LOGGER.info('Received %s objects from Twitter and saved %s of them',
                 received, saved)

    return Cargo(status_code=listener.status_code, notes=listener.notes)
def main(args):
    """Listen to the authenticated user's stream and reply with a classifier.

    ``args`` must provide: ``debug``, ``consumer_key``, ``consumer_secret``,
    ``access_token``, ``access_token_secret``, ``classifier`` (``'mock'``,
    ``'local'`` or ``'remote'``), ``silent`` and, depending on the
    classifier, ``dataset_path`` or ``remote_endpoint``.

    :raises ValueError: if ``args.classifier`` is not a supported name.
    """
    if args.debug:
        logger.setLevel(logging.DEBUG)

    # Fail fast with a clear message.  Previously an unknown name left
    # ``classifier`` unbound and surfaced as an UnboundLocalError only after
    # the Twitter API had already been contacted.
    if args.classifier not in ('mock', 'local', 'remote'):
        raise ValueError('Unknown classifier: {!r}'.format(args.classifier))

    auth = tweepy.OAuthHandler(args.consumer_key, args.consumer_secret)
    auth.set_access_token(args.access_token, args.access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)
    screen_name = api.me().screen_name

    if args.classifier == 'mock':
        classifier = classifiers.MockClassifier()
    elif args.classifier == 'local':
        classifier = classifiers.URLClassifier(classifiers.ImageClassifier(args.dataset_path, INPUT_SHAPE))
    else:  # 'remote'
        classifier = classifiers.RemoteClassifier(args.remote_endpoint)

    stream = tweepy.Stream(auth=auth, listener=ReplyToTweet(screen_name, classifier, api, args.silent))
    logger.info('Listening as {}'.format(screen_name))
    stream.userstream(track=[screen_name])
tweets_producer.py 文件源码
项目:Google-Finance-Stock-Data-Analysis
作者: hpnhxxwn
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def fetch_twitter_status(producer, symbols):
    """
    Retrieve English tweets associated with stock symbols (e.g. SNAP, AAPL, GOOG, etc)

    A stream that is already running is disconnected first; then a new
    asynchronous stream tracking ``symbols`` is started and stored in the
    module-level ``stream``.  Errors are logged, never raised.

    :param producer: Kafka producer
    :param symbols: stock symbol list
    :return: None
    """
    global stream
    try:
        if stream is not None and stream.running is True:
            logger.debug("Tweet streamming is running")
            stream.disconnect()
            # Reset instead of ``del``: deleting the global meant that a
            # failure further down left the name undefined, and every later
            # call then died on a NameError at the check above.
            stream = None
        logger.info("Fetching tweets")
        stream_listener = Listener(api, producer, tweet_topic, symbols)
        stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
        # ``async`` is a reserved keyword since Python 3.7; tweepy >= 3.7
        # takes the flag as ``is_async``.
        stream.filter(track=symbols, is_async=True, languages=["en"])
    except TweepError as te:
        # Log the exception itself: ``.message`` does not exist on Python 3
        # exceptions and raised AttributeError inside the handler.
        logger.debug("TweepyExeption: Failed to get tweet for stocks caused by: %s", te)
    except Exception as e:
        logger.warning("Eception: Failed to get tweet for stocks caused by: %s", e)
def __init__(self, path_home, conn_sec, schema, table, consumer_key,
             consumer_secret, access_token, access_token_secret,
             geo=None, search_word=None):
    """Store the configuration, prepare storage and start streaming.

    The constructor never returns: after creating the ``CRUD`` connection
    and calling ``create_table()`` it loops forever, calling ``init()``
    whenever ``self.running`` is false.
    """
    # Twitter credentials
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret
    self.access_token = access_token
    self.access_token_secret = access_token_secret

    # Search criteria
    self.geo = geo
    self.search_word = search_word

    # Database settings
    self.path_home = path_home
    self.conn_sec = conn_sec
    self.conn_schema = schema
    self.conn_table = table

    self.running = False

    # Database connection used to store the tweets
    self.CRUD = CRUD(self.path_home, self.conn_sec)
    # Create the database table if it does not exist
    self.create_table()

    # (Re)create the Twitter Stream object whenever it is not running
    while True:
        if self.running:
            continue
        self.init()
def streaming(credentials, coins, queries, refresh, path, realtime=False, logTracker=True, logTweets=True, logSentiment=False, debug=True):
    """Validate the arguments, then stream tweets matching ``queries`` forever.

    ``credentials`` is indexed 0..3 for consumer key, consumer secret,
    access token and access token secret.  Invalid input (no coins, no
    queries, non-positive ``refresh``) prints an error and returns ``None``
    before any connection is made.  Dropped connections are appended to
    ``<path>Error_Log.txt``.
    """
    # User Error Checks
    if len(coins) <= 0:
        print("Error: You must include at least one coin.")
        return
    if len(coins) >= 10:
        print("Warning: Fewer than ten coins recommended.")
    if len(queries) <= 0:
        print("Error: You must include at least one query.")
        return
    if len(queries) >= 20:
        print("Warning: Fewer than twenty queries recommended.")
    if refresh <= 0:
        print("Error: Refresh rate must be greater than 0")
        return

    auth = tweepy.OAuthHandler(credentials[0], credentials[1])
    auth.set_access_token(credentials[2], credentials[3])

    if logSentiment:
        # Imported lazily and published as a module global so that nltk is
        # only required when sentiment logging is enabled.
        global SentimentIntensityAnalyzer
        from nltk.sentiment.vader import SentimentIntensityAnalyzer

    while True:
        # Start streaming -----------------------------
        try:
            print("Streaming Now...")
            coin_listener = CoinListener(auth, coins, queries, refresh, path, realtime, logTracker, logTweets, logSentiment, debug)
            coin_stream = tweepy.Stream(auth, coin_listener)
            coin_stream.filter(track=queries)
        except (Timeout, ConnectionError, ReadTimeoutError):
            print("Reestablishing Connection...")
            stamp = time.strftime('%m/%d/%Y %H:%M')
            with open("%sError_Log.txt" % path, "a") as outfile:
                outfile.write("%s Error: Connection Dropped\n" % stamp)
        # NOTE(review): the pause is assumed to apply to every restart, not
        # only after a dropped connection -- confirm against the original
        # indentation.
        time.sleep((15*60)+1)  # Wait at least 15 minutes before restarting listener
        # ---------------------------------------------
def success(name,tme):
    """Collect tweets about ``name`` and render the welcome page.

    ``tme`` must consist of exactly three whitespace-separated tokens; the
    first one is evaluated as the duration.  Durations above 7 use the
    streaming API (``MyListener`` with ``time_limit``), anything else goes
    through ``past`` (REST API).
    """
    global count

    # Only the first token of the duration string is used
    d, waste1, waste2 = tme.split()
    # SECURITY NOTE(review): eval() runs whatever expression the caller put
    # in ``tme``.  If this value comes from a web form it is untrusted input
    # and should be parsed with int()/float() instead -- confirm the
    # accepted formats before changing it.
    d = eval(d)

    if d > 7:
        # Long durations: STREAMLISTENER API calls
        listener = MyListener(time_limit=d)
        Stream(auth, listener).filter(track=[name])
    else:
        # Short durations: REST API calls
        past(name, d)

    context = dict(count=count, name=name, duration=tme)
    return render_template('welcome.html', **context)
# collects the input data from the FORM and then redirects the input to the next page
def twitter_stream(client, project_name, topic, track_list):
    """Connects to Twitter stream API.

    Credentials are read from ``twitter.json`` (keys ``consumer_key``,
    ``consumer_secret``, ``access_token``, ``access_token_secret``).

    :param client: passed to ``StreamWatcherListener`` as ``client``
    :param project_name: passed to ``StreamWatcherListener`` as ``project``
    :param topic: passed to ``StreamWatcherListener`` as ``topic``
    :param track_list: comma-separated string of terms to track
    """
    # Function-call form prints the same text on Python 2 and also parses
    # on Python 3, where the ``print`` statement is a SyntaxError.
    print('Connecting to Twitter...')
    with open('twitter.json') as f:
        twitter_cred = json.load(f)
    auth = tweepy.auth.OAuthHandler(twitter_cred['consumer_key'], twitter_cred['consumer_secret'])
    auth.set_access_token(twitter_cred['access_token'], twitter_cred['access_token_secret'])
    watcher = StreamWatcherListener(client=client, project=project_name, topic=topic)
    stream = tweepy.Stream(auth, watcher, timeout=None)
    # str.split already returns a list; no comprehension needed.
    track_terms = track_list.split(',')
    # Keywords make explicit that nothing is followed and the terms are tracked.
    stream.filter(follow=None, track=track_terms)
async def start_feeds(self, *, feeds = None):
    """(Re)start the stream that follows the configured feeds.

    Declared ``async`` because the body awaits ``self.reconnect_ready``;
    the same body in a plain ``def`` is a SyntaxError.

    If a reconnect is already in progress this only waits for
    ``reconnect_ready`` and returns.  Otherwise it waits for the event,
    clears it, optionally replaces ``self.feeds``, disconnects the old
    stream and starts a new one following every id found in
    ``self.feeds``.  The event is set again 120 seconds later.

    :param feeds: optional replacement for ``self.feeds``; a mapping whose
        values are iterables of user ids.
    """
    if self.reconnecting:
        await self.reconnect_ready.wait()
        return
    self.reconnecting = True
    try:
        await self.reconnect_ready.wait()
        self.reconnect_ready.clear()
        try:
            if feeds:
                self.feeds = feeds
            if self.stream:
                self.stream.disconnect()
            self.stream = tweepy.Stream(auth = clients.twitter_api.auth, listener = self)
            follow_ids = {user_id for user_ids in self.feeds.values() for user_id in user_ids}
            # NOTE(review): the flag is passed through a dict because
            # ``async`` is a reserved word; tweepy >= 3.7 expects
            # ``is_async`` instead -- confirm the installed tweepy version.
            self.stream.filter(follow = follow_ids, **{"async" : "True"})
        finally:
            # Always re-arm the event, otherwise a failure above would make
            # every later call wait forever.
            self.bot.loop.call_later(120, self.reconnect_ready.set)
    finally:
        # Always release the guard so a failed attempt can be retried.
        self.reconnecting = False
def init_stream(id):
    """Create the module-level ``listener`` and ``poster`` stream and start it.

    The stream follows ``id``, tracks one hashtag and runs asynchronously.

    :param id: value passed to ``TweetListener`` and to ``filter(follow=...)``
    """
    global listener
    global poster
    # Account handles with their numeric ids, kept for reference:
    # ll_wikia 2734031000
    # mkydyrea 3299062544
    # LLupdates 4423137133
    # ll_extra 739117766100189184
    # lovelive_staff 347849994
    # lovelive_sif 1346933186
    # ischyrb 357915189
    listener = TweetListener(id)
    poster = tweepy.Stream(auth=auth, listener=listener)
    # ``async`` is a reserved keyword since Python 3.7; tweepy >= 3.7 takes
    # the flag as ``is_async``.
    poster.filter(follow=id, track=['#??????????????'], is_async=True)
def restart_stream(id):
    """Replace the module-level ``listener`` and ``poster`` with fresh ones.

    A new ``TweetListener`` and stream are created and the stream is started
    asynchronously, following ``id`` and tracking one hashtag.

    :param id: value passed to ``TweetListener`` and to ``filter(follow=...)``
    """
    global listener
    global poster
    listener = TweetListener(id)
    poster = tweepy.Stream(auth=auth, listener=listener)
    # ``async`` is a reserved keyword since Python 3.7; tweepy >= 3.7 takes
    # the flag as ``is_async``.
    poster.filter(follow=id, track=['#??????????????'], is_async=True)
def on_status(self, status):
    """Queue statuses that are replies to ``myid``; ignore everything else."""
    if status.in_reply_to_user_id != myid:
        return
    log.debug("[ Stream ] ???????")
    self.queue.put(status)
def run(self):
    """Run the user stream forever, restarting it after any failure.

    A ``Listener`` fed by ``self.queue`` is attached to the stream.  When
    ``userstream()`` raises, the account ``debug_id`` is notified by direct
    message, the exception is logged, and after 60 seconds a new stream is
    created and a second notification is sent.
    """
    def notify(text):
        # Best effort: the stream usually goes down together with the
        # network, so the DM itself may fail.  That must not escape the
        # except block below and end the restart loop.
        try:
            api.send_direct_message(screen_name=debug_id, text=text)
        except Exception as dm_error:
            log.exception(dm_error)

    l = Listener(self.queue)
    stream = tweepy.Stream(auth, l)
    while True:
        try:
            stream.userstream()
        except Exception as e:
            notify("Stream down. And now restarting. Wait 60s...")
            log.exception(e)
            time.sleep(60)
            stream = tweepy.Stream(auth, l)
            notify("Start streaming.")
def tweetassembler(**args):
    """Answer a reply by sending the matching info/news text as a direct message.

    Expects the keyword ``in_reply_to_status``.  Nothing happens when it is
    ``None``, when its text does not match the trigger pattern, when the
    status it replies to has no hashtag, or when that hashtag contains
    neither a ``lec<digits>`` nor a ``news<digits>`` marker.

    :raises KeyError: if ``in_reply_to_status`` is not supplied.
    """
    in_reply_to_status = args['in_reply_to_status']
    if in_reply_to_status is None:
        return

    # NOTE(review): the '??' in this pattern looks like non-ASCII text lost
    # in a transcoding step -- restore the intended trigger word.
    regex = u'.*??.*'
    if not re.match(regex, in_reply_to_status.text, re.U):
        return

    # Fetch the status that the incoming tweet replied to
    parent_id = in_reply_to_status.in_reply_to_status_id
    qkou_status = api.get_status(parent_id)
    entities = qkou_status.entities['hashtags']
    if not entities:
        return

    # The first hashtag carries the record number
    hashtag = entities[0]['text']
    info_num = re.search("(?<=lec)[0-9]*", hashtag)
    news_num = re.search("(?<=news)[0-9]*", hashtag)
    if info_num is not None:
        qkou_id = info_num.group()
        log.debug("[ Stream ] Info??????")
        dm_text = get_info(qkou_id)
    elif news_num is not None:
        news_id = news_num.group()
        log.debug("[ Stream ] News??????")
        dm_text = get_news(news_id)
    else:
        # Neither marker present: there is no text to send.  Previously
        # ``dm_text`` stayed unbound and the resulting NameError was only
        # hidden by the catch-all handler below.
        return

    try:
        api.send_direct_message(
            user_id=in_reply_to_status.user.id, text=dm_text)
        log.debug('[ Stream ] DM???')
    except Exception as e:
        log.exception(e)
def main():
    """Bot

    Loads credentials from ``tokens.json``, attaches a ``PyTweListener`` to
    a stream and asks on the console whether to follow the home timeline
    (answer ``yes``) or to filter the public stream by a fixed phrase list.
    """
    # The context manager closes the file even if the JSON is malformed.
    with open("tokens.json", 'r') as json_config:
        tokens = json.load(json_config)

    consumer_key = tokens["consumer_key"]
    consumer_secret = tokens["consumer_secret"]
    access_token = tokens["access_token"]
    access_token_secret = tokens["access_token_secret"]

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)
    listener = PyTweListener(api)
    stream = tweepy.Stream(api.auth, listener)

    # ``raw_input`` only exists on Python 2; Python 3 renamed it ``input``.
    try:
        ask = raw_input
    except NameError:
        ask = input

    timeline = ask("You want to see your home timeline? ")
    if timeline == "yes":
        stream.userstream()
    else:
        # Filter tweets by a fixed set of phrases
        filtr = ["ultra kek 0 name", "pytwe_bot", "i like botijos", "when te pasa", "y naci ciego", "nosvemo", "@pytwe_bot search", "hora botijo"]
        stream.filter(track=filtr)
def open_stream(self, users):
    """
    Track ``users`` on the filtered stream with a ``TwitterMentionsListener``.

    Nothing is returned.  Any exception is handed to
    ``ExceptionHandler.handle_exception(ex, False)``.
    """
    try:
        mentions_listener = TwitterMentionsListener(self.api)
        mentions_stream = tweepy.Stream(self.__auth, mentions_listener)
        mentions_stream.filter(track=users)
    except Exception as ex:
        ExceptionHandler.handle_exception(ex, False)
def _get_stream(writer, config, limit=0):
    """Build, without starting it, a ``Stream`` that forwards to ``writer``.

    Authentication comes from ``get_oauth_api(config)``; ``limit`` is handed
    to the ``PassThroughStreamListener``.
    """
    oauth_api = get_oauth_api(config)
    passthrough = PassThroughStreamListener(writer, limit=limit)
    stream = Stream(auth=oauth_api.auth, listener=passthrough)
    return stream
def crawl_tweets(filename, track=None, follow=None, locations=None, auth=None, api_keys=None, time_limit=None, log_level=None):
    """Stream tweets into ``filename`` and return the imported tweet count.

    Tweets are written gzip-compressed to ``filename + '.part'``.  When the
    stream ends, the part file is processed by an ``Importer`` and renamed
    to ``filename``.

    :param track, follow, locations: passed through to ``Stream.filter``
    :param auth: ready-made auth handler; if falsy one is built from
        ``api_keys`` (keys ``consumer_key``, ``consumer_secret``,
        ``access_token_key``, ``access_token_secret``)
    :param time_limit: passed to the ``_Listener``
    :param log_level: not used by this function
    :return: whatever the ``Importer`` call returns for the part file
    """
    part_path = filename + '.part'

    with gzip.open(part_path, 'wt') as out:
        sink = _Listener(out, time_limit)
        if not auth:
            auth = OAuthHandler(api_keys['consumer_key'], api_keys['consumer_secret'])
            auth.set_access_token(api_keys['access_token_key'], api_keys['access_token_secret'])
        Stream(auth, sink).filter(track=track, follow=follow, locations=locations)

    # NOTE(review): the import is assumed to run after the gzip file has
    # been closed -- confirm against the original indentation.
    tweet_count = Importer()(part_path)
    os.rename(part_path, filename)
    return tweet_count
def streamer_entrypoint():
    """Follow every configured account on a stream.

    The ids to follow are resolved with ``twitter.get_id`` from the first
    element of each entry returned by ``configuration.get_accounts()`` and
    passed to ``filter`` as strings.
    """
    twitter_stream = tweepy.Stream(twitter.auth, AccountListener())
    follow_ids = [str(twitter.get_id(k[0])) for k in configuration.get_accounts()]
    # The former ``async=False`` argument is dropped: ``async`` is a reserved
    # keyword since Python 3.7 (SyntaxError) and False is the default anyway.
    twitter_stream.filter(follow=follow_ids)
def _setup_stream(self):
    """Start an asynchronous stream tracking ``self.tags``.

    :raises RuntimeError: if ``self.api`` is not set (client not
        authenticated).
    """
    logger.info("Starting twitter steam")
    if not self.api:
        raise RuntimeError("Client not authenticated!")

    s_listener = HashtagStreamListener(self)
    stream = tweepy.Stream(auth=self.api.auth, listener=s_listener)
    # ``async`` is a reserved keyword since Python 3.7; tweepy >= 3.7 takes
    # the flag as ``is_async``.
    stream.filter(track=self.tags, is_async=True)
def start_streaming(self, callback):
    """Stream the tweets of ``TRUMP_USER_ID`` and pass them to ``callback``.

    Raises ``Exception`` when ``filter`` has returned and the listener
    reports an error status.
    """
    self.twitter_listener = TwitterListener(
        callback=callback, logs_to_cloud=self.logs_to_cloud)
    stream = Stream(self.twitter_auth, self.twitter_listener)
    self.logs.debug("Starting stream.")
    stream.filter(follow=[TRUMP_USER_ID])

    # Reaching this point means streaming stopped; report an API error if
    # the listener (which may have been cleared meanwhile) recorded one.
    if not self.twitter_listener:
        return
    if not self.twitter_listener.get_error_status():
        return
    raise Exception("Twitter API error: %s" %
                    self.twitter_listener.get_error_status())
def __init__(self):
    """Load users and sources, authenticate and prepare the global stream.

    Sets ``self.users``, ``self.sources``, ``self.sources_count``,
    ``self.sources_screen_names`` and ``self.api``, and publishes the
    module globals ``twitterStream``, ``twitter_user_ids``,
    ``twitter_screen_names`` and ``update_stream``.

    Credentials can be supplied through the environment variables
    ``TWITTER_CONSUMER_KEY``, ``TWITTER_CONSUMER_SECRET``,
    ``TWITTER_ACCESS_TOKEN`` and ``TWITTER_ACCESS_SECRET``; the literals
    below are used only as a fallback.
    """
    global twitterStream
    global twitter_user_ids  # keep this updated
    global twitter_screen_names
    global update_stream

    import os  # local import: only the credential lookup needs it

    loggit('TwitterSphereConfig...')

    # SECURITY NOTE(review): API credentials are committed in source.
    # Anything checked in must be treated as compromised -- rotate these
    # keys and remove the fallbacks once the environment variables are set
    # everywhere this runs.
    # consumer key, consumer secret, access token, access secret.
    ckey = os.environ.get("TWITTER_CONSUMER_KEY", "QpBbAaG5j5LQwtISpXHSnMaaZ")
    csecret = os.environ.get("TWITTER_CONSUMER_SECRET", "siWbAxNBq9KojlwcoTnmZqo6yrUNsWAvVhZZ9DOY9nP2wRIUiw")
    atoken = os.environ.get("TWITTER_ACCESS_TOKEN", "2728509485-FJjALLzmjoF4uBWMCWRz7gG2suUGSK9qYQiYI2a")
    asecret = os.environ.get("TWITTER_ACCESS_SECRET", "eJe1olMUVhK5gxz2RNZYNd4RT5H0zrJmINg2ot8w07Jaw")
    api_url = 'http://192.169.141.201/iflychatbot/api/'

    self.users = User.query.all()
    self.sources = Source.query.all()
    self.sources_count = len(self.sources)
    self.sources_screen_names = [s.name.lower() for s in self.sources]

    # Provide Twitter Dev tokens
    auth = OAuthHandler(ckey, csecret)
    auth.set_access_token(atoken, asecret)

    # Use API to lookup id's for screen_names
    self.api = API(auth)

    twitterStream = Stream(auth, TwitterSphere())

    # Only sources with a known Twitter id take part in streaming.
    twitter_user_ids = [s.id_str for s in self.sources if s.id_str is not None]
    twitter_screen_names = [s.name.lower() for s in self.sources if s.id_str is not None]

    update_stream = True  # to trigger initial streaming, set false after stream starts