def class_view(request):
ctx = request.context
user_id = authenticated_userid(request) or Everyone
check = check_permissions(ctx, user_id, CrudPermissions.READ)
view = request.GET.get('view', None) or ctx.get_default_view() or 'id_only'
tombstones = asbool(request.GET.get('tombstones', False))
q = ctx.create_query(view == 'id_only', tombstones)
if check == IF_OWNED:
if user_id == Everyone:
raise HTTPUnauthorized()
q = ctx.get_target_class().restrict_to_owners(q, user_id)
if view == 'id_only':
return [ctx._class.uri_generic(x) for (x,) in q.all()]
else:
permissions = ctx.get_permissions()
r = [i.generic_json(view, user_id, permissions) for i in q.all()]
return [x for x in r if x is not None]
# @view_config(context=InstanceContext, renderer='json', name="jsonld",
# request_method='GET', permission=P_READ,
# accept="application/ld+json;q=0.9")
# @view_config(context=InstanceContext, renderer='json',
# request_method='GET', permission=P_READ,
# accept="application/ld+json;q=0.9")
python类view_config()的实例源码
def get_user_reviews(request):
"""RESTful version of getting all reviews of a task
"""
logger.debug('get_user_reviews is running')
reviewer_id = request.matchdict.get('id', -1)
# also try to get reviews with specified status
review_status = request.params.get('status', None)
if review_status:
where_conditions = \
"""where "Reviews".reviewer_id = %(reviewer_id)s and
"Reviews_Statuses".code = '%(status)s' """ % {
'reviewer_id': reviewer_id,
'status': review_status
}
else:
where_conditions = """where "Reviews".reviewer_id = %(reviewer_id)s""" % {
'reviewer_id': reviewer_id
}
return get_reviews(request, where_conditions)
# @view_config(
# route_name='get_user_reviews_count',
# renderer='json'
# )
def update_email_settings(request):
session = DBSession()
user_id = authenticated_userid(request)
if not user_id:
return HTTPFound('/login')
new_email = request.params.get('email')
contactable = True if request.params.get('emailContact') == "on" else False
session.query(User).filter(User.id == user_id).\
update({User.contactable: contactable, User.email: new_email})
params = {"message": "Congratulations! Email settings successfully updated",
"message_type": "success"}
return HTTPFound(location=request.route_url('account_settings', _query=params))
# @view_config(route_name='home', renderer='../templates/login.mako')
# def home(request):
# try:
# user = get_user(request)
# headers = remember(request, user.id)
# return HTTPFound("/team", headers=headers)
# except:
# return HTTPFound("/login")
# # return common_context(
# # request.registry.settings['SOCIAL_AUTH_AUTHENTICATION_BACKENDS'],
# # load_strategy(request),
# # user=get_user(request),
# # plus_id=request.registry.settings.get(
# # 'SOCIAL_AUTH_GOOGLE_PLUS_KEY'
# # ),
# # )
def done(request):
user = get_user(request)
headers = remember(request, user.id)
return HTTPFound('/team', headers=headers)
# return {"user": get_user(request),
# "plus_id": request.registry.settings.get(
# 'SOCIAL_AUTH_STEAM_KEY'
# ),
# }
# return common_context(
# request.registry.settings['SOCIAL_AUTH_AUTHENTICATION_BACKENDS'],
# load_strategy(request),
# user=get_user(request),
# plus_id=request.registry.settings['SOCIAL_AUTH_GOOGLE_PLUS_KEY'],
# )
# @view_config(route_name='email_required', renderer='common:templates/home.jinja2')
# def email_required(request):
# strategy = load_strategy(request)
# partial_token = request.GET.get('partial_token')
# partial = strategy.partial_load(partial_token)
# return common_context(
# request.registry.settings['SOCIAL_AUTH_AUTHENTICATION_BACKENDS'],
# strategy,
# user=get_user(request),
# plus_id=request.registry.settings['SOCIAL_AUTH_GOOGLE_PLUS_KEY'],
# email_required=True,
# partial_backend_name=partial.backend,
# partial_token=partial_token
# )
def add_localizer(event):
request = event.request
localizer = get_localizer(request)
if '_LOCALE_' in request.GET:
language = request.GET['_LOCALE_']
print('lang is ', language)
response = request.response
response.set_cookie('_LOCALE_',
value=language,
max_age=31536000)
def auto_translate(string, mapping=None, domain=None):
return localizer.translate(tsf(string),mapping=mapping, domain=domain)
request.localizer = localizer
request.translate = auto_translate
# @view_config(route_name='locale')
# def set_locale_cookie(request):
# if 'language' in request.GET:
# language = request.GET['language']
# response = Response()
# response.set_cookie('_LOCALE_',
# value=language,
# max_age=31536000) # max_age = year
# return HTTPFound(location='/',
# headers=response.headers)
# class MyRequest(Request):
# pass
def view_config(*args, **kwargs):
"""Override pyramid's view_config to log API requests and responses."""
return pyramid_view_config(*args, decorator=logging_view_decorator,
**kwargs)
def get_task_last_reviews(request):
"""RESTful version of getting all reviews of a task
"""
logger.debug('get_task_last_reviews is running')
task_id = request.matchdict.get('id', -1)
task = Task.query.filter(Task.id == task_id).first()
if not task:
transaction.abort()
return Response('There is no task with id: %s' % task_id, 500)
where_condition1 = """where "Review_Tasks".id = %(task_id)s""" % {
'task_id': task_id
}
where_condition2 = ''
logger.debug("task.status.code : %s" % task.status.code)
if task.status.code == 'PREV':
where_condition2 = """ and "Review_Tasks".review_number +1 = "Reviews".review_number"""
where_conditions = '%s %s' % (where_condition1, where_condition2)
reviews = get_reviews(request, where_conditions)
else:
# where_condition2 =""" and "Review_Tasks".review_number = "Reviews".review_number"""
reviews = [
{
'review_number': task.review_number,
'review_id': 0,
'review_status_code': 'WTNG',
'review_status_name': 'Waiting',
'review_status_color': 'wip',
'task_id': task.id,
'task_review_number': task.review_number,
'reviewer_id': responsible.id,
'reviewer_name': responsible.name,
'reviewer_thumbnail_full_path':
responsible.thumbnail.full_path
if responsible.thumbnail else None,
'reviewer_department': responsible.departments[0].name
}
for responsible in task.responsible
]
return reviews
# @view_config(
# route_name='get_user_reviews',
# renderer='json'
# )
def api_textbook(request):
method, params, url_param = get_params(request)
if method == 'OPTIONS':
return preflight_handler(request)
# Get a deck from database
if method == "POST":
params = json.loads(params)
deckid = int(params['deckid'])
userid = url_param['userid']
uuid_list = params['uuids']
with transaction.manager:
for uuid in uuid_list:
importCardsFromCnxDb(uuid, deckid, CNXDB_HOST)
target_deck = DBSession.query(Deck).filter(
Deck.id == deckid and Deck.user_id == userid).first()
deck = card2dict(target_deck.cards)
deck['title'] = str(target_deck.title)
deck['color'] = str(target_deck.color)
deck['id'] = deckid
response = update_header(body=json.dumps(deck))
return response
# @view_config(route_name='api_textbook', renderer='json')
# def api_textbook(request):
# method, params, url_param = get_params(request)
# if method == 'OPTIONS':
# return preflight_handler(request)
#
# # Get a deck from database
# if method == "GET":
# params = json.loads(params)
# deckid = int(params['deckid'])
# uuid_list = params['uuids']
# for uuid in uuid_list:
# importCardsFromCnxDb(uuid, deckid, cnxdbHost)
#
# with transaction.manager:
# target_deck = DBSession.query(Deck).filter(Deck.id == deckid).first()
# deck = card2dict(target_deck.cards)
# deck['title'] = str(target_deck.title)
# deck['color'] = str(target_deck.color)
# deck['id'] = deck_id
#
# response = update_header(body=json.dumps(deck))
# return response
# @view_config(route_name='add_user', renderer = 'json')
# def add_user(request):
# req_post = request.POST
# user_name = str(req_post['user_name'])
# with transaction.manager:
# model = User(user_name=user_name)
# DBSession.add(model)
# return {'add user': 'success'}