def test_updating_cart_item(self):
    """Post a new quantity to ``cart:update`` and check the item and flash message."""
    client_session = self.client.session
    cart = self._create_testing_cart()
    cart.session_key = client_session.session_key
    cart.save()
    cart_item = self._create_testing_cart_item(
        cart_instance=cart,
        product_instance=self.test_product,
    )

    update_url = reverse('cart:update', kwargs={'pk': cart_item.pk})
    response = self.client.post(
        update_url,
        data={'cart_item_quantity': '2'},
        follow=True,
    )
    messages = list(get_messages(response.wsgi_request))

    # Copy the quantity reported by the response context onto the local
    # instance before asserting on it.
    cart_item.quantity = response.context['cart'].items.first().quantity
    cart_item.save()

    self.assertEqual(response.status_code, 200)
    self.assertEqual(cart_item.quantity, 2)
    expected_total = Decimal(cart_item.quantity * cart_item.product.price)
    self.assertEqual(cart_item.total_price, expected_total)
    first_message = messages[0]
    self.assertEqual(first_message.tags, 'success', 'Message type should return success type')
    self.assertEqual(first_message.message, 'Product quantity has been updated.')
python类get_messages()的实例源码
def is_valid(self):
    '''
    Report the form as valid only when the parent validation passes and there
    are no request messages left that have not been recorded as shown before.
    '''
    form_ok = super(RegistrationContactForm, self).is_valid()
    storage = messages.get_messages(self._request)

    # Messages recorded on a previous pass are popped from the session so
    # that each validation message is only counted once.
    already_shown = self._session.pop('prior_messages', [])
    candidates = (
        {'message': m.message, 'level': m.level, 'extra_tags': m.extra_tags}
        for m in storage
    )
    unseen = [entry for entry in candidates if entry not in already_shown]

    if not unseen:
        return form_ok

    # Remember what is about to be shown and report the form as invalid.
    self._session['prior_messages'] = unseen
    self._request.session.modified = True
    return False
def running_jobs(request):
    """Render ``running_jobs.html`` with the scheduler's current jobs.

    ``username`` is read unconditionally when the response context is built,
    so it is given a ``None`` default here; previously it was only bound for
    authenticated users and an anonymous request raised ``UnboundLocalError``.
    """
    username = None
    useremail = None
    if request.user.is_authenticated():
        username = request.user.username
        useremail = request.user.email
    messages.get_messages(request)
    # NOTE(review): the rendered 'strategy.html' output below is never used by
    # this view; it is kept so the view's existing side effects are unchanged.
    # The local names matter because locals() is pushed into the context.
    template = get_template('strategy.html')
    request_context = RequestContext(request)
    request_context.push(locals())
    html = template.render(request_context)
    jobs = scheduler.get_jobs()
    today = datetime.datetime.now()
    return render_to_response(
        'running_jobs.html',
        {"running_jobs": jobs, "username": username, "today": today},
        context_instance=RequestContext(request),
    )
#initial apscheduler
def assert_message_contains(self, response, message, level=None):
    """Fail the test unless ``message`` (optionally at ``level``) is among the response's messages."""
    stored = self.get_messages(response)
    if self.filter_messages(stored, message, level):
        return

    # Describe what was actually available to make the failure actionable.
    available = ['%s (%s)' % (item.message, item.level) for item in stored]
    if level:
        pytest.fail(
            'Message %r with level %r not found in request. '
            'Available messages: %r' %
            (message, level, available)
        )
    else:
        pytest.fail(
            'Message %r not found in request. '
            'Available messages: %r' %
            (message, available)
        )
def test_deleting_cart_item(self):
    """Post to ``cart:remove`` and check the success message and the empty cart."""
    client_session = self.client.session
    cart = self._create_testing_cart()
    cart.session_key = client_session.session_key
    cart.save()
    cart_item = self._create_testing_cart_item(
        cart_instance=cart,
        product_instance=self.test_product,
    )

    remove_url = reverse('cart:remove', kwargs={'product_id': cart_item.product_id})
    response = self.client.post(
        remove_url,
        data={'product_id': cart_item.product_id},
        follow=True,
    )
    messages = list(get_messages(response.wsgi_request))

    self.assertEqual(response.status_code, 200)
    first_message = messages[0]
    self.assertEqual(
        first_message.tags,
        'success',
        'Message type should return success type',
    )
    self.assertEqual(
        first_message.message,
        'The item has been deleted from your cart.',
        'Message text should be equal to: The item has been deleted from '
        'your cart',
    )
    self.assertEqual(cart.items.count(), 0, 'Cart should have zero items.')
