def do_default_site(self, using=DEFAULT_DB):
    """
    Attach the site identified by ``settings.SITE_ID`` when no site is set.

    :param using: database alias passed to ``using()`` for the ``Site``
        lookup, when the queryset offers that method.
    :return: ``True`` if a site was added (the original contract describes
        this as "an additional save is required"), ``False`` if
        ``self.sites`` already holds at least one site.
    """
    if len(self.sites.all()):
        return False

    site_qs = Site.objects.all()
    # Querysets that do not provide ``using()`` are queried as they are.
    if hasattr(site_qs, 'using'):
        site_qs = site_qs.using(using)
    default_site = site_qs.get(pk=settings.SITE_ID)
    self.sites.add(default_site)
    return True
# Example source code for the Python class Site()
def get_context(self):
    """
    Build the context used to render the email subject and body templates.

    The result is a plain ``dict`` containing:

    * every entry of ``self.cleaned_data``, keyed by field name;
    * the current ``Site`` object under the key ``site`` (this entry wins
      over a form field that happens to be called ``site``).

    :raises ValueError: if the form does not validate.
    """
    if not self.is_valid():
        raise ValueError("Cannot generate Context from invalid contact form")
    context = dict(self.cleaned_data)
    context['site'] = Site.objects.get_current()
    return context
def clean_body(self):
    """
    Run the message body through Akismet and return it.

    The check only happens when ``body`` is present in ``cleaned_data`` and
    ``settings.AKISMET_API_KEY`` is set to a non-empty value; the Akismet
    client is imported at that point rather than at module import. If the
    API key does not verify, the body is returned without a spam check.

    :raises forms.ValidationError: if Akismet classifies the body as spam.
    """
    should_check = ('body' in self.cleaned_data and
                    getattr(settings, 'AKISMET_API_KEY', ''))
    if should_check:
        from akismet import Akismet
        from django.utils.encoding import smart_str

        site_url = 'http://%s/' % Site.objects.get_current().domain
        checker = Akismet(key=settings.AKISMET_API_KEY, blog_url=site_url)
        if checker.verify_key():
            meta = self.request.META
            request_info = {
                'comment_type': 'comment',
                'referer': meta.get('HTTP_REFERER', ''),
                'user_ip': meta.get('REMOTE_ADDR', ''),
                'user_agent': meta.get('HTTP_USER_AGENT', ''),
            }
            body_text = smart_str(self.cleaned_data['body'])
            flagged = checker.comment_check(body_text, data=request_info, build_data=True)
            if flagged:
                raise forms.ValidationError(_("Akismet thinks this message is spam"))
    return self.cleaned_data['body']
def handle(self, *args, **options):
    """
    Collect Stripe events that are newer than the last stored webhook.

    The site is taken from the ``site`` option (a primary key) or, when the
    option is missing, is the first ``Site`` row. The pagination anchor is
    the id of ``StripeWebhook.objects.first()``; if Stripe rejects that id
    with ``InvalidRequestError`` the listing starts without an anchor.

    :raises StripePendingWebooksLimitExceeded: as soon as more than
        ``stripe_settings.PENDING_WEBHOOKS_THRESHOLD`` events have been
        collected; the exception receives the collected events and the site.
    """
    stripe.api_key = stripe_settings.API_KEY

    site_id = options.get("site")
    site = Site.objects.get(pk=site_id) if site_id else Site.objects.all()[0]

    pending_webhooks = []
    last_event = StripeWebhook.objects.first()
    last_event_id = last_event.id if last_event else None

    if last_event:
        try:
            # Only used to verify that Stripe still knows this event id.
            stripe.Event.retrieve(last_event_id)
        except stripe.error.InvalidRequestError:
            last_event_id = None

    while True:
        event_list = stripe.Event.list(ending_before=last_event_id, limit=100)  # 100 is the maximum
        pending_webhooks += event_list["data"]

        if len(pending_webhooks) > stripe_settings.PENDING_WEBHOOKS_THRESHOLD:
            raise StripePendingWebooksLimitExceeded(pending_webhooks, site)

        if not event_list["has_more"]:
            break
        # Stripe returns each page newest-first, and ``ending_before`` asks
        # for the objects listed *before* the cursor. The next (newer) page
        # is therefore anchored on the first element of the current page;
        # anchoring on the last element would fetch nearly the same page
        # again and fill ``pending_webhooks`` with duplicates.
        last_event_id = event_list["data"][0]["id"]
