def index():
    """Serve the index endpoint, finishing a sign-in when a ``code`` is given.

    Behaviour, in order of precedence:

    * ``g.signin`` is truthy: return the plain string ``"logged_in"``.
    * a non-empty ``code`` query parameter is present: hand it to
      ``Get_Access_Token``, look the user up with ``Get_User_Info`` and
      return that data as JSON with a ``logged_in=true`` cookie attached.
    * otherwise: redirect to the ``login`` endpoint.
    """
    auth_code = request.args.get("code", "")
    app.logger.debug("code: %s" % auth_code)

    if g.signin:
        return "logged_in"

    if not auth_code:
        return redirect(url_for("login"))

    # NOTE(review): presumably an OAuth-style code-for-token exchange -- confirm.
    token_payload = Get_Access_Token(auth_code)
    user_info = Get_User_Info(token_payload.get('access_token'))
    app.logger.debug(user_info)

    reply = jsonify(user_info)
    reply.set_cookie(key="logged_in", value='true', expires=None)
    return reply
# Example source code using Python's jsonify()
def omw_lang_selector():
    """Return, as JSON, the HTML for the ``lang`` and ``lang2`` drop-downs.

    Each ``<select>`` gets one ``<option>`` per key of the first mapping
    returned by ``fetch_langs``; the option whose id (as a string) equals the
    matching ``selected_lang`` / ``selected_lang2`` cookie is pre-selected.

    Returns:
        ``jsonify(result=<html string>)``.
    """
    first_choice = request.cookies.get('selected_lang')
    second_choice = request.cookies.get('selected_lang2')
    lang_id, _lang_code = fetch_langs()

    def render_options(current):
        # One <option> line per language id; label is element [1] of its entry.
        rendered = []
        for lid in lang_id:
            if current == str(lid):
                template = '<option value="{}" selected>{}</option>\n'
            else:
                template = '<option value="{}">{}</option>\n'
            rendered.append(template.format(lid, lang_id[lid][1]))
        return ''.join(rendered)

    pieces = [
        '<select name="lang" style="font-size: 85%; width: 9em" required>',
        render_options(first_choice),
        '</select>',
        '<select name="lang2" style="font-size: 85%; width: 9em" required>',
        render_options(second_choice),
        '</select>',
    ]
    return jsonify(result=''.join(pieces))
def nowplaying():
    """Report the track a Last.fm user is currently playing.

    Reads ``user_id`` and ``text`` from the submitted form. ``text`` is used
    as the Last.fm user name; when it is empty or missing, the ``lastfm``
    value stored on the ``User`` row matching ``user_id`` is used instead.

    The previous version also built an unused ``"@" + user_name`` string,
    which raised ``TypeError`` whenever ``user_name`` was absent from the
    form; that dead local has been removed.

    Returns:
        ``(body, 200)`` where ``body`` is either a plain-text notice (no
        usable user name, or nothing playing) or a JSON payload with
        ``response_type``, ``text`` and ``unfurl_links`` keys.
    """
    user_id = request.form.get('user_id', None)
    lastfm_user = request.form.get('text', None)

    if not lastfm_user:
        user = User.query.filter_by(user_id=user_id).first()
        # No stored record, or a record without a Last.fm name: nothing to query.
        if user is None or not user.lastfm:
            return "Please enter a valid username", 200
        lastfm_user = user.lastfm

    track = lastfm_network.get_user(lastfm_user).get_now_playing()
    if track is None:
        return "No song playing.", 200

    payload = {
        "response_type": "in_channel",
        "text": "<http://last.fm/user/" + lastfm_user + "|" + lastfm_user
                + "> is currently listening to <" + track.get_url() + "|"
                + track.artist.name + " - " + track.title + ">",
        "unfurl_links": False,
    }
    return jsonify(payload), 200
