python类HttpResponse()的实例源码

views.py 文件源码 项目:django-csv-export-view 作者: benkonrath 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get(self, request, *args, **kwargs):
    """Return the view's queryset serialized as a CSV attachment.

    The body optionally starts with a ``sep=<delimiter>`` line (when
    ``self.specify_separator`` is truthy) and a header row (when
    ``self.header`` is truthy), followed by one row per queryset object.
    """
    records = self.get_queryset()
    columns = self.get_fields(records)

    csv_response = HttpResponse(content_type='text/csv')
    download_name = self.get_filename(records)
    csv_response['Content-Disposition'] = 'attachment; filename="{}.csv"'.format(download_name)

    # The response is used as the file-like target of the csv writer.
    csv_writer = csv.writer(csv_response, **self.get_csv_writer_fmtparams())

    if self.specify_separator:
        dialect = csv_writer.dialect
        csv_response.write('sep={}{}'.format(dialect.delimiter, dialect.lineterminator))

    if self.header:
        header_row = []
        for column in list(columns):
            header_row.append(self.get_header_name(records.model, column))
        csv_writer.writerow(header_row)

    csv_writer.writerows(
        [self.get_field_value(record, column) for column in columns]
        for record in records
    )

    return csv_response
validation.py 文件源码 项目:ramlwrap 作者: jmons 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _validation_error_handler(e):
    """
    Handle validation errors.
    This can be overridden via the settings.

    A ``ValidationError`` is logged at info level and turned into a 422
    response whose body is a JSON object with ``message`` and ``code`` keys.
    Any other exception is re-raised as a ``FatalException`` with code 400.
    """
    import json  # local import: keeps this block independent of module-level imports

    if not isinstance(e, ValidationError):
        raise FatalException("Malformed JSON in the request.", 400)

    message = "Validation failed. {}".format(e.message)
    error_response = {
        "message": message,
        "code": e.validator,
    }
    logger.info(message)
    # Serialize explicitly: passing the dict itself as content makes
    # HttpResponse iterate it, so the body would only contain the joined keys.
    return HttpResponse(
        json.dumps(error_response),
        status=422,
        content_type="application/json",
    )
views.py 文件源码 项目:djangocms-concurrent-users 作者: Blueshoe 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def post(self, request, *args, **kwargs):
    """Refresh or create the PageIndicator for the requesting user and page.

    The page is looked up from the ``page_id`` POST field (404 when it does
    not exist). Always answers with an empty 200 response otherwise.
    """
    page = get_object_or_404(Page, pk=request.POST.get('page_id'))
    editor = request.user

    # To avoid uncontrolled creation of PageIndicators, delete any for this
    # user/page combination which is too old.
    PageIndicator.objects.filter(editor=editor, edited_on__lte=self.time_past, page=page).delete()

    try:
        current = PageIndicator.objects.get(editor=editor, edited_on__gte=self.time_past, page=page)
        current.edited_on = self.now
        current.save()
    except PageIndicator.DoesNotExist:
        # No recent indicator for this user/page: start a new one.
        PageIndicator.objects.create(editor=editor, edited_on=self.now, started_editing=self.now, page=page)

    return HttpResponse(status=200)
views.py 文件源码 项目:golem 作者: prihoda 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def log_tests(request):
    """Render ``golem/log.html`` with one group per ``test_id`` term bucket.

    Returns a plain-text error response when no Elasticsearch client is
    available.
    """
    client = get_elastic()
    if not client:
        return HttpResponse('not able to connect to elasticsearch')

    # size=0: only the aggregation buckets are needed, not the hits.
    terms_aggregation = {"terms": {"field": "test_id", "size": 500}}
    search_body = {"size": 0, "aggs": {"test_ids": terms_aggregation}}
    result = client.search(index="message-log", doc_type='message', body=search_body)

    buckets = result['aggregations']['test_ids']['buckets']
    groups = [
        {'id': 'test_id_' + bucket['key'], 'name': bucket['key']}
        for bucket in buckets
    ]

    page = loader.get_template('golem/log.html')
    return HttpResponse(page.render({'groups': groups}, request))