def test_permissioned_page_list(self):
    """
    A user who only holds a page-level permission can open the page list.
    """
    admin_user, normal_guy = self._get_guys(use_global_permissions=False)
    site = Site.objects.get(pk=1)
    page = create_page(
        "Test page", "nav_playground.html", "en",
        site=site, created_by=admin_user,
    )
    PagePermission.objects.create(page=page, user=normal_guy)

    with self.login_user_context(normal_guy):
        response = self.client.get(URL_CMS_PAGE)
        self.assertEqual(response.status_code, 200)
def _give_cms_permissions(self, user, save=True):
    """
    Grant ``user`` add/change/delete permissions on ``Page`` and ``Title``
    and create a ``GlobalPagePermission`` for them covering all sites.

    :param user: the user receiving the permissions.
    :param save: when true, ``user.save()`` is called at the end.
    """
    grants = [
        (model, perm_type)
        for perm_type in ['add', 'change', 'delete']
        for model in [Page, Title]
    ]
    for model, perm_type in grants:
        self._give_permission(user, model, perm_type, False)

    global_permission = GlobalPagePermission.objects.create(
        user=user,
        can_change=True,
        can_delete=True,
        can_change_advanced_settings=False,
        can_publish=True,
        can_change_permissions=False,
        can_move_page=True,
    )
    global_permission.sites = Site.objects.all()

    if save:
        user.save()
def test_missmatching_site_parent_dotsite(self):
    """
    ``PageForm`` rejects a page whose site differs from its parent's site.
    """
    parent_site = Site.objects.create(domain='foo.com', name='foo.com')
    other_site = Site.objects.create(domain='foo2.com', name='foo.com')
    parent_page = Page.objects.create(
        template='nav_playground.html',
        site=parent_site,
    )
    default_template = get_cms_setting('TEMPLATES')[0][0]

    form = PageForm(
        data={
            'title': 'Title',
            'slug': 'slug',
            'language': 'en',
            'site': other_site.pk,
            'template': default_template,
            'reverse_id': '',
            'parent': parent_page.pk,
        },
        files=None,
    )

    self.assertFalse(form.is_valid())
    self.assertIn(u"Site doesn't match the parent's page site",
                  form.errors['__all__'])
def user_has_page_add_perm(user, site=None):
    """
    Tell whether ``user`` may add pages on ``site``.

    When the ``PERMISSION`` CMS setting is enabled the user needs a global
    add permission for the site in addition to the auth-level ``add``
    permission; otherwise only the auth-level permission is checked.

    :param user: the user to check.
    :param site: optional Site object (not just a PK); falls back to the
        current site when falsy.
    :return: Boolean
    """
    site = site or Site.objects.get_current()

    if not get_cms_setting('PERMISSION'):
        global_add_perm = True
    else:
        granted = GlobalPagePermission.objects.user_has_add_permission(user, site.pk)
        global_add_perm = granted.exists()

    return has_auth_page_permission(user, action='add') and global_add_perm
def get_subordinate_groups(user):
    """
    Return a queryset of the ``Group`` objects subordinate to ``user``.

    Superusers, and users for whom
    ``GlobalPagePermission.objects.with_can_change_permissions`` yields a
    truthy result, get every group. Everyone else gets the groups they
    created that have no page attached, plus - when a permission level can
    be determined - groups with permissions on allowed pages at or below
    that level.
    """
    sees_everything = (
        user.is_superuser or
        GlobalPagePermission.objects.with_can_change_permissions(user)
    )
    if sees_everything:
        return Group.objects.all()

    site = Site.objects.get_current()
    page_id_allow_list = Page.permissions.get_change_permissions_id_list(user, site)
    created_by_user = Q(pageusergroup__created_by=user) & Q(pagepermission__page=None)

    try:
        user_level = get_user_permission_level(user)
    except NoPermissionsException:
        # No permission means no page records (page_id_allow_list is empty),
        # so only the user's own page-less groups remain.
        return Group.objects.distinct().filter(created_by_user)

    on_allowed_pages = (
        Q(pagepermission__page__id__in=page_id_allow_list) &
        Q(pagepermission__page__level__gte=user_level)
    )
    return Group.objects.distinct().filter(on_allowed_pages | created_by_user)
