def createTeam():
""" """
if request.environ.get('HTTP_ORIGIN') == request.host_url[:-1]:
newTeam = Team.query.filter_by(team_name=request.args['team']).first()
dbEnv = AresSql.SqliteDB(request.args['report_name'])
if not newTeam:
team = Team(request.args['team'], request.args['team_email'])
db.session.add(team)
db.session.commit()
newTeam = Team.query.filter_by(team_name=request.args['team']).first()
team_id = newTeam.team_id
role = request.args.get('role', 'user')
dbEnv.modify("""INSERT INTO team_def (team_id, team_name, role) VALUES (%s, '%s', '%s');
INSERT INTO env_auth (env_id, team_id)
SELECT env_def.env_id, %s
FROM env_def
WHERE env_def.env_name = '%s' ;""" % (team_id, request.args['team'], role, team_id, request.args['report_name']))
return json.dumps('Success'), 200
return json.dumps('Forbidden', 403)
python类host_url()的实例源码
def __init__(self, **kwargs):
"""Initialises a new ``Self`` link instance. Accepts the same
Keyword Arguments as :class:`.Link`.
Additional Keyword Args:
external (bool): if true, force link to be fully-qualified URL, defaults to False
See Also:
:class:`.Link`
"""
url = request.url
external = kwargs.get('external', False)
if not external and current_app.config['SERVER_NAME'] is None:
url = request.url.replace(request.host_url, '/')
return super(Self, self).__init__('self', url, **kwargs)
app.py 文件源码
项目:Office365-SharePoint-Python-Flask-Sample
作者: OneBitSoftware
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def auth():
# Handles the Azure AD authorization endpoint response and sends second response to get access token.
try:
# Gets the token_id from the flask response form dictionary.
token_id = request.form['id_token']
# Gets the authorization code from the flask response form dictionary.
code = request.form['code']
# Constructs redirect uri to be send as query string param to Azure AD token issuance endpoint.
redirect_uri = '{0}auth'.format(request.host_url)
# Constructs Azure AD token issuance endpoint url.
url = issuance_url(token_id, c['AUTHORITY'])
# Requests access token and stores it in session.
token = access_token(url, redirect_uri, c['CLIENT_ID'], code, c['CLIENT_SECRET'])
if token != '':
session['access_token'] = token
else:
flash('Could not get access token.')
except:
flash('Something went wrong.')
return redirect(url_for('home'))
# This script runs the application using a development server.
def handle_content_message(event):
if isinstance(event.message, ImageMessage):
ext = 'jpg'
elif isinstance(event.message, VideoMessage):
ext = 'mp4'
elif isinstance(event.message, AudioMessage):
ext = 'm4a'
else:
return
message_content = line_bot_api.get_message_content(event.message.id)
with tempfile.NamedTemporaryFile(dir=static_tmp_path, prefix=ext + '-', delete=False) as tf:
for chunk in message_content.iter_content():
tf.write(chunk)
tempfile_path = tf.name
dist_path = tempfile_path + '.' + ext
dist_name = os.path.basename(dist_path)
os.rename(tempfile_path, dist_path)
line_bot_api.reply_message(
event.reply_token, [
TextSendMessage(text='Save content.'),
TextSendMessage(text=request.host_url + os.path.join('static', 'tmp', dist_name))
])
def handle_file_message(event):
message_content = line_bot_api.get_message_content(event.message.id)
with tempfile.NamedTemporaryFile(dir=static_tmp_path, prefix='file-', delete=False) as tf:
for chunk in message_content.iter_content():
tf.write(chunk)
tempfile_path = tf.name
dist_path = tempfile_path + '-' + event.message.file_name
dist_name = os.path.basename(dist_path)
os.rename(tempfile_path, dist_path)
line_bot_api.reply_message(
event.reply_token, [
TextSendMessage(text='Save file.'),
TextSendMessage(text=request.host_url + os.path.join('static', 'tmp', dist_name))
])
def forgot_password():
if request.method=="GET":
return render_template("forgot_password.html")
username = request.form["username"]
email = request.form["email"]
try:
user = ldaptools.getuser(username)
assert(user)
assert(email == user.email[0])
token = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for x in range(24))
url = request.host_url+"recovery/"+token
recoverymap[token] = username
emailtools.render_email(email, "Password Recovery", "forgot_password.txt", url=url, config=app.config)
flash("Email sent to "+email, "success")
except Exception as e:
print e
flash("Username/Email mismatch", "danger")
return redirect("/login")
def api_makepublic():
if request.method == 'GET':
uniqueid = request.args['uniqueid']
file = File.query.filter_by(unique_id=uniqueid).first()
if file is not None:
if g.user.admin or (g.user.id == file.uploader_id):
key = PublicKey()
key.public = True
if file.publickey is None:
file.publickey = key
db.session.commit()
url = request.host_url + "pub/dl/" + key.hash
button = get_sharebutton(file.publickey, 'ban', "Disable Public")
return jsonify(response=responds['PUBLIC_KEY_GENERATED'], url=url, button=button)
else:
url = request.host_url + "pub/dl/" + file.publickey.hash
return jsonify(response=responds['PUBLIC_KEY_ALREADY_GEN'], url=url)
return jsonify(response=responds['SOME_ERROR'])
def initiate():
"""
1. step
Initiate app installation
"""
args = request.args
# get shop url from args
shop_url = args.get('shop')
# TODO: validate HMAC, so we know that request really is from shopify
if not current_user.is_authenticated:
return redirect(url_for('main.signup', next=url_join(request.host_url,
url_for('shopify.initiate',
shop=shop_url))))
api_key = current_app.config['SHOPIFY_API_KEY']
secret = current_app.config['SHOPIFY_API_SECRET']
url = get_permission_url(shop_url, api_key, secret)
return redirect(url)
def is_safe_url(target):
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and \
ref_url.netloc == test_url.netloc
def is_safe_url(target):
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc
def is_safe_redirect_url(target):
"""https://security.openstack.org/guidelines/dg_avoid-unvalidated-redirects.html""" # noqa
host_url = urlparse(request.host_url)
redirect_url = urlparse(urljoin(request.host_url, target))
return redirect_url.scheme in ('http', 'https') and \
host_url.netloc == redirect_url.netloc
app.py 文件源码
项目:Office365-SharePoint-Python-Flask-Sample
作者: OneBitSoftware
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def home():
# Renders the home page.
redirect_uri = '{0}auth'.format(request.host_url)
# Generates Azure AD authorization endpoint url with parameters so the user authenticates and consents, if consent is required.
url = login_url(redirect_uri, c['CLIENT_ID'], c['RESOURCE'], c['AUTHORITY'])
user = {}
# Checks if access token has already been set in flask session.
if 'access_token' in session:
# Gets authenticated user details from SharePoint tenant if access token is acquired.
user = user_details(c['RESOURCE'], session['access_token'])
# Renders the index template with additional params for the login url and user.
return render_template('index.html', url=url, user=user)
def send_mail_for_execute_success(sql_id):
if (settings.EMAIL_SEND_ENABLE):
sql_info = get_sql_info_by_id(sql_id)
sql_info.status_str = settings.SQL_WORK_STATUS_DICT[sql_info.status]
sql_info.host_url = request.host_url
sql_info.email = "{0},{1}".format(cache.MyCache().get_user_email(sql_info.create_user_id), cache.MyCache().get_user_email(sql_info.audit_user_id))
if (len(sql_info.email) > 0):
subject = "SQL??-[{0}]-????".format(sql_info.title)
sql_info.work_url = "{0}execute/sql/execute/new/{1}".format(request.host_url, sql_info.id)
content = render_template("mail_template.html", sql_info=sql_info)
common_util.send_html(subject, sql_info.email, content)
# ??????????
# ???????????
def get_request_log():
# parse args and forms
values = ''
if len(request.values) > 0:
for key in request.values:
values += key + ': ' + request.values[key] + ', '
route = '/' + request.base_url.replace(request.host_url, '')
request_log = {
'route': route,
'method': request.method,
}
# ??xml ??
category = request_log['route'].split('/')[1]
if category != "virtual_card":
return request_log
if len(request.get_data()) != 0:
body = json.loads(request.get_data())
if "password" in body:
body.pop("password")
request_log['body'] = body
if len(values) > 0:
request_log['values'] = values
return request_log
def is_safe_url(target):
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and\
ref_url.netloc == test_url.netloc
def is_safe_url(target):
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and \
ref_url.netloc == test_url.netloc
def show_vulnerability(vulnerability_id):
if authenticated() is False:
return render_template('login.html',
return_to='?return_to=' + request.host_url + 'vulnerability/{}'.format(vulnerability_id)
)
return render_template('vulnerability.html', vulnerability_id=vulnerability_id)
def is_safe_url(target):
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc
def is_safe_url(target: str) -> bool:
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return (
# same scheme
test_url.scheme in ('http', 'https') and
# same host and port
ref_url.netloc == test_url.netloc and
# and different endoint
ref_url.path != test_url.path
)
def render_graph():
url = request.args.get('url')
if not url:
return 'Specify a graph URL in the request', 400
return render_template('graph.html', url=request.host_url[:-1] + url)
def register():
hostname = request.args.get('hostname')
master = request.args.get('master')
if not master:
campaign = get_best_campaign()
if not campaign:
return 'No active campaigns', 404
instance = models.FuzzerInstance.create(hostname=hostname)
instance.start_time = time.time()
campaign.fuzzers.append(instance)
campaign.commit()
else:
campaign = models.Campaign.get(id=master)
if not campaign:
return 'Could not find specified campaign', 404
if campaign.fuzzers.filter_by(master=True).first():
return 'Campaign already has a master', 400
instance = models.FuzzerInstance.create(hostname=hostname, master=True)
instance.start_time = time.time()
campaign.fuzzers.append(instance)
campaign.commit()
# avoid all hosts uploading at the same time from reporting at the same time
deviation = random.randint(15, 30)
return jsonify(
id=instance.id,
name=secure_filename(instance.name),
program=campaign.executable_name,
program_args=campaign.executable_args.split(' ') if campaign.executable_args else [], # TODO: add support for spaces
args=campaign.afl_args.split(' ') if campaign.afl_args else [],
campaign_id=campaign.id,
campaign_name=secure_filename(campaign.name),
download=request.host_url[:-1] + url_for('fuzzers.download', campaign_id=campaign.id),
submit=request.host_url[:-1] + url_for('fuzzers.submit', instance_id=instance.id),
submit_crash=request.host_url[:-1] + url_for('fuzzers.submit_crash', instance_id=instance.id),
upload=request.host_url[:-1] + url_for('fuzzers.upload', instance_id=instance.id),
upload_in=current_app.config['UPLOAD_FREQUENCY'] + deviation
)
def download(campaign_id):
campaign = models.Campaign.get(id=campaign_id)
sync_dir = os.path.join(current_app.config['DATA_DIRECTORY'], secure_filename(campaign.name), 'sync_dir', '*.tar')
return jsonify(
executable=request.host_url[:-1] + url_for('fuzzers.download_executable', campaign_id=campaign.id),
libraries=request.host_url[:-1] + url_for('fuzzers.download_libraries', campaign_id=campaign.id),
testcases=request.host_url[:-1] + url_for('fuzzers.download_testcases', campaign_id=campaign.id),
ld_preload=request.host_url[:-1] + url_for('fuzzers.download_ld_preload', campaign_id=campaign.id),
dictionary=request.host_url[:-1] + url_for('fuzzers.download_dictionary', campaign_id=campaign.id) if campaign.has_dictionary else None,
sync_dirs=[
request.host_url[:-1] + url_for('fuzzers.download_syncdir', campaign_id=campaign.id, filename=os.path.basename(filename)) for filename in glob.glob(sync_dir)
],
sync_in=current_app.config['DOWNLOAD_FREQUENCY'],
)
def analysis_queue(campaign_id):
campaign = models.Campaign.get(id=campaign_id)
return jsonify(
program=campaign.executable_name,
program_args=campaign.executable_args.split(' ') if campaign.executable_args else [], # TODO: add support for spaces
crashes=[{
'crash_id': crash.id,
'download': request.host_url[:-1] + url_for('fuzzers.download_crash', crash_id=crash.id)
} for crash in campaign.crashes.filter_by(analyzed=False)]
)
def _build_bundle_url(hit: dict, replica: Replica) -> str:
uuid, version = hit['_id'].split('.', 1)
return request.host_url + str(UrlBuilder()
.set(path='v1/bundles/' + uuid)
.add_query("version", version)
.add_query("replica", replica.name)
)
def _build_scroll_url(_scroll_id: str, per_page: int, replica: Replica, output_format: str) -> str:
return request.host_url + str(UrlBuilder()
.set(path="v1/search")
.add_query('per_page', str(per_page))
.add_query("replica", replica.name)
.add_query("_scroll_id", _scroll_id)
.add_query("output_format", output_format)
)
def api_unpublish():
if request.method == 'GET':
uniqueid = request.args['uniqueid']
file = File.query.filter_by(unique_id=uniqueid).first()
if file is not None:
if g.user.admin or (g.user.id == file.uploader_id):
key = file.publickey
if key is not None:
file.publickey.public = False
db.session.commit()
url = request.host_url + "pub/dl/" + key.hash
return jsonify(response=responds['PUBLIC_KEY_UNPUBLISH'], url=url)
return jsonify(response=responds['SOME_ERROR'])
def api_publish():
if request.method == 'GET':
uniqueid = request.args['uniqueid']
file = File.query.filter_by(unique_id=uniqueid).first()
if file is not None:
if g.user.admin or (g.user.id == file.uploader_id):
key = file.publickey
if (key is not None) and (key.public is False):
file.publickey.public = True
db.session.commit()
url = request.host_url + "pub/dl/" + key.hash
return jsonify(response=responds['PUBLIC_KEY_PUBLISH'], url=url)
return jsonify(response=responds['SOME_ERROR'])
def is_safe_url(target):
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and \
ref_url.netloc == test_url.netloc
def is_safe_url(target):
if not target:
return False
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc
def is_safe_url(target):
if not target:
return False
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc