def itemDetail(id,form=None):
    """Render the detail page of one article and record the page view.

    Loads the article, its author and its tags with normal status, bumps the
    article's view counter, stores an ``ArticleView`` row carrying the
    requester's IP (and user id when signed in) and tells the template
    whether the signed-in user has an active like on the article.

    :param id: primary key of the article to show.
    :param form: optional form object handed through to the template.
    :returns: the rendered ``item.html`` template.
    :raises werkzeug.exceptions.NotFound: when no article has this id.
    """
    article = Article.query.get(id)
    if article is None:
        # Without this guard the attribute access below fails with an
        # AttributeError and the client sees a 500 instead of a 404.
        from flask import abort
        abort(404)

    author = User.query.get(article.author_id)
    tag = Tag.query.filter_by(article_id=id, status=STATUS_NORMAL).all()
    # Comma-separated tag titles; an empty string when there are no tags.
    keywords = ','.join(t.title for t in tag)

    article.view_num = article.view_num + 1

    view = ArticleView()
    view.article_id = id
    view.ip = request.remote_addr

    isLike = False
    if current_user.is_authenticated:
        user_id = current_user.get_id()
        view.user_id = user_id
        articleLike = ArticleLike.query.filter_by(
            article_id=id, user_id=user_id, status=STATUS_NORMAL).first()
        isLike = articleLike is not None

    # A single commit persists both the new view row and the counter bump.
    db.session.add(view)
    db.session.commit()
    return render_template('item.html', article=article, author=author,
                           isLike=isLike, tags=tag, form=form,
                           keywords=keywords)
# Python: example source code using remote_addr
def __call__(self, form, field):
    """Validate the reCAPTCHA token submitted with the current request.

    Validation is skipped (returns True) while the application is in testing
    mode. The token is read from the JSON body when there is one, otherwise
    from the form data.

    :raises ValidationError: when the token is missing or is rejected by
        ``self._validate_recaptcha``.
    """
    if current_app.testing:
        return True

    submitted = request.json or request.form
    token = submitted.get('g-recaptcha-response', '')
    client_ip = request.remote_addr

    if not token:
        raise ValidationError(field.gettext(self.message))

    if self._validate_recaptcha(token, client_ip):
        return None

    # Rejected token: flag the field before reporting the error.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def login():
    """Handle the login page.

    On POST the submitted credentials are checked through ``auth().check``;
    a successful login redirects to ``admin`` for admin users and to
    ``desktops`` for everyone else. In every other case (GET, blank fields,
    failed check) the login page is rendered together with the disposables
    returned for the client address.

    :returns: a redirect after a successful login, otherwise the rendered
        ``login_disposables.html`` template.
    """
    if request.method == 'POST':
        # Compare by value: ``is ''`` tests object identity, which is not
        # guaranteed to be true for two equal strings.
        if request.form['user'] == '' or request.form['password'] == '':
            flash("Can't leave it blank",'danger')
        else:
            au = auth()
            user = au.check(request.form['user'], request.form['password'])
            if user:
                login_user(user)
                flash('Logged in successfully.','success')
                if user.is_admin:
                    return redirect(url_for('admin'))
                return redirect(url_for('desktops'))
            else:
                flash('Username not found or incorrect password.','warning')

    # NOTE(review): X-Forwarded-For is supplied by the client unless a trusted
    # proxy overwrites it -- confirm the deployment before relying on it.
    if 'X-Forwarded-For' in request.headers:
        remote_addr = request.headers['X-Forwarded-For']
    else:
        remote_addr = request.remote_addr
    disposables = app.isardapi.show_disposable(remote_addr)
    log.info(disposables)
    log.info(remote_addr)
    return render_template('login_disposables.html',
                           disposables=disposables if disposables else '')
