def test_kuailexue_api2(self):
    """Register a student and a teacher through ``klx`` and relate them.

    ``settings.TESTING`` is switched off for the duration of the test so
    that ``klx.klx_register`` takes its non-testing code path.  The previous
    value is restored afterwards so the override cannot leak into other
    tests that share the same settings object.
    """
    # NOTE(review): the original carried a commented-out early ``return``
    # here (its explanatory note is unreadable) -- presumably a way to skip
    # this test when the kuailexue service is unavailable; confirm.
    test_stu_id = 'test_student_1'
    test_stu_name = '????1'
    test_tea_id = 'debug_210_7VQy5'
    test_teat_name = '????1'
    test_teat_pswd = '892673'

    previous_testing = getattr(settings, 'TESTING', False)
    settings.TESTING = False
    try:
        klx_stu = klx.klx_register(
            klx.KLX_ROLE_STUDENT, test_stu_id, test_stu_name)
        _console.info(klx_stu)
        self.assertIsNotNone(klx_stu)

        klx_tea = klx.klx_register(
            klx.KLX_ROLE_TEACHER, test_tea_id, test_teat_name, test_teat_pswd)
        _console.info(klx_tea)
        self.assertIsNotNone(klx_tea)

        ok = klx.klx_relation(klx_tea, klx_stu)
        self.assertTrue(ok)
    finally:
        # Always undo the override, even when an assertion fails.
        settings.TESTING = previous_testing
python类TESTING的实例源码
def clean_description(self):
    """Validate the ``description`` field against spam and profanity checks.

    The Akismet spam check is skipped when ``settings.TESTING`` is set; the
    word-filter check always runs.

    Returns:
        The cleaned description when both checks pass.

    Raises:
        forms.ValidationError: if either check flags the description.
    """
    desc = self.cleaned_data['description']
    if settings.TESTING:
        check_spam = False
    else:
        akismet = Akismet(settings.AKISMET_KEY, blog="CC Search")
        check_spam = akismet.check(
            self.request.get_host(),
            # Django exposes request headers in META as HTTP_<NAME>; the
            # previous 'user-agent' key never matched and always sent None.
            user_agent=self.request.META.get('HTTP_USER_AGENT'),
            comment_author=self.request.user.username,
            comment_content=desc)
    wordfilter = Wordfilter()
    check_words = wordfilter.blacklisted(desc)
    if check_spam or check_words:
        raise forms.ValidationError("This description failed our spam or profanity check; the description has not been updated.")
    return desc
def create_profile(sender, instance, **kwargs):
    """Signal receiver that makes sure ``instance`` has a ``Profile``.

    A profile is created when the signal reports ``created`` or when no
    profile exists yet for the user.  For a freshly created profile the
    confirmation code is refreshed and, outside of testing, the signup
    e-mail is sent.  Nothing happens when a profile already exists.
    """
    if not kwargs['created']:
        existing = Profile.objects.filter(user=instance)
        if len(existing) != 0:
            # Profile already present: not a new account, nothing to do.
            return

    profile = Profile.objects.create(user=instance)
    profile.update_code()
    link_text = settings.HOSTNAME + profile.get_confirmation_link()
    if settings.TESTING:
        return
    send_signup_email(instance.email, link_text)
def save(self, *args, **kwargs):
    """Geocode ``raw_address`` and then save the model.

    Outside of testing, the address is sent to the Google geocoding
    service.  On an HTTP 200 answer a new ``Address`` is built from the
    response, saved, and attached as ``address_object``; any other status
    leaves the model without a new address.  The model itself is saved in
    every case.

    The service is limited to 2500 requests per day.  There is no current
    system to account for this, so models over 2500 will not geocode.

    Raises:
        requests.RequestException: if the geocoding request fails or does
            not answer within the timeout.
    """
    if not settings.TESTING:
        response = requests.get(
            'https://maps.googleapis.com/maps/api/geocode/json',
            params={'address': self.raw_address,
                    'key': settings.G_APPS_KEY},
            # Without a timeout a stalled connection would block the save
            # (and the request handling it) indefinitely.
            timeout=10)
        if response.status_code == 200:
            address = Address()
            address.raw = response.text
            address.from_google_json(response.json())
            address.save()
            self.address_object = address
    super(Property, self).save(*args, **kwargs)
