python类TweepError()的实例源码

views.py 文件源码 项目:tweet-analysis 作者: D4D3VD4V3 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def twittercallback():
    """Finish the Twitter OAuth handshake and keep the access token in the session.

    Reads the ``oauth_verifier`` query argument, restores the request token
    previously stored in the session, and exchanges both for an access token.
    If the request token is missing from the session, or the exchange raises
    ``tweepy.TweepError``, a message is flashed and the user is redirected to
    ``bp.home``. On success ``twittercallback.html`` is rendered with a fresh
    ``HashtagForm``.
    """
    oauth_verifier = request.args["oauth_verifier"]
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)

    # The request token was stored in the session when sign-in started.
    try:
        saved_request_token = session["request_token"]
    except KeyError:
        flash("Please login again", "danger")
        return redirect(url_for("bp.home"))
    handler.request_token = saved_request_token

    try:
        handler.get_access_token(oauth_verifier)
    except tweepy.TweepError:
        flash("Failed to get access token", "danger")
        return redirect(url_for("bp.home"))

    session["access_token"] = handler.access_token
    session["access_token_secret"] = handler.access_token_secret

    return render_template("twittercallback.html", form=HashtagForm())
twitter.py 文件源码 项目:picdescbot 作者: elad661 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send(self, picture):
    """Send a tweet. `picture` is a `Result` object from `picdescbot.common`.

    The picture is downloaded once and posted together with its caption.
    A failed post is logged and retried after a 5 second pause; the third
    consecutive ``tweepy.TweepError`` is re-raised. The downloaded data is
    always closed, whether or not posting succeeded.

    Returns the id of the posted status.
    """
    max_attempts = 3
    failures = 0
    posted = None
    media_name = picture.url.split('/')[-1]
    payload = picture.download_picture()
    try:
        while failures < max_attempts and not posted:
            if failures > 0:
                self.log.info('retrying...')
                # Rewind so the retry uploads the picture from the start.
                payload.seek(0)
            try:
                posted = self.api.update_with_media(filename=media_name,
                                                    status=picture.caption,
                                                    file=payload)
            except tweepy.TweepError as e:
                self.log.error("Error when sending tweet: %s" % e)
                failures += 1
                if failures >= max_attempts:
                    raise
                time.sleep(5)
    finally:
        payload.close(really=True)
    return posted.id
helpers.py 文件源码 项目:python-twitter-toolbox 作者: hhromic 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def log_tweep_error(logger, tweep_error):
    """Log a TweepError exception.

    When the error carries an ``api_code`` that this function knows, a fixed
    human-readable message is logged. Any other non-empty code is logged as a
    generic REST API error, and an error without a code as a generic Twitter
    error; both generic forms include the exception itself.
    """
    known_messages = {
        32: "invalid API authentication tokens",
        34: "requested object (user, Tweet, etc) not found",
        64: "your account is suspended and is not permitted",
        130: "Twitter is currently in over capacity",
        131: "internal Twitter error occurred",
        135: "could not authenticate your API tokens",
        136: "you have been blocked to perform this action",
        179: "you are not authorized to see this Tweet",
    }
    code = tweep_error.api_code
    if not code:
        logger.error("error with Twitter: %s", tweep_error)
        return
    message = known_messages.get(code)
    if message is None:
        logger.error("error while using the REST API: %s", tweep_error)
    else:
        logger.error(message)
users.py 文件源码 项目:python-twitter-toolbox 作者: hhromic 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_hydrated(writer, user_ids=None, screen_names=None):
    """Get hydrated Twitter User-objects from a list of user ids and/or screen names.

    ``ensure_at_least_one`` validates that ids and/or screen names were given.
    Lookups are issued one chunk at a time; a chunk that raises ``TweepError``
    is logged through ``log_tweep_error`` and the remaining chunks are still
    processed. The total reported by ``write_objs`` is logged at the end.
    """
    LOGGER.info("get_hydrated() starting")
    ensure_at_least_one(user_ids=user_ids, screen_names=screen_names)
    ids = user_ids or []
    names = screen_names or []

    # initialize config and Twitter API
    # OAuth gives more capacity for the users/lookup API
    api = get_oauth_api(read_config())

    # process user ids and/or screen names, storing returned users in JSON format
    downloaded = 0
    for chunk in gen_chunks(ids, names, size=LOOKUP_USERS_PER_REQUEST):
        try:
            lookup_args = {"user_ids": chunk[0], "screen_names": chunk[1]}
            downloaded += write_objs(writer, api.lookup_users, lookup_args)
        except TweepError as err:
            log_tweep_error(LOGGER, err)
    LOGGER.info("downloaded %d user(s)", downloaded)

    # finished
    LOGGER.info("get_hydrated() finished")
