def download_log(request):
    """Zip one log file and return the archive as a download.

    Query parameters:
        log_path: path of the file to put into the archive.
        log_name: base name of the archive; ``.zip`` is appended.

    The archive is written to ``dao_config.log_dir_master + 'tmp/'`` and
    ``readFile(zip_dir)`` is handed to ``StreamingHttpResponse``.

    Returns:
        The attachment response on success, a 405 response for non-GET
        requests, a 400 response when a parameter is missing, and the
        failure page when the source file or the archive does not exist.
    """
    if request.method != 'GET':
        # A view has to return a response; answer other methods explicitly.
        return HttpResponse(status=405)

    log_path = request.GET.get('log_path')
    log_name = request.GET.get('log_name')
    print('log_path:', log_path, 'log_name:', log_name)
    if not log_path or not log_name:
        return HttpResponse('log_path and log_name are required', status=400)

    # Keep only the last path component so that a name such as "../../x"
    # cannot place the archive outside the tmp directory.
    zip_file_name = os.path.basename(log_name) + '.zip'
    zip_dir = dao_config.log_dir_master + 'tmp/' + zip_file_name

    # NOTE(review): log_path comes straight from the query string and is
    # archived as-is; consider restricting it to the known log directories.
    if not os.path.isfile(log_path):
        return HttpResponse('??????')

    # The context manager closes the archive even when write() fails.
    with zipfile.ZipFile(zip_dir, 'w', zipfile.ZIP_DEFLATED) as archive:
        archive.write(log_path)
    print(zip_dir)

    if not os.path.isfile(zip_dir):
        return HttpResponse('??????')

    response = StreamingHttpResponse(readFile(zip_dir))
    response['Content-Type'] = 'application/octet-stream'
    response['Content-Disposition'] = 'attachment;filename="{0}"'.format(zip_file_name)
    return response
# Python examples of the HttpResponse() class
def checkcode(request):
    """Create a captcha, store its text in the session and return the image.

    ``CheckCode.gene_code()`` is unpacked into two values: the first is saved
    under ``request.session["checkcode"]``, the second becomes the response
    body (presumably the encoded image data -- confirm against CheckCode).
    """
    captcha_text, image_payload = CheckCode.gene_code()
    # Remember the expected answer so it can be compared with user input later.
    request.session["checkcode"] = captcha_text
    response = HttpResponse(image_payload)
    return response
def login(request):
    """Check the captcha submitted with the login form.

    On POST, the ``checkcode`` form field (stripped, case-insensitive) is
    compared with the text stored in ``request.session["checkcode"]`` and one
    of two plain responses is returned. Any other method renders
    ``login.html`` with empty values.

    The ``username`` and ``password`` fields are not used by this view.
    """
    if request.method == 'POST':
        # A missing form field or a missing/empty session entry counts as a
        # failed captcha instead of raising AttributeError / KeyError.
        submitted = request.POST.get('checkcode') or ''
        expected = request.session.get('checkcode')
        if not expected or submitted.strip().lower() != expected.lower():
            return HttpResponse('??????')
        return HttpResponse('?????')
    return render(request, 'login.html', {'error': "", 'username': '', 'password': ''})
def Subscribe(request, token):
    '''
    Return the base64-encoded list of SSR links for the owner of ``token``.

    ``token`` has the form ``b64(username) + '&&' + b64(password)``, where
    ``password`` is the value stored on the user object. The response body is
    ``ERROR`` when the token cannot be decoded, the user (or its ``ss_user``)
    cannot be loaded, or the token does not match the expected value.
    '''
    def encode(text):
        # str -> base64 text, as used for both the token and the payload
        return base64.b64encode(text.encode('utf-8')).decode('ascii')

    try:
        # b64decode() yields bytes; decode so the lookup receives text.
        username = base64.b64decode(token.split('&&')[0]).decode('utf-8')
    except (TypeError, ValueError):
        # malformed base64 or non UTF-8 content
        return HttpResponse('ERROR')

    try:
        user = User.objects.get(username=username)
        ss_user = user.ss_user
    except Exception:
        # unknown user or user without an ss_user
        return HttpResponse('ERROR')

    # Rebuild the token from stored data and compare it with the supplied one.
    expected = encode(user.username) + '&&' + encode(user.password)
    if token != expected:
        return HttpResponse('ERROR')

    # Only nodes at or below the user's level with the matching show flag.
    node_list = Node.objects.filter(level__lte=user.level, show='??')
    sub_code = ''.join(node.get_ssr_link(ss_user) + "\n" for node in node_list)
    return HttpResponse(encode(sub_code))