def article_save_callback(sender, **kwargs):
    """Notify the Baidu spider about a saved Article, Category or Tag.

    Args:
        sender: the model class that was saved; only its ``__name__`` is
            inspected.
        **kwargs: must contain ``id`` (primary key of the saved object) and
            ``is_update_views`` (truthy when the save should not trigger a
            notification).

    The notification is skipped while ``settings.TESTING`` is set or when
    ``is_update_views`` is truthy.  Failures while notifying are logged and
    printed, never raised.
    """
    # Local names avoid shadowing the ``id`` and ``type`` builtins.
    obj_id = kwargs['id']
    is_update_views = kwargs['is_update_views']
    sender_name = sender.__name__

    from blog.models import Article, Category, Tag
    model = {
        'Article': Article,
        'Category': Category,
        'Tag': Tag,
    }.get(sender_name)
    if model is None:
        return

    obj = model.objects.get(id=obj_id)
    if settings.TESTING or is_update_views:
        return

    try:
        notify_url = obj.get_full_url()
        SpiderNotify.baidu_notify([notify_url])
    except Exception as ex:
        # The original passed ``ex`` without a %s placeholder, which makes
        # the logging module fail while formatting the record.
        logger.exception("notify spider failed: %s", ex)
        print(ex)
def _get_image_url(self, image_gcs_filename):
    """Return the URL to use for an image stored in Cloud Storage.

    When testing, or when DEBUG is off, a non-empty filename is handed to
    ``get_gcs_image_serving_url`` and its result is returned; an empty
    filename yields ``None``.  In local development (DEBUG on, not testing)
    the argument is returned unchanged.

    No longer used.
    """
    use_images_api = settings.TESTING or not settings.DEBUG
    if not use_images_api:
        # Little local dev hack. We're using the Images API with GCS.
        # This doesn't work locally, so the caller passes the image dataUri
        # instead and it is used directly.
        return image_gcs_filename

    if not image_gcs_filename:
        return None
    return get_gcs_image_serving_url(image_gcs_filename)
def __basic_publish(self):
    """Publish ``self.body`` as JSON to the exchange, then close the connection.

    Nothing is published (and the connection is left untouched) while
    ``django_settings.TESTING`` is set.

    :return: None
    """
    if django_settings.TESTING:
        return
    try:
        body = json.dumps(self.body)
        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key=self.routing_key,
            body=body,
        )
        print(" [x] Sent %r" % body)
    finally:
        # Close the connection even when serialising or publishing fails,
        # so a failed publish does not leak an open connection.
        self.connection.close()
def add_region(apps, schema_editor):
    """Load the region tree from a JSON data file and insert it via ``dfs``.

    The data file sits next to this module: ``regions_for_test.json`` while
    ``settings.TESTING`` is set, ``regions.txt`` otherwise.  Key order of the
    JSON objects is preserved by loading them as ``OrderedDict``.

    Args:
        apps: passed through to ``dfs``.
        schema_editor: unused here.
    """
    if settings.TESTING:
        data_file = "regions_for_test.json"
    else:
        data_file = "regions.txt"
    data_path = os.path.join(os.path.dirname(__file__), data_file)
    # The context manager closes the file handle; the original left it to
    # the garbage collector.
    # NOTE(review): the platform default encoding is used, as before --
    # consider passing encoding='utf-8' if the data holds non-ASCII names.
    with open(data_path) as region_file:
        regions = json.load(region_file, object_pairs_hook=OrderedDict)
    dfs(apps, regions, 1)
