def get_expiry_date(self, **kwargs):
    """Return the session expiry date as a datetime object.

    The optional `modification` and `expiry` keyword arguments override the
    modification time and the expiry of the session, respectively.
    """
    if 'modification' in kwargs:
        modification = kwargs['modification']
    else:
        modification = timezone.now()
    # Distinguish "expiry=None passed" from "expiry not passed": the stored
    # value is only looked up when the keyword is absent altogether.
    if 'expiry' in kwargs:
        expiry = kwargs['expiry']
    else:
        expiry = self.get('_session_expiry')
    if isinstance(expiry, datetime):
        # An absolute expiry date is returned unchanged.
        return expiry
    # None and 0 both fall back to the configured cookie age.
    lifetime = expiry if expiry else settings.SESSION_COOKIE_AGE
    return modification + timedelta(seconds=lifetime)
# Example source code using now() in Python
def _ask_default(self):
    """Interactively prompt for a default value and return it.

    Keeps prompting until the entered text evaluates successfully. Typing
    ``exit`` terminates the process with status 1.
    """
    print("Please enter the default value now, as valid Python")
    print("The datetime and django.utils.timezone modules are available, so you can do e.g. timezone.now()")
    while True:
        # Six does not abstract over the fact that py3 input returns a
        # unicode string while py2 raw_input returns a bytestring, so the
        # py2 result is decoded by hand.
        code = input(">>> ") if six.PY3 else input(">>> ").decode(sys.stdin.encoding)
        if code == "exit":
            sys.exit(1)
        if not code:
            print("Please enter some code, or 'exit' (with no quotes) to exit.")
            continue
        try:
            # NOTE(review): eval() runs whatever was typed at the prompt;
            # only `datetime` and `timezone` are exposed as names.
            return eval(code, {}, {"datetime": datetime_safe, "timezone": timezone})
        except (SyntaxError, NameError) as e:
            print("Invalid input: %s" % e)
def ask_not_null_addition(self, field_name, model_name):
    """Ask how to handle adding a NOT NULL field without a default.

    Returns None on a dry run. Otherwise the user either provides a one-off
    default (returned via `_ask_default`) or quits with exit status 3.
    """
    if self.dry_run:
        return None
    question = (
        "You are trying to add a non-nullable field '%s' to %s without a default; "
        "we can't do that (the database needs something to populate existing rows).\n"
        "Please select a fix:" % (field_name, model_name)
    )
    options = [
        "Provide a one-off default now (will be set on all existing rows)",
        "Quit, and let me add a default in models.py",
    ]
    choice = self._choice_input(question, options)
    if choice == 2:
        sys.exit(3)
    return self._ask_default()
def ask_not_null_alteration(self, field_name, model_name):
    """Ask how to handle changing a NULL field to NOT NULL without a default.

    Returns None on a dry run. Otherwise returns the one-off default from
    `_ask_default`, or NOT_PROVIDED when the user chooses to handle existing
    NULL rows themselves, or exits with status 3 when the user quits.
    """
    if self.dry_run:
        return None
    question = (
        "You are trying to change the nullable field '%s' on %s to non-nullable "
        "without a default; we can't do that (the database needs something to "
        "populate existing rows).\n"
        "Please select a fix:" % (field_name, model_name)
    )
    options = [
        "Provide a one-off default now (will be set on all existing rows)",
        ("Ignore for now, and let me handle existing rows with NULL myself "
         "(e.g. because you added a RunPython or RunSQL operation to handle "
         "NULL values in a previous data migration)"),
        "Quit, and let me add a default in models.py",
    ]
    choice = self._choice_input(question, options)
    if choice == 2:
        return NOT_PROVIDED
    if choice == 3:
        sys.exit(3)
    return self._ask_default()
def get_dated_queryset(self, **lookup):
    """
    Return the view's queryset filtered by `lookup`, honouring the
    `allow_future` and `allow_empty` settings.

    Raises Http404 when empty results are not allowed and nothing matches.
    """
    queryset = self.get_queryset().filter(**lookup)
    date_field = self.get_date_field()
    allow_future = self.get_allow_future()
    allow_empty = self.get_allow_empty()
    paginate_by = self.get_paginate_by(queryset)

    if not allow_future:
        # Compare against a datetime or a date depending on the field type.
        if self.uses_datetime_field:
            cutoff = timezone.now()
        else:
            cutoff = timezone_today()
        queryset = queryset.filter(**{'%s__lte' % date_field: cutoff})

    if not allow_empty:
        # When pagination is enabled, a cheap existence query beats loading
        # the unpaginated queryset in memory.
        if paginate_by is None:
            nothing_found = len(queryset) == 0
        else:
            nothing_found = not queryset.exists()
        if nothing_found:
            raise Http404(_("No %(verbose_name_plural)s available") % {
                'verbose_name_plural': force_text(queryset.model._meta.verbose_name_plural)
            })

    return queryset