def horizon_message_already_queued(request, message):
    """Return True when the text of ``message`` is already queued on ``request``.

    AJAX requests are checked against ``request.horizon['async_messages']``;
    other requests against the queued messages of the message storage.
    """
    wanted = force_text(message)
    if request.is_ajax():
        return any(
            wanted == queued_text
            for _tag, queued_text, _extra in request.horizon['async_messages']
        )
    queued = _messages.get_messages(request)._queued_messages
    return any(item.message == wanted for item in queued)
def error(request):
    """Render ``bom/error.html`` with the request and its message storage."""
    # Same keys that locals() exposed to the template: 'request' and 'msgs'.
    template_context = {
        'request': request,
        'msgs': messages.get_messages(request),
    }
    return TemplateResponse(request, 'bom/error.html', template_context)
def billing_view(request):
    """Shows CC info and transaction history."""
    profile = UserProfile.objects.get(user=request.user)
    network = profile.network

    ledger_entries = network.ledger.transaction_set.filter(
        kind__in=["credit", "adjustment"]
    ).order_by('-created')
    paginator = Paginator(ledger_entries, 10)
    requested_page = request.GET.get('page', 1)
    try:
        page_of_transactions = paginator.page(requested_page)
    except PageNotAnInteger:
        # Non-integer page parameter: fall back to the first page.
        page_of_transactions = paginator.page(1)
    except EmptyPage:
        # Page out of range (e.g. 9999): fall back to the last page.
        page_of_transactions = paginator.page(paginator.num_pages)

    context = {
        'networks': get_objects_for_user(request.user, 'view_network', klass=Network),
        'user_profile': profile,
        'transactions': page_of_transactions,
    }

    # The template handles messages explicitly: every message tagged with
    # "billing_resp_code" becomes a True flag keyed by the message text.
    for flash in messages.get_messages(request):
        if "billing_resp_code" in flash.tags:
            context[flash.message] = True

    card_type = network.stripe_card_type
    context['card_type'] = 'AmEx' if card_type == "American Express" else card_type
    # Card types that have icons.
    context['cards_with_icons'] = ['Visa', 'AmEx', 'MasterCard', 'Discover']

    html = get_template("dashboard/billing.html").render(context, request)
    return HttpResponse(html)
def dispatch(self, request, *args, **kwargs):
    """Return the current user's contact details and pending messages as JSON.

    Unauthenticated requests get an empty JSON object.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({})

    customer = getattr(user, 'customer', None)
    if customer:
        # Prefer the customer record, falling back to the user's own fields.
        context = {
            'customer': True,
            'first_name': customer.first_name or user.first_name,
            'last_name': customer.last_name or user.last_name,
            'email': customer.email or user.email,
            'phone': customer.phone,
        }
    else:
        context = {
            'customer': False,
            'first_name': user.first_name,
            'last_name': user.last_name,
            'email': user.email,
        }

    # Also relay any outstanding messages (e.g. login successful message)
    # to whoever consumes this information.
    context['messages'] = [
        {
            "level": message.level,
            "message": message.message,
            "extra_tags": message.tags,
        }
        for message in messages.get_messages(request)
    ]
    return JsonResponse(context)
def index(request, pid=None, del_pass=None):
    """Render ``index.html`` with this view's local variables as context.

    Every local name bound before ``locals()`` is called becomes a template
    variable, so the names below must not be renamed or removed.
    """
    if request.user.is_authenticated():
        # Only bound (and therefore only exposed) for logged-in users.
        username = request.user.username
        useremail = request.user.email

    messages.get_messages(request)

    template = get_template('index.html')
    request_context = RequestContext(request)
    request_context.push(locals())
    html = template.render(request_context)
    return HttpResponse(html)
def strategy(request):
    """Render ``strategy.html`` with all ``Strategy`` objects and the view's locals.

    Every local name bound before ``locals()`` is called becomes a template
    variable, so the names below must not be renamed or removed.
    """
    Strategies = Strategy.objects.all()

    if request.user.is_authenticated():
        # Only bound (and therefore only exposed) for logged-in users.
        username = request.user.username
        useremail = request.user.email

    messages.get_messages(request)

    template = get_template('strategy.html')
    request_context = RequestContext(request)
    request_context.push(locals())
    html = template.render(request_context)
    return HttpResponse(html)
def get_messages(self, response):
    """Return the messages for ``response``, fetching them only on first use.

    The result is stored on ``self._messages`` and reused on later calls.
    """
    cached = self._messages
    if cached is None:
        cached = self._messages = get_messages(response.wsgi_request)
    return cached
def assert_message_count(self, response, expected):
    """Fail the test unless the response carries exactly ``expected`` messages."""
    actual_num = len(self.get_messages(response))
    if actual_num == expected:
        return
    pytest.fail(
        'Message count was %d, expected %d' % (actual_num, expected)
    )
def assert_message_misses(self, response, message, level=None):
    """Fail the test if ``message`` (optionally at ``level``) is among the response's messages."""
    stored = self.get_messages(response)
    if not self.filter_messages(stored, message, level):
        return

    if level:
        pytest.fail(
            'Message %r with level %r found in request' %
            (message, level)
        )
    else:
        pytest.fail('Message %r found in request' % message)