def get_one_subject_report(self, the_subject, parent, params):
    """Build the JSON study report of ``parent`` for one subject.

    While ``settings.TESTING`` is set only ``subject_id`` is returned.
    Otherwise the parent's most recent paid order for the subject supplies
    ``grade_id`` (HTTP 404 when there is none) and the ``_get_*`` helpers
    fill in the remaining report sections, in a fixed order.
    """
    subject_name_en = klx.klx_subject_name(the_subject.name)
    url = klx.KLX_STUDY_URL_FMT.format(subject=subject_name_en)
    report = {'subject_id': the_subject.id}
    if settings.TESTING:
        return JsonResponse(report)

    # Most recent paid order of this parent for the subject.
    paid_orders = models.Order.objects.filter(
        parent=parent, status=models.Order.PAID, subject=the_subject)
    last_order = paid_orders.order_by('-created_at').first()
    if not last_order:
        return HttpResponse(status=404)  # Have not joined the course
    report['grade_id'] = last_order.grade_id

    # Sections whose helper result is merged straight into the report.
    report.update(self._get_total_nums(url, params))
    report.update(self._get_exercise_total_nums(url, params))

    # Sections stored under their own key, fetched with (url, params).
    keyed_sections = (
        ('error_rates', self._get_error_rates),
        ('month_trend', self._get_month_trend),
        ('knowledges_accuracy', self._get_knowledges_accuracy),
    )
    for key, fetch in keyed_sections:
        report[key] = fetch(url, params)

    report['abilities'] = self._get_abilities(
        url, params, klx.KLX_MATH_ABILITY_KEYS)
    report['score_analyses'] = self._get_score_analyses(url, params)

    if settings.DEBUG:
        logger.debug(json.dumps(report))
    return JsonResponse(report)
def _get_auth_redirect_url(request, state):
    """Return the URL the user is redirected to for WeChat authorisation.

    While ``settings.TESTING`` is set the local phone page is returned with
    ``state`` in the query string.  Otherwise the URL is ``wx.WX_AUTH_URL``
    extended with the OAuth query parameters, the (unencoded) check-phone
    callback as ``redirect_uri`` and the ``#wechat_redirect`` fragment.
    """
    if settings.TESTING:
        return ''.join([reverse('wechat:phone_page'), '?state=', str(state)])

    callback_uri = get_server_host(request) + reverse('wechat:check_phone')
    # redirect_uri is deliberately not part of this dict: it is appended
    # as-is below rather than passed through urlencode.
    query = {
        'response_type': "code",
        'scope': "snsapi_base",
        'state': state,
        'connect_redirect': "1"
    }
    pieces = [
        wx.WX_AUTH_URL,
        '&',
        urlencode(query),
        '&redirect_uri=',
        callback_uri,
        '#wechat_redirect',
    ]
    return ''.join(pieces)
def get_parent(self, request):
    """Return the parent for ``request``, with a fixture fallback in tests.

    The result of ``_get_parent`` is returned as-is unless it is ``None``
    and ``settings.TESTING`` is set; in that case the parent with primary
    key 3 is loaded, logged in on the request, and returned.
    """
    parent = _get_parent(request)
    if parent is not None or not settings.TESTING:
        return parent

    # Everything below is only for testing.
    fallback = models.Parent.objects.get(pk=3)
    fallback.user.backend = _get_default_bankend_path()
    login(request, fallback.user)
    return fallback
def verify_order(self, request):
    """Verify a WeChat payment and mark the order as paid.

    Reads ``prepay_id`` and ``order_id`` from ``request.POST``.  While
    ``settings.TESTING`` is set the order is marked paid without querying
    WeChat.  Otherwise the order is queried through
    ``wx.wx_pay_order_query`` and only marked paid when the trade state is
    ``wx.WX_SUCCESS``.

    Returns:
        JsonResponse with ``ok``, ``msg`` and ``code``:
        0 success, 1 query failed, 2 payment error, 3 other trade state,
        4 ``set_order_paid`` returned 1 or raised
        ``OrderStatusIncorrect``/``RefundError``, 5 unexpected error.
        Every path now returns a ``JsonResponse``; codes 1-3 used to leak
        plain dicts, unlike the other paths.
    """
    def reply(ok, msg, code):
        # Single place that shapes the response payload.
        return JsonResponse({'ok': ok, 'msg': msg, 'code': code})

    # get request params
    prepay_id = request.POST.get('prepay_id')
    order_id = request.POST.get('order_id')

    if settings.TESTING:
        ret_code = set_order_paid(order_id=order_id)
        if ret_code == 1:
            return reply(False, FAIL_HINT_MSG, 4)
        return reply(True, '', 0)

    query_ret = wx.wx_pay_order_query(order_id=order_id)
    if not query_ret['ok']:
        return reply(False, query_ret['msg'], 1)

    trade_state = query_ret['data']['trade_state']
    if trade_state == wx.WX_PAYERROR:
        return reply(False, '????', 2)
    if trade_state != wx.WX_SUCCESS:
        return reply(False, '???', 3)

    # Payment succeeded on the WeChat side: record it on the order.
    try:
        ret_code = set_order_paid(
            prepay_id=prepay_id,
            order_id=query_ret['data']['out_trade_no'],
            open_id=query_ret['data']['openid'])
        if ret_code == 1:
            return reply(False, FAIL_HINT_MSG, 4)
    except (OrderStatusIncorrect, RefundError):
        return reply(False, FAIL_HINT_MSG, 4)
    except Exception as ex:
        logger.exception(ex)
        return reply(False, '????, ?????', 5)
    return reply(True, '', 0)
