def prepare_flask_request(request):
    """Summarise a Flask request as a plain dictionary.

    Args:
        request: Object exposing ``host``, ``path``, ``args`` and ``form``;
            ``args`` and ``form`` must provide a ``copy()`` method.

    Returns:
        dict: Keys ``https``, ``http_host``, ``script_name``, ``get_data``
        and ``post_data``. The last two are copies, so mutating them does
        not touch the request's own containers.
    """
    # NOTE(review): 'https' is reported as 'on' unconditionally, whatever
    # scheme the request actually used - confirm this is intended.
    return {
        'https': 'on',
        'http_host': request.host,
        'script_name': request.path,
        'get_data': request.args.copy(),
        'post_data': request.form.copy(),
    }
# Example source code for Python host()
def make_url_dfiles_list(dfiles_list):
    """Turn local image paths into ``http://`` URLs on the requesting host.

    For the ``orig_fn``, ``thumb_fn`` and ``clean_fn`` entries of every dict
    in *dfiles_list*, each occurrence of the current working directory is
    removed from the path and ``'http://' + request.host`` is prepended.

    The dicts are modified in place and the same list object is returned.
    """
    base_dir = os.getcwd()
    path_keys = ('orig_fn', 'thumb_fn', 'clean_fn')
    for entry in dfiles_list:
        for key in path_keys:
            entry[key] = 'http://' + request.host + entry[key].replace(base_dir, '')
    return dfiles_list
def sms_alert(gif_url, user_id):
    """Send an "Intruder!" message to the phone of the user with *user_id*.

    The message body embeds ``request.host`` and two fixed media URLs are
    attached. *gif_url* is accepted but not used. Nothing is returned.
    """
    # Look up the recipient.
    recipient = User.query.filter_by(id=user_id).first()
    to_number = recipient.phone

    # Compose and send the message.
    text = "Intruder! Visit\n" + request.host + " \nfor more information"
    attachments = [
        'https://demo.twilio.com/owl.png',
        'https://demo.twilio.com/logo.png',
    ]
    client.sms.messages.create(
        to=to_number,
        from_=twilio_caller,
        body=text,
        media_url=attachments,
    )
def get_spec(app):
    """Build the API Swagger specification.

    Starts from ``swagger(app)`` and fills in version, title, host (taken
    from the current request), JSON-only content types, the ``https``
    scheme and an OAuth2 "application" flow whose token URL is the
    external URL of the ``access_token`` endpoint.

    Returns:
        The populated specification mapping.
    """
    spec = swagger(app)

    info = spec["info"]
    info["version"] = "0.1"
    info["title"] = "ORCID HUB API"

    # spec["basePath"] = "/api/v0.1"
    spec["host"] = request.host  # "dev.orcidhub.org.nz"
    spec["consumes"] = ["application/json"]
    spec["produces"] = ["application/json"]
    spec["schemes"] = ["https"]

    oauth_scopes = {
        "write": "allows modifying resources",
        "read": "allows reading resources",
    }
    spec["securityDefinitions"] = {
        "application": {
            "type": "oauth2",
            "tokenUrl": url_for("access_token", _external=True),
            "flow": "application",
            "scopes": oauth_scopes,
        },
    }
    spec["security"] = [{"application": ["read", "write"]}]
    return spec
def init_app(self, app):
    """Register request-logging hooks on *app*.

    Four hooks are installed: logger setup before the first request, a
    log entry before each request, a log entry plus an ``X-Powered-By``
    header after each request, and a log entry on clean app-context
    teardown.
    """

    @app.before_first_request
    def make_logger():
        """Attach a rotating file handler and set the logger to INFO."""
        file_handler = RotatingFileHandler(
            'server_log.log', maxBytes=100000, backupCount=5)
        file_handler.setFormatter(
            Formatter("[%(asctime)s] %(levelname)s - %(message)s"))

        log = current_app.logger
        log.addHandler(file_handler)
        log.setLevel(INFO)
        log.info('------ Logger Initialized ------')

    @app.before_request
    def before_request():
        """Log the host, method, URL and values of the incoming request."""
        log = current_app.logger
        origin_line = 'Requested from {0} [ {1} {2} ]'.format(
            request.host, request.method, request.url)
        log.info(origin_line)
        values_line = 'Request values : {0}'.format(request.values)
        log.info(values_line)

    @app.after_request
    def after_request(response):
        """Log the response status and stamp the ``X-Powered-By`` header."""
        status_line = 'Respond : {0}'.format(response.status)
        current_app.logger.info(status_line)
        response.headers['X-Powered-By'] = app.config['SERVICE_NAME']
        return response

    @app.teardown_appcontext
    def teardown_appcontext(exception):
        """Log a message when the app context ends without an exception."""
        if exception:
            return
        current_app.logger.info('Teardown appcontext successfully.')
