python类url_root()的实例源码

advisory.py 文件源码 项目:arch-security-tracker 作者: archlinux 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def advisory_atom():
    last_recent_entries = 15
    data = get_advisory_data()['published'][:last_recent_entries]
    feed = AtomFeed('Arch Linux Security - Recent advisories',
                    feed_url=request.url, url=request.url_root)

    for entry in data:
        advisory = entry['advisory']
        package = entry['package']
        title = '[{}] {}: {}'.format(advisory.id, package.pkgname, advisory.advisory_type)

        feed.add(title=title,
                 content=render_template('feed.html', content=advisory.content),
                 content_type='html',
                 summary=render_template('feed.html', content=advisory.impact),
                 summary_tpe='html',
                 author='Arch Linux Security Team',
                 url=TRACKER_ISSUE_URL.format(advisory.id),
                 published=advisory.created,
                 updated=advisory.created)
    return feed.get_response()
views.py 文件源码 项目:presidency 作者: jayrav13 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def rss_feed():

    feed = AtomFeed('White House Briefing Room Releases', feed_url=request.url, url=request.url_root)

    documents = WhiteHouse.query.order_by(WhiteHouse.document_date.desc())

    for document in documents:

        feed.add(document.title, document.tweet,
            content_type='text',
            author="@presproject2017",
            url=make_external(document.full_url),
            updated=document.document_date,
            published=document.document_date)

    return feed.get_response()
routes.py 文件源码 项目:sacredboard 作者: chovanecm 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def run_tensorboard(run_id, tflog_id):
    """Launch TensorBoard for a given run ID and log ID of that run."""
    data = current_app.config["data"]
    # optimisticaly suppose the run exists...
    run = data.get_run(run_id)
    base_dir = Path(run["experiment"]["base_dir"])
    log_dir = Path(run["info"]["tensorflow"]["logdirs"][tflog_id])
    # TODO ugly!!!
    if log_dir.is_absolute():
        path_to_log_dir = log_dir
    else:
        path_to_log_dir = base_dir.joinpath(log_dir)

    port = int(tensorboard.run_tensorboard(str(path_to_log_dir)))
    url_root = request.url_root
    url_parts = re.search("://([^:/]+)", url_root)
    redirect_to_address = url_parts.group(1)
    return redirect("http://%s:%d" % (redirect_to_address, port))
glance_dummy_api.py 文件源码 项目:son-emu 作者: sonata-nfv 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get(self):
        LOG.debug("API CALL: %s GET" % str(self.__class__.__name__))
        resp = dict()
        resp['versions'] = dict()
        versions = [{
            "status": "CURRENT",
            "id": "v2",
            "links": [
                {
                    "href": request.url_root + '/v2',
                    "rel": "self"
                }
            ]
        }]
        resp['versions'] = versions
        return Response(json.dumps(resp), status=200, mimetype='application/json')
neutron_dummy_api.py 文件源码 项目:son-emu 作者: sonata-nfv 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self):
        """
        Lists API versions.

        :return: Returns a json with API versions.
        :rtype: :class:`flask.response`
        """
        LOG.debug("API CALL: Neutron - List API Versions")
        resp = dict()
        resp['versions'] = dict()

        versions = [{
            "status": "CURRENT",
            "id": "v2.0",
            "links": [
                {
                    "href": request.url_root + '/v2.0',
                    "rel": "self"
                }
            ]
        }]
        resp['versions'] = versions

        return Response(json.dumps(resp), status=200, mimetype='application/json')
__init__.py 文件源码 项目:honeyd-python 作者: sookyp 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_feed():
    from mhn.common.clio import Clio
    from mhn.auth import current_user
    authfeed = mhn.config['FEED_AUTH_REQUIRED']
    if authfeed and not current_user.is_authenticated():
        abort(404)
    feed = AtomFeed('MHN HpFeeds Report', feed_url=request.url,
                    url=request.url_root)
    sessions = Clio().session.get(options={'limit': 1000})
    for s in sessions:
        feedtext = u'Sensor "{identifier}" '
        feedtext += '{source_ip}:{source_port} on sensorip:{destination_port}.'
        feedtext = feedtext.format(**s.to_dict())
        feed.add('Feed', feedtext, content_type='text',
                 published=s.timestamp, updated=s.timestamp,
                 url=makeurl(url_for('api.get_session', session_id=str(s._id))))
    return feed