def _get_wx_jsapi_ticket(access_token):
    """Return ``(jsapi_ticket, msg)`` for the WeChat JS API.

    While ``settings.TESTING`` is set, ``("", "")`` is returned.  Otherwise
    the stored ticket is used when present; if not, a new one is requested
    with ``access_token``.  ``msg`` is ``None`` unless that request fails,
    in which case it carries the failure message.
    """
    if settings.TESTING:
        return "", ""

    stored_ticket = _get_wx_jsapi_ticket_from_db()
    if stored_ticket:
        return stored_ticket, None

    result = wx.wx_get_jsapi_ticket(access_token)
    if result['ok']:
        return result['ticket'], None
    # Request failed: hand back the (empty) stored value with the message.
    return stored_ticket, result['msg']
def _get_wx_token():
    """Return ``(token, msg)`` for the WeChat API.

    While ``settings.TESTING`` is set, ``("", "")`` is returned.  Otherwise
    the stored token is used when present; if not, a new one is requested.
    ``msg`` is ``None`` unless that request fails, in which case it carries
    the failure message.
    """
    if settings.TESTING:
        return "", ""

    stored_token = _get_wx_token_from_db()
    if stored_token:
        return stored_token, None

    result = wx.wx_get_token()
    if result['ok']:
        return result['token'], None
    # Request failed: hand back the (empty) stored value with the message.
    return stored_token, result['msg']
def update_search_index(sender, instance, **kwargs):
    """Tell the search engine about a saved Image instance.

    Does nothing while ``settings.TESTING`` is set.
    """
    if settings.TESTING:
        return
    _update_search_index(instance)
def autocomplete_queries(self, query_string):
    """Provide autocomplete suggestions for ``query_string``.

    Each query component is turned into an Elasticsearch search and
    executed with ``self.client``.  A search that fails is logged together
    with its query body and skipped.

    Returns:
        list with the result of every search that succeeded.
    """
    analyzer = InputQAnalyzer(query_string)
    query_components = [
        vestiging_query(analyzer),
        mac_query(analyzer)
    ]

    result_data = []

    # The cache is bypassed while settings.TESTING is set.
    ignore_cache = settings.TESTING

    # create elk queries
    for q in query_components:  # type: ElasticQueryWrapper
        search = q.to_elasticsearch_object(self.client)

        # get the result from elastic
        try:
            result = search.execute(ignore_cache=ignore_cache)
        except Exception:
            # ``Exception`` instead of a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            log.exception('FAILED ELK SEARCH: %s',
                          json.dumps(search.to_dict(), indent=2))
            continue

        result_data.append(result)

    return result_data
def setup_test_environment(self, **kwargs):
    """Run the parent runner's setup, then flag that tests are running."""
    parent_runner = super(TestRunner, self)
    parent_runner.setup_test_environment(**kwargs)
    # Project code checks this flag to skip external side effects.
    settings.TESTING = True
def teardown_test_environment(self, **kwargs):
    """Run the parent runner's teardown, then clear the testing flag.

    ``setup_test_environment`` sets ``settings.TESTING`` to ``True``; the
    teardown used to set it to ``True`` again, so the flag was never reset
    once the test run had finished.
    """
    super(TestRunner, self).teardown_test_environment(**kwargs)
    settings.TESTING = False
