python类host_url()的实例源码

report.py 文件源码 项目:python-ares 作者: pynog 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def createTeam():
  """Register a team for a report environment (same-origin requests only).

  Reads ``team``, ``team_email``, ``report_name`` and the optional ``role``
  (default ``'user'``) from the query string. The ``Team`` row is created
  when no team with that name exists yet; the team and its link to the
  report environment are then written through ``dbEnv.modify``.

  Returns:
    A ``(json_body, status)`` tuple: ``'Success'`` with 200, or
    ``'Forbidden'`` with 403 when the Origin header does not equal the
    host URL without its last character.
  """
  def sqlText(value):
    # Double single quotes so the value cannot terminate its SQL string literal.
    return value.replace("'", "''")

  if request.environ.get('HTTP_ORIGIN') != request.host_url[:-1]:
    # The status code belongs in the response tuple, not in json.dumps().
    return json.dumps('Forbidden'), 403

  teamName = request.args['team']
  reportName = request.args['report_name']
  newTeam = Team.query.filter_by(team_name=teamName).first()
  dbEnv = AresSql.SqliteDB(reportName)
  if not newTeam:
    team = Team(teamName, request.args['team_email'])
    db.session.add(team)
    db.session.commit()
    # Re-read the row to get the stored team_id.
    newTeam = Team.query.filter_by(team_name=teamName).first()
  team_id = newTeam.team_id
  role = request.args.get('role', 'user')
  # NOTE(review): dbEnv.modify's signature is not visible here; if it accepts
  # bound parameters, prefer them over escaped string interpolation.
  dbEnv.modify("""INSERT INTO team_def (team_id, team_name, role) VALUES (%s, '%s', '%s');
                 INSERT INTO env_auth (env_id, team_id)
                 SELECT env_def.env_id, %s
                 FROM env_def
                 WHERE env_def.env_name = '%s' ;""" % (team_id, sqlText(teamName), sqlText(role), team_id, sqlText(reportName)))
  return json.dumps('Success'), 200
hal.py 文件源码 项目:chaos-monkey-engine 作者: BBVA 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, **kwargs):
        """Build the ``self`` link for the current request.

        Takes the same keyword arguments as :class:`.Link`; all of them are
        forwarded unchanged to the parent initialiser.

        Additional Keyword Args:
            external (bool): when true, keep the fully-qualified request URL.
                Defaults to False.

        See Also:
            :class:`.Link`
        """
        # Keep the full URL when the caller asked for it or SERVER_NAME is
        # configured; otherwise swap the host part for a leading '/'.
        if kwargs.get('external', False) or current_app.config['SERVER_NAME'] is not None:
            href = request.url
        else:
            href = request.url.replace(request.host_url, '/')

        return super(Self, self).__init__('self', href, **kwargs)
app.py 文件源码 项目:Office365-SharePoint-Python-Flask-Sample 作者: OneBitSoftware 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def auth():
    """Handle the Azure AD authorization response and request an access token.

    Reads ``id_token`` and ``code`` from the posted form, asks the token
    issuance endpoint for an access token and stores it in the session.
    Any failure is reported to the user through ``flash``.

    Returns:
        A redirect to the ``home`` endpoint, whether or not a token was obtained.
    """
    try:
        # Both values come from the posted form.
        token_id = request.form['id_token']
        code = request.form['code']
        # Redirect URI sent to the token issuance endpoint.
        redirect_uri = '{0}auth'.format(request.host_url)
        url = issuance_url(token_id, c['AUTHORITY'])
        token = access_token(url, redirect_uri, c['CLIENT_ID'], code, c['CLIENT_SECRET'])
        if token != '':
            session['access_token'] = token
        else:
            flash('Could not get access token.')
    except Exception:
        # Not a bare ``except``: SystemExit/KeyboardInterrupt must still propagate.
        flash('Something went wrong.')
    return redirect(url_for('home'))