def get_chapter(book_id):
    """Return one page of a book's chapters as JSON.

    The page number comes from the ``page`` query parameter (default ``1``)
    and the page size from ``current_app.config['CHAPTER_PER_PAGE']``.

    Args:
        book_id: value matched against ``Chapter.book_id``.

    Returns:
        JSON object with ``chapters`` (each chapter's ``to_json()``), plus
        ``prev`` and ``next``: absolute URLs of the neighbouring pages, or
        ``None`` when there is no such page.
    """
    page = request.args.get('page', 1, type=int)
    # Keyword arguments work on both old and new Flask-SQLAlchemy; newer
    # releases reject a positional page number.
    pagination = Chapter.query.filter_by(book_id=book_id).paginate(
        page=page,
        per_page=current_app.config['CHAPTER_PER_PAGE'],
        error_out=False,
    )

    # Named *_url so the builtin ``next`` is not shadowed.
    prev_url = None
    if pagination.has_prev:
        prev_url = url_for('api.get_chapter', book_id=book_id,
                           page=page - 1, _external=True)
    next_url = None
    if pagination.has_next:
        next_url = url_for('api.get_chapter', book_id=book_id,
                           page=page + 1, _external=True)

    return jsonify({
        'chapters': [chapter.to_json() for chapter in pagination.items],
        'prev': prev_url,
        'next': next_url,
    })
def is_hot_dog():
    """Classify an uploaded image and report the result as JSON.

    On POST, the ``file`` upload is written to ``UPLOAD_FOLDER``, scored with
    ``rekognizer.get_confidence`` and then deleted again.

    Returns:
        * ``({'error': 'no file'}, 400)`` when no usable ``file`` part exists;
        * ``{'error': 'bad-type'}`` when the content type is not in
          ``valid_mimetypes``;
        * otherwise ``{'is_hot_dog': 'true'|'false', 'confidence': <score>}``,
          where ``'false'`` is reported only for a score of exactly ``0``.

        Requests with any other method fall through and return ``None``,
        exactly as before.
    """
    if request.method == 'POST':
        if 'file' not in request.files:
            return jsonify({'error': 'no file'}), 400

        img_file = request.files.get('file')
        mimetype = img_file.content_type

        # Return an error if not a valid mimetype
        if mimetype not in valid_mimetypes:
            return jsonify({'error': 'bad-type'})

        # The file name is client-controlled: keep only its last path
        # component so it cannot escape the upload directory.
        img_name = os.path.basename((img_file.filename or '').replace('\\', '/'))
        if img_name in ('', '.', '..'):
            return jsonify({'error': 'no file'}), 400

        img_path = os.path.join(app.config['UPLOAD_FOLDER'], img_name)
        img_file.save(img_path)
        try:
            hot_dog_conf = rekognizer.get_confidence(img_name)
        finally:
            # Always remove the temporary copy, even if the analysis raised.
            os.remove(img_path)

        verdict = 'false' if hot_dog_conf == 0 else 'true'
        return jsonify({
            'is_hot_dog': verdict,
            'confidence': hot_dog_conf,
        })
def create():
    """Create a user from the JSON request body and return it with a token.

    The body is validated through ``UserCreateForm``; on failure the form's
    errors are returned with status 400. Otherwise a ``User`` is populated
    from the form data (the password goes through ``User.make_password``),
    saved, and an access token is produced by ``jwt.jwt_encode_callback``.

    Returns:
        ``(json, 200)`` with ``user`` (``user.serialize()``) and
        ``access_token`` (text), or ``(form errors, 400)``.
    """
    form = UserCreateForm.from_json(request.get_json())
    if not form.validate():
        return jsonify(form.errors), 400

    data = form.data
    user = User()
    user.email = data.get('email')
    user.first_name = data.get('first_name')
    user.last_name = data.get('last_name')
    user.avatar = data.get('avatar', None)
    user.password = User.make_password(data.get('password'))
    user.save()

    access_token = jwt.jwt_encode_callback(user)
    # The encoder may hand back bytes (older PyJWT) or text (PyJWT >= 2);
    # only decode when there is something to decode.
    if isinstance(access_token, bytes):
        access_token = access_token.decode('utf-8')
    return jsonify({'user': user.serialize(), 'access_token': access_token}), 200
def login():
    """Authenticate a user from a JSON body and return an access token.

    The body must be a JSON object holding exactly two entries: the keys
    named by the ``JWT_AUTH_USERNAME_KEY`` and ``JWT_AUTH_PASSWORD_KEY``
    config values, both with truthy values.

    Returns:
        ``(json, 200)`` with ``user`` and ``access_token`` on success;
        ``(json, 401)`` with a ``message`` when the body is missing or
        malformed, the credentials are rejected by
        ``jwt.authentication_callback``, or the user is not active.
    """
    invalid = {'message': 'Invalid credentials'}

    data = request.get_json()
    # A missing or non-object body used to crash on ``data.get``.
    if not isinstance(data, dict):
        return jsonify(invalid), 401

    username = data.get(current_app.config.get('JWT_AUTH_USERNAME_KEY'), None)
    password = data.get(current_app.config.get('JWT_AUTH_PASSWORD_KEY'), None)
    if not (username and password and len(data) == 2):
        return jsonify(invalid), 401

    user = jwt.authentication_callback(username, password)
    if not user:
        return jsonify(invalid), 401
    if not user.is_active:
        return jsonify({'message': 'InActive User'}), 401

    access_token = jwt.jwt_encode_callback(user)
    # The encoder may hand back bytes (older PyJWT) or text (PyJWT >= 2).
    if isinstance(access_token, bytes):
        access_token = access_token.decode('utf-8')
    return jsonify({'user': user.serialize(), 'access_token': access_token}), 200