def get_current_tip_of_day(request):
    """
    Return the next tip of the day the requesting user has not seen yet.

    Candidates are active tips that match the user's staff/non-staff
    visibility, are not marked as seen by the user, and are either
    unrestricted or restricted to one of the user's groups. When the current
    site is a ``Site`` instance, tips are additionally limited to those with
    no site or with that site. The candidate with the lowest ``id`` is
    serialized.

    :param request: the incoming request; ``request.user`` is inspected.
    :return: a ``Response`` carrying the serialized tip, or an empty
        ``Response`` for anonymous users and when no tip is available.
    """
    user = request.user
    if user.is_anonymous():
        return Response()

    visibility_filter = {'for_staff': True} if user.is_staff else {'for_nonstaff': True}
    available_tips = TipOfDay.objects.filter(
        active=True,
        **visibility_filter
    ).exclude(
        seen_by=user.pk,
    ).filter(
        Q(groups=None) |
        Q(groups__in=user.groups.all())
    )

    # Only restrict by site when there is a real Site row; passing ``None``
    # to ``filter()`` (the previous behaviour in that case) raises.
    current_site = get_current_site(request)
    if isinstance(current_site, Site):
        available_tips = available_tips.filter(
            Q(sites=None) | Q(sites__id__exact=current_site.id)
        )

    try:
        tip = available_tips.order_by('id')[0]
    except IndexError:
        # No matching tip left for this user.
        return Response()

    return Response(TipOfDaySerializer(tip).data)
def root(cls):
    """
    Return the single root node belonging to the current site.

    :raises NoRootURL: if the current site has no root node.
    :raises MultipleRootURLs: if the current site has more than one.
    """
    site = Site.objects.get_current()
    # The nodes are needed anyway, so they are materialised into a list and
    # counted with len() instead of issuing a separate count() query.
    candidates = list(
        cls.objects.root_nodes().filter(site=site).select_related_common()
    )
    if not candidates:
        raise NoRootURL(
            "You need to create a root article on site '%s'" % site)
    if len(candidates) > 1:
        raise MultipleRootURLs(
            "Somehow you have multiple roots on %s" % site)
    return candidates[0]
def create_root(cls, site=None, title="Root", request=None, **kwargs):
    """
    Return the root node for ``site``, creating it if none exists.

    :param site: site to look at; the current site is used when falsy.
    :param title: title of the first revision of a newly created article.
    :param request: optional request handed to
        ``ArticleRevision.set_from_request``.
    :param kwargs: extra keyword arguments for the ``ArticleRevision``.
    :return: the first existing root node, or the newly created one.
    """
    site = site or Site.objects.get_current()
    existing_roots = cls.objects.root_nodes().filter(site=site)
    if existing_roots:
        return existing_roots[0]

    # (get_or_create does not work for MPTT models??)
    article = Article()
    first_revision = ArticleRevision(title=title, **kwargs)
    if request:
        first_revision.set_from_request(request)
    article.add_revision(first_revision, save=True)
    article.save()

    new_root = cls.objects.create(site=site, article=article)
    article.add_object_relation(new_root)
    return new_root
def create_default_site(app, created_models, verbosity, db, **kwargs):
    """
    Create the ``example.com`` site with pk 1 and clear the ``Site`` cache.

    The site is only created when ``Site`` is among ``created_models`` and
    the router allows syncdb for ``Site`` on ``db``. Progress is printed
    when ``verbosity`` is 2 or higher.
    """
    # Restrict creation to databases where the Site table was just created.
    if Site in created_models and router.allow_syncdb(db, Site):
        # The default settings use SITE_ID = 1 and some of Django's tests
        # depend on it. Reused database sequences (e.g. after flush/syncdb
        # in the test suite) do not guarantee that the next id is 1, so it
        # is forced here. See #15573 and #16353; this also happens outside
        # of tests, see #15346.
        if verbosity >= 2:
            print("Creating example.com Site object")
        default_site = Site(pk=1, domain="example.com", name="example.com")
        default_site.save(using=db)

        # The pk was set explicitly rather than auto-incremented, so the
        # database sequence has to be reset. See #17415.
        reset_statements = connections[db].ops.sequence_reset_sql(no_style(), [Site])
        if reset_statements:
            if verbosity >= 2:
                print("Resetting sequence")
            cursor = connections[db].cursor()
            for statement in reset_statements:
                cursor.execute(statement)

    Site.objects.clear_cache()
def create_user_profile(sender, instance, created, **kwargs):
    """ Set up profile, preference and default group for a new user
    (meant to be connected via ``post_save.connect``).

    Nothing happens unless ``created`` is true.

    :param sender: sender object
    :param instance: user instance
    :param created: user creation flag
    :param kwargs: dictionary argument
    :return: None
    """
    if not created:
        return

    user = instance
    # Profile and preference rows for the user (preference is tied to the
    # current site)
    UserProfile.objects.get_or_create(user=user)
    UserPreference.objects.get_or_create(user=user,
                                         site=Site.objects.get_current())
    # Every user automatically belongs to the "all members" group
    members_group = Group.objects.get_or_create(name=PermissionAlias.all)[0]
    user.groups.add(members_group)
