def get(self, request, *args, **kwargs):
    """Return the Swagger 1.2 resource listing for this API.

    The listing contains one ``{"path": ...}`` entry per value yielded by
    ``self.get_resources()`` (each prefixed with ``/``), the base path from
    ``self.get_base_path()``, and the ``api_version`` / ``info`` values read
    from ``rfs.SWAGGER_SETTINGS``. When ``info`` is not configured, a dict of
    empty strings is returned in its place.
    """
    resource_entries = []
    for resource_path in self.get_resources():
        resource_entries.append({"path": "/" + resource_path})

    swagger_settings = rfs.SWAGGER_SETTINGS
    # Fallback used only when the settings carry no "info" entry.
    blank_info = dict.fromkeys(
        (
            "contact",
            "description",
            "license",
            "licenseUrl",
            "termsOfServiceUrl",
            "title",
        ),
        "",
    )

    listing = {
        "apiVersion": swagger_settings.get("api_version", ""),
        "swaggerVersion": "1.2",
        "basePath": self.get_base_path(),
        "apis": resource_entries,
        "info": swagger_settings.get("info", blank_info),
    }
    return Response(listing)
python类Response()的实例源码
def delete(self, request, *args, **kwargs):
    """Delete the ``models.DB`` row whose id is ``kwargs['pk']``.

    Returns a 201 response when a row was found and deleted, and a 406
    response when no row has that id. The status codes are unchanged from the
    previous implementation so existing clients keep working.
    """
    # Fetch the row once. The previous exists() + get() pair issued two
    # queries and could raise DoesNotExist if the row vanished in between.
    db = models.DB.objects.filter(id=int(kwargs['pk'])).first()
    if db is None:
        return Response({'info': '???????'}, status=status.HTTP_406_NOT_ACCEPTABLE)
    db.delete()
    return Response({'info': '????'}, status=status.HTTP_201_CREATED)
def create(self, request, *args, **kwargs):
    """Create an object from ``request.data`` and attach it to a ``models.DB``.

    The parent ``DB`` is identified by ``kwargs['pk']``. Validation errors are
    raised by ``is_valid(raise_exception=True)``. On success the serialized
    object is returned with a 201 status and the success headers.
    """
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    # Resolve the parent before anything is written: if the pk does not match
    # a DB row, the lookup raises here and no orphan child row is left behind.
    db = models.DB.objects.get(id=int(kwargs['pk']))
    serializer.save()
    serializer.instance.db = db
    # Second save persists the parent link set on the instance above.
    serializer.save()
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def delete(self, request, *args, **kwargs):
    """Delete the ``dbuser`` entries of a ``models.DB`` matching user and ip.

    The ``DB`` is identified by ``kwargs['pk']``; ``user`` and ``ip`` are read
    from ``request.data``. Returns a 201 response when matching entries
    existed and were deleted, otherwise a 406 response.
    """
    # Build the queryset once; previously the parent DB row was fetched and
    # the same filter was constructed twice.
    matching_users = models.DB.objects.get(id=int(kwargs['pk'])).dbuser.filter(
        user=request.data['user'],
        ip=request.data['ip'],
    )
    if not matching_users.exists():
        return Response({'info': '?????????'}, status=status.HTTP_406_NOT_ACCEPTABLE)
    matching_users.delete()
    return Response({'info': '????'}, status=status.HTTP_201_CREATED)
def delete(self, request, *args, **kwargs):
    """Delete the ``models.ExtendUser`` row whose id is ``kwargs['pk']``.

    Returns a 201 response when a row was found and deleted, and a 406
    response when no row has that id. The status codes are unchanged from the
    previous implementation so existing clients keep working.
    """
    # Fetch the row once. The previous exists() + get() pair issued two
    # queries and could raise DoesNotExist if the row vanished in between.
    extend_user = models.ExtendUser.objects.filter(id=int(kwargs['pk'])).first()
    if extend_user is None:
        return Response({'info': '??????'}, status=status.HTTP_406_NOT_ACCEPTABLE)
    extend_user.delete()
    return Response({'info': '????'}, status=status.HTTP_201_CREATED)