# This script runs the application using a development server.
app.py 文件源码 项目:line-bot-sdk-python 作者: line 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def handle_content_message(event):
    """Save an image, video or audio message and reply with its URL.

    The content is streamed into a temporary file under ``static_tmp_path``,
    renamed to carry the media extension, and the reply contains a link
    under ``static/tmp/``. Other message types are ignored.
    """
    if isinstance(event.message, ImageMessage):
        ext = 'jpg'
    elif isinstance(event.message, VideoMessage):
        ext = 'mp4'
    elif isinstance(event.message, AudioMessage):
        ext = 'm4a'
    else:
        return

    message_content = line_bot_api.get_message_content(event.message.id)
    # delete=False: the file must survive the ``with`` block so it can be renamed.
    with tempfile.NamedTemporaryFile(dir=static_tmp_path, prefix=ext + '-', delete=False) as tf:
        for chunk in message_content.iter_content():
            tf.write(chunk)
        tempfile_path = tf.name

    dist_path = tempfile_path + '.' + ext
    dist_name = os.path.basename(dist_path)
    os.rename(tempfile_path, dist_path)

    # URLs always use '/', whereas os.path.join would emit '\\' on Windows.
    content_url = request.host_url + '/'.join(('static', 'tmp', dist_name))
    line_bot_api.reply_message(
        event.reply_token, [
            TextSendMessage(text='Save content.'),
            TextSendMessage(text=content_url)
        ])
app.py 文件源码 项目:line-bot-sdk-python 作者: line 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle_file_message(event):
    """Save a file message and reply with its URL.

    The content is streamed into a temporary file under ``static_tmp_path``
    and renamed to ``<tempname>-<file name>``; the reply contains a link
    under ``static/tmp/``.
    """
    message_content = line_bot_api.get_message_content(event.message.id)
    # delete=False: the file must survive the ``with`` block so it can be renamed.
    with tempfile.NamedTemporaryFile(dir=static_tmp_path, prefix='file-', delete=False) as tf:
        for chunk in message_content.iter_content():
            tf.write(chunk)
        tempfile_path = tf.name

    # The file name is sender-supplied: keep only its last path component so
    # it cannot point outside the temporary directory.
    safe_name = os.path.basename(event.message.file_name.replace('\\', '/'))
    dist_path = tempfile_path + '-' + safe_name
    dist_name = os.path.basename(dist_path)
    os.rename(tempfile_path, dist_path)

    # URLs always use '/', whereas os.path.join would emit '\\' on Windows.
    file_url = request.host_url + '/'.join(('static', 'tmp', dist_name))
    line_bot_api.reply_message(
        event.reply_token, [
            TextSendMessage(text='Save file.'),
            TextSendMessage(text=file_url)
        ])
main.py 文件源码 项目:pizza-auth 作者: xxpizzaxx 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def forgot_password():
    """Serve the password-recovery form and send the recovery email.

    GET renders ``forgot_password.html``. Otherwise the posted ``username``
    and ``email`` are checked against the directory entry; on a match a
    24-character token is stored in ``recoverymap`` and a recovery link is
    emailed. Any failure flashes a generic mismatch message.

    Returns:
        The rendered form for GET, otherwise a redirect to ``/login``.
    """
    if request.method == "GET":
        return render_template("forgot_password.html")
    username = request.form["username"]
    email = request.form["email"]
    try:
        user = ldaptools.getuser(username)
        # Explicit check instead of ``assert``: assertions are stripped under
        # ``python -O``, which would skip the email verification entirely.
        if not user or email != user.email[0]:
            raise ValueError("username/email mismatch")
        alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
        # The token grants a password reset, so draw it from the OS CSPRNG.
        rng = random.SystemRandom()
        token = ''.join(rng.choice(alphabet) for _ in range(24))
        url = request.host_url + "recovery/" + token
        recoverymap[token] = username
        emailtools.render_email(email, "Password Recovery", "forgot_password.txt", url=url, config=app.config)
        flash("Email sent to " + email, "success")
    except Exception as e:
        print(e)
        flash("Username/Email mismatch", "danger")
    return redirect("/login")