def messages_to_json(request):
    """Build a dict with the request's messages as data and as rendered HTML."""
    serialized = [
        {
            "level": message.level,
            "level_tag": message.level_tag,
            "message": message.message,
        }
        for message in messages.get_messages(request)
    ]
    rendered = render_to_string(
        'home/includes/messages.html',
        {'messages': messages.get_messages(request)},
    )
    return {'messages': serialized, 'messages_html': rendered}
def test_alert_messages(self, alert_messages, expected_messages):
    """
    Check that rendering the alert_messages template tag includes each expected icon and message.
    """
    request = self._get_mock_request()
    self._add_messages(request, alert_messages)

    template = Template("{% load enterprise %} {% alert_messages messages %}")
    template_context = Context({'messages': messages.get_messages(request)})
    output = template.render(template_context).strip()

    for expected_icon, expected_message in expected_messages:
        assert expected_icon in output
        assert expected_message in output
def build_ajax_messages(request):
    """Create a structure suitable for converting to json from messages"""
    return [
        {
            'level': message.level,
            'message': message.message,
            'extra_tags': message.tags,
        }
        for message in messages.get_messages(request)
    ]
def cache_if_anon(timeout):
    """Cache the view if the user is not authenticated and there are no messages to display."""
    # https://stackoverflow.com/questions/11661503/django-caching-for-authenticated-users-only
    def _decorator(view_func):
        @wraps(view_func, assigned=available_attrs(view_func))
        def _wrapped_view(request, *args, **kwargs):
            # Authenticated users, or requests with pending messages, always
            # hit the real view; everyone else goes through cache_page.
            bypass_cache = (
                request.user.is_authenticated()
                or messages.get_messages(request)
            )
            handler = view_func if bypass_cache else cache_page(timeout)(view_func)
            return handler(request, *args, **kwargs)
        return _wrapped_view
    return _decorator
def test_delete_message_mixin(self):
    """Posting to ``DeleteMessageView`` should leave exactly one message on the request."""
    request = self.factory.post('/fake-path', {})
    # Attach a message storage to the request by hand.
    request._messages = default_storage(request)

    view = DeleteMessageView.as_view()
    view(request)

    queued = get_messages(request)
    self.assertEqual(len(queued), 1)
def test_extra_forms_and_formsets_and_update_message_mixin(self):
    """Post a form plus a four-row permission formset and check the new row and the message count."""
    content_type = ContentType.objects.get(
        app_label='contenttypes', model='contenttype',
    )

    # Three existing permission rows (with ids) and one new row (without id).
    payload = {
        'app_label': content_type.app_label,
        'model': content_type.model,
        'permission_set-TOTAL_FORMS': 4,
        'permission_set-INITIAL_FORMS': 3,
        'permission_set-MIN_NUM_FORMS': 0,
        'permission_set-MAX_NUM_FORMS': 1000,
        'permission_set-0-name': "Can add content type",
        'permission_set-0-codename': "add_contenttype",
        'permission_set-0-id': 13,
        'permission_set-0-content_type': 5,
        'permission_set-1-name': "Can change content type",
        'permission_set-1-codename': "change_contenttype",
        'permission_set-1-id': 14,
        'permission_set-1-content_type': 5,
        'permission_set-2-name': "Can delete content type",
        'permission_set-2-codename': "delete_contenttype",
        'permission_set-2-id': 15,
        'permission_set-2-content_type': 5,
        'permission_set-3-name': "Can test content type",
        'permission_set-3-codename': "test_contenttype",
        'permission_set-3-content_type': 5,
    }
    request = self.factory.post('/fake-path', payload)
    request._messages = default_storage(request)

    view = ExtraFormsAndFormsetsAndUpdateView.as_view()
    response = view(request, pk=content_type.pk)

    created = response.permission_set.get(codename='test_contenttype')
    self.assertEqual(created.codename, 'test_contenttype')
    self.assertEqual(len(get_messages(request)), 1)
