def get(self, request, *args, **kwargs):
    """Return the view's queryset rendered as a downloadable CSV attachment.

    The attachment name comes from ``get_filename``; the csv writer is
    configured with ``get_csv_writer_fmtparams``. An Excel-style ``sep=``
    line is written first when ``specify_separator`` is set, and a header
    row is written when ``header`` is set.
    """
    rows = self.get_queryset()
    columns = self.get_fields(rows)

    response = HttpResponse(content_type='text/csv')
    disposition = 'attachment; filename="{}.csv"'.format(self.get_filename(rows))
    response['Content-Disposition'] = disposition

    writer = csv.writer(response, **self.get_csv_writer_fmtparams())

    if self.specify_separator:
        # Written straight to the response so the line is not quoted/escaped by the csv writer.
        dialect = writer.dialect
        response.write('sep={}{}'.format(dialect.delimiter, dialect.lineterminator))

    if self.header:
        writer.writerow([self.get_header_name(rows.model, column) for column in columns])

    # One CSV row per object, one cell per selected field.
    writer.writerows(
        [self.get_field_value(record, column) for column in columns]
        for record in rows
    )
    return response
# python类HttpResponse()的实例源码 (example source code using the Python class HttpResponse())
def _validation_error_handler(e):
    """
    Handle validation errors.
    This can be overridden via the settings.

    :param e: the exception raised while parsing/validating the request body.
    :returns: a 422 ``HttpResponse`` whose JSON body holds ``message`` and
        ``code`` (the name of the failing validator) when ``e`` is a
        ``ValidationError``.
    :raises FatalException: (with code 400) for any other exception.
    """
    import json  # local import so the block does not depend on a module-level import

    if not isinstance(e, ValidationError):
        raise FatalException("Malformed JSON in the request.", 400)

    message = "Validation failed. {}".format(e.message)
    error_response = {
        "message": message,
        "code": e.validator,
    }
    logger.info(message)
    # Serialize explicitly: handing the dict itself to HttpResponse makes Django
    # iterate it and emit only the concatenated keys ("messagecode").
    return HttpResponse(
        json.dumps(error_response),
        content_type="application/json",
        status=422,
    )
def post(self, request, *args, **kwargs):
    """Record that the requesting user is currently editing a page.

    The page is looked up from the ``page_id`` POST value (404 if missing).
    The user's indicator for that page is refreshed when one newer than
    ``self.time_past`` exists; otherwise a new one is created.
    """
    page = get_object_or_404(Page, pk=request.POST.get('page_id'))

    # To avoid uncontrolled creation of PageIndicators delete any for this
    # user/page combination which is too old.
    PageIndicator.objects.filter(
        editor=request.user,
        edited_on__lte=self.time_past,
        page=page,
    ).delete()

    try:
        current = PageIndicator.objects.get(
            editor=request.user,
            edited_on__gte=self.time_past,
            page=page,
        )
        current.edited_on = self.now
        current.save()
    except PageIndicator.DoesNotExist:
        # No recent indicator: this is the start of a new editing session.
        PageIndicator.objects.create(
            editor=request.user,
            edited_on=self.now,
            started_editing=self.now,
            page=page,
        )
    return HttpResponse(status=200)
def log_tests(request):
    """Render ``golem/log.html`` with the test ids found in the ``message-log`` index.

    Runs a terms aggregation (up to 500 buckets) on the ``test_id`` field and
    passes one ``{'id', 'name'}`` entry per bucket to the template as ``groups``.
    Returns a plain-text notice when no Elasticsearch client is available.
    """
    es = get_elastic()
    if not es:
        return HttpResponse('not able to connect to elasticsearch')

    aggregation_query = {
        "size": 0,  # only the aggregation is needed, not the hits
        "aggs": {
            "test_ids": {
                "terms": {"field": "test_id", "size": 500},
            },
        },
    }
    res = es.search(index="message-log", doc_type='message', body=aggregation_query)

    buckets = res['aggregations']['test_ids']['buckets']
    groups = [
        {'id': 'test_id_' + bucket['key'], 'name': bucket['key']}
        for bucket in buckets
    ]

    template = loader.get_template('golem/log.html')
    page = template.render({'groups': groups}, request)
    return HttpResponse(page)