def rundeck_list_groups():
    '''
    Return the list of groups from all available profiles.

    For every profile reported by ``AwsConfig.get_profiles`` an IAM client is
    created and ``list_groups`` is called. The JSON response maps
    ``"(<profile>) <GroupName>"`` to ``<GroupName>``. A profile whose call
    raises ``ClientError`` contributes no entries.
    '''
    resp_obj = {}
    awsconfig = aws_config.AwsConfig()
    for profile in awsconfig.get_profiles():
        iamclient = boto3.Session(profile_name=profile).client('iam')
        try:
            groups = iamclient.list_groups()['Groups']
        except botocore.exceptions.ClientError:
            # Previously this assigned into ``groupinfo``, which was unbound
            # on the first profile and stale (the prior profile's response)
            # on later ones.
            groups = []
        # NOTE(review): only the first page of results is read -- confirm
        # whether truncated listings need a paginator.
        for group in groups:
            grouptext = "(%s) %s" % (profile, group['GroupName'])
            resp_obj[grouptext] = group['GroupName']
    return jsonify(resp_obj)
def rundeck_list_iam_policies():
    '''
    Return the list of IAM policies from all available profiles.

    For every profile reported by ``AwsConfig.get_profiles`` an IAM client is
    created and ``list_policies`` is called. The JSON response maps
    ``"(<profile>) <PolicyName>"`` to ``<PolicyName>``. A profile whose call
    raises ``ClientError`` contributes no entries.
    '''
    resp_obj = {}
    awsconfig = aws_config.AwsConfig()
    for profile in awsconfig.get_profiles():
        iamclient = boto3.Session(profile_name=profile).client('iam')
        try:
            policies = iamclient.list_policies()['Policies']
        except botocore.exceptions.ClientError:
            # Previously this assigned into ``policyinfo``, which was unbound
            # on the first profile and stale (the prior profile's response)
            # on later ones.
            policies = []
        # NOTE(review): only the first page of results is read -- confirm
        # whether truncated listings need a paginator.
        for policy in policies:
            policytext = "(%s) %s" % (profile, policy['PolicyName'])
            resp_obj[policytext] = policy['PolicyName']
    return jsonify(resp_obj)
def detailed_id():
    """Return, as JSON, an HTML table cell with an item's ratings and comments.

    The ``ili_id`` query parameter selects the item. Its rating history and
    comment history are fetched, and every entry (a 3-tuple of value, user
    key and a third field -- presumably a timestamp) is rendered as
    ``"<userID> (<third field>): <value> <br>"``.

    Returns:
        ``jsonify(result=<html string>)``.
    """
    ili_id = request.args.get('ili_id', None)
    rate_hist = fetch_rate_id([ili_id])
    comm_hist = fetch_comment_id([ili_id])
    users = fetch_allusers()

    def render_history(entries):
        # NOTE(review): values are inserted into the markup unescaped --
        # confirm they are sanitised upstream.
        return "".join(
            '{} ({}): {} <br>'.format(users[user_key]['userID'], third, value)
            for value, user_key, third in entries
        )

    r_html = render_history(rate_hist[int(ili_id)])
    c_html = render_history(comm_hist[int(ili_id)])

    template = """
<td colspan="9">
<div style="width: 49%; float:left;">
<h6>Ratings</h6>
{}</div>
<div style="width: 49%; float:right;">
<h6>Comments</h6>
{}</div>
</td>"""
    return jsonify(result=template.format(r_html, c_html))