def voucher_login():
    """Handle voucher based registration.

    On POST the voucher is checked; when it is accepted the user is
    registered for the given email and a message is flashed whose wording
    and category depend on whether that email was already known. A rejected
    voucher flashes an error. The login page is rendered afterwards in all
    cases, with an empty ``disposables`` value.
    """
    if request.method == 'POST':
        if 'X-Forwarded-For' in request.headers:
            remote_addr = request.headers['X-Forwarded-For']
        else:
            remote_addr = request.remote_addr

        au = auth_voucher()
        voucher = request.form['voucher']
        if not au.check_voucher(voucher):
            flash('Invalid registration voucher code','danger')
        else:
            email = request.form['email']
            # Existence is probed before registering; it only selects which
            # message is shown, the registration call is the same either way.
            already_known = au.check_user_exists(email)
            au.register_user(voucher, email, remote_addr)
            if already_known:
                flash('Resetting account. Email with new isard user sent to '+email+'. Please check your email','warning')
            else:
                flash('Email with isard user sent to '+email+'. Please check your email','success')

    return render_template('login.html', disposables='')
def __call__(self, form, field):
    """Validate the reCAPTCHA token submitted with the current request.

    Validation is skipped (returns True) while the application is in testing
    mode. The token is read from the JSON body when there is one, otherwise
    from the form data.

    :raises ValidationError: when the token is missing or is rejected by
        ``self._validate_recaptcha``.
    """
    if current_app.testing:
        return True

    submitted = request.json or request.form
    token = submitted.get('g-recaptcha-response', '')
    client_ip = request.remote_addr

    if not token:
        raise ValidationError(field.gettext(self.message))

    if self._validate_recaptcha(token, client_ip):
        return None

    # Rejected token: flag the field before reporting the error.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def __call__(self, form, field):
    """Validate the reCAPTCHA token submitted with the current request.

    Validation is skipped (returns True) while the application is in testing
    mode. The token is read from the JSON body when there is one, otherwise
    from the form data.

    :raises ValidationError: when the token is missing or is rejected by
        ``self._validate_recaptcha``.
    """
    if current_app.testing:
        return True

    submitted = request.json or request.form
    token = submitted.get('g-recaptcha-response', '')
    client_ip = request.remote_addr

    if not token:
        raise ValidationError(field.gettext(self.message))

    if self._validate_recaptcha(token, client_ip):
        return None

    # Rejected token: flag the field before reporting the error.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def game():
    """Controller page.

    Clients whose IP is not among ``players`` are redirected to ``index``.
    For known players a POST puts the submitted direction on the shared
    queue, and the controller template is rendered with the player's team.
    The time spent handling the request is printed.
    """
    started = time.perf_counter()
    player_ip = request.remote_addr

    # Redirect players who are not in the IP list and therefore have no team.
    known_player = False
    for player in players:
        if player_ip == player.ip:
            print("OK")
            known_player = True
    if not known_player:
        return redirect(url_for('index'))

    team_var = get_team(player_ip)
    direction_var = None

    if request.method == 'POST':
        # Queue a request to move the robot for the worker thread.
        q.put(request.form['submit'])

    print(time.perf_counter() - started)
    return render_template('game.html',team=team_var, direction=direction_var)
def __call__(self, form, field):
    """Validate the reCAPTCHA token submitted with the current request.

    Validation is skipped (returns True) while the application is in testing
    mode. The token is read from the JSON body when there is one, otherwise
    from the form data.

    :raises ValidationError: when the token is missing or is rejected by
        ``self._validate_recaptcha``.
    """
    if current_app.testing:
        return True

    submitted = request.json or request.form
    token = submitted.get('g-recaptcha-response', '')
    client_ip = request.remote_addr

    if not token:
        raise ValidationError(field.gettext(self.message))

    if self._validate_recaptcha(token, client_ip):
        return None

    # Rejected token: flag the field before reporting the error.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def dist_index(dist):
    """Render the index page for a distribution and log the visit.

    A missing or empty ``dist`` falls back to ``'NetBSD-current'``. A name
    that is neither a key of ``config.DB_PATHS`` nor ``'favicon.ico'``
    redirects to the search page instead.

    :param dist: distribution name taken from the URL, possibly None or ''.
    :returns: a redirect to ``search`` or the rendered ``index.html``.
    """
    netbsd_logo_url = url_for('static', filename='images/netbsd.png')

    if dist is None or dist == '':
        dist = 'NetBSD-current'

    is_known = dist in config.DB_PATHS or dist == 'favicon.ico'
    if not is_known:
        return redirect(url_for('search'))

    # Collect the visitor details that go into the page-visit log.
    ua = request.user_agent
    dblogger.log_page_visit(
        1,
        request.remote_addr,
        ua.platform,
        ua.browser,
        ua.version,
        ua.language,
        request.referrer,
        int(time.time()),
        ua.string,
        dist,
    )
    return render_template('index.html',
                           netbsd_logo_url=netbsd_logo_url,
                           distnames=distnames)
