def plans(request):
token = None
has_verified_email = False
plans = None
if request.method == 'POST':
template = 'radio/subscribed.html'
token = request.POST.get('stripeToken')
plan = request.POST.get('plan')
# See if this user already has a stripe account
try:
stripe_cust = stripe_models.Customer.objects.get(user=request.user)
except ObjectDoesNotExist:
stripe_actions.customers.create(user=request.user)
stripe_cust = stripe_models.Customer.objects.get(user=request.user)
try:
stripe_info = stripe_actions.subscriptions.create(customer=stripe_cust, plan=plan, token=request.POST.get('stripeToken'))
except stripe.CardError as e:
template = 'radio/charge_failed.html'
logger.error("Error with stripe user card{}".format(e))
return render(request, template, {'error_msg': e })
for t in request.POST:
logger.error("{} {}".format(t, request.POST[t]))
else:
template = 'radio/plans.html'
plans = StripePlanMatrix.objects.filter(order__lt=99).filter(active=True)
# Check if users email address is verified
verified_email = allauth_emailaddress.objects.filter(user=request.user, primary=True, verified=True)
if verified_email:
has_verified_email = True
return render(request, template, {'token': token, 'verified_email': has_verified_email, 'plans': plans} )
python类method()的实例源码
def login():
if request.method == 'GET':
return render_template("login.html")
username = request.form['username']
password = request.form['password']
reg_user = User.query.filter_by(username=username, password=password).first()
if reg_user is None:
flash("Username or password is invalid. Please try again.", "error")
return redirect(url_for('login'))
login_user(reg_user)
return redirect(url_for('index'))
def prev(self, error_out=False):
"""Returns a :class:`Pagination` object for the previous page."""
assert self.query is not None, 'a query object is required ' \
'for this method to work'
return paginate(self.query, self.page - 1, self.per_page, error_out)
def next(self, error_out=False):
"""Returns a :class:`Pagination` object for the next page."""
assert self.query is not None, 'a query object is required ' \
'for this method to work'
return paginate(self.query, self.page + 1, self.per_page, error_out)
def test_close(self):
# Test close() by calling it here and then having it be called again
# by the tearDown() method for the test
self.returned_obj.close()
def help_inputtype(self, given, test_type):
"""Helper method for testing different input types.
'given' must lead to only the pairs:
* 1st, 1
* 2nd, 2
* 3rd, 3
Test cannot assume anything about order. Docs make no guarantee and
have possible dictionary input.
"""
expect_somewhere = ["1st=1", "2nd=2", "3rd=3"]
result = urllib.parse.urlencode(given)
for expected in expect_somewhere:
self.assertIn(expected, result,
"testing %s: %s not found in %s" %
(test_type, expected, result))
self.assertEqual(result.count('&'), 2,
"testing %s: expected 2 '&'s; got %s" %
(test_type, result.count('&')))
amp_location = result.index('&')
on_amp_left = result[amp_location - 1]
on_amp_right = result[amp_location + 1]
self.assertTrue(on_amp_left.isdigit() and on_amp_right.isdigit(),
"testing %s: '&' not located in proper place in %s" %
(test_type, result))
self.assertEqual(len(result), (5 * 3) + 2, #5 chars per thing and amps
"testing %s: "
"unexpected number of characters: %s != %s" %
(test_type, len(result), (5 * 3) + 2))
def test_with_method_arg(self):
Request = urllib.request.Request
request = Request("http://www.python.org", method='HEAD')
self.assertEqual(request.method, 'HEAD')
self.assertEqual(request.get_method(), 'HEAD')
request = Request("http://www.python.org", {}, method='HEAD')
self.assertEqual(request.method, 'HEAD')
self.assertEqual(request.get_method(), 'HEAD')
request = Request("http://www.python.org", method='GET')
self.assertEqual(request.get_method(), 'GET')
request.method = 'HEAD'
self.assertEqual(request.get_method(), 'HEAD')
def upload(request):
if request.method == 'POST':
form = ImageUploadForm(request.POST, request.FILES)
if form.is_valid():
return handle_uploaded_image(request)
else:
print("Not valid")
return HttpResponseRedirect(reverse('search_web:index'))
return HttpResponseRedirect(reverse('search_web:index'))
def new(request):
if request.method == "GET":
if request.GET.get("username", None) is not None and request.GET.get("email", None) is not None:
user = User.objects.create_user(request.GET.get("username", None), request.GET.get("email", None), '')
user.first_name = request.GET.get("first_name", None)
user.last_name = request.GET.get("last_name", None)
user.save()
training_folder = os.path.join(TRAINED_FACES_PATH, str(user.pk))
if not os.path.exists(training_folder):
os.makedirs(training_folder)
return JsonResponse({"sucess": True})
def index(request):
BASE_URL = 'https://api.github.com/users/'
form = SearchForm(request.POST or None)
name = None
details = ''
status = ''
photo = ''
if request.method == 'POST':
try:
name = request.POST.get('search')
#print(request.POST)
url = BASE_URL + name
r = requests.get(url)
print(r.status_code)
if(r.status_code == 404):
status = 'User "' + name + '" does not exists.'
name = None
elif(r.status_code == 200):
details = r.json()
photo = details['avatar_url']
name = details['name']
details = (('Name', details['name']),
('Bio', details['bio']),
('UserID', details['login']),
('Email', details['email']),
('Company', details['company']),
('Blog' , details['blog']),
('Location' , details['location']),
('Hireable' , details['hireable']),
('Public Repos' , details['public_repos']),
('Public Gists' , details['public_gists']),
('Followers' , details['followers']),
('Following' , details['following']),
)
details = collections.OrderedDict(details)
else:
status = 'There is some error with your request. Please try again later.'
except ConnectionError:
status = "Connection Error!! Try again later"
#except HTTPError:
# status = "Invalid HTTP Response!! Try again later"
except:
status = 'Error!! Try again later'
context = {
'form': form ,
'name': name ,
'status': status,
'details': details,
'photo': photo,
}
return render(request, 'index.html', context)
def validate_captcha(view):
"""
Decorator to validate a captcha, settings from django
@validate_Captcha
def a_view():
...
"""
def wrap(request, *args, **kwargs):
def failure_http():
# Status 401 means that they are not authorized
return render(request, 'captcha_fail.html', status=401)
def failure_ajax():
return HttpResponse(
'There was a problem with the captcha, please try again', status=401)
if request.method == 'POST':
url = "https://www.google.com/recaptcha/api/siteverify"
values = {
'secret': settings.GOOGLE_RECAPTCHA_SECRET_KEY,
'response': request.POST.get('g-recaptcha-response', None),
'remoteip': request.META.get("REMOTE_ADDR", None),
}
data = urllib.parse.urlencode(values)
req = urllib.request.Request(url, data)
response = urllib.request.urlopen(req)
result = json.loads(response.read())
# result["success"] will be True on a success
if result["success"]:
return view(request, *args, **kwargs)
elif request.is_ajax():
return failure_ajax()
else:
return failure_http()
return view(request, *args, **kwargs)
wrap._original = view
return wrap
def hello():
if request.method == 'GET':
if request.args.get("hub.verify_token") == VERIFY_TOKEN:
return request.args.get("hub.challenge")
else:
return 'Invalid verification token'
if request.method == 'POST':
output = request.get_json()
for event in output['entry']:
messaging = event['messaging']
for x in messaging:
if x.get('message'):
recipient_id = x['sender']['id']
if x['message'].get('text'):
message = x['message']['text']
msg = AI.process_message(message)
bot.send_text_message(recipient_id, msg)
if x['message'].get('attachments'):
for att in x['message'].get('attachments'):
print(att['payload']['url'])
voice_url = urllib.request.urlopen(att['payload']['url'])
with open('voicemsg.aac', 'w+b') as f:
f.write(voice_url.read())
f.close()
aac_file = AudioSegment.from_file('voicemsg.aac', format='aac')
wav_handler = aac_file.export('rawmsg.wav', format='wav')
os.system("sox rawmsg.wav -r 16000 temp.wav")
wav_handler = AudioSegment.from_file('temp.wav', format='wav')
raw_handler = wav_handler.export('rawmsg.raw', format='raw')
decoder.start_utt()
stream = open('rawmsg.raw', 'rb')
#stream.seek(44)
while True:
buf = stream.read(1024)
if buf:
decoder.process_raw(buf, False, False)
else:
break
decoder.end_utt()
sentence = " ".join([seg.word for seg in decoder.seg()])
bot.send_text_message(recipient_id, sentence)
#bot.send_attachment_url(recipient_id, att['type'], att['payload']['url'])
else:
pass
return "Success"
def create_listing(request): # /create-listing
if request.method != 'POST':
return _error_response(request, err_exp.E_BAD_REQUEST, "must make POST request")
form = ListingForm(request.POST)
if not form.is_valid():
return _error_response(request, err_exp.E_FORM_INVALID, "listing form not correctly filled out")
post_data = form.cleaned_data
post_data['auth'] = request.POST['auth']
post_data['last_checked_out'] = datetime.datetime.now()
post_encoded = urllib.parse.urlencode(post_data).encode('utf-8')
req = urllib.request.Request('http://models-api:8000/api/v1/drone/create/', data=post_encoded, method='POST')
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp = json.loads(resp_json)
if not resp:
return _error_response(request, err_exp.E_REGISTER_FAILED, "no response from models API")
if resp['ok'] == False: # could be much more nuanced. makes web view handle errors
return _error_response(request, err_exp.E_REGISTER_FAILED, resp)
post_data['_drone_key'] = resp['resp']['drone_id']
post_data['time_posted'] = datetime.datetime.now()
post_encoded = urllib.parse.urlencode(post_data).encode('utf-8')
req = urllib.request.Request('http://models-api:8000/api/v1/listing/create/', data=post_encoded, method='POST')
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp = json.loads(resp_json)
if not resp:
return _error_response(request, err_exp.E_REGISTER_FAILED, "no response from models API")
if resp['ok'] == False: # could be much more nuanced. makes web view handle errors
return _error_response(request, err_exp.E_REGISTER_FAILED, {'resp':resp})
# add newly created listing to Kafka
# get listing
'''
req = urllib.request.Request('http://models-api:8000/api/v1/listing/'+resp['resp']['listing_id'])
resp_json = urllib.request.urlopen(req).read().decode('utf-8')
resp1 = json.loads(resp_json)
resp1['listing_id'] = resp['listing_id']
'''
# add to kafka
producer = KafkaProducer(bootstrap_servers='kafka:9092')
# need to pass dictionary object
new_listing = resp['resp']['listing']
producer.send('new-listings-topic', json.dumps(new_listing).encode('utf-8'))
print(new_listing)
return _success_response(request, resp['resp']['listing'])
def upgrade(request):
if request.method == 'POST':
form = PaymentForm(request.POST)
if not form.is_valid():
return render(
request,
'registration/upgrade.html',
{'form': form},
)
try:
plan = form.cleaned_data.get('plan_type')
card_name = form.cleaned_data.get('cardholder_name')
stripe_cust = stripe_models.Customer.objects.get(user=request.user)
logger.error('Change plan to {} for customer {} Card Name {}'.format(plan, stripe_cust, card_name))
stripe_info = stripe_actions.subscriptions.create(customer=stripe_cust, plan=plan, token=request.POST.get('stripeToken'))
except stripe.InvalidRequestError as e:
messages.error(request, "Error with stripe {}".format(e))
logger.error("Error with stripe {}".format(e))
return render(
request,
'registration/upgrade.html',
{'form': form},
)
except stripe.CardError as e:
messages.error(request, "<b>Error</b> Sorry there was an error with processing your card:<br>{}".format(e))
logger.error("Error with stripe user card{}".format(e))
return render(
request,
'registration/upgrade.html',
{'form': form},
)
print('------ STRIPE DEBUG -----')
pprint(stripe_info, sys.stderr)
return render(
request,
'registration/upgrade_complete.html',
)
else:
form = PaymentForm()
return render(
request,
'registration/upgrade.html',
{'form': form, },
)