recommend.py 文件源码 项目:pony 作者: Eastwu5788 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def remove_home_recommend_handler(request):
    """Set ``status`` to 0 on the HomeRecommend matching POST ``article_id``.

    Always returns an ``HttpResponse`` whose body is the JSON-serialized
    result dict; ``code`` is set to 500 with a message when the request is a
    GET or when no matching recommendation exists.
    """
    result = base_result()
    if request.method == "GET":
        result["code"] = 500
        result["message"] = "get method is not supported!"
        # Serialize like every other exit path instead of returning a bare dict.
        return HttpResponse(json.dumps(result))

    article_id = request.POST.get("article_id")

    try:
        recommend = HomeRecommend.objects.get(share_id=article_id)
    except HomeRecommend.DoesNotExist:
        result["code"] = 500
        result["message"] = "article is not exists!"
    else:
        recommend.status = 0
        recommend.save()

    return HttpResponse(json.dumps(result))
generate_vcl.py 文件源码 项目:django-cache-headers 作者: praekelt 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
    """Print TEMPLATE_A, one ``if``/``else if`` clause per rule, then TEMPLATE_B.

    Each rule's policy is applied to a throwaway request/response/user, and
    the resulting ``X-Hash-Cookies`` response header is printed in the clause.
    """
    request = RequestFactory().get("/")
    # The same response object is reused for every rule.
    response = HttpResponse()
    user = User()
    print(TEMPLATE_A)
    for index, (pattern, age, cache_type, _dc) in enumerate(rules):
        apply_policy = POLICIES[cache_type]
        apply_policy(request, response, user, age)
        # Only the first clause opens the chain with a plain "if".
        keyword = "if" if index == 0 else "else if"
        print(keyword, end="")
        print(""" (req.url ~ "%s") {""" % pattern.pattern)
        print("""set req.http.Hash-Cookies = "%s";""" % response["X-Hash-Cookies"])
        print("}")
    print(TEMPLATE_B)
data_export_views.py 文件源码 项目:intake 作者: codeforamerica 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def csv_download(request):
    """Return a CSV attachment of all applications to the user's organization.

    The attachment name combines the organization slug with the current date.
    """
    applications = get_all_applications_for_users_org(request.user)
    rows = ApplicationCSVDownloadSerializer(applications, many=True).data
    # Use the largest set of fields as the columns (the first one wins ties).
    # There should not be a case where a smaller set of fields would have
    # a field not in a larger one.
    columns = max((list(row.keys()) for row in rows), key=len, default=[])

    response = HttpResponse(content_type='text/csv')
    dict_writer = csv.DictWriter(response, fieldnames=columns)
    dict_writer.writeheader()
    dict_writer.writerows(rows)

    organization_slug = request.user.profile.organization.slug
    date_stamp = timezone.now().strftime('%m-%d-%Y')
    download_name = 'all_applications_to_%s_%s.csv' % (organization_slug, date_stamp)
    response['Content-Disposition'] = 'attachment; filename="%s"' % download_name
    return response
admin_views.py 文件源码 项目:tumanov_castleoaks 作者: Roamdev 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get(self, request, *args, **kwargs):
    """Exchange the ``code`` query parameter for an Instagram access token.

    Raises Http404 when ``code`` is missing or empty. On success the token is
    stored on SocialConfig and the user is redirected to its change page;
    otherwise the raw text of the token endpoint's reply is returned.
    """
    auth_code = request.GET.get('code', '')
    if not auth_code:
        raise Http404

    config = SocialConfig.get_solo()
    callback_url = self.request.build_absolute_uri(resolve_url('admin_social_networks:instagram_token'))
    payload = {
        'grant_type': 'authorization_code',
        'client_id': config.instagram_client_id,
        'client_secret': config.instagram_client_secret,
        'redirect_uri': callback_url,
        'code': auth_code,
    }
    token_reply = requests.post('https://api.instagram.com/oauth/access_token', data=payload)

    answer = token_reply.json()
    if not (answer and 'access_token' in answer):
        # Show whatever the endpoint answered so the failure can be inspected.
        return HttpResponse(token_reply.text)

    SocialConfig.objects.update(instagram_access_token=answer['access_token'])
    add_message(request, SUCCESS, _('Instagram access_token updated successfully!'))
    return redirect('admin:social_networks_socialconfig_change')
