python类TESTING的实例源码

cold_tests.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_kuailexue_api2(self):
        """Register a student and a teacher via ``klx`` and relate them.

        ``settings.TESTING`` is switched off while the test runs and restored
        afterwards, so the global flag does not leak into tests that run
        later in the same process.
        """
        # NOTE(review): the original had a commented-out early ``return`` here
        # whose explanation was lost to an encoding problem; presumably it was
        # used to skip this test when the remote service is unavailable.
        previous_testing = getattr(settings, 'TESTING', False)
        settings.TESTING = False
        try:
            test_stu_id = 'test_student_1'
            test_stu_name = '????1'
            test_tea_id = 'debug_210_7VQy5'
            test_teat_name = '????1'
            test_teat_pswd = '892673'
            klx_stu = klx.klx_register(klx.KLX_ROLE_STUDENT, test_stu_id, test_stu_name)
            _console.info(klx_stu)
            self.assertIsNotNone(klx_stu)
            klx_tea = klx.klx_register(klx.KLX_ROLE_TEACHER, test_tea_id, test_teat_name, test_teat_pswd)
            _console.info(klx_tea)
            self.assertIsNotNone(klx_tea)
            ok = klx.klx_relation(klx_tea, klx_stu)
            self.assertTrue(ok)
        finally:
            # Always put the flag back, even when an assertion fails.
            settings.TESTING = previous_testing
forms.py 文件源码 项目:open-ledger 作者: creativecommons 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def clean_description(self):
        """Validate the ``description`` field against spam and profanity checks.

        Returns:
            The description taken from ``cleaned_data``, unchanged.

        Raises:
            forms.ValidationError: if the Akismet check or the word filter
                flags the description.
        """
        desc = self.cleaned_data['description']
        if settings.TESTING:
            # No Akismet call is made while testing.
            check_spam = False
        else:
            akismet = Akismet(settings.AKISMET_KEY, blog="CC Search")
            # Django stores request headers in META as HTTP_<NAME>; the key
            # 'user-agent' never exists there, so the lookup always gave None.
            check_spam = akismet.check(self.request.get_host(),
                                       user_agent=self.request.META.get('HTTP_USER_AGENT'),
                                       comment_author=self.request.user.username,
                                       comment_content=desc)
        wordfilter = Wordfilter()
        check_words = wordfilter.blacklisted(desc)
        if check_spam or check_words:
            raise forms.ValidationError("This description failed our spam or profanity check; the description has not been updated.")

        return desc
models.py 文件源码 项目:home-data-api 作者: data-skeptic 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_profile(sender, instance, **kwargs):
    """Create a ``Profile`` for ``instance`` when it does not have one yet.

    A profile is created when ``kwargs['created']`` is true, or when no
    profile row exists for the user. For a newly created profile the code is
    refreshed and, outside of testing, a signup email with the confirmation
    link is sent to ``instance.email``.
    """
    if not kwargs['created']:
        existing = Profile.objects.filter(user=instance)
        if len(existing) != 0:
            # The user already has a profile: nothing to set up.
            return

    profile = Profile.objects.create(user=instance)
    profile.update_code()
    confirmation_url = settings.HOSTNAME + profile.get_confirmation_link()
    if settings.TESTING:
        return
    send_signup_email(instance.email, confirmation_url)
models.py 文件源码 项目:home-data-api 作者: data-skeptic 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def save(self, *args, **kwargs):
        """Save the model after geocoding the supplied address.

        Here the address is geocoded using the Google geocoding service.
        This is limited to 2500 requests per day. There is no current system
        to account for this so models over 2500 will not geocode.

        Geocoding is skipped entirely when ``settings.TESTING`` is set. A
        non-200 response leaves ``address_object`` untouched and the model is
        still saved. The HTTP call is bounded by a timeout; a timeout, like
        any other ``requests`` error, propagates to the caller.
        """
        if not settings.TESTING:
            # Without a timeout a stalled geocoding request would block the
            # save (and the worker handling it) indefinitely.
            response = requests.get(
                    'https://maps.googleapis.com/maps/api/geocode/json',
                    params={'address': self.raw_address,
                            'key': settings.G_APPS_KEY},
                    timeout=10)
            if response.status_code == 200:
                address = Address()
                address.raw = response.text
                address.from_google_json(response.json())
                address.save()
                self.address_object = address
        super(Property, self).save(*args, **kwargs)
