python类Stream()的实例源码

twitter_replies.py 文件源码 项目:Seq2Seq-chatbot 作者: wataruhashimoto52 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def on_error(self, status_code):
    """Report a streaming error on stderr.

    :param status_code: value handed to this callback; it is printed as-is.
    :return: always ``False``.
        NOTE(review): tweepy listeners conventionally return ``False`` to
        stop the stream -- confirm for the tweepy version in use.
    """
    message = 'Stream error with status code:'
    print(message, status_code, file=sys.stderr)
    return False
responsebot_stream.py 文件源码 项目:ResponseBot 作者: invinst 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def start(self, retry_limit=None):
    """
    Connect to Twitter's streaming API, reconnecting in a loop.

    When the ``user_stream`` config entry is falsy the public filter stream
    is used (follow + track filters); otherwise the user stream is used with
    the track filter only.

    :param retry_limit: The maximum number of retries in case of failures. Default is None (unlimited)
    :raises :class:`~tweepy.error.TweepyError`: If there's some critical API error
    :raises AttributeError: any AttributeError other than the known Tweepy
        ``'NoneType' object has no attribute 'strip'`` one is re-raised
    """
    # Run tweepy stream
    wrapper_listener = TweepyWrapperListener(listener=self.listener)
    stream = tweepy.Stream(auth=self.client.tweepy_api.auth, listener=wrapper_listener)

    # Known Tweepy's issue https://github.com/tweepy/tweepy/issues/576
    known_tweepy_bug = "'NoneType' object has no attribute 'strip'"

    attempts = 0
    while True:
        # Stop once a finite retry budget has been used up.
        if retry_limit is not None and attempts > retry_limit:
            break
        try:
            attempts += 1
            if self.client.config.get('user_stream'):
                if self.filter.follow:
                    logging.warning('Follow filters won\'t be used in user stream')

                logging.info('Listening to user stream')
                stream.userstream(track=self.filter.track)
            else:
                logging.info('Listening to public stream')
                stream.filter(follow=self.filter.follow, track=self.filter.track)
        except AttributeError as e:
            # Only the known Tweepy failure is tolerated; it leads to a retry.
            if known_tweepy_bug not in str(e):
                raise
twitter.py 文件源码 项目:relocaliser 作者: very-scary-scenario 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run_game():
    """Resume the newest saved game, or start a new one, then open the user stream.

    Saved games are the non-hidden entries of ``GAMES_DIR``; the one that
    sorts last is loaded through ``camel``. A new game is started when there
    is no saved game or when the loaded one reports ``over``.
    """
    saved_games = [
        name for name in os.listdir(GAMES_DIR) if not name.startswith('.')
    ]
    saved_games.sort()

    if saved_games:
        latest_path = os.path.join(GAMES_DIR, saved_games[-1])
        with open(latest_path) as game_file:
            listener = camel.load(game_file.read())
        needs_new_game = listener.over
    else:
        needs_new_game = True

    if needs_new_game:
        listener = start_new_game()

    tweepy.Stream(auth=api.auth, listener=listener).userstream()
streaming.py 文件源码 项目:twitter-sentiment 作者: words-sdsc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, socketio):
    """Create ``self.stream``, an authenticated tweepy stream.

    Credentials are read through ``CredentialsManager`` from
    ``settings.CREDENTIALS_PATH``; the stream's listener is a
    ``StreamListener`` constructed with *socketio*.

    :param socketio: object passed straight to ``StreamListener``.
    """
    cred = CredentialsManager(settings.CREDENTIALS_PATH).read()

    consumer_key = cred['twitter_api_key']
    consumer_secret = cred['twitter_api_secret']
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)

    token = cred['access_token']
    token_secret = cred['access_token_secret']
    auth.set_access_token(token, token_secret)

    listener = StreamListener(socketio)
    self.stream = tweepy.Stream(auth, listener)
twitter_channel.py 文件源码 项目:pinkpython-bot 作者: mtnalonso 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def init_listener(self):
    """Create the listener and stream, then start filtering in the background.

    Sets ``self.listener`` (a ``TwitterListener`` built around
    ``self.inbox_queue``), ``self.stream`` and finally ``self.active``.
    The stream tracks ``self.username``.
    """
    self.listener = TwitterListener(self.inbox_queue)
    self.stream = tweepy.Stream(auth=self.auth, listener=self.listener)
    # `async` is a reserved keyword since Python 3.7, so `async=True` no
    # longer parses; tweepy >= 3.7 exposes the same switch as `is_async`.
    self.stream.filter(track=[self.username], is_async=True)
    self.active = True