android_app.py 文件源码 项目:heltour 作者: cyanfish 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def fcm_register(request):
    """Register an FCM ``reg_id`` for the Slack user owning ``slack_token``.

    Both values are read from the JSON request body. The token is checked
    against Slack's ``auth.test`` endpoint; a reply without ``user_id``
    yields a 400 response, otherwise the FcmSub row is created or updated.
    """
    payload = json.loads(request.body)
    slack_token = payload.get('slack_token')
    reg_id = payload.get('reg_id')

    auth_reply = requests.get('https://slack.com/api/auth.test', params={'token': slack_token})
    slack_user_id = auth_reply.json().get('user_id')

    if slack_user_id:
        FcmSub.objects.update_or_create(reg_id=reg_id, defaults={'slack_user_id': slack_user_id})
        logger.warning('FCM registration complete for %s %s' % (slack_user_id, reg_id))
        return HttpResponse('ok')

    logger.warning("Couldn't validate slack token for FCM registration")
    return HttpResponse('Could not validate slack token', status=400)
views.py 文件源码 项目:WineHelper 作者: scorelabio 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def slack_oauth(request):
    """Exchange the ``code`` query parameter via Slack ``oauth.access`` and store the team.

    The team is fetched or created from the name, id and bot credentials in
    the JSON reply; a fixed confirmation message is returned.
    """
    oauth_params = {
        'code': request.GET['code'],
        'client_id': settings.SLACK_CLIENT_ID,
        "client_secret": settings.SLACK_CLIENT_SECRET,
    }
    reply = requests.get('https://slack.com/api/oauth.access', oauth_params)
    payload = json.loads(reply.text)

    team_fields = dict(
        name=payload['team_name'],
        team_id=payload['team_id'],
        bot_user_id=payload['bot']['bot_user_id'],
        bot_access_token=payload['bot']['bot_access_token'],
    )
    Team.objects.get_or_create(**team_fields)
    return HttpResponse('Bot added to your Slack team!')
views.py 文件源码 项目:tpobot 作者: rishabhiitbhu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def post(self, request, *args, **kwargs):
    """Dispatch each message in the webhook payload to a background task.

    The JSON body is expected to contain ``entry`` items, each holding a
    ``messaging`` list. For every item carrying a ``message`` the sender is
    looked up by ``psid`` and one task is queued: ``newUser`` for unknown
    senders, ``gotInactiveUser`` for users that are not valid,
    ``completeProfile`` for incomplete profiles, ``analyseMessage`` otherwise.

    Any error is printed and swallowed; an empty 200 response is always
    returned.
    """
    try:
        incoming_message = json.loads(self.request.body.decode('utf-8'))
        for entry in incoming_message['entry']:
            for message in entry['messaging']:
                if 'message' not in message:
                    continue
                text = message['message']['text']
                psid = message['sender']['id']
                print(text, psid)
                # Only a failed lookup means "new user". Catching every
                # exception here would also re-register existing users
                # whenever queuing one of the tasks below fails.
                try:
                    user = User.objects.get(psid=psid)
                except User.DoesNotExist:
                    newUser.delay(psid)
                    continue
                if not user.valid:
                    gotInactiveUser.delay(psid)
                elif user.profile_completed:
                    analyseMessage.delay(psid, text)
                else:
                    # get user profile completed
                    completeProfile.delay(psid, text)
    except Exception as e:
        # Best-effort handler: report the problem but still acknowledge the request.
        print("Exception: ", e)
    return HttpResponse()
