python类url()的实例源码

audit.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init_audit(log_group):
    """Return a decorator that emits an audit record before each call.

    The decorated function must take ``account_id`` as its first positional
    argument. Every invocation sends one envelope (millisecond timestamp plus
    a JSON message describing the current request) through
    ``transport.send_group`` to the group named ``"<log_group>=<account_id>"``,
    then calls the wrapped function and returns its result.
    """

    def audit(f):

        @functools.wraps(f)
        def audited(account_id, *args, **kw):
            stamp_ms = int(time.time() * 1000)
            details = {
                'user': request.environ.get('REMOTE_USER', ''),
                'url': request.url,
                'path': request.path,
                'method': request.method,
                'pid': os.getpid(),
                'account_id': account_id,
                'ip': request.remote_addr,
            }
            event = {'timestamp': stamp_ms, 'message': json.dumps(details)}
            group_name = "%s=%s" % (log_group, account_id)
            transport.send_group(group_name, [event])
            return f(account_id, *args, **kw)

        return audited

    return audit
test_environ.py 文件源码 项目:Pardus-Bulut 作者: ferhatacikalin 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_url(self):
        """ Environ: URL building

        Each case pairs a WSGI environ dict with the URL that a
        ``BaseRequest`` built from it is expected to report.
        """
        cases = (
            # host taken from the Host header
            ({'HTTP_HOST':'example.com'},
             'http://example.com/'),
            # host taken from SERVER_NAME
            ({'SERVER_NAME':'example.com'},
             'http://example.com/'),
            # explicit port
            ({'SERVER_NAME':'example.com', 'SERVER_PORT':'81'},
             'http://example.com:81/'),
            # scheme taken from wsgi.url_scheme
            ({'wsgi.url_scheme':'https', 'SERVER_NAME':'example.com'},
             'https://example.com/'),
            # script name, path and query string combined
            ({'HTTP_HOST':'example.com', 'PATH_INFO':'/path',
              'QUERY_STRING':'1=b&c=d', 'SCRIPT_NAME':'/sp'},
             'http://example.com/sp/path?1=b&c=d'),
            # spaces in script name and path are percent-encoded
            ({'HTTP_HOST':'example.com', 'PATH_INFO':'/pa th',
              'SCRIPT_NAME':'/s p'},
             'http://example.com/s%20p/pa%20th'),
        )
        for environ, expected in cases:
            request = BaseRequest(environ)
            self.assertEqual(expected, request.url)
views.py 文件源码 项目:xqz.es 作者: arahayrabedian 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def callback():
    """ Step 3: Retrieving an access token.

    The provider has redirected the user back to the registered callback
    URL, and that redirect URL carries an authorization code. The full
    request URL is handed to ``fetch_token`` so the code can be exchanged
    for an access token.

    NOTE: the server name must be configured correctly for this to work.
    Add the headers at the http layer, in particular X_FORWARDED_HOST and
    X_FORWARDED_PROTO, so that bottle can render the correct url and links.
    """
    oauth_conf = settings.SLACK_OAUTH
    client_id = oauth_conf['client_id']
    state = request.GET['state']
    oauth2session = OAuth2Session(client_id, state=state)

    #PII - update privacy policy if oauth2 token is stored.
    token_url = oauth_conf['token_url']
    client_secret = oauth_conf['client_secret']
    # The returned token is not kept: all that matters here is that the user
    # has installed the app. If tokens are needed later they can be fetched
    # then.
    oauth2session.fetch_token(
        token_url,
        client_secret=client_secret,
        authorization_response=request.url
    )

    redirect('/?added_to_slack=true')
Mmrz-Sync.py 文件源码 项目:Mmrz-Sync 作者: zhanglintc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def mmrz():
    """Build the context dict for the main page of a logged-in user.

    Credentials are read from the ``username`` / ``password`` cookies (the
    password is URL-unquoted when present). When ``verify_login`` rejects
    them, ``redirect('/')`` is called (presumably ending the request --
    confirm against the framework's ``redirect``). The result is a copy of
    ``universal_ROUTE_dict`` with ``need_https`` added.
    """
    username = request.get_cookie('username')
    raw_password = request.get_cookie('password')
    if raw_password:
        password = urllib.unquote(raw_password)
    else:
        password = None

    logged_in = verify_login(username, password)
    if not logged_in:
        redirect('/')

    # need_https = "localhost" not in request.url
    need_https = False

    return_dict = dict(universal_ROUTE_dict)
    return_dict["need_https"] = need_https
    return return_dict
Mmrz-Sync.py 文件源码 项目:Mmrz-Sync 作者: zhanglintc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def dict_short():
    """Redirect to ``/dictionary``, carrying over the query string if any."""
    query = request.urlparts.query
    if query:
        target = '/dictionary?{0}'.format(query)
    else:
        target = '/dictionary'
    redirect(target)