users.py 文件源码 项目:python-twitter-toolbox 作者: hhromic 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_followers(writer, user_id=None, screen_name=None):
    """Get the ids of the followers for a Twitter user id or screen name.

    ``ensure_only_one`` validates the ``user_id`` / ``screen_name`` pair.
    The download limit is read from the ``followers.limit`` config option.
    A ``TweepError`` raised while downloading is logged, not propagated.
    """
    LOGGER.info("get_followers() starting")
    ensure_only_one(user_id=user_id, screen_name=screen_name)

    # initialize config and Twitter API
    config = read_config()
    api = get_app_auth_api(config)

    # process user id or screen name, storing returned ids in plain text
    request_args = {"count": FOLLOWERS_IDS_COUNT}
    for key, value in (("user_id", user_id), ("screen_name", screen_name)):
        if value is not None:
            request_args[key] = value
    max_ids = config.getint("followers", "limit")
    try:
        downloaded = write_ids(writer, api.followers_ids, request_args,
                               cursored=True, limit=max_ids)
    except TweepError as err:
        log_tweep_error(LOGGER, err)
    else:
        LOGGER.info("downloaded %d follower id(s)", downloaded)

    # finished
    LOGGER.info("get_followers() finished")
users.py 文件源码 项目:python-twitter-toolbox 作者: hhromic 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_friends(writer, user_id=None, screen_name=None):
    """Get the ids of the friends for a Twitter user id or screen name.

    ``ensure_only_one`` validates the ``user_id`` / ``screen_name`` pair.
    The download limit is read from the ``friends.limit`` config option.
    A ``TweepError`` raised while downloading is logged, not propagated.
    """
    LOGGER.info("get_friends() starting")
    ensure_only_one(user_id=user_id, screen_name=screen_name)

    # initialize config and Twitter API
    config = read_config()
    api = get_app_auth_api(config)

    # process user id or screen name, storing returned ids in plain text
    request_args = {"count": FRIENDS_IDS_COUNT}
    for key, value in (("user_id", user_id), ("screen_name", screen_name)):
        if value is not None:
            request_args[key] = value
    max_ids = config.getint("friends", "limit")
    try:
        downloaded = write_ids(writer, api.friends_ids, request_args,
                               cursored=True, limit=max_ids)
    except TweepError as err:
        log_tweep_error(LOGGER, err)
    else:
        LOGGER.info("downloaded %d friend id(s)", downloaded)

    # finished
    LOGGER.info("get_friends() finished")
tweets.py 文件源码 项目:python-twitter-toolbox 作者: hhromic 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_hydrated(writer, tweet_ids):
    """Get hydrated Tweet-objects from a list of Tweet ids.

    Tweet ids are looked up one chunk at a time. A chunk that raises
    ``TweepError`` is logged through ``log_tweep_error`` and the remaining
    chunks are still processed. The number of Tweets written across all
    chunks is logged at the end.
    """
    LOGGER.info("get_hydrated() starting")

    # initialize config and Twitter API
    config = read_config()
    api = get_oauth_api(config)  # OAuth gives more capacity for the statuses/lookup API

    # process Tweet ids, storing returned Tweets in JSON format
    num_tweets = 0
    for chunk in gen_chunks(tweet_ids, size=LOOKUP_STATUSES_PER_REQUEST):
        try:
            # Accumulate per-chunk counts; plain assignment would report only
            # the last chunk's count in the summary log line below.
            num_tweets += write_objs(writer, api.statuses_lookup, {"id_": chunk[0]})
        except TweepError as err:
            log_tweep_error(LOGGER, err)
    LOGGER.info("downloaded %d Tweet(s)", num_tweets)

    # finished
    LOGGER.info("get_hydrated() finished")