views.py 文件源码 项目:vaultier 作者: Movile 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get(self, request):
    """
    Collect the client configuration from settings and return it as JSON
    """
    payload = {'VERSION': pkg_resources.get_distribution("Vaultier").version}
    vaultier_settings = settings.VAULTIER

    for key in ('raven_key', 'invitation_lifetime', 'registration_allow'):
        payload[key] = vaultier_settings.get(key)

    # True only while no user exists yet.
    payload['registration_enforce'] = User.objects.all().count() == 0

    # dev
    dev_keys = (
        'dev_shared_key',
        'dev_shared_key_private',
        'dev_shared_key_public',
        'dev_show_token',
        'dev_email',
    )
    for key in dev_keys:
        payload[key] = vaultier_settings.get(key)

    return HttpResponse(json.dumps(payload), content_type='application/json')
views.py 文件源码 项目:django-db-storage 作者: derekkwok 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get(self, request, name):
    """Serve the stored file called ``name``, honouring If-Modified-Since.

    Responds 404 when no such file exists and 304 when the client's copy is
    still current; otherwise returns the content with a guessed content type
    plus Last-Modified, Content-Length and (if guessed) Content-Encoding.
    """
    # 'content' is deferred in this lookup and only read further down, once
    # a full response is known to be needed.
    stored = get_object_or_404(DBFile.objects.defer('content'), name=name)

    mtime = time.mktime(stored.updated_on.timetuple())
    since_header = self.request.META.get('HTTP_IF_MODIFIED_SINCE')
    if not was_modified_since(header=since_header, mtime=mtime, size=stored.size):
        return HttpResponseNotModified()

    guessed_type, encoding = mimetypes.guess_type(stored.name)
    if not guessed_type:
        guessed_type = 'application/octet-stream'

    response = HttpResponse(stored.content, content_type=guessed_type)
    response['Last-Modified'] = http_date(mtime)
    response['Content-Length'] = stored.size
    if encoding:
        response['Content-Encoding'] = encoding
    return response
webpage.py 文件源码 项目:lykops-old 作者: lykops 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def uploadfile(file, filename, upload_dir):
    """Write an uploaded file into ``upload_dir`` and return the path written.

    Args:
        file: object exposing ``chunks()``, an iterable of bytes chunks.
        filename: name of the file to create inside ``upload_dir``.
        upload_dir: destination directory; created (with parents) if missing.
            A trailing path separator is optional.

    Returns:
        The path of the written file: ``upload_dir``, one separator, then
        ``filename``.

    Raises:
        OSError: if the directory cannot be created or the file cannot be
            written.

    NOTE: no upload size limit is enforced here.
    """
    # exist_ok avoids the check-then-create race and the bare ``except`` that
    # used to hide the real mkdir error.
    os.makedirs(upload_dir, exist_ok=True)

    # Joining with "" yields exactly one trailing separator, so a directory
    # given without one no longer produces a sibling path like "/dirname.txt".
    destination = os.path.join(upload_dir, "") + filename

    with open(destination, 'wb+') as dest:
        for chunk in file.chunks():
            dest.write(chunk)
    return destination
response.py 文件源码 项目:series-tiempo-ar-api 作者: datosgobar 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run(self, query, query_args):
    """Build a CSV response with columns
    (time index, series1, series2, ...) and one data point per row
    """

    # Drop the metadata; it is not used for the CSV format
    query.set_metadata_config(constants.METADATA_NONE)

    default_header = constants.API_DEFAULT_VALUES[constants.PARAM_HEADER]
    header_mode = query_args.get(constants.PARAM_HEADER, default_header)
    series_ids = query.get_series_ids(how=header_mode)
    rows = query.run()['data']

    response = HttpResponse(content_type='text/csv')
    response['Content-Disposition'] = 'attachment; filename="{}"'.format(constants.CSV_RESPONSE_FILENAME)

    csv_writer = unicodecsv.writer(response)
    csv_writer.writerow([settings.INDEX_COLUMN] + series_ids)
    csv_writer.writerows(rows)

    return response
