def log_url(log, message, url, level=logging.DEBUG):
    """Nicely logs the given url.

    Prints the url with the first part (protocol, host, port, authority,
    user info, path, ref) and then, in sequence, all the query parameters.

    log: the log into which to write the message
    message: a message to print before the url
    url: the url to log
    level: (optional) the log level to use
    """
    urls = url.split('?')
    log.log(level, message + urllib2.unquote(urls[0]))
    if len(urls) > 1:
        for a in sorted(urls[1].split('&')):
            param = a.split('=')
            if len(param) < 2:
                param.append('')
            log.log(level, ' . %s = %s', urllib2.unquote(param[0]), urllib2.unquote(param[1]))
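A minimal usage sketch of log_url, assuming only the standard logging module plus the urllib2 import the function already relies on; the logger name and URL are invented for illustration.

import logging
import urllib2

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('example')
log_url(log, 'Request: ', 'http://example.com/search?q=caf%C3%A9&page=2')
# Logs the decoded base URL on one line, then each query parameter on its own
# ' . name = value' line, sorted by parameter name.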
def handle_starttag(self, tag, attrs):
    if tag == 'h3' and attrs == [('class', 'r')]:
        self.h3_flag = True
    if tag == 'a' and self.h3_flag:
        self.a_flag = True
    if tag == 'b' and self.a_flag:
        self.b_flag = True
    if self.a_flag:
        for (key, value) in attrs:
            if key == 'href':
                if value.startswith("/url?"):
                    m = match('/url\?(url|q)=(.+?)&', value)
                    if m and len(m.groups()) == 2:
                        href = urllib2.unquote(m.group(2))
                        self.link = href
                else:
                    self.link = value
def openload_clean(string):
    import urllib2
    if "function" in string:
        matches = re.findall(r"=\"([^\"]+).*?} *\((\d+)\)", string, re.DOTALL)[0]

        def substr(char):
            char = char.group(0)
            number = ord(char) + int(matches[1])
            if char <= "Z":
                char_value = 90   # ord('Z')
            else:
                char_value = 122  # ord('z')
            # Leave the letter as-is if adding the offset stays inside the
            # alphabet; otherwise wrap around by 26.
            if char_value >= number:
                return chr(ord(char))
            else:
                return chr(number - 26)

        string = re.sub(r"[A-z]", substr, matches[0])
        string = urllib2.unquote(string)
    return string
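openload_clean builds on re.sub with a function callback that rewrites one matched letter at a time, then unquotes the result. The following standalone sketch shows the same pattern with a fixed ROT13 shift instead of the page-derived offset; the input string is invented for illustration.

import re
import urllib2

def rot13_char(m):
    c = m.group(0)
    base = ord('A') if c <= 'Z' else ord('a')
    return chr(base + (ord(c) - base + 13) % 26)

shifted = re.sub(r"[A-Za-z]", rot13_char, "Uryyb%20Jbeyq")
print urllib2.unquote(shifted)   # Hello World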
googlevideo.py (project: plugin.video.streamondemand-pureita, author: orione7)
def _parse_gdocs(html):
    urls = []
    for match in re.finditer('\[\s*"([^"]+)"\s*,\s*"([^"]+)"\s*\]', html):
        key, value = match.groups()
        if key == 'fmt_stream_map':
            items = value.split(',')
            for item in items:
                _source_itag, source_url = item.split('|')
                if isinstance(source_url, unicode):
                    source_url = source_url.encode('utf-8')
                source_url = source_url.decode('unicode_escape')
                quality = itag_map.get(_source_itag, 'Unknown Quality [%s]' % _source_itag)
                source_url = urllib2.unquote(source_url)
                urls.append([quality, source_url])
            return urls
    return urls
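_parse_gdocs (and _extract_video below) look quality labels up in a module-level itag_map that is not part of this excerpt. An illustrative stand-in might look like the following; the exact itag-to-quality pairs in the real module may differ.

# Hypothetical subset of the module-level itag_map assumed by these snippets.
itag_map = {
    '18': '360p [MP4]',
    '22': '720p [MP4]',
    '37': '1080p [MP4]',
}
quality = itag_map.get('22', 'Unknown Quality [%s]' % '22')  # '720p [MP4]'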
def _extract_video(item):
    sources = []
    for e in item:
        if isinstance(e, dict):
            for key in e:
                for item2 in e[key]:
                    if isinstance(item2, list):
                        for item3 in item2:
                            if isinstance(item3, list):
                                for item4 in item3:
                                    if isinstance(item4, unicode):
                                        item4 = item4.encode('utf-8')
                                    if isinstance(item4, basestring):
                                        item4 = urllib2.unquote(item4).decode('unicode_escape')
                                        for match in re.finditer('url=(?P<link>[^&]+).*?&itag=(?P<itag>[^&]+)', item4):
                                            link = match.group('link')
                                            itag = match.group('itag')
                                            quality = itag_map.get(itag, 'Unknown Quality [%s]' % itag)
                                            sources.append([quality, link])
                                        if sources:
                                            return sources
    return sources
def proxy_open(self, req, proxy, type):
    # This block is copied wholesale from Python 2.6 urllib2.
    # It is idempotent, so the superclass method call executes as normal
    # if invoked.
    orig_type = req.get_type()
    proxy_type, user, password, hostport = self._parse_proxy(proxy)
    if proxy_type is None:
        proxy_type = orig_type
    if user and password:
        user_pass = "%s:%s" % (urllib2.unquote(user), urllib2.unquote(password))
        creds = base64.b64encode(user_pass).strip()
        # Later calls overwrite earlier calls for the same header
        req.add_header("Proxy-authorization", "Basic " + creds)
    hostport = urllib2.unquote(hostport)
    req.set_proxy(hostport, proxy_type)
    # This condition is the change
    if orig_type == "https":
        return None
    return urllib2.ProxyHandler.proxy_open(self, req, proxy, type)
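proxy_open is a method override on a urllib2.ProxyHandler subclass; the excerpt does not show the class itself. A hedged sketch of how such a handler might be wired into an opener, using a made-up class name (HTTPSPassThroughProxyHandler) and a made-up proxy URL:

import urllib2

class HTTPSPassThroughProxyHandler(urllib2.ProxyHandler):
    # The proxy_open override shown above would live here.
    pass

handler = HTTPSPassThroughProxyHandler({'http': 'http://user:pa%40ss@proxy.example.com:8080'})
opener = urllib2.build_opener(handler)
# opener.open(...) routes http requests through the proxy; the percent-encoded
# credentials are unquoted before the Proxy-authorization header is built.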
def fetch():
    # Get keywords
    kw = request.form.get('keyword')
    if kw is not None:
        kw = unquote(kw)
    # Get parameters
    src = request.form.get("src")
    start = request.form.get("start")
    if src is None or start is None:
        # Error if 'src' or 'start' parameter is not found
        return ""
    assert "." not in src  # Just for security
    start = int(start)
    # Get target date string
    target_date = get_date_str(request.cookies.get('datetoken'))
    num_page = 80 if src == "twitter" else NUMBER_EACH_PAGE
    return render_template(
        "post_{}.html".format(src),
        posts=get_posts(src, keywords=kw, since=target_date, start=start, num=num_page))
def injection_test_results(response, TAG, randvcalc):
    if response == False:
        return False
    else:
        # Check the execution results
        html_data = response.read()
        html_data = html_data.replace("\n", " ")
        # cleanup string / unescape html to string
        html_data = urllib2.unquote(html_data).decode(settings.DEFAULT_CHARSET)
        html_data = HTMLParser.HTMLParser().unescape(html_data).encode(sys.getfilesystemencoding())
        # Replace non-ASCII characters with a single space
        html_data = re.sub(r"[^\x00-\x7f]", r" ", html_data)
        if settings.SKIP_CALC:
            shell = re.findall(r"" + TAG + TAG + TAG, html_data)
        else:
            shell = re.findall(r"" + TAG + str(randvcalc) + TAG + TAG, html_data)
        if len(shell) > 1:
            shell = shell[0]
        return shell
def obfuscation_unescape(page):
    soup = BeautifulSoup(page, "lxml")
    for scr in soup(["script"]):
        if re.search('unescape', str(scr), re.IGNORECASE):
            encoded = re.search("(?:%[0-9A-F][0-9A-F][^\"]+)", str(scr), re.IGNORECASE)
            decoded_content = urllib2.unquote(encoded.group(0))
            scr.replace_with(decoded_content)
    decoded_page = soup.decode(formatter=None)
    tmp_file = "/tmp/tmp.html"
    with open(tmp_file, "wb") as temp_f:
        temp_f.write(decoded_page)
    try:
        response = br.open('file://' + tmp_file)
        global using_selenium
        using_selenium = True
        return response
    except Exception:
        return False
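The core of obfuscation_unescape is that JavaScript unescape() payloads are just percent-encoded text, so urllib2.unquote recovers them. A standalone illustration with an invented payload, reusing the same search pattern as the function:

import re
import urllib2

script = 'document.write(unescape("%3Cscript%20src%3D%22evil.js%22%3E%3C/script%3E"))'
encoded = re.search(r"(?:%[0-9A-F][0-9A-F][^\"]+)", script, re.IGNORECASE)
print urllib2.unquote(encoded.group(0))
# <script src="evil.js"></script>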
def __init__(self, *args, **kwargs):
    super(Download, self).__init__()
    self.url = None
    self.dirs_obj = Directories()
    self.search = Scraper()

# def download(self, link, platform):
#     """
#     Downloads the ROM
#     """
#     # platform = " ".join(rom_url.split('/')[3].replace('_', ' ').split()[:-1])
#     target = self.dirs_obj.target_directory(self.download_location, platform)
#
#     req = urllib2.Request(link)
#     req.add_header('Referer', 'https://www.emuparadise.me/')
#     file_name = urllib2.unquote(link.split('/')[-1])
#     target_file_name = os.path.join(target, file_name)
#     urllib.urlretrieve(link, target_file_name)
#     f = urllib2.urlopen(link)
#     with open(target_file_name, 'wb') as code:
#         total_length = f.headers.get('content-length')
#         if not total_length:
#             code.write(f.content)
#         else:
#             total_length = int(total_length)
#             while True:
#                 data = f.read(total_length / 100)
#                 if not data:
#                     break
#                 code.write(data)
#
#     ex = Compression(location)
#     ex.extract(target_file_name)
def download(self, result_item):
    """
    Downloads a ROM.
    :param result_item: ResultItem object.
    """
    self.current_url = result_item.download_url
    location = os.path.join(PlatformBase().download_location, result_item.system_dir)
    # Check if the ROM directory exists, if not, create it.
    if not os.path.exists(location):
        os.makedirs(location)
    req = urllib2.Request(self.base_url)
    req.add_header('Referer', 'https://www.emuparadise.me/')
    self.current_url = self.get_download_url()
    filename = urllib2.unquote(self.current_url.split('/')[-1])
    target_file_name = os.path.join(location, filename)
    urllib.urlretrieve(self.current_url, target_file_name)
    # with open(target_file_name, 'wb') as code:
    #     total_length = f.headers.get('content-length')
    #     if not total_length:
    #         code.write(f.content)
    #     else:
    #         total_length = int(total_length)
    #         while True:
    #             data = f.read(total_length / 100)
    #             if not data:
    #                 break
    #             code.write(data)
    ex = Compression(location)
    ex.extract(target_file_name)
def download(self, url, location):
    """
    In many cases such as Emuparadise, hotlinking is blocked.
    For that reason, we must follow the redirects with mechanize,
    after which we download the required file.
    """
    link = url
    req = urllib2.Request(url)
    req.add_header('Referer', 'https://www.emuparadise.me/')
    file_name = urllib2.unquote(link.split('/')[-1])
    target_file_name = os.path.join(location, file_name)
    urllib.urlretrieve(link, target_file_name)
    f = urllib2.urlopen(link)
    with open(target_file_name, 'wb') as code:
        total_length = f.headers.get('content-length')
        if not total_length:
            code.write(f.read())  # urllib2 responses expose read(), not .content
        else:
            total_length = int(total_length)
            while True:
                data = f.read(total_length / 100)
                if not data:
                    break
                code.write(data)
    ex = Compression(location)
    ex.extract(target_file_name)
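All three download variants above recover the on-disk filename by unquoting the last path segment of the URL. In isolation, with a made-up URL:

import urllib2

link = 'https://downloads.example.com/roms/Some%20Game%20%28USA%29.zip'
file_name = urllib2.unquote(link.split('/')[-1])
print file_name   # Some Game (USA).zip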
def _showSiteVerificationInfo(site):
    import urllib2
    printKeyValueList([u'Site', site[u'site'][u'identifier']])
    Ind.Increment()
    printKeyValueList([u'ID', urllib2.unquote(site[u'id'])])
    printKeyValueList([u'Type', site[u'site'][u'type']])
    printKeyValueList([u'All Owners', None])
    if u'owners' in site:
        Ind.Increment()
        for owner in site[u'owners']:
            printKeyValueList([owner])
        Ind.Decrement()
    Ind.Decrement()

# gam update verify|verification <DomainName> cname|txt|text|file|site
def on_get(self, req, resp, query=''):
    print("Req", req, query)
    query = unquote(query)
    print("Get result for", query)
    result = self.handler(query)
    print("Returning", result)
    resp.body = json.dumps(result)
def _is_fetching_self(url, method):
    """Checks if the fetch is for the same URL from which it originated.

    Args:
        url: str, The URL being fetched.
        method: value from _VALID_METHODS.

    Returns:
        boolean indicating whether or not it seems that the app is trying to
        fetch itself.
    """
    if (method != GET or
            "HTTP_HOST" not in os.environ or
            "PATH_INFO" not in os.environ):
        return False

    _, host_port, path, _, _ = urlparse.urlsplit(url)
    if host_port == os.environ['HTTP_HOST']:
        current_path = urllib2.unquote(os.environ['PATH_INFO'])
        desired_path = urllib2.unquote(path)
        if (current_path == desired_path or
                (current_path in ('', '/') and desired_path in ('', '/'))):
            return True

    return False
def _insert_links(data_dict, limit, offset):
    '''Adds link to the next/prev part (same limit, offset=offset+limit)
    and the resource page.'''
    data_dict['_links'] = {}

    # get the url from the request
    try:
        urlstring = toolkit.request.environ['CKAN_CURRENT_URL']
    except (KeyError, TypeError):
        return  # no links required for local actions

    # change the offset in the url
    parsed = list(urlparse.urlparse(urlstring))
    query = urllib2.unquote(parsed[4])
    arguments = dict(urlparse.parse_qsl(query))
    arguments_start = dict(arguments)
    arguments_prev = dict(arguments)
    arguments_next = dict(arguments)
    if 'offset' in arguments_start:
        arguments_start.pop('offset')
    arguments_next['offset'] = int(offset) + int(limit)
    arguments_prev['offset'] = int(offset) - int(limit)

    parsed_start = parsed[:]
    parsed_prev = parsed[:]
    parsed_next = parsed[:]
    parsed_start[4] = urllib.urlencode(arguments_start)
    parsed_next[4] = urllib.urlencode(arguments_next)
    parsed_prev[4] = urllib.urlencode(arguments_prev)

    # add the links to the data dict
    data_dict['_links']['start'] = urlparse.urlunparse(parsed_start)
    data_dict['_links']['next'] = urlparse.urlunparse(parsed_next)
    if int(offset) - int(limit) > 0:
        data_dict['_links']['prev'] = urlparse.urlunparse(parsed_prev)
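The offset rewriting in _insert_links can be exercised on its own. A minimal sketch with a hypothetical CKAN-style URL, using the same urlparse/urllib calls as the function:

import urlparse
import urllib
import urllib2

url = 'http://example.org/api/3/action/datastore_search?limit=10&offset=20'
parsed = list(urlparse.urlparse(url))
arguments = dict(urlparse.parse_qsl(urllib2.unquote(parsed[4])))
arguments['offset'] = int(arguments['offset']) + int(arguments['limit'])
parsed[4] = urllib.urlencode(arguments)
print urlparse.urlunparse(parsed)
# http://example.org/api/3/action/datastore_search?limit=10&offset=30
# (query parameter order after urlencode may differ)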
def __call__(self, environ, start_response):
    path = environ['PATH_INFO']
    method = environ.get('REQUEST_METHOD')
    if path == '/_tracking' and method == 'POST':
        # do the tracking
        # get the post data
        payload = environ['wsgi.input'].read()
        parts = payload.split('&')
        data = {}
        for part in parts:
            k, v = part.split('=')
            data[k] = urllib2.unquote(v).decode("utf8")
        start_response('200 OK', [('Content-Type', 'text/html')])
        # we want a unique anonymized key for each user so that we do
        # not count multiple clicks from the same user.
        key = ''.join([
            environ['HTTP_USER_AGENT'],
            environ['REMOTE_ADDR'],
            environ.get('HTTP_ACCEPT_LANGUAGE', ''),
            environ.get('HTTP_ACCEPT_ENCODING', ''),
        ])
        key = hashlib.md5(key).hexdigest()
        # store key/data here
        sql = '''INSERT INTO tracking_raw
                 (user_key, url, tracking_type)
                 VALUES (%s, %s, %s)'''
        self.engine.execute(sql, key, data.get('url'), data.get('type'))
        return []
    return self.app(environ, start_response)
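The POST-body parsing in the tracking middleware is the classic split-on-&-then-= pattern. Decoded in isolation, with an invented payload:

import urllib2

payload = 'url=%2Fdataset%2Fexample-dataset&type=page'
data = {}
for part in payload.split('&'):
    k, v = part.split('=')
    data[k] = urllib2.unquote(v).decode('utf8')
print data   # {'url': u'/dataset/example-dataset', 'type': u'page'} (key order may vary)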
def DecodeURIComponent(uri):
    # Repeatedly unquote until the value stops changing, so that double-
    # (or deeper-) encoded components are fully decoded.
    while True:
        dec = urllib2.unquote(uri)
        if dec == uri:
            break
        uri = dec
    return uri.decode('utf8')
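For example (invented input):

double_encoded = 'caf%25C3%25A9'          # 'café', percent-encoded twice
print DecodeURIComponent(double_encoded)  # u'café' (a single unquote would only yield 'caf%C3%A9')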
####################################################################################################
def unquote_unicode(text, encoding="utf-8"):
    """urllib2.unquote wrapper to handle unicode items."""
    if isinstance(text, unicode):
        text = text.encode(encoding)
    return urllib2.unquote(text).decode(encoding)
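A quick check of unquote_unicode with both str and unicode input (values invented):

print unquote_unicode('caf%C3%A9')    # u'café'
print unquote_unicode(u'caf%C3%A9')   # u'café' -- unicode input is encoded first, then unquoted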
def doc_exalead(domain, user_agents, prox, q):
    document_list = []
    uas = user_agents
    info('Exalead Document Search Started')
    for start in range(0, 80, 10):
        ua = random.choice(uas)
        link = 'http://www.exalead.com/search/web/results/?search_language=&q=(filetype:xls+OR+filetype:doc+OR++filetype:pdf+OR+filetype:ppt)+site:{}&search_language=&elements_per_page=10&start_index={}'.format(domain, start)
        if prox == True:
            proxy = {'http': 'http://127.0.0.1:8080'}
        else:
            pass
        try:
            headers = {"Connection": "close",
                       "User-Agent": ua,
                       'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                       'Accept-Language': 'en-US,en;q=0.5',
                       'Accept-Encoding': 'gzip, deflate'}
            if prox == True:
                response = requests.get(link, headers=headers, proxies=proxy, verify=False)
            else:
                response = requests.get(link, headers=headers, verify=False)
            soup = BeautifulSoup(response.text, "lxml")
            if soup.find('label', {'class': 'control-label', 'for': 'id_captcha'}):
                info("So you don't like spinach?")
                info("Captchas are preventing some document searches.")
                break
            for div in soup.findAll('li', {'class': 'media'}):
                document = div.find('a', href=True)['href']
                document = urllib2.unquote(document)
                document_list.append(document)
        except Exception:
            info('An Unhandled Exception Has Occured, Please Check The Log For Details' + INFO_LOG_FILE)
            continue
        time.sleep(10)
    potential_docs = len(document_list)
    info('Exalead Document Search Finished')
    info('Potential Exalead Documents Found: {}'.format(potential_docs))
    q.put(document_list)
def doc_bing(domain, user_agents, prox, q):
    document_list = []
    uas = user_agents
    info('Bing Document Search Started')
    for start in range(1, 300, 10):
        ua = random.choice(uas)
        if prox == True:
            proxy = {'http': 'http://127.0.0.1:8080'}
        else:
            pass
        try:
            headers = {"Connection": "close",
                       "User-Agent": ua,
                       'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                       'Accept-Language': 'en-US,en;q=0.5',
                       'Accept-Encoding': 'gzip, deflate'}
            payload = {'q': 'filetype:(doc dot docx docm dotx dotm docb xls xlt xlm xlsx xlsm xltx xltm xlsb xla xlam xll xlw ppt pot pps pptx pptm potx potm ppam ppsx ppsm sldx sldm pub pdf) site:{}'.format(domain), 'first': start}
            link = 'http://www.bing.com/search'
            if prox == True:
                response = requests.get(link, headers=headers, proxies=proxy, params=payload, verify=False)
            else:
                response = requests.get(link, headers=headers, params=payload, verify=False)
            soup = BeautifulSoup(response.text, "lxml")
            divs = soup.findAll('li', {'class': 'b_algo'})
            for div in divs:
                h2 = div.find('h2')
                document = h2.find('a', href=True)['href']
                document = urllib2.unquote(document)
                document_list.append(document)
        except requests.models.ChunkedEncodingError:
            continue
        except Exception:
            traceback.print_exc()
            continue
    potential_docs = len(document_list)
    info('Bing Document Search Finished')
    q.put(document_list)
def parse_play_flash_cookie(response):
    flash_cookie = response.cookies['PLAY_FLASH']
    messageType, message = flash_cookie.split("=")
    # Format message into user friendly string
    message = urllib2.unquote(message).replace("+", " ")
    # Discern error disposition
    if messageType == "dominoFlashError":
        error = True
    else:
        error = False
    return dict(messageType=messageType, message=message, error=error)
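The decoding step of parse_play_flash_cookie, shown in isolation on an invented cookie value:

import urllib2

flash_cookie = 'dominoFlashError=Project+name+%22demo%22+already+exists'
messageType, message = flash_cookie.split('=')
print urllib2.unquote(message).replace('+', ' ')
# Project name "demo" already exists
print messageType == 'dominoFlashError'   # True -> treated as an error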
def downloadApk(apkid, apkfilename):
    s = requests.session()
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-cn,zh;q=0.8,en-us;q=0.5,en;q=0.3",
        "Accept-Encoding": "gzip, deflate,sdch",
        "Host": "app.mi.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.111 Safari/537.36",
        "Connection": "keep-alive",
        "Cache-Control": "no-cache",
    }
    s.headers.update(headers)
    s.headers['Host'] = 'app.mi.com'
    resp = s.get('http://app.mi.com/download/' + str(apkid), timeout=100, allow_redirects=False)
    content = resp.content
    #print "Content:", content
    template = '<a href="(.*?)">here</a>'
    real_url = re.compile(template)
    real_url = re.search(real_url, content).group(1)
    #print real_url
    apkrealname = real_url[real_url.rfind('/') + 1:]
    apkrealname = urllib2.unquote(apkrealname)
    s.headers['Host'] = 'f3.market.xiaomi.com'
    resp = s.get(real_url, timeout=100)
    content = resp.content
    with open(apkfilename, 'wb+') as f:
        f.write(content)