def min_omw_concepts(ss=None, ili_id=None):
    """Render ``min_omw_concept.html`` for a synset and return it as JSON.

    Args:
        ss: synset id, used when ``ili_id`` is falsy.
        ili_id: when truthy, the synset ids are looked up from it with
            ``f_ss_id_by_ili_id`` and ``ss`` is ignored.

    Returns:
        ``jsonify(result=<rendered template>)``.
    """
    ss_ids = f_ss_id_by_ili_id(ili_id) if ili_id else [ss]

    pos = fetch_pos()
    langs_id, _langs_code = fetch_langs()
    ss, senses, defs, exes, links = fetch_ss_basic(ss_ids)
    ssrels = fetch_ssrel()

    context = dict(
        pos=pos,
        langs=langs_id,
        senses=senses,
        ss=ss,
        links=links,
        ssrels=ssrels,
        defs=defs,
        exes=exes,
    )
    return jsonify(result=render_template('min_omw_concept.html', **context))
def network(interface=None):
    """Return the IPv4 address data of one or all network interfaces as JSON.

    Args:
        interface: name of a single interface to report; when falsy, every
            interface from ``netifaces.interfaces()`` is reported.

    Returns:
        JSON object keyed by interface name. Each value is that interface's
        ``AF_INET`` entry from ``netifaces.ifaddresses``, or a ``message``
        object when the interface has no such entry. If ``interface`` is
        given and ``netifaces`` rejects it with ``ValueError``, a
        ``(message, 404)`` pair is returned instead.
    """
    if interface:
        try:
            # Called only to validate the name; the result is not used here.
            netifaces.ifaddresses(interface)
        except ValueError:
            return jsonify({"message": "interface {} not available".format(interface)}), 404
        interfaces = [interface]
    else:
        interfaces = netifaces.interfaces()

    data = dict()
    for name in interfaces:
        try:
            # Named constant instead of the former bare index 2.
            data[name] = netifaces.ifaddresses(name)[netifaces.AF_INET]
        except KeyError:
            data[name] = {"message": "AF_INET data missing"}
    return jsonify(data)
def run_selected_case():
    """Run the test cases named in the JSON body and return an HTML report.

    ``case_name`` from the request body selects the cases, which are run
    with ``apiFunc.run_tests`` using ``THREAD_NUM`` from the app config. The
    outcome is written with ``writeReport`` and rendered through
    ``testResult.html``.

    Returns:
        ``jsonify({'html': <rendered template>})``.
    """
    data = request.get_json()
    start = handle.now_str()

    # Look the requested cases up by name through the project's client wrapper.
    db = MongoClient()
    case_list = db.get_case_by_name(data.get('case_name'))

    runner = apiFunc()
    rt = runner.run_tests(case_list, app.config['THREAD_NUM'])
    report = runner.writeReport(rt)

    # Two separate clock reads, as before: one for the end stamp, one for cost.
    end = handle.now_str()
    cost = "{:.2}?".format(handle.delta(start, handle.now_str()))

    html = render_template(
        'testResult.html',
        failNum=rt['failed_num'],
        ignored=rt['ignored'],
        successNum=rt['success_num'],
        total=rt['total'],
        start=start,
        end=end,
        cost=cost,
        fileName=report,
    )
    return jsonify(dict(html=html))
def taskstatus(task_id):
    """Report the progress of an ``analyzetweets`` task as JSON.

    Args:
        task_id: identifier of the task; converted to ``str`` for the lookup.

    Returns:
        JSON object with ``state``, ``current`` and ``total`` (always
        ``NUMBER_OF_TWEETS``). ``current`` is ``0`` while pending, ``1`` on
        failure, and otherwise ``task.info["current"]`` (default ``0``). In
        that last case ``subjectivityavg`` and ``sentimentavg`` are copied
        from ``task.info`` when present.
    """
    task = analyzetweets.AsyncResult(str(task_id))

    if task.state == "PENDING":
        response = dict(state=task.state, current=0, total=NUMBER_OF_TWEETS)
    elif task.state != "FAILURE":
        response = dict(
            state=task.state,
            current=task.info.get("current", 0),
            total=NUMBER_OF_TWEETS,
        )
        # Optional averages are forwarded only once the task has reported them.
        for key in ("subjectivityavg", "sentimentavg"):
            if key in task.info:
                response[key] = task.info[key]
    else:
        response = dict(state=task.state, current=1, total=NUMBER_OF_TWEETS)

    return jsonify(response)