def nodeData(request):
    '''
    Return node names and their total traffic as JSON.

    The body is ``{"nodeName": [...], "nodeTraffic": [...]}``; entry *i* of
    both lists refers to the same node.
    '''
    # Fetch the nodes once so both lists come from the same rows in the same
    # order (two separate queries could disagree).
    nodes = list(Node.objects.all())
    data = {
        'nodeName': [node.name for node in nodes],
        'nodeTraffic': [TrafficLog.totalTraffic(node.node_id) for node in nodes],
    }
    result = json.dumps(data, ensure_ascii=False)
    return HttpResponse(result, content_type='application/json')
def change_ss_port(request):
    '''
    Assign a new port to the requesting user's ss_user and report it as JSON.

    The port comes from ``SSUser.randomPord()`` and is saved on
    ``request.user.ss_user``.
    '''
    ss_user = request.user.ss_user
    new_port = SSUser.randomPord()
    ss_user.port = new_port
    ss_user.save()
    # JSON payload for the AJAX caller
    payload = json.dumps(
        {
            'title': '?????',
            'subtitle': '??????{}?'.format(new_port),
            'status': 'success',
        },
        ensure_ascii=False,
    )
    return HttpResponse(payload, content_type='application/json')
def get(self, request, *args, **kwargs):
    """Serve table data as JSON when ``?format=json``; otherwise defer to the parent view.

    With ``format=json`` the queryset is set up, an optional ``pid`` keyword
    is copied into ``self.static_context_extra`` and either the filter info
    (``cmd`` contains ``filterinfo``) or the table data is returned.
    """
    if request.GET.get('format', None) != 'json':
        return super(ToasterTable, self).get(request, *args, **kwargs)

    self.setup_queryset(*args, **kwargs)

    # Expose the project id to the static_data_template context.
    if 'pid' in kwargs:
        self.static_context_extra['pid'] = kwargs['pid']

    cmd = request.GET.get('cmd', None)
    # Without a cmd the caller gets the table data.
    wants_filter_info = bool(cmd) and 'filterinfo' in cmd
    producer = self.get_filter_info if wants_filter_info else self.get_data
    return HttpResponse(producer(request, **kwargs), content_type="application/json")
def edit_comment(request, id=None):
    """Update the content of a comment owned by the requesting user.

    POST: loads the comment (404 if absent), answers 403 when it belongs to
    another user, otherwise stores the ``content`` form field and redirects
    to the absolute URL of the commented object.

    Other methods: redirect back to the referring page.
    """
    if request.method != 'POST':
        # The Referer header is optional; fall back to the site root so the
        # redirect always has a usable target.
        return HttpResponseRedirect(request.META.get('HTTP_REFERER') or '/')

    comment = get_object_or_404(Comment, id=id)
    if comment.user != request.user:
        return HttpResponse("You do not have permission to view this.", status=403)

    # Resolve the redirect target before saving, so a failure here leaves the
    # comment untouched.
    parent_url = comment.content_object.get_absolute_url()
    comment.content = request.POST.get('content')
    comment.save()
    return HttpResponseRedirect(parent_url)
def delete_comment(request, id=None):
    """Delete a comment owned by the requesting user.

    Raises ``Http404`` when no comment has the given id, answers 403 when the
    comment belongs to another user, and otherwise deletes it and redirects
    to the absolute URL of the commented object.
    """
    # NOTE(review): the deletion runs for every HTTP method; consider
    # requiring POST so that following a link cannot delete a comment.
    try:
        comment = Comment.objects.get(id=id)
    except (Comment.DoesNotExist, ValueError, TypeError):
        # Missing row or an id that cannot be used for the lookup. Other
        # errors (e.g. database failures) are no longer reported as 404.
        raise Http404

    if comment.user != request.user:
        return HttpResponse("You do not have permission to view this.", status=403)

    parent_url = comment.content_object.get_absolute_url()
    comment.delete()
    return HttpResponseRedirect(parent_url)
