python类base_url()的实例源码

views.py 文件源码 项目:guides-cms 作者: pluralsight 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def review(title):
    """
    Legacy endpoint kept for old links: look the article up by title and,
    when it is found, answer with a 301 redirect to its URL in the new
    scheme.

    When no article is found the result of ``missing_article`` is returned
    instead. The branch comes from the ``branch`` query argument and
    defaults to ``master``.
    """

    requested_branch = request.args.get('branch', u'master')
    found = models.search_for_article(title)

    if found is None:
        return missing_article(request.base_url, title=title,
                               branch=requested_branch)

    target = filters.url_for_article(found, branch=requested_branch)
    return redirect(target, 301)


# Note this URL is directly linked to the filters.url_for_article filter.
# These must be changed together!
images.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def headerize(func):
    """Decorator that adds pagination ``Link`` headers to the response.

    The wrapped function must return a mapping containing ``page``,
    ``num_pages`` and ``per_page``. That mapping is passed to ``jsonify``
    and a ``Link`` header is attached to the resulting response.

    ``first`` and ``last`` are always emitted. ``prev`` is emitted only when
    the current page is greater than 1 and ``next`` only when the current
    page is lower than ``num_pages``, so the header never points at page 0
    or past the final page.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        data = func(*args, **kwargs)
        resp = jsonify(data)
        fmt = '<{base}?page={page}&per_page={per_page}>; rel="{rel}"'

        current, last = data['page'], data['num_pages']
        links = []
        # Only advertise neighbours that actually exist.
        if current > 1:
            links.append((current - 1, 'prev'))
        if current < last:
            links.append((current + 1, 'next'))
        links.append((1, 'first'))
        links.append((last, 'last'))

        header = ', '.join(fmt.format(base=request.base_url,
                                      page=page,
                                      per_page=data['per_page'],
                                      rel=rel)
                           for page, rel in links)
        resp.headers.extend({'Link': header})
        return resp
    return wrapper
resolver.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, data):
        """Keep the given structure and prepare dispatch table and URLs.

        Steps, in order:

        - store ``data`` and a verb -> ``requests`` function table;
        - call ``self._get_system_settings()``;
        - if ``data['password-digest']`` is a string naming an attribute of
          ``hashlib``, replace ``self.billing_password`` with the hex digest
          produced by that hashlib callable;
        - derive ``self.master_url`` (scheme and host) from the request URL.

        NOTE(review): ``self.billing_password`` is presumably populated by
        ``_get_system_settings`` -- confirm.
        """
        self._structure = data
        self._dispatcher = dict(
            get=requests.get,
            post=requests.post,
            put=requests.put,
            delete=requests.delete)

        self._get_system_settings()

        digest_name = data.get('password-digest')
        if isinstance(digest_name, basestring):
            algorithm = digest_name.lower()
            if hasattr(hashlib, algorithm):
                hasher = getattr(hashlib, algorithm)(self.billing_password)
                self.billing_password = hasher.hexdigest()

        parsed = urlparse(request.base_url)
        self.master_url = '{0}://{1}'.format(parsed.scheme, parsed.netloc)
mock_yarn.py 文件源码 项目:yarnitor 作者: maxpoint 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def spark_application(app_id):
    """Mock of the Spark jobs REST resource.

    When the ``last`` query argument is present, the value stored in redis
    under the request's base URL is returned. Otherwise a random list of job
    dictionaries is generated, dumped to JSON, stored in redis under the
    base URL and returned.
    """
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))

    # Fields with their own generators come first ...
    job_fields = {
        'jobId': st.integers(0),
        'name': st.text(),
        'submissionTime': st.text(),
        'completionTime': st.text(),
        'stageIds': st.lists(st.integers(0), average_size=3),
        'status': st.sampled_from(['SUCCEEDED', 'RUNNING', 'FAILED']),
    }
    # ... followed by the plain non-negative counters.
    counter_names = (
        'numTasks',
        'numActiveTasks',
        'numCompletedTasks',
        'numSkippedTasks',
        'numFailedTasks',
        'numActiveStages',
        'numCompletedStages',
        'numSkippedStages',
        'numFailedStages',
    )
    for counter in counter_names:
        job_fields[counter] = st.integers(0)

    job_strategy = st.fixed_dictionaries(job_fields)
    payload = json.dumps(st.lists(job_strategy, average_size=3).example())
    redis.set(request.base_url, payload)
    return jsonify(payload)
views.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _cache_key(ui, url=None, locale=None, additional_key_data=None):
    if url is None:
        url = request.base_url
    if locale is None:
        locale = g.locale.language if g.locale else "en"

    k = "ui:{}:{}:{}".format(ui, url, locale)
    if callable(additional_key_data):
        try:
            ak = additional_key_data()
            if ak:
                # we have some additional key components, let's attach them
                if not isinstance(ak, (list, tuple)):
                    ak = [ak]
                k = "{}:{}".format(k, ":".join(ak))
        except:
            _logger.exception("Error while trying to retrieve additional cache key parts for ui {}".format(ui))
    return k
middleware.py 文件源码 项目:service-level-reporting 作者: zalando-zmon 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def process_request():
    """
    Attach per-request attributes to ``request``.

    - ``api_url``: API_PREFIX joined onto a base chosen in this order:
      scheme and host of the ``referer`` header, else APP_URL when set,
      else ``request.base_url``
    - ``user`` and ``realm``: read from the session (realm defaults to
      ``employees``)

    """
    referrer = request.headers.get('referer')

    if referrer:
        # keep only scheme and host of the referrer
        origin = urlparse(referrer)
        root = urlunparse((origin.scheme, origin.netloc, '', '', '', ''))
    elif APP_URL:
        root = APP_URL
    else:
        root = request.base_url

    # Used in building full URIs
    request.api_url = urljoin(root, API_PREFIX + '/')

    request.user = flask_session.get('user')
    request.realm = flask_session.get('realm', 'employees')
authentication_service.py 文件源码 项目:tasking-manager 作者: hotosm 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def verify_token(token):
    """
    Verify the supplied token and check user role is correct for the requested resource

    :param token: base64 encoded token taken from the request
    :return: True when the token is valid for the requested resource; False
        when the token is missing, cannot be decoded, is not valid, or the
        resource is PM-only and the user is not a project manager

    On success the user id is stored on ``tm.authenticated_user_id``.
    """

    if not token:
        current_app.logger.debug(f'Token not supplied {request.base_url}')
        return False

    try:
        decoded_token = base64.b64decode(token).decode('utf-8')
    except ValueError:
        # ValueError covers both binascii.Error (malformed base64, e.g. bad
        # padding) and UnicodeDecodeError (decoded bytes are not UTF-8), so a
        # garbage token is rejected instead of raising out of the verifier.
        current_app.logger.debug(f'Unable to decode token {request.base_url}')
        return False  # Can't decode token, so fail login

    # NOTE(review): 604800 is presumably the token lifetime in seconds (7 days) - confirm
    valid_token, user_id = AuthenticationService.is_valid_token(decoded_token, 604800)
    if not valid_token:
        current_app.logger.debug(f'Token not valid {request.base_url}')
        return False

    if tm.is_pm_only_resource:
        if not UserService.is_user_a_project_manager(user_id):
            current_app.logger.debug(f'User {user_id} is not a PM {request.base_url}')
            return False

    tm.authenticated_user_id = user_id  # Set the user ID on the decorator as a convenience
    return True  # All tests passed token is good for the requested resource
__init__.py 文件源码 项目:go_basketball 作者: ZhaoPengkun 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def before_request():
    """Store the current ``datetime.now()`` on the request as ``start_time``."""
    started = datetime.now()
    request.start_time = started


# @bp.after_request
# def after_request(resp):
#     try:
#         if '_' in request.endpoint:
#             dbcon = influx_db.connection
#             point = [{"measurement": config.APP_NAME,
#                       "tags": {"method": request.method, "status": resp.status_code, "endpoint":
#                           request.endpoint},
#                       "fields": {"base_url": request.base_url, "remote_address": request.remote_addr,
#                                  'response_time': (datetime.now() - request.start_time).microseconds}}]
#             dbcon.write_points(point)
#     except Exception as e:
#         pass
#         logger.debug('Write api statistics data to influxdb failed, error?' + e.message)
#     return resp
response.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def forbidden(message="Forbidden."):
    """Log the denial and return a plain-text 403 response.

    The response body is ``message`` followed by a newline. The requester's
    address, the HTTP method and the base URL are logged at info level.
    """
    log.debug("Response 403: %s", message)
    log.info("Denied %s %s %s",
             request.remote_addr, request.method, request.base_url)
    body = message + "\n"
    return Response(body, status=403, mimetype="text/plain")
util.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def base_url(path = None):
    """Return the request's base URL, with ``/<path>`` appended if given."""
    if path is None:
        return request.base_url
    return request.base_url + "/" + path