pyload_app.py 文件源码 项目:download-manager 作者: thispc 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def pre_processor():
    """Collect the values shared by every rendered page.

    User data and permissions are parsed from the ``beaker.session`` entry
    of the request environ. Server status, captcha state and the update
    flags are only queried from ``PYLOAD`` when the user is authenticated;
    otherwise they keep their empty / ``False`` defaults.

    Returns a dict with the keys ``user``, ``status``, ``captcha``,
    ``perms``, ``url``, ``update`` and ``plugins``.
    """
    session = request.environ.get('beaker.session')
    user = parse_userdata(session)
    perms = parse_permissions(session)

    status = {}
    captcha = update = plugins = False

    if user["is_authenticated"]:
        status = PYLOAD.statusServer()
        info = PYLOAD.getInfoByPlugin("UpdateManager")
        captcha = PYLOAD.isCaptchaWaiting()

        # check if update check is available; the flags arrive as the
        # string "True" rather than as booleans
        if info:
            update = info["pyload"] == "True"
            plugins = info["plugins"] == "True"

    context = {
        "user": user,
        'status': status,
        'captcha': captcha,
        'perms': perms,
        'url': request.url,
        'update': update,
        'plugins': plugins,
    }
    return context
app.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def info(account_id, resource_id):
    """Return ``controller.info`` for a resource as a JSON string.

    Resource ids starting with ``'sg-'`` must be accompanied by a
    ``parent_id`` query parameter; the request is aborted with status 400
    when it is missing. For every other resource the parent id falls back to
    the resource id itself. The response content type is set to
    ``application/json``.
    """
    params = request.query
    requires_parent = resource_id.startswith('sg-')
    if requires_parent and 'parent_id' not in params:
        abort(400, "Missing required parameter parent_id")

    parent_id = params.get('parent_id', resource_id)
    result = controller.info(account_id, resource_id, parent_id)

    response.content_type = "application/json"
    return json.dumps(result, indent=2, cls=Encoder)


# this set to post to restrict permissions, perhaps another url space.
app.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def api_url():
    """Return ``scheme://netloc`` of the current request plus its script name."""
    parts = request.urlparts
    pieces = (parts.scheme, parts.netloc, request.script_name)
    return "%s://%s%s" % pieces
app.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def error(e):
    """Render an error object as an indented JSON document.

    The payload carries the error's status plus the ``repr`` of the request
    URL, of the wrapped exception and of the error body. The response
    content type is set to ``application/json``.
    """
    response.content_type = "application/json"
    payload = {
        "status": e.status,
        "url": repr(request.url),
        "exception": repr(e.exception),
        #  "traceback": e.traceback and e.traceback.split('\n') or '',
        "body": repr(e.body),
    }
    return json.dumps(payload, indent=2)
api.py 文件源码 项目:ray 作者: felipevolpone 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def dispatch(url):
    """
        Entry point for every request handled by the Ray API: the request
        path is passed to ``process``, which routes it to the right handler.

        The ``url`` argument is ignored; the path is always read from
        ``bottle_req.path`` and a single trailing slash is removed.

        Returns the handler result. When that result can be indexed as
        ``(body, http_status)`` the status is applied to ``bottle_resp`` and
        only ``body`` is returned; otherwise the result is returned as-is.
        When a ``RayException`` is raised, the response status is set to its
        ``http_code`` and ``None`` is returned. Any other exception is logged
        and re-raised.
    """

    url = bottle_req.path
    log.info('request: %s', bottle_req.url)

    # endswith() also copes with an empty path, where url[-1] would raise
    if url.endswith('/'):
        url = url[:-1]

    response_code = 200

    try:
        processed = process(url, bottle_req, bottle_resp)

        try:
            from_func, http_status = processed[0], processed[1]
            bottle_resp.status = http_status
            return from_func
        except Exception:
            # The result is not a (body, status) pair: hand it back untouched.
            # Catching Exception rather than everything lets SystemExit and
            # KeyboardInterrupt propagate.
            return processed

    except exceptions.RayException as e:
        log.exception('ray exception: ')
        response_code = e.http_code

    except:
        # log and re-raise, so nothing is swallowed here
        log.exception('exception:')
        raise

    bottle_resp.status = response_code
api.py 文件源码 项目:ray 作者: felipevolpone 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __handle_action(url):
    """Run the action addressed by ``url`` and return its result.

    url e.g: /api/user/123/action
    """
    # Four or more slashes (i.e. at least five '/'-separated parts) indicate
    # that there is an id between the endpoint and the action name.
    has_id = url.count('/') >= 4
    arg = http.param_at(url, -2) if has_id else None

    action = Action(url, arg, bottle_req)
    return action.process_action()