blog_signals.py 文件源码 项目:DjangoBlog 作者: liangliangyy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def article_save_callback(sender, **kwargs):
    """Notify the spider service after an Article, Category or Tag is saved.

    Expects ``kwargs['id']`` (primary key of the saved object) and
    ``kwargs['is_update_views']``. Nothing is sent while testing, when the
    save was flagged as a view-count update, or when ``sender`` is not one of
    the three supported models. Notification failures are logged and printed,
    never raised.
    """
    obj_id = kwargs['id']
    is_update_views = kwargs['is_update_views']
    # Imported here as in the original (kept function-local).
    from blog.models import Article, Category, Tag
    models_by_name = {
        'Article': Article,
        'Category': Category,
        'Tag': Tag,
    }
    model = models_by_name.get(sender.__name__)
    if model is None:
        return
    obj = model.objects.get(id=obj_id)
    if obj is not None:
        if not settings.TESTING and not is_update_views:
            try:
                notify_url = obj.get_full_url()
                SpiderNotify.baidu_notify([notify_url])
            except Exception as ex:
                # The message needs a placeholder: passing ``ex`` as a bare
                # extra argument makes logging fail to format the record.
                logger.error("notify sipder: %s", ex)
                print(ex)
project_api.py 文件源码 项目:montage 作者: storyful 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_image_url(self, image_gcs_filename):
        """
            Resolve the URL used to display an image stored in Cloud Storage.

            When testing, or when DEBUG is off, the serving URL is obtained
            from ``get_gcs_image_serving_url``; ``None`` is returned if no
            filename was given. Otherwise (local development) the argument is
            returned as-is, because the Images API with GCS does not work
            locally and the caller passes the image dataUri instead.

            No longer used.
        """
        serve_from_gcs = settings.TESTING or not settings.DEBUG
        if not serve_from_gcs:
            # Local dev hack: the value is already usable directly.
            return image_gcs_filename
        if not image_gcs_filename:
            return None
        return get_gcs_image_serving_url(image_gcs_filename)
publisher.py 文件源码 项目:django_rest_example 作者: devslaw 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __basic_publish(self):
        """
        Publish message to exchange

        ``self.body`` is serialised to JSON and published on ``self.channel``
        using ``self.exchange_name`` and ``self.routing_key``. Publishing is
        skipped while testing. The connection is closed in every case,
        including when serialising or publishing raises.

        :return:
        """
        try:
            if not django_settings.TESTING:
                body = json.dumps(self.body)

                self.channel.basic_publish(
                    exchange=self.exchange_name,
                    routing_key=self.routing_key,
                    body=body,
                )
                print(" [x] Sent %r" % body)
        finally:
            # Release the connection even if the publish failed.
            self.connection.close()
0003_region.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def add_region(apps, schema_editor):
    """Load the region data file next to this module and pass it to ``dfs``.

    ``regions_for_test.json`` is used while testing, ``regions.txt``
    otherwise. The file is parsed as JSON with key order preserved.
    """
    data_file = "regions_for_test.json" if settings.TESTING else "regions.txt"
    data_path = os.path.join(os.path.dirname(__file__), data_file)
    # Use a context manager so the file handle is closed deterministically.
    with open(data_path) as fp:
        regions = json.load(fp, object_pairs_hook=OrderedDict)
    dfs(apps, regions, 1)
