def resolve(self, url):
    try:
        b = urlparse.urlparse(url).netloc
        b = re.compile(r'([\w]+[.][\w]+)$').findall(b)[0]
        if b not in base64.b64decode(self.b_link): return url

        u, p, h = url.split('|')
        r = urlparse.parse_qs(h)['Referer'][0]
        #u += '&app_id=Exodus'

        c = self.request(r, output='cookie', close=False)
        result = self.request(u, post=p, referer=r, cookie=c)

        url = result.split('url=')
        url = [urllib.unquote_plus(i.strip()) for i in url]
        url = [i for i in url if i.startswith('http')]
        url = url[-1]

        return url
    except:
        return
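The resolver above pulls the final link out of a response body that embeds 'url=' markers. A small standalone sketch of just that step, with a fabricated response string (the real one comes from self.request()):

from urllib import unquote_plus

# Made-up response body containing a percent-encoded target URL.
result = 'status=ok&url=http%3A%2F%2Fcdn.example.com%2Fvideo.mp4%3Ftoken%3Dabc+123'

parts = [unquote_plus(p.strip()) for p in result.split('url=')]
links = [p for p in parts if p.startswith('http')]
print links[-1]   # -> http://cdn.example.com/video.mp4?token=abc 123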
Python unquote_plus() usage examples
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not a valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)
    return user, passwd
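A minimal sketch of the escaping this function expects, using Python 2's urllib directly; the credentials below are made up:

from urllib import quote_plus, unquote_plus

# A password containing reserved characters must be percent-escaped
# before it goes into the userinfo part of a MongoDB URI.
raw_password = 'p@ss:w+rd'
escaped = quote_plus(raw_password)     # 'p%40ss%3Aw%2Brd'
userinfo = 'alice:%s' % escaped        # 'alice:p%40ss%3Aw%2Brd'

# parse_userinfo() reverses the escaping with unquote_plus().
user, _, passwd = userinfo.partition(':')
assert unquote_plus(passwd) == raw_password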
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")
    return options
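A quick illustration of the value decoding performed in that loop, with a made-up options string (the fragment after '?' in a MongoDB URI):

from urllib import unquote_plus

opts = 'w=1&wtimeoutMS=2500&appName=my%2Bapp'
parsed = dict(pair.split('=') for pair in opts.split('&'))
print unquote_plus(parsed['appName'])   # -> my+app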
def get_smtp_server():
    """
    Instantiate, configure and return a SMTP or SMTP_SSL instance from
    smtplib.

    :return: A SMTP instance. The quit() method must be called when all
        the calls to sendmail() have been made.
    """
    uri = parse_uri(config.get('email', 'uri'))
    if uri.scheme.startswith('smtps'):
        smtp_server = smtplib.SMTP_SSL(uri.hostname, uri.port)
    else:
        smtp_server = smtplib.SMTP(uri.hostname, uri.port)

    if 'tls' in uri.scheme:
        smtp_server.starttls()

    if uri.username and uri.password:
        smtp_server.login(
            urllib.unquote_plus(uri.username),
            urllib.unquote_plus(uri.password))

    return smtp_server
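As a side note, percent-escaped credentials in such a URI decode the same way with the standard library alone; the URI below is made up:

import urlparse
from urllib import unquote_plus

uri = urlparse.urlparse('smtps://bot%2Bmail:s3cret%40pw@mail.example.com:465')
print unquote_plus(uri.username)   # -> bot+mail
print unquote_plus(uri.password)   # -> s3cret@pw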
def cursor(self, autocommit=False, readonly=False):
    conv = MySQLdb.converters.conversions.copy()
    conv[float] = lambda value, _: repr(value)
    conv[MySQLdb.constants.FIELD_TYPE.TIME] = MySQLdb.times.Time_or_None
    args = {
        'db': self.database_name,
        'sql_mode': 'traditional,postgresql',
        'use_unicode': True,
        'charset': 'utf8',
        'conv': conv,
        }
    uri = parse_uri(config.get('database', 'uri'))
    assert uri.scheme == 'mysql'
    if uri.hostname:
        args['host'] = uri.hostname
    if uri.port:
        args['port'] = uri.port
    if uri.username:
        args['user'] = uri.username
    if uri.password:
        args['passwd'] = urllib.unquote_plus(uri.password)
    conn = MySQLdb.connect(**args)
    cursor = Cursor(conn, self.database_name)
    cursor.execute('SET time_zone = "+00:00"')
    return cursor
def connect(self):
    if self._connpool is not None:
        return self
    logger.info('connect to "%s"', self.database_name)
    uri = parse_uri(config.get('database', 'uri'))
    assert uri.scheme == 'postgresql'
    host = uri.hostname and "host=%s" % uri.hostname or ''
    port = uri.port and "port=%s" % uri.port or ''
    name = "dbname=%s" % self.database_name
    user = uri.username and "user=%s" % uri.username or ''
    password = ("password=%s" % urllib.unquote_plus(uri.password)
                if uri.password else '')
    minconn = config.getint('database', 'minconn', default=1)
    maxconn = config.getint('database', 'maxconn', default=64)
    dsn = '%s %s %s %s %s' % (host, port, name, user, password)
    self._connpool = ThreadedConnectionPool(minconn, maxconn, dsn)
    return self
def _split_token_parts(blob):
    """Extracts and unescapes fields from the provided binary string.

    Reverses the packing performed by _join_token_parts. Used to extract
    the members of a token object.

    Note: An empty string from the blob will be interpreted as None.

    Args:
        blob: str A string of the form 1x|member1|member2|member3 as created
            by _join_token_parts

    Returns:
        A list of unescaped strings.
    """
    return [urllib.unquote_plus(part) or None for part in blob.split('|')]
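A sketch of the round trip this helper implies. _join_token_parts is only described by the docstring, so the packing step below is an assumption (each member escaped so '|' can serve as separator):

from urllib import quote_plus, unquote_plus

members = ['1x', 'my token', '', 'a|b']
blob = '|'.join(quote_plus(m) for m in members)   # '1x|my+token||a%7Cb'

# Unpacking as in _split_token_parts(): empty fields come back as None.
print [unquote_plus(p) or None for p in blob.split('|')]
# -> ['1x', 'my token', None, 'a|b']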
def filter(self, handler):
    path = urlparse.urlsplit(handler.path).path
    if path.startswith('/'):
        path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
    if os.path.isdir(path):
        index_file = os.path.join(path, self.index_file)
        if not os.path.isfile(index_file):
            content = self.format_index_html(path).encode('UTF-8')
            headers = {'Content-Type': 'text/html; charset=utf-8', 'Connection': 'close'}
            return 'mock', {'status': 200, 'headers': headers, 'body': content}
        else:
            path = index_file
    if os.path.isfile(path):
        content_type = 'application/octet-stream'
        try:
            import mimetypes
            content_type = mimetypes.types_map.get(os.path.splitext(path)[1])
            if os.path.splitext(path)[1].endswith(('crt', 'pem')):
                content_type = 'application/x-x509-ca-cert'
        except StandardError as e:
            logging.error('import mimetypes failed: %r', e)
        with open(path, 'rb') as fp:
            content = fp.read()
            headers = {'Connection': 'close', 'Content-Type': content_type}
            return 'mock', {'status': 200, 'headers': headers, 'body': content}
def fromurl(self, url):
    '''
    Builds an item from a text string. The string may have been created by tourl() or
    use the legacy format: plugin://plugin.video.pelisalacarta/?channel=... (+ other parameters)
    Usage: item.fromurl("string")
    '''
    if "?" in url:
        url = url.split("?")[1]
    try:
        STRItem = base64.b64decode(urllib.unquote(url))
        JSONItem = json.loads(STRItem, object_hook=self.toutf8)
        self.__dict__.update(JSONItem)
    except:
        url = urllib.unquote_plus(url)
        dct = dict([[param.split("=")[0], param.split("=")[1]]
                    for param in url.split("&") if "=" in param])
        self.__dict__.update(dct)
        self.__dict__ = self.toutf8(self.__dict__)
    return self
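The except branch handles the legacy query-string format; a tiny illustration with a made-up plugin URL (key order of the printed dict may vary):

from urllib import unquote_plus

url = 'channel=somechannel&action=play&title=Some+Movie+%282016%29'
url = unquote_plus(url)
print dict(param.split('=') for param in url.split('&') if '=' in param)
# -> {'channel': 'somechannel', 'action': 'play', 'title': 'Some Movie (2016)'}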
def prepare(self):
    if self.request.method == 'OPTIONS':
        return
    auth_header = self.request.headers.get('Authorization')
    if not auth_header or not auth_header.startswith('Basic '):
        raise exceptions.HTTPError(401, 'Unauthenticated')
    decoded = unquote_plus(base64.decodestring(auth_header[6:]))
    client_id, client_secret = decoded.split(':', 1)
    service = yield Service.authenticate(client_id, client_secret)
    if not service:
        raise exceptions.HTTPError(401, 'Unauthenticated')
    self.request.client_id = client_id
    self.request.client = service
    grant_type = self.request.body_arguments.get('grant_type', [None])[0]
    self.request.grant_type = grant_type
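A standalone sketch of the header decoding above; it assumes the client percent-escapes the secret before base64-encoding 'client_id:client_secret' (the credentials are made up):

import base64
from urllib import unquote_plus

auth_header = 'Basic ' + base64.b64encode('my-app:s3cret%2Bkey')
decoded = unquote_plus(base64.decodestring(auth_header[6:]))
print decoded.split(':', 1)   # -> ['my-app', 's3cret+key']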
def process_message_event(message, resource, token, config):
    logging.debug('Processing message event')
    try:
        if str(message['_embedded']['message']['direction']) == 'Incoming':
            message_uri = message['_embedded']['message']['_links']['plainMessage']['href']
            logging.debug("Received raw message - %s" % message_uri)
            inbound_message = urllib.unquote_plus(DataURI(message_uri).data)
            logging.info("Received message - %s" % inbound_message)
            thread_uri = message['_embedded']['message']['_links']['messaging']['href']
            if MESSAGE_CALLBACK is not None:
                MESSAGE_CALLBACK(inbound_message, thread_uri, resource)
            # send_message(resource + thread_uri + '/messages', 'I found 4 matching incidents https://it12321.servicenow.com/search?query={0}'.format(inbound_message), token, config['redirect_uri'])
    except KeyError:
        logging.debug('not an inbound message')
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    url = event['Records'][0]['s3']['object']['key'].encode('utf8')
    key = urllib.unquote_plus(url)
    s3_path = os.path.dirname(key)
    try:
        s3.download_file(bucket, key, '/tmp/target.zip')
        zfile = zipfile.ZipFile('/tmp/target.zip')
        namelist = zfile.namelist()
        for filename in namelist:
            data = zfile.read(filename)
            localpath = '/tmp/{}'.format(str(filename))
            f = open(localpath, 'wb')
            f.write(data)
            f.close()
            s3.upload_file(localpath, bucket, os.path.join(s3_path, filename))
        s3.delete_object(Bucket=bucket, Key=key)
        return "AWS Key -> {}".format(key)
    except Exception as e:
        print(e)
        raise e
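S3 event notifications URL-encode the object key (spaces arrive as '+'), which is why these handlers run it through unquote_plus(). A small illustration with a made-up key:

from urllib import unquote_plus

raw_key = 'uploads/My+Archive+%282017%29.zip'   # as delivered in the event record
print unquote_plus(raw_key)                      # -> uploads/My Archive (2017).zip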
def resolve(self, url):
    try:
        b = urlparse.urlparse(url).netloc
        b = re.compile(r'([\w]+[.][\w]+)$').findall(b)[0]
        if b not in base64.b64decode(self.b_link): return url

        u, p, h = url.split('|')
        r = urlparse.parse_qs(h)['Referer'][0]
        #u += '&app_id=Exodus'

        c = self.request(r, output='cookie', close=False)
        result = self.request(u, post=p, referer=r, cookie=c)

        url = re.compile('url=(.*)').findall(result)[0]
        url = urllib.unquote_plus(url)

        return url
    except:
        return
def test_unquoting(self):
    # Make sure unquoting of all ASCII values works
    escape_list = []
    for num in range(128):
        given = hexescape(chr(num))
        expect = chr(num)
        result = urllib.unquote(given)
        self.assertEqual(expect, result,
                         "using unquote(): %s != %s" % (expect, result))
        result = urllib.unquote_plus(given)
        self.assertEqual(expect, result,
                         "using unquote_plus(): %s != %s" %
                         (expect, result))
        escape_list.append(given)
    escape_string = ''.join(escape_list)
    del escape_list
    result = urllib.unquote(escape_string)
    self.assertEqual(result.count('%'), 1,
                     "using unquote(): not all characters escaped; %s" %
                     result)
    result = urllib.unquote_plus(escape_string)
    self.assertEqual(result.count('%'), 1,
                     "using unquote_plus(): not all characters escaped: "
                     "%s" % result)
def process(self, pyfile):
    name = re.search(self.NAME_PATTERN, pyfile.url).group(1)
    pyfile.name = urllib.unquote_plus(name)

    session = re.search(self.SESSION_PATTERN, pyfile.url).group(1)

    url = "http://flyfiles.net"

    #: Get download URL
    parsed_url = self.load(url, post={'getDownLink': session})
    self.log_debug("Parsed URL: %s" % parsed_url)

    if parsed_url == "#downlink|" or parsed_url == "#downlink|#":
        self.log_warning(
            _("Could not get the download URL. Please wait 10 minutes"))
        self.wait(10 * 60, True)
        self.retry()

    self.link = parsed_url.replace('#downlink|', '')
def print_var_node(xml_node, stream):
    name = xml_node.getAttribute('name')
    value = xml_node.getAttribute('value')
    val_type = xml_node.getAttribute('type')
    found_as = xml_node.getAttribute('found_as')

    stream.write('Name: ')
    stream.write(unquote_plus(name))
    stream.write(', Value: ')
    stream.write(unquote_plus(value))
    stream.write(', Type: ')
    stream.write(unquote_plus(val_type))
    if found_as:
        stream.write(', Found as: %s' % (unquote_plus(found_as),))
    stream.write('\n')


#===================================================================================================
# print_referrers
#===================================================================================================
def GetImports(module_name):
    try:
        processor = pycompletionserver.Processor()
        data = urllib.unquote_plus(module_name)
        def_file, completions = _pydev_imports_tipper.GenerateTip(data)
        return processor.formatCompletionMessage(def_file, completions)
    except:
        s = StringIO.StringIO()
        exc_info = sys.exc_info()
        traceback.print_exception(exc_info[0], exc_info[1], exc_info[2], limit=None, file=s)
        err = s.getvalue()
        pycompletionserver.dbg('Received error: ' + str(err), pycompletionserver.ERROR)
        raise


#=======================================================================================================================
# main
#=======================================================================================================================
def get_video_url(page_url, user="", password="", video_password=""):
    video_urls = []
    urls = []
    response = httptools.downloadpage(page_url, cookies=False, headers={"Referer": page_url})
    cookies = ""
    cookie = response.headers["set-cookie"].split("HttpOnly, ")
    for c in cookie:
        cookies += c.split(";", 1)[0] + "; "
    data = response.data.decode('unicode-escape')
    data = urllib.unquote_plus(urllib.unquote_plus(data))
    headers_string = "|Cookie=" + cookies

    url_streams = scrapertools.find_single_match(data, 'url_encoded_fmt_stream_map=(.*)')
    streams = scrapertools.find_multiple_matches(
        url_streams,
        'itag=(\d+)&url=(.*?)(?:;.*?quality=.*?(?:,|&)|&quality=.*?(?:,|&))')

    itags = {'18': '360p', '22': '720p', '34': '360p', '35': '480p',
             '37': '1080p', '43': '360p', '59': '480p'}
    for itag, video_url in streams:
        if video_url not in urls:
            video_url += headers_string
            video_urls.append([itags[itag], video_url])
            urls.append(video_url)

    video_urls.sort(key=lambda video_urls: int(video_urls[0].replace("p", "")))
    return video_urls
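The double unquote_plus() call above handles data that was percent-encoded twice; a tiny illustration with made-up values:

from urllib import unquote_plus

doubly_encoded = 'itag%253D22%2526quality%253Dhd720'
print unquote_plus(unquote_plus(doubly_encoded))   # -> itag=22&quality=hd720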
def lambda_handler(event, context):
    """
    Demonstrates an S3 trigger that uses Rekognition APIs to detect faces, labels and index faces in an S3 object.
    """
    # Get the object from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    try:
        rekognition_faces_response = detect_faces(bucket, key)
        rekognition_faces_response_json = json.dumps(rekognition_faces_response, indent=4)
        rekognition_faces_response_csv = transform_json_to_csv(bucket, key, rekognition_faces_response)
        write_s3(bucket, key, rekognition_faces_response_json, rekognition_faces_response_csv)
        return rekognition_faces_response
    except Exception as e:
        print("Error processing object {} from bucket {}".format(key, bucket))
        print("Exception: {}. {}".format(e, sys.exc_info()[0]))
        raise
def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))

    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    print(bucket)
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    print(key)
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print(response)
        print("CONTENT TYPE: " + response['ContentType'])
        s3.put_object(Body=response['Body'].read(), Bucket='lambdabkt-testsave123', Key=key + '123')
        return response['ContentType']
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
def playstrm(params, url, category):
    '''Plays videos from .strm files
    '''
    logger.info("[xbmctools.py] playstrm url=" + url)
    title = unicode(xbmc.getInfoLabel("ListItem.Title"), "utf-8")
    thumbnail = urllib.unquote_plus(params.get("thumbnail"))
    plot = unicode(xbmc.getInfoLabel("ListItem.Plot"), "utf-8")
    server = params["server"]
    if params.has_key("Serie"):
        serie = params.get("Serie")
    else:
        serie = ""
    if params.has_key("subtitle"):
        subtitle = params.get("subtitle")
    else:
        subtitle = ""

    from core.item import Item
    from platformcode.subtitletools import saveSubtitleName
    item = Item(title=title, show=serie)
    saveSubtitleName(item)

    play_video("Biblioteca streamondemand", server, url, category, title, thumbnail, plot,
               strmfile=True, Serie=serie, subtitle=subtitle)