payments.py 文件源码 项目:payments 作者: devops-s17-payments 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def index():
    """Return a JSON description of the service with a 200 status.

    The ``docs`` and ``site`` links are built by appending fixed suffixes
    to the request's base URL.
    """
    root = request.base_url
    description = jsonify(name='Payments REST API Service',
                          version='1.0',
                          docs=root + 'apidocs/index.html',
                          site=root + 'payments')
    return description, status.HTTP_200_OK

######################################################################
# LIST ALL PAYMENTS
######################################################################
views.py 文件源码 项目:guides-cms 作者: pluralsight 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def partner(article_path):
    """
    View for articles coming from the hackhands blog; these articles are
    rendered read-only (``allow_edits=False``).

    Without an ``article_path`` the list of published articles from the
    secondary repository is rendered. Missing secondary-repo configuration
    or an unreadable article flashes an error and redirects to the index.
    """

    config = app.config
    try:
        owner = config['SECONDARY_REPO_OWNER']
        name = config['SECONDARY_REPO_NAME']
    except KeyError:
        flash('No secondary guide configuration', category='error')
        return redirect(url_for('index'))

    repo_path = '%s/%s' % (owner, name)

    if article_path is None:
        published = models.get_available_articles(status=PUBLISHED,
                                                  repo_path=repo_path)
        return render_template('review.html', articles=published)

    article = models.read_article(article_path, repo_path=repo_path)
    if article is None:
        flash('Failed reading guide', category='error')
        return redirect(url_for('index'))

    # Disqus keys comment threads on this URL, so always hand it the http
    # form; otherwise http and https visitors would get separate threads.
    canonical_url = request.base_url.replace('https://', 'http://')

    template_args = dict(article=article,
                         allow_edits=False,
                         canonical_url=canonical_url,
                         form=forms.SignupForm(),
                         disclaimer=True)
    return render_template('article.html', **template_args)
