python类base_url()的实例源码

blueprint.py 文件源码 项目:tm-manifesting 作者: FabricAttachedMemory 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def web_node_button_action(name=None):
    # Either way, name has no leading slash.
    if 'unbind' in request.form:
        delete_node_binding(name)
    elif 'bind' in request.form:
        manname = request.form['manifest_sel']
        manifest = BP.manifest_lookup(manname)
        build_node(manifest, name)
    return redirect(request.base_url)   # Eliminates browser caching of POST
manifest_api.py 文件源码 项目:tm-manifesting 作者: FabricAttachedMemory 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def root():
    return render_template(
        'index.tpl',
        api_version=mainapp.config['API_VERSION'],
        base_url=request.base_url,
        mirror=mainapp.config['L4TM_MIRROR'],
        release=mainapp.config['L4TM_RELEASE'],
        rules=mainapp.config['rules'],
        url_root=request.url_root,
        coordinate=mainapp.config['tmconfig'].racks[1]['coordinate'])

###########################################################################
# Networking stuff
views.py 文件源码 项目:MagicPress 作者: huang-zp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def article(article_id):
    comment_form = CommentForm()
    if comment_form.validate_on_submit():
        from MagicPress.utils.tasks import send_async_email
        new_comment = Comment(username=comment_form.name.data)
        new_comment.text = comment_form.text.data
        new_comment.create_time = datetime.utcnow()
        new_comment.site = comment_form.site.data
        new_comment.email = comment_form.email.data
        new_comment.ip = request.remote_addr
        new_comment.language = request.accept_languages.best
        new_comment.os = request.user_agent.platform
        new_comment.browser = request.user_agent.browser
        new_comment.article_id = str(request.base_url).split('/')[-1]
        info = get_ip_info(request.remote_addr)
        new_comment.location = info['country']+info['region']+info['city']
        new_comment.network = info['isp']
        if gfw.filter(comment_form.text.data) or gfw.filter(comment_form.name.data):
            new_comment.hidden = False
            flash(u'????????????')
        else:
            new_comment.hidden = True

        message_details = {}
        message_details['subject'] = 'New Comment'
        message_details['recipients'] = [current_app.config['ADMIN_EMAIL']]
        message_details['body'] = "Name:     %s\nEmail:     %s\nSite:    %s\nLocation:    %s\n" \
                                  "Hihhen:    %s\nText:\n\n             %s" % (
            comment_form.name.data, current_app.config['ADMIN_EMAIL'], comment_form.site.data,
            info['country']+info['region']+info['city'], str(new_comment.hidden), comment_form.text.data)
        send_async_email.delay(message_details)
        db.session.add(new_comment)
        db.session.commit()
    the_article = Article.query.filter_by(id=article_id).first()
    next_article = db.session.query(Article).filter(Article.id < article_id, Article.state == True).order_by(Article.id.desc()).first()
    pre_article = db.session.query(Article).filter(Article.id > article_id, Article.state == True).order_by(Article.id.asc()).first()
    comments = Comment.query.filter_by(article_id=article_id, hidden=True).all()
    return render_template(get_theme() + '/article.html', article=the_article, next_article=next_article,
                           pre_article=pre_article, comment_form=comment_form, comments=comments)
flask_swagger_ui.py 文件源码 项目:flask-swagger-ui 作者: sveint 项目源码 文件源码 阅读 13 收藏 0 点赞 0 评论 0
def get_swaggerui_blueprint(base_url, api_url, config=None, oauth_config=None):

    swagger_ui = Blueprint('swagger_ui',
                           __name__,
                           static_folder='dist',
                           template_folder='templates')

    default_config = {
        'app_name': 'Swagger UI',
        'dom_id': '#swagger-ui',
        'url': api_url,
        'layout': 'StandaloneLayout'
    }

    if config:
        default_config.update(config)

    fields = {
        # Some fields are used directly in template
        'base_url': base_url,
        'app_name': default_config.pop('app_name'),
        # Rest are just serialized into json string for inclusion in the .js file
        'config_json': json.dumps(default_config),

    }
    if oauth_config:
        fields['oauth_config_json'] = json.dumps(oauth_config)

    @swagger_ui.route('/')
    @swagger_ui.route('/<path:path>')
    def show(path=None):
        if not path or path == 'index.html':
            if not default_config.get('oauth2RedirectUrl', None):
                default_config.update(
                    {"oauth2RedirectUrl": "%s/oauth2-redirect.html" % request.base_url}
                )
                fields['config_json'] = json.dumps(default_config)
            return render_template('index.template.html', **fields)
        else:
            return send_from_directory(
                # A bit of a hack to not pollute the default /static path with our files.
                os.path.join(
                    swagger_ui.root_path,
                    swagger_ui._static_folder
                ),
                path
            )

    return swagger_ui