api.py 文件源码 项目:mincloud 作者: number13dev 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def api_makepublic():
    """Create a public download key for a file, or report the existing one.

    Only GET requests from an admin or the file's uploader are served; every
    other case answers with the ``SOME_ERROR`` response.
    """
    if request.method != 'GET':
        return jsonify(response=responds['SOME_ERROR'])

    target = File.query.filter_by(unique_id=request.args['uniqueid']).first()
    if target is None:
        return jsonify(response=responds['SOME_ERROR'])

    if not (g.user.admin or (g.user.id == target.uploader_id)):
        return jsonify(response=responds['SOME_ERROR'])

    # The key object is built before it is known whether one is needed.
    fresh_key = PublicKey()
    fresh_key.public = True

    if target.publickey is not None:
        existing_url = request.host_url + "pub/dl/" + target.publickey.hash
        return jsonify(response=responds['PUBLIC_KEY_ALREADY_GEN'], url=existing_url)

    target.publickey = fresh_key
    db.session.commit()
    new_url = request.host_url + "pub/dl/" + fresh_key.hash
    share_button = get_sharebutton(target.publickey, 'ban', "Disable Public")
    return jsonify(response=responds['PUBLIC_KEY_GENERATED'], url=new_url, button=share_button)
integrations_shopify.py 文件源码 项目:drip 作者: Dripitio 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def initiate():
    """
    Step 1: start the app installation.

    Anonymous visitors are sent to the signup page with a ``next`` URL that
    points back here; authenticated users are redirected to the permission
    URL for the shop given in the ``shop`` query argument.
    """
    shop_url = request.args.get('shop')
    # TODO: validate HMAC, so we know that request really is from shopify

    if current_user.is_authenticated:
        config = current_app.config
        api_key = config['SHOPIFY_API_KEY']
        secret = config['SHOPIFY_API_SECRET']
        return redirect(get_permission_url(shop_url, api_key, secret))

    come_back_to = url_join(request.host_url,
                            url_for('shopify.initiate', shop=shop_url))
    return redirect(url_for('main.signup', next=come_back_to))
auth.py 文件源码 项目:luminance 作者: nginth 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def is_safe_url(target):
    """Return True when *target* resolves to an http(s) URL on the request's host."""
    own = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    if resolved.scheme not in ('http', 'https'):
        return False
    return own.netloc == resolved.netloc
utils.py 文件源码 项目:sysu-ctf 作者: ssst0n3 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def is_safe_url(target):
    """Tell whether *target*, resolved against the host URL, stays on this host over http(s)."""
    host = urlparse(request.host_url)
    candidate = urlparse(urljoin(request.host_url, target))
    scheme_ok = candidate.scheme in ('http', 'https')
    same_host = host.netloc == candidate.netloc
    return scheme_ok and same_host
utils.py 文件源码 项目:globus-sample-data-portal 作者: globus 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_safe_redirect_url(target):
    """Check that *target* redirects to an http(s) URL on the request's host.

    See https://security.openstack.org/guidelines/dg_avoid-unvalidated-redirects.html
    """  # noqa
    own_netloc = urlparse(request.host_url).netloc
    destination = urlparse(urljoin(request.host_url, target))

    if destination.scheme not in ('http', 'https'):
        return False
    return own_netloc == destination.netloc
app.py 文件源码 项目:Office365-SharePoint-Python-Flask-Sample 作者: OneBitSoftware 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def home():
    """Render the index page with the Azure AD login URL and the user details.

    The user details stay an empty dict until an access token is present in
    the session.
    """
    callback_uri = '{0}auth'.format(request.host_url)
    # Authorization endpoint URL the user follows to authenticate and consent.
    authorize_url = login_url(callback_uri, c['CLIENT_ID'], c['RESOURCE'], c['AUTHORITY'])
    if 'access_token' in session:
        # Look the signed-in user up with the stored token.
        profile = user_details(c['RESOURCE'], session['access_token'])
    else:
        profile = {}
    return render_template('index.html', url=authorize_url, user=profile)
sql_manager.py 文件源码 项目:mysql_audit 作者: ycg 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send_mail_for_execute_success(sql_id):
    """Email the creator and auditor of a SQL work item, if mail is enabled.

    Loads the work item, decorates it with its status text, host URL,
    recipient list and work URL, renders ``mail_template.html`` and sends it.
    """
    if not settings.EMAIL_SEND_ENABLE:
        return

    sql_info = get_sql_info_by_id(sql_id)
    sql_info.status_str = settings.SQL_WORK_STATUS_DICT[sql_info.status]
    sql_info.host_url = request.host_url

    creator_email = cache.MyCache().get_user_email(sql_info.create_user_id)
    auditor_email = cache.MyCache().get_user_email(sql_info.audit_user_id)
    sql_info.email = "{0},{1}".format(creator_email, auditor_email)

    # NOTE(review): the joined string always contains the comma, so this
    # guard looks like it can never trigger - confirm the intent.
    if not sql_info.email:
        return

    subject = "SQL??-[{0}]-????".format(sql_info.title)
    sql_info.work_url = "{0}execute/sql/execute/new/{1}".format(request.host_url, sql_info.id)
    html_body = render_template("mail_template.html", sql_info=sql_info)
    common_util.send_html(subject, sql_info.email, html_body)