def clearMsg(self, request):
    """Consume the pending messages on ``request`` so they are not shown later.

    Iterating the storage is what marks its messages as used. The per-message
    ``del`` is a best-effort cleanup; only the ``AttributeError`` raised when
    a message has no ``_loaded_messages`` attribute is ignored, instead of
    silencing every exception with a bare ``except``.
    """
    storage = messages.get_messages(request)
    for msg in storage:
        try:
            del msg._loaded_messages
        except AttributeError:
            # The message carries no such attribute; nothing to remove.
            pass
# Check permissions for model and object both
def render_messages(request):
    """Render the pinax bootstrap messages partial for the request's messages."""
    storage = messages.get_messages(request)
    return render_to_string(
        "pinax_theme_bootstrap:_messages.html",
        {"messages": storage},
    )
def check_server_configs(request, server_id):
    """Checks server's configuration.

    When server has some installs it should have
    particular configs for these installs.

    For 'mysql' and 'postgresql' installs a matching ``Db`` row with a
    version of the form ``x.x`` and a ``root`` db user are expected. For every
    install, each file listed by ``required_files()`` must have a ``Conf``
    row. Every problem is reported through the messages framework; when none
    is found a success message is added. Always redirects to the server's
    admin page.
    """
    server = get_object_or_404(models.Server, id=server_id)
    installs = models.Install.objects.filter(server_id=server.id)
    for i in installs:
        if i.item in ['mysql', 'postgresql']:
            type_db = 'M' if i.item == 'mysql' else 'P'
            message_01 = '{}: {}: Db not found'.format(server.code, i.item)
            message_02 = '{}: {}: Db version not correct (It should be x.x)'\
                .format(server.code, i.item)
            message_03 = '{}: {}: root user not found'\
                .format(server.code, i.item)
            try:
                db = models.Db.objects.get(
                    server_id=server.id, type='S', type_db=type_db)
                # Raw string: '\d' in a plain literal is an invalid escape.
                if not re.match(r'\d\.\d$', db.version):
                    messages.error(request, message_02)
                models.User.objects.get(type='db', db_id=db.id, name='root')
            except models.Db.DoesNotExist:
                # Without a Db row there cannot be a root user for it either.
                messages.error(request, message_01)
                messages.error(request, message_03)
            except models.User.DoesNotExist:
                messages.error(request, message_03)
        for filename in i.required_files():
            dic = dict(server_id=server.id, item=i.item, filename=filename)
            if not models.Conf.objects.filter(**dic).exists():
                message = '{}: {}: {} not found'\
                    .format(server.code, i.item, filename)
                messages.error(request, message)
    storage = messages.get_messages(request)
    # No message queued so far means no check above reported a problem.
    if len(storage) == 0:
        messages.success(request, "Server is ready for install.")
    return HttpResponseRedirect('/admin/api/server/{}/'.format(server_id))
def get_react_config(context):
    """Build the JSON configuration blob for the React front end.

    Reads the user, pending messages and superuser flag from
    ``context['request']`` when a request is present, and returns the
    configuration serialized with ``json.dumps`` and wrapped in ``mark_safe``.
    """
    if 'request' in context:
        request = context['request']
        user = request.user
        flash_messages = get_messages(request)
        try:
            is_superuser = request.is_superuser()
        except AttributeError:
            is_superuser = False
    else:
        user = None
        flash_messages = []
        is_superuser = False

    if user:
        user = extract_lazy_object(user)

    # Checked in a fixed order so the resulting list order is stable.
    enabled_features = [
        feature_name
        for feature_name in ('organizations:create', 'auth:register')
        if features.has(feature_name, actor=user)
    ]

    version_info = _get_version_info()
    needs_upgrade = _needs_upgrade() if is_superuser else False

    config = {
        'singleOrganization': settings.SENTRY_SINGLE_ORGANIZATION,
        'urlPrefix': options.get('system.url-prefix'),
        'version': version_info,
        'features': enabled_features,
        'mediaUrl': get_asset_url('sentry', ''),
        'needsUpgrade': needs_upgrade,
        'dsn': _get_public_dsn(),
        'messages': [
            {'message': msg.message, 'level': msg.tags}
            for msg in flash_messages
        ],
    }

    if user and user.is_authenticated():
        serialized_user = serialize(user, user)
        config['isAuthenticated'] = True
        config['user'] = serialized_user
        serialized_user['isSuperuser'] = is_superuser
    else:
        config['isAuthenticated'] = False
        config['user'] = None

    return mark_safe(json.dumps(config))