views.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_one_subject_report(self, the_subject, parent, params):
        """Build the study report for one subject as a ``JsonResponse``.

        While testing only ``subject_id`` is returned. Otherwise the latest
        paid order of ``parent`` for the subject is looked up; a 404 response
        is returned when there is none (the parent has not joined the
        course). The report is then filled from the ``_get_*`` helpers.
        """
        subject_en = klx.klx_subject_name(the_subject.name)
        url = klx.KLX_STUDY_URL_FMT.format(subject=subject_en)
        report = {'subject_id': the_subject.id}
        if settings.TESTING:
            return JsonResponse(report)

        # Most recent paid order for this parent and subject.
        paid_orders = models.Order.objects.filter(
            parent=parent, status=models.Order.PAID, subject=the_subject)
        latest_order = paid_orders.order_by('-created_at').first()
        if not latest_order:
            return HttpResponse(status=404)  # Have not joined the course

        report['grade_id'] = latest_order.grade_id
        # Overall totals, then exercise totals, are merged into the report.
        report.update(self._get_total_nums(url, params))
        report.update(self._get_exercise_total_nums(url, params))

        # Remaining sections each live under their own key.
        report['error_rates'] = self._get_error_rates(url, params)
        report['month_trend'] = self._get_month_trend(url, params)
        report['knowledges_accuracy'] = self._get_knowledges_accuracy(
            url, params)
        report['abilities'] = self._get_abilities(
            url, params, klx.KLX_MATH_ABILITY_KEYS)
        report['score_analyses'] = self._get_score_analyses(url, params)

        if settings.DEBUG:
            logger.debug(json.dumps(report))
        return JsonResponse(report)
views.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _get_auth_redirect_url(request, state):
    """Return the URL the client is redirected to for WeChat authorisation.

    While testing this is the local ``wechat:phone_page`` URL carrying
    ``state``. Otherwise it is ``wx.WX_AUTH_URL`` extended with the OAuth
    query parameters and the ``wechat:check_phone`` callback.
    """
    if settings.TESTING:
        return reverse('wechat:phone_page') + '?state=' + str(state)

    check_phone_uri = get_server_host(request) + reverse('wechat:check_phone')
    # redirect_uri is deliberately left out of the encoded parameters: it is
    # appended verbatim (not URL-encoded) below.
    query = urlencode({
        'response_type': "code",
        'scope': "snsapi_base",
        'state': state,
        'connect_redirect': "1"
    })
    return ''.join([
        wx.WX_AUTH_URL, '&', query,
        '&redirect_uri=', check_phone_uri,
        '#wechat_redirect',
    ])
views.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_parent(self, request):
        """Return the parent for ``request``, with a fallback while testing.

        When no parent is found and ``settings.TESTING`` is set, the parent
        with primary key 3 is loaded and its user is logged in on the request.
        """
        parent = _get_parent(request)
        if parent is not None or not settings.TESTING:
            return parent

        # Testing only: fall back to a fixed parent record.
        fallback = models.Parent.objects.get(pk=3)
        fallback.user.backend = _get_default_bankend_path()
        login(request, fallback.user)
        return fallback