Tweety.py 文件源码 项目:Twitter 作者: LucasRodriguez 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def main():
    """Authenticate with Twitter and stream English tweets tracking "Valentine".

    Keys and tokens are read from the ``k`` module; tweets are delivered to
    a ``StdOutListener``.
    """
    # Handle Twitter authentication and the connection to the Streaming API.
    listener = StdOutListener()
    auth = OAuthHandler(k.consumer_key, k.consumer_secret)
    auth.set_access_token(k.access_token, k.access_secret)

    # Filter the Twitter stream by language and keyword.
    keywords = ["Valentine"]
    Stream(auth, listener).filter(languages=["en"], track=keywords)
twitter-bot.py 文件源码 项目:meetup-utilities 作者: igauravsehrawat 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def twitter_stream():
    """Authenticate through ``bc.provide_key`` and open the user stream.

    Events are delivered to a ``MyStreamListener`` instance.
    """
    consumer_key = bc.provide_key("consumer_key")
    consumer_secret = bc.provide_key("consumer_secret")
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)

    access_token = bc.provide_key("access_token")
    access_token_secret = bc.provide_key("access_token_secret")
    auth.set_access_token(access_token, access_token_secret)

    api = tweepy.API(auth)

    listener = MyStreamListener()
    stream = tweepy.Stream(auth=api.auth, listener=listener)
    # Alternative kept from the original: stream.filter(track=["python"])
    stream.userstream()
mineTweets.py 文件源码 项目:Twitter-Analysis 作者: AparaV 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def trackLiveTweets(self):
    """Ask for a keyword on stdin and stream matching tweets.

    Stores the output file name in ``self.file``, the keyword in
    ``self.trackWord`` and the stream in ``self.twitter_stream``.

    NOTE(review): the prompt mentions 5 minutes, but no time limit is
    applied in this method. ``raw_input`` is the Python 2 builtin.

    :return: the file name handed to ``Listener`` (``'tweets.json'``).
    """
    prompt = "Enter a key word to track for 5 minutes. Be as specific as possible"
    print (prompt)

    output_file = 'tweets.json'
    self.file = output_file

    keyword = str(raw_input())
    self.trackWord = keyword

    stream = Stream(self.auth, Listener(output_file))
    self.twitter_stream = stream
    stream.filter(track=[keyword])
    return output_file

    # Getting tweets from user profile for analysis
tweetminer.py 文件源码 项目:Twitter-Analysis 作者: AparaV 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_live_tweets(auth, phrase, fname='./tmp/liveStream.json', runTime=60):
    """Stream tweets tracking *phrase* for *runTime* seconds, then disconnect.

    :param auth: authentication object passed to ``Stream``.
    :param phrase: single keyword/phrase to track.
    :param fname: path handed to ``Listener``.
    :param runTime: number of seconds to keep the stream open.
    """
    twitter_stream = Stream(auth, Listener(fname))
    # `async` is a reserved keyword since Python 3.7; tweepy >= 3.7 names
    # the background-thread switch `is_async`.
    twitter_stream.filter(track=[phrase], is_async=True)
    try:
        time.sleep(runTime)
    finally:
        # Disconnect even if the wait is interrupted (e.g. Ctrl-C), so the
        # background stream does not keep running.
        twitter_stream.disconnect()
stream.py 文件源码 项目:es-presentation-examples 作者: hkulekci 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def main(argv):
    """Stream tweets that match a keyword given on the command line.

    :param argv: argument list; ``argv[0]`` is the program name and
        ``argv[1]`` the keyword to track.
    :return: ``1`` when the keyword is missing, otherwise ``None`` once the
        stream call returns.
    """
    print(argv)
    if len(argv) < 2:
        # Trailing newline so the shell prompt does not run into the message.
        sys.stderr.write("Usage: %s <searchKeyword>\n" % (argv[0],))
        return 1

    listener = StdOutListener(config.es_configuration)
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_token_secret)
    stream = Stream(auth, listener)
    # Parenthesised single-argument print parses on both Python 2 and 3;
    # the bare print statement is a SyntaxError on Python 3.
    print("Stream started\n")
    stream.filter(track=[argv[1]])
    print("Stream ended!\n")
