python类unquote_plus()的实例源码

directdl.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def resolve(self, url):
        try:
            b = urlparse.urlparse(url).netloc
            b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]

            if not b in base64.b64decode(self.b_link): return url

            u, p, h = url.split('|')
            r = urlparse.parse_qs(h)['Referer'][0]
            #u += '&app_id=Exodus'

            c = self.request(r, output='cookie', close=False)
            result = self.request(u, post=p, referer=r, cookie=c)

            url = result.split('url=')
            url = [urllib.unquote_plus(i.strip()) for i in url]
            url = [i for i in url if i.startswith('http')]
            url = url[-1]

            return url
        except:
            return
uri_parser.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
uri_parser.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
uri_parser.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
uri_parser.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
uri_parser.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
uri_parser.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
misc.py 文件源码 项目:health-mosconi 作者: GNUHealth-Mosconi 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_smtp_server():
    """
    Instanciate, configure and return a SMTP or SMTP_SSL instance from
    smtplib.
    :return: A SMTP instance. The quit() method must be call when all
    the calls to sendmail() have been made.
    """
    uri = parse_uri(config.get('email', 'uri'))
    if uri.scheme.startswith('smtps'):
        smtp_server = smtplib.SMTP_SSL(uri.hostname, uri.port)
    else:
        smtp_server = smtplib.SMTP(uri.hostname, uri.port)

    if 'tls' in uri.scheme:
        smtp_server.starttls()

    if uri.username and uri.password:
        smtp_server.login(
            urllib.unquote_plus(uri.username),
            urllib.unquote_plus(uri.password))

    return smtp_server
database.py 文件源码 项目:health-mosconi 作者: GNUHealth-Mosconi 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def cursor(self, autocommit=False, readonly=False):
        conv = MySQLdb.converters.conversions.copy()
        conv[float] = lambda value, _: repr(value)
        conv[MySQLdb.constants.FIELD_TYPE.TIME] = MySQLdb.times.Time_or_None
        args = {
            'db': self.database_name,
            'sql_mode': 'traditional,postgresql',
            'use_unicode': True,
            'charset': 'utf8',
            'conv': conv,
        }
        uri = parse_uri(config.get('database', 'uri'))
        assert uri.scheme == 'mysql'
        if uri.hostname:
            args['host'] = uri.hostname
        if uri.port:
            args['port'] = uri.port
        if uri.username:
            args['user'] = uri.username
        if uri.password:
            args['passwd'] = urllib.unquote_plus(uri.password)
        conn = MySQLdb.connect(**args)
        cursor = Cursor(conn, self.database_name)
        cursor.execute('SET time_zone = "+00:00"')
        return cursor
database.py 文件源码 项目:health-mosconi 作者: GNUHealth-Mosconi 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def connect(self):
        if self._connpool is not None:
            return self
        logger.info('connect to "%s"', self.database_name)
        uri = parse_uri(config.get('database', 'uri'))
        assert uri.scheme == 'postgresql'
        host = uri.hostname and "host=%s" % uri.hostname or ''
        port = uri.port and "port=%s" % uri.port or ''
        name = "dbname=%s" % self.database_name
        user = uri.username and "user=%s" % uri.username or ''
        password = ("password=%s" % urllib.unquote_plus(uri.password)
            if uri.password else '')
        minconn = config.getint('database', 'minconn', default=1)
        maxconn = config.getint('database', 'maxconn', default=64)
        dsn = '%s %s %s %s %s' % (host, port, name, user, password)
        self._connpool = ThreadedConnectionPool(minconn, maxconn, dsn)
        return self
gauth.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _split_token_parts(blob):
  """Extracts and unescapes fields from the provided binary string.

  Reverses the packing performed by _join_token_parts. Used to extract
  the members of a token object.

  Note: An empty string from the blob will be interpreted as None.

  Args:
    blob: str A string of the form 1x|member1|member2|member3 as created
        by _join_token_parts

  Returns:
    A list of unescaped strings.
  """
  return [urllib.unquote_plus(part) or None for part in blob.split('|')]