tweets.py 文件源码 项目:python-twitter-toolbox 作者: hhromic 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_retweets(writer, tweet_id):
    """Get hydrated Retweet-objects for a given Tweet id.

    At most ``RETWEETS_COUNT`` Retweets are requested. A ``TweepError``
    raised while downloading is logged, not propagated.
    """
    LOGGER.info("get_retweets() starting")

    # initialize config and Twitter API
    api = get_app_auth_api(read_config())

    # process Tweet id, storing returned Retweets in JSON format
    request_args = {"id": tweet_id, "count": RETWEETS_COUNT}
    try:
        downloaded = write_objs(writer, api.retweets, request_args)
    except TweepError as err:
        log_tweep_error(LOGGER, err)
    else:
        LOGGER.info("downloaded %d Retweet(s)", downloaded)

    # finished
    LOGGER.info("get_retweets() finished")
Trending_Bot.py 文件源码 项目:Trending-Places-in-OpenStreetMap 作者: geometalab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def tweet_status_trends(self):
    """
    Tweets the top trending places.

    The list of places is sized to fit what is left of TWITTER_STATUS_LIMIT
    after the fixed prefix. When a list is available it is tweeted together
    with the trending graph and ``self.state['last_tweet']`` is set to the
    current time; otherwise ERROR_MSG is tweeted instead. A TweepError is
    handed to ``self._log_tweepy_error`` rather than propagated.
    """
    logging.info("Updating status with Trending places....")

    try:
        prefix = "Top trending places in #OSM " + DATE.strftime('%d/%m') + ': '
        suffix = ''
        day = str(DATE.date())
        budget = TWITTER_STATUS_LIMIT - len(prefix) - len(suffix)
        places = Ft().get_cities_from_file(day, REGION, budget)
        graph = Ft.get_trending_graph(day, REGION)
        if not places:
            self.api.update_status(ERROR_MSG)
            logging.info("Could not update status. Rechecking in a while....")
        else:
            self.api.update_with_media(graph, prefix + places + suffix)
            self.state['last_tweet'] = time.time()

    except tweepy.TweepError as e:
        self._log_tweepy_error('Can\'t update status because', e)
oauth_handler.py 文件源码 项目:twitter_LDA_topic_modeling 作者: kenneth-orton 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def manage_auth_handlers(auths):
    """Return the first API object with more than 180 user-timeline calls left.

    The entries of ``auths`` are polled round-robin, wrapping back to the
    first one after the last. An entry whose rate-limit query raises
    ``tweepy.TweepError`` is skipped. The loop does not end until some entry
    qualifies.
    """
    position = 0
    while True:
        candidate = auths[position]
        try:
            statuses = candidate.rate_limit_status()['resources']['statuses']
            remaining = statuses['/statuses/user_timeline']['remaining']
            if remaining > 180:
                return candidate
        except tweepy.TweepError:
            # This handler cannot be queried right now; try the next one.
            pass
        # Advance, wrapping around after the last handler.
        position = (position + 1) % len(auths)
twitter.py 文件源码 项目:TwitterStockMonitor 作者: semaaJ 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_latest_tweet(self):
    """Check the Twitter handle for a new tweet.

    The newest tweet's text (with non-ASCII characters such as emojis
    dropped) is compared with the text saved in the handle's file. When it
    differs, the file is overwritten, ``self.tweet_id`` and ``self.tweet``
    are updated, and the text is returned so it can be checked for
    companies. Returns None when the tweet is unchanged or when a
    TweepError was raised (the error is logged at debug level).
    """
    try:
        newest = self.api.user_timeline(screen_name=self.handle, count=1)[0]
        # Removes emojis
        text = newest.text.encode('ascii', 'ignore').decode('utf-8')

        with open(f'{LATEST_TWEET}{self.handle}.txt', "r") as f:
            previous = f.read()

        if text == previous:
            return None

        with open(f'{LATEST_TWEET}{self.handle}.txt', 'w') as f:
            f.write(text)

        self.tweet_id = newest.id_str
        self.tweet = text
        return text

    except tweepy.TweepError as error:
        logging.debug(error)