def login():
    """Login POST handler.

    Only runs when ``/login`` is hit with a POST method. There is no GET
    method equivalent, as it is handled by the navigation template.

    Reads ``signin_username`` and ``signin_password`` from the form. On
    success a login id from ``fe.set_login_id`` is stored in
    ``session['logged_in']``.

    Returns:
        JSON ``{"login": "success"}``, or ``{"login": "failed"}`` with status
        code ``401`` when the credentials are rejected.
    """
    log.debug("Entering login, attempting to authenticate user.")
    username = request.form['signin_username']
    password = request.form['signin_password']
    log.debug("Username: {0}".format(username))

    if fe.check_auth(username, password):
        log.debug("User authenticated. Trying to set session.")
        session_id = fe.set_login_id()
        session['logged_in'] = session_id
        log.debug("Session ID: {0}, returning to user".format(session_id))
        return jsonify({"login": "success"})

    log.debug("Username or password not recognized, sending 401.")
    # The status travels with the returned body; the former assignment to a
    # free-standing ``response`` object is not something Flask provides.
    return jsonify({"login": "failed"}), 401
def star():
    """Starring/Highlighting handler.

    Attempts to toggle a star/highlight on a particular show. The show ID
    must be passed in the ``id`` query string. If the user is unauthenticated
    (no ``logged_in`` session entry, or one that ``fe.check_login_id``
    rejects), the function is aborted with a ``404`` to hide the page.

    Returns:
        JSON formatted output describing success and the ID of the show
        starred.
    """
    log.debug("Entering star, trying to toggle star.")
    # ``.get`` so a visitor without any session gets the 404 below rather
    # than an unhandled KeyError.
    login_id = session.get('logged_in')
    if login_id is not None and fe.check_login_id(escape(login_id)):
        show_id = request.args['id']
        log.debug("Sending show ID {0} to function".format(show_id))
        fe.star_show(show_id)
        log.debug("Returning to user.")
        return jsonify({"star": "success", "id": show_id})
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def scan_scrapers():
    """On demand scraper scanning handler.

    For some reason the scheduler doesn't always work; this endpoint allows
    for instant scanning, assuming it's not already occurring. The function
    is aborted with a ``404`` to hide the page if the user is not
    authenticated (no ``logged_in`` session entry, or one that
    ``fe.check_login_id`` rejects).

    Scanning can take a long time - 20 to 30 minutes - so it's recommended
    this endpoint be called asynchronously.

    Returns:
        JSON formatted output to identify that scanning has completed or is
        already ongoing.
    """
    log.debug("Entering scan_scrapers.")
    # ``.get`` so a visitor without any session gets the 404 below rather
    # than an unhandled KeyError.
    login_id = session.get('logged_in')
    if login_id is not None and fe.check_login_id(escape(login_id)):
        log.debug("User is logged in, attempting to begin scan.")
        if not fe.scrape_shows():
            log.debug("scrape_shows returned false, either the lockfile exists incorrectly or scraping is ongoing.")
            return jsonify({"scan": "failure", "reason": "A scan is ongoing"})
        log.debug("scrape_shows just returned. Returning success.")
        return jsonify({"scan": "success"})
    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def get_role_credentials(api_version, requested_role, junk=None):
    """Return assumed-role credentials for the calling address as JSON.

    Args:
        api_version: forwarded to ``roles.get_assumed_role_credentials``.
        requested_role: role name the caller asked for.
        junk: accepted and ignored.

    Returns:
        The credentials as JSON; ``('', 403)`` when the role lookup raises
        ``UnexpectedRoleError``; or ``('', <code>)`` when assuming the role
        raises ``GetRoleError``, with the code taken from the exception's
        ``args[0][0]``.
    """
    caller_ip = request.remote_addr
    try:
        role_params = roles.get_role_params_from_ip(
            caller_ip, requested_role=requested_role)
    except roles.UnexpectedRoleError:
        return '', 403

    try:
        credentials = roles.get_assumed_role_credentials(
            role_params=role_params, api_version=api_version)
    except roles.GetRoleError as err:
        status_code = err.args[0][0]
        return '', status_code

    return jsonify(credentials)