def catch_hash_page(some_hash):
    """Render the page for ``some_hash`` depending on the visitor's IP.

    An IP not yet recorded is stored and shown the link to the next URL.
    An IP that is already recorded gets the "Banned!" image when the hash
    has an entry in ``image_mappings`` and the "wrong way" text otherwise.

    :param some_hash: hash segment taken from the requested URL.
    :returns: the rendered ``index.html`` template.
    """
    visitor_ip = request.remote_addr

    if not ip_in_db(visitor_ip):
        add_ip(visitor_ip)
        return render_template(
            "index.html",
            content={
                "link": next_url(some_hash),
                "text": "You're on the right way!"})

    if some_hash in image_mappings:
        text = 'Banned!<p></p><img src="images/{}">'\
            .format(image_mappings[some_hash])
    else:
        text = "wrong way:("
    return render_template(
        "index.html", content={"link": "", "text": text})
def monkey_patch_email_field(form_class):
    """Replace ``form_class.email`` with an HTML5 email field.

    The new field uses an ``Email`` validator with the message
    ``'INVALID_EMAIL_ADDRESS'`` and an extra validator that aborts the
    request when the client's IP address is reported by ``rbl``.

    :param form_class: form class whose ``email`` attribute is replaced.
    """
    from wtforms.fields.html5 import EmailField
    from flask_security.forms import (email_required,
                                      unique_user_email,
                                      get_form_field_label)
    import wtforms.validators
    from pygameweb.user.rbl import rbl

    def rbl_spamlist_validator(form, field):
        """Abort with a 500 when the signing-up client's IP is on a spam list."""
        client_ip = request.remote_addr or None
        if rbl(client_ip):
            abort(500)

    email_validator = wtforms.validators.Email(message='INVALID_EMAIL_ADDRESS')
    email_validators = [
        email_required,
        email_validator,
        rbl_spamlist_validator,
        unique_user_email,
    ]
    form_class.email = EmailField(get_form_field_label('email'),
                                  validators=email_validators)
def request_register():
    """Store a new, not yet authorized authentication request.

    Reads ``username`` and ``requestid`` from the form and the client IP
    from the request. Invalid input or a duplicate registration yields a 400
    JSON response; otherwise the request is written to the ``requests`` hash
    in redis and a 200 JSON response is returned.
    """
    def _json_reply(message, status):
        # Every reply of this view is a one-key JSON document.
        return Response(response=json.dumps({"response": message}), status=status, mimetype="application/json")

    username = str(request.form['username'])
    requestid = str(request.form['requestid'])
    client_ip = request.remote_addr

    inputs_ok = (pivportal.security.username_is_valid(username)
                 and pivportal.security.requestid_is_valid(requestid)
                 and pivportal.security.ip_is_valid(client_ip))
    if not inputs_ok:
        # client_ip is None when testing, so it's ok
        return _json_reply(" invalid request", 400)

    if pivportal.security.is_duplicate_register(username, requestid, redis_store.hgetall("requests")):
        return _json_reply(" invalid request", 400)

    entry = {"username": username, "client_ip": client_ip, "authorized": False, "time": time.time()}
    redis_store.hmset("requests", {requestid: json.dumps(entry)})
    return _json_reply("success", 200)