twitter.py 文件源码 项目:TwitterStockMonitor 作者: semaaJ 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def initial_tweet(self, matches):
    """Tweet when a company is mentioned, along with its sentiment.

    One status is posted per entry in ``matches``. The sentiment word
    returned by ``self.sentiment_analysis()`` is turned into an adverb by
    appending "ly". A TweepError on one status is logged at debug level and
    the remaining companies are still processed.
    """
    sentiment = self.sentiment_analysis()

    for comp in matches:
        try:
            self.api.update_status(f'{self.handle} just mentioned {comp.upper()} {sentiment}ly '
                                   f'in their latest tweet! '
                                   f'https://twitter.com/{self.handle}/status/{self.tweet_id}')

        except tweepy.TweepError as error:
            logging.debug(error)
twitter.py 文件源码 项目:TwitterStockMonitor 作者: semaaJ 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def share_output(self):
    """Tweet the share-price change for every company in the company dict.

    The data comes from ``company.get_company_dict()``. One status is posted
    per company; a TweepError on one status is logged at debug level and the
    remaining companies are still processed.
    """
    share_dict = company.get_company_dict()

    for comp in share_dict:
        try:
            info = share_dict[comp]
            message = (
                f'Since {info["handle"]} mentioned {comp.upper()}, {info["day"]} days ago, '
                f'their shares have changed from {info["initialSharePrice"]:.2f} to '
                f"{info['currentSharePrice']:} that's a {info['shareChange']:.3f}% change!"
            )
            self.api.update_status(message)

        except tweepy.TweepError as error:
            logging.debug(error)
tweeapi.py 文件源码 项目:tweetopo 作者: zthxxx 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_friends(self, callback, pages_limit=0):
    """Collect the friend ids of ``self._user`` and pass them to ``callback``.

    Users with more than ``_FRIENDS_COUNT_MAX_`` friends are skipped with a
    warning. The ids from up to ``pages_limit`` cursor pages are gathered
    into one list, which is handed to ``callback`` when it is callable.
    A TweepError raised while paging or inside the callback is logged as a
    warning, not propagated.
    """
    api = self._api
    user = self._user
    if user.friends_count > _FRIENDS_COUNT_MAX_:
        logging.warning('The user [%d]-[%s] has too many [%d] friends!'
                        % (user.id, user.screen_name, user.friends_count))
        return
    pager = tweepy.Cursor(api.friends_ids, user_id=user.id, screen_name=user.screen_name)
    try:
        friend_ids = [fid for page in pager.pages(pages_limit) for fid in page]
        if callable(callback):
            callback(friend_ids)
    except tweepy.TweepError as e:
        logging.warning([user.id, user.screen_name, e])
twittstopher-local.py 文件源码 项目:ewe_ebooks 作者: jaymcgrath 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, twitter_username):
    """Look up ``twitter_username`` and copy its profile fields onto self.

    Raises PermissionError if constructing the tweepy API client raises
    ``tweepy.TweepError``.
    """
    # TODO: Login to twitter for corpus generation using end user's credentials
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    handler.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    # Connect to Twitter - a TweepError here is surfaced as PermissionError
    try:
        client = tweepy.API(handler)
    except tweepy.TweepError:
        # TODO: make sure this error bubbles up and gets handled gracefully
        raise PermissionError("Twitter Auth failed")

    profile = client.get_user(twitter_username)
    self.username = twitter_username
    self.image = profile.profile_image_url
    # Exposes entire api - for debugging only
    # self.api = profile
    self.description = profile.description
    self.screen_name = profile.screen_name
    self.name = profile.name
twittstopher.py 文件源码 项目:ewe_ebooks 作者: jaymcgrath 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, twitter_username):
    """Look up ``twitter_username`` and copy its profile fields onto self.

    Raises PermissionError if constructing the tweepy API client raises
    ``tweepy.TweepError``.
    """
    # TODO: Login to twitter for corpus generation using end user's credentials
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    handler.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    # Connect to Twitter - a TweepError here is surfaced as PermissionError
    try:
        client = tweepy.API(handler)
    except tweepy.TweepError:
        # TODO: make sure this error bubbles up and gets handled gracefully
        raise PermissionError("Twitter Auth failed")

    profile = client.get_user(twitter_username)
    self.username = twitter_username
    self.image = profile.profile_image_url
    # Exposes entire api - for debugging only
    # self.api = profile
    self.description = profile.description
    self.screen_name = profile.screen_name
    self.name = profile.name