def remove_home_recommend_handler(request):
    """Deactivate the home-page recommendation for an article.

    Expects a POST carrying ``article_id``; the matching ``HomeRecommend``
    (looked up by ``share_id``) gets ``status = 0``.

    Always returns an ``HttpResponse`` whose body is the JSON-encoded result
    dict; ``code`` is set to 500 for GET requests and for unknown articles.
    """
    result = base_result()
    if request.method == "GET":
        result["code"] = 500
        result["message"] = "get method is not supported!"
        # A view must return a response object, not the bare dict.
        return HttpResponse(json.dumps(result))

    article_id = request.POST.get("article_id")
    try:
        recommend = HomeRecommend.objects.get(share_id=article_id)
    except HomeRecommend.DoesNotExist:
        result["code"] = 500
        result["message"] = "article is not exists!"
    else:
        recommend.status = 0
        recommend.save()
    return HttpResponse(json.dumps(result))
def handle(self, *args, **options):
    """Print a generated configuration: TEMPLATE_A, one branch per rule, TEMPLATE_B.

    Each rule's cache policy is applied to a dummy request/response pair and
    the resulting ``X-Hash-Cookies`` response header is emitted inside an
    ``if`` / ``else if`` branch matching the rule's URL pattern.
    """
    request = RequestFactory().get("/")
    response = HttpResponse()
    user = User()

    print(TEMPLATE_A)
    for index, (pattern, age, cache_type, dc) in enumerate(rules):
        # The policy sets headers on the shared response object.
        apply_policy = POLICIES[cache_type]
        apply_policy(request, response, user, age)

        # First rule opens the chain, every later rule continues it.
        keyword = "if" if index == 0 else "else if"
        print(keyword, end="")
        print(""" (req.url ~ "%s") {""" % pattern.pattern)
        print("""set req.http.Hash-Cookies = "%s";""" % response["X-Hash-Cookies"])
        print("}")
    print(TEMPLATE_B)
def csv_download(request):
    """Return a CSV attachment of all applications to the user's organization.

    The column set is the longest key list found among the serialized rows;
    the file name embeds the organization slug and today's date.
    """
    apps = get_all_applications_for_users_org(request.user)
    data = ApplicationCSVDownloadSerializer(apps, many=True).data

    # Use the largest set of fields. There should not be a case where a
    # smaller set of fields would have a field not in a larger one.
    fields = max((list(datum.keys()) for datum in data), key=len, default=[])

    response = HttpResponse(content_type='text/csv')
    csv_writer = csv.DictWriter(response, fieldnames=fields)
    csv_writer.writeheader()
    csv_writer.writerows(data)

    organization_slug = request.user.profile.organization.slug
    datestamp = timezone.now().strftime('%m-%d-%Y')
    file = 'all_applications_to_%s_%s.csv' % (organization_slug, datestamp)
    response['Content-Disposition'] = 'attachment; filename="%s"' % file
    return response
def get(self, request, *args, **kwargs):
    """Exchange an Instagram OAuth ``code`` for an access token and store it.

    Raises ``Http404`` when no ``code`` query parameter is given. On success
    the token is saved on ``SocialConfig`` and the user is redirected to its
    admin change page; otherwise the raw body of Instagram's reply is returned.
    """
    code = request.GET.get('code', '')
    if not code:
        raise Http404

    config = SocialConfig.get_solo()
    redirect_uri = self.request.build_absolute_uri(
        resolve_url('admin_social_networks:instagram_token')
    )
    payload = {
        'grant_type': 'authorization_code',
        'client_id': config.instagram_client_id,
        'client_secret': config.instagram_client_secret,
        'redirect_uri': redirect_uri,
        'code': code,
    }
    response = requests.post('https://api.instagram.com/oauth/access_token', data=payload)
    answer = response.json()

    if not (answer and 'access_token' in answer):
        # Show whatever Instagram answered so the failure can be diagnosed.
        return HttpResponse(response.text)

    SocialConfig.objects.update(instagram_access_token=answer['access_token'])
    add_message(request, SUCCESS, _('Instagram access_token updated successfully!'))
    return redirect('admin:social_networks_socialconfig_change')