views.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def verify_order(self, request):
        """Verify the payment of an order and mark it as paid.

        Reads ``prepay_id`` and ``order_id`` from ``request.POST``. While
        testing, the order is marked paid directly. Otherwise the order is
        queried through ``wx.wx_pay_order_query`` and only marked paid when
        the trade state is ``wx.WX_SUCCESS``.

        Returns:
            A ``JsonResponse`` with ``ok``, ``msg`` and ``code`` on every
            path. Codes: 0 success, 1 query failed, 2 payment error,
            3 other non-success trade state, 4 order could not be marked
            paid, 5 unexpected error.
        """
        # get request params
        prepay_id = request.POST.get('prepay_id')
        order_id = request.POST.get('order_id')
        if settings.TESTING:
            ret_code = set_order_paid(order_id=order_id)
            if ret_code == 1:
                return JsonResponse(
                    {'ok': False, 'msg': FAIL_HINT_MSG, 'code': 4})
            return JsonResponse({'ok': True, 'msg': '', 'code': 0})

        query_ret = wx.wx_pay_order_query(order_id=order_id)
        if not query_ret['ok']:
            # These failure branches previously returned bare dicts, unlike
            # every other path; they are wrapped for a consistent return type.
            return JsonResponse(
                {'ok': False, 'msg': query_ret['msg'], 'code': 1})

        trade_state = query_ret['data']['trade_state']
        if trade_state != wx.WX_SUCCESS:
            if trade_state == wx.WX_PAYERROR:
                return JsonResponse({'ok': False, 'msg': '????', 'code': 2})
            return JsonResponse({'ok': False, 'msg': '???', 'code': 3})

        # Payment succeeded: record it using the identifiers from the query.
        try:
            ret_code = set_order_paid(
                prepay_id=prepay_id,
                order_id=query_ret['data']['out_trade_no'],
                open_id=query_ret['data']['openid'])
            if ret_code == 1:
                return JsonResponse(
                    {'ok': False, 'msg': FAIL_HINT_MSG, 'code': 4})
        except (OrderStatusIncorrect, RefundError):
            return JsonResponse(
                {'ok': False, 'msg': FAIL_HINT_MSG, 'code': 4})
        except Exception as ex:
            logger.exception(ex)
            return JsonResponse(
                {'ok': False, 'msg': '????, ?????', 'code': 5})
        return JsonResponse({'ok': True, 'msg': '', 'code': 0})
views.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _get_wx_jsapi_ticket(access_token):
    """Return a ``(jsapi_ticket, msg)`` pair.

    While testing both items are empty strings. Otherwise the stored ticket
    is used when available; if not, one is requested with ``access_token``.
    ``msg`` is ``None`` unless that request fails, in which case it carries
    the failure message.
    """
    if settings.TESTING:
        return "", ""

    ticket = _get_wx_jsapi_ticket_from_db()
    if ticket:
        return ticket, None

    result = wx.wx_get_jsapi_ticket(access_token)
    if result['ok']:
        return result['ticket'], None
    return ticket, result['msg']
views.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get_wx_token():
    """Return a ``(token, msg)`` pair.

    While testing both items are empty strings. Otherwise the stored token is
    used when available; if not, a new one is requested. ``msg`` is ``None``
    unless that request fails, in which case it carries the failure message.
    """
    if settings.TESTING:
        return "", ""

    token = _get_wx_token_from_db()
    if token:
        return token, None

    result = wx.wx_get_token()
    if result['ok']:
        return result['token'], None
    return token, result['msg']
signals.py 文件源码 项目:open-ledger 作者: creativecommons 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def update_search_index(sender, instance, **kwargs):
    """When an Image instance is saved, tell the search engine about it.

    Nothing is sent while ``settings.TESTING`` is set.
    """
    if settings.TESTING:
        return
    _update_search_index(instance)
views.py 文件源码 项目:handelsregister 作者: Amsterdam 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def autocomplete_queries(self, query_string):
        """provide autocomplete suggestions

        Runs each autocomplete query built from ``query_string`` against
        ``self.client`` and returns the list of results. A query that fails
        is logged and skipped, so the list may hold fewer entries than there
        are queries.
        """

        analyzer = InputQAnalyzer(query_string)

        query_components = [
            vestiging_query(analyzer),
            mac_query(analyzer)
        ]

        result_data = []

        # The cache is ignored while settings.TESTING is set.
        ignore_cache = settings.TESTING

        # create elk queries
        for q in query_components:  # type: ElasticQueryWrapper

            search = q.to_elasticsearch_object(self.client)

            # get the result from elastic
            try:
                result = search.execute(ignore_cache=ignore_cache)
            except Exception:
                # A bare ``except:`` would also swallow KeyboardInterrupt and
                # SystemExit; only ordinary errors should be skipped.
                log.exception('FAILED ELK SEARCH: %s',
                              json.dumps(search.to_dict(), indent=2))
                continue
            # Get the datas!
            result_data.append(result)

        return result_data