views.py 文件源码 项目:django-facebook-messenger-bot-tutorial 作者: abhay1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def post(self, request, *args, **kwargs):
    """Reply to every text message contained in the incoming webhook payload.

    Always returns an empty 200 response.
    """
    # Converts the text payload into a python dictionary
    payload = json.loads(self.request.body.decode('utf-8'))
    # Facebook recommends going through every entry since they might send
    # multiple messages in a single call during high load
    for entry in payload['entry']:
        for event in entry['messaging']:
            # Skip calls that are not message calls
            # (this might be delivery, optin, postback for other events)
            if 'message' not in event:
                continue
            # Print the message to the terminal
            pprint(event)
            # Assuming the sender only sends text. Non-text messages like
            # stickers, audio, pictures are sent as attachments and must be
            # handled accordingly.
            sender_id = event['sender']['id']
            post_facebook_message(sender_id, event['message']['text'])
    return HttpResponse()
views.py 文件源码 项目:dev-wine-helper 作者: WineHelperEnseirb 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def slack_oauth(request):
    """Finish the Slack OAuth flow: trade ``code`` for credentials and save the team.

    Calls Slack's ``oauth.access`` endpoint with the app's client id/secret,
    then fetches or creates the Team described by the JSON reply.
    """
    query = {
        'code': request.GET['code'],
        'client_id': settings.SLACK_CLIENT_ID,
        "client_secret": settings.SLACK_CLIENT_SECRET,
    }
    oauth_reply = requests.get('https://slack.com/api/oauth.access', query)
    info = json.loads(oauth_reply.text)

    Team.objects.get_or_create(**{
        'name': info['team_name'],
        'team_id': info['team_id'],
        'bot_user_id': info['bot']['bot_user_id'],
        'bot_access_token': info['bot']['bot_access_token'],
    })
    return HttpResponse('Bot added to your Slack team!')
views.py 文件源码 项目:quizwizard 作者: naveensaikiran 项目源码 文件源码 阅读 71 收藏 0 点赞 0 评论 0
def add_question(request):
    """Create a Question from POSTed form fields and answer ``success``.

    Reads ``question``, ``option1``..``option4``, ``answer`` and ``exam``
    (the primary key of the Exam to attach) from the POST data.

    Any non-POST request gets an empty 405 response with an ``Allow: POST``
    header.
    """
    if request.method != "POST":
        # Without this branch the function fell through and returned None
        # for every non-POST request, which is not a valid view result.
        not_allowed = HttpResponse(status=405)
        not_allowed['Allow'] = 'POST'
        return not_allowed

    form = request.POST
    q = Question()
    q.question = form.get("question")
    q.option1 = form.get("option1")
    q.option2 = form.get("option2")
    q.option3 = form.get("option3")
    q.option4 = form.get("option4")
    q.answer = form.get("answer")
    q.exam = Exam.objects.get(pk=int(form.get("exam")))
    q.save()
    return HttpResponse("success")

#viewsets for rest_framework
views.py 文件源码 项目:retoohs 作者: youyaochi 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_ss_qr(request):
    """Return a PNG QR code encoding an ``ss://`` URI for the current user.

    The node is the one named by the ``nid`` query parameter, or the first
    node ordered by descending weight when ``nid`` is absent. Problems are
    reported as a JSON object with an ``error`` key.
    """
    if request.user.is_anonymous():
        return JsonResponse({'error': 'unauthorized'})

    if not hasattr(request.user, 'ss_user'):
        return JsonResponse({'error': 'no linked shadowsocks account'})
    ss_user = request.user.ss_user

    requested_id = request.GET.get('nid')
    if requested_id:
        try:
            node = Node.objects.get(pk=requested_id)
        except Node.DoesNotExist:
            return JsonResponse({'error': 'node not exist'})
    else:
        candidates = Node.objects.all().order_by('-weight')
        if not candidates:
            return JsonResponse({'error': 'no node at all'})
        node = candidates[0]

    credentials = '{}:{}@{}:{}'.format(node.method, ss_user.password, node.server, ss_user.port)
    encoded = base64.b64encode(credentials.encode('utf8')).decode('ascii')
    qr_image = qrcode.make('ss://{}'.format(encoded))

    # The image is saved straight into the response body.
    response = HttpResponse(content_type="image/png")
    qr_image.save(response)
    return response