middleware.py 文件源码 项目:dd-trace-py 作者: DataDog 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _finish_span(self, response=None, exception=None):
        """ Tag, finish and clear the span stored on ``g``, if there is one.

        For sampled spans the resource, URL, status code, method and error
        flag are set first. When there is no response but an exception was
        given, the status code is 500 and the error tags are filled in.
        """
        span = getattr(g, 'flask_datadog_span', None)
        if not span:
            return

        if span.sampled:
            error = 0
            code = response.status_code if response else None
            method = request.method if request else None

            failed_without_response = not response and exception
            if failed_without_response:
                code = 500
                error = 1
                # `set_traceback` below also reads the exception from
                # sys.exc_info and fills the error meta, so these two tags
                # may be redundant; they are kept to avoid any BC break and
                # get overridden anyway.
                span.set_tag(errors.ERROR_TYPE, type(exception))
                span.set_tag(errors.ERROR_MSG, exception)
                # The `exception` object carries no stack trace of its own,
                # so attach one through `set_traceback`.
                span.set_traceback()

            # No endpoint is matched when an exception happened; fall back
            # to the status code as a common resource name.
            resource = request.endpoint or code
            span.resource = compat.to_unicode(resource).lower()
            span.set_tag(http.URL, compat.to_unicode(request.base_url or ''))
            span.set_tag(http.STATUS_CODE, code)
            span.set_tag(http.METHOD, method)
            span.error = error

        span.finish()
        # Clear our span just in case.
        g.flask_datadog_span = None

    # Request hook methods