mock_yarn.py 文件源码 项目:yarnitor 作者: maxpoint 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def applications():
    """Mock of the YARN cluster apps REST resource."""
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))

    d = st.fixed_dictionaries({
        'allocatedMB': st.integers(-1),
        'allocatedVCores': st.integers(-1),
        'amContainerLogs': st.text(),
        'amHostHttpAddress': st.text(),
        'applicationTags': st.text(),
        'applicationType': st.sampled_from(['MAPREDUCE', 'SPARK']),
        'clusterId': st.integers(0),
        'diagnostics': st.text(),
        'elapsedTime': st.integers(0),
        'finalStatus': st.sampled_from(['UNDEFINED', 'SUCCEEDED', 'FAILED', 'KILLED']),
        'finishedTime': st.integers(0),
        'id': st.text(string.ascii_letters, min_size=5, max_size=25),
        'memorySeconds': st.integers(0),
        'name': st.text(min_size=5),
        'numAMContainerPreempted': st.integers(0),
        'numNonAMContainerPreempted': st.integers(0),
        'preemptedResourceMB': st.integers(0),
        'preemptedResourceVCores': st.integers(0),
        'progress': st.floats(0, 100),
        'queue': st.text(),
        'runningContainers': st.integers(-1),
        'startedTime': st.integers(0),
        'state': st.sampled_from(['NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED', 'RUNNING', 'FINISHED', 'FAILED', 'KILLED']),
        'trackingUI': st.text(),
        'trackingUrl': st.just(os.environ['YARN_ENDPOINT']),
        'user': st.text(),
        'vcoreSeconds': st.integers(0)
    })

    result = json.dumps({
        'apps': {
            'app': st.lists(d, min_size=4, average_size=10).example()
        }
    })
    redis.set(request.base_url, result)
    return jsonify(result)
mock_yarn.py 文件源码 项目:yarnitor 作者: maxpoint 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def mapreduce_application():
    """Mock of the mapreduce jobs REST resource."""
    if 'last' in request.args:
        return jsonify(redis.get(request.base_url))

    d = st.fixed_dictionaries({
        'startTime': st.integers(0),
        'finishTime': st.integers(0),
        'elapsedTime': st.integers(0),
        'id': st.integers(0),
        'name': st.text(),
        'user': st.text(),
        'state': st.sampled_from(['NEW', 'SUCCEEDED', 'RUNNING', 'FAILED', 'KILLED']),
        'mapsTotal': st.integers(0),
        'mapsCompleted': st.integers(0),
        'reducesTotal': st.integers(0),
        'reducesCompleted': st.integers(0),
        'mapProgress': st.floats(0, 100),
        'reduceProgress': st.floats(0, 100),
        'mapsPending': st.integers(0),
        'mapsRunning': st.integers(0),
        'reducesPending': st.integers(0),
        'reducesRunning': st.integers(0),
        'uberized': st.booleans(),
        'diagnostics': st.text(),
        'newReduceAttempts': st.integers(0),
        'runningReduceAttempts': st.integers(0),
        'failedReduceAttempts': st.integers(0),
        'killedReduceAttempts': st.integers(0),
        'successfulReduceAttempts': st.integers(0),
        'newMapAttempts': st.integers(0),
        'runningMapAttempts': st.integers(0),
        'failedMapAttempts': st.integers(0),
        'killedMapAttempts': st.integers(0),
        'successfulMapAttempts': st.integers(0)
    })
    result = json.dumps({
        'jobs': {
            'job': st.lists(d, average_size=3).example()
        }
    })
    redis.set(request.base_url, result)
    return jsonify(result)