twitter.py 文件源码 项目:DET 作者: sensepost 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def listen():
    """Start the Twitter client and listen on the user stream.

    Any exception raised while setting up or running the stream is logged
    as a warning through ``app_exfiltrate.log_message`` instead of
    propagating.
    """
    start_twitter()
    try:
        app_exfiltrate.log_message('info', "[twitter] Listening for DMs...")
        stream = Stream(auth, StdOutListener())
        stream.userstream()
    except Exception as e:  # `except Exception, e` is Python 2-only syntax
        # The original message had no placeholder, so the error text passed
        # to .format() was silently discarded.
        app_exfiltrate.log_message(
            'warning', "[twitter] Couldn't listen for Twitter DMs: {0}".format(e))
index_twitter_stream.py 文件源码 项目:discursive 作者: Data4Democracy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def search():
    """Stream tweets that track the module-level ``TOPICS``.

    Uses ``api.auth`` for authentication and a fresh ``StreamListener``.
    Returns ``None``.
    """
    listener = StreamListener()
    tweepy.Stream(auth=api.auth, listener=listener).filter(track=TOPICS)
tweet_extract.py 文件源码 项目:twelisis 作者: yagamiash 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run_app(self,tag_name,file_name):
    """Read credentials from a file and stream tweets tracking *tag_name*.

    :param tag_name: topic to track; also stored on ``self.tag_name``.
    :param file_name: path of the credentials file; also stored on
        ``self.file_name``. The file must hold exactly four lines, in this
        order: access token, access token secret, consumer key, consumer
        secret.
    :raises ValueError: if the file does not contain exactly four lines.
    """
    self.tag_name = tag_name
    self.file_name = file_name
    # Credentials live in a separate file (comparatively safer than inline).
    # `with` closes the file even when the unpacking below fails.
    with open(self.file_name, "r") as f:
        lines = f.read().splitlines()
    access_token, access_token_secret, consumer_key, consumer_secret = lines
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    # NOTE(review): `language` / `filter_level` are passed to the Stream
    # constructor here; in tweepy 3.x these look like filter() options --
    # confirm they take effect where they are.
    stream = Stream(auth, listener, language='en', filter_level="medium")
    stream.filter(track=[tag_name])  # filter tweets by topic
tweet_extract.py 文件源码 项目:twelisis 作者: yagamiash 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run_app(self,tag_name,file_name):
    """Read credentials from a file and stream tweets tracking *tag_name*.

    :param tag_name: topic to track; also stored on ``self.tag_name``.
    :param file_name: path of the credentials file; also stored on
        ``self.file_name``. The file must hold exactly four lines, in this
        order: access token, access token secret, consumer key, consumer
        secret.
    :raises ValueError: if the file does not contain exactly four lines.
    """
    self.tag_name = tag_name
    self.file_name = file_name
    # Credentials live in a separate file (comparatively safer than inline).
    # `with` closes the file even when the unpacking below fails.
    with open(self.file_name, "r") as f:
        lines = f.read().splitlines()
    access_token, access_token_secret, consumer_key, consumer_secret = lines
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    # NOTE(review): `language` / `filter_level` are passed to the Stream
    # constructor here; in tweepy 3.x these look like filter() options --
    # confirm they take effect where they are.
    stream = Stream(auth, listener, language='en', filter_level="medium")
    stream.filter(track=[tag_name])  # filter tweets by topic
trader_bot.py 文件源码 项目:gnosis-twitter-bot 作者: ConsenSys 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def start_streaming(self):
    """Start listening to the streaming API in the background.

    Creates ``self._stream`` from ``self._auth.get_authentication()`` with
    ``self._instance`` as listener and opens the user stream with replies
    enabled. Failures are logged with a traceback and not re-raised.
    """
    try:
        self._logger.info('Starting streaming...')
        self._stream = tweepy.Stream(auth=self._auth.get_authentication(), listener=self._instance)
        # `async` is a reserved keyword since Python 3.7; tweepy >= 3.7
        # names the background-thread switch `is_async`.
        self._stream.userstream(replies=True, is_async=True)
        self._logger.info('Streaming started')
    except Exception:
        # A bare `except:` would also swallow KeyboardInterrupt/SystemExit.
        self._logger.error('An error occurred:', exc_info=True)
cli.py 文件源码 项目:goal 作者: victorskl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
    """Set up authentication, the tweet stream and the SIGINT handler.

    Credentials come from the ``config`` module. ``self.stream`` delivers
    to a ``TweetStreamingListener``; SIGINT is routed to
    ``self.exit_signal_handler``.
    """
    oauth = tweepy.OAuthHandler(config.consumer_key, config.consumer_secret)
    self.auth = oauth
    oauth.set_access_token(config.access_token, config.access_token_secret)

    listener = TweetStreamingListener()
    self.stream = Stream(oauth, listener)

    signal.signal(signal.SIGINT, self.exit_signal_handler)