# ??????????
# ???????????
run.py 文件源码 项目:GOKU 作者: bingweichen 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_request_log():
    """Summarise the current request as a dict for logging.

    Always contains ``route`` and ``method``. For routes whose first path
    segment is ``virtual_card`` it also carries the JSON ``body`` (with any
    ``password`` entry removed) and the flattened ``values``, when non-empty.
    """
    # Flatten args and form fields into "key: value, " pairs.
    values = ''.join(
        key + ': ' + request.values[key] + ', ' for key in request.values
    )
    request_log = {
        'route': '/' + request.base_url.replace(request.host_url, ''),
        'method': request.method,
    }

    # Only virtual_card routes get the body and values attached.
    if request_log['route'].split('/')[1] != "virtual_card":
        return request_log

    raw = request.get_data()
    if len(raw) != 0:
        body = json.loads(raw)
        if "password" in body:
            # Keep credentials out of the log.
            body.pop("password")
        request_log['body'] = body

    if values:
        request_log['values'] = values
    return request_log
functions.py 文件源码 项目:picmeup 作者: zhonghcc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_safe_url(target):
    """Return whether *target* is an http(s) URL whose host matches the current request."""
    reference = urlparse(request.host_url)
    checked = urlparse(urljoin(request.host_url, target))
    allowed_schemes = ('http', 'https')
    if checked.scheme in allowed_schemes:
        return reference.netloc == checked.netloc
    return False
util.py 文件源码 项目:graph-data-experiment 作者: occrp-attic 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def is_safe_url(target):
    """Accept *target* only if it resolves to http(s) on the same netloc as the host URL."""
    host_parts = urlparse(request.host_url)
    target_parts = urlparse(urljoin(request.host_url, target))
    uses_web_scheme = target_parts.scheme in ('http', 'https')
    return uses_web_scheme and host_parts.netloc == target_parts.netloc
views.py 文件源码 项目:hubscrub 作者: sschneid 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def show_vulnerability(vulnerability_id):
    """Show a vulnerability page, or the login page when ``authenticated()`` is False."""
    if authenticated() is not False:
        return render_template('vulnerability.html', vulnerability_id=vulnerability_id)

    # Query string that brings the user back to this vulnerability after login.
    back_link = '?return_to=' + request.host_url + 'vulnerability/{}'.format(vulnerability_id)
    return render_template('login.html', return_to=back_link)
auth.py 文件源码 项目:knowledge-repo 作者: airbnb 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def is_safe_url(target):
    """Return True if *target* points at this host using http or https."""
    here = urlparse(request.host_url)
    there = urlparse(urljoin(request.host_url, target))
    if there.scheme in ('http', 'https'):
        return here.netloc == there.netloc
    return False
auth.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def is_safe_url(target: str) -> bool:
    """Return True when *target* is a same-host http(s) URL with a different path.

    *target* is resolved against the request's host URL before comparison.
    """
    base = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))

    # Only web schemes are acceptable.
    if resolved.scheme not in ('http', 'https'):
        return False
    # Host and port must match.
    if base.netloc != resolved.netloc:
        return False
    # The path must differ from the host URL's own path.
    return base.path != resolved.path
graphs.py 文件源码 项目:afl-mothership 作者: afl-mothership 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def render_graph():
    """Render ``graph.html`` for the graph path given in the ``url`` query argument."""
    graph_path = request.args.get('url')
    if graph_path:
        # Drop the last character of host_url before appending the path.
        return render_template('graph.html', url=request.host_url[:-1] + graph_path)
    return 'Specify a graph URL in the request', 400