proxylib.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def filter(self, handler):
        path = urlparse.urlsplit(handler.path).path
        if path.startswith('/'):
            path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
            if os.path.isdir(path):
                index_file = os.path.join(path, self.index_file)
                if not os.path.isfile(index_file):
                    content = self.format_index_html(path).encode('UTF-8')
                    headers = {'Content-Type': 'text/html; charset=utf-8', 'Connection': 'close'}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
                else:
                    path = index_file
            if os.path.isfile(path):
                content_type = 'application/octet-stream'
                try:
                    import mimetypes
                    content_type = mimetypes.types_map.get(os.path.splitext(path)[1])
                    if os.path.splitext(path)[1].endswith(('crt', 'pem')):
                        content_type = 'application/x-x509-ca-cert'
                except StandardError as e:
                    logging.error('import mimetypes failed: %r', e)
                with open(path, 'rb') as fp:
                    content = fp.read()
                    headers = {'Connection': 'close', 'Content-Type': content_type}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
item.py 文件源码 项目:tvalacarta 作者: tvalacarta 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def fromurl(self, url):
        '''
        Genera un item a partir de una cadena de texto. La cadena puede ser creada por la funcion tourl() o tener
        el formato antiguo: plugin://plugin.video.pelisalacarta/?channel=... (+ otros parametros)
        Uso: item.fromurl("cadena")
        '''
        if "?" in url: url = url.split("?")[1]
        try:
            STRItem = base64.b64decode(urllib.unquote(url))
            JSONItem = json.loads(STRItem, object_hook=self.toutf8)
            self.__dict__.update(JSONItem)
        except:
            url = urllib.unquote_plus(url)
            dct = dict([[param.split("=")[0], param.split("=")[1]] for param in url.split("&") if "=" in param])
            self.__dict__.update(dct)
            self.__dict__ = self.toutf8(self.__dict__)
        return self
base.py 文件源码 项目:auth-srv 作者: openpermissions 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def prepare(self):
        if self.request.method == 'OPTIONS':
            return

        auth_header = self.request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Basic '):
            raise exceptions.HTTPError(401, 'Unauthenticated')

        decoded = unquote_plus(base64.decodestring(auth_header[6:]))
        client_id, client_secret = decoded.split(':', 1)

        service = yield Service.authenticate(client_id, client_secret)
        if not service:
            raise exceptions.HTTPError(401, 'Unauthenticated')

        self.request.client_id = client_id
        self.request.client = service

        grant_type = self.request.body_arguments.get('grant_type', [None])[0]
        self.request.grant_type = grant_type
events.py 文件源码 项目:pyucwa 作者: tonybaloney 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def process_message_event(message, resource, token, config):
    logging.debug('Processing message event')
    try:
        if str(message['_embedded']['message']['direction']) == 'Incoming':
            message_uri = message['_embedded']['message']['_links']['plainMessage']['href']
            logging.debug("Received raw message - %s" % message_uri)

            inbound_message = urllib.unquote_plus(DataURI(message_uri).data)
            logging.info("Received message - %s" % inbound_message)

            thread_uri = message['_embedded']['message']['_links']['messaging']['href']

            if MESSAGE_CALLBACK is not None:
                MESSAGE_CALLBACK(inbound_message, thread_uri, resource)
            # send_message(resource + thread_uri + '/messages', 'I found 4 matching incidents https://it12321.servicenow.com/search?query={0}'.format(inbound_message), token, config['redirect_uri'])
    except KeyError:
        logging.debug('not an inbound message')
    pass
directdl.py 文件源码 项目:plugin.video.exodus 作者: huberyhe 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def resolve(self, url):
        try:
            b = urlparse.urlparse(url).netloc
            b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]

            if not b in base64.b64decode(self.b_link): return url

            u, p, h = url.split('|')
            r = urlparse.parse_qs(h)['Referer'][0]

            c = self.request(r, output='cookie', close=False)
            result = self.request(u, post=p, referer=r, cookie=c)

            url = result.split('url=')
            url = [urllib.unquote_plus(i.strip()) for i in url]
            url = [i for i in url if i.startswith('http')]
            url = url[-1]

            return url
        except:
            return
