def employee_location_list(request):
"""
Returns employee location full list
---
serializer: employees.serializers.EmployeeLocationListSerializer
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
if request.method == 'GET':
location_list = get_list_or_404(Location)
serializer = EmployeeLocationListSerializer(location_list, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
python类Response()的实例源码
def employee_role_list(request):
"""
Returns employee role full list
---
serializer: employees.serializers.EmployeeRoleListSerializer
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
if request.method == 'GET':
role_list = get_list_or_404(Role)
serializer = EmployeeRoleListSerializer(role_list, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def get(self, request, format=None):
"""
List all badges
---
serializer: administrator.serializers.BadgeSerializer
parameters:
- name: pagination
required: false
type: string
paramType: query
"""
badges = get_list_or_404(Badge)
if request.GET.get('pagination'):
pagination = request.GET.get('pagination')
if pagination == 'true':
paginator = AdministratorPagination()
results = paginator.paginate_queryset(badges, request)
serializer = BadgeSerializer(results, many=True)
return paginator.get_paginated_response(serializer.data)
else:
return Response(status=status.HTTP_400_BAD_REQUEST)
else:
serializer = BadgeSerializer(badges, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
def SwaggerRequestMethodMaker(model = None):
def model_handler(self, request, *args, **kwargs):
data = kwargs.get('data', None)
resp = model
return Response(resp, status = status.HTTP_200_OK)
def empty_handler(self, request, *args, **kwargs):
data = kwargs.get('data', None)
resp = None
return Response(resp, status = status.HTTP_200_OK)
if model:
return model_handler
else:
return empty_handler
# make named APIView class with specified methods
def get(self, request, **kwargs):
name = kwargs['name']
try:
project = models.Project.objects.get(name=name)
s = self.serializer_class(project)
return Response(s.data)
except:
pass
qs = models.Project.objects.filter(name__icontains=name)
if qs.count() > 0:
s = self.serializer_class(qs, many=True)
return Response(s.data)
return Response(status=404)
def post(self, request): # noqa
username = request.data.get('username')
password = request.data.get('password')
device_id = request.data.get('device_id') or ''
if not username or not password:
return Response(
{'error': 'Missing username or password'},
status=status.HTTP_400_BAD_REQUEST
)
user = authenticate(
username=username.lower(), password=password
)
if not user:
raise InvalidEmailOrPasswordAPIException()
auth_token, _ = AuthToken.objects.get_or_create(
user=user, device_id=device_id
)
return Response({'token': auth_token.key})
def time_entry_item(request, id):
try:
item = TimeEntry.objects.get(user=request.user, id=id)
except NoCurrentEntry:
return Response(status=status.HTTP_404_NOT_FOUND)
user = request.user
if request.method == 'GET':
serializer = TimeEntrySerializer(request.user, item)
return Response(serializer.data)
elif request.method == 'PUT':
serializer = TimeEntrySerializer(request.user, item, data=request.data)
if (serializer.is_valid()):
serializer.save()
return Response(serializer.data)
return Response(serializer.errors)
elif request.method == 'DELETE':
pass
def view_autocomplete(self, request, group, **kwargs):
field = request.GET.get('autocomplete_field')
query = request.GET.get('autocomplete_query')
if field != 'issue_id' or not query:
return Response({'issue_id': []})
repo = self.get_option('repo', group.project)
client = self.get_client(request.user)
try:
response = client.search_issues(
query=(u'repo:%s %s' % (repo, query)).encode('utf-8'),
)
except Exception as e:
return self.handle_api_error(e)
issues = [
{
'text': '(#%s) %s' % (i['number'], i['title']),
'id': i['number']
} for i in response.get('items', [])
]
return Response({field: issues})
def view_autocomplete(self, request, group, **kwargs):
field = request.GET.get('autocomplete_field')
query = request.GET.get('autocomplete_query')
if field != 'issue_id' or not query:
return Response({'issue_id': []})
query = query.encode('utf-8')
_url = '%s?%s' % (self.build_api_url(group, 'search'), urlencode({'query': query}))
try:
req = self.make_api_request(group.project, _url)
body = safe_urlread(req)
except (requests.RequestException, PluginError) as e:
return self.handle_api_error(e)
try:
json_resp = json.loads(body)
except ValueError as e:
return self.handle_api_error(e)
resp = json_resp.get('stories', {})
stories = resp.get('stories', [])
issues = [{'text': '(#%s) %s' % (i['id'], i['name']), 'id': i['id']} for i in stories]
return Response({field: issues})
def put(self, request, id, format=None):
"""Summary
Args:
request (TYPE): Description
id (TYPE): Description
format (None, optional): Description
Returns:
TYPE: Description
"""
process = self.get_object(id)
serializer = serializers.ProcessSerializer(process, data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def delete(self, request, id, format=None):
"""Summary
Args:
request (TYPE): Description
id (TYPE): Description
format (None, optional): Description
Returns:
TYPE: Description
"""
process = self.get_object(id)
process.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
# FBV
# class MyOpenAPIRenderer(OpenAPIRenderer):
# def get_customizations(self):
# data = super(MyOpenAPIRenderer, self).get_customizations()
# data['paths'] = custom_data['paths']
# data['info'] = custom_data['info']
# data['basePath'] = custom_data['basePath']
# return data
def upload_avatar(request):
"""
Sets new user's avatar.
"""
with transaction.atomic():
profile_user = get_object_or_404(TheUser, auth_token=request.data.get('user_token'))
try:
profile_user.user_photo.save('user_{}.png'.format(profile_user.id), request.data.get('file'))
profile_user.save()
logger.info("User '{}' changed his avatar.".format(profile_user))
resize_image(profile_user.user_photo.path, AVATAR_WIDTH)
logger.info("Image '{}' successfully resized!".format(profile_user.user_photo.path))
return Response({'status': 200,
'detail': 'successful',
'data': {'profile_image': profile_user.user_photo.url}})
except ValidationError:
logger.info("User '{}' tried to upload not an image as avatar!".format(profile_user))
return Response({'status': 404,
'detail': 'tried to upload not an image',
'data': {}})
def user_login(request):
"""
Authenticates user and returns the token which uses to access to the API.
"""
user = authenticate(username=request.data.get('username'),
password=request.data.get('password'))
if user:
user_token = TheUser.objects.get(id_user=user).auth_token
login(request, user)
logger.info("User '{}' logged in.".format(user.username))
return Response({'status': 200,
'detail': 'successful',
'data': {'token': user_token}})
return Response({'status': 404,
'detail': 'not authenticated',
'data': {'token': None}})
# ----------------------------------------------------------------------------------------------------------------------
def is_user_exists(request):
"""
Checks if user is exists. If exists return True, else False.
"""
try:
User.objects.get(username=request.data.get('username'))
return Response({'status': 200,
'detail': 'successful',
'data': {'user': True}})
except ObjectDoesNotExist:
return Response({'status': 200,
'detail': 'successful',
'data': {'user': False}})
# ----------------------------------------------------------------------------------------------------------------------
def is_mail_exists(request):
"""
Checks if mail is exists. If exists return True, else False.
"""
try:
User.objects.get(email=request.data.get('email'))
return Response({'status': 200,
'detail': 'successful',
'data': {'email': True}})
except ObjectDoesNotExist:
return Response({'status': 200,
'detail': 'successful',
'data': {'email': False}})
# ----------------------------------------------------------------------------------------------------------------------
def selected_category(request):
"""
Returns books from selected category.
"""
user = get_object_or_404(TheUser, auth_token=request.data.get('user_token'))
category = get_object_or_404(Category, id=request.data.get('category_id'))
books = Book.objects.filter(id_category=category).order_by('book_name')
filtered_books = Book.exclude_private_books(user.id_user, books)
paginator = Paginator(filtered_books, OUTPUT_BOOKS_PER_PAGE)
page = paginator.page(request.data.get('page'))
next_page = page.has_next()
page_books = page.object_list
return Response({'status': 200,
'detail': 'successful',
'data': {'books': [BookSerializer(book).data for book in page_books],
'next_page': page.next_page_number() if next_page else 0}})
# ----------------------------------------------------------------------------------------------------------------------
def find_book(request):
"""
Generates list with books of data which user entered. At first it check full equality in name,
after tries to check if contains some part of entered data.
"""
user = get_object_or_404(TheUser, auth_token=request.data.get('user_token'))
search_data = request.data.get('search_term')
filtered_books = Book.exclude_private_books(user.id_user, Book.fetch_books(search_data))
paginator = Paginator(filtered_books, OUTPUT_BOOKS_PER_PAGE)
page = paginator.page(request.data.get('page'))
next_page = page.has_next()
page_books = page.object_list
return Response({'status': 200,
'detail': 'successful',
'data': {'books': [BookSerializer(book).data for book in page_books],
'next_page': page.next_page_number() if next_page else 0}})
def add_book_to_home(request):
"""
Adds book to list of user's added books.
"""
user = get_object_or_404(TheUser, auth_token=request.data.get('user_token'))
book = get_object_or_404(Book, id=request.data.get('book_id'))
if book.private_book and book.who_added != user:
return Response({}, status=404)
if AddedBook.objects.filter(id_user=user, id_book=book).exists():
return Response({}, status=404)
AddedBook.objects.create(id_user=user, id_book=book)
logger.info("User '{}' added book with id: '{}' to his own library.".format(user.id_user.id, book.id))
return Response({'status': 200,
'detail': 'success',
'data': {}})
# ----------------------------------------------------------------------------------------------------------------------
def open_book(request):
"""
Returns the book of last readed page.
"""
user = get_object_or_404(TheUser, auth_token=request.data.get('user_token'))
book = get_object_or_404(Book, id=request.data.get('book_id'))
added_book = get_object_or_404(AddedBook, id_book=book, id_user=user)
added_book.last_read = added_book.last_read.now()
added_book.save()
logger.info("User '{}' opened book with id: '{}'.".format(user, book.id))
return Response({'status': 200,
'detail': 'successful',
'data': {'last_page': added_book.last_page}})
# ----------------------------------------------------------------------------------------------------------------------
def set_current_page(request):
"""
Changes current readed page for book of the user.
"""
user = get_object_or_404(TheUser, auth_token=request.data.get('user_token'))
book = get_object_or_404(Book, id=request.data.get('book_id'))
current_page = request.data.get('current_page')
if not isinstance(current_page, int):
return Response({'status': 400,
'detail': 'current page not a number',
'data': {}}, status=400)
added_book = AddedBook.objects.get(id_book=book, id_user=user)
added_book.last_page = current_page
added_book.save()
logger.info("User '{}' on book with id: '{}' changed page to: '{}'."
.format(user, book.id, current_page))
return Response({'status': 200,
'detail': 'successful',
'data': {}})
def save_support_message(request):
"""
Saves a support message if validated.
"""
message = request.data.get('text')
if len(message) > 5000:
return Response({'status': 400,
'detail': 'message length more than 5000 symbols',
'data': {}})
SupportMessage.objects.create(email=request.data.get('email'), text=message)
return Response({'status': 200,
'detail': 'successful',
'data': {}})
def category_detail(request, category_id):
"""
Category detail
---
serializer: categories.serializers.CategorySerializer
responseMessages:
- code: 400
message: Bad request.
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
category = get_object_or_404(Category, pk=category_id)
if request.method == 'GET':
serializer = CategorySerializer(category)
return Response(serializer.data, status=status.HTTP_200_OK)
def keyword_create(request):
"""
Creates a new keyword
---
serializer: categories.serializers.KeywordSerializer
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
if request.method == 'POST':
serializer = KeywordSerializer(data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def keyword_detail(request, keyword_id):
"""
Keyword detail
---
serializer: categories.serializers.KeywordSerializer
responseMessages:
- code: 400
message: Bad request.
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
keyword = get_object_or_404(Keyword, pk=keyword_id)
if request.method == 'GET':
serializer = KeywordSerializer(keyword)
return Response(serializer.data, status=status.HTTP_200_OK)
def employee_activate(request, employee_id, action):
"""
Activate employee account, action could be true or false
---
response_serializer: employees.serializers.EmployeeSerializer
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
if request.method == 'PATCH':
employee = get_object_or_404(Employee, pk=employee_id)
if action == 'true':
employee.is_active = True
elif action == 'false':
employee.is_active = False
else:
pass
employee.save()
serializer = EmployeeSerializer(employee)
return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
def employee_block(request, employee_id, action):
"""
Block employee account, action could be true or false
---
response_serializer: employees.serializers.EmployeeSerializer
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
if request.method == 'PATCH':
employee = get_object_or_404(Employee, pk=employee_id)
if action == 'true':
employee.is_blocked = True
elif action == 'false':
employee.is_blocked = False
else:
pass
employee.save()
serializer = EmployeeSerializer(employee)
return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
def employee_logout(request):
"""
Logout employee
---
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
- code: 500
message: Internal Server Error
"""
if request.method == 'GET':
employee = request.user
try:
devices = EmployeeDevice.objects.filter(username=employee)
for device in devices:
device.delete()
except:
pass
logout(request)
content = {'detail': config.USER_LOGOUT}
return Response(content, status=status.HTTP_202_ACCEPTED)
def employee_admin(request, employee_id, action):
"""
Set or unset admin permission to employee, action could be true or false
---
response_serializer: employees.serializers.EmployeeSerializer
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden.
- code: 404
message: Not found
"""
if request.method == 'PATCH':
employee = get_object_or_404(Employee, pk=employee_id)
if action == 'true':
employee.is_staff = True
elif action == 'false':
employee.is_staff = False
else:
pass
employee.save()
serializer = EmployeeSerializer(employee)
return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
def star(request, star_id):
"""
Returns star detail
---
serializer: stars.serializers.StarSerializer
responseMessages:
- code: 401
message: Unauthorized. Authentication credentials were not provided. Invalid token.
- code: 403
message: Forbidden, authentication credentials were not provided
- code: 404
message: Not found
"""
if request.method == 'GET':
star = get_object_or_404(Star, pk=star_id)
serializer = StarSerializer(star)
return Response(serializer.data, status=status.HTTP_200_OK)
def get(self, request, format=None):
"""
List all categories
---
parameters:
- name: pagination
required: false
type: string
paramType: query
"""
categories = get_list_or_404(Category)
if request.GET.get('pagination'):
pagination = request.GET.get('pagination')
if pagination == 'true':
paginator = AdministratorPagination()
results = paginator.paginate_queryset(categories, request)
serializer = CategorySerializer(results, many=True)
return paginator.get_paginated_response(serializer.data)
else:
return Response(status=status.HTTP_400_BAD_REQUEST)
else:
serializer = CategorySerializer(categories, many=True)
return Response(serializer.data, status=status.HTTP_400_BAD_REQUEST)