posters.py 文件源码 项目:meteosangue 作者: meteosangue 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def tweet_status(status, image_path=None):
    """Post ``status`` (with an image when ``image_path`` is given), then reply with mentions.

    A TweepError while posting the main tweet is re-raised as
    ``MeteoSangueException``. The follow-up reply, which mentions every
    entry of ``settings.BLOOD_ASSOCIATIONS`` that has a ``twitter_id``, is
    best effort: a TweepError there is ignored.
    """
    try:
        handler = tweepy.OAuthHandler(settings.CONSUMER_KEY, settings.CONSUMER_SECRET)
        handler.set_access_token(settings.ACCESS_TOKEN, settings.ACCESS_TOKEN_SECRET)
        api = tweepy.API(handler)
        if not image_path:
            new_tweet = api.update_status(status)
        else:
            new_tweet = api.update_with_media(image_path, status=status)
    except tweepy.TweepError as ex:
        raise MeteoSangueException(ex)

    try:
        handles = ' '.join(
            ass['twitter_id']
            for ass in settings.BLOOD_ASSOCIATIONS
            if 'twitter_id' in ass
        )
        mention = '{0} {1}'.format(handles, 'Nuovo bollettino meteo ?')
        api.update_status(mention, in_reply_to_status_id=new_tweet.id)
    except tweepy.TweepError:
        # Mention is allowed to fail silently
        pass
get_ids.py 文件源码 项目:TwitterBotClassifier 作者: JVogel27 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def check_human_accounts():
    """Rebuild ``humans.txt``, appending the unprotected accounts from ``existing_humans_copy.txt``.

    The current content of ``humans.txt`` is read (each line re-terminated
    with a newline), the file is removed and re-created with that content,
    and every line of the copy file whose user is not protected is appended.
    Processing stops silently at the first TweepError.
    """
    api = get_api(key3[0], key3[1], key3[2], key3[3])
    try:
        with open('..//humans.txt', 'r') as f:
            existing = ''.join(line.rstrip() + '\n' for line in f)

        os.remove('..//humans.txt')

        with open('..//existing_humans_copy.txt', 'r') as f_read, \
                open('..//humans.txt', 'w') as f_write:
            f_write.write(existing)
            for line in f_read:
                user = api.get_user(line.rstrip())
                print(line)
                if user.protected:
                    continue
                f_write.write(line)
    except tweepy.TweepError:
        # Both files were already closed by the ``with`` block on the way out.
        f_read.close()
        f_write.close()
get_ids.py 文件源码 项目:TwitterBotClassifier 作者: JVogel27 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_bot_accounts():
    """Rebuild ``bots.txt``, appending the unprotected accounts from ``existing_bots_copy.txt``.

    The current content of ``bots.txt`` is read (each line re-terminated
    with a newline), the file is removed and re-created with that content,
    and every line of the copy file whose user is not protected is appended.
    Processing stops silently at the first TweepError.
    """
    api = get_api(key2[0], key2[1], key2[2], key2[3])
    try:
        with open('..//bots.txt', 'r') as f:
            existing = ''.join(line.rstrip() + '\n' for line in f)

        os.remove('..//bots.txt')

        with open('..//existing_bots_copy.txt', 'r') as f_read, \
                open('..//bots.txt', 'w') as f_write:
            f_write.write(existing)
            for line in f_read:
                user = api.get_user(line.rstrip())
                print(line)
                if user.protected:
                    continue
                f_write.write(line)
    except tweepy.TweepError:
        # Both files were already closed by the ``with`` block on the way out.
        f_read.close()
        f_write.close()
data_from_ids.py 文件源码 项目:TwitterBotClassifier 作者: JVogel27 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_data_from_humans():
    """Rebuild ``human_x.txt``, appending one data row per line of ``humans_copy.txt``.

    The existing CSV rows are read, the file is removed and re-created with
    those rows, and a row produced by ``get_data`` is appended for every
    line of the copy file. Processing stops silently at the first
    TweepError. ``api`` is not defined in this function; it is read from an
    enclosing scope.
    """
    try:
        with open('..//human_x.txt', 'r') as f:
            previous_rows = list(csv.reader(f))

        os.remove('..//human_x.txt')

        with open('..//humans_copy.txt', 'r') as f_read, \
                open('..//human_x.txt', 'w') as f_write:
            writer = csv.writer(f_write, lineterminator='\n')
            writer.writerows(previous_rows)
            for line in f_read:
                writer.writerow(get_data(line.rstrip(), api))
                print(line)

    except tweepy.TweepError:
        # The output file was already closed by the ``with`` block on the way out.
        f_write.close()