views.py 文件源码 项目:veripress 作者: veripress 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def post(year, month, day, post_name):
    rel_url = request.path[len('/post/'):]
    fixed_rel_url = storage.fix_post_relative_url(rel_url)
    if rel_url != fixed_rel_url:
        return redirect(request.url_root + 'post/' + fixed_rel_url)  # it's not the correct relative url, so redirect

    post_ = storage.get_post(rel_url, include_draft=False)
    if post_ is None:
        abort(404)

    post_d = post_.to_dict()
    del post_d['raw_content']
    post_d['content'] = get_parser(post_.format).parse_whole(post_.raw_content)
    post_d['content'], post_d['toc'], post_d['toc_html'] = parse_toc(post_d['content'])
    post_d['url'] = make_abs_url(post_.unique_key)
    post_ = post_d

    return custom_render_template(post_['layout'] + '.html', entry=post_)
remote_control_cozmo.py 文件源码 项目:cozmo-python-sdk 作者: anki 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def streaming_video(url_root):
    '''Video streaming generator function'''
    try:
        while True:
            if remote_control_cozmo:
                image = get_annotated_image()

                img_io = io.BytesIO()
                image.save(img_io, 'PNG')
                img_io.seek(0)
                yield (b'--frame\r\n'
                    b'Content-Type: image/png\r\n\r\n' + img_io.getvalue() + b'\r\n')
            else:
                asyncio.sleep(.1)
    except cozmo.exceptions.SDKShutdown:
        # Tell the main flask thread to shutdown
        requests.post(url_root + 'shutdown')
views.py 文件源码 项目:pygameweb 作者: pygame 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def atom():
    """ of the news page.
    """
    resp = render_template('news/atom.xml', news=latest_news(current_session))
    response = make_response(resp)
    response.headers['Content-Type'] = 'application/atom+xml; charset=utf-8; filename=news-ATOM'
    return response

    # This makes output which crashes a feed validator.
    # from werkzeug.contrib.atom import AtomFeed
    # news=latest_news(current_session)
    # feed = AtomFeed('pygame news', feed_url=request.url, url=request.url_root)
    # for new in news:
    #     feed.add(new.title, new.description_html,
    #              content_type='html',
    #              author='pygame',
    #              url='https://www.pygame.org/news.html',
    #              updated=new.datetimeon,
    #              published=new.datetimeon)
    # return feed.get_response()
Update.py 文件源码 项目:FuzzFlow 作者: talos-vulndev 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get(self, hash):
        path = 'static' + os.sep + 'client.zip'
        try:
            os.remove(path)
        except:
            None
        zip = zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED)
        for root, dirs, files in os.walk(CLIENT_FOLDER):
            for f in files:
                zip.write(os.path.join(root, f))
        zip.close()

        client = open(path).read()

        if hash == hashlib.md5(client).hexdigest():
            return {"err": "invalid request"}, 400
        else:
            return {"url": request.url_root + path}, 200
util.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def server_netloc():
    """
    Figure out the name of the server end of the request, punting if it's
    the local host or not available.
    """

    return urlparse.urlparse(request.url_root).netloc


#
# URLs
#
util.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def internal_url(path):
    return request.url_root + path
util.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def root_url(path = None):
    return request.url_root + ("" if path is None else path)
views.py 文件源码 项目:presidency 作者: jayrav13 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def make_external(url):
    return urljoin(request.url_root, url)
namespaces.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def href_for(self, operation, qs=None, **kwargs):
        """
        Construct an full href for an operation against a resource.

        :parm qs: the query string dictionary, if any
        :param kwargs: additional arguments for path expansion

        """
        url = urljoin(request.url_root, self.url_for(operation, **kwargs))
        qs_character = "?" if url.find("?") == -1 else "&"

        return "{}{}".format(
            url,
            "{}{}".format(qs_character, urlencode(qs)) if qs else "",
        )
blog.py 文件源码 项目:akamatsu 作者: rmed 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def feed():
    """Return an atom feed for the blog."""
    feed = AtomFeed(
        '%s: Recent Posts' % app.config.get('SITENAME', 'akamatsu'),
        feed_url=request.url,
        url=request.url_root
    )

    posts = (
        Post.query
        .filter_by(is_published=True, ghost='')
        .order_by(Post.timestamp.desc())
        .limit(15)
    )

    for post in posts:
        # unicode conversion is needed for the content
        feed.add(
            post.title,
            markdown.render(post.content).unescape(),
            content_type='html',
            author=post.author.username,
            url=url_for('blog.show', slug=post.slug, _external=True),
            updated=post.timestamp
        )

    return feed.get_response()