def request_status():
    """Report whether a previously registered request has been authorized.

    Reads ``username`` and ``requestid`` from the form and the client IP
    from the request. Responses:

    * 400 when any of the three values fails validation;
    * 200 when the stored request matches the caller and is authorized;
    * 401 in every other case.

    A stored request that matches the caller is removed from redis when it
    is polled, whether or not it was authorized.
    """
    username = str(request.form['username'])
    requestid = str(request.form['requestid'])
    client_ip = request.remote_addr

    if not pivportal.security.username_is_valid(username) or not pivportal.security.requestid_is_valid(requestid) or not pivportal.security.ip_is_valid(client_ip):
        return Response(response=json.dumps({"response": " invalid request"}), status=400, mimetype="application/json")

    auth_requests = redis_store.hgetall("requests")
    if requestid in auth_requests:
        this_request = json.loads(auth_requests[requestid])
        if this_request["username"] == username and this_request["client_ip"] == client_ip:
            # The request is consumed on this poll, success or not.
            redis_store.hdel("requests", requestid)
            if this_request["authorized"] == True:
                return Response(response=json.dumps({"response": "success"}), status=200, mimetype="application/json")
            return Response(response=json.dumps({"response": "Authentication Failure"}), status=401, mimetype="application/json")

    # Unknown request id, or it belongs to another user/IP. These paths used
    # to fall off the end of the view and return None, which Flask rejects;
    # answer with the same failure response instead and leave redis untouched.
    return Response(response=json.dumps({"response": "Authentication Failure"}), status=401, mimetype="application/json")
def register():
    """Register a new client.

    Only POST requests are handled. The client is stored, together with its
    preflight scripts, when the ``uuid`` and ``user_agent`` form fields and
    the remote address are all present; otherwise a short error text is
    returned.
    """
    if request.method != 'POST':
        return None

    client_id = request.form.get('uuid','')
    user_agent = request.form.get('user_agent','')
    ip = request.remote_addr

    if not (client_id and user_agent and ip):
        return 'UUID is not present'

    print("Register: ", client_id)
    client = Client(client_id, user_agent, ip)
    db.session.add(client)
    # add pre flight scripts
    client.add_preflight()
    db.session.commit()
    return 'Hello {}!'.format(client_id)
def before_request(self,*args):
    """Verify that the API key is valid for the requesting client and that
    the client's IP address has not changed since login.

    Reads the ``API_KEY`` and ``Client_ID`` request headers, looks the user
    up by client id and checks, in this order: the user exists, the key
    equals the user's stored key, the remote address equals the user's
    current login IP, and the key unsigns with a maximum age of 86164
    seconds.

    :returns: a 400 JSON response describing the first failed check, or
        None when every check passes.
    """
    signer = TimestampSigner(SECRET_KEY)
    api_key = request.headers['API_KEY']
    Client_id = request.headers['Client_ID']
    ip_address = request.remote_addr

    user = User.query.filter_by(Client_id=Client_id).first()
    if user is None:
        return make_response(jsonify({'Failure': 'Invaild User'}), 400)
    if api_key != user.api_key:
        return make_response(jsonify({'Failure': 'Incorrect API Key'}), 400)
    if ip_address != user.current_login_ip:
        return make_response(jsonify({'Failure': 'Incorrect IP for Client, Please Re-login in'}), 400)

    try:
        signer.unsign(api_key, max_age=86164)
    except Exception:
        # Any unsign failure counts as an invalid key. Catching Exception
        # rather than using a bare ``except`` lets KeyboardInterrupt and
        # SystemExit propagate.
        return make_response(jsonify({'Failure': 'Invaild API Key, please request new API Key'}), 400)
def route():
    """Log the request, dispatch it to an endpoint and return its JSON reply.

    The JSON body (an empty dict when there is none) supplies the arguments.
    An ``AutocertError`` raised while creating or executing the endpoint is
    turned into a 500 response listing the error.

    :raises EmptyJsonError: when the endpoint returns an empty payload.
    """
    args = request.json or {}
    cfg = args.get('cfg', None)

    log_request(
        args.get('user', 'unknown'),
        args.get('hostname', 'unknown'),
        request.remote_addr,
        request.method,
        request.path,
        args)

    try:
        endpoint = create_endpoint(request.method, cfg, args)
        payload, status = endpoint.execute()
    except AutocertError as ae:
        failure = dict(errors={ae.name: ae.message})
        return make_response(jsonify(failure), 500)

    if not payload:
        raise EmptyJsonError(payload)
    return make_response(jsonify(payload), status)