compatibility.py 文件源码 项目:django-ios-notifications 作者: nnsnodnb 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setup_test_environment(self, **kwargs):
        """Run the parent runner's setup, then flag the settings as testing."""
        parent_runner = super(TestRunner, self)
        parent_runner.setup_test_environment(**kwargs)
        settings.TESTING = True
compatibility.py 文件源码 项目:django-ios-notifications 作者: nnsnodnb 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def teardown_test_environment(self, **kwargs):
        """Run the parent runner's teardown, then clear the testing flag.

        The flag is set to ``True`` during setup; teardown previously set it
        to ``True`` again, leaving the process permanently in testing mode.
        """
        super(TestRunner, self).teardown_test_environment(**kwargs)
        settings.TESTING = False
robot.py 文件源码 项目:DjangoBlog 作者: liangliangyy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def handler(self):
        """Process the incoming message text and return the reply.

        Flow, in order:
        * an admin sending ``EXIT`` resets the session;
        * ``ADMIN`` switches the session into admin mode;
        * an admin without a verified password has the message checked
          against the admin password (the session is reset once the failure
          count has reached 3);
        * a verified admin either confirms the pending command with ``Y``,
          asks for help with ``HELPME``, or stores the message as the
          pending command;
        * anything else is answered by ``tuling.getdata``.
        """
        info = self.message.content
        keyword = info.upper()

        if self.userinfo.isAdmin and keyword == 'EXIT':
            self.userinfo = WxUserInfo()
            self.savesession()
            return "????"

        if keyword == 'ADMIN':
            self.userinfo.isAdmin = True
            self.savesession()
            return "???????"

        if self.userinfo.isAdmin:
            if not self.userinfo.isPasswordSet:
                # Password step: a fixed password is used while testing.
                expected = settings.WXADMIN
                if settings.TESTING:
                    expected = '123'
                supplied = get_md5(get_md5(info))
                if expected.upper() == supplied.upper():
                    self.userinfo.isPasswordSet = True
                    self.savesession()
                    return "????,???????????????:??helpme????"
                if self.userinfo.Count >= 3:
                    # Too many failures: start over with a fresh session.
                    self.userinfo = WxUserInfo()
                    self.savesession()
                    return "??????"
                self.userinfo.Count += 1
                self.savesession()
                return "???????????????:"

            # Verified admin: run, describe or stage a command.
            if self.userinfo.Command != '' and keyword == 'Y':
                return cmdhandler.run(self.userinfo.Command)
            if keyword == 'HELPME':
                return cmdhandler.get_help()
            self.userinfo.Command = info
            self.savesession()
            return "????: " + info + " ???"

        return tuling.getdata(info)
views.py 文件源码 项目:DjangoBlog 作者: liangliangyy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def fileupload(request):
    """Store every file of a POST request and respond with their public URLs.

    Each entry of ``request.FILES`` is written below a dated directory
    (``image`` or ``files`` depending on whether the name contains a known
    image extension); images are re-saved with reduced quality. While testing
    the files go to ``<BASE_DIR>/uploads`` instead. Non-POST requests receive
    a plain "only for post" response.

    The name used for the path and the URL is reduced to its final path
    component so a crafted name such as ``../../x`` cannot escape the upload
    directory.
    """
    if request.method != 'POST':
        return HttpResponse("only for post")

    imgextensions = ['jpg', 'png', 'jpeg', 'bmp']
    response = []
    for filename in request.FILES:
        timestr = datetime.datetime.now().strftime('%Y/%m/%d')
        # Client-controlled value: keep only the last path component.
        safe_name = os.path.basename(str(filename))

        isimage = any(ext in safe_name for ext in imgextensions)
        type_dir = 'image' if isimage else 'files'

        basepath = r'/var/www/resource/{type}/{timestr}'.format(
            type=type_dir, timestr=timestr)
        if settings.TESTING:
            basepath = settings.BASE_DIR + '/uploads'
        url = 'https://resource.lylinux.net/{type}/{timestr}/{filename}'.format(
            type=type_dir, timestr=timestr, filename=safe_name)
        if not os.path.exists(basepath):
            os.makedirs(basepath)
        savepath = os.path.join(basepath, safe_name)
        with open(savepath, 'wb+') as wfile:
            # The upload is still looked up by its original key.
            for chunk in request.FILES[filename].chunks():
                wfile.write(chunk)
        if isimage:
            from PIL import Image
            image = Image.open(savepath)
            image.save(savepath, quality=20, optimize=True)
        response.append(url)
    return HttpResponse(response)