def create(self, request, *args, **kwargs):
    """Create a script argument and attach it to a ``models.Script``.

    The script is identified by ``kwargs['pk']``. If the script already has an
    entry in ``scriptargs`` with the same ``args_name`` as ``request.data``, a
    406 response is returned and nothing is saved. Otherwise the serialized
    object is returned with a 201 status and the success headers.
    """
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)

    # Load the script once and reuse it for both the duplicate check and the
    # assignment; it was previously fetched twice.
    script = models.Script.objects.get(id=int(kwargs['pk']))
    if script.scriptargs.filter(args_name=request.data['args_name']).exists():
        return Response({'info': '???????????'}, status=status.HTTP_406_NOT_ACCEPTABLE)

    serializer.save()
    serializer.instance.script = script
    # Second save persists the script link set on the instance above.
    serializer.save()
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def create(self, request, *args, **kwargs):
    """Create a task and append it to a ``models.PlayBook``.

    The playbook is identified by ``kwargs['pk']``. The new object receives
    the playbook's current ``sort`` value, after which the playbook's ``sort``
    is incremented by one and saved. Returns the serialized object with a 201
    status and the success headers.
    """
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    # Resolve the playbook before anything is written: if the pk does not
    # match, the lookup raises here and no orphan row is left behind.
    playbook = models.PlayBook.objects.get(id=int(kwargs['pk']))
    serializer.save()
    serializer.instance.playbook = playbook
    serializer.instance.sort = playbook.sort
    # NOTE(review): this read-increment-save of playbook.sort is not atomic;
    # concurrent requests could be handed the same sort value.
    playbook.sort = playbook.sort + 1
    playbook.save()
    serializer.save()
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def delete(self, request, *args, **kwargs):
    """Delete the tasks of a ``models.PlayBook`` that have a given ``sort``.

    The playbook is identified by ``kwargs['pk']`` and the sort value is read
    from ``request.data['sort']``. Returns a 201 response when matching tasks
    existed and were deleted, otherwise a 406 response.
    """
    # Build the queryset once; previously the playbook was fetched and the
    # same filter was constructed twice.
    matching_tasks = models.PlayBook.objects.get(id=int(kwargs['pk'])).tasks.filter(
        sort=request.data['sort'],
    )
    if not matching_tasks.exists():
        return Response({'info': '??????????'}, status=status.HTTP_406_NOT_ACCEPTABLE)
    matching_tasks.delete()
    return Response({'info': '????'}, status=status.HTTP_201_CREATED)
def update(self, request, *args, **kwargs):
    """Swap the ``sort`` values of two tasks belonging to one playbook.

    The playbook is identified by ``kwargs['pk']``. The two tasks are looked
    up by the sort values given in ``request.data['parta']`` and
    ``request.data['partb']``. Both tasks are saved and a 201 response is
    returned.
    """
    playbook_tasks = models.PlayBook.objects.get(id=int(kwargs['pk'])).tasks
    first = playbook_tasks.get(sort=int(request.data['parta']))
    second = playbook_tasks.get(sort=int(request.data['partb']))

    # Exchange the two positions in one step.
    first.sort, second.sort = second.sort, first.sort

    first.save()
    second.save()
    return Response({'info': '????'}, status=status.HTTP_201_CREATED)
def delete(self, request, *args, **kwargs):
    """Delete the ``models.Group`` with id ``kwargs['pk']`` if it has no hosts.

    Returns a 406 response, leaving the group untouched, when
    ``group.hosts.count()`` is non-zero; otherwise deletes the group and
    returns a 201 response.
    """
    group = models.Group.objects.get(id=int(kwargs['pk']))
    host_total = group.hosts.count()
    if host_total == 0:
        group.delete()
        return Response({'detail': '????'}, status=status.HTTP_201_CREATED)
    # The group still has hosts attached, so the deletion is refused.
    return Response({'detail': '?????????????'}, status=status.HTTP_406_NOT_ACCEPTABLE)
def delete(self, request, *args, **kwargs):
    """Delete the ``models.Host`` with id ``kwargs['pk']`` if nothing uses it.

    Returns a 406 response, leaving the host untouched, when the host still
    has storages (``host.storages.count()`` is non-zero) or when
    ``host.application_get()`` returns a non-empty result. Otherwise the host
    is deleted and a 201 response is returned.
    """
    host = models.Host.objects.get(id=int(kwargs['pk']))
    if host.storages.count() != 0:
        return Response({'detail': '????????????'}, status=status.HTTP_406_NOT_ACCEPTABLE)
    if len(host.application_get()) != 0:
        return Response({'detail': '????????????'}, status=status.HTTP_406_NOT_ACCEPTABLE)
    # Delete the instance already loaded above instead of fetching the same
    # row a second time.
    host.delete()
    return Response({'detail': '????'}, status=status.HTTP_201_CREATED)