def get_instance_identity_document(api_version):
    """Return a mocked instance identity document as JSON.

    Every field is a fixed value except ``instanceId`` (built from the
    ``MOCKED_INSTANCE_ID`` config entry) and ``pendingTime`` (the current UTC
    time formatted as ``%Y-%m-%dT%H:%M:%SZ``).

    Args:
        api_version: accepted but not used in the document.
    """
    utc_now = datetime.datetime.now(dateutil.tz.tzutc())
    # This may be a terrible mock for pendingTime...
    pending_time = utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
    instance_id = 'i-{0}'.format(app.config['MOCKED_INSTANCE_ID'])

    document = dict(
        privateIp='127.255.0.1',
        devpayProductCodes=None,
        availabilityZone='us-east-1a',
        version='2010-08-31',
        accountId='1234',
        instanceId=instance_id,
        billingProducts=None,
        instanceType='t2.medium',
        pendingTime=pending_time,
        imageId='ami-1234',
        kernelId=None,
        ramdiskId=None,
        architecture='x86_64',
        region='us-east-1',
    )
    return jsonify(document)
def iam_sts_credentials(api_version, requested_role, junk=None):
    """Return assumed-role credentials as JSON, or pass the request through.

    Args:
        api_version: checked with ``_supports_iam``; when unsupported the
            request path is handed to ``passthrough``.
        requested_role: role name the caller asked for.
        junk: accepted and ignored.

    Returns:
        The ``passthrough`` result, the credentials as JSON, or ``('', 404)``
        when the role lookup raises ``UnexpectedRoleError``.
    """
    if not _supports_iam(api_version):
        return passthrough(request.path)

    try:
        role_params = roles.get_role_params_from_ip(
            request.remote_addr, requested_role=requested_role)
    except roles.UnexpectedRoleError:
        log.error(
            "Role name {0} doesn't match expected role for container".format(
                requested_role))
        return '', 404

    role_name = role_params['name']
    log.debug('Providing assumed role credentials for {0}'.format(role_name))
    credentials = roles.get_assumed_role_credentials(
        role_params=role_params, api_version=api_version)
    return jsonify(credentials)
def post(self, params=None):
    """Create and store a new combine session, returning it as JSON.

    Args:
        params: mapping with ``client_alias``, ``session_alias``,
            ``session_type``, optional ``session_id`` and a
            ``session_policies`` mapping holding ``shares``, ``quorum`` and
            ``protocol``.

    Returns:
        JSON with ``session`` (``to_api`` view for the new master user) and
        ``session_id``.
    """
    master = SharedSessionMaster.new(alias=params['client_alias'])

    policies = params['session_policies']
    secret = SharedSessionSecret.new(
        shares=policies['shares'],
        quorum=policies['quorum'],
        protocol=policies['protocol'],
    )

    new_session = CombineSession.new(
        session_id=params.get('session_id'),
        master=master,
        alias=params['session_alias'],
        session_type=params['session_type'],
        secret=secret,
    )
    stored = new_session.store()

    payload = {
        "session": stored.to_api(auth=master.uuid),
        "session_id": str(stored.uuid),
    }
    return flask.jsonify(payload)
def put(self, session_id, params=None):
    """Join (or re-enter) a combine session and optionally submit a share.

    Args:
        session_id: id handed to ``CombineSession.get``.
        params: mapping with ``client_alias`` and optional ``auth`` and
            ``share`` entries.

    Returns:
        JSON with ``session`` (``to_api`` view for the user) and
        ``session_id``.

    Raises:
        exceptions.ObjectNotFoundException: no session for ``session_id``.
        exceptions.ObjectDeniedException: no user could be resolved.
    """
    session = CombineSession.get(session_id)
    if not session:
        raise exceptions.ObjectNotFoundException

    # Known user first (when ``auth`` is supplied); otherwise join.
    # NOTE(review): a supplied-but-unrecognised ``auth`` also falls back to
    # joining -- confirm this is intended.
    user = None
    if params.get('auth'):
        user = session.get_user(params['auth'], alias=str(params['client_alias']))
    if not user:
        user = session.join(params['client_alias'])
    if not user:
        raise exceptions.ObjectDeniedException

    user.session = session
    if params.get('share'):
        session.secret.add_share(Share(params['share'], str(user.uuid)))
    # NOTE(review): the update is issued whether or not a share was
    # supplied -- confirm against the intended nesting.
    session.update()

    payload = {
        "session": session.to_api(auth=str(user.uuid)),
        "session_id": str(session.uuid),
    }
    return flask.jsonify(payload)
