def __search(self, titles, year):
try:
n = cache.get(self.__get_nonce, 24)
query = self.search_link % (urllib.quote_plus(cleantitle.query(titles[0])), n)
query = urlparse.urljoin(self.base_link, query)
t = [cleantitle.get(i) for i in set(titles) if i]
y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']
r = client.request(query)
r = json.loads(r)
r = [(r[i].get('url'), r[i].get('title'), r[i].get('extra').get('date')) for i in r]
r = sorted(r, key=lambda i: int(i[2]), reverse=True) # entries with a year sort ahead of those with none ('0')
r = [i[0] for i in r if cleantitle.get(i[1]) in t and i[2] in y][0]
return source_utils.strip_domain(r)
except:
return
def __search(self, search_link, imdb, titles):
try:
query = search_link % (urllib.quote_plus(cleantitle.query(titles[0])))
query = urlparse.urljoin(self.base_link, query)
t = [cleantitle.get(i) for i in set(titles) if i]
r = client.request(query)
r = dom_parser.parse_dom(r, 'div', attrs={'class': 'big-list'})
r = dom_parser.parse_dom(r, 'table', attrs={'class': 'row'})
r = dom_parser.parse_dom(r, 'td', attrs={'class': 'list-name'})
r = dom_parser.parse_dom(r, 'a', req='href')
r = [i.attrs['href'] for i in r if i and cleantitle.get(i.content) in t][0]
url = source_utils.strip_domain(r)
r = client.request(urlparse.urljoin(self.base_link, url))
r = dom_parser.parse_dom(r, 'a', attrs={'href': re.compile('.*/tt\d+.*')}, req='href')
r = [re.findall('.+?(tt\d+).*?', i.attrs['href']) for i in r]
r = [i[0] for i in r if i]
return url if imdb in r else None
except:
return
def __search(self, titles, year):
try:
query = self.search_link % (urllib.quote_plus(cleantitle.query(titles[0])))
query = urlparse.urljoin(self.base_link, query)
t = [cleantitle.get(i) for i in set(titles) if i]
r = client.request(query)
r = dom_parser.parse_dom(r, 'div', attrs={'id': 'main'})
r = dom_parser.parse_dom(r, 'div', attrs={'class': 'panel-body'})
r = [(dom_parser.parse_dom(i.content, 'h4', attrs={'class': 'title-list'}), dom_parser.parse_dom(i.content, 'a', attrs={'href': re.compile('.*/year/.*')})) for i in r]
r = [(dom_parser.parse_dom(i[0][0].content, 'a', req='href'), i[1][0].content if i[1] else '0') for i in r if i[0]]
r = [(i[0][0].attrs['href'], i[0][0].content, re.sub('<.+?>|</.+?>', '', i[1])) for i in r if i[0] and i[1]]
r = [(i[0], i[1], i[2].strip()) for i in r if i[2]]
r = sorted(r, key=lambda i: int(i[2]), reverse=True) # entries with a year sort ahead of those with none ('0')
r = [i[0] for i in r if cleantitle.get(i[1]) in t and i[2] == year][0]
return source_utils.strip_domain(r)
except:
return
def __search(self, titles, year):
try:
query = self.search_link % (urllib.quote_plus(cleantitle.query(titles[0])))
query = urlparse.urljoin(self.base_link, query)
t = [cleantitle.get(i) for i in set(titles) if i]
y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']
r = client.request(query)
r = dom_parser.parse_dom(r, 'article')
r = [(dom_parser.parse_dom(i, 'div', attrs={'class': 'title'}), dom_parser.parse_dom(i, 'span', attrs={'class': 'year'})) for i in r]
r = [(dom_parser.parse_dom(i[0][0], 'a', req='href'), i[1][0].content) for i in r if i[0] and i[1]]
r = [(i[0][0].attrs['href'], i[0][0].content, i[1]) for i in r if i[0]]
r = sorted(r, key=lambda i: int(i[2]), reverse=True) # entries with a year sort ahead of those with none ('0')
r = [i[0] for i in r if cleantitle.get(i[1]) in t and i[2] in y][0]
return source_utils.strip_domain(r)
except:
return
def __search(self, titles, episode):
try:
query = self.search_link % urllib.quote_plus(cleantitle.query(titles[0]) + ' ' + str(episode))
query = urlparse.urljoin(self.base_link, query)
t = [cleantitle.get(i) + str(episode) for i in set(titles) if i]
r = client.request(query)
r = r.split('</style>')[-1].strip()
r = json.loads(r)
r = [(i.get('title', {}).get('rendered'), i.get('content', {}).get('rendered')) for i in r]
r = [(re.sub('ger (?:sub|dub)', '', i[0], flags=re.I).strip(), i[1]) for i in r if i[0] and i[1]]
r = [(i[0], re.findall('(.+?) (\d*)$', i[0]), i[1]) for i in r]
r = [(i[0] if not i[1] else i[1][0][0] + ' ' + str(int(i[1][0][1])), i[2]) for i in r]
r = [dom_parser.parse_dom(i[1], 'div') for i in r if cleantitle.get(i[0]) in t]
r = [[x.attrs['href'] for x in dom_parser.parse_dom(i, 'a', req='href')] + [x.attrs['src'] for x in dom_parser.parse_dom(i, 'iframe', req='src')] for i in r]
return r[0]
except:
return
def __search(self, titles, year):
try:
query = self.search_link % (urllib.quote_plus(cleantitle.query(titles[0])))
query = urlparse.urljoin(self.base_link, query)
t = [cleantitle.get(i) for i in set(titles) if i]
y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']
r = client.request(query)
r = dom_parser.parse_dom(r, 'div', attrs={'class': 'details'})
r = [(dom_parser.parse_dom(i, 'div', attrs={'class': 'title'}), dom_parser.parse_dom(i, 'span', attrs={'class': 'year'})) for i in r]
r = [(dom_parser.parse_dom(i[0][0], 'a', req='href'), i[1][0].content) for i in r if i[0] and i[1]]
r = [(i[0][0].attrs['href'], i[0][0].content, i[1]) for i in r if i[0]]
r = sorted(r, key=lambda i: int(i[2]), reverse=True) # entries with a year sort ahead of those with none ('0')
r = [i[0] for i in r if cleantitle.get(i[1]) in t and i[2] in y][0]
return source_utils.strip_domain(r)
except:
return
def __search(self, titles, imdb, year):
try:
query = self.search_link % (urllib.quote_plus(cleantitle.query(titles[0])))
query = urlparse.urljoin(self.base_link, query)
t = [cleantitle.get(i) for i in set(titles) if i]
y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']
r = client.request(query, XHR=True)
r = json.loads(r)
r = [(i.get('title'), i.get('custom_fields', {})) for i in r.get('posts', [])]
r = [(i[0], i[1]) for i in r if i[0] and i[1]]
r = [(i[0], i[1].get('Streaming', ['']), i[1].get('Jahr', ['0']), i[1].get('IMDb-Link', [''])) for i in r if i]
r = [(i[0], i[1][0], i[2][0], re.findall('.+?(tt\d+).*?', i[3][0])) for i in r if i[0] and i[1] and i[2] and i[3]]
r = [i[1] for i in r if imdb in i[3] or (cleantitle.get(i[0]) in t and i[2] in y)][0]
return source_utils.strip_domain(r)
except:
return
def init():
connection = MongoClient(secret.mongo_url, secret.mongo_port)
db = connection[secret.mongo_db]
db.authenticate(secret.mongo_user, urllib.quote_plus(secret.mongo_pass))
r = praw.Reddit(user_agent="Samachar Bot for /r/india by /u/sallurocks")
scopes = {u'edit', u'submit', u'read', u'privatemessages', u'identity', u'history'}
oauth_helper = PrawOAuth2Mini(r, app_key=secret.news_app_key,
app_secret=secret.news_app_secret,
access_token=secret.news_access_token,
refresh_token=secret.news_refresh_token, scopes=scopes)
init_object = {'db': db,
'reddit': r,
'oauth': oauth_helper,
'goose': Goose()}
return init_object
def urlencode_utf8(params):
"""
UTF-8 safe variant of urllib.urlencode.
http://stackoverflow.com/a/8152242
"""
if hasattr(params, 'items'):
params = params.items()
params = (
'='.join((
urllib.quote_plus(k.encode('utf8'), safe='/'),
urllib.quote_plus(v.encode('utf8'), safe='/')
)) for k, v in params
)
return '&'.join(params)
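A minimal usage sketch for urlencode_utf8 above (illustrative, not part of the original module; assumes Python 2, where urllib.quote_plus expects byte strings):

import urllib

# The stock urllib.urlencode raises UnicodeEncodeError on non-ASCII unicode
# values; urlencode_utf8 encodes keys and values to UTF-8 before quoting.
demo_params = {u'q': u'k\xf6ln', u'lang': u'de'}
print(urlencode_utf8(demo_params))  # -> 'q=k%C3%B6ln&lang=de' (dict order may vary)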
def do_command(self, verb, args):
conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
try:
body = 'cmd=' + urllib_parse.quote_plus(unicode(verb).encode('utf-8'))
for i in range(len(args)):
body += '&' + unicode(i+1) + '=' + \
urllib_parse.quote_plus(unicode(args[i]).encode('utf-8'))
if self.sessionId is not None:
body += "&sessionId=" + unicode(self.sessionId)
headers = {
"Content-Type":
"application/x-www-form-urlencoded; charset=utf-8"
}
conn.request("POST", "/selenium-server/driver/", body, headers)
response = conn.getresponse()
data = unicode(response.read(), "UTF-8")
if not data.startswith('OK'):
raise Exception(data)
return data
finally:
conn.close()
def get_url_config(_options, data = None):
# prepare arguments
kargs = {}
# proxy
if _options.proxy:
#proxyUrl = _options.proxy_server.partition(':')
proxyUrl = urlparse.urlparse(_options.proxy_server)
kargs['proxy'] = { "scheme": proxyUrl.scheme,
"netloc": proxyUrl.netloc }
if _options.proxy_user is not None:
kargs['proxy']['user'] = _options.proxy_user
kargs['proxy']['password'] = _options.proxy_pwd
# authentication
if _options.auth_mode == AUTHENTICATION_MODE_BASIC:
kargs['authentication'] = { 'mode' : 'basic',
'user' : _options.user,
'password': _options.pwd }
# headers
kargs['headers'] = {"X-Client-Id" : get_client_artefact(),
"X-Client-Version": urllib.quote_plus(get_client_version())}
# data
if data is not None:
kargs['data'] = data
return kargs
def try_redirect_on_error(http_object, request, ticket=None):
"""Called from main.wsgibase to rewrite the http response"""
status = int(str(http_object.status).split()[0])
if status > 399 and THREAD_LOCAL.routes.routes_onerror:
keys = set(('%s/%s' % (request.application, status),
'%s/*' % (request.application),
'*/%s' % (status),
'*/*'))
for (key, redir) in THREAD_LOCAL.routes.routes_onerror:
if key in keys:
if redir == '!':
break
elif '?' in redir:
url = '%s&code=%s&ticket=%s&requested_uri=%s&request_url=%s' % \
(redir, status, ticket,
urllib.quote_plus(request.env.request_uri),
request.url)
else:
url = '%s?code=%s&ticket=%s&requested_uri=%s&request_url=%s' % \
(redir, status, ticket,
urllib.quote_plus(request.env.request_uri),
request.url)
return HTTP(303, 'You are being redirected <a href="%s">here</a>' % url, Location=url)
return http_object
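For context, a sketch of the routes_onerror entries this helper consumes (the application names and targets below are illustrative): each entry maps an 'application/status' pattern to a redirect target, and '!' means "keep the original error response".

# routes.py (illustrative values)
routes_onerror = [
    ('admin/*', '!'),                         # never redirect admin errors
    ('myapp/404', '/myapp/static/404.html'),  # app-specific 404 page
    ('*/*', '/myapp/error/index'),            # catch-all error handler
]
# With the catch-all above, a 500 raised in 'myapp' is answered with a 303
# redirect to
# /myapp/error/index?code=500&ticket=...&requested_uri=...&request_url=...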
def strencode (data):
if not data: return data
if data.find ('%') != -1 or (data.find ('+') != -1 and data.find (' ') == -1):
return data
d = []
for x in data.split('&'):
try: k, v = x.split('=', 1)
except ValueError: d.append ((x, None))
else:
v = quote_plus (v)
d.append ((k, v))
d2 = []
for k, v in d:
if v is None:
d2.append (k)
else:
d2.append ('%s=%s' % (k, v))
return '&'.join (d2)
def strdecode (data, value_quote = 0):
if not data: return []
do_quote = 1
if data.find('%') > -1 or data.find('+') > -1:
do_quote = 0
if not value_quote:
do_quote = 0
d = []
for x in data.split(';'):
try: k, v = x.split('=', 1)
except ValueError: pass
else:
if do_quote:
v = quote_plus (v.strip())
d.append((k.strip(), v.strip()))
return d
def parse_search_page(self, response):
# handle current page
for item in self.parse_tweets_block(response.body):
yield item
# get next page
tmp = self.reScrollCursor.search(response.body)
if tmp:
query = urlparse.parse_qs(urlparse.urlparse(response.request.url).query)['q'][0]
scroll_cursor = tmp.group(1)
url = 'https://twitter.com/i/search/timeline?q=%s&' \
'include_available_features=1&include_entities=1&max_position=%s' % \
(urllib.quote_plus(query), scroll_cursor)
yield http.Request(url, callback=self.parse_more_page)
# TODO: # get refresh page
# tmp = self.reRefreshCursor.search(response.body)
# if tmp:
# query = urlparse.parse_qs(urlparse.urlparse(response.request.url).query)['q'][0]
# refresh_cursor=tmp.group(1)
def getYoutubeURLFromSearch(searchString):
if pyVersion < 3:
urlParse = urllib.quote_plus(searchString)
else:
urlParse = urllib.parse.quote_plus(searchString)
urlToGet = "https://www.youtube.com/results?search_query=" + urlParse # NOQA
r = get(urlToGet)
soup = BeautifulSoup(r.content, 'html.parser')
videos = soup.find_all('h3', class_='yt-lockup-title')
for video in videos:
link = video.find_all('a')[0]
url = "https://www.youtube.com" + link.get('href')
if 'googleads' in url:
continue
title = link.text
if 'doubleclick' in title:
continue
if 'list=' in url:
continue
if 'album review' in title.lower():
continue
return url
return ""
def fishbans(inp):
"fishbans <user> -- Gets information on <user>'s Minecraft bans from Fishbans"
user = inp.strip()
try:
request = http.get_json(api_url.format(quote_plus(user)))
except (http.HTTPError, http.URLError) as e:
return "Could not fetch ban data from the Fishbans API: {}".format(e)
if not request["success"]:
return "Could not fetch ban data for {}.".format(user)
user_url = "http://fishbans.com/u/{}/".format(user)
ban_count = request["stats"]["totalbans"]
return "The user \x02{}\x02 has \x02{}\x02 ban(s). See detailed info " \
"at {}".format(user, ban_count, user_url)
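For reference, the response shape the fishbans handler above expects (api_url is a format string defined elsewhere in the plugin; the payload and ban count below are illustrative):

# Illustrative JSON returned by the Fishbans stats endpoint:
sample_response = {"success": True, "stats": {"totalbans": 2}}

# Example invocation (\x02 marks IRC bold):
#   fishbans("notch")
#   # -> 'The user \x02notch\x02 has \x022\x02 ban(s). See detailed info at http://fishbans.com/u/notch/'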
def _get_results_page(self, set_type):
if set_type == LARGE_SET:
url = GoogleSets.URL_LARGE
else:
url = GoogleSets.URL_SMALL
safe_items = [urllib.quote_plus(i) for i in self.items]
blank_items = 5 - len(safe_items)
if blank_items > 0:
safe_items += ['']*blank_items
safe_url = url % tuple(safe_items)
try:
page = self.browser.get_page(safe_url)
except BrowserError as e:
raise GSError("Failed getting %s: %s" % (e.url, e.error))
return BeautifulSoup(page)
def _get_results_page(self):
if self._page == 0:
if self._results_per_page == 10:
url = SponsoredLinks.SEARCH_URL_0
else:
url = SponsoredLinks.SEARCH_URL_1
else:
if self._results_per_page == 10:
url = SponsoredLinks.NEXT_PAGE_0
else:
url = SponsoredLinks.NEXT_PAGE_1
safe_url = url % { 'query': urllib.quote_plus(self.query),
'start': self._page * self._results_per_page,
'num': self._results_per_page }
try:
page = self.browser.get_page(safe_url)
except BrowserError as e:
raise SLError("Failed getting %s: %s" % (e.url, e.error))
return BeautifulSoup(page)