fuzzers.py 文件源码 项目:afl-mothership 作者: afl-mothership 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def register():
    """Register a fuzzer instance and return its job description as JSON.

    Without a ``master`` query argument the instance joins the campaign
    chosen by ``get_best_campaign()``. With one, it becomes the master of
    that campaign, unless the campaign is unknown (404) or already has a
    master (400).
    """
    hostname = request.args.get('hostname')
    master = request.args.get('master')

    if master:
        campaign = models.Campaign.get(id=master)
        if not campaign:
            return 'Could not find specified campaign', 404
        if campaign.fuzzers.filter_by(master=True).first():
            return 'Campaign already has a master', 400
        instance_fields = {'hostname': hostname, 'master': True}
    else:
        campaign = get_best_campaign()
        if not campaign:
            return 'No active campaigns', 404
        instance_fields = {'hostname': hostname}

    instance = models.FuzzerInstance.create(**instance_fields)
    instance.start_time = time.time()
    campaign.fuzzers.append(instance)
    campaign.commit()

    # Random offset so hosts registered together do not all upload together.
    deviation = random.randint(15, 30)
    root = request.host_url[:-1]
    program_args = campaign.executable_args.split(' ') if campaign.executable_args else []  # TODO: add support for spaces
    afl_args = campaign.afl_args.split(' ') if campaign.afl_args else []
    return jsonify(
        id=instance.id,
        name=secure_filename(instance.name),
        program=campaign.executable_name,
        program_args=program_args,
        args=afl_args,

        campaign_id=campaign.id,
        campaign_name=secure_filename(campaign.name),

        download=root + url_for('fuzzers.download', campaign_id=campaign.id),
        submit=root + url_for('fuzzers.submit', instance_id=instance.id),
        submit_crash=root + url_for('fuzzers.submit_crash', instance_id=instance.id),
        upload=root + url_for('fuzzers.upload', instance_id=instance.id),
        upload_in=current_app.config['UPLOAD_FREQUENCY'] + deviation
    )
fuzzers.py 文件源码 项目:afl-mothership 作者: afl-mothership 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def download(campaign_id):
    """Return the download URLs for a campaign's artefacts as JSON.

    ``dictionary`` is None when the campaign has no dictionary.
    ``sync_dirs`` lists one URL per ``*.tar`` file found in the campaign's
    ``sync_dir`` folder.
    """
    campaign = models.Campaign.get(id=campaign_id)
    root = request.host_url[:-1]

    def link(endpoint, **extra):
        # Absolute URL of a fuzzers endpoint for this campaign.
        return root + url_for(endpoint, campaign_id=campaign.id, **extra)

    tar_pattern = os.path.join(
        current_app.config['DATA_DIRECTORY'],
        secure_filename(campaign.name),
        'sync_dir',
        '*.tar',
    )

    dictionary_url = None
    if campaign.has_dictionary:
        dictionary_url = link('fuzzers.download_dictionary')

    sync_urls = []
    for tar_path in glob.glob(tar_pattern):
        sync_urls.append(link('fuzzers.download_syncdir', filename=os.path.basename(tar_path)))

    return jsonify(
        executable=link('fuzzers.download_executable'),
        libraries=link('fuzzers.download_libraries'),
        testcases=link('fuzzers.download_testcases'),
        ld_preload=link('fuzzers.download_ld_preload'),
        dictionary=dictionary_url,
        sync_dirs=sync_urls,
        sync_in=current_app.config['DOWNLOAD_FREQUENCY'],
    )
fuzzers.py 文件源码 项目:afl-mothership 作者: afl-mothership 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def analysis_queue(campaign_id):
    """Return the campaign's program details and its not-yet-analyzed crashes as JSON."""
    campaign = models.Campaign.get(id=campaign_id)

    if campaign.executable_args:
        program_args = campaign.executable_args.split(' ')  # TODO: add support for spaces
    else:
        program_args = []

    pending = []
    for crash in campaign.crashes.filter_by(analyzed=False):
        pending.append({
            'crash_id': crash.id,
            'download': request.host_url[:-1] + url_for('fuzzers.download_crash', crash_id=crash.id),
        })

    return jsonify(
        program=campaign.executable_name,
        program_args=program_args,
        crashes=pending
    )