def fcm_register(request):
    """Register an FCM registration id for the Slack user owning a token.

    The JSON request body supplies ``slack_token`` and ``reg_id``. The token
    is checked against Slack's ``auth.test`` endpoint; without a ``user_id``
    in the reply a 400 is returned, otherwise the ``FcmSub`` row for
    ``reg_id`` is created or updated and ``ok`` is returned.
    """
    payload = json.loads(request.body)
    slack_token = payload.get('slack_token')
    reg_id = payload.get('reg_id')

    auth_check = requests.get('https://slack.com/api/auth.test', params={'token': slack_token})
    slack_user_id = auth_check.json().get('user_id')

    if slack_user_id:
        FcmSub.objects.update_or_create(reg_id=reg_id, defaults={'slack_user_id': slack_user_id})
        logger.warning('FCM registration complete for %s %s' % (slack_user_id, reg_id))
        return HttpResponse('ok')

    logger.warning('Couldn\'t validate slack token for FCM registration')
    return HttpResponse('Could not validate slack token', status=400)
def slack_oauth(request):
    """Complete the Slack OAuth flow and store the installing team.

    Exchanges the ``code`` query parameter at Slack's ``oauth.access``
    endpoint and records the team name, team id and bot credentials from the
    reply via ``Team.objects.get_or_create``.

    Returns a 400 response when ``code`` is missing or Slack does not report
    ``ok``; previously both cases surfaced as an unhandled ``KeyError``.
    """
    code = request.GET.get('code')
    if not code:
        return HttpResponse('Missing OAuth code.', status=400)

    params = {
        'code': code,
        'client_id': settings.SLACK_CLIENT_ID,
        "client_secret": settings.SLACK_CLIENT_SECRET
    }
    url = 'https://slack.com/api/oauth.access'
    json_response = requests.get(url, params)
    data = json.loads(json_response.text)

    # Slack signals failure with {"ok": false, "error": ...}; such a reply
    # carries none of the team/bot keys read below.
    if not data.get('ok'):
        return HttpResponse('Slack authorization failed.', status=400)

    bot = data['bot']
    Team.objects.get_or_create(
        name=data['team_name'],
        team_id=data['team_id'],
        bot_user_id=bot['bot_user_id'],
        bot_access_token=bot['bot_access_token']
    )
    return HttpResponse('Bot added to your Slack team!')
def post(self, request, *args, **kwargs):
    """Dispatch incoming webhook messages to the matching background task.

    For every ``message`` event in the JSON payload the sender (``psid``) is
    looked up:

    * unknown sender            -> ``newUser``
    * known but not valid       -> ``gotInactiveUser``
    * valid, profile incomplete -> ``completeProfile``
    * valid, profile complete   -> ``analyseMessage``

    Any error while processing is printed and an empty 200 response is still
    returned.
    """
    try:
        incoming_message = json.loads(self.request.body.decode('utf-8'))
        for entry in incoming_message['entry']:
            for message in entry['messaging']:
                if 'message' not in message:
                    continue
                text = message['message']['text']
                psid = message['sender']['id']
                print(text, psid)
                try:
                    user = User.objects.get(psid=psid)
                except User.DoesNotExist:
                    # Only a missing row means "new user"; other failures
                    # (e.g. task dispatch errors) must not be mistaken for it
                    # and are reported by the outer handler instead.
                    newUser.delay(psid)
                    continue
                if not user.valid:
                    gotInactiveUser.delay(psid)
                elif user.profile_completed:
                    analyseMessage.delay(psid, text)
                else:
                    # get user profile completed
                    completeProfile.delay(psid, text)
    except Exception as e:
        print("Exception: ", e)
    return HttpResponse()
