def login():
# Try to login or sleep/wait until logged in, or exit if user/pass wrong
NotLoggedIn = True
while NotLoggedIn:
try:
reddit = praw.Reddit(
user_agent=credsUserAgent,
client_id=credsClientID,
client_secret=credsClientSecret,
username=credsUserName,
password=credsPassword)
print_and_log("Logged in")
NotLoggedIn = False
except praw.errors.InvalidUserPass:
print_and_log("Wrong username or password", error=True)
exit(1)
except Exception as err:
print_and_log(str(err), error=True)
time.sleep(5)
return reddit
python类Reddit()的实例源码
def read_login_info():
""" Read all login information from ../data/config.ini """
config = ConfigParser.ConfigParser()
config.read('../data/config.ini')
client_id = config.get('API_INFO', 'id')
client_secret = config.get('API_INFO', 'secret')
password = config.get('API_INFO', 'password')
user_agent = config.get('API_INFO', 'user_agent')
username = config.get('API_INFO', 'username')
""" If No Api key info in config file: warn user """
ensure_valid_config(client_id)
reddit_api = praw.Reddit(client_id=client_id, client_secret=client_secret,
password=password, user_agent=user_agent,
username=username)
return reddit_api
def build_api():
data = datatools.get_data()
if "reddit_api_user_agent" not in data["discord"]["keys"] or \
"reddit_api_client_id" not in data["discord"]["keys"] or \
"reddit_api_client_secret" not in data["discord"]["keys"]:
logger.warning("For gamedeals to work, please make sure \"reddit_api_user_agent\", " +
"\"reddit_api_client_id\", and \"reddit_api_client_secret\" are all added as API keys")
logger.debug("Building Reddit API")
try:
global redditapi
redditapi = praw.Reddit(
user_agent=data["discord"]["keys"]["reddit_api_user_agent"],
client_id=data["discord"]["keys"]["reddit_api_client_id"],
client_secret=data["discord"]["keys"]["reddit_api_client_secret"])
logger.debug("Build successfull")
return True
except:
logger.warning("Error connecting to Reddit API, Reddit won't be available")
return False
def init():
connection = MongoClient(secret.mongo_url, secret.mongo_port)
db = connection[secret.mongo_db]
db.authenticate(secret.mongo_user, urllib.quote_plus(secret.mongo_pass))
r = praw.Reddit(user_agent="Samachar Bot for /r/india by /u/sallurocks")
scopes = {u'edit', u'submit', u'read', u'privatemessages', u'identity', u'history'}
oauth_helper = PrawOAuth2Mini(r, app_key=secret.news_app_key,
app_secret=secret.news_app_secret,
access_token=secret.news_access_token,
refresh_token=secret.news_refresh_token, scopes=scopes)
init_object = {'db': db,
'reddit': r,
'oauth': oauth_helper,
'goose': Goose()}
return init_object
def login(self, configuration):
credentials = configuration.get_credentials()
user_agent = configuration.get_user_agent()
reddit = praw.Reddit(
client_id=credentials["client_id"],
client_secret=credentials["client_secret"],
username=credentials["username"],
password=credentials["password"],
user_agent=user_agent
)
if reddit.user.me() == credentials["username"]:
self.reddit = reddit
return True
else:
raise Exception("LoginException")
def top_scorers(self, league_url, league_name):
cprint("\nRetrieving Top Scorers for " + league_name + "...", 'yellow')
cprint("Rank Player Team Goals", 'red', attrs=['bold', 'underline'])
r = requests.get(league_url)
soup = BeautifulSoup(r.content, "lxml")
try:
for x in range(0, 15):
player_name = soup.find_all("td", {"class": "player large-link"})[x].text
team_name = soup.find_all("td", {"class": "team large-link"})[x].text
goals_count = soup.find_all("td", {"class": "number goals"})[x].text
if x < 9:
topscorer = "{:<20}".format(player_name), space, "{:<15}".format(team_name), space, goals_count
cprint(" " + str(x + 1) + ". " + str("".join(topscorer)), 'blue')
elif 9 <= x:
topscorer = "{:<20}".format(player_name), space, "{:<15}".format(team_name), space, goals_count
cprint(" " + str(x + 1) + ". " + str("".join(topscorer)), 'blue')
except IndexError:
pass
# Scrapes subreddit in Reddit to display URLs to live streams (when available) based on user input
def test():
top_posts = get_top_posts(limit=5)
sorted_comments = get_top_comments(top_posts, limit=20)
authors = extract_authors(sorted_comments)
sorted_authors = calculate_karma(authors, limit=50)
write_authors_to_file(sorted_authors)
# Don't accidentally override author's file!!
# def try_until_works(function):
# while True:
# try:
# finction()
# break
# except urllib2.HTTPError, e:
# if e.code in [429, 500, 502, 503, 504]:
# print "Reddit is down (error %s), sleeping..." % e.code
# time.sleep(60)
# pass
# else:
# raise
# except Exception, e:
# print "couldn't Reddit: %s" % str(e)
# raise
def configure_tor(config):
"""
Assembles the tor object based on whether or not we've enabled debug mode
and returns it. There's really no reason to put together a Subreddit
object dedicated to our subreddit -- it just makes some future lines
a little easier to type.
:param r: the active Reddit object.
:param config: the global config object.
:return: the Subreddit object for the chosen subreddit.
"""
if config.debug_mode:
tor = config.r.subreddit('ModsOfToR')
else:
# normal operation, our primary subreddit
tor = config.r.subreddit('transcribersofreddit')
return tor
def get_subreddit_content(subreddit_name, error_msg):
try:
reddit_inst = praw.Reddit(client_id=client_id,
client_secret=client_secret,
password=password,
user_agent='contentscript by /u/pimpdaddyballer',
username=username)
puppies_subreddit = reddit_inst.subreddit(subreddit_name)
return puppies_subreddit.random().url
except praw.exceptions.ClientException as e:
print(e)
return None
except praw.exceptions as e:
print(e)
return error_msg
def __init__(self, subreddit=None, multireddit=None, days=2):
"""Initialize the SubredditStats instance with config options."""
self.submissions = []
self.comments = []
# extract post with at least 2 ours and less the <days+1> days
now_utc = calendar.timegm(time.gmtime())
self.max_date_thread = now_utc - HOUR_IN_SECONDS * 2
self.min_date_thread = now_utc - DAY_IN_SECONDS * (days + 1)
self.min_date = now_utc - DAY_IN_SECONDS * days
self.reddit = Reddit(check_for_updates=False, user_agent=AGENT)
if subreddit:
self.subreddit = self.reddit.subreddit(subreddit)
elif multireddit:
self.subreddit = self.reddit.multireddit(*multireddit)
else:
raise ValueError('Specify subreddit or multireddit')
change_wallpaper_reddit.py 文件源码
项目:Daily-Reddit-Wallpaper
作者: ssimunic
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def parse_args():
"""parse args with argparse
:returns: args
"""
parser = argparse.ArgumentParser(description="Daily Reddit Wallpaper")
parser.add_argument("-s", "--subreddit", type=str, default=config["subreddit"],
help="Example: art, getmotivated, wallpapers, ...")
parser.add_argument("-t", "--time", type=str, default=config["time"],
help="Example: new, hour, day, week, month, year")
parser.add_argument("-n", "--nsfw", action='store_true', default=config["nsfw"], help="Enables NSFW tagged posts.")
parser.add_argument("-d", "--display", type=int, default=config["display"],
help="Desktop display number on OS X (0: all displays, 1: main display, etc")
parser.add_argument("-o", "--output", type=str, default=config["output"],
help="Set the outputfolder in the home directory to save the Wallpapers to.")
args = parser.parse_args()
return args
def main():
print("Loading cache")
seen_posts = cache.load_cached_storage(cache_file, default_size=30)
print("Connecting to reddit")
r = praw.Reddit(user_agent=user_agent)
sub = r.get_subreddit(subreddit, fetch=False)
print("Getting new")
new = sub.get_new(limit=10)
new = seen_posts.get_diff(new)
print("New posts:")
print(new)
for post in new:
msg = create_post_message(post)
if msg:
print()
print(msg)
send_message(msg)
def play_dice(wager_amount):
die_result = roll_die()
if (die_result > 2):
outcome = SG_Repository.WagerOutcome.WIN
winnings = wager_amount * pay_table[die_result]
else:
outcome = SG_Repository.WagerOutcome.LOSE
winnings = 0
wager_result = {'die_roll_result' : die_result, 'outcome' : outcome,
'winnings' : winnings}
print("All-or-nothing die roll wager result:")
pprint.pprint(wager_result)
return wager_result
# create our Reddit instance
def play_6_9(wager_amount):
roll_1 = roll_die()
roll_2 = roll_die()
if (roll_1 + roll_2 == 6) or (roll_1 + roll_2 == 9) or (roll_1 + roll_2 == 3):
outcome = SG_Repository.WagerOutcome.WIN
winnings = wager_amount * payout_factor
else:
outcome = SG_Repository.WagerOutcome.LOSE
winnings = 0
wager_result = {'roll_1' : roll_1, 'roll_2' : roll_2, 'outcome' : outcome,
'winnings' : winnings, 'roll_total' : (roll_1 + roll_2)}
print("6-9 wager result:")
pprint.pprint(wager_result)
return wager_result
# create our Reddit instance
def play_coin_toss(wager_amount):
coin_result = flip_coin()
if (coin_result == "HEADS"):
outcome = SG_Repository.WagerOutcome.WIN
winnings = wager_amount * 2
else:
outcome = SG_Repository.WagerOutcome.LOSE
winnings = 0
wager_result = {'toss_result' : coin_result, 'outcome' : outcome,
'winnings' : winnings}
print("Coin toss wager result:")
pprint.pprint(wager_result)
return wager_result
# create our Reddit instance
def play_3_6_9(wager_amount):
roll_1 = roll_die()
roll_2 = roll_die()
if (roll_1 + roll_2 == 6) or (roll_1 + roll_2 == 9) or (roll_1 + roll_2 == 3):
outcome = SG_Repository.WagerOutcome.WIN
winnings = wager_amount * payout_factor
else:
outcome = SG_Repository.WagerOutcome.LOSE
winnings = 0
wager_result = {'roll_1' : roll_1, 'roll_2' : roll_2, 'outcome' : outcome,
'winnings' : winnings, 'roll_total' : (roll_1 + roll_2)}
# print("6-9 wager result:")
# pprint.pprint(wager_result)
return wager_result
# create our Reddit instance
def init(logger, responseWhitelist, user):
global reddit
global log
global whitelist
try:
reddit = praw.Reddit(
user,
user_agent=globals.USER_AGENT)
except configparser.NoSectionError:
log.error("User "+user+" not in praw.ini, aborting")
return False
globals.ACCOUNT_NAME = str(reddit.user.me())
log = logger
whitelist = responseWhitelist
log.info("Logged into reddit as /u/" + globals.ACCOUNT_NAME)
return True
def _authenticate(self):
response = requests.post(
'https://www.reddit.com/api/login',
{'user': self._username, 'passwd': self._password},
headers = _FAKE_HEADERS
)
self._cookies = response.cookies
if self._client_id:
self._reddit = praw.Reddit(user_agent=USER_AGENT,
client_id=self._client_id,
client_secret=self._client_secret,
username=self._username,
password=self._password)
def authenticate(request, username=None, code=None):
try:
return RedditUser.objects.get(username=username)
except RedditUser.DoesNotExist:
if code:
print('Creating new RedditUser')
with open('secret.json', 'r') as f:
secret = json.load(f)
reddit = praw.Reddit(client_id=secret['client_id'], client_secret=secret['client_secret'],
redirect_uri='http://localhost:8000/callback',
user_agent='Plan-Reddit by /u/SkullTech101')
token = reddit.auth.authorize(code)
user = RedditUser(username=str(reddit.user.me()), token=token)
user.save()
return user
return None
def auth_reddit():
global reddit
print('Attempting to authenticate with reddit...')
reddit = praw.Reddit(reddit_app_ua)
reddit.set_oauth_app_info(reddit_app_id, reddit_app_secret, reddit_app_uri)
reddit.refresh_access_information(reddit_app_refresh)
print('Success')
# connects to imgur with pyimgur
def __init__(self, *, subreddits, iniSite='bot',
newLimit=25, sleep=30, connectAttempts=1,
scopes=('submit', 'privatemessages', 'read', 'identity'),
dbName='praww.db'):
"""Create an instance of Reddit. Does not yet connect.
:param subreddits: list of subreddits to read
:param iniSite: see PRAW config docs using praw.ini (default: bot)
:param newLimit: number of entries to read (default: 25)
:param sleep: read reddit every n seconds (default: 30)
:param connectAttempts: attempt initial connection n times and
sleep 2^n sec between attempts (default: 1)
:param scopes: required scopes
:param dbName: name of file of seen-things db
"""
self.killed = False
signal.signal(signal.SIGTERM, self.__catchKill)
self.__subreddits = '+'.join(subreddits)
self.iniSite = iniSite
self.newLimit = newLimit
self.sleep = sleep
self.connectAttempts = connectAttempts
self.scopes = scopes
self.dbName = dbName
self.rateSleep = 0
self.roundStart = 0
# restart after 15 min of consecutive fails
self.__failLimit = 15*60 // max(sleep, 1)
# use with() setter
self.__commentListener = None
self.__submissionListener = None
self.__mentionListener = None
self.__pmListener = None
def __connect(self):
connectTry = 1
while True:
try:
log.debug("connect() creating reddit adapter")
self.r = praw.Reddit(self.iniSite)
# connect and check if instance has required scopes
for scope in self.scopes:
if scope not in self.r.auth.scopes():
raise Exception('reddit init missing scope', scope)
self.me = self.r.user.me()
log.debug('connect() logged into reddit as: %s', self.me)
return
except prawcore.exceptions.RequestException as e:
log.exception('connect() failed to send request')
if connectTry >= self.connectAttempts:
log.error('connect() connection attempt %s failed', connectTry)
raise Exception('failed to connect')
log.warn('connect() connection attempt %s failed', connectTry)
# sleep up to 2^try sec before failing (3 trys = 6s)
for s in range(2 ** connectTry):
if self.killed:
raise Exception('killed')
time.sleep(1)
connectTry += 1
def __init__(self, cache_size=500, maxcache_day=1,
popular_type="Daily", timeout=20):
""" The __init__ method for MemeGenerator class
Args:
cache_size (int): Number of memes stored as cache
maxcache_day (int): Number of days until the cache expires
"""
super(RedditMemes, self).__init__(
"https://www.reddit.com/r/memes/", cache_size, maxcache_day)
self._origin = Origins.REDDITMEMES
# Client ID and user agent requested by Reddit API
config = configparser.ConfigParser()
cdir = os.path.dirname(os.path.realpath(__file__))
config.read(os.path.join(cdir, 'config.ini'))
self._client_id = config['Reddit']['ClientID']
self._client_secret = config['Reddit']['ClientSecret']
if self._client_secret == '':
self._client_secret = None
self._user_agent = config['Reddit']['UserAgent'].format(sys.platform)
# Generate a Reddit instance
self._reddit = praw.Reddit(client_id=self._client_id,
client_secret=self._client_secret,
user_agent=self. _user_agent)
if self._no_cache() or self._cache_expired():
self._build_cache()
def get_memes(self, num):
""" Get memes from Reddit /r/meme subreddit
"""
if self._no_cache() or self._cache_expired():
self._build_cache()
else:
self._update_with_cache()
if self._cache_size >= num:
return self._pop_memes(num)
else:
# Haven't found a way to get memes
# in a specific range using PRAW
results = self._reddit.subreddit('memes').hot(limit=num)
meme_results = []
for submission in results:
# Get required properties for the memes
ctitle = submission.title
curl = submission.url
cmeme = Meme(curl, datetime.datetime.now(),
title=ctitle,
origin=Origins.REDDITMEMES)
meme_results.append(cmeme)
return meme_results
def init_reddit(self):
"""Initialize the reddit instance"""
# Get reddit instance
self.r = praw.Reddit(user_agent=self.reddit.user_agent,
client_id=self.reddit.client_id,
client_secret=self.reddit.secret,
username=self.reddit.user_name,
password=self.reddit.password)
# Get r/all/ instance
self.r_all = self.r.subreddit(self.reddit.subreddit)
# Get r/R_I_P instance
self.r_R_I_P = self.r.subreddit('R_I_P')
def __init__(self, sparcli):
self.sparcli = sparcli
tokens = getTokens()['Reddit']
# If the tokens are not set up properly, this command will fail right here
self.reddit = praw.Reddit(
client_id=tokens['ID'],
client_secret=tokens['Secret'],
user_agent=tokens['User Agent']
)
self.getRedditor = lambda name: self.reddit.redditor(name)
self.getSubreddit = lambda name: self.reddit.subreddit(name)
self.redditIcon = 'http://rawapk.com/wp-content/uploads/2016/04/Reddit-The-Official-App-Icon.png'
def on_command(self, command, ctx):
if command.cog_name == 'Reddit':
await self.sparcli.send_typing(ctx.message.channel)
def setup(bot):
bot.add_cog(Reddit(bot))
def authenticate():
"""
Logs into reddit, using credentials from praw.ini file
:return: Instance of Reddit
"""
print('Authenticating')
reddit = praw.Reddit('TsubasaRedditBot')
print("Authenticated as {}".format(reddit.user.me()))
return reddit
def login():
return praw.Reddit(username=config.reddit_username,
password=config.reddit_password,
client_id=config.reddit_client_id,
client_secret=config.reddit_client_secret,
user_agent='agent')