app.py 文件源码 项目:heroku-python-boilerplate 作者: chavli 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def after_request(response):
    """ log the endpoint hit (with timing and any error body), then return
    the response unchanged """

    # elapsed time in whole milliseconds since g.start_time
    elapsed_ms = int((time.time() - g.start_time) * 1000)
    started_utc = datetime.datetime.utcfromtimestamp(g.start_time)

    credentials = request.authorization
    username = credentials.username if credentials else None

    # only responses in the 4xx range and above carry an error message
    if response.status_code // 100 >= 4:
        err_msg = response.get_data(as_text=True)
    else:
        err_msg = None

    Logger.endpoint_hit(started_utc, elapsed_ms, request.base_url, username,
                        request.method, response.status_code, err_msg)
    return response
views.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def create_link_string(page, last_page, per_page):
    """Build the value of the ``Link`` header for a paginated response.

    `page` is the current page number, `last_page` the number of the final
    page and `per_page` the number of results per page. A ``next`` entry is
    included only while `page` is lower than `last_page`; the ``last`` entry
    is always included.

    """
    entries = []
    if page < last_page:
        entries.append(LINKTEMPLATE.format(request.base_url, page + 1,
                                           per_page, 'next'))
    entries.append(LINKTEMPLATE.format(request.base_url, last_page,
                                       per_page, 'last'))
    return ', '.join(entries)
flask_openid.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_current_url(self):
        """Return the request's base URL with the quoted next URL appended
        as the ``next`` query parameter."""
        current = request.base_url
        quoted_next = url_quote(self.get_next_url())
        return ''.join((current, '?next=', quoted_next))
mock_yarn.py 文件源码 项目:yarnitor 作者: maxpoint 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def metrics():
    """Mock of the YARN cluster metrics REST resource.

    When the ``last`` query argument is present, the value stored in redis
    under the request's base URL is returned. Otherwise a random
    ``clusterMetrics`` dictionary is generated, dumped to JSON, stored in
    redis under the base URL and returned.
    """
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))

    # Every metric is generated as a non-negative integer.
    metric_names = (
        'activeNodes',
        'allocatedMB',
        'allocatedVirtualCores',
        'appsCompleted',
        'appsFailed',
        'appsKilled',
        'appsPending',
        'appsRunning',
        'appsSubmitted',
        'availableMB',
        'availableVirtualCores',
        'containersAllocated',
        'containersPending',
        'containersReserved',
        'decommissionedNodes',
        'lostNodes',
        'rebootedNodes',
        'reservedMB',
        'reservedVirtualCores',
        'totalMB',
        'totalNodes',
        'totalVirtualCores',
        'unhealthyNodes',
    )
    metrics_strategy = st.fixed_dictionaries(
        {name: st.integers(0) for name in metric_names})

    payload = json.dumps({'clusterMetrics': metrics_strategy.example()})
    redis.set(request.base_url, payload)
    return jsonify(payload)
__init__.py 文件源码 项目:books.archivelab.org 作者: ArchiveLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get(self, uri=None):
        """Map each endpoint name to its absolute URL.

        ``urls`` is read as alternating (path, object) entries: the key is
        the lower-cased last dotted component of the object's ``__name__``
        and the value is the request's base URL followed by the path.
        ``uri`` is not used.
        """
        urlbase = request.base_url
        return {
            urls[i + 1].__name__.split(".")[-1].lower(): urlbase + urls[i]
            for i in range(0, len(urls), 2)
        }