def handler(self):
    """Answer an incoming message, running the small admin dialogue.

    The message text is matched case-insensitively against the keywords
    ``EXIT``, ``ADMIN``, ``Y`` and ``HELPME``:

    * ``EXIT`` (admin only) resets the session.
    * ``ADMIN`` switches the session into admin mode.
    * An admin without a verified password has the message checked as the
      password; once ``Count`` has reached 3 a failed attempt resets the
      session.
    * A verified admin can store a command, confirm it with ``Y`` to run it
      through ``cmdhandler``, or ask for help with ``HELPME``.

    Any other message is answered by ``tuling.getdata``.
    """
    info = self.message.content
    keyword = info.upper()

    if self.userinfo.isAdmin and keyword == 'EXIT':
        self.userinfo = WxUserInfo()
        self.savesession()
        return "????"

    if keyword == 'ADMIN':
        self.userinfo.isAdmin = True
        self.savesession()
        return "???????"

    if self.userinfo.isAdmin and not self.userinfo.isPasswordSet:
        expected = settings.WXADMIN
        if settings.TESTING:
            expected = '123'
        # The message is hashed twice with get_md5 before comparing.
        if expected.upper() == get_md5(get_md5(info)).upper():
            self.userinfo.isPasswordSet = True
            self.savesession()
            return "????,???????????????:??helpme????"
        if self.userinfo.Count >= 3:
            # Attempt limit reached: start over with a fresh session.
            self.userinfo = WxUserInfo()
            self.savesession()
            return "??????"
        self.userinfo.Count += 1
        self.savesession()
        return "???????????????:"

    if self.userinfo.isAdmin and self.userinfo.isPasswordSet:
        if self.userinfo.Command != '' and keyword == 'Y':
            return cmdhandler.run(self.userinfo.Command)
        if keyword == 'HELPME':
            return cmdhandler.get_help()
        # Remember the command; it runs once the admin confirms with 'Y'.
        self.userinfo.Command = info
        self.savesession()
        return "????: " + info + " ???"

    return tuling.getdata(info)
def fileupload(request):
    """Store every uploaded file on disk and answer with their public URLs.

    Only POST is handled; any other method gets the text ``only for post``.
    Each key of ``request.FILES`` is used as the file name.  Files whose
    extension is one of jpg/png/jpeg/bmp go to the ``image`` tree and are
    re-saved through PIL with reduced quality; everything else goes to the
    ``files`` tree.  While ``settings.TESTING`` is set, files are written
    below ``settings.BASE_DIR/uploads`` instead of ``/var/www/resource``.

    Returns:
        HttpResponse built from the list of resource URLs.
    """
    if request.method != 'POST':
        return HttpResponse("only for post")

    image_extensions = {'jpg', 'png', 'jpeg', 'bmp'}
    response = []
    for filename in request.FILES:
        # The key is client-controlled: keep only its last path component
        # so values such as '../../etc/cron.d/x' cannot escape basepath.
        safe_name = os.path.basename(str(filename).replace('\\', '/'))
        if safe_name in ('', '.', '..'):
            continue

        # Decide by the real extension; the old substring search treated
        # names like 'pngs.zip' as images and missed 'photo.JPG'.
        extension = os.path.splitext(safe_name)[1][1:].lower()
        isimage = extension in image_extensions
        kind = 'image' if isimage else 'files'

        timestr = datetime.datetime.now().strftime('%Y/%m/%d')
        basepath = r'/var/www/resource/{type}/{timestr}'.format(
            type=kind, timestr=timestr)
        if settings.TESTING:
            basepath = settings.BASE_DIR + '/uploads'
        url = 'https://resource.lylinux.net/{type}/{timestr}/{filename}'.format(
            type=kind, timestr=timestr, filename=safe_name)

        if not os.path.exists(basepath):
            os.makedirs(basepath)
        savepath = os.path.join(basepath, safe_name)
        with open(savepath, 'wb+') as wfile:
            for chunk in request.FILES[filename].chunks():
                wfile.write(chunk)

        if isimage:
            # Re-encode in place to shrink the stored image.
            from PIL import Image
            image = Image.open(savepath)
            image.save(savepath, quality=20, optimize=True)
        response.append(url)
    return HttpResponse(response)