def get(self, session_id, params=None):
    """Return a split session as JSON, attaching the caller to a share if due.

    Args:
        session_id: id handed to ``SplitSession.get``.
        params: mapping with an ``auth`` entry identifying the caller.

    Returns:
        JSON with ``session`` (``to_api`` view for ``auth``) and
        ``session_id``.

    Raises:
        exceptions.ObjectNotFoundException: no session was found.
        exceptions.ObjectExpiredException: the session's ``ttl`` is falsy.
    """
    auth = params['auth']
    session = SplitSession.get(session_id, auth=auth)
    if not session:
        raise exceptions.ObjectNotFoundException
    if not session.ttl:
        raise exceptions.ObjectExpiredException

    # A shareholder of an already-split secret who has no share yet gets one.
    if session.current_user.is_shareholder and session.secret.splitted:
        if not session.secret.user_have_share(session.current_user):
            session.current_user.secret.attach_user_to_share(session.current_user)
            # NOTE(review): the update is issued only when a share was just
            # attached -- confirm against the intended nesting.
            session.update()

    payload = {
        "session": session.to_api(auth=auth),
        "session_id": str(session.uuid),
    }
    return flask.jsonify(payload)
def upload():
    """Store uploaded video files for a group and start processing them.

    The group comes from the ``groupid`` query parameter; when absent, one is
    generated from the current time plus a random 0-999 offset. Every file in
    the ``vidfiles`` upload list is saved, lower-cased, under
    ``./videos/<groupid>``, after which ``video_processing`` is started in a
    background thread with the group id as a string.

    Both the group id and the file names are client-controlled, so neither
    is allowed to carry path components.

    Returns:
        ``jsonify(groupid=..., time=<seconds spent saving>)``, or a
        ``(json error, 400)`` pair when the supplied ``groupid`` is not a
        plain directory name.
    """
    starttime = time.time()

    groupid = request.args.get('groupid')
    if groupid:
        # Reject anything that would resolve outside ./videos.
        normalised = groupid.replace('\\', '/')
        if normalised in ('.', '..') or os.path.basename(normalised) != groupid:
            return jsonify(error='invalid groupid'), 400
    else:
        groupid = (int(time.time()) * 1000) + random.randint(0, 999)

    savelocation = './videos/{0}'.format(groupid)
    if not os.path.exists(savelocation):
        os.makedirs(savelocation)

    for upload_file in request.files.getlist('vidfiles'):
        # Keep only the final path component of the submitted name.
        raw_name = (upload_file.filename or '').replace('\\', '/')
        safe_name = os.path.basename(raw_name).lower()
        if safe_name in ('', '.', '..'):
            continue
        upload_file.save(os.path.join(savelocation, safe_name))

    totaltime = time.time() - starttime

    worker = threading.Thread(target=video_processing, args=(str(groupid),))
    worker.start()
    return jsonify(groupid=groupid, time=totaltime)
def get(self, testcase_name):  # pylint: disable=no-self-use
    """GET the info of one testcase.

    Args:
        testcase_name: name handed to ``Testcase().show``.

    Returns:
        The ``api_utils.result_handler`` error result (status 1) when the
        lookup is falsy. Otherwise JSON holding ``testcase`` (the requested
        name), the test case's fields without ``name``, and ``dependency``
        converted to a dict.
    """
    testcase = Testcase().show(testcase_name)
    if not testcase:
        message = ("The test case '%s' does not exist or is not supported"
                   % testcase_name)
        return api_utils.result_handler(status=1, data=message)

    testcase_info = api_utils.change_obj_to_dict(testcase)
    dependency_dict = api_utils.change_obj_to_dict(
        testcase_info.get('dependency'))

    # ``name`` is re-exposed as ``testcase``; ``dependency`` is re-added below
    # in its converted form.
    for dropped in ('name', 'dependency'):
        testcase_info.pop(dropped)

    result = {'testcase': testcase_name}
    result.update(testcase_info)
    result['dependency'] = dependency_dict
    return jsonify(result)