twitter_search.py 文件源码 项目:twitter_search 作者: agalea91 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tweet_search(api, query, max_tweets, max_id, since_id, geocode):
    ''' Search for up to 'max_tweets' tweets matching 'query'.

        Results are fetched in batches with ids above 'since_id' and below
        'max_id', and 'max_id' is moved down to the last tweet of each
        batch. The loop stops when enough tweets were collected, when a
        batch comes back empty, or after a tweepy.TweepError (in which case
        the function first sleeps for 15 minutes). 'geocode' is accepted
        but not passed to the search call.

        Returns a tuple (list of tweets, final max_id). '''

    collected = []
    while len(collected) < max_tweets:
        still_needed = max_tweets - len(collected)
        try:
            # geocode=geocode is intentionally not forwarded here
            batch = api.search(q=query, count=still_needed,
                               since_id=str(since_id),
                               max_id=str(max_id-1))
            print('found',len(batch),'tweets')
            if not batch:
                print('no tweets found')
                break
            collected.extend(batch)
            max_id = batch[-1].id
        except tweepy.TweepError:
            print('exception raised, waiting 15 minutes')
            print('(until:', dt.datetime.now()+dt.timedelta(minutes=15), ')')
            time.sleep(15*60)
            break # stop the loop
    return collected, max_id
handlers.py 文件源码 项目:cyphon 作者: dunbarcyber 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def process_request(self, obj):
    """Convert a ReservoirQuery into an API request and get the response.

    On success the Cargo carries status code 200 and the ``_json`` payload
    of every status yielded by the cursor. If a TweepError is raised, the
    Cargo carries the error's ``api_code`` and an empty data list.

    Parameters
    ----------
    obj : |ReservoirQuery|

    Returns
    -------
    |Cargo|

    """
    try:
        # TODO(LH): handle rate limit
        statuses = self._get_statuses(obj).items()
        payload = [status._json for status in statuses]
    except tweepy.TweepError as error:
        failure_code = error.api_code
        return Cargo(status_code=failure_code, data=[])

    return Cargo(status_code=200, data=payload)
trusted_source_extractor.py 文件源码 项目:aihackathon 作者: nicoheidtke 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def extract_tweets_from_a_source(self, source):
    """Page backwards through a user's timeline and collect filtered tweets.

    ``source`` is a screen name; an ``@`` is prepended when it contains
    none. Pages of ``self.N_TWEETS_PER_REQUEST`` tweets are requested until
    a page comes back empty or the last collected tweet is older than
    ``self.from_time``. A TweepError ends the paging early; whatever was
    collected so far is kept.

    The collected tweets are appended to ``self.TWEET_STORAGE`` and
    returned.
    """
    if '@' not in source:
        source = '@' + source

    extracted_tweets = []
    try:
        # Single pre-formatted argument: prints the same text on Python 2 and 3.
        print("Extracting %s..." % source)
        max_twitter_id = None
        while True:
            tw = self.API.user_timeline(screen_name=source, count=self.N_TWEETS_PER_REQUEST, max_id=max_twitter_id)
            if not len(tw):
                break

            extracted_tweets += [self.get_filtered_tweet(t) for t in tw]
            earliest_tweet_date = extracted_tweets[-1]['created_at']
            if earliest_tweet_date < self.from_time:
                break

            # Next page: strictly older than the last tweet seen so far.
            max_twitter_id = extracted_tweets[-1]['id'] - 1
    except TweepError:
        print("Error processing %s" % source)

    print("\textracted %d tweets for %s" % (len(extracted_tweets), source))
    self.TWEET_STORAGE += extracted_tweets
    return extracted_tweets