Mmrz-Sync.py 文件源码 项目:Mmrz-Sync 作者: zhanglintc 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def query_hujiang(key_word):
    """Look up ``key_word`` through the hujiang mobile dictionary endpoint.

    Returns the decoded JSON payload with the ``Comment`` and ``PronounceJp``
    field of every entry cleaned up in place (presumably a list of dicts --
    confirm against the remote API). Returns ``[]`` when ``key_word`` is
    falsy or when the response body cannot be decoded as JSON.

    NOTE(review): several regex literals and comments in this function
    contain ``?`` placeholders that look like non-ASCII characters lost to
    an encoding error; see the notes next to them.
    """
    if not key_word:
        return []

    headers = {
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1',
        'Accept-Encoding': 'gzip, deflate, sdch',
        }

    url = "https://m.hujiang.com/d/dict_jp_api.ashx?type=jc&w={0}".format(urllib.quote(key_word))

    proxies = {}
    # NOTE(review): verify=False turns off TLS certificate verification.
    response = requests.get(url, headers=headers, verify=False, proxies=proxies)
    try:
        defines = response.json()
    except:
        # Bare except: any failure while decoding is treated as "no results".
        return []

    for i in range(len(defines)):
        Comment = defines[i]["Comment"]
        # Keep only the <br/>-delimited segments that contain no ASCII letters.
        comments = re.findall("<br/>([^a-zA-Z]+)<br/>", Comment)

        tmp = ", ".join(comments)

        # (original comment lost to encoding) Disabled fallback: use the raw
        # Comment when tmp is empty.
        # tmp = Comment if not tmp else tmp

        # (original comment lost to encoding) Disabled: keep only the first
        # regex group matched in tmp.
        # mch = re.search(u"(?.+??)", tmp)
        # tmp = mch.group(1) if mch else tmp

        # Strip delimited spans from tmp.
        # NOTE(review): the '?' characters in the first and third patterns
        # appear to be non-ASCII delimiters lost to an encoding error. As
        # written, the first removes text between literal question marks and
        # the third begins with a quantifier, which is not a valid pattern.
        # Restore both from the upstream source.
        tmp = re.sub(u"\?.+?\?",   "", tmp)
        tmp = re.sub(u"\(.+?\)",  "", tmp)
        tmp = re.sub(u"?+?",     "", tmp)

        defines[i]["Comment"] = tmp

        # Drop square brackets from the pronunciation string.
        defines[i]["PronounceJp"] = re.sub("\[|\]", "", defines[i]["PronounceJp"])

    return defines

### static files
Mmrz-Sync.py 文件源码 项目:Mmrz-Sync 作者: zhanglintc 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def get_hujiang_tts():
    """Find the text-to-speech URL for a word on the hjenglish dictionary page.

    Reads ``key_word`` and ``job_id`` from the request parameters, downloads
    the dictionary page for ``key_word`` and searches the first
    ``span.jpSound`` element for a ``GetTTSVoice("...")`` call.

    Returns the plain string ``"key_word is null"`` when no key word was
    supplied. Otherwise returns a JSON string with the keys ``found``,
    ``message_str``, ``tts_url`` and ``job_id`` (``job_id`` is echoed back
    unchanged).

    Errors raised while downloading the page are not caught here.
    """
    key_word = request.params.get('key_word', None)
    job_id   = request.params.get('job_id', None)

    if not key_word:
        return "key_word is null"

    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.89 Safari/537.36',
        'Accept-Encoding': 'gzip, deflate, sdch',
    }

    url = "http://dict.hjenglish.com/jp/jc/" + urllib.quote(key_word)

    req = urllib2.Request(url, None, headers)
    response = urllib2.urlopen(req)
    try:
        body = response.read()
    finally:
        # always release the connection, even when reading fails
        response.close()

    # The request merely *accepts* gzip, so the server may still answer
    # uncompressed. Only gunzip when the body starts with the gzip magic
    # bytes.
    if body[:2] == b'\x1f\x8b':
        html = gzip.GzipFile(fileobj=StringIO.StringIO(body)).read()
    else:
        html = body

    soup = BeautifulSoup(html, "html.parser")

    ret_info = {
        "found": False,
        "message_str": "",
        "tts_url": "",
        "job_id": job_id,
    }

    jpSound_list = soup.select('span[class=jpSound]')
    if not jpSound_list:
        ret_info["message_str"] = "jpSound not found"
        return json.dumps(ret_info)

    jpSound = str(jpSound_list[0])
    mc = re.search(r'GetTTSVoice\("(.*?)"\)', jpSound)
    if not mc:
        ret_info["message_str"] = "tts_url not found"
        return json.dumps(ret_info)

    ret_info["found"] = True
    ret_info["message_str"] = "tts_url is found"
    ret_info["tts_url"] = mc.group(1)
    return json.dumps(ret_info)


问题


面经


文章

微信
公众号

扫码关注公众号