def get(self, request):
    """
    Get configuration from settings, format it and return it as a JSON response.

    ``registration_enforce`` is true while no ``User`` rows exist.
    """
    vaultier = settings.VAULTIER

    payload = {'VERSION': pkg_resources.get_distribution("Vaultier").version}
    for key in ('raven_key', 'invitation_lifetime', 'registration_allow'):
        payload[key] = vaultier.get(key)

    payload['registration_enforce'] = User.objects.all().count() == 0

    # dev
    for key in (
        'dev_shared_key',
        'dev_shared_key_private',
        'dev_shared_key_public',
        'dev_show_token',
        'dev_email',
    ):
        payload[key] = vaultier.get(key)

    return HttpResponse(json.dumps(payload), content_type='application/json')
def get(self, request, name):
    """Serve the ``DBFile`` called ``name``, honouring ``If-Modified-Since``.

    Responds 404 for unknown names and 304 when ``was_modified_since``
    reports no change. Otherwise the stored content is returned with a
    guessed content type (``application/octet-stream`` as fallback) plus
    ``Last-Modified``, ``Content-Length`` and, if guessed, ``Content-Encoding``.
    """
    # ``content`` is deferred so a 304 can be answered without reading it.
    db_file = get_object_or_404(DBFile.objects.defer('content'), name=name)
    mtime = time.mktime(db_file.updated_on.timetuple())

    if_modified_since = self.request.META.get('HTTP_IF_MODIFIED_SINCE')
    if not was_modified_since(header=if_modified_since, mtime=mtime, size=db_file.size):
        return HttpResponseNotModified()

    guessed_type, encoding = mimetypes.guess_type(db_file.name)
    response = HttpResponse(
        db_file.content,
        content_type=guessed_type or 'application/octet-stream',
    )
    response['Last-Modified'] = http_date(mtime)
    response['Content-Length'] = db_file.size
    if encoding:
        response['Content-Encoding'] = encoding
    return response
def uploadfile(file, filename, upload_dir):
    """Write an uploaded file to ``upload_dir`` and return the path written.

    :param file: object exposing ``chunks()`` that yields ``bytes`` pieces.
    :param filename: name of the file to create.
    :param upload_dir: destination directory; created (including parents)
        when missing. The target path is ``upload_dir + filename``, so
        ``upload_dir`` must end with a path separator.
    :returns: the path of the written file.

    No upload size limit is enforced here.
    """
    # exist_ok makes this safe when the directory already exists or is
    # created concurrently, and replaces the mkdir/bare-except/makedirs fallback.
    os.makedirs(upload_dir, exist_ok=True)

    filename = upload_dir + filename
    with open(filename, 'wb+') as dest:
        for chunk in file.chunks():
            dest.write(chunk)
    return filename
def run(self, query, query_args):
    """Build a CSV response with columns (time index, series1, series2, ...)
    and one data point per row.
    """
    # Drop the metadata; it is not used for the CSV format.
    query.set_metadata_config(constants.METADATA_NONE)

    default_header = constants.API_DEFAULT_VALUES[constants.PARAM_HEADER]
    header_mode = query_args.get(constants.PARAM_HEADER, default_header)
    series_ids = query.get_series_ids(how=header_mode)
    data = query.run()['data']

    response = HttpResponse(content_type='text/csv')
    disposition = 'attachment; filename="{}"'.format(constants.CSV_RESPONSE_FILENAME)
    response['Content-Disposition'] = disposition

    writer = unicodecsv.writer(response)
    # Header row: the index column followed by one column per series.
    writer.writerow([settings.INDEX_COLUMN] + series_ids)
    for row in data:
        writer.writerow(row)
    return response