add_twitter_resource.py 文件源码 项目:measure 作者: okfn 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _handle_twitter_rate_limit(cursor):
    '''Yield elements from ``cursor``, sleeping through Twitter rate limits.

    If the rate limit is reached, the generator sleeps for
    TWITTER_API_RATE_LIMIT_PERIOD seconds and then asks the cursor for the
    next element again. The generator ends when the cursor is exhausted.

    Raises ValueError when Twitter reports that the requested user was not
    found; any other TweepError is re-raised unchanged.
    '''
    while True:
        try:
            element = cursor.next()
        except StopIteration:
            # End the generator cleanly; letting StopIteration escape a
            # generator body becomes RuntimeError under PEP 479.
            return
        except tweepy.RateLimitError:
            # Format the message itself; log.info() returns None.
            log.info('Twitter API rate limit error. Sleeping for {} secs.'
                     .format(TWITTER_API_RATE_LIMIT_PERIOD))
            sleep_time = TWITTER_API_RATE_LIMIT_PERIOD
            time.sleep(sleep_time)
            continue
        except tweepy.TweepError as e:
            if str(e.api_code) == TWITTER_API_USER_NOT_FOUND_ERROR_CODE:
                raise ValueError(
                    'Requested user was not found. Check your configuration')
            raise
        yield element
twitter.py 文件源码 项目:MoodBot 作者: Bonanashelby 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def tweet_grab(self, query, count):
    """Search tweets for ``query`` and return their text with a sentiment label.

    Each result is a dict with ``text`` and ``sentiment`` (from
    ``self.tweet_sentiment``). A tweet that has been retweeted is added only
    if an identical entry is not already in the list; other tweets are
    always added.

    Returns the list of dicts, or None if the search raised a TweepError
    (the error is printed).
    """
    tweets = []
    try:
        tweets_fetched = self.api.search(q=query, count=count)

        for tweet in tweets_fetched:
            tweets_parsed = {
                'text': tweet.text,
                'sentiment': self.tweet_sentiment(tweet.text),
            }

            # Retweeted tweets can appear several times; keep one copy.
            if tweet.retweet_count > 0 and tweets_parsed in tweets:
                continue
            tweets.append(tweets_parsed)
        return tweets
    except tweepy.TweepError as error:  # pragma no cover
        # Report the caught error, not the exception class.
        print('Error : ' + str(error))
views.py 文件源码 项目:tweet-analysis 作者: D4D3VD4V3 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def twittersignin():
    """Start the Twitter OAuth flow and redirect the user to the authorization page.

    The request token obtained while building the authorization URL is kept
    in the session. If tweepy raises a TweepError, a message is flashed and
    the user is redirected to ``bp.home`` instead.
    """
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    try:
        authorization_url = handler.get_authorization_url()
        session["request_token"] = handler.request_token
    except tweepy.TweepError:
        flash("Failed to get request token", "danger")
        return redirect(url_for("bp.home"))
    else:
        return redirect(authorization_url)
views.py 文件源码 项目:plexivity 作者: mutschler 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def twitter():
    """Redirect to the index when already authorised, else start the OAuth flow.

    The configured access token is tried first: if ``api.me()`` returns a
    user with a name, the user is sent to ``index``. Otherwise (including
    when that check raises a TweepError) the request token is stored in the
    session and the user is redirected to Twitter's authorization URL.
    """
    # NOTE(review): consumer key and secret are hard-coded here; consider
    # moving them to configuration alongside the access token.
    callback_url = "http://"+ request.environ["HTTP_HOST"] + "/auth/twitter"
    handler = tweepy.OAuthHandler("T4NRPcEtUrCEU58FesRmRtkdW", "zmpbytgPpSbro6RZcXsKgYQoz24zLH3vYZHOHAAs5j33P4eoRg", callback_url)
    handler.set_access_token(config.TWITTER_ACCESS_TOKEN, config.TWITTER_ACCESS_TOKEN_SECRET)

    api = tweepy.API(handler)
    try:
        already_authorised = bool(api.me().name)
    except tweepy.TweepError:
        already_authorised = False
    if already_authorised:
        return redirect(url_for('index'))

    authorization_url = handler.get_authorization_url()
    session["request_token"] = handler.request_token
    return redirect(authorization_url)