lambda_function.py 文件源码 项目:aws-lambda-unzip 作者: mehmetboraezer 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    url = event['Records'][0]['s3']['object']['key'].encode('utf8')
    key = urllib.unquote_plus(url)
    s3_path = os.path.dirname(key)
    try:
        s3.download_file(bucket, key, '/tmp/target.zip')
        zfile = zipfile.ZipFile('/tmp/target.zip')
        namelist = zfile.namelist()
        for filename in namelist:
            data = zfile.read(filename)
            localpath = '/tmp/{}'.format(str(filename))
            f = open(localpath, 'wb')
            f.write(data)
            f.close()
            s3.upload_file(localpath, bucket, os.path.join(s3_path, filename))
        s3.delete_object(Bucket=bucket, Key=key)
        return "AWS Key -> {}".format(key)
    except Exception as e:
        print(e)
        raise e
directdl_mv_tv.py 文件源码 项目:exodus-favourite-library 作者: arcane47 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def resolve(self, url):
        try:
            b = urlparse.urlparse(url).netloc
            b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]

            if not b in base64.b64decode(self.b_link): return url

            u, p, h = url.split('|')
            r = urlparse.parse_qs(h)['Referer'][0]
            #u += '&app_id=Exodus'

            c = self.request(r, output='cookie', close=False)
            result = self.request(u, post=p, referer=r, cookie=c)

            url = re.compile('url=(.*)').findall(result)[0]
            url = urllib.unquote_plus(url)

            return url
        except:
            return
test_urllib.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_unquoting(self):
        # Make sure unquoting of all ASCII values works
        escape_list = []
        for num in range(128):
            given = hexescape(chr(num))
            expect = chr(num)
            result = urllib.unquote(given)
            self.assertEqual(expect, result,
                             "using unquote(): %s != %s" % (expect, result))
            result = urllib.unquote_plus(given)
            self.assertEqual(expect, result,
                             "using unquote_plus(): %s != %s" %
                             (expect, result))
            escape_list.append(given)
        escape_string = ''.join(escape_list)
        del escape_list
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using quote(): not all characters escaped; %s" %
                         result)
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote(): not all characters escaped: "
                         "%s" % result)
test_urllib.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_unquoting(self):
        # Make sure unquoting of all ASCII values works
        escape_list = []
        for num in range(128):
            given = hexescape(chr(num))
            expect = chr(num)
            result = urllib.unquote(given)
            self.assertEqual(expect, result,
                             "using unquote(): %s != %s" % (expect, result))
            result = urllib.unquote_plus(given)
            self.assertEqual(expect, result,
                             "using unquote_plus(): %s != %s" %
                             (expect, result))
            escape_list.append(given)
        escape_string = ''.join(escape_list)
        del escape_list
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using quote(): not all characters escaped; %s" %
                         result)
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote(): not all characters escaped: "
                         "%s" % result)
FlyFilesNet.py 文件源码 项目:download-manager 作者: thispc 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def process(self, pyfile):
        name = re.search(self.NAME_PATTERN, pyfile.url).group(1)
        pyfile.name = urllib.unquote_plus(name)

        session = re.search(self.SESSION_PATTERN, pyfile.url).group(1)

        url = "http://flyfiles.net"

        #: Get download URL
        parsed_url = self.load(url, post={'getDownLink': session})
        self.log_debug("Parsed URL: %s" % parsed_url)

        if parsed_url == "#downlink|" or parsed_url == "#downlink|#":
            self.log_warning(
                _("Could not get the download URL. Please wait 10 minutes"))
            self.wait(10 * 60, True)
            self.retry()

        self.link = parsed_url.replace('#downlink|', '')
uri_parser.py 文件源码 项目:covar_me_app 作者: CovarMe 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
uri_parser.py 文件源码 项目:covar_me_app 作者: CovarMe 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
uri_parser.py 文件源码 项目:websearch 作者: abelkhan 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
pydevd_referrers.py 文件源码 项目:specto 作者: mrknow 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def print_var_node(xml_node, stream):
    name = xml_node.getAttribute('name')
    value = xml_node.getAttribute('value')
    val_type = xml_node.getAttribute('type')

    found_as = xml_node.getAttribute('found_as')
    stream.write('Name: ')
    stream.write(unquote_plus(name))
    stream.write(', Value: ')
    stream.write(unquote_plus(value))
    stream.write(', Type: ')
    stream.write(unquote_plus(val_type))
    if found_as:
        stream.write(', Found as: %s' % (unquote_plus(found_as),))
    stream.write('\n')