def post(self, request, *args, **kwargs):
    """Handle a webhook POST: pass each text message to ``post_facebook_message``.

    Always answers with an empty 200 response.
    """
    # Converts the text payload into a python dictionary
    payload = json.loads(self.request.body.decode('utf-8'))

    # Facebook recommends going through every entry since they might send
    # multiple messages in a single call during high load
    for entry in payload['entry']:
        for event in entry['messaging']:
            # Only message calls are handled; delivery, optin and postback
            # events are skipped.
            if 'message' not in event:
                continue
            # Print the message to the terminal
            pprint(event)
            # Assuming the sender only sends text. Non-text messages like
            # stickers, audio, pictures are sent as attachments and must be
            # handled accordingly.
            sender_id = event['sender']['id']
            post_facebook_message(sender_id, event['message']['text'])
    return HttpResponse()
def slack_oauth(request):
    """Complete the Slack OAuth flow and store the installing team.

    Exchanges the ``code`` query parameter at Slack's ``oauth.access``
    endpoint and records the team name, team id and bot credentials from the
    reply via ``Team.objects.get_or_create``.

    Returns a 400 response when ``code`` is missing or Slack does not report
    ``ok``; previously both cases surfaced as an unhandled ``KeyError``.
    """
    code = request.GET.get('code')
    if not code:
        return HttpResponse('Missing OAuth code.', status=400)

    params = {
        'code': code,
        'client_id': settings.SLACK_CLIENT_ID,
        "client_secret": settings.SLACK_CLIENT_SECRET
    }
    url = 'https://slack.com/api/oauth.access'
    json_response = requests.get(url, params)
    data = json.loads(json_response.text)

    # Slack signals failure with {"ok": false, "error": ...}; such a reply
    # carries none of the team/bot keys read below.
    if not data.get('ok'):
        return HttpResponse('Slack authorization failed.', status=400)

    bot = data['bot']
    Team.objects.get_or_create(
        name=data['team_name'],
        team_id=data['team_id'],
        bot_user_id=bot['bot_user_id'],
        bot_access_token=bot['bot_access_token']
    )
    return HttpResponse('Bot added to your Slack team!')
def add_question(request):
    """Create a ``Question`` from POSTed form fields.

    Reads ``question``, ``option1``..``option4``, ``answer`` and ``exam``
    (the primary key of the owning ``Exam``) from ``request.POST``.

    Returns ``"success"`` after saving. Non-POST requests get an explicit
    405 response, since nothing is created for them.

    Raises ``Exam.DoesNotExist`` when the exam id is unknown, and
    ``TypeError``/``ValueError`` when it is missing or not an integer.
    """
    if request.method != "POST":
        return HttpResponse(status=405)

    form = request.POST
    q = Question()
    for field in ("question", "option1", "option2", "option3", "option4", "answer"):
        setattr(q, field, form.get(field))
    q.exam = Exam.objects.get(pk=int(form.get("exam")))
    q.save()
    return HttpResponse("success")
#viewsets for rest_framework
def get_ss_qr(request):
    """Return a PNG QR code encoding an ``ss://`` URI for the current user.

    The node is the one given by the ``nid`` query parameter, or the node
    with the highest ``weight`` when ``nid`` is absent. Problems (anonymous
    user, no linked account, unknown node, no nodes) are reported as a JSON
    object with an ``error`` key.
    """
    user = request.user
    if user.is_anonymous():
        return JsonResponse({'error': 'unauthorized'})
    if not hasattr(user, 'ss_user'):
        return JsonResponse({'error': 'no linked shadowsocks account'})
    ss_user = user.ss_user

    node_id = request.GET.get('nid')
    if node_id:
        try:
            node = Node.objects.get(pk=node_id)
        except Node.DoesNotExist:
            return JsonResponse({'error': 'node not exist'})
    else:
        candidates = Node.objects.all().order_by('-weight')
        if not candidates:
            return JsonResponse({'error': 'no node at all'})
        node = candidates[0]

    # method:password@server:port, base64-encoded behind the ss:// scheme.
    credentials = '{}:{}@{}:{}'.format(node.method, ss_user.password, node.server, ss_user.port)
    encoded = base64.b64encode(credentials.encode('utf8')).decode('ascii')
    img = qrcode.make('ss://{}'.format(encoded))

    response = HttpResponse(content_type="image/png")
    img.save(response)  # the image is written directly into the response body
    return response