def update_log(request):
    """Trigger a docker log update and render the outcome.

    GET with ``hostname`` and ``container_name`` updates that container's
    log; GET with ``all_log`` updates all logs. In both cases the value
    returned by ``docker_update_log`` is used as the template context for
    ``return_index.html`` (a fixed error context is used when the all-log
    call returns a falsy value). Every other request gets a plain response.
    """
    if request.method != 'GET':
        return HttpResponse('???~')

    params = request.GET
    all_log = params.get('all_log')
    hostname = params.get('hostname')
    container_name = params.get('container_name')

    if hostname and container_name:
        context = docker_initial().docker_update_log(
            hostname=hostname, container_name=container_name)
        return render(request, 'return_index.html', context)

    if not all_log:
        return HttpResponse('???~')

    context = docker_initial().docker_update_log(all_log=all_log)
    if not context:
        context = {'return_results': '???????????!', 'log_name': None}
    return render(request, 'return_index.html', context)
def index(request):
    """Render the movie list, optionally filtered by title search or type.

    GET parameters:
        q: text searched for inside movie titles (takes precedence).
        type: movie type; must be one of ``TYPES``, otherwise a plain
            error response is returned.

    Without parameters all movies are listed. Non-GET requests render the
    page with an empty list.
    """
    import re  # local import keeps this block self-contained

    context = {'movie_list': [], 'types': TYPES}
    if request.method == 'GET':
        movie_type = request.GET.get('type')
        q = request.GET.get('q')
        if q:
            # Match the search text literally. Passing it through unescaped
            # would let a visitor submit arbitrary (possibly very expensive)
            # regular expressions to the database.
            pattern = re.escape(q)
            context['movie_list'] = conn['spider']['movie'].find({'title': {'$regex': pattern}})
            context['current_cat'] = '????"{}"'.format(q)
        elif movie_type:
            if movie_type.encode('utf-8') not in context['types']:
                return HttpResponse('??????')
            context['movie_list'] = conn['spider']['movie'].find({'types': movie_type})
            context['current_cat'] = movie_type
        else:
            context['movie_list'] = conn['spider']['movie'].find()
            context['current_cat'] = '????'
    return render(request, 'watchfilm/index.html', context)
def log_in(request):
    """Log a user in by username or e-mail address.

    GET renders the login page. POST authenticates with the ``username`` and
    ``password`` form fields (both stripped); when that fails the identifier
    is tried as an e-mail address. Responds with a redirect to
    ``/watchfilm/`` on success, ``not active`` for inactive users and
    ``not valid`` otherwise. Other methods get a 405 response.
    """
    if request.method == 'GET':
        return render(request, 'watchfilm/login.html')
    if request.method != 'POST':
        # A view has to return a response; answer other methods explicitly.
        return HttpResponse(status=405)

    # Missing fields are treated as empty instead of raising AttributeError.
    username = (request.POST.get('username') or '').strip()
    password = (request.POST.get('password') or '').strip()

    user = authenticate(username=username, password=password)
    if not user and username:
        # Fall back to treating the identifier as an e-mail address.
        match = get_user_model().objects.filter(email__iexact=username).first()
        if match is not None:
            user = authenticate(username=match.username, password=password)

    if not user:
        return HttpResponse('not valid')
    if not user.is_active:
        return HttpResponse('not active')
    login(request, user)
    return redirect('/watchfilm/')
def asset_cpu(request):
    """Store CPU information posted by a host, keyed by the caller's IP.

    On POST the asset whose ``ip`` equals ``REMOTE_ADDR`` is looked up; its
    CPU row is updated from the ``cpu_model``, ``cpu_count`` and
    ``cpu_core_count`` form fields, or created when none exists.

    Returns ``no asset`` when the asset lookup fails,
    ``cpu asset already update`` after an update and ``ok`` otherwise
    (including for non-POST requests).
    """
    if request.method != "POST":
        return HttpResponse("ok")

    ip = request.META['REMOTE_ADDR']
    cpu_info = request.POST
    try:
        asset = models.Asset.objects.get(ip=ip)
    except (ObjectDoesNotExist, models.Asset.MultipleObjectsReturned):
        return HttpResponse("no asset")

    fields = {
        'cpu_model': cpu_info.get('cpu_model'),
        'cpu_count': cpu_info.get('cpu_count'),
        'cpu_core_count': cpu_info.get('cpu_core_count'),
    }
    # update() reports how many rows it matched, which removes the need for
    # a separate existence query and also copes with duplicate CPU rows.
    updated = models.CPU.objects.filter(asset=asset).update(**fields)
    if updated:
        return HttpResponse("cpu asset already update")

    models.CPU.objects.create(asset=asset, **fields)
    return HttpResponse("ok")