def has_key(self, key, version=None):
    """Return True if an unexpired row for `key` exists in the cache table."""
    key = self.make_key(key, version=version)
    self.validate_key(key)

    alias = router.db_for_read(self.cache_model_class)
    connection = connections[alias]
    table = connection.ops.quote_name(self._table)

    # Use UTC when USE_TZ is enabled, local time otherwise; microseconds
    # are dropped before the comparison.
    current = datetime.utcnow() if settings.USE_TZ else datetime.now()
    current = current.replace(microsecond=0)

    sql = ("SELECT cache_key FROM %s "
           "WHERE cache_key = %%s and expires > %%s" % table)
    with connection.cursor() as cursor:
        params = [key, connection.ops.adapt_datetimefield_value(current)]
        cursor.execute(sql, params)
        return cursor.fetchone() is not None
def _cull(self, db, cursor, now):
    """Trim the cache table.

    With a cull frequency of 0 the whole cache is cleared. Otherwise rows
    that expired before `now` are deleted, and if more than `_max_entries`
    rows remain, rows whose key sorts below a cutoff key are deleted too.
    """
    if self._cull_frequency == 0:
        self.clear()
        return

    connection = connections[db]
    table = connection.ops.quote_name(self._table)

    cursor.execute("DELETE FROM %s WHERE expires < %%s" % table,
                   [connection.ops.adapt_datetimefield_value(now)])
    cursor.execute("SELECT COUNT(*) FROM %s" % table)
    remaining = cursor.fetchone()[0]
    if remaining <= self._max_entries:
        return

    cull_num = remaining // self._cull_frequency
    # The backend-specific culling query is expected to yield the cutoff
    # key; everything sorting below it is removed.
    cursor.execute(connection.ops.cache_key_culling_sql() % table, [cull_num])
    cutoff_key = cursor.fetchone()[0]
    cursor.execute("DELETE FROM %s "
                   "WHERE cache_key < %%s" % table,
                   [cutoff_key])
def load(self):
    """Return the session data, reading the cache first and the database second.

    On a cache miss the database row is decoded and written back to the
    cache. If no unexpired row exists, or decoding raises
    SuspiciousOperation, the session key is reset and an empty dict is
    returned.
    """
    try:
        data = self._cache.get(self.cache_key)
    except Exception:
        # Some backends (e.g. memcache) raise an exception on invalid
        # cache keys. If this happens, reset the session. See #17810.
        data = None

    if data is not None:
        return data

    # Duplicate DBStore.load, because we need to keep track
    # of the expiry date to set it properly in the cache.
    try:
        session = self.model.objects.get(
            session_key=self.session_key,
            expire_date__gt=timezone.now()
        )
        data = self.decode(session.session_data)
        timeout = self.get_expiry_age(expiry=session.expire_date)
        self._cache.set(self.cache_key, data, timeout)
    except (self.model.DoesNotExist, SuspiciousOperation) as exc:
        if isinstance(exc, SuspiciousOperation):
            security_logger = logging.getLogger('django.security.%s' % exc.__class__.__name__)
            security_logger.warning(force_text(exc))
        self._session_key = None
        data = {}
    return data
def get_expiry_age(self, **kwargs):
    """Return the number of seconds until the session expires.

    The optional `modification` and `expiry` keyword arguments override the
    modification time and the expiry of the session, respectively.
    """
    if 'modification' in kwargs:
        modification = kwargs['modification']
    else:
        modification = timezone.now()
    # "expiry=None passed in kwargs" and "expiry not passed in kwargs" are
    # treated differently so that self.load() is never triggered when an
    # expiry is provided.
    if 'expiry' in kwargs:
        expiry = kwargs['expiry']
    else:
        expiry = self.get('_session_expiry')

    if not expiry:
        # Covers both None and 0.
        return settings.SESSION_COOKIE_AGE
    if isinstance(expiry, datetime):
        remaining = expiry - modification
        return remaining.days * 86400 + remaining.seconds
    # Anything else is returned unchanged as the age.
    return expiry