#===================================================================================================
# print_referrers
#===================================================================================================
pycompletion.py 文件源码 项目:specto 作者: mrknow 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def GetImports(module_name):
    try:
        processor = pycompletionserver.Processor()
        data = urllib.unquote_plus(module_name)
        def_file, completions = _pydev_imports_tipper.GenerateTip(data)
        return processor.formatCompletionMessage(def_file, completions)
    except:
        s = StringIO.StringIO()
        exc_info = sys.exc_info()

        traceback.print_exception(exc_info[0], exc_info[1], exc_info[2], limit=None, file=s)
        err = s.getvalue()
        pycompletionserver.dbg('Received error: ' + str(err), pycompletionserver.ERROR)
        raise


#=======================================================================================================================
# main
#=======================================================================================================================
gvideo.py 文件源码 项目:pelisalacarta-ce 作者: pelisalacarta-ce 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_video_url(page_url, user="", password="", video_password=""):
    video_urls = []
    urls = []
    response = httptools.downloadpage(page_url, cookies=False, headers={"Referer": page_url})
    cookies = ""
    cookie = response.headers["set-cookie"].split("HttpOnly, ")
    for c in cookie:
        cookies += c.split(";", 1)[0] + "; "
    data = response.data.decode('unicode-escape')
    data = urllib.unquote_plus(urllib.unquote_plus(data))
    headers_string = "|Cookie=" + cookies
    url_streams = scrapertools.find_single_match(data, 'url_encoded_fmt_stream_map=(.*)')
    streams = scrapertools.find_multiple_matches(url_streams,
                                             'itag=(\d+)&url=(.*?)(?:;.*?quality=.*?(?:,|&)|&quality=.*?(?:,|&))')
    itags = {'18':'360p', '22':'720p', '34':'360p', '35':'480p', '37':'1080p', '43':'360p', '59':'480p'}
    for itag, video_url in streams:
        if not video_url in urls:
            video_url += headers_string
            video_urls.append([itags[itag], video_url])
            urls.append(video_url)
    video_urls.sort(key=lambda video_urls: int(video_urls[0].replace("p", "")))
    return video_urls
lambda.py 文件源码 项目:curso-aws-bigdata-ai 作者: paradigmadigital 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def lambda_handler(event, context):
    """
    Demonstrates S3 trigger that uses Rekognition APIs to detect faces, labels and index faces in S3 Object.
    """

    # Get the object from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))

    try:
        rekognition_faces_response = detect_faces(bucket, key)
        rekognition_faces_response_json = json.dumps(rekognition_faces_response, indent=4)
        rekognition_faces_response_csv = transform_json_to_csv(bucket, key, rekognition_faces_response)

        write_s3(bucket, key, rekognition_faces_response_json, rekognition_faces_response_csv)

        return rekognition_faces_response

    except Exception as e:
        print("Error processing object {} from bucket {}".format(key, bucket))
        print("Exception: {}. {}".format(e, sys.exc_info()[0]))
        raise
lambda_function.py 文件源码 项目:aws_lambda_backup_s3 作者: ogckw 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))

    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    print (bucket)
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    print (key)
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print (response)
        print ("CONTENT TYPE: " + response['ContentType'])
        s3.put_object(Body=response['Body'].read(), Bucket='lambdabkt-testsave123',Key=key+'123')
        return response['ContentType']
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
xbmctools.py 文件源码 项目:plugin.video.streamondemand-pureita 作者: orione7 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def playstrm(params,url,category):
    '''Play para videos en ficheros strm
    '''
    logger.info("[xbmctools.py] playstrm url="+url)

    title = unicode( xbmc.getInfoLabel( "ListItem.Title" ), "utf-8" )
    thumbnail = urllib.unquote_plus( params.get("thumbnail") )
    plot = unicode( xbmc.getInfoLabel( "ListItem.Plot" ), "utf-8" )
    server = params["server"]
    if (params.has_key("Serie")):
        serie = params.get("Serie")
    else:
        serie = ""
    if (params.has_key("subtitle")):
        subtitle = params.get("subtitle")
    else:
        subtitle = ""
    from core.item import Item
    from platformcode.subtitletools import saveSubtitleName
    item = Item(title=title,show=serie)
    saveSubtitleName(item)
    play_video("Biblioteca streamondemand",server,url,category,title,thumbnail,plot,strmfile=True,Serie=serie,subtitle=subtitle)


问题


面经


文章

微信
公众号

扫码关注公众号