def login_user(request):
    """Authenticate a user from the request payload and return a token.

    ``request.data`` may be a mapping with ``username`` and ``password`` keys
    or a JSON-encoded string of such a mapping. The response always has HTTP
    status 200; the outcome is reported in the body:

    * ``code`` 100 with ``message``, ``token`` and serialized ``user`` when
      the credentials are valid (the user is also logged in via ``login``);
    * ``code`` 101 with ``message`` when the credentials are missing or
      rejected by ``authenticate``.

    Raises:
        ValueError: if ``request.data`` is a string that is not valid JSON.
    """
    payload = request.data
    # Only decode when the payload is still a string. Stringifying an
    # already-parsed mapping yields a Python repr, which is not valid JSON.
    if isinstance(payload, str):
        payload = json.loads(payload)
    if not hasattr(payload, 'get'):
        # Not a mapping (e.g. a list): treat as if no credentials were sent.
        payload = {}

    username = payload.get('username')
    password = payload.get('password')

    user = None
    if username is not None and password is not None:
        user = authenticate(username=username, password=password)

    if user is None:
        data = {
            'message': 'Invalid Details',
            'code': 101
        }
        return Response(status=200, data=data)

    token, _created = Token.objects.get_or_create(user=user)
    login(request, user)
    data = {
        'message': 'Valid User',
        'token': str(token),
        'user': UserSerializer(user).data,
        'code': 100,
    }
    return Response(status=200, data=data)
def get(self, request, path, *args, **kwargs):
    """Return the Swagger 1.2 API declaration for the resource at ``path``.

    The declaration combines the configured ``api_version`` from
    ``rfs.SWAGGER_SETTINGS``, ``self.api_full_uri`` with trailing slashes
    stripped as the base path, and the API and model descriptions produced by
    ``NotificationsDocs``.
    """
    declaration = dict(
        apiVersion=rfs.SWAGGER_SETTINGS.get("api_version", ""),
        swaggerVersion="1.2",
        basePath=self.api_full_uri.rstrip("/"),
        resourcePath="/" + path,
        apis=NotificationsDocs.generate_notifications_docs(path),
        models=NotificationsDocs.generate_models_docs(),
    )
    return Response(declaration)
# -- CODE TAKEN FROM DJANGO REST SWAGGER & MODIFIED
def post(self, request, *args, **kwargs):
    """Log a user in and return an auth token.

    ``request.data`` is validated with ``UserLoginSerializer``. On success the
    user is looked up by the validated ``username``, a token is created via
    ``self.__create_token`` and returned as ``{"Token": ..., "error_code": 0}``
    with status 200.
    """
    serializer = UserLoginSerializer(data=request.data)
    if not serializer.is_valid(raise_exception=True):
        # With raise_exception=True, invalid input normally raises before
        # reaching here, so this branch is purely defensive. It reports the
        # validation errors via `errors`; `error_messages` holds message
        # templates, not the errors for this request.
        return Response(serializer.errors, status=status.HTTP_401_UNAUTHORIZED)

    user = User.objects.get(username=serializer.data['username'])
    token = self.__create_token(user=user)
    ret_json = {
        "Token": token.key,
        "error_code": 0
    }
    return Response(ret_json, status=status.HTTP_200_OK)
def get(self, request):
    """Log the requesting user out by deleting their ``Token``.

    Returns ``{'error_code': 0}`` with status 200. The call is idempotent: if
    the user has no token, nothing is deleted and the same response is
    returned instead of failing with ``DoesNotExist``.
    """
    token = Token.objects.filter(user=request.user).first()
    if token is not None:
        token.delete()
    ret_json = {
        'error_code': 0
    }
    return Response(ret_json, status=status.HTTP_200_OK)
def post(self, request):
    """Mark all of the requesting user's notifications as read.

    Calls ``mark_all_as_read()`` on ``request.user.notifications`` and
    returns an empty 200 response.
    """
    user_notifications = request.user.notifications
    user_notifications.mark_all_as_read()
    return Response(status=status.HTTP_200_OK)
def post(self, request, pk):
    """Mark one of the requesting user's notifications as read.

    The notification is looked up by ``pk`` within
    ``request.user.notifications``. Returns an empty 200 response on success
    and an empty 400 response if the lookup or the update raises.
    """
    import logging

    try:
        request.user.notifications.get(pk=pk).mark_as_read()
    except Exception:
        # Deliberately best-effort: any failure is reported to the client as
        # a 400. Log with traceback through logging rather than print so the
        # error reaches the server's configured log handlers.
        logging.getLogger(__name__).exception(
            "Could not mark notification %r as read", pk
        )
        return Response(status=status.HTTP_400_BAD_REQUEST)
    else:
        return Response(status=status.HTTP_200_OK)
def get(self, request):
    """Return the items matching the ``name`` query parameter.

    ``name`` is read from ``self.request.GET`` and is ``None`` when the
    parameter is absent. The result of ``self.get_matching_items(name)`` is
    wrapped in a ``Response``.
    """
    query_params = self.request.GET
    wanted_name = query_params.get('name', None)
    matching_items = self.get_matching_items(wanted_name)
    return Response(matching_items)