def get_expiry_date(self, **kwargs):
    """Return the session expiry date as a datetime object.

    The optional `modification` and `expiry` keyword arguments override the
    modification time and the expiry of the session, respectively.
    """
    if 'modification' in kwargs:
        modification = kwargs['modification']
    else:
        modification = timezone.now()
    # Distinguish "expiry=None passed" from "expiry not passed": the stored
    # value is only looked up when the keyword is absent altogether.
    if 'expiry' in kwargs:
        expiry = kwargs['expiry']
    else:
        expiry = self.get('_session_expiry')
    if isinstance(expiry, datetime):
        # An absolute expiry date is returned unchanged.
        return expiry
    # None and 0 both fall back to the configured cookie age.
    lifetime = expiry if expiry else settings.SESSION_COOKIE_AGE
    return modification + timedelta(seconds=lifetime)
def validate_token_age(callback_token):
    """
    Check whether an active callback token is still within its allowed age.

    Returns True when the token is young enough. A token that is too old is
    deactivated and False is returned; False is also returned when no active
    token with that key exists.
    """
    try:
        token = CallbackToken.objects.get(key=callback_token, is_active=True)
        age_seconds = (timezone.now() - token.created_at).total_seconds()
        max_age = api_settings.PASSWORDLESS_TOKEN_EXPIRE_TIME

        if age_seconds <= max_age:
            return True

        # Too old: invalidate the token so it cannot be used again.
        token.is_active = False
        token.save()
        return False
    except CallbackToken.DoesNotExist:
        # No valid token.
        return False
def fetch_og_preview(content, urls):
    """Fetch the first OpenGraph entry found for a list of urls.

    The first url that yields a cached or freshly fetched entry wins: the
    entry is linked to `content` and returned. Returns False when no url
    produces one.
    """
    def _attach(entry):
        # Link the entry to the content row and hand it back to the caller.
        Content.objects.filter(id=content.id).update(opengraph=entry)
        return entry

    og_fields = ("title", "site_name", "description", "image")
    for url in urls:
        # See first if recently cached already
        recent = OpenGraphCache.objects.filter(url=url, modified__gte=now() - datetime.timedelta(days=7))
        if recent.exists():
            return _attach(OpenGraphCache.objects.get(url=url))
        try:
            og = OpenGraph(url=url, parser="lxml")
        except AttributeError:
            continue
        # Skip results that carry none of the fields we would store.
        if not og or not any(field in og for field in og_fields):
            continue
        try:
            if "title" in og:
                title = og.title
            elif "site_name" in og:
                title = og.site_name
            else:
                title = ""
            description = og.description if "description" in og else ""
            # Images are not kept for NSFW content.
            image = og.image if "image" in og and not content.is_nsfw else ""
            try:
                with transaction.atomic():
                    entry = OpenGraphCache.objects.create(
                        url=url,
                        title=truncate_letters(safe_text(title), 250),
                        description=safe_text(description),
                        image=safe_text(image),
                    )
            except DataError:
                continue
        except IntegrityError:
            # Some other process got ahead of us
            entry = OpenGraphCache.objects.get(url=url)
        return _attach(entry)
    return False
def fetch_oembed_preview(content, urls):
    """Fetch the first oembed content found for a list of urls.

    The first url that yields a cached or freshly fetched oembed wins: it is
    linked to `content` and returned. Returns False when no url produces one.
    """
    def _attach(entry):
        # Link the entry to the content row and hand it back to the caller.
        Content.objects.filter(id=content.id).update(oembed=entry)
        return entry

    for url in urls:
        # See first if recently cached already
        recent = OEmbedCache.objects.filter(url=url, modified__gte=now()-datetime.timedelta(days=7))
        if recent.exists():
            return _attach(OEmbedCache.objects.get(url=url))
        # Fetch oembed
        # The DNT option probably has little effect since we fetch these on
        # the backend, but it is good to communicate it if possible.
        options = {"dnt": "true"} if url.startswith("https://twitter.com/") else {}
        try:
            markup = PyEmbed(discoverer=OEmbedDiscoverer()).embed(url, **options)
        except (PyEmbedError, PyEmbedDiscoveryError, PyEmbedConsumerError, ValueError):
            continue
        if not markup:
            continue
        # Ensure width is 100% not fixed, and drop any fixed height.
        markup = re.sub(r'width="[0-9]*"', 'width="100%"', markup)
        markup = re.sub(r'height="[0-9]*"', "", markup)
        try:
            with transaction.atomic():
                entry = OEmbedCache.objects.create(url=url, oembed=markup)
        except IntegrityError:
            # Some other process got ahead of us
            entry = OEmbedCache.objects.get(url=url)
        return _attach(entry)
    return False