def check_connect(request):
    """Check SSH credentials posted as JSON and remember working ones in Redis.

    The request body must be a JSON object with ``ip``, ``port``, ``uname``
    and ``password``. The response is ``{"result": "success"}`` when an SSH
    connection can be established and ``{"result": "failed"}`` otherwise
    (including for a malformed body). Non-POST requests raise ``Http404``.
    """
    def answer(result):
        return HttpResponse(json.dumps({"result": result}), content_type="application/json")

    if request.method != 'POST':
        # Http404 is an exception: it has to be raised, not returned.
        raise Http404

    try:
        data = json.loads(request.body)
        ip = data.get("ip")
        port = int(data.get("port"))
        uname = data.get("uname")
        password = data.get("password")
    except (ValueError, TypeError, AttributeError):
        # invalid JSON, non-object body, or missing / non-numeric port
        return answer("failed")

    ssh = paramiko.SSHClient()
    ssh.load_system_host_keys()
    try:
        ssh.connect(ip, port, uname, password)
    except (paramiko.SSHException, EnvironmentError):
        # authentication, host-key and network failures all mean "failed"
        connected = False
    else:
        connected = True
    finally:
        # The connection is only a probe; always release it.
        ssh.close()

    if not connected:
        return answer("failed")

    r = redis.Redis(host='localhost', port=6379, db=0)
    if not r.get(ip):
        # NOTE(review): this stores the password in clear text and passes a
        # list as the value, which newer redis clients may reject -- confirm
        # the expected storage format with the code that reads this key.
        r.set(ip, [port, uname, password])
    return answer("success")
def get(self, request, orgid):
    '''
    Render the upload page.

    :param request: the incoming request
    :param orgid: organisation id from the URL (not used here)
    :return: the rendered ``upload/upload.html`` template
    '''
    # NOTE(review): the statements that used to follow this return (a JSON
    # message pointing at '/static/filetamplate/asset_template.xlsx', guarded
    # by self.verification(request)) were unreachable and have been dropped.
    # Restore them in place of this return if the JSON API is wanted.
    return render(request, 'upload/upload.html')
def post(self, request, orgid):
    """Store the uploaded spreadsheet, parse it and return its data as JSON.

    Failures (nothing written, or nothing read) are reported as a
    ``{"status": 1, "data": "", "msg": ...}`` object. On success the parsed
    data, extended with ``orgid``, is returned.
    """
    def failure(text):
        # Error envelope shared by both failure paths.
        return HttpResponse(json.dumps({"status": 1, "data": '', "msg": text}))

    saved_path = self.wirte_excel(request)
    if not saved_path:
        return failure("?????????????")

    parsed = self.read_excel(saved_path)
    if not parsed:
        return failure("The file has no content")

    parsed['orgid'] = orgid
    # NOTE(review): success returns the bare data, not the status envelope
    # used for failures -- confirm which shape the client expects.
    return HttpResponse(json.dumps(parsed))
def get(self, request, orgid):
    """Count the organisation's assets grouped by the requested field.

    The ``condition`` query parameter names the ``Asset`` field to group by.
    The response is a JSON list of ``{"name": <value>, "sum_nub": <count>}``
    objects (empty when nothing matches). A missing ``condition`` yields a
    400 response carrying a ``{"status", "data", "msg"}`` object.
    """
    condition = request.GET.get('condition')
    if not condition:
        # Without a field name the query below cannot be built.
        msg = {"status": 0, "data": '', "msg": 'Missing "condition" parameter'}
        return HttpResponse(json.dumps(msg), status=400)

    # NOTE(review): condition is a caller-supplied field name; consider
    # checking it against a whitelist of groupable fields.
    grouped = Asset.objects.filter(orgid=orgid).values(condition).annotate(
        c_sum=Count(condition))
    data = [{'name': item[condition], 'sum_nub': item['c_sum']} for item in grouped]
    return HttpResponse(json.dumps(data))