def mutate_and_get_payload(cls, input, context, info):
    """Create a ``Transaction`` for the requesting user and charge it.

    The user must already have a Stripe account.  The input (global ids
    converted to primary keys) is validated with
    ``CreateTransactionSerializer``, the transaction is saved with the
    product's cost and the event's next date, and then the Stripe charge is
    created -- through the mock implementation while ``settings.TESTING``
    is set.

    Returns:
        CreateTransaction wrapping the saved transaction.

    Raises:
        MutationException: if the user has no ``stripe_user_id``.
    """
    user = context.user
    if not user.stripe_user_id:
        error_msg = 'User tried to create a transaction before they have a stripe account'
        logger.warning(error_msg, extra={'request': context})
        raise MutationException(error_msg)

    formatted_input = convert_input_global_ids_to_pks(input)
    serializer_context = {
        'user': user,
        'receiving_person_id': formatted_input.get('receiving_person_id')
    }
    serializer = CreateTransactionSerializer(
        data=formatted_input, context=serializer_context)
    serializer.is_valid(raise_exception=True)

    product = Product.objects.get(pk=formatted_input.get('product_id'))
    associated_event = AssociatedEvent.objects.get(
        pk=formatted_input.get('associated_event_id'))

    transaction = Transaction(
        **formatted_input,
        associated_event_date=associated_event.event.next_date,
        cost_usd=product.cost_usd,
        user=user)
    transaction.save()

    # Same call either way; only the implementation differs under test.
    charge = mock_create_stripe_charge if settings.TESTING else create_stripe_charge
    charge(user=user, transaction=transaction, request=context)

    return CreateTransaction(transaction=transaction)
def __init__(self, **kwargs):
    """Configure query logging from keyword arguments and Django settings.

    Each option is taken from ``kwargs`` when given, otherwise from the
    matching ``LOG_QUERY_*`` setting, otherwise from the built-in default:

    * ``connection_name`` / ``LOG_QUERY_DATABASE_CONNECTION`` / 'default'
    * ``log_duplicate_queries`` / ``LOG_QUERY_DUPLICATE_QUERIES`` / True
    * ``log_tracebacks`` / ``LOG_QUERY_TRACEBACKS`` / False
    * ``log_long_running_time`` / ``LOG_QUERY_TIME_ABSOLUTE_LIMIT`` / 1000

    ``logging_extra_dict`` (default ``{}``) is stored as
    ``logging_extras``.
    """
    default_connection = getattr(
        settings, 'LOG_QUERY_DATABASE_CONNECTION', 'default')
    self.connection_name = kwargs.get('connection_name', default_connection)

    default_duplicates = getattr(settings, 'LOG_QUERY_DUPLICATE_QUERIES', True)
    self.log_duplicate_queries = kwargs.get(
        'log_duplicate_queries', default_duplicates)

    default_tracebacks = getattr(settings, 'LOG_QUERY_TRACEBACKS', False)
    self.log_tracebacks = kwargs.get('log_tracebacks', default_tracebacks)

    default_time_limit = getattr(
        settings, 'LOG_QUERY_TIME_ABSOLUTE_LIMIT', 1000)
    self.log_long_running_time = kwargs.get(
        'log_long_running_time', default_time_limit)

    self.logging_extras = kwargs.get('logging_extra_dict', {})

    # This is for internal testing only. If you add unit tests yourself for
    # the query debugging mixin, you can define a settings.TESTING variable
    # that is True while unit tests are running.
    self.testing = getattr(settings, 'TESTING', False)