def test_view_subscribe_with_types(self):
    """the subscribe page lists only the subscription types of the current site"""
    current_site = Site.objects.get_current()
    other_site = mommy.make(Site)

    shown_types = [
        mommy.make(models.SubscriptionType, name="#News#abc", site=current_site),
        mommy.make(models.SubscriptionType, name="#News#def", site=current_site),
    ]
    hidden_types = [
        mommy.make(models.SubscriptionType, name="#News#ghi", site=other_site),
        mommy.make(models.SubscriptionType, name="#News#jkl"),
    ]

    response = self.client.get(reverse("emailing_subscribe_newsletter"))
    self.assertEqual(response.status_code, 200)

    for subscription_type in shown_types:
        self.assertContains(response, subscription_type.name)
    for subscription_type in hidden_types:
        self.assertNotContains(response, subscription_type.name)
def site_settings(db, settings):
    '''
    Make sure a "mirumee.com" Site with its SiteSettings exists, point
    ``settings.SITE_ID`` at that site and return the SiteSettings object.

    The original note explains that this fixture is meant to be autouse
    because django.contrib.sites.models.Site and
    saleor.site.models.SiteSettings are linked OneToOne and a Site should
    never exist without SiteSettings (the fixture decorator itself is not
    part of this block).
    '''
    site, _site_created = Site.objects.get_or_create(
        name="mirumee.com", domain="mirumee.com")
    settings_obj, _settings_created = SiteSettings.objects.get_or_create(site=site)
    settings.SITE_ID = site.pk
    return settings_obj
def process_request(self, request):
    """
    Attach the current site to ``request.site``.

    The ``Site`` cache is cleared first, so the site is looked up again
    instead of being served from that cache.
    """
    site_manager = Site.objects
    site_manager.clear_cache()
    request.site = site_manager.get_current()
def add_default_site(instance, created, **kwargs):
    """
    Signal handler that links a newly created instance to the current site.

    The current site (``Site.objects.get_current()``) is added to
    ``instance.sites`` only when all of the following hold: the instance was
    just created, ``settings.PHOTOLOGUE_MULTISITE`` is unset or false, and
    the instance has no site relation yet.
    """
    single_site_mode = created and not getattr(settings, 'PHOTOLOGUE_MULTISITE', False)
    if single_site_mode and not instance.sites.exists():
        instance.sites.add(Site.objects.get_current())
def events(request):
    """
    Build a context dict with notifications and websocket connection data.

    Returned keys: ``notifications`` (up to 10 unseen ``FieldSightLog``
    entries chosen by the name of ``request.group``), ``fieldsight_message``
    (the user's inbox, empty for anonymous users), ``oid`` / ``pid`` /
    ``sid`` (organization, project and site ids, ``0`` unless set for the
    role), ``channels_url`` and ``site_name`` (domain of the first ``Site``).
    """
    from django.contrib.sites.models import Site
    site = Site.objects.first()

    messages = [] if request.user.is_anonymous() else FieldSightMessage.inbox(request.user)

    oid = pid = sid = 0
    logs = []
    # NOTE(review): the source this was restored from had lost its
    # indentation; the final ``else`` is assumed to pair with the
    # ``request.group is not None`` check - confirm against the project.
    if request.group is None:
        logs = []
        oid = None
    else:
        role = request.group.name
        if role == "Super Admin":
            logs = FieldSightLog.objects.filter(is_seen=False)[:10]
            oid = 0
        # "Site Supevisor" is spelled exactly as it is compared at runtime.
        elif role in ("Organization Admin", "Project Manager", "Reviewer", "Site Supevisor"):
            logs = FieldSightLog.objects.filter(organization=request.organization).filter(is_seen=False)[:10]
            if role == "Organization Admin":
                oid = request.organization.id
            elif role == "Project Manager":
                pid = request.project.id
            else:
                sid = request.site.id

    # The port segment is only included when a port is configured.
    if settings.WEBSOCKET_PORT:
        channels_url = "ws://" + settings.WEBSOCKET_URL + ":" + settings.WEBSOCKET_PORT + "/"
    else:
        channels_url = "ws://" + settings.WEBSOCKET_URL + "/"

    context = {
        'notifications': logs,
        'fieldsight_message': messages,
        'oid': oid,
        'pid': pid,
        'sid': sid,
        'channels_url': channels_url,
        'site_name': site.domain
    }
    return context