sign_in_handler.py 文件源码 项目:encore 作者: statgen 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_sign_in_view(target):
    signin_url = request.url_root + target
    oauth_service = OAuth2Service(
        name="google",
        client_id=current_app.config["GOOGLE_LOGIN_CLIENT_ID"],
        client_secret=current_app.config["GOOGLE_LOGIN_CLIENT_SECRET"],
        authorize_url=google_params.get("authorization_endpoint"),
        base_url=google_params.get("userinfo_endpoint"),
        access_token_url=google_params.get("token_endpoint"))

    if "code" in request.args:
        oauth_session = oauth_service.get_auth_session(
            data={"code": request.args["code"],
                  "grant_type": "authorization_code",
                  "redirect_uri": signin_url},
            decoder=json.loads)
        user_data = oauth_session.get("").json()
        user = load_user(user_data["email"])
        if user:
            flask_login.login_user(user)
            return redirect(url_for("index"))
        else:
            error_message = "Not an authorized user ({})".format(user_data["email"])
            return render_template("/sign_in.html", error_message=error_message)
    elif "authorize" in request.args:
        return redirect(oauth_service.get_authorize_url(
            scope="email",
            response_type="code",
            prompt="select_account",
            redirect_uri=signin_url))
    else:
        return render_template("/sign_in.html")
flask_openid.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_next_url(self):
        """Returns the URL where we want to redirect to.  This will
        always return a valid URL.
        """
        return (
            self.check_safe_root(request.values.get('next')) or
            self.check_safe_root(request.referrer) or
            (self.fallback_endpoint and
             self.check_safe_root(url_for(self.fallback_endpoint))) or
            request.url_root
        )
flask_openid.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_safe_root(self, url):
        if url is None:
            return None
        if self.safe_roots is None:
            return url
        if url.startswith(request.url_root) or url.startswith('/'):
            # A URL inside the same app is deemed to always be safe
            return url
        for safe_root in self.safe_roots:
            if url.startswith(safe_root):
                return url
        return None
helpers.py 文件源码 项目:fame 作者: certsocietegenerale 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def csrf_protect():
    if request.method not in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):
        referer = request.headers.get('Referer')

        if referer is None or different_origin(referer, request.url_root):
            raise Forbidden(description="Referer check failed.")
decorator.py 文件源码 项目:two1-python 作者: 21dotco 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def required(self, price, **kwargs):
        """API route decorator to request payment for a resource.

        This function stores the resource price in a closure. It will verify
        the validity of a payment, and allow access to the resource if the
        payment is successfully accepted.
        """
        def decorator(fn):
            """Validates payment and returns the original API route."""
            @wraps(fn)
            def _fn(*fn_args, **fn_kwargs):
                # Calculate resource cost
                nonlocal price
                _price = price(request, *fn_args, **fn_kwargs) if callable(price) else price

                # Need better way to pass server url to payment methods (FIXME)
                if 'server_url' not in kwargs:
                    url = urlparse(request.url_root)
                    kwargs.update({'server_url': url.scheme + '://' + url.netloc})

                # Continue to the API view if payment is valid or price is 0
                if _price == 0:
                    return fn(*fn_args, **fn_kwargs)
                try:
                    contains_payment = self.contains_payment(_price, request.headers, **kwargs)
                except BadRequest as e:
                    return Response(e.description, BAD_REQUEST)
                if contains_payment:
                    return fn(*fn_args, **fn_kwargs)
                else:
                    # Get headers for initial 402 response
                    payment_headers = {}
                    for method in self.allowed_methods:
                        payment_headers.update(method.get_402_headers(_price, **kwargs))
                    # Accessing the .files attribute of a request
                    # drains the input stream.
                    request.files
                    raise PaymentRequiredException(payment_headers)
            return _fn
        return decorator
views.py 文件源码 项目:duckpond 作者: alexmilowski 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def siteURL(config,request):
   u = current_app.config.get('SITE_URL')
   return u if u is not None else request.url_root[0:-1]