views.py 文件源码 项目:TheaterWecker 作者: CodeforChemnitz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def device(request):
    """Register the device id sent as the request body.

    Responses: 400 when the body does not match ``check_uuid``; 200 with the
    current ``verified`` flag for an already known device; 201 with an empty
    JSON object for a newly created one. A verify notification is queued for
    new devices and for known devices that are not verified yet.
    """
    stats = statsd.StatsClient('localhost', 8125)
    # NOTE(review): the success counters are bumped before the id is validated.
    stats.incr('subscribe.success')
    stats.gauge('total.subscribe.success', 1, delta=True)

    device_id = request.body.decode("utf-8")
    if check_uuid.match(device_id) is None:
        return HttpResponse("", status=400)

    user_device, created = UserDevice.objects.get_or_create(device_id=device_id)

    if created:
        user_device.verified = False
        user_device.verification_key = uuid4().hex
        user_device.save()

        # TODO send verification key via PUSH
        send_verify_notification.delay(device_id, 0)
        return HttpResponse(json.dumps({}), status=201, content_type="application/json")

    if not user_device.verified:
        send_verify_notification.delay(device_id, 0)
    body = json.dumps({'verified': user_device.verified})
    return HttpResponse(body, status=200, content_type="application/json")
views.py 文件源码 项目:TheaterWecker 作者: CodeforChemnitz 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def verify(request, key=None):
    """Mark the device owning ``key`` as verified.

    Returns an empty 200 response on success, and an empty 400 response when
    ``key`` is missing or matches no device. Success and failure are both
    counted through statsd.
    """
    c = statsd.StatsClient('localhost', 8125)

    def _failed():
        # Count the failure and build the 400 reply shared by both error paths.
        c.incr('verify.device.failed')
        c.gauge('total.verify.device.failed', 1, delta=True)
        return HttpResponse("", status=400)

    if not key:
        return _failed()

    try:
        user_device = UserDevice.objects.get(verification_key=key)
    except UserDevice.DoesNotExist:
        # The lookup is on UserDevice, so its own DoesNotExist must be caught;
        # UserEmail.DoesNotExist never matched and unknown keys went unhandled.
        return _failed()

    user_device.verified = True
    user_device.save()

    c.incr('verify.device.success')
    c.gauge('total.verify.device.success', 1, delta=True)
    return HttpResponse("")
views.py 文件源码 项目:blogghar 作者: v1k45 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def post(self, request, *args, **kwargs):
    """Return the POSTed ``content`` converted by ``md2html``, or an empty body when absent."""
    source = request.POST.get('content')
    rendered = md2html(source) if source else b""
    return HttpResponse(rendered)
test_exceptional_response.py 文件源码 项目:lepo 作者: akx 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def greet(request, greeting, greetee):
    """Always raise an ExceptionalResponse wrapping a 400 ``oh no`` response."""
    failure = HttpResponse('oh no', status=400)
    raise ExceptionalResponse(failure)
views.py 文件源码 项目:odmtp-tpf 作者: benjimor 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def twitter_tpf_server(request):
    """Answer a triple pattern query using the Twitter components.

    The pattern comes from the ``page`` (default ``'1'``), ``subject``,
    ``predicate`` and ``object`` query parameters. The matched fragment is
    returned serialized as a TriG attachment with a wildcard CORS header.
    """
    params = request.GET
    tpq = TriplePatternQuery(
        params.get('page', '1'),
        params.get('subject'),
        params.get('predicate'),
        params.get('object'),
    )
    fragment = Fragment()
    engine = Odmtp(TrimmerXr2rmlTwitter(), Tp2QueryTwitter(), MapperTwitterXr2rml())
    engine.match(tpq, fragment, request)

    response = HttpResponse(fragment.serialize(), content_type='application/trig; charset=utf-8')
    response['Content-Disposition'] = 'attachment; filename="fragment.trig"'
    response['Access-Control-Allow-Origin'] = '*'
    return response