create_transaction.py 文件源码 项目:occasions-server 作者: travisbloom 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def mutate_and_get_payload(cls, input, context, info):
        """Create a ``Transaction`` for the current user and charge for it.

        Raises ``MutationException`` when the user has no ``stripe_user_id``.
        The input is converted and validated through
        ``CreateTransactionSerializer`` (which raises on invalid data), the
        transaction is saved, and the charge is made through the mock charge
        function while testing and the real one otherwise.

        Returns a ``CreateTransaction`` wrapping the saved transaction.
        """
        user = context.user
        if not user.stripe_user_id:
            error_msg = 'User tried to create a transaction before they have a stripe account'
            logger.warning(error_msg, extra={'request': context})
            raise MutationException(error_msg)

        formatted_input = convert_input_global_ids_to_pks(input)
        serializer_context = {
            'user': context.user,
            'receiving_person_id': formatted_input.get('receiving_person_id')
        }
        serializer = CreateTransactionSerializer(
            data=formatted_input, context=serializer_context)
        serializer.is_valid(raise_exception=True)

        product = Product.objects.get(pk=formatted_input.get('product_id'))
        associated_event = AssociatedEvent.objects.get(
            pk=formatted_input.get('associated_event_id'))
        transaction = Transaction(
            **formatted_input,
            associated_event_date=associated_event.event.next_date,
            cost_usd=product.cost_usd,
            user=context.user)
        transaction.save()

        # Same call either way; only the implementation differs under test.
        if settings.TESTING:
            charge = mock_create_stripe_charge
        else:
            charge = create_stripe_charge
        charge(user=context.user, transaction=transaction, request=context)
        return CreateTransaction(transaction=transaction)
config.py 文件源码 项目:django-query-logger 作者: 4word 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, **kwargs):
        """Collect the query-logging options.

        Each option is taken from ``kwargs`` when supplied, else from the
        matching ``LOG_QUERY_*`` setting, else from a built-in default.
        ``logging_extra_dict`` is read from ``kwargs`` only and defaults to
        an empty dict.
        """
        def option(kwarg_name, setting_name, fallback):
            # Explicit keyword argument wins over the setting.
            return kwargs.get(kwarg_name,
                              getattr(settings, setting_name, fallback))

        self.connection_name = option(
            'connection_name', 'LOG_QUERY_DATABASE_CONNECTION', 'default')
        self.log_duplicate_queries = option(
            'log_duplicate_queries', 'LOG_QUERY_DUPLICATE_QUERIES', True)
        self.log_tracebacks = option(
            'log_tracebacks', 'LOG_QUERY_TRACEBACKS', False)
        self.log_long_running_time = option(
            'log_long_running_time', 'LOG_QUERY_TIME_ABSOLUTE_LIMIT', 1000)

        self.logging_extras = kwargs.get('logging_extra_dict', {})

        # Internal testing hook: projects that unit-test the query debugging
        # mixin can define settings.TESTING = True while tests are running.
        self.testing = getattr(settings, 'TESTING', False)