tweet_replyer.py 文件源码 项目:tensorflow_seq2seq_chatbot 作者: higepon 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def twitter_bot():
    """Reply to every tweet yielded by ``tweets()`` with a predicted response.

    Twitter credentials are read from the environment. For each tweet the
    predictor is asked for replies; tweets with no replies are skipped
    without being marked as processed. Otherwise the first reply (if it is
    not None) is posted and the tweet is marked as processed. A TweepError
    with api_code 187 (duplicate status) is ignored; any other TweepError
    propagates.
    """
    # Only allocate part of the gpu memory when predicting.
    tf_config = tf.ConfigProto(
        gpu_options=tf.GPUOptions(per_process_gpu_memory_fraction=0.2))

    consumer_key = os.getenv("consumer_key")
    consumer_secret = os.getenv("consumer_secret")
    access_token = os.getenv("access_token")
    access_token_secret = os.getenv("access_token_secret")

    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)
    api = tweepy.API(handler)
    with tf.Session(config=tf_config) as sess:
        predictor = predict.EasyPredictor(sess)

        for status_id, status, bot_flag in tweets():
            print("Processing {0}...".format(status.text))
            screen_name = status.author.screen_name
            replies = predictor.predict(status.text)
            if not replies:
                print("no reply")
                continue
            reply_body = replies[0]
            if reply_body is not None:
                try:
                    post_reply(api, bot_flag, reply_body, screen_name, status_id)
                except tweepy.TweepError as e:
                    # 187 means duplicate status; anything else is fatal
                    if e.api_code != 187:
                        raise
            else:
                print("No reply predicted")
            mark_tweet_processed(status_id)
helpers.py 文件源码 项目:python-twitter-toolbox 作者: hhromic 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def bulk_process(logger, output_dir, filename_tmpl, function, func_input, var_arg, resume=False):  # pylint: disable=too-many-arguments
    """Process a function in bulk using an iterable input and a variable argument.

    ``func_input`` yields ``(basename, value)`` pairs. For each pair the
    output file ``filename_tmpl % basename`` inside ``output_dir`` is opened
    and ``function(writer, **{var_arg: value})`` is called. An existing
    output file is skipped unless ``resume`` is true, in which case the file
    is appended to and the id from ``_get_latest_id`` is passed as
    ``since_id``. A TweepError for one element is logged and the remaining
    elements are still processed.

    Returns the number of elements processed without a TweepError.
    """
    if not path.exists(output_dir):
        makedirs(output_dir)
        logger.info("created output directory: %s", output_dir)

    file_mode = "a" if resume else "w"
    completed = 0
    for basename, value in func_input:
        target = path.join(output_dir, filename_tmpl % basename)

        # check if there is a previous processing and skip or resume it
        latest_id = None
        if path.exists(target):
            if not resume:
                logger.warning("skipping existing file: %s", target)
                continue
            latest_id = _get_latest_id(target)

        # process the input element with the provided function
        try:
            logger.info("processing: %s", value)
            call_args = {var_arg: value}
            if latest_id is not None:
                call_args["since_id"] = latest_id
                logger.info("latest id processed: %d", latest_id)
            with open(target, file_mode) as writer:
                function(writer, **call_args)
            completed += 1
        except TweepError:
            logger.exception("exception while using the REST API")
    return completed
tweets.py 文件源码 项目:python-twitter-toolbox 作者: hhromic 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_timeline(writer, user_id=None, screen_name=None, since_id=0):
    """Get hydrated Tweet-objects from a user timeline.

    ``ensure_only_one`` validates the ``user_id`` / ``screen_name`` pair.
    ``since_id`` is only passed on when it is greater than zero. The
    download limit is read from the ``timeline.limit`` config option.
    A ``TweepError`` raised while downloading is logged, not propagated.
    """
    LOGGER.info("get_timeline() starting")
    ensure_only_one(user_id=user_id, screen_name=screen_name)

    # initialize config and Twitter API
    config = read_config()
    api = get_app_auth_api(config)

    # process user id or screen name, storing returned Tweets in JSON format
    request_args = {"count": TIMELINE_COUNT}
    for key, value in (("user_id", user_id), ("screen_name", screen_name)):
        if value is not None:
            request_args[key] = value
    if since_id > 0:
        request_args["since_id"] = since_id
    max_tweets = config.getint("timeline", "limit")
    try:
        downloaded = write_objs(writer, api.user_timeline, request_args,
                                cursored=True, limit=max_tweets)
    except TweepError as err:
        log_tweep_error(LOGGER, err)
    else:
        LOGGER.info("downloaded %d Tweet(s)", downloaded)

    # finished
    LOGGER.info("get_timeline() finished")


问题


面经


文章

微信
公众号

扫码关注公众号