def url(self):
    '''
    Absolute URL of this invoice: protocol from the
    ``email__linkProtocol`` constant, domain of the current site, then the
    ``viewInvoice`` path. Invoice URLs are generally emailed, hence the
    full form.

    Returns ``None`` when the invoice has no id yet.
    '''
    if not self.id:
        return None

    protocol = getConstant('email__linkProtocol')
    domain = Site.objects.get_current().domain
    path = reverse('viewInvoice', args=[self.id,])
    return '%s://%s%s' % (protocol, domain, path)
def get_absolute_url(self):
    '''
    Path of the ``viewInvoice`` view for this invoice; used for the
    'View on Site' links in the admin.
    '''
    url_args = [self.id,]
    return reverse('viewInvoice', args=url_args)
def create_site(self, form):
    """
    Create, save and return a ``Site`` built from the form's cleaned
    ``name`` and ``domain`` values.
    """
    cleaned = form.cleaned_data
    new_site = Site(name=cleaned["name"], domain=cleaned["domain"])
    new_site.save()
    return new_site
def form_valid(self, form):
    """
    Clear the ``Site`` cache, then defer to the parent ``form_valid``.
    """
    Site.objects.clear_cache()
    response = super(WebsiteCustomize, self).form_valid(form)
    return response
def add_arguments(self, parser):
    """
    Register the optional ``--site`` argument on ``parser``.
    """
    site_help = "Site id to use while running the command. First site in the database will be used if not provided."
    parser.add_argument("--site", help=site_help)
def test_2apphooks_with_same_namespace(self):
    """
    Posting an apphook namespace that another page already uses answers
    200 instead of redirecting; a distinct namespace is accepted (302).
    """
    first_title = 'Test Page'
    second_title = 'Test page 2'
    app_urls = 'project.sampleapp.urls'
    admin_user, normal_guy = self._get_guys()
    site = Site.objects.get(pk=1)

    # The admin creates both pages
    first_page = create_page(first_title, "nav_playground.html", "en",
                             site=site, created_by=admin_user)
    second_page = create_page(second_title, "nav_playground.html", "en",
                              site=site, created_by=admin_user)

    first_page.application_urls = app_urls
    first_page.application_namespace = "space1"
    first_page.save()
    second_page.application_urls = app_urls
    second_page.save()

    # The admin edits the page (change the page name for ex.)
    page_data = {
        'title': second_title,
        'slug': second_page.get_slug(),
        'language': 'en',
        'site': first_page.site.pk,
        'template': second_page.template,
        'application_urls': 'SampleApp',
        'application_namespace': 'space1',
    }

    with self.login_user_context(admin_user):
        response = self.client.post(base.URL_CMS_PAGE_ADVANCED_CHANGE % first_page.pk, page_data)
        self.assertEqual(response.status_code, 302)
        namespace_users = Page.objects.filter(application_namespace="space1")
        self.assertEqual(namespace_users.count(), 1)

        response = self.client.post(base.URL_CMS_PAGE_ADVANCED_CHANGE % second_page.pk, page_data)
        self.assertEqual(response.status_code, 200)

        page_data['application_namespace'] = 'space2'
        response = self.client.post(base.URL_CMS_PAGE_ADVANCED_CHANGE % second_page.pk, page_data)
        self.assertEqual(response.status_code, 302)
def test_preview_page(self):
    """
    ``preview_page`` raises ``Http404`` for an unknown page id and
    otherwise redirects to the page URL with the edit-on flag and language;
    once the page sits on the ``django-cms.org`` site the redirect carries
    that domain.
    """
    permless = self.get_permless()
    with self.login_user_context(permless):
        request = self.get_request()
        self.assertRaises(Http404, self.admin_class.preview_page, request, 404, "en")

    page = self.get_page()
    page.publish("en")
    base_url = page.get_absolute_url()

    with self.login_user_context(permless):
        edit_on = get_cms_setting('CMS_TOOLBAR_URL__EDIT_ON')
        same_site_location = '%s?%s&language=en' % (base_url, edit_on)

        request = self.get_request('/?public=true')
        response = self.admin_class.preview_page(request, page.pk, 'en')
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response['Location'], same_site_location)

        request = self.get_request()
        response = self.admin_class.preview_page(request, page.pk, 'en')
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response['Location'], same_site_location)

        # Move the page to another site and preview it again.
        other_site = Site.objects.create(domain='django-cms.org', name='django-cms')
        page.site = other_site
        page.save()
        page.publish("en")
        self.assertTrue(page.is_home)

        response = self.admin_class.preview_page(request, page.pk, 'en')
        self.assertEqual(response.status_code, 302)
        other_site_location = 'http://django-cms.org%s?%s&language=en' % (base_url, edit_on)
        self.assertEqual(response['Location'], other_site_location)