def dispatch(self, request, *args, **kwargs):
    """Route the request to the handler named after its HTTP method and wrap the result in JSON.

    The response is always HTTP 200 with a body of the form
    ``{"msg": ..., "status": ..., "data": ...}``:

    * method not in ``self.ALLOWED_METHOD``: the defaults (status ``"-1"``);
    * handler returned a list or dict: stored in ``data``, status ``"1"``;
    * handler returned anything else: stored in ``msg``, status ``"-1"``;
    * POST body is not valid UTF-8 JSON: ``msg`` describes the error and the
      handler is not called.

    For POST requests the decoded JSON body is exposed as ``request.data``.
    """
    self.request = request
    self.response_data = {"msg": "ok",
                          "status": "-1",
                          "data": {}}

    def as_response():
        return HttpResponse(json.dumps(self.response_data, ensure_ascii=False), status=200,
                            content_type="application/json")

    if request.method not in [item.upper() for item in self.ALLOWED_METHOD]:
        return as_response()

    method = getattr(self, request.method.lower())
    if request.method.lower() == "post":
        try:
            body = u"".join([item.decode("utf8") for item in self.request.readlines()])
            self.request.data = json.loads(body)
        except ValueError as exc:
            # Covers both undecodable bytes and malformed JSON.
            self.response_data["msg"] = u"invalid JSON body: {0}".format(exc)
            return as_response()

    respose = method(request, *args, **kwargs)
    if isinstance(respose, (list, dict)):
        self.response_data["data"] = respose
        self.response_data["status"] = "1"
    else:
        self.response_data["msg"] = respose
    return as_response()
def get_disk(request, hostname, timing, partition):
    """Return usage history of one disk partition of a host as JSON.

    ``timing`` indexes ``TIME_SECTOR`` to select the time range and
    ``partition`` indexes the ``disk`` list of every returned document. The
    body contains the formatted timestamps (``data_time``), the matching
    ``percent`` values (``disk_percent``) and the partition's mount point
    (``disk_name``, taken from the first document that provides one).
    """
    source = GetSysData(hostname, "disk", TIME_SECTOR[int(timing)])

    labels = []
    usage = []
    mount_point = ""
    for record in source.get_data():
        stamp = time.localtime(record['timestamp'])
        labels.append(time.strftime("%m%d-%H:%M", stamp))
        entry = record['disk'][int(partition)]
        usage.append(entry['percent'])
        if not mount_point:
            mount_point = entry['mountpoint']

    payload = {"data_time": labels, "disk_name": mount_point, "disk_percent": usage}
    return HttpResponse(json.dumps(payload))
def del_monitor_data(request, timing):
    """Delete stored monitoring data.

    ``timing == 4`` drops the whole ``sys_info`` database. Any other value
    indexes ``TIME_SECTOR``; documents whose ``timestamp`` is older than that
    many seconds are removed from each host's collection.
    """
    timing = int(timing)
    client = GetSysData.connect_db()
    if timing == 4:
        client.drop_database("sys_info")
        return HttpResponse("ok")

    # The cutoff is the same for every host, so compute it once.
    del_time = int(time.time()) - TIME_SECTOR[timing]
    db = client.sys_info
    for host in Host.objects.all():
        try:
            # NOTE(review): `host` is whatever Host.objects.all() yields; if
            # collections are named by hostname this lookup probably needs
            # the name string instead -- confirm, since a failure here
            # silently skips the host.
            collection = db[host]
        except Exception:
            continue
        collection.remove({'timestamp': {'$lte': del_time}}, {"timestamp": 1})
    return HttpResponse("ok")