def device(request):
    """Subscribe the device whose id is the raw request body.

    * body fails the ``check_uuid`` pattern -> 400
    * device already known -> 200 with ``{"verified": ...}``; a verification
      notification is re-sent while the device is unverified
    * new device -> verification key generated, notification sent, 201
    """
    stats = statsd.StatsClient('localhost', 8125)
    stats.incr('subscribe.success')
    stats.gauge('total.subscribe.success', 1, delta=True)

    device_id = request.body.decode("utf-8")
    if check_uuid.match(device_id) is None:
        return HttpResponse("", status=400)

    user_device, created = UserDevice.objects.get_or_create(device_id=device_id)

    if created:
        user_device.verified = False
        user_device.verification_key = uuid4().hex
        user_device.save()
        # TODO send verification key via PUSH
        send_verify_notification.delay(device_id, 0)
        return HttpResponse(json.dumps({}), status=201, content_type="application/json")

    if not user_device.verified:
        send_verify_notification.delay(device_id, 0)
    body = json.dumps({'verified': user_device.verified})
    return HttpResponse(body, status=200, content_type="application/json")
def verify(request, key=None):
    """Mark the device owning verification ``key`` as verified.

    Returns an empty 200 response on success and an empty 400 response when
    ``key`` is missing or matches no ``UserDevice``. Success and failure
    counters are reported to statsd.
    """
    c = statsd.StatsClient('localhost', 8125)

    def failed():
        # Count the failure and answer with a bare 400.
        c.incr('verify.device.failed')
        c.gauge('total.verify.device.failed', 1, delta=True)
        return HttpResponse("", status=400)

    if not key:
        return failed()

    try:
        user_device = UserDevice.objects.get(verification_key=key)
    except UserDevice.DoesNotExist:
        # The lookup is on UserDevice, so its own DoesNotExist must be caught;
        # catching UserEmail.DoesNotExist let unknown keys escape unhandled.
        return failed()

    user_device.verified = True
    user_device.save()

    c.incr('verify.device.success')
    c.gauge('total.verify.device.success', 1, delta=True)
    return HttpResponse("")
def post(self, request, *args, **kwargs):
    """Return the POSTed ``content`` converted by ``md2html``.

    An absent or empty ``content`` yields an empty response body.
    """
    content = request.POST.get('content')
    rendered = md2html(content) if content else b""
    return HttpResponse(rendered)
def greet(request, greeting, greetee):
    """Always raise ``ExceptionalResponse`` wrapping a 400 ``'oh no'`` response."""
    failure = HttpResponse('oh no', status=400)
    raise ExceptionalResponse(failure)
def twitter_tpf_server(request):
    """Answer a triple pattern query with a TriG fragment built by the Twitter mapping.

    The pattern comes from the ``page`` (default ``'1'``), ``subject``,
    ``predicate`` and ``object`` query parameters. The serialized fragment is
    sent as an attachment named ``fragment.trig`` with an open CORS header.
    """
    params = request.GET
    tpq = TriplePatternQuery(
        params.get('page', '1'),
        params.get('subject'),
        params.get('predicate'),
        params.get('object'),
    )
    fragment = Fragment()
    # match() fills ``fragment`` in place.
    engine = Odmtp(TrimmerXr2rmlTwitter(), Tp2QueryTwitter(), MapperTwitterXr2rml())
    engine.match(tpq, fragment, request)

    response = HttpResponse(fragment.serialize(), content_type='application/trig; charset=utf-8')
    response['Content-Disposition'] = 'attachment; filename="fragment.trig"'
    response['Access-Control-Allow-Origin'] = '*'
    return response