def main():
    """Webhook endpoint: pull the repository when a push notification arrives.

    Only callers whose address lies in 192.30.252.0 - 192.30.255.255 are
    accepted (presumably the hook sender's published range -- confirm);
    everyone else gets a 403. For the ``Python-IRC-Bot`` repository a
    ``git pull`` is run; a failed pull is reported on IRC, and after a
    successful one the handlers are reloaded when ``handlers.py`` is among
    the modified files of the head commit.

    :returns: a plain-text response, ``"Thanks."`` or ``"Wrong repo."``.
    """
    import ipaddress

    # 192.30.252.0/22 spans exactly 192.30.252.0 through 192.30.255.255.
    allowed_net = ipaddress.ip_network('192.30.252.0/22')
    try:
        # The remote address is a string, so it has to be parsed before it
        # can be range-checked; testing the string against a range of ints
        # never matches and rejected every caller.
        allowed = ipaddress.ip_address(request.remote_addr) in allowed_net
    except ValueError:
        # Missing or unparsable address.
        allowed = False
    if not allowed:
        flask.abort(403)

    payload = request.get_json()
    if payload["repository"]["name"] == "Python-IRC-Bot":
        try:
            subprocess.check_call(["git", "pull"])
        except subprocess.CalledProcessError:
            irc.privmsg("##wolfy1339", "git pull failed!")
        else:
            if "handlers.py" in payload['head_commit']['modified']:
                reload_handlers(bot)
        return flask.Response("Thanks.", mimetype="text/plain")
    return flask.Response("Wrong repo.", mimetype="text/plain")
def __call__(self, form, field):
    """Validate the reCAPTCHA token submitted with the current request.

    Validation is skipped (returns True) while the application is in testing
    mode. The token is read from the JSON body when there is one, otherwise
    from the form data.

    :raises ValidationError: when the token is missing or is rejected by
        ``self._validate_recaptcha``.
    """
    if current_app.testing:
        return True

    submitted = request.json or request.form
    token = submitted.get('g-recaptcha-response', '')
    client_ip = request.remote_addr

    if not token:
        raise ValidationError(field.gettext(self.message))

    if self._validate_recaptcha(token, client_ip):
        return None

    # Rejected token: flag the field before reporting the error.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def app_getinfo(ver):
    """Return a JSON description of the current request.

    The reply holds the API version, a fixed message and status, the request
    method, path, remote address and user agent, plus one ``GET <name>``
    entry per query argument and one ``POST <name>`` entry per form field.

    :param ver: API version echoed back under ``api_version``.
    """
    info = {
        'api_version': ver,
        'message': "Flask API Data",
        'status': "200",
        'method': request.method,
        'path': request.path,
        'remote_addr': request.remote_addr,
        'user_agent': request.headers.get('User-Agent'),
    }
    # Query-string arguments
    info.update({'GET ' + name: request.args.get(name, '')
                 for name in request.args})
    # Form fields
    info.update({'POST ' + name: request.form[name]
                 for name in request.form.keys()})
    return jsonify(info)
def app_getinfo(ver):
    """Return a JSON description of the current request.

    The reply holds the API version, a fixed message and status, the request
    method, path, remote address and user agent, plus one ``GET <name>``
    entry per query argument and one ``POST <name>`` entry per form field.

    :param ver: API version echoed back under ``api_version``.
    """
    info = {
        'api_version': ver,
        'message': "Flask API Data",
        'status': "200",
        'method': request.method,
        'path': request.path,
        'remote_addr': request.remote_addr,
        'user_agent': request.headers.get('User-Agent'),
    }
    # Query-string arguments
    info.update({'GET ' + name: request.args.get(name, '')
                 for name in request.args})
    # Form fields
    info.update({'POST ' + name: request.form[name]
                 for name in request.form.keys()})
    return jsonify(info)