def get_dir(args):
    """Return one setting from ``adminset.conf`` by its short name.

    ``args`` is one of the names in ``options`` below (for example
    ``'a_path'`` or ``'token'``) or ``'dirs'`` for the directory that
    contains ``adminset.conf``. Only the requested option is read.

    Returns an ``HttpResponse`` with status 403 when ``args`` is falsy.
    Raises ``KeyError`` for an unknown name.
    """
    if not args:
        return HttpResponse(status=403)

    dirs = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    if args == 'dirs':
        return dirs

    # short name -> (section, option) in adminset.conf
    options = {
        'a_path': ('config', 'ansible_path'),
        'r_path': ('config', 'roles_path'),
        'p_path': ('config', 'playbook_path'),
        's_path': ('config', 'scripts_path'),
        'token': ('token', 'token'),
        'ssh_pwd': ('token', 'ssh_pwd'),
        'log_path': ('log', 'log_path'),
        'log_level': ('log', 'log_level'),
        'mongodb_ip': ('mongodb', 'mongodb_ip'),
        'mongodb_port': ('mongodb', 'mongodb_port'),
        'mongodb_user': ('mongodb', 'mongodb_user'),
        'mongodb_pwd': ('mongodb', 'mongodb_pwd'),
        'mongodb_collection': ('mongodb', 'collection'),
        'webssh_domain': ('webssh', 'domain'),
    }
    # An explicit table replaces the lookup in vars(), so renaming a local
    # cannot change which names callers may request.
    section, option = options[args]

    config = ConfigParser.RawConfigParser()
    with open(dirs + '/adminset.conf', 'r') as cfgfile:
        config.readfp(cfgfile)
    return config.get(section, option)
def insexport(request):
    """Export the requesting user's Insolvency rows as a CSV attachment.

    Each row holds the description, number, year and ``ano``/``ne``
    depending on the ``detailed`` flag. The file is offered as ``sir.csv``.
    """
    LOGGER.debug('Proceedings export page accessed', request)
    uid = request.user.id
    uname = request.user.username

    proceedings = Insolvency.objects.filter(uid=uid).order_by('desc', 'pk').distinct()

    response = HttpResponse(content_type='text/csv; charset=utf-8')
    response['Content-Disposition'] = 'attachment; filename=sir.csv'
    writer = csvwriter(response)
    for row in proceedings:
        detailed_flag = 'ano' if row.detailed else 'ne'
        writer.writerow((row.desc, str(row.number), str(row.year), detailed_flag))

    LOGGER.info('User "{}" ({:d}) exported proceedings'.format(uname, uid), request)
    return response
def hostgroup_append(request):
    """Create a host group, or rename/re-describe an existing one.

    POST fields: ``groupname`` (required), ``groupdesc`` and ``groupid``.
    With ``groupid`` the matching group is updated; without it a new group is
    created unless one with the same name exists.

    Returns a JSON object ``{"flag": 0|1, "msg": ...}``.
    """
    group_name = request.POST.get('groupname', None)
    group_desc = request.POST.get('groupdesc', None)
    group_id = request.POST.get('groupid', None)
    groups = cmdb_models.HostGroup.objects

    if not group_name:
        result_dict = {'flag': 0, 'msg': 'GroupName: is None'}
    elif group_id:
        try:
            updated = groups.filter(id=group_id).update(
                host_group_name=group_name,
                host_group_jd=group_desc)
        except Exception:
            result_dict = {'flag': 0, 'msg': 'GroupName: %s already exist' % group_name}
        else:
            if updated:
                result_dict = {'flag': 1, 'msg': 'GroupName: %s update successful' % group_name}
            else:
                # update() matched no row: the id does not exist, so do not
                # claim success.
                result_dict = {'flag': 0, 'msg': 'GroupName: %s update failed, group not found' % group_name}
    elif groups.filter(host_group_name=group_name).exists():
        result_dict = {'flag': 0, 'msg': 'GroupName: %s already exist' % group_name}
    else:
        groups.create(
            host_group_name=group_name,
            host_group_jd=group_desc
        )
        result_dict = {'flag': 1, 'msg': 'GroupName: %s append successful' % group_name}
    return HttpResponse(json.dumps(result_dict))
def group_del(request):
    """Delete hosts and/or host groups by id.

    POST fields ``host_list`` and ``group_list`` are comma separated id
    lists; either may be missing or empty.

    Returns a JSON object ``{"msg": ..., "flag": 1}`` on success, where
    ``msg`` is the result of the last ``delete()`` call, or
    ``{"msg": <error text>, "flag": 0}`` when parsing or deleting fails.
    """
    data = {'msg': '', 'flag': 1}
    try:
        host_id_list = _parse_id_list(request.POST.get('host_list', None))
        group_id_list = _parse_id_list(request.POST.get('group_list', None))
        if host_id_list:
            data['msg'] = cmdb_models.HostInfo.objects.filter(id__in=host_id_list).delete()
        if group_id_list:
            data['msg'] = cmdb_models.HostGroup.objects.filter(id__in=group_id_list).delete()
    except Exception as e:
        # Exception objects are not JSON serialisable; send their text.
        data['msg'] = str(e)
        data['flag'] = 0
    return HttpResponse(json.dumps(data))