def github_tpf_server(request):
    """Answer a triple pattern query with a TriG fragment built by the GitHub mapping.

    The pattern comes from the ``page`` (default ``'1'``), ``subject``,
    ``predicate`` and ``object`` query parameters. The serialized fragment is
    sent as an attachment named ``fragment.trig`` with an open CORS header.
    """
    params = request.GET
    tpq = TriplePatternQuery(
        params.get('page', '1'),
        params.get('subject'),
        params.get('predicate'),
        params.get('object'),
    )
    fragment = Fragment()
    # match() fills ``fragment`` in place.
    engine = Odmtp(TrimmerXr2rmlGithub(), Tp2QueryGithub(), MapperGithubXr2rml())
    engine.match(tpq, fragment, request)

    response = HttpResponse(fragment.serialize(), content_type='application/trig; charset=utf-8')
    response['Content-Disposition'] = 'attachment; filename="fragment.trig"'
    response['Access-Control-Allow-Origin'] = '*'
    return response
def custom_validation_response(e):
    """
    Custom validation handler that overrides the default one.

    Ignores the exception ``e`` and answers with an empty
    418 (I'm a teapot) response.
    """
    teapot = HttpResponse(status=418)
    return teapot
def ExampleAPI(request, schema, example):
    """Return the result of ``_example_api`` as an ``HttpResponse``.

    A result that already is an ``HttpResponse`` is passed through unchanged;
    anything else becomes the body of a new one.
    """
    result = _example_api(request, schema, example)
    return result if isinstance(result, HttpResponse) else HttpResponse(result)
def ValidatedGETAPI(request, expected_params, target):
    """
    Validate GET APIs.

    When the query string passes ``_is_valid_query``, ``target`` is called;
    an ``HttpResponse`` result is returned as is, any other result is
    JSON-encoded into one. Returns ``None`` if the check yields a falsy value.
    """
    if not _is_valid_query(request.GET, expected_params):
        return None

    result = target(request)
    if isinstance(result, HttpResponse):
        return result
    return HttpResponse(json.dumps(result))
def ValidatedPOSTAPI(request, schema, expected_params, target):
    """
    Validate POST APIs.

    Checks the query string against ``expected_params`` (when given), parses
    the JSON body and validates it against ``schema`` (when given). On
    success the parsed body is exposed as ``request.validated_data`` and
    ``target`` is called; on failure the configured or default validation
    error handler provides the response. Non-``HttpResponse`` results are
    JSON-encoded.
    """
    error_response = None

    if expected_params:
        _is_valid_query(request.GET, expected_params)  # Either passes through or raises an exception.

    if not schema:
        data = json.loads(request.body.decode('utf-8'))
    else:
        # If there is a problem with the json data, return a 400.
        try:
            data = json.loads(request.body.decode("utf-8"))
            validate(data, schema)
        except Exception as e:
            # Use the custom handler only if it is in settings and not None.
            if getattr(settings, 'RAMLWRAP_VALIDATION_ERROR_HANDLER', None):
                error_response = _call_custom_handler(e)
            else:
                error_response = _validation_error_handler(e)

    if error_response:
        response = error_response
    else:
        request.validated_data = data
        response = target(request)

    if isinstance(response, HttpResponse):
        return response
    return HttpResponse(json.dumps(response))
def test_redirect(browser, logged_in_staff, mocker, settings):
    """Test the redirect behavior. If the dashboard API returns a 401 it should handle it properly."""
    # Point ROOT_URLCONF at this module so the app uses the module-level
    # 'urlpatterns' defined here.
    settings.ROOT_URLCONF = __name__

    unauthorized = HttpResponse(
        status=401,
        content=json.dumps({"error": "message"}).encode()
    )
    dashboard_patch = mocker.patch('dashboard.views.UserDashboard.get', return_value=unauthorized)

    browser.get("/dashboard", ignore_errors=True)

    body_text = browser.driver.find_element_by_css_selector("body").text
    assert FAKE_RESPONSE in body_text
    assert dashboard_patch.called