stream.py 文件源码 项目:Twitter-Hashtag-Tracking 作者: xuwenyihust 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def send_data(c_socket, keyword):
    """Stream English tweets tracking *keyword* to a listener built on *c_socket*.

    :param c_socket: object passed to the ``listener`` constructor.
    :param keyword: single term to track.
    """
    auth = OAuthHandler(ckey, csecret)
    auth.set_access_token(atoken, asecret)

    tweet_listener = listener(c_socket)
    Stream(auth, tweet_listener).filter(track=[keyword], languages=['en'])
geometrize_bot.py 文件源码 项目:geometrize-twitter-bot 作者: Tw1ddle 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self,
             tweepy_auth,
             tweepy_api,
             on_connect=None,
             on_timeout=None,
             on_error=None,
             on_status=None,
             on_setup_filter=None):
    """Create the bot: store auth/API, wire listener callbacks, build the stream.

    :param tweepy_auth: stored as ``self.auth`` and used for the stream.
    :param tweepy_api: stored as ``self.api`` and passed to the listener.
    :param on_connect: stored on the listener as ``on_connect_cb``.
    :param on_timeout: stored on the listener as ``on_timeout_cb``.
    :param on_error: stored on the listener as ``on_error_cb``.
    :param on_status: stored on the listener as ``on_status_cb``.
    :param on_setup_filter: if not ``None``, called with the new stream so
        the caller can start filtering.
    """
    print("Will create Geometrize bot")

    self.api = tweepy_api
    self.auth = tweepy_auth

    # Set up a stream listener and attach the optional callbacks.
    listener = geometrize_stream_listener.GeometrizeStreamListener(self.api)
    self.stream_listener = listener
    listener.on_connect_cb = on_connect
    listener.on_timeout_cb = on_timeout
    listener.on_error_cb = on_error
    listener.on_status_cb = on_status

    self.stream = tweepy.Stream(self.auth, listener)

    # Let the caller start listening for filtered tweets.
    if on_setup_filter is not None:
        on_setup_filter(self.stream)

    print("Did create Geometrize bot")
crawl.py 文件源码 项目:twitter-trends-summarizer 作者: yuva29 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def query_through_stream(topic):
    """Open a stream that tracks *topic*.

    Relies on the names ``auth`` and ``l`` (the listener) being defined
    outside this function.

    :param topic: single term to track.
    """
    Stream(auth, l).filter(track=[topic])
TwitterApiScrap.py 文件源码 项目:crawler-twitter 作者: sidgleyandrade 项目源码 文件源码 阅读 54 收藏 0 点赞 0 评论 0
def init(self):
    """Open a filtered tweepy stream and block until it stops running.

    The stream tracks ``self.search_word`` when that is set, otherwise it
    filters by the ``self.geo`` locations. ``self.running`` mirrors the
    stream state: it is set right after start-up and reset to ``False``
    once the stream is found stopped. Any exception is logged and
    swallowed.
    """
    auth = tweepy.OAuthHandler(self.consumer_key, self.consumer_secret)
    auth.set_access_token(self.access_token, self.access_token_secret)

    try:
        listener = MyStreamListener(
            crud=self.CRUD,
            conn_sec=self.conn_sec,
            conn_schema=self.conn_schema,
            conn_table=self.conn_table)
        my_stream = tweepy.Stream(auth=auth, listener=listener)

        # Choose the kind of stream - either bounding box or word track.
        # `async` is a reserved keyword since Python 3.7; tweepy >= 3.7
        # names the background-thread switch `is_async`.
        if self.search_word:
            my_stream.filter(track=[self.search_word], is_async=True)
        else:
            my_stream.filter(locations=self.geo, is_async=True)

        # Poll the connection every 60 seconds and fall through once it is
        # no longer active. The original loop only slept right before
        # breaking, so it spun at full CPU while the stream was healthy.
        # NOTE(review): the original comment says init() is re-invoked to
        # restart the stream -- that caller is not visible here.
        self.running = my_stream.running
        while my_stream.running:
            time.sleep(60)
        self.running = False
    except Exception as e:
        logging.error(e)


问题


面经


文章

微信
公众号

扫码关注公众号