views.py 文件源码 项目:duckpond 作者: alexmilowski 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def page_not_found(error):
   return render_template_string(generate_template(current_app.config,'error.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], path=request.path, entry=None, error="I'm sorry.  I can't find that page.")
views.py 文件源码 项目:duckpond 作者: alexmilowski 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def send_doc(path):
   siteURL = current_app.config.get('SITE_URL')

   location = current_app.config.get('DOCS')
   if location is None:
      abort(404)

   if location[0:4]=='http':
      url = location + path
      req = requests.get(url, stream = True,headers={'Connection' : 'close'})
      if req.headers['Content-Type'][0:9]=='text/html':
         return render_template_string(generate_template(current_app.config,'content.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], html=req.text, entry=None)
      else:
         return Response(stream_with_context(req.iter_content()), headers = dict(req.headers))

   else:

      dir = os.path.abspath(location)
      if path.endswith('.html'):

         glob = StringIO()
         try:
            with open(os.path.join(dir,path), mode='r', encoding='utf-8') as doc:
               peeked = doc.readline()
               if peeked.startswith('<!DOCTYPE'):
                  return send_from_directory(dir, path)
               glob.write(peeked)
               for line in doc:
                  glob.write(line)

               return render_template_string(generate_template(current_app.config,'content.html'), siteURL=siteURL if siteURL is not None else request.url_root[0:-1], html=glob.getvalue(), entry=None)
         except FileNotFoundError:
            abort(404)

      return send_from_directory(dir, path)
app.py 文件源码 项目:seedbox 作者: nailgun 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def ipxe_boot(node):
    response = config_renderer.ipxe.render(node, request.url_root)
    return Response(response, mimetype='text/plain')
app.py 文件源码 项目:seedbox 作者: nailgun 项目源码 文件源码 阅读 66 收藏 0 点赞 0 评论 0
def report(node):
    if not node.maintenance_mode:
        try:
            node.active_config_version = int(request.args.get('version'))
        except (ValueError, TypeError):
            return abort(400)

        if request.content_type != 'application/json':
            return abort(400)

        provision = models.Provision()
        provision.node = node
        provision.config_version = node.active_config_version
        provision.ignition_config = request.data
        if node.target_config_version == node.active_config_version:
            provision.ipxe_config = config_renderer.ipxe.render(node, request.url_root)
        models.db.session.add(provision)

    models.db.session.add(node)

    node.disks.update({
        models.Disk.wipe_next_boot: False
    })

    if node.cluster.are_etcd_nodes_configured:
        node.cluster.assert_etcd_cluster_exists = True
        models.db.session.add(node.cluster)

    models.db.session.commit()
    return Response('ok', mimetype='application/json')
__init__.py 文件源码 项目:seedbox 作者: nailgun 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_content(self):
        packages = [P(self.node, request.url_root) for P in self.get_package_classes()]
        files = list(itertools.chain.from_iterable(p.get_files() for p in packages))
        units = list(itertools.chain.from_iterable(p.get_units() for p in packages))
        networkd_units = list(itertools.chain.from_iterable(p.get_networkd_units() for p in packages))
        ssh_keys = self.get_ssh_keys()

        return {
            'ignition': {
                'version': '2.0.0',
                'config': {},
            },
            'storage': self.get_storage_config(files),
            'networkd': {
                'units': networkd_units
            },
            'passwd': {
                'users': [{
                    'name': 'root',
                    'sshAuthorizedKeys': ssh_keys,
                }, {
                    'name': 'core',
                    'sshAuthorizedKeys': ssh_keys,
                }],
            },
            'systemd': {
                'units': units
            },
        }
node.py 文件源码 项目:seedbox 作者: nailgun 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def target_ipxe_config_view(self):
        node = self.get_one(request.args.get('id'))
        response = config_renderer.ipxe.render(node, request.url_root)
        return Response(response, mimetype='text/plain')
remote_control_cozmo.py 文件源码 项目:cozmo-python-sdk 作者: anki 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handle_cozmoImage():
    if is_microsoft_browser(request):
        return serve_single_image()
    return flask_helpers.stream_video(streaming_video, request.url_root)
auth.py 文件源码 项目:SwarmOps 作者: staugur 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def login():
    if g.auth:
        return redirect(url_for("index"))
    else:
        query = {"sso": True,
           "sso_r": SpliceURL.Modify(request.url_root, "/sso/").geturl,
           "sso_p": SSO["SSO.PROJECT"],
           "sso_t": md5("%s:%s" %(SSO["SSO.PROJECT"], SpliceURL.Modify(request.url_root, "/sso/").geturl))
        }
        SSOLoginURL = SpliceURL.Modify(url=SSO["SSO.URL"], path="/login/", query=query).geturl
        logger.info("User request login to SSO: %s" %SSOLoginURL)
        return redirect(SSOLoginURL)


问题


面经


文章

微信
公众号

扫码关注公众号