def __init__(self, app_name, listen='0.0.0.0', port=8181):
    """Create the Flask application and its SocketIO server.

    Args:
        app_name (string): Name passed to the Flask application.
        listen (string): Host name stored for the API server instance.
        port (int): Port number stored for the API server instance.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    self.flask_dir = os.path.join(module_dir, '../web-ui')
    self.log = logging.getLogger('api_server')

    self.listen = listen
    self.port = port

    self.app = Flask(
        app_name,
        root_path=self.flask_dir,
        static_folder="dist",
        static_url_path="/dist",
    )
    self.server = SocketIO(self.app, async_mode='threading')
    self._enable_websocket_rooms()

    # Enable cross-origin resource sharing.
    CORS(self.app)

    # Routes match with or without a trailing slash.
    self.app.url_map.strict_slashes = False

    # Refresh the web UI only if needed.
    self.update_web_ui(force=False)
def shutdown_api(self):
    """Stop the API server when the request targets a local host name.

    The request is honoured only if its ``Host`` is ``127.0.0.1`` or
    ``localhost`` on this server's port; any other host gets an empty
    403 response.

    Returns:
        tuple: ``(body, status_code)``.
    """
    port_suffix = ':' + str(self.port)
    local_hosts = ('127.0.0.1' + port_suffix, 'localhost' + port_suffix)

    if request.host in local_hosts:
        self.server.stop()
        return 'Server shutting down...', 200

    return "", 403
def before_request():
    """Print the host of the current request."""
    current_host = request.host
    print('Before request :', current_host)
def before_request():
    """Capture per-request parameters into module-level globals.

    * ``hostname`` is set from ``request.host``.
    * ``run_ids`` becomes the inclusive range ``min_run_id..max_run_id``
      taken from the form, or an empty list when either field is missing
      or is not an integer.
    * ``master_ip`` / ``master_port`` are taken from the ``master`` form
      field (``ip`` or ``ip:port``). When the field is absent both are left
      untouched; when it has no colon only ``master_ip`` is updated.
    * ``c_type`` is set from the ``type`` form field (``None`` if absent).
    """
    global hostname, master_ip, master_port, run_ids, c_type

    hostname = request.host

    run_ids = []
    try:
        min_run_id = int(request.form.get('min_run_id'))
        max_run_id = int(request.form.get('max_run_id'))
    except (TypeError, ValueError):
        # int(None) raises TypeError (field absent); int('abc') raises
        # ValueError. Either way the run-id range is simply left empty.
        pass
    else:
        run_ids = list(range(min_run_id, max_run_id + 1))

    master = request.form.get('master')
    if master is not None:
        if ":" in master:
            # Split on the last colon only, so a value containing several
            # colons cannot break the two-name unpacking.
            master_ip, master_port = master.rsplit(":", 1)
        else:
            master_ip = master

    c_type = request.form.get('type')
#####################
# Resource End Points
#####################
##
# System Memory
def get(self):
    """Record an access event for the caller and echo it back.

    The event name (truncated to 120 characters) is stored together with
    the caller's IP, the organisation resolved from that IP, the request
    host and the current time. When an ``Auth-Token`` header is present,
    the ``app_name`` from its payload is added.

    Returns:
        ``CTTVResponse.OK`` wrapping the stored data, with the timestamp
        converted to a string.
    """
    extensions = current_app.extensions
    # Still looked up (KeyError if absent) although nothing is stored in it.
    esstore = extensions['es_access_store']
    mpstore = extensions['mp_access_store']

    parsed = self.parser.parse_args()
    event = parsed['event'][:120]
    auth_token = request.headers.get('Auth-Token')

    ip_resolver = current_app.config['IP_RESOLVER']
    ip = RateLimiter.get_remote_addr()
    client_net = IPNetwork(ip)

    # Fall back to the 'default' entry; the first overlapping network wins.
    resolved_org = ip_resolver['default']
    network_types = (IPv4Network, IPv6Network)
    for net, org in ip_resolver.items():
        if isinstance(net, network_types) and net.overlaps(client_net):
            resolved_org = org
            break

    data = {
        'ip': ip,
        'org': resolved_org,
        'host': request.host,
        'timestamp': datetime.now(),
        'event': event,
    }
    if auth_token:
        token_payload = TokenAuthentication.get_payload_from_token(auth_token)
        data['app_name'] = token_payload['app_name']

    # esstore.store_event(data)
    mpstore.store_event(data)

    # Stringify the timestamp only after the event has been stored.
    data['timestamp'] = str(data['timestamp'])
    return CTTVResponse.OK(SimpleResult(None, data=data))
def init_app(self, app):
    """Install CSRF protection on *app*.

    Exposes ``generate_csrf`` to Jinja templates as ``csrf_token`` and
    registers a ``before_request`` hook that validates the CSRF token.
    ``WTF_CSRF_SSL_STRICT`` and ``WTF_CSRF_ENABLED`` are read from
    ``app.config`` once, at registration time; both default to ``True``.
    """
    app.jinja_env.globals['csrf_token'] = generate_csrf

    settings = app.config
    strict = settings.get('WTF_CSRF_SSL_STRICT', True)
    csrf_enabled = settings.get('WTF_CSRF_ENABLED', True)

    @app.before_request
    def _csrf_protect():
        """Validate the CSRF token of the current request.

        Returns ``None`` to let the request proceed, or the value of
        ``self._error_response(reason)`` when a check fails.
        """
        # many things come from django.middleware.csrf
        if not csrf_enabled or request.method in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):
            return

        if self._exempt_views:
            endpoint = request.endpoint
            if not endpoint:
                return

            view = app.view_functions.get(endpoint)
            if not view:
                return

            qualified_name = '%s.%s' % (view.__module__, view.__name__)
            if qualified_name in self._exempt_views:
                return

        token = None
        if request.method in ('POST', 'PUT', 'PATCH'):
            # A prefixed form names its field ``{prefix}-csrf_token``, hence
            # the suffix match. There is no early exit: the last matching
            # field wins.
            candidates = [
                request.form[key]
                for key in request.form
                if key.endswith('csrf_token')
            ]
            if candidates:
                token = candidates[-1]

        # Header fallbacks: Django's header name first, then Rails'.
        token = token or request.headers.get('X-CSRFToken')
        token = token or request.headers.get('X-CSRF-Token')

        if not validate_csrf(token):
            return self._error_response('CSRF token missing or incorrect.')

        if request.is_secure and strict:
            referrer = request.referrer
            if not referrer:
                return self._error_response('Referrer checking failed - no Referrer.')

            expected_origin = 'https://%s/' % request.host
            if not same_origin(referrer, expected_origin):
                return self._error_response('Referrer checking failed - origin not match.')

        request.csrf_valid = True  # mark this request is csrf valid
def init_app(self, app):
    """Install CSRF protection on *app*.

    Exposes ``generate_csrf`` to Jinja templates as ``csrf_token`` and
    registers a ``before_request`` hook that validates the CSRF token.
    ``WTF_CSRF_SSL_STRICT`` and ``WTF_CSRF_ENABLED`` are read from
    ``app.config`` once, at registration time; both default to ``True``.
    """
    app.jinja_env.globals['csrf_token'] = generate_csrf

    settings = app.config
    strict = settings.get('WTF_CSRF_SSL_STRICT', True)
    csrf_enabled = settings.get('WTF_CSRF_ENABLED', True)

    @app.before_request
    def _csrf_protect():
        """Validate the CSRF token of the current request.

        Returns ``None`` to let the request proceed, or the value of
        ``self._error_response(reason)`` when a check fails.
        """
        # many things come from django.middleware.csrf
        if not csrf_enabled or request.method in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):
            return

        if self._exempt_views:
            endpoint = request.endpoint
            if not endpoint:
                return

            view = app.view_functions.get(endpoint)
            if not view:
                return

            qualified_name = '%s.%s' % (view.__module__, view.__name__)
            if qualified_name in self._exempt_views:
                return

        token = None
        if request.method in ('POST', 'PUT', 'PATCH'):
            # A prefixed form names its field ``{prefix}-csrf_token``, hence
            # the suffix match. There is no early exit: the last matching
            # field wins.
            candidates = [
                request.form[key]
                for key in request.form
                if key.endswith('csrf_token')
            ]
            if candidates:
                token = candidates[-1]

        # Header fallbacks: Django's header name first, then Rails'.
        token = token or request.headers.get('X-CSRFToken')
        token = token or request.headers.get('X-CSRF-Token')

        if not validate_csrf(token):
            return self._error_response('CSRF token missing or incorrect.')

        if request.is_secure and strict:
            referrer = request.referrer
            if not referrer:
                return self._error_response('Referrer checking failed - no Referrer.')

            expected_origin = 'https://%s/' % request.host
            if not same_origin(referrer, expected_origin):
                return self._error_response('Referrer checking failed - origin not match.')

        request.csrf_valid = True  # mark this request is csrf valid
def AddPlugin():
    """Register a plugin from an uploaded ``.py`` file or from form fields.

    Nothing is done unless the request's referrer host equals
    ``request.host``. With a truthy ``file`` upload, a ``.py`` file is saved
    under ``file_path``, imported, and the dict returned by its
    ``get_plugin_info()`` is inserted into the ``Plugin`` collection.
    Otherwise a plugin description is assembled from form fields, written
    to a ``.json`` file under ``file_path`` and inserted.

    Returns:
        ``'success'`` when the insert returned a truthy value, else ``'fail'``.
    """
    result = 'fail'
    # Same-host referrer check. Only an 'http://' prefix is stripped before
    # the host part of the referrer is compared.
    if request.referrer and request.referrer.replace('http://', '').split('/')[0] == request.host:
        # NOTE(review): request.files['file'] is indexed directly, so the
        # form-field branch below presumably still needs a 'file' part in the
        # request - confirm.
        f = request.files['file']
        if f:
            fname = secure_filename(f.filename)
            # Only files whose last dot-separated component is 'py' are accepted.
            if fname.split('.')[-1] == 'py':
                path = file_path + fname
                # Avoid overwriting an existing plugin: add a timestamp suffix.
                if os.path.exists(file_path + fname):
                    fname = fname.split('.')[0] + '_' + datetime.now().strftime("%Y%m%d%H%M%S") + '.py'
                    path = file_path + fname
                f.save(path)
                if os.path.exists(path):
                    file_name = fname.split('.')[0]
                    # NOTE(review): this imports - and therefore executes - the
                    # file that was just uploaded. Make sure only trusted users
                    # can reach this code path. Presumably file_path is on the
                    # import path - confirm.
                    module = __import__(file_name)
                    mark_json = module.get_plugin_info()
                    mark_json['filename'] = file_name
                    mark_json['add_time'] = datetime.now()
                    mark_json['count'] = 0
                    # 'source' defaults to 0 when the plugin does not set it.
                    if 'source' not in mark_json:
                        mark_json['source'] = 0
                    insert_result = Mongo.coll['Plugin'].insert(mark_json)
                    if insert_result:
                        result = 'success'
        else:
            # No file uploaded: build the plugin description from form fields.
            name = request.form.get('name', '')
            info = request.form.get('info', '')
            author = request.form.get('author', '')
            level = request.form.get('level', '')
            type = request.form.get('vultype', '')
            keyword = request.form.get('keyword', '')
            pluginurl = request.form.get('pluginurl', '')
            methodurl = request.form.get('methodurl', '')
            pdata = request.form.get('pdata', '')
            analyzing = request.form.get('analyzing', '')
            analyzingdata = request.form.get('analyzingdata', '')
            tag = request.form.get('tag', '')
            try:
                query = {'name': name, 'info': info, 'level': level, 'type': type, 'author': author, 'url': pluginurl,
                         'keyword': keyword, 'source': 0}
                # 'methodurl' is split at the first space into method and URL;
                # a value without a space raises IndexError, which the bare
                # except below turns into a 'fail' result.
                query['plugin'] = {'method': methodurl.split(' ', 1)[0], 'url': methodurl.split(' ', 1)[1],
                                   'analyzing': analyzing, 'analyzingdata': analyzingdata, 'data': pdata, 'tag': tag}
                file_name = secure_filename(name) + '_' + datetime.now().strftime("%Y%m%d%H%M%S") + ".json"
                # NOTE(review): a str payload written to a file opened in 'wb'
                # mode looks Python 2 specific - confirm the target interpreter.
                with open(file_path + file_name, 'wb') as wt:
                    wt.writelines(json.dumps(query))
                # The full definition lives in the JSON file; the database
                # record omits the 'plugin' sub-document.
                query.pop('plugin')
                query['add_time'] = datetime.now()
                query['count'] = 0
                query['filename'] = file_name
                insert_result = Mongo.coll['Plugin'].insert(query)
                if insert_result:
                    result = 'success'
            except:
                # Best effort: any failure above is silently reported as 'fail'.
                pass
    return result
# ??????
def protect(self):
    """Run the CSRF checks for the current request.

    Requests whose method is not listed in ``WTF_CSRF_METHODS`` are
    skipped. Otherwise the token is validated and, for secure requests
    with ``WTF_CSRF_SSL_STRICT`` enabled, the referrer must be present
    and share its origin with ``https://<host>/``. Each failure is passed
    to ``self._error_response``; on completion ``g.csrf_valid`` is set.
    """
    settings = current_app.config
    if request.method not in settings['WTF_CSRF_METHODS']:
        return

    # NOTE(review): the result of _error_response is never returned, so this
    # presumably relies on it raising - confirm against its definition.
    try:
        validate_csrf(self._get_csrf_token())
    except ValidationError as err:
        reason = err.args[0]
        logger.info(reason)
        self._error_response(reason)

    if request.is_secure and settings['WTF_CSRF_SSL_STRICT']:
        referrer = request.referrer
        if not referrer:
            self._error_response('The referrer header is missing.')

        expected_origin = 'https://{0}/'.format(request.host)
        if not same_origin(referrer, expected_origin):
            self._error_response('The referrer does not match the host.')

    g.csrf_valid = True  # mark this request as CSRF valid