def get_subjects_summary(self, parent, params):
    """Return a JSON summary of every subject relevant to ``parent``.

    While ``settings.TESTING`` is set the result list is empty.  Otherwise
    it holds one entry per distinct subject found in the parent's paid
    orders (newest order first), flagged as ``supported`` when the subject
    name is in ``klx.KLX_REPORT_SUBJECTS`` -- supported entries also carry
    the totals from ``_get_total_nums`` -- followed by the report subjects
    the parent has not purchased.
    """
    subjects_list = []
    if settings.TESTING:
        return JsonResponse({'results': subjects_list})

    seen_names = []
    # get subject from order
    paid_orders = models.Order.objects.filter(
        parent=parent, status=models.Order.PAID)
    order_rows = paid_orders.values(
        'subject', 'grade', 'created_at').order_by('-created_at')

    for row in order_rows:
        # only support math presently
        subject = models.Subject.objects.get(id=row['subject'])
        subject_name = subject.name
        if subject_name in seen_names:
            # Only the newest order of a subject contributes an entry.
            continue
        seen_names.append(subject_name)

        if subject_name not in klx.KLX_REPORT_SUBJECTS:
            subjects_list.append({
                'subject_id': subject.id,
                'purchased': True,
                'grade_id': row['grade'],
                'supported': False,
            })
            continue

        url = klx.KLX_STUDY_URL_FMT.format(
            subject=klx.klx_subject_name(subject_name))
        entry = {
            'subject_id': subject.id, 'supported': True,
            'purchased': True, 'grade_id': row['grade']}
        # Merge the totals returned by the helper into the entry.
        entry.update(self._get_total_nums(url, params))
        subjects_list.append(entry)

    # subjects supported, but user did not purchase
    missing_names = [name for name in klx.KLX_REPORT_SUBJECTS
                     if name not in seen_names]
    if missing_names:
        for subject in models.Subject.objects.filter(name__in=missing_names):
            subjects_list.append({
                'subject_id': subject.id,
                'supported': True,
                'purchased': False,
            })

    if settings.DEBUG:
        logger.debug(json.dumps(subjects_list))
    return JsonResponse({'results': subjects_list})
def klx_register(role, uid, name, password=None, subject=None):
    '''
    Register a user with the kuailexue third-partner API.

    :param role: role code sent to kuailexue as ``role``
        (NOTE(review): which code means student/teacher is not visible
        here -- confirm against the KLX_ROLE_* constants)
    :param uid: models.User.id
    :param name: Student Name or Teacher Name
    :param password: optional; only sent when not None
    :param subject: optional; only sent when not None
    :return: the kuailexue username on success, ``None`` on any failure
        (server unreachable, non-200 status, unreadable or unexpected
        response body, response without ``data``).  While
        ``settings.TESTING`` is set no request is made and ``'1'``/``'0'``
        is returned depending on whether the built parameters pass
        ``klx_verify_sign``.
    '''
    klx_url = settings.KUAILEXUE_SERVER + '/third-partner/register'
    params = {
        'role': role,
        'uid': uid,
        'name': name,
    }
    if password is not None:
        params['password'] = password
    if subject is not None:
        params['subject'] = subject
    params = klx_build_params(params, True)

    if settings.TESTING:
        return '1' if klx_verify_sign(params) else '0'

    try:
        resp = requests.post(klx_url, data=params, timeout=10)
    except Exception as err:
        _logger.error('cannot reach kuailexue server')
        _logger.exception(err)
        return None
    if resp.status_code != 200:
        _logger.error('cannot reach kuailexue server, http_status is %s' % (resp.status_code))
        return None

    cold_testing = _get_is_cold_testing()
    req_url = klx_url + '?' + urllib.parse.urlencode(params)
    if cold_testing:
        _console.warning(req_url)

    # A malformed body used to raise out of this function; treat it like
    # every other failure and return None.
    try:
        ret_json = json.loads(resp.content.decode('utf-8'))
    except ValueError as err:  # covers UnicodeDecodeError and bad JSON
        _logger.error('kuailexue reponse is not valid JSON. (URL=%s)' % (req_url))
        _logger.exception(err)
        return None
    if cold_testing:
        _console.info(ret_json)

    if not isinstance(ret_json, dict):
        _logger.error('kuailexue reponse data error, unexpected payload. (URL=%s)' % (req_url))
        return None

    ret_data = ret_json.get('data')
    if ret_data is not None:
        # NOTE(review): the original comment suggests the username has the
        # form KUAILEXUE_PARTNER + uid + '_' + ${YYYY} -- confirm.
        return ret_data.get('username')

    # No ``data`` in the answer: report the error code and message.
    _logger.error('kuailexue reponse data error, CODE: %s, MSG: %s. (URL=%s)' % (ret_json.get('code'), ret_json.get('message'), req_url))
    return None