flask-test.py 文件源码 项目:Python-scripts 作者: kojibhy 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def home():
    """Return a placeholder text that embeds the request's base URL."""
    template = 'HERE CAN BE PHISHING PAGE FOR  {}'
    return template.format(request.base_url)
wishlists.py 文件源码 项目:wishlists 作者: devops-golf-s17 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def index():
    """Serve the static ``index.html`` page.

    An earlier variant answered with a JSON service description (service
    name, version 0.1 and a ``wishlists`` URL built from the request's base
    URL); that response is no longer produced.
    """
    landing_page = 'index.html'
    return app.send_static_file(landing_page)
server.py 文件源码 项目:wishlists 作者: devops-golf-s17 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def index():
    """Serve the static ``index.html`` page.

    An earlier variant answered with a JSON service description (service
    name, version 0.1 and a ``wishlists`` URL built from the request's base
    URL); that response is no longer produced.
    """
    landing_page = 'index.html'
    return app.send_static_file(landing_page)
views.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _preemptive_unless(base_url=None, additional_unless=None):
    """Return a truthy value when preemptive cache recording must be skipped.

    Recording is skipped when any of these holds:

    - the ``devel.cache.preemptive`` setting is off,
    - ``base_url`` (default: ``request.url_root``) is listed in the
      ``server.preemptiveCache.exceptions`` setting,
    - ``base_url`` does not start with ``http://`` or ``https://``,
    - the request carries the header ``X-Preemptive-Record: no``,
    - ``additional_unless`` is callable and returns a truthy value.
    """
    if base_url is None:
        base_url = request.url_root

    # Checked in the same order as a short-circuiting ``or`` chain would.
    if not settings().getBoolean(["devel", "cache", "preemptive"]):
        disabled_for_root = True
    elif base_url in settings().get(["server", "preemptiveCache", "exceptions"]):
        disabled_for_root = True
    else:
        disabled_for_root = not base_url.startswith(("http://", "https://"))

    recording_disabled = request.headers.get("X-Preemptive-Record", "yes") == "no"

    skip = recording_disabled or disabled_for_root
    if callable(additional_unless):
        return skip or additional_unless()
    return skip
views.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _preemptive_data(key, path=None, base_url=None, data=None, additional_request_data=None):
    """Assemble the record describing a request for the preemptive cache.

    :param key: plugin identifier; stored under ``plugin`` unless it is
        ``"_default"``
    :param path: request path; defaults to ``request.path``
    :param base_url: base URL; defaults to ``request.url_root``
    :param data: optional extra entries, or a callable returning them; a
        ``query_string`` entry gets the ``l10n=<language>&`` prefix
    :param additional_request_data: optional callable whose truthy result
        is stored under ``_additional_request_data``
    :return: dict with ``path``, ``base_url``, ``query_string`` and any
        merged entries

    Errors raised while collecting ``data`` or the additional request data
    are logged and the affected part is left out.
    """
    if path is None:
        path = request.path
    if base_url is None:
        base_url = request.url_root

    # Resolve the language once so both query strings use the same guarded
    # value, including when g.locale is not set.
    language = g.locale.language if g.locale else "en"

    d = dict(path=path,
             base_url=base_url,
             query_string="l10n={}".format(language))

    if key != "_default":
        d["plugin"] = key

    # add data if we have any
    if data is not None:
        try:
            if callable(data):
                data = data()
            if data:
                if "query_string" in data:
                    # Work on a copy: rewriting the caller's dict in place
                    # would stack another l10n prefix on every call.
                    data = dict(data,
                                query_string="l10n={}&{}".format(language, data["query_string"]))
                d.update(data)
        except Exception:
            _logger.exception("Error collecting data for preemptive cache from plugin {}".format(key))

    # add additional request data if we have any
    if callable(additional_request_data):
        try:
            ard = additional_request_data()
            if ard:
                d.update(dict(
                    _additional_request_data=ard
                ))
        except Exception:
            _logger.exception("Error retrieving additional data for preemptive cache from plugin {}".format(key))

    return d