def _parse_id_list(raw):
    """Convert a comma separated string such as "1,2,3" into a list of ints.

    ``None``, empty strings and empty items are ignored; a non-numeric item
    raises ``ValueError``.
    """
    return [int(item) for item in (raw or '').split(',') if item.strip()]
def post(self, request, pk):
    """Cycle the privilege of user ``pk`` according to the requester's own privilege.

    A ROOT requester moves the target through user -> volunteer -> admin ->
    user; an ADMIN requester toggles between user and volunteer. Other
    requesters change nothing. The row is locked, modified and saved inside
    one transaction.
    """
    root_cycle = {'user': 'volunteer', 'volunteer': 'admin', 'admin': 'user'}
    admin_cycle = {'user': 'volunteer', 'volunteer': 'user'}

    with transaction.atomic():
        target = User.objects.select_for_update().get(pk=pk)
        if self.request.user.privilege == Privilege.ROOT:
            transitions = root_cycle
        elif self.request.user.privilege == Privilege.ADMIN:
            transitions = admin_cycle
        else:
            transitions = {}
        # Values outside the table are left untouched.
        if target.privilege in transitions:
            target.privilege = transitions[target.privilege]
        target.save()
    return HttpResponse(json.dumps({'result': 'success'}))
def get(self, request, *args, **kwargs):
    """Return the contest's problems with statistics as JSON when ``data`` is in the query string.

    Each entry carries the problem's title, id and alias, the contest
    problem's pid, identifier and weight, and six statistic values; entries
    are sorted by identifier. Without ``data`` the parent view answers.
    """
    if 'data' not in request.GET:
        return super(ContestProblemManage, self).get(request, *args, **kwargs)

    copied_fields = ("title", "id", "alias")
    statistics = (
        ('ac1', get_problem_accept_count),
        ('ac2', get_problem_accept_user_count),
        ('tot1', get_problem_all_count),
        ('tot2', get_problem_all_user_count),
        ('ratio1', get_problem_accept_ratio),
        ('ratio2', get_problem_accept_user_ratio),
    )

    rows = []
    for entry in self.contest.contestproblem_set.select_related('problem').all():
        row = {}
        for field in copied_fields:
            row[field] = getattr(entry.problem, field)
        row['pid'] = entry.id
        row['identifier'] = entry.identifier
        row['weight'] = entry.weight
        for key, compute in statistics:
            row[key] = compute(entry.problem_id, self.contest.id)
        rows.append(row)

    ordered = sorted(rows, key=lambda row: row['identifier'])
    return HttpResponse(json.dumps(ordered))
def search_images(request):
    """Forward an image search to the local Docker API and relay the JSON result.

    The search term is read from the ``term`` POST field.
    """
    term = request.POST['term']
    url = 'http://localhost:' + DOCKER_API_PORT + '/images/search'
    # Let requests build the query string so the term is URL-encoded and
    # cannot add or alter query parameters.
    images = requests.get(url, params={'term': term})
    return HttpResponse(json.dumps(images.json()),
                        content_type='application/json')
def test_firebase(request):
    """Fetch ``/User`` from the Firebase database, print it and answer with a fixed greeting."""
    firebase_db = FirebaseApplication('https://vkusotiiki-bg.firebaseio.com/', None)  # no authentication
    users = firebase_db.get('/User', None)
    print(users)
    # HttpResponse's second positional argument is the content type, so the
    # stray empty dict that used to be passed here has been removed.
    return HttpResponse('Heeey')
def testcheck(request):
    '''Test URL; always answers with ``ok``.'''
    # Disabled helper calls kept for reference:
    #   auto_register(10)
    #   check_user_state()
    #   clean_traffic_log()
    response = HttpResponse('ok')
    return response
def test(request):
    '''Return a fixed JSON document (``{"user": [1, 2, 3, 4]}``).'''
    payload = json.dumps({'user': [1, 2, 3, 4]}, ensure_ascii=False)
    return HttpResponse(payload, content_type='application/json')