def test_manual_context_binding(self):
app = flask.Flask(__name__)
@app.route('/')
def index():
return 'Hello %s!' % flask.request.args['name']
ctx = app.test_request_context('/?name=World')
ctx.push()
self.assert_equal(index(), 'Hello World!')
ctx.pop()
try:
index()
except RuntimeError:
pass
else:
self.assert_true(0, 'expected runtime error')
python类request()的实例源码
def test_greenlet_context_copying(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
def g():
self.assert_false(flask.request)
self.assert_false(flask.current_app)
with reqctx:
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
self.assert_false(flask.request)
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
@flask.copy_current_request_context
def g():
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
# Disable test if we don't have greenlets available
def find_fontface_by_font_id():
response_data = []
try:
query_string = request.args.get("font_id")
fontfaces = FontFaceService().find_by_font_id(query_string)
for fontface in fontfaces:
response_data.append(
{
"fontface_id": fontface.fontface_id,
"fontface": fontface.fontface,
"font_id": fontface.font_id,
"resource_path": fontface.resource_path
}
)
return jsonify(response_data)
except:
return jsonify({"error": "Invalid request"})
def pushbullet(ALERTID=None, TOKEN=None):
"""
Send a `link` notification to all devices on pushbullet with a link back to the alert's query.
If `TOKEN` is not passed, requires `PUSHBULLETTOKEN` defined, see https://www.pushbullet.com/#settings/account
"""
if not PUSHBULLETURL:
return ("PUSHBULLET parameter must be set, please edit the shim!", 500, None)
if (TOKEN is not None):
PUSHBULLETTOKEN = TOKEN
if not PUSHBULLETTOKEN:
return ("PUSHBULLETTOKEN parameter must be set, please edit the shim!", 500, None)
a = parse(request)
payload = {
"body": a['info'],
"title": a['AlertName'],
"type": "link",
"url": a['url'],
}
headers = {'Content-type': 'application/json', 'Access-Token': PUSHBULLETTOKEN}
return callapi(PUSHBULLETURL, 'post', json.dumps(payload), headers)
def find_fontface_by_font_id():
response_data = []
try:
query_string = request.args.get("font_id")
fontfaces = FontFaceService().find_by_font_id(query_string)
for fontface in fontfaces:
response_data.append(
{
"fontface_id": fontface.fontface_id,
"fontface": fontface.fontface,
"font_id": fontface.font_id,
"resource_path": fontface.resource_path
}
)
return jsonify(response_data)
except:
return jsonify({"error": "Invalid request"})
def extract_token(self):
'''
Extracts a token from the current HTTP request if it is available.
Invokes the `save_user` callback if authentication is successful.
'''
header = request.headers.get(b'authorization')
if header and header.startswith(b'Negotiate '):
token = header[10:]
user, token = _gssapi_authenticate(token, self._service_name)
if token is not None:
stack.top.kerberos_token = token
if user is not None:
self._save_user(user)
else:
# Invalid Kerberos ticket, we could not complete authentication
abort(403)
def test_manual_context_binding(self):
app = flask.Flask(__name__)
@app.route('/')
def index():
return 'Hello %s!' % flask.request.args['name']
ctx = app.test_request_context('/?name=World')
ctx.push()
self.assert_equal(index(), 'Hello World!')
ctx.pop()
try:
index()
except RuntimeError:
pass
else:
self.assert_true(0, 'expected runtime error')
def test_greenlet_context_copying(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
def g():
self.assert_false(flask.request)
self.assert_false(flask.current_app)
with reqctx:
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
self.assert_false(flask.request)
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
app = flask.Flask(__name__)
greenlets = []
@app.route('/')
def index():
reqctx = flask._request_ctx_stack.top.copy()
@flask.copy_current_request_context
def g():
self.assert_true(flask.request)
self.assert_equal(flask.current_app, app)
self.assert_equal(flask.request.path, '/')
self.assert_equal(flask.request.args['foo'], 'bar')
return 42
greenlets.append(greenlet(g))
return 'Hello World!'
rv = app.test_client().get('/?foo=bar')
self.assert_equal(rv.data, b'Hello World!')
result = greenlets[0].run()
self.assert_equal(result, 42)
# Disable test if we don't have greenlets available
def recongize():
'''
Receives POST request from Twilio. Sends data from request to Amazon Rekognize and receives the API's analysis.
Returns the resulting analysis as JSON.
'''
# Reads image data from incoming request
r = request
data = r.files['file'].read()
# Sends image data to Amazon Rekognize for analysis. Returns JSON response of results.
try:
results = client.recognize_celebrities(Image={'Bytes': data})
except botocore.exceptions.ClientError as e:
results = {"Rekognition Client Error": str(e)}
logging.info(results)
return jsonify(results)
def chunked_reader():
try:
# If we're running under uWSGI, use the uwsgi.chunked_read method
# to read chunked input.
import uwsgi # noqa
while True:
chunk = uwsgi.chunked_read()
if len(chunk) > 0:
yield chunk
else:
return
except ImportError:
# Otherwise try to read the wsgi input. This works in embedded Apache.
stream = flask.request.environ["wsgi.input"]
try:
while True:
yield stream.next()
except Exception:
return
def get_history(coin):
c = Coin(coin.lower())
q = "SELECT * from api_coin WHERE name=:coin ORDER BY date desc"
if request.args.get('key') in API_KEYS:
print(crayons.red('Pro request!'))
rows = pro_db.query(q, coin=c.name)
else:
rows = db.query(q, coin=c.name)
return jsonify(history=[
{
'value': r.value,
'value.currency': 'USD',
'timestamp': maya.MayaDT.from_datetime(r.date).subtract(hours=4).iso8601(),
'when': maya.MayaDT.from_datetime(r.date).subtract(hours=4).slang_time()
} for r in rows]
)
def second_phase_lti_launch(
) -> helpers.JSONResponse[t.Mapping[str, t.Union[str, models.Assignment, bool]]
]:
launch_params = jwt.decode(
flask.request.headers.get('Jwt', None),
app.config['LTI_SECRET_KEY'],
algorithm='HS512'
)['params']
lti = CanvasLTI(launch_params)
user, new_token = lti.ensure_lti_user()
course = lti.get_course()
assig = lti.get_assignment(user)
lti.set_user_role(user)
new_role_created = lti.set_user_course_role(user, course)
db.session.commit()
result: t.Mapping[str, t.Union[str, models.Assignment, bool]]
result = {'assignment': assig, 'new_role_created': new_role_created}
if new_token is not None:
result['access_token'] = new_token
return helpers.jsonify(result)
def validate_sig(body, sig_str, pkh):
"""
Validate the signature on the body of the request - throws exception if not valid.
"""
# Check permission to update
if (pkh is None):
raise PermissionError("pkh not found.")
# Validate the pkh format
base58.b58decode_check(pkh)
if (len(pkh) < 20) or (len(pkh) > 40):
raise PermissionError("Invalid pkh")
try:
if not sig_str:
raise PermissionError("X-Bitcoin-Sig header not found.")
if not wallet.verify_bitcoin_message(body, sig_str, pkh):
raise PermissionError("X-Bitcoin-Sig header not valid.")
except Exception as err:
logger.error("Failure: {0}".format(err))
raise PermissionError("X-Bitcoin-Sig header validation failed.")
return True
def clear_cache():
user_name = request.headers.get('x-webauth-user')
account = ldap_get_member(user_name)
if not ldap_is_eval_director(account) and not ldap_is_rtp(account):
return redirect("/dashboard")
log = logger.new(request=request)
log.info('Purge All Caches')
_ldap_is_member_of_directorship.cache_clear()
ldap_get_member.cache_clear()
ldap_get_active_members.cache_clear()
ldap_get_intro_members.cache_clear()
ldap_get_onfloor_members.cache_clear()
ldap_get_current_students.cache_clear()
get_voting_members.cache_clear()
get_members_info.cache_clear()
get_onfloor_members.cache_clear()
return "cache cleared", 200
def database_processor(logger, log_method, event_dict): # pylint: disable=unused-argument, redefined-outer-name
if 'request' in event_dict:
if event_dict['method'] != 'GET':
log = UserLog(
ipaddr=event_dict['ip'],
user=event_dict['user'],
method=event_dict['method'],
blueprint=event_dict['blueprint'],
path=event_dict['path'],
description=event_dict['event']
)
db.session.add(log)
db.session.flush()
db.session.commit()
del event_dict['request']
return event_dict
def RunFileServer(fileServerDir, fileServerPort):
"""
Run a Flask file server on the given port.
Explicitly specify instance_path, because Flask's
auto_find_instance_path can fail when run in a frozen app.
"""
app = Flask(__name__, instance_path=fileServerDir)
@app.route('/fileserver-is-ready', methods=['GET'])
def FileserverIsReady(): # pylint: disable=unused-variable
"""
Used to test if file server has started.
"""
return 'Fileserver is ready!'
@app.route('/<path:filename>', methods=['GET'])
def ServeFile(filename): # pylint: disable=unused-variable
"""
Serves up a file from PYUPDATER_FILESERVER_DIR.
"""
return send_from_directory(fileServerDir, filename.strip('/'))
def ShutDownServer():
"""
Shut down the file server.
"""
func = request.environ.get('werkzeug.server.shutdown')
if func is None:
raise RuntimeError('Not running with the Werkzeug Server')
func()
@app.route('/shutdown', methods=['POST'])
def ShutDown(): # pylint: disable=unused-variable
"""
Respond to a POSTed request to shut down the file server.
"""
ShutDownServer()
return 'Server shutting down...'
app.run(host=LOCALHOST, port=fileServerPort)
def request_password():
jid = request.form.get('jid')
re = s.query(RecoveryEmail).filter(RecoveryEmail.jid==jid, RecoveryEmail.confirmed==True).one_or_none()
if re:
password_code = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(64))
re.password_code = password_code
s.merge(re)
s.commit()
password_code_link = 'https://www.pimux.de/password/?code=%s' % password_code
sendMail(re.email, 'password reset request', 'click here: %s' % password_code_link)
content = render_template('success.html', message='link was sent')
else:
content = render_template('error.html', message='user not found')
return content
def connected():
logging.debug('Received connect request')
emit('log', {'data': 'Connected'})
def create_from_request(cls: t.Type['LTI'], req: flask.Request) -> 'LTI':
params = req.form.copy()
lti_provider = models.LTIProvider.query.filter_by(
key=params['oauth_consumer_key']
).first()
if lti_provider is None:
lti_provider = models.LTIProvider(key=params['oauth_consumer_key'])
db.session.add(lti_provider)
db.session.commit()
params['lti_provider_id'] = lti_provider.id
# This is semi sensitive information so it should not end up in the JWT
# token.
launch_params = {}
for key, value in params.items():
if not key.startswith('oauth'):
launch_params[key] = value
self = cls(launch_params, lti_provider)
auth.ensure_valid_oauth(self.key, self.secret, req)
return self
def setUp(self) -> None:
"""
Set up some mock model classes and create an endpoint
"""
self.session = mock.MagicMock(spec=Session) # type: Session
self.request = mock.MagicMock(spec=Request) # type: Request
self.service_list = mock.MagicMock(
spec=ServiceList
) # type: ServiceList
self._app = Flask(__name__)
self._app.add_url_rule(
'/', view_func=ServiceDetail.as_view(
ServiceDetail.__name__, self.session
)
)
self._context = self._app.test_request_context()
self._context.push()
def test_invalid_post(self, request_body: dict) -> None:
assume("name" not in request_body.keys())
assume("description" not in request_body.keys())
assume("job_registration_schema" not in request_body.keys())
assume("job_result_schema" not in request_body.keys())
request = mock.MagicMock(spec=Request)
request.get_json = mock.MagicMock(return_value=request_body)
endpoint = ServicesList(
self.session, request
)
with self.assertRaises(endpoint.Abort):
endpoint.post()
self.assertTrue(endpoint.errors)
def __init__(
self,
session: Session,
flask_request: Request=request,
job_list_model: Optional[JobListInterface]=None
) -> None:
"""
:param session: The session to use
:param flask_request: The Flask request that this endpoint needs to
process
"""
super(self.__class__, self).__init__(session, flask_request)
if job_list_model is None:
self.job_list = JobListModel(self.database_session)
else:
self.job_list = job_list_model
def __init__(
self,
session: Session,
flask_request: Request=request,
metadata: APIMetadataModelInterface=MetadataModel()
) -> None:
"""
:param session: The SQLAlchemy ORM Session to use for communicating
with the database. This is required by ``AbstractEndpoint`` in
order to perform its transaction management
:param flask_request: The request to process. By default, this is
flask's ``request`` variable
:param metadata: The metadata to serialize. By default, this is an
instance of the ``MetadataModel`` that pulls all of its values
from the ``config`` script.
"""
super(APIMetadata, self).__init__(session, request=flask_request)
self._api_metadata = metadata
def __init__(
self, session: Session,
flask_request: Request=request,
service_list: Optional[ServiceListInterface]=None
) -> None:
"""
Create a service list model that is capable of getting services from
the DB
:param session: The database session to use
"""
super(self.__class__, self).__init__(session, flask_request)
if service_list is not None:
self.service_list = service_list
else:
self.service_list = ServiceListModel(session)
def will_handle_ui(self, request):
"""
Called by OctoPrint to determine if the mixin implementation will be
able to handle the ``request`` provided as a parameter.
Return ``True`` here to signal that your implementation will handle
the request and that the result of its :meth:`~octoprint.plugin.UiPlugin.on_ui_render` method
is what should be served to the user.
The execution order of calls to this method can be influenced via the sorting context
``UiPlugin.will_handle_ui``.
Arguments:
request (flask.Request): A Flask `Request <http://flask.pocoo.org/docs/0.10/api/#flask.Request>`_
object.
Returns:
bool: ``True`` if the the implementation will serve the request,
``False`` otherwise.
"""
return False
def test_modified_url_encoding(self):
class ModifiedRequest(flask.Request):
url_charset = 'euc-kr'
app = flask.Flask(__name__)
app.testing = True
app.request_class = ModifiedRequest
app.url_map.charset = 'euc-kr'
@app.route('/')
def index():
return flask.request.args['foo']
rv = app.test_client().get(u'/?foo=????'.encode('euc-kr'))
self.assert_equal(rv.status_code, 200)
self.assert_equal(rv.data, u'????'.encode('utf-8'))
def test_modified_url_encoding(self):
class ModifiedRequest(flask.Request):
url_charset = 'euc-kr'
app = flask.Flask(__name__)
app.testing = True
app.request_class = ModifiedRequest
app.url_map.charset = 'euc-kr'
@app.route('/')
def index():
return flask.request.args['foo']
rv = app.test_client().get(u'/?foo=????'.encode('euc-kr'))
self.assert_equal(rv.status_code, 200)
self.assert_equal(rv.data, u'????'.encode('utf-8'))
def test_modified_url_encoding(self):
class ModifiedRequest(flask.Request):
url_charset = 'euc-kr'
app = flask.Flask(__name__)
app.testing = True
app.request_class = ModifiedRequest
app.url_map.charset = 'euc-kr'
@app.route('/')
def index():
return flask.request.args['foo']
rv = app.test_client().get(u'/?foo=????'.encode('euc-kr'))
self.assert_equal(rv.status_code, 200)
self.assert_equal(rv.data, u'????'.encode('utf-8'))