paging.py 文件源码 项目:graygram-web 作者: devxoul 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def next_url(limit=None, offset=None):
    """Return the URL of the next page for the current request.

    :param limit: page size; when omitted (or falsy) the request's
        ``limit`` value is used, defaulting to 30
    :param offset: offset of the current page; when omitted the request's
        ``offset`` value is used, defaulting to 0
    :return: the request's base URL plus the request values, with
        ``offset`` replaced by ``limit + offset``
    """
    # A limit of 0 cannot advance the page, so a falsy limit deliberately
    # falls back to the request value.
    limit = limit or request.values.get('limit', 30, type=int)
    # 0 is a legitimate offset (the first page): only fall back to the
    # request value when the caller passed nothing at all.
    if offset is None:
        offset = request.values.get('offset', 0, type=int)
    values = request.values.to_dict()
    values['offset'] = limit + offset
    return request.base_url + '?' + urllib.urlencode(values)
authentication_service.py 文件源码 项目:tasking-manager 作者: hotosm 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _get_email_validated_url(is_valid: bool) -> str:
        """ Build the email verification redirect URL:
        ``<APP_BASE_URL>/validate-email?is_valid=<is_valid>`` """
        app_root = current_app.config['APP_BASE_URL']
        query = urllib.parse.urlencode({'is_valid': is_valid})
        return '{0}/validate-email?{1}'.format(app_root, query)
authentication_service.py 文件源码 项目:tasking-manager 作者: hotosm 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_authentication_failed_url():
        """ Return ``<APP_BASE_URL>/auth-failed`` for the running app """
        app_root = current_app.config['APP_BASE_URL']
        return f'{app_root}/auth-failed'
authentication_service.py 文件源码 项目:tasking-manager 作者: hotosm 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def generate_authorized_url(username, session_token, redirect_to):
        """ Build the URL the user is redirected to once authenticated.

        The username is URL-quoted; the session token is inserted as given.
        A quoted ``redirect_to`` parameter is appended only when
        ``redirect_to`` is truthy.
        """
        base_url = current_app.config['APP_BASE_URL']
        quoted_username = urllib.parse.quote(username)

        # The extra ng=0 parameter follows the original note that Angular is
        # a bit flaky with parsing the querystring.
        authorized_url = f'{base_url}/authorized?username={quoted_username}&session_token={session_token}&ng=0'

        if redirect_to:
            authorized_url += f'&redirect_to={urllib.parse.quote(redirect_to)}'
        return authorized_url
flask_openid.py 文件源码 项目:micro-blog 作者: nickChenyx 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_current_url(self):
        """Return the request's base URL with the quoted next URL appended
        as the ``next`` query parameter."""
        current = request.base_url
        quoted_next = url_quote(self.get_next_url())
        return ''.join((current, '?next=', quoted_next))
flask_openid.py 文件源码 项目:Hawkeye 作者: tozhengxq 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_current_url(self):
        """Return the request's base URL with the quoted next URL appended
        as the ``next`` query parameter."""
        current = request.base_url
        quoted_next = url_quote(self.get_next_url())
        return ''.join((current, '?next=', quoted_next))
blueprint.py 文件源码 项目:tm-manifesting 作者: FabricAttachedMemory 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def web_node_all():
    """Render the "all nodes" template.

    Each node in ``BP.nodes`` for which ``get_node_status`` returns a
    truthy value gets its ``manifest`` and ``status`` attributes refreshed
    from that result before rendering.
    """
    for node in BP.nodes:
        current = get_node_status(node.coordinate)
        if not current:
            continue
        node.manifest = current['manifest']
        node.status = current['status']

    template_name = _ERS_element + '_all.tpl'
    return render_template(template_name,
                           label=__doc__,
                           nodes=BP.nodes,
                           base_url=request.url)


问题


面经


文章

微信
公众号

扫码关注公众号