return_styles.py 文件源码 项目:airbnb_clone 作者: bennettbuchanan 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def list(select, request):
        '''Returns a styled list from the data passed. The pagination style
        is defined in the request data. The default values of `number` and
        `page` will be 10 and 1, respectively. If these parameters are passed
        as data in the request, then the values will be updated accordingly.

        Keyword arguments:
        select -- A database query of data.
        request -- A request of some type.
        '''
        number = 10
        page = 1

        for key in request.values:
            if key == 'number':
                number = int(request.values.get('number'))
            elif key == 'page':
                page = int(request.values.get('page'))

        '''Call peewee paginate method on the query.'''
        arr = []
        for i in select.paginate(page, number):
            arr.append(i.to_dict())

        '''By default, `next_page_path` and `prev_page_path` are None.'''
        next_page_path = None
        prev_page_path = None
        base_path = request.base_url + "?page="
        end_path = "&number=" + str(number)

        '''Update `next_page_path` and `prev_page_path` if there is data on
        either a next or previous page from the pagination.
        '''
        if len(arr) == number:
            next_page_path = base_path + str(page + 1) + end_path
        if page > 1:
            prev_page_path = base_path + str(page - 1) + end_path

        '''Return an array of dicts, containing the data and pagination.'''
        data = [dict(data=arr)]
        data.append(dict(paging=dict(next=next_page_path,
                                     previous=prev_page_path)))
        return data
views.py 文件源码 项目:SuperOcto 作者: mcecchi 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def in_cache():
    url = request.base_url.replace("/cached.gif", "/")
    path = request.path.replace("/cached.gif", "/")
    base_url = request.url_root

    # select view from plugins and fall back on default view if no plugin will handle it
    ui_plugins = pluginManager.get_implementations(octoprint.plugin.UiPlugin,
                                                   sorting_context="UiPlugin.on_ui_render")
    for plugin in ui_plugins:
        if plugin.will_handle_ui(request):
            ui = plugin._identifier
            key = _cache_key(plugin._identifier,
                             url=url,
                             additional_key_data=plugin.get_ui_additional_key_data_for_cache)
            unless = _preemptive_unless(url, additional_unless=plugin.get_ui_preemptive_caching_additional_unless)
            data = _preemptive_data(plugin._identifier,
                                    path=path,
                                    base_url=base_url,
                                    data=plugin.get_ui_data_for_preemptive_caching,
                                    additional_request_data=plugin.get_ui_additional_request_data_for_preemptive_caching)
            break
    else:
        ui = "_default"
        key = _cache_key("_default", url=url)
        unless = _preemptive_unless(url)
        data = _preemptive_data("_default", path=path, base_url=base_url)

    response = make_response(bytes(base64.b64decode("R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7")))
    response.headers["Content-Type"] = "image/gif"

    if unless or not preemptiveCache.has_record(data, root=path):
        _logger.info("Preemptive cache not active for path {}, ui {} and data {!r}, signaling as cached".format(path, ui, data))
        return response
    elif util.flask.is_in_cache(key):
        _logger.info("Found path {} in cache (key: {}), signaling as cached".format(path, key))
        return response
    elif util.flask.is_cache_bypassed(key):
        _logger.info("Path {} was bypassed from cache (key: {}), signaling as cached".format(path, key))
        return response
    else:
        _logger.debug("Path {} not yet cached (key: {}), signaling as missing".format(path, key))
        return abort(404)
firetactoe.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def main_page():
    """Renders the main page. When this page is shown, we create a new
    channel to push asynchronous updates to the client."""
    user = users.get_current_user()
    game_key = request.args.get('g')

    if not game_key:
        game_key = user.user_id()
        game = Game(id=game_key, userX=user, moveX=True, board=' '*9)
        game.put()
    else:
        game = Game.get_by_id(game_key)
        if not game:
            return 'No such game', 404
        if not game.userO:
            game.userO = user
            game.put()

    # [START pass_token]
    # choose a unique identifier for channel_id
    channel_id = user.user_id() + game_key
    # encrypt the channel_id and send it as a custom token to the
    # client
    # Firebase's data security rules will be able to decrypt the
    # token and prevent unauthorized access
    client_auth_token = create_custom_token(channel_id)
    _send_firebase_message(channel_id, message=game.to_json())

    # game_link is a url that you can open in another browser to play
    # against this player
    game_link = '{}?g={}'.format(request.base_url, game_key)

    # push all the data to the html template so the client will
    # have access
    template_values = {
        'token': client_auth_token,
        'channel_id': channel_id,
        'me': user.user_id(),
        'game_key': game_key,
        'game_link': game_link,
        'initial_message': urllib.unquote(game.to_json())
    }

    return flask.render_template('fire_index.html', **template_values)
    # [END pass_token]
