def test_manual_context_binding(self):
app = flask.Flask(__name__)
@app.route('/')
def index():
return 'Hello %s!' % flask.request.args['name']
ctx = app.test_request_context('/?name=World')
ctx.push()
self.assert_equal(index(), 'Hello World!')
ctx.pop()
try:
index()
except RuntimeError:
pass
else:
self.assert_true(0, 'expected runtime error')
python类request()的实例源码
def launch_lti() -> t.Any:
"""Do a LTI launch.
.. :quickref: LTI; Do a LTI Launch.
"""
lti = {
'params': CanvasLTI.create_from_request(flask.request).launch_params,
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
}
return flask.redirect(
'{}/lti_launch/?inLTI=true&jwt={}'.format(
app.config['EXTERNAL_URL'],
urllib.parse.quote(
jwt.encode(
lti, app.config['LTI_SECRET_KEY'], algorithm='HS512'
).decode('utf8')
)
)
)
def index():
if request.args.get('code'):
unlock_code = request.args.get('code')
# unlock, new password
re = s.query(RecoveryEmail).filter(RecoveryEmail.password_code==unlock_code).one_or_none()
if re:
jid = re.jid
re.password_code = None
s.merge(re)
s.commit()
# set new password and send email
email_address = re.email
password = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(10))
p = subprocess.Popen(['/usr/bin/prosodyctl', 'passwd', jid], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
args = bytes("%s\n%s\n" % (password, password), encoding='utf8')
p.communicate(args)
sendMail(email_address, 'new password', password)
content = render_template('success.html', message='password was sent')
else:
content = render_template('error.html', message='link invalid')
else:
content = render_template('index.html')
return content
def test_greenlet_context_copying_api(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
@flask.copy_current_request_context
def g():
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
# Disable test if we don't have greenlets available
def api_send_loves():
sender = request.form.get('sender')
recipients = sanitize_recipients(request.form.get('recipient'))
message = request.form.get('message')
try:
recipients = send_loves(recipients, message, sender_username=sender)
recipients_display_str = ', '.join(recipients)
link_url = create_love_link(recipients_display_str, message).url
return make_response(
u'Love sent to {}! Share: {}'.format(recipients_display_str, link_url),
LOVE_CREATED_STATUS_CODE,
{}
)
except TaintedLove as exc:
return make_response(
exc.user_message,
LOVE_FAILED_STATUS_CODE if exc.is_error else LOVE_CREATED_STATUS_CODE,
{}
)
def sent():
link_id = request.args.get('link_id', None)
recipients_str = request.args.get('recipients', None)
message = request.args.get('message', None)
if not link_id or not recipients_str or not message:
return redirect(url_for('home'))
recipients = sanitize_recipients(recipients_str)
loved = [
Employee.get_key_for_username(recipient).get()
for recipient in recipients
]
return render_template(
'sent.html',
current_time=datetime.utcnow(),
current_user=Employee.get_current_employee(),
message=message,
loved=loved,
url='{0}l/{1}'.format(config.APP_BASE_URL, link_id),
)
def test_manual_context_binding(self):
app = flask.Flask(__name__)
@app.route('/')
def index():
return 'Hello %s!' % flask.request.args['name']
ctx = app.test_request_context('/?name=World')
ctx.push()
self.assert_equal(index(), 'Hello World!')
ctx.pop()
try:
index()
except RuntimeError:
pass
else:
self.assert_true(0, 'expected runtime error')
def test_greenlet_context_copying(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
def g():
self.assert_false(flask.request)
self.assert_false(flask.current_app)
with reqctx:
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
self.assert_false(flask.request)
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
@flask.copy_current_request_context
def g():
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
# Disable test if we don't have greenlets available
def login():
username = request.headers.get('username')
password = request.headers.get('password')
if username is None or password is None:
raise InvalidRequest()
user = UsersCollection().find_one({'username': username})
if user is None:
raise AuthFailed()
is_valid = check_password_hash(user['password_hash'], password)
if not is_valid:
raise AuthFailed()
return jsonify({'token': UserJWT.new(username, user['scope'])})
def fixUrl(destinationPort, transport, url, peerType):
"""
fixes the URL (original request string)
"""
transportProtocol = ""
if transport.lower() in "udp" or transport.lower() in "tcp":
transportProtocol="/"+transport
if ("honeytrap" in peerType):
return "Attack on port " + str(destinationPort) + transportProtocol
# prepared dionaea to output additional information in ticker
if ("Dionaea" in peerType):
return "Attack on port " + str(destinationPort)+ transportProtocol
return url
def test_manual_context_binding(self):
app = flask.Flask(__name__)
@app.route('/')
def index():
return 'Hello %s!' % flask.request.args['name']
ctx = app.test_request_context('/?name=World')
ctx.push()
self.assert_equal(index(), 'Hello World!')
ctx.pop()
try:
index()
except RuntimeError:
pass
else:
self.assert_true(0, 'expected runtime error')
def test_greenlet_context_copying(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
def g():
self.assert_false(flask.request)
self.assert_false(flask.current_app)
with reqctx:
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
self.assert_false(flask.request)
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
@flask.copy_current_request_context
def g():
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
# Disable test if we don't have greenlets available
def password_reset_request():
"""Request for reseting password when user forget his/her password.
"""
if not current_user.is_anonymous:
return redirect(url_for('.index'))
form = PasswordResetRequestForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data).first()
if user:
token = user.generate_reset_token()
send_email(user.email, 'Reset Your Password',
'auth/email/reset_password',
user=user, token=token,
next=request.args.get('next'))
flash('An email with instructions to reset your password has been '
'sent to you.')
return redirect(url_for('auth.login'))
return render_template('auth/reset_password.html', form=form)
def inject():
return {
'root': _cfg("protocol") + "://" + _cfg("domain"),
'domain': _cfg("domain"),
'protocol': _cfg("protocol"),
'len': len,
'any': any,
'request': request,
'locale': locale,
'url_for': url_for,
'file_link': file_link,
'disown_link': disown_link,
'user': current_user,
'moe': random.choice(moe),
'random': random,
'owner': _cfg("owner"),
'owner_email': _cfg("owner_email"),
'_cfg': _cfg
}
def make_embed(request, message="New Transaction"):
"""
create an embed for logging transactions
"""
fields = []
fields.append({"name": "ID", "value": request['_id']})
if request['type'] == "deposit":
fields.append({"name": "Amount", "value": f"+{request['amount']}"})
elif request['type'] == "withdrawl":
fields.append({"name": "Amount", "value": f"-{request['amount']}"})
fields.append({"name": "Reason", "value": request['reason']})
fields.append({"name": "Balance", "value": request['user']['balance']})
fields.append(
{"name": "User", "value": f"{request['user']['name']}#{request['user']['discrim']} ({request['user']['_id']})"})
fields.append(
{"name": "Bot", "value": f"{request['bot']['name']}#{request['bot']['discrim']} ({request['bot']['_id']})"})
embed = {"title": message, "fields": fields, "timestamp":datetime.now().isoformat()}
test = requests.post('https://canary.discordapp.com/api/webhooks/338371642277494786/vG8DJjpXC-NEXB4ZISo1r7QQ0Ras_RaqZbuhzjYOklKu70l73PmumdUCgBruypPv3fQp', json={"embeds": [embed]})
def get_transactions(request):
transactions = []
if "type" in request:
transactions.append(list(r.table('transactions').filter(
r.row['type'] == request['type']).run(conn)))
if "amount" in request:
transactions.append(list(r.table('transactions').filter(
r.row['amount'] == request['amount']).run(conn)))
if "bot" in request:
transactions.append(list(r.table('transactions').filter(
r.row['bot'] == request['bot']).run(conn)))
if "user" in request:
transactions.append(list(r.table('transactions').filter(
r.row['user'] == request['user']).run(conn)))
if "reason" in request:
transactions.append(list(r.table('transactions').filter(
r.row['reason'] == request['reason']).run(conn)))
if not "reason" or "user" or "bot" or "amount" or "type" in request:
transactions.append(list(r.table('transactions').run(conn)))
temp = []
for i in transactions:
if i not in temp:
temp.append(i)
transactions = temp
return transactions[:request['limit']]
def create_token():
"""
register a new bot, and return the token
"bot": {
"name": "Alpha Bot",
"discrim": "4112",
"_id": "331841835733614603"
}
"""
raw_request = flask.request
request = flask.request.get_json()
token = tokengenerator.URandomTokenGenerator().generate()
bot = {"name": request['bot']['name'], "discrim": request['bot']['discrim'],
"_id": request['bot']['id'], "owner": request['owner'], "token": token}
r.table('bots').insert({"bot": bot})
return flask.jsonify(bot)
def show_transactions():
"""
Returns all transactions that match the given parameters
(keys in parentheses are optional)
{
"limit": 20,
("type": "deposit",)
("amount": 200,)
("user": {
"name": "Mr Boo Grande",
"discrim": "6644",
"_id": "209137943661641728"
},)
("bot": {
"name": "Alpha Bot",
"discrim": "4112",
"_id": "331841835733614603"
},)
("reason": "casino")
}
"""
raw_request = flask.request
request = flask.request.get_json()
transactions = get_transactions(request)
return flask.jsonify(transactions)
def fake_transaction():
"""
create a fake transaction log (for testing purposes)
{
"_id": 90832,
"type": "deposit",
"amount": 200,
"user": {
"name": "Mr Boo Grande",
"discrim": "6644",
"_id": "209137943661641728"
},
"bot": {
"name": "Alpha Bot",
"discrim": "4112",
"_id": "331841835733614603"
},
"reason": "casino"
}
"""
raw_request = flask.request
request = flask.request.get_json()
make_embed(request)
return flask.jsonify(request)
def test_manual_context_binding(self):
app = flask.Flask(__name__)
@app.route('/')
def index():
return 'Hello %s!' % flask.request.args['name']
ctx = app.test_request_context('/?name=World')
ctx.push()
self.assert_equal(index(), 'Hello World!')
ctx.pop()
try:
index()
except RuntimeError:
pass
else:
self.assert_true(0, 'expected runtime error')
def test_greenlet_context_copying(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
def g():
self.assert_false(flask.request)
self.assert_false(flask.current_app)
with reqctx:
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
self.assert_false(flask.request)
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
@flask.copy_current_request_context
def g():
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
# Disable test if we don't have greenlets available
def flask(body, headers):
import flask
path = '/hello/<account_id>/test'
flask_app = flask.Flask('hello')
@flask_app.route(path)
def hello(account_id):
request = flask.request
user_agent = request.headers['User-Agent'] # NOQA
limit = request.args.get('limit', '10') # NOQA
return flask.Response(body, headers=headers,
mimetype='text/plain')
return flask_app
def werkzeug(body, headers):
import werkzeug.wrappers as werkzeug
from werkzeug.routing import Map, Rule
path = '/hello/<account_id>/test'
url_map = Map([Rule(path, endpoint='hello')])
@werkzeug.Request.application
def hello(request):
user_agent = request.headers['User-Agent'] # NOQA
limit = request.args.get('limit', '10') # NOQA
adapter = url_map.bind_to_environ(request.environ) # NOQA
endpoint, values = adapter.match() # NOQA
aid = values['account_id'] # NOQA
return werkzeug.Response(body, headers=headers,
mimetype='text/plain')
return hello
def test_manual_context_binding(self):
app = flask.Flask(__name__)
@app.route('/')
def index():
return 'Hello %s!' % flask.request.args['name']
ctx = app.test_request_context('/?name=World')
ctx.push()
self.assert_equal(index(), 'Hello World!')
ctx.pop()
try:
index()
except RuntimeError:
pass
else:
self.assert_true(0, 'expected runtime error')
def test_greenlet_context_copying(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
def g():
self.assert_false(flask.request)
self.assert_false(flask.current_app)
with reqctx:
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
self.assert_false(flask.request)
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
@flask.copy_current_request_context
def g():
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
# Disable test if we don't have greenlets available
def _serve_webui(self, file_name='index.html'): # pylint: disable=redefined-builtin
try:
assert file_name
web3 = self.flask_app.config.get('WEB3_ENDPOINT')
if web3 and 'config.' in file_name and file_name.endswith('.json'):
host = request.headers.get('Host')
if any(h in web3 for h in ('localhost', '127.0.0.1')) and host:
_, _port = split_endpoint(web3)
_host, _ = split_endpoint(host)
web3 = 'http://{}:{}'.format(_host, _port)
response = jsonify({'raiden': self._api_prefix, 'web3': web3})
else:
response = send_from_directory(self.flask_app.config['WEBUI_PATH'], file_name)
except (NotFound, AssertionError):
response = send_from_directory(self.flask_app.config['WEBUI_PATH'], 'index.html')
return response