search.py 文件源码 项目:data-store 作者: HumanCellAtlas 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _build_bundle_url(hit: dict, replica: Replica) -> str:
    """Build the bundle URL for a search hit whose ``_id`` is ``<uuid>.<version>``."""
    prefix = request.host_url
    # Split on the first dot only; the remainder is the version.
    uuid, version = hit['_id'].split('.', 1)
    builder = UrlBuilder().set(path='v1/bundles/' + uuid)
    builder = builder.add_query("version", version)
    builder = builder.add_query("replica", replica.name)
    return prefix + str(builder)
search.py 文件源码 项目:data-store 作者: HumanCellAtlas 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _build_scroll_url(_scroll_id: str, per_page: int, replica: Replica, output_format: str) -> str:
    """Build the ``v1/search`` URL that continues a scroll with the given settings."""
    prefix = request.host_url
    builder = UrlBuilder().set(path="v1/search")
    # Query parameters are added in this fixed order.
    query_items = (
        ('per_page', str(per_page)),
        ("replica", replica.name),
        ("_scroll_id", _scroll_id),
        ("output_format", output_format),
    )
    for query_name, query_value in query_items:
        builder = builder.add_query(query_name, query_value)
    return prefix + str(builder)
api.py 文件源码 项目:mincloud 作者: number13dev 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def api_unpublish():
    """Mark a file's public key as not public and return its download URL.

    Only GET requests from an admin or the file's uploader are served, and
    only when the file already has a key; every other case answers with the
    ``SOME_ERROR`` response.
    """
    if request.method != 'GET':
        return jsonify(response=responds['SOME_ERROR'])

    target = File.query.filter_by(unique_id=request.args['uniqueid']).first()
    if target is None:
        return jsonify(response=responds['SOME_ERROR'])

    if not (g.user.admin or (g.user.id == target.uploader_id)):
        return jsonify(response=responds['SOME_ERROR'])

    key = target.publickey
    if key is None:
        return jsonify(response=responds['SOME_ERROR'])

    target.publickey.public = False
    db.session.commit()
    link = request.host_url + "pub/dl/" + key.hash
    return jsonify(response=responds['PUBLIC_KEY_UNPUBLISH'], url=link)
api.py 文件源码 项目:mincloud 作者: number13dev 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def api_publish():
    """Mark a file's existing public key as public and return its download URL.

    Only GET requests from an admin or the file's uploader are served, and
    only when the file has a key whose ``public`` flag is False; every other
    case answers with the ``SOME_ERROR`` response.
    """
    if request.method != 'GET':
        return jsonify(response=responds['SOME_ERROR'])

    target = File.query.filter_by(unique_id=request.args['uniqueid']).first()
    if target is None:
        return jsonify(response=responds['SOME_ERROR'])

    if not (g.user.admin or (g.user.id == target.uploader_id)):
        return jsonify(response=responds['SOME_ERROR'])

    key = target.publickey
    if (key is None) or (key.public is not False):
        return jsonify(response=responds['SOME_ERROR'])

    target.publickey.public = True
    db.session.commit()
    link = request.host_url + "pub/dl/" + key.hash
    return jsonify(response=responds['PUBLIC_KEY_PUBLISH'], url=link)
utils.py 文件源码 项目:extranet 作者: demaisj 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def is_safe_url(target):
    """Return True when *target*, joined onto the host URL, is http(s) on the same netloc."""
    site = urlparse(request.host_url)
    wanted = urlparse(urljoin(request.host_url, target))
    if wanted.scheme not in ('http', 'https'):
        return False
    return site.netloc == wanted.netloc
__init__.py 文件源码 项目:Saylua 作者: LikeMyBread 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def is_safe_url(target):
    """Return True for a non-empty *target* that resolves to http(s) on this host."""
    if target:
        origin = urlparse(request.host_url)
        goal = urlparse(urljoin(request.host_url, target))
        if goal.scheme in ('http', 'https'):
            return origin.netloc == goal.netloc
    return False
__init__.py 文件源码 项目:Saylua 作者: LikeMyBread 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def is_safe_url(target):
    """Reject empty targets; otherwise require http(s) and the request's own netloc."""
    if not target:
        return False
    host_parsed = urlparse(request.host_url)
    target_parsed = urlparse(urljoin(request.host_url, target))
    web_scheme = target_parsed.scheme in ('http', 'https')
    return web_scheme and host_parsed.netloc == target_parsed.netloc


问题


面经


文章

微信
公众号

扫码关注公众号