views.py 文件源码 项目:odmtp-tpf 作者: benjimor 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def github_tpf_server(request):
    """Answer a triple pattern query using the GitHub components.

    The pattern comes from the ``page`` (default ``'1'``), ``subject``,
    ``predicate`` and ``object`` query parameters. The matched fragment is
    returned serialized as a TriG attachment with a wildcard CORS header.
    """
    params = request.GET
    tpq = TriplePatternQuery(
        params.get('page', '1'),
        params.get('subject'),
        params.get('predicate'),
        params.get('object'),
    )
    fragment = Fragment()
    engine = Odmtp(TrimmerXr2rmlGithub(), Tp2QueryGithub(), MapperGithubXr2rml())
    engine.match(tpq, fragment, request)

    response = HttpResponse(fragment.serialize(), content_type='application/trig; charset=utf-8')
    response['Content-Disposition'] = 'attachment; filename="fragment.trig"'
    response['Access-Control-Allow-Origin'] = '*'
    return response
validation_handler.py 文件源码 项目:ramlwrap 作者: jmons 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def custom_validation_response(e):
    """
    Custom validation handler that overrides the default one: whatever the
    error ``e`` is, it answers with an empty 418 (I'm a teapot) HttpResponse.
    """
    teapot = HttpResponse(status=418)
    return teapot
validation.py 文件源码 项目:ramlwrap 作者: jmons 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def ExampleAPI(request, schema, example):
    """Return the result of ``_example_api``, wrapped in an HttpResponse if it is not one already."""
    result = _example_api(request, schema, example)
    if not isinstance(result, HttpResponse):
        result = HttpResponse(result)
    return result
validation.py 文件源码 项目:ramlwrap 作者: jmons 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ValidatedGETAPI(request, expected_params, target):
    """
    Validate GET APIs.

    When the query parameters pass ``_is_valid_query`` the ``target`` view is
    called; a result that is not an HttpResponse is JSON-encoded and wrapped
    in one. Returns None when the check yields a falsy value.
    """
    if not _is_valid_query(request.GET, expected_params):
        return None

    result = target(request)
    if not isinstance(result, HttpResponse):
        result = HttpResponse(json.dumps(result))
    return result
validation.py 文件源码 项目:ramlwrap 作者: jmons 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ValidatedPOSTAPI(request, schema, expected_params, target):
    """
    Validate POST APIs.

    Query parameters are checked when ``expected_params`` is given. The JSON
    body is parsed and, when a ``schema`` is given, validated against it; a
    parsing or validation failure is passed to the configured custom handler
    or to the default one. Otherwise the parsed body is attached to the
    request as ``validated_data`` and ``target`` is called. A result that is
    not an HttpResponse is JSON-encoded and wrapped in one.
    """
    failure = None
    if expected_params:
        # Either passes through or raises an exception.
        _is_valid_query(request.GET, expected_params)

    if not schema:
        data = json.loads(request.body.decode('utf-8'))
    else:
        # If there is a problem with the json data, hand it to an error handler.
        try:
            data = json.loads(request.body.decode("utf-8"))
            validate(data, schema)
        except Exception as e:
            # Use the custom handler only when the setting exists and is not empty.
            if getattr(settings, 'RAMLWRAP_VALIDATION_ERROR_HANDLER', None):
                failure = _call_custom_handler(e)
            else:
                failure = _validation_error_handler(e)

    if failure:
        result = failure
    else:
        request.validated_data = data
        result = target(request)

    if not isinstance(result, HttpResponse):
        result = HttpResponse(json.dumps(result))
    return result
redirect_test.py 文件源码 项目:micromasters 作者: mitodl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_redirect(browser, logged_in_staff, mocker, settings):
    """Check that a 401 from the dashboard API is handled and the fake page is shown."""
    # Point ROOT_URLCONF at this module so the app uses the 'urlpatterns'
    # value defined at module level.
    settings.ROOT_URLCONF = __name__

    unauthorized = HttpResponse(
        status=401,
        content=json.dumps({"error": "message"}).encode(),
    )
    dashboard_patch = mocker.patch('dashboard.views.UserDashboard.get', return_value=unauthorized)

    browser.get("/dashboard", ignore_errors=True)

    body_text = browser.driver.find_element_by_css_selector("body").text
    assert FAKE_RESPONSE in body_text
    assert dashboard_patch.called


问题


面经


文章

微信
公众号

扫码关注公众号