def post(self):
    """Return one page (five items) of the current user's messages as JSON.

    The 1-based page number is read from the ``page`` form field. Missing,
    non-numeric, zero or negative values all select the first page;
    previously the default of ``0`` produced a negative slice offset and a
    non-numeric value raised ``ValueError``.

    Returns:
        JSON with ``status``; on success also ``data`` (messages newest
        first, by id), ``len`` (items on this page), ``now`` (the requested
        page number) and ``all`` (total number of pages). When
        ``check_login`` returns ``None`` or ``-1`` an error object with a
        ``msg`` is returned instead.
    """
    page_size = 5

    check_user = check_login()
    if check_user is None:
        return jsonify({'status': 'error', 'msg': 'no authority'})
    if check_user == -1:
        # NOTE(review): the text says "valid" on an error path -- it probably
        # means "invalid"; left unchanged because clients may match on it.
        return jsonify({'status': 'error', 'msg': 'user is valid'})

    user_id = session.get('user_id')
    inbox = db.session.query(Message).filter(Message.to_id == user_id)
    counts = inbox.count()

    # type=int falls back to the default when the field is not a number.
    page = request.form.get('page', 0, type=int)
    offset = max(page - 1, 0) * page_size
    msgs = inbox.order_by(Message.id.desc())[offset: offset + page_size]

    # Ceiling division: a partly filled last page still counts as a page.
    all_page = (counts + page_size - 1) // page_size
    return jsonify({'status': 'ok', 'data': [msg.to_json() for msg in msgs],
                    'len': len(msgs), 'now': page, 'all': all_page})
def reset_api_key(name):
    """
    Reset API-KEY for user.

    On POST the user called ``name`` is looked up (404 when missing),
    authorization for ``update`` is enforced, a fresh key from
    ``model.make_uuid`` is stored, the cached user summary is dropped and a
    success message is flashed.

    Returns the redirect to the account profile after a POST; for any other
    method, a JSON object carrying a freshly generated CSRF token.
    """
    if request.method != 'POST':
        return jsonify(dict(form=dict(csrf=generate_csrf())))

    user = user_repo.get_by_name(name)
    if not user:
        return abort(404)

    ensure_authorized_to('update', user)
    user.api_key = model.make_uuid()
    user_repo.update(user)
    cached_users.delete_user_summary(user.name)

    flash(gettext('New API-KEY generated'), 'success')
    return redirect_content_type(url_for('account.profile', name=name))
def run_cmd():
    """Execute general (non-host specific) command.

    The command name is read from the ``command`` query parameter and its
    arguments from every ``args`` query parameter. A missing command, or one
    listed in ``EXCLUDED_FUNCTIONS``, aborts with 400.

    Returns the plugin's result as JSON, with status 200 when its ``code`` is
    ``ReturnCodes.OK`` and 404 otherwise.
    """
    command = request.args.get('command')
    if not command or command in EXCLUDED_FUNCTIONS:
        abort(400)

    try:
        # Every 'args' value from the query string, cast to string.
        call_args = [str(value) for value in request.args.getlist('args')]
    except KeyError:
        call_args = []

    result = RSPET_API.call_plugin(command, call_args)
    succeeded = result['code'] == rspet_server.ReturnCodes.OK
    return make_response(jsonify(result), 200 if succeeded else 404)
def general_help():
    """Return general help.

    Takes the help table from ``RSPET_API.help()``, removes every entry named
    in ``EXCLUDED_FUNCTIONS`` (in place) and keeps the commands whose
    ``states`` contain the ``state`` query parameter.

    Returns JSON with a ``commands`` list built by ``make_public_help``.
    """
    hlp_sntx = RSPET_API.help()
    cur_state = request.args.get('state')

    # Remove excluded functions.
    for excluded in EXCLUDED_FUNCTIONS:
        hlp_sntx.pop(excluded, None)

    # Keep only the functions that are in scope for the current state.
    in_scope = [name for name in hlp_sntx
                if cur_state in hlp_sntx[name]['states']]

    commands = [make_public_help(name, hlp_sntx[name]) for name in in_scope]
    return jsonify({'commands': commands})
def index():
    """Serve the index endpoint, finishing a sign-in when a ``code`` is given.

    Behaviour, in order of precedence:

    * ``g.signin`` is truthy: return the plain string ``"logged_in"``.
    * a non-empty ``code`` query parameter is present: hand it to
      ``Get_Access_Token``, read ``access_token`` from the result (a missing
      key raises ``KeyError``), look the user up with ``Get_User_Info`` and
      return that data as JSON with a ``logged_in=true`` cookie attached.
    * otherwise: redirect to the ``login`` endpoint.
    """
    auth_code = request.args.get("code", "")

    if g.signin:
        return "logged_in"

    if not auth_code:
        return redirect(url_for("login"))

    # NOTE(review): presumably an OAuth-style code-for-token exchange -- confirm.
    token_payload = Get_Access_Token(auth_code)
    user_info = Get_User_Info(token_payload['access_token'])
    app.logger.debug(user_info)

    reply = jsonify(user_info)
    reply.set_cookie(key="logged_in", value='true', expires=None)
    return reply