views.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def get_subjects_summary(self, parent, params):
        """Return a ``JsonResponse`` summarising subjects for ``parent``.

        While testing the result list is empty. Otherwise there is one entry
        per distinct subject found in the parent's paid orders (newest order
        first), followed by the subjects in ``klx.KLX_REPORT_SUBJECTS`` that
        were not purchased. Purchased subjects with report support also carry
        the totals from ``_get_total_nums``.
        """
        summaries = []
        if settings.TESTING:
            return JsonResponse({'results': summaries})

        seen_names = []
        # Paid orders of this parent, newest first.
        paid_orders = (
            models.Order.objects
            .filter(parent=parent, status=models.Order.PAID)
            .values('subject', 'grade', 'created_at')
            .order_by('-created_at')
        )
        for order_row in paid_orders:
            # only support math presently
            subject = models.Subject.objects.get(id=order_row['subject'])
            name = subject.name
            if name in seen_names:
                continue
            seen_names.append(name)

            if name not in klx.KLX_REPORT_SUBJECTS:
                summaries.append({
                    'subject_id': subject.id,
                    'purchased': True,
                    'grade_id': order_row['grade'],
                    'supported': False,
                })
                continue

            url = klx.KLX_STUDY_URL_FMT.format(
                subject=klx.klx_subject_name(name))
            entry = {
                'subject_id': subject.id, 'supported': True,
                'purchased': True, 'grade_id': order_row['grade']}
            # Merge in the totals for this subject.
            entry.update(self._get_total_nums(url, params))
            summaries.append(entry)

        # subjects supported, but user did not purchase
        not_purchased = [b for b in klx.KLX_REPORT_SUBJECTS
                         if b not in seen_names]
        if not_purchased:
            for s in models.Subject.objects.filter(name__in=not_purchased):
                summaries.append({
                    'subject_id': s.id,
                    'supported': True,
                    'purchased': False,
                })

        if settings.DEBUG:
            logger.debug(json.dumps(summaries))
        return JsonResponse({'results': summaries})
klx_api.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def klx_register(role, uid, name, password=None, subject=None):
    '''
    Register a user on the kuailexue server.

    :param role: role code sent as ``role`` (the original note listed the
        values 1 and 2; their meanings were lost to an encoding problem)
    :param uid: models.User.id
    :param name: Student Name or Teacher Name
    :param password: optional; only sent when not None (presumably the
        remote side defaults to 123456 - TODO confirm)
    :param subject: optional; only sent when not None (presumably depends on
        the role - TODO confirm)
    :return: kuailexue username, or None when the server cannot be reached
        or answers without ``data``. While ``settings.TESTING`` is set no
        request is made and '1' or '0' is returned depending on whether the
        built parameters pass ``klx_verify_sign``.
    '''
    klx_url = settings.KUAILEXUE_SERVER + '/third-partner/register'
    payload = {
        'role': role,
        'uid': uid,
        'name': name,
    }
    # Optional fields are only included when the caller supplied them.
    for key, value in (('password', password), ('subject', subject)):
        if value is not None:
            payload[key] = value
    payload = klx_build_params(payload, True)

    if settings.TESTING:
        return '1' if klx_verify_sign(payload) else '0'

    try:
        resp = requests.post(klx_url, data=payload, timeout=10)
    except Exception as err:
        _logger.error('cannot reach kuailexue server')
        _logger.exception(err)
        return None

    if resp.status_code != 200:
        _logger.error('cannot reach kuailexue server, http_status is %s' % (resp.status_code))
        return None

    if _get_is_cold_testing():
        _console.warning(klx_url+'?'+urllib.parse.urlencode(payload))
    ret_json = json.loads(resp.content.decode('utf-8'))
    if _get_is_cold_testing():
        _console.info(ret_json)

    ret_data = ret_json.get('data')
    if ret_data is None:
        # No data in the response: log the returned code and message.
        req_url = klx_url+'?'+urllib.parse.urlencode(payload)
        _logger.error('kuailexue reponse data error, CODE: %s, MSG: %s. (URL=%s)' % (ret_json.get('code'), ret_json.get('message'), req_url))
        return None
    return ret_data.get('username')


问题


面经


文章

微信
公众号

扫码关注公众号