def get_recommended_products(cls, lang, family=None, category=None, subcategory=None):
    """Return the recommended products as a list of dicts.

    A product qualifies when it is flagged `most_sold` or has a principal
    image. The optional `family`, `category` and `subcategory` arguments
    each narrow the result when they are not None.

    Every returned dict gets a `new` key: 1 when the product was created
    within `settings.CDNX_PRODUCTS_NOVELTY_DAYS` days, 0 otherwise.
    """
    products = []
    query = Q(most_sold=True) | Q(product__products_image__principal=True)
    if family is not None:
        # Filter on the family itself; this used to pass `category` here.
        query &= Q(product__family=family)
    if category is not None:
        query &= Q(product__category=category)
    if subcategory is not None:
        query &= Q(product__subcategory=subcategory)

    # NOTE(review): "offer" is listed twice below, as in the original call;
    # confirm against query_or whether the duplicate matters.
    for product in cls.query_or(
        query,
        "{}__slug".format(lang),
        "offer",
        "created",
        "offer",
        "pk",
        "product__{}__name".format(lang),
        "product__model",
        "product__brand__{}__name".format(lang),
        "product__products_image__image",
        "{}__meta_title".format(lang),
        slug="{}__slug".format(lang),
        meta_title="{}__meta_title".format(lang),
        image="product__products_image__image",
        name="product__{}__name".format(lang),
        pop_annotations=True
    ):
        product['new'] = 1 if (timezone.now() - product['created']).days <= settings.CDNX_PRODUCTS_NOVELTY_DAYS else 0
        products.append(product)

    return products
def get_products(cls, lang, family=None, category=None, subcategory=None, brand=None):
    """Return the products that have a principal image, as a list of dicts.

    The optional `family`, `category`, `subcategory` and `brand` arguments
    each narrow the result when they are not None. Every returned dict gets
    a `price` key (the `price_total` from `calculate_price()`) and a `new`
    key: 1 when the product was created within
    `settings.CDNX_PRODUCTS_NOVELTY_DAYS` days, 0 otherwise.
    """
    query = Q(product__products_image__principal=True)
    optional_filters = (
        ("product__family", family),
        ("product__category", category),
        ("product__subcategory", subcategory),
        ("product__brand", brand),
    )
    for lookup, value in optional_filters:
        if value is not None:
            query &= Q(**{lookup: value})

    rows = cls.query_or(
        query,
        "{}__slug".format(lang),
        "offer",
        "created",
        "offer",
        "pk",
        "product__tax__tax",
        "product__{}__name".format(lang),
        "product__model",
        "product__brand__{}__name".format(lang),
        "product__products_image__image",
        "{}__meta_title".format(lang),
        slug="{}__slug".format(lang),
        meta_title="{}__meta_title".format(lang),
        image="product__products_image__image",
        name="product__{}__name".format(lang),
        pop_annotations=True
    )

    products = []
    for product in rows:
        # The price comes from the model instance, fetched per row.
        prices = cls.objects.get(pk=product['pk']).calculate_price()
        product['price'] = prices['price_total']
        age_days = (timezone.now() - product['created']).days
        product['new'] = 1 if age_days <= settings.CDNX_PRODUCTS_NOVELTY_DAYS else 0
        products.append(product)
    return products
def _save(self, name, content):
    """Store `content` in the in-memory cache and return the name used.

    If `name` is already taken, `get_available_name` is asked for
    alternatives until a free one is found.
    """
    # Make sure that the cache stores the file as bytes, like it would be
    # on disk.
    payload = content.read()
    try:
        payload = payload.encode()
    except AttributeError:
        # No .encode() on the payload (e.g. it is already bytes): keep it.
        pass

    with self._lock.writer():
        while name in self.cache:
            name = self.get_available_name(name)
        self.cache[name] = FakeContent(payload, now())
    return name
def post_list(request):
    """Render the posts published up to now, ordered by publication date."""
    published = Post.objects.filter(published_date__lte=timezone.now())
    context = {'posts': published.order_by('published_date')}
    return render(request, 'blog/post_list.html', context)
def publish(self):
    """Stamp the object with the current time as its publication date and save it."""
    published_at = timezone.now()
    self.published_date = published_at
    self.save()
def start_current(self, **data):
    """Start a new time entry for this user and return it.

    Any entry that is currently running is stopped first. Extra keyword
    arguments are passed through to the TimeEntry constructor.
    """
    try:
        self.stop_current()
    except exceptions.NoCurrentEntry:
        # Nothing was running, so there is nothing to stop.
        pass

    new_entry = TimeEntry(user=self.user, start=timezone.now(), **data)
    new_entry.save()
    return new_entry
def stop_current(self):
    """Set the stop time of the current entry to now, save it and return it."""
    running = self.current()
    running.stop = timezone.now()
    running.save()
    return running