def test_editpage_contentsize(self):
    """
    The page-permission change view grows when a user is added, and the
    new username shows up exactly twice in the response.
    """
    with self.settings(CMS_PERMISSION=True):
        superuser = self.get_superuser()
        page_name = 'TestPage'
        username = 'test_size_user_0'
        site = Site.objects.get(pk=1)
        page = create_page(page_name, "nav_playground.html", "en", site=site, created_by=superuser)
        page.save()
        self._page = page

        with self.login_user_context(superuser):
            url = base.URL_CMS_PAGE_PERMISSION_CHANGE % self._page.pk
            response = self.client.get(url)
            self.assertEqual(response.status_code, 200)
            size_before = len(response.content)
            users_before = get_user_model().objects.count()

            # add one more user, then load the page again
            get_user_model().objects.create_user(
                username=username,
                email=username + '@django-cms.org',
                password=username,
            )
            users_after = get_user_model().objects.count()
            # the user table must have grown
            self.assertTrue(users_before < users_after, "New users got NOT created")

            response = self.client.get(url)
            size_after = len(response.content)
            # the page size is expected to depend on the number of users
            self.assertTrue(size_before < size_after, "Page size has not grown after user creation")

            # two forms list usernames as options, hence two occurrences
            text = smart_str(response.content, response.charset)
            occurrences = text.count(username)
            self.assertEqual(occurrences, 2,
                             "Username %s appeared %s times in response.content, expected 2 times" % (
                                 username, occurrences))
def test_get_site_id_from_site(self):
    """``get_site_id`` returns the id of a ``Site`` instance passed to it."""
    expected_id = 10
    site = Site()
    site.id = expected_id
    self.assertEqual(expected_id, get_site_id(site))
def setUp(self):
    """
    Create a test user and two extra sites (pk 2 and 3), then enter a
    login context for that user and keep it on ``self._login_context``.
    """
    self.assertEqual(Site.objects.all().count(), 1)

    # NOTE(review): the extent of this ``with`` block was reconstructed
    # from source that had lost its indentation - confirm.
    with self.settings(SITE_ID=1):
        test_user = self._create_user("test", True, True)

        # additional sites used by the tests
        self.site2 = Site.objects.create(domain="sample2.com", name="sample2.com", pk=2)
        self.site3 = Site.objects.create(domain="sample3.com", name="sample3.com", pk=3)

    login_context = self.login_user_context(test_user)
    self._login_context = login_context
    login_context.__enter__()
def has_global_page_permission(request, site=None, user=None, **filters):
    """
    A helper function to check for global page permissions for a user and
    site. Caches the result on a request basis, so repeated calls with the
    same user, site and filters inside of one request/response cycle only
    generate one query.

    :param request: the Request object; the cache is stored on it as
        ``_cms_global_perms``
    :param site: the Site object or ID; when given, only permissions that
        include this site or have no site at all are considered
    :param user: the user to check; defaults to ``request.user``
    :param filters: queryset filters, e.g. ``can_add = True``
    :return: ``True`` or ``False``
    """
    if not user:
        user = request.user
    if not user.is_authenticated():
        return False
    if not get_cms_setting('PERMISSION') or user.is_superuser:
        return True
    if not hasattr(request, '_cms_global_perms'):
        request._cms_global_perms = {}

    # The user is part of the key because an explicitly passed ``user`` may
    # differ from ``request.user``; without it one user's cached answer
    # could be returned for another. Filters are sorted so that the same
    # set of filters always maps to the same key regardless of kwarg order.
    key = (('user', user.pk),) + tuple(sorted(filters.items()))
    if site:
        key = (('site', site.pk if hasattr(site, 'pk') else int(site)),) + key

    if key not in request._cms_global_perms:
        qs = GlobalPagePermission.objects.with_user(user).filter(**filters)
        if site:
            qs = qs.filter(Q(sites__in=[site]) | Q(sites__isnull=True))
        request._cms_global_perms[key] = qs.exists()
    return request._cms_global_perms[key]