bugsnag_extra.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def add_pillar_request_to_notification(notification):
    """Adds request metadata to the Bugsnag notifications.

    This basically copies bugsnag.flask.add_flask_request_to_notification,
    but is altered to include Pillar-specific metadata.
    """
    from flask import request, session
    from bugsnag.wsgi import request_path
    import pillar.auth

    if not request:
        return

    notification.context = "%s %s" % (request.method,
                                      request_path(request.environ))

    if 'id' not in notification.user:
        user: pillar.auth.UserClass = pillar.auth.current_user._get_current_object()
        notification.set_user(id=user.user_id,
                              email=user.email,
                              name=user.username)
        notification.user['roles'] = sorted(user.roles)
        notification.user['capabilities'] = sorted(user.capabilities)

    session_dict = dict(session)
    for key in SESSION_KEYS_TO_REMOVE:
        try:
            del session_dict[key]
        except KeyError:
            pass
    notification.add_tab("session", session_dict)
    notification.add_tab("environment", dict(request.environ))

    remote_addr = request.remote_addr
    forwarded_for = request.headers.get('X-Forwarded-For')
    if forwarded_for:
        remote_addr = f'{forwarded_for} (proxied via {remote_addr})'

    notification.add_tab("request", {
        "method": request.method,
        "url": request.base_url,
        "headers": dict(request.headers),
        "params": dict(request.form),
        "data": {'request.data': request.data,
                 'request.json': request.get_json()},
        "endpoint": request.endpoint,
        "remote_addr": remote_addr,
    })
blueprint.py 文件源码 项目:tm-manifesting 作者: FabricAttachedMemory 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def web_node_status(name=None):
    '''name will never have a leading / but now always needs one.'''
    name = '/' + name
    try:
        node = BP.nodes[name][0]
        ESPURL = None    # testable value in Jinja2
        ESPsizeMB = 0
        installsh = installlog = None
        status = get_node_status(name)
        if status is not None:
            if status['status'] == 'ready':
                ESPpath = '%s/%s/%s.ESP' % (
                    BP.config['TFTP_IMAGES'], node.hostname, node.hostname)
                if os.path.isfile(ESPpath):
                    prefix = request.url.split(_ERS_element)[0]
                    ESPURL = '%s%s/ESP/%s' % (
                        prefix, _ERS_element, node.hostname)
                    ESPsizeMB = os.stat(ESPpath).st_size >> 20

            if status['status'] in ('building', 'ready'):
                installpath = '%s/%s/untar/root' % (
                    BP.config['FILESYSTEM_IMAGES'], node.hostname)
                try:
                    with open(installpath + '/install.sh') as f:
                        installsh = f.read()
                    with open(installpath + '/install.log') as f:
                        installlog = f.read()
                except Exception as e:
                    installsh = installlog = None
                    pass

        # all manifests' names with namespace
        manifests = sorted(BP.blueprints['manifest'].get_all())
        return render_template(
            _ERS_element + '.tpl',
            label=__doc__,
            node=node,
            manifests=manifests,
            status=status,
            base_url=request.url.split(name)[0],
            ESPURL=ESPURL,
            ESPsizeMB=ESPsizeMB,
            installsh=installsh,
            installlog=installlog
        )
    except Exception as e:
        return make_response('Kaboom: %s' % str(e), 404)
return_styles.py 文件源码 项目:holbertonschool_airbnb_clone 作者: johndspence 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def list(select, request):
        '''Returns a styled list from the data passed. The pagination style
        is defined in the request data. The default values of `number` and
        `page` will be 10 and 1, respectively. If these parameters are passed
        as data in the request, then the values will be updated accordingly.

        Keyword arguments:
        select -- A database query of data.
        request -- A request of some type.
        '''
        number = 10
        page = 1

        for key in request.values:
            if key == 'number':
                number = int(request.values.get('number'))
            elif key == 'page':
                page = int(request.values.get('page'))

        '''Call peewee paginate method on the query.'''
        arr = []
        for i in select.paginate(page, number):
            arr.append(i.to_dict())

        '''By default, `next_page_path` and `prev_page_path` are None.'''
        next_page_path = None
        prev_page_path = None
        base_path = request.base_url + "?page="
        end_path = "&number=" + str(number)

        '''Update `next_page_path` and `prev_page_path` if there is data on
        either a next or previous page from the pagination.
        '''
        if len(arr) == number:
            next_page_path = base_path + str(page + 1) + end_path
        if page > 1:
            prev_page_path = base_path + str(page - 1) + end_path

        '''Return an array of dicts, containing the data and pagination.'''
        data = [dict(data=arr)]
        data.append(dict(paging=dict(next=next_page_path,
                                     previous=prev_page_path)))
        return data


问题


面经


文章

微信
公众号

扫码关注公众号