python类now()的实例源码

base.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_expiry_date(self, **kwargs):
        """Return the session's expiry date as a datetime object.

        Two optional keyword arguments are honoured:

        * ``modification`` -- the moment the session was last modified;
          ``timezone.now()`` is used when it is not supplied.
        * ``expiry`` -- the expiry setting to apply; the session's stored
          ``'_session_expiry'`` value is used when it is not supplied.

        A datetime expiry is returned unchanged. Any other truthy expiry is
        treated as a number of seconds added to ``modification``; a falsy
        one (None or 0) falls back to ``settings.SESSION_COOKIE_AGE``.
        """
        if 'modification' in kwargs:
            base_time = kwargs['modification']
        else:
            base_time = timezone.now()

        # Membership is tested explicitly (instead of kwargs.get) so that
        # "expiry=None was passed" stays distinct from "expiry not passed":
        # the stored session value is only read in the latter case.
        if 'expiry' in kwargs:
            expiry = kwargs['expiry']
        else:
            expiry = self.get('_session_expiry')

        if isinstance(expiry, datetime):
            return expiry
        # None and 0 both mean "use the default cookie age".
        lifetime = expiry or settings.SESSION_COOKIE_AGE
        return base_time + timedelta(seconds=lifetime)
questioner.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _ask_default(self):
        """Prompt on the console until a usable default value is entered.

        Each line typed is evaluated as a Python expression with only the
        names ``datetime`` and ``timezone`` available, and the resulting
        value is returned. An empty line re-prompts, the literal ``exit``
        terminates the process with status 1, and input that raises
        SyntaxError or NameError is reported before prompting again.
        """
        print("Please enter the default value now, as valid Python")
        print("The datetime and django.utils.timezone modules are available, so you can do e.g. timezone.now()")
        while True:
            entered = input(">>> ")
            if not six.PY3:
                # Six does not abstract over the fact that py3 input returns
                # a unicode string while py2 raw_input returns a bytestring,
                # so the py2 result is decoded by hand.
                entered = entered.decode(sys.stdin.encoding)

            if not entered:
                print("Please enter some code, or 'exit' (with no quotes) to exit.")
                continue
            if entered == "exit":
                sys.exit(1)

            try:
                # NOTE(review): eval() runs whatever the operator typed;
                # acceptable only because this is an interactive prompt.
                return eval(entered, {}, {"datetime": datetime_safe, "timezone": timezone})
            except (SyntaxError, NameError) as err:
                print("Invalid input: %s" % err)
questioner.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def ask_not_null_addition(self, field_name, model_name):
        """Ask how to handle adding a NOT NULL field that has no default.

        Returns None straight away when ``self.dry_run`` is set. Otherwise
        the user picks between supplying a one-off default (the value
        obtained from ``self._ask_default()`` is returned) and quitting,
        which exits the process with status 3.
        """
        if self.dry_run:
            return None

        question = (
            "You are trying to add a non-nullable field '%s' to %s without a default; "
            "we can't do that (the database needs something to populate existing rows).\n"
            "Please select a fix:" % (field_name, model_name)
        )
        options = [
            "Provide a one-off default now (will be set on all existing rows)",
            "Quit, and let me add a default in models.py",
        ]
        if self._choice_input(question, options) == 2:
            sys.exit(3)
        return self._ask_default()
questioner.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def ask_not_null_alteration(self, field_name, model_name):
        """Ask how to handle changing a NULL field to NOT NULL with no default.

        Returns None straight away when ``self.dry_run`` is set. Otherwise
        the user's menu choice decides the outcome:

        * choice 2 -- return ``NOT_PROVIDED`` (the user will deal with
          existing NULL rows themselves);
        * choice 3 -- exit the process with status 3;
        * anything else -- return the value from ``self._ask_default()``.
        """
        if self.dry_run:
            return None

        question = (
            "You are trying to change the nullable field '%s' on %s to non-nullable "
            "without a default; we can't do that (the database needs something to "
            "populate existing rows).\n"
            "Please select a fix:" % (field_name, model_name)
        )
        options = [
            "Provide a one-off default now (will be set on all existing rows)",
            ("Ignore for now, and let me handle existing rows with NULL myself "
             "(e.g. because you added a RunPython or RunSQL operation to handle "
             "NULL values in a previous data migration)"),
            "Quit, and let me add a default in models.py",
        ]
        picked = self._choice_input(question, options)
        if picked == 2:
            return NOT_PROVIDED
        if picked == 3:
            sys.exit(3)
        return self._ask_default()
dates.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_dated_queryset(self, **lookup):
        """Build the queryset for this view, filtered by ``lookup``.

        On top of the extra lookup kwargs, two view options are applied:

        * unless ``allow_future`` is set, rows whose date field lies after
          the current moment (or the current day, when the view does not use
          a datetime field) are excluded;
        * unless ``allow_empty`` is set, an empty result raises Http404.
        """
        queryset = self.get_queryset().filter(**lookup)
        date_field = self.get_date_field()
        allow_future = self.get_allow_future()
        allow_empty = self.get_allow_empty()
        paginate_by = self.get_paginate_by(queryset)

        if not allow_future:
            if self.uses_datetime_field:
                upper_bound = timezone.now()
            else:
                upper_bound = timezone_today()
            queryset = queryset.filter(**{'%s__lte' % date_field: upper_bound})

        if allow_empty:
            return queryset

        # When pagination is enabled, a cheap existence query is preferable
        # to loading the whole unpaginated queryset into memory.
        if paginate_by is None:
            nothing_found = len(queryset) == 0
        else:
            nothing_found = not queryset.exists()
        if nothing_found:
            raise Http404(_("No %(verbose_name_plural)s available") % {
                'verbose_name_plural': force_text(queryset.model._meta.verbose_name_plural)
            })

        return queryset
db.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def has_key(self, key, version=None):
        """Report whether an unexpired cache row exists for ``key``.

        The key is expanded with ``make_key`` and checked with
        ``validate_key`` before the read database for the cache model is
        queried. Returns True when a row with that key and an ``expires``
        value later than the current time is found, False otherwise.
        """
        key = self.make_key(key, version=version)
        self.validate_key(key)

        alias = router.db_for_read(self.cache_model_class)
        connection = connections[alias]
        table = connection.ops.quote_name(self._table)

        # UTC when USE_TZ is on, local time otherwise; microseconds are
        # dropped before the value is compared with the stored expiry.
        current = datetime.utcnow() if settings.USE_TZ else datetime.now()
        current = current.replace(microsecond=0)

        sql = ("SELECT cache_key FROM %s "
               "WHERE cache_key = %%s and expires > %%s") % table
        with connection.cursor() as cursor:
            params = [key, connection.ops.adapt_datetimefield_value(current)]
            cursor.execute(sql, params)
            row = cursor.fetchone()
            return row is not None
db.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _cull(self, db, cursor, now):
        if self._cull_frequency == 0:
            self.clear()
        else:
            connection = connections[db]
            table = connection.ops.quote_name(self._table)
            cursor.execute("DELETE FROM %s WHERE expires < %%s" % table,
                           [connection.ops.adapt_datetimefield_value(now)])
            cursor.execute("SELECT COUNT(*) FROM %s" % table)
            num = cursor.fetchone()[0]
            if num > self._max_entries:
                cull_num = num // self._cull_frequency
                cursor.execute(
                    connection.ops.cache_key_culling_sql() % table,
                    [cull_num])
                cursor.execute("DELETE FROM %s "
                               "WHERE cache_key < %%s" % table,
                               [cursor.fetchone()[0]])
cached_db.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def load(self):
        """Return the session data, trying the cache before the database.

        A cache hit is returned as is. On a miss (or when the cache lookup
        raises), the unexpired session row is fetched from the model, decoded
        and written back to the cache with the expiry age derived from the
        row's ``expire_date``. If no row exists or decoding raises
        SuspiciousOperation, the session key is reset and an empty dict is
        returned; a SuspiciousOperation is additionally logged as a warning.
        """
        try:
            cached = self._cache.get(self.cache_key)
        except Exception:
            # Some backends (e.g. memcache) raise an exception on invalid
            # cache keys. If this happens, reset the session. See #17810.
            cached = None

        if cached is not None:
            return cached

        # Duplicate DBStore.load, because we need to keep track
        # of the expiry date to set it properly in the cache.
        try:
            record = self.model.objects.get(
                session_key=self.session_key,
                expire_date__gt=timezone.now()
            )
            session_data = self.decode(record.session_data)
            self._cache.set(
                self.cache_key,
                session_data,
                self.get_expiry_age(expiry=record.expire_date),
            )
        except (self.model.DoesNotExist, SuspiciousOperation) as exc:
            if isinstance(exc, SuspiciousOperation):
                security_log = logging.getLogger('django.security.%s' % exc.__class__.__name__)
                security_log.warning(force_text(exc))
            self._session_key = None
            session_data = {}
        return session_data
base.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_expiry_age(self, **kwargs):
        """Return the number of seconds until the session expires.

        Two optional keyword arguments are honoured:

        * ``modification`` -- the moment the session was last modified;
          ``timezone.now()`` is used when it is not supplied.
        * ``expiry`` -- the expiry setting to apply; the session's stored
          ``'_session_expiry'`` value is used when it is not supplied.

        A falsy expiry (None or 0) yields ``settings.SESSION_COOKIE_AGE``, a
        datetime expiry yields its distance from ``modification`` in whole
        seconds, and any other value is returned unchanged.
        """
        if 'modification' in kwargs:
            base_time = kwargs['modification']
        else:
            base_time = timezone.now()

        # "expiry=None passed in kwargs" must stay distinguishable from
        # "expiry not passed in kwargs", which guarantees self.load() is not
        # triggered when expiry is provided.
        if 'expiry' in kwargs:
            expiry = kwargs['expiry']
        else:
            expiry = self.get('_session_expiry')

        if not expiry:   # Covers both None and 0
            return settings.SESSION_COOKIE_AGE
        if isinstance(expiry, datetime):
            remaining = expiry - base_time
            return remaining.days * 86400 + remaining.seconds
        return expiry
base.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_expiry_date(self, **kwargs):
        """Return the session's expiry date as a datetime object.

        Two optional keyword arguments are honoured:

        * ``modification`` -- the moment the session was last modified;
          ``timezone.now()`` is used when it is not supplied.
        * ``expiry`` -- the expiry setting to apply; the session's stored
          ``'_session_expiry'`` value is used when it is not supplied.

        A datetime expiry is returned unchanged. Any other truthy expiry is
        treated as a number of seconds added to ``modification``; a falsy
        one (None or 0) falls back to ``settings.SESSION_COOKIE_AGE``.
        """
        if 'modification' in kwargs:
            base_time = kwargs['modification']
        else:
            base_time = timezone.now()

        # Explicit membership test: a caller passing expiry=None must not
        # cause the stored session value to be read.
        if 'expiry' in kwargs:
            expiry = kwargs['expiry']
        else:
            expiry = self.get('_session_expiry')

        if isinstance(expiry, datetime):
            return expiry
        # None and 0 both mean "use the default cookie age".
        lifetime = expiry or settings.SESSION_COOKIE_AGE
        return base_time + timedelta(seconds=lifetime)
utils.py 文件源码 项目:django-rest-framework-passwordless 作者: aaronn 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def validate_token_age(callback_token):
    """
    Check whether an active callback token is still within its allowed age.

    Returns True when an active token with the given key exists and was
    created no more than ``PASSWORDLESS_TOKEN_EXPIRE_TIME`` seconds ago.
    A token older than that is marked inactive, saved, and False is
    returned. False is also returned when no active token matches.
    """
    try:
        token = CallbackToken.objects.get(key=callback_token, is_active=True)
        age_seconds = (timezone.now() - token.created_at).total_seconds()
        max_age = api_settings.PASSWORDLESS_TOKEN_EXPIRE_TIME

        if age_seconds <= max_age:
            return True

        # Too old: invalidate the token so it cannot be used again.
        token.is_active = False
        token.save()
        return False
    except CallbackToken.DoesNotExist:
        # No valid token.
        return False
previews.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def fetch_og_preview(content, urls):
    """Attach the first usable OpenGraph preview among ``urls`` to ``content``.

    For each url in turn: a cache entry modified within the last 7 days is
    reused; otherwise the page is fetched and, if it exposes at least one of
    title / site_name / description / image, a new cache entry is created.
    The content row is updated with the chosen entry, which is returned.
    Urls that fail to parse, carry no usable fields or hit a DataError on
    insert are skipped. Returns False when no url produced a preview.
    """
    wanted_keys = ("title", "site_name", "description", "image")
    for url in urls:
        # See first if recently cached already
        fresh_since = now() - datetime.timedelta(days=7)
        if OpenGraphCache.objects.filter(url=url, modified__gte=fresh_since).exists():
            opengraph = OpenGraphCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(opengraph=opengraph)
            return opengraph

        try:
            og = OpenGraph(url=url, parser="lxml")
        except AttributeError:
            continue
        if not og or not any(key in og for key in wanted_keys):
            continue

        try:
            if "title" in og:
                title = og.title
            elif "site_name" in og:
                title = og.site_name
            else:
                title = ""
            description = og.description if "description" in og else ""
            # Images are dropped for content flagged as NSFW.
            image = og.image if "image" in og and not content.is_nsfw else ""
            try:
                with transaction.atomic():
                    opengraph = OpenGraphCache.objects.create(
                        url=url,
                        title=truncate_letters(safe_text(title), 250),
                        description=safe_text(description),
                        image=safe_text(image),
                    )
            except DataError:
                continue
        except IntegrityError:
            # Some other process got ahead of us
            opengraph = OpenGraphCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(opengraph=opengraph)
            return opengraph

        Content.objects.filter(id=content.id).update(opengraph=opengraph)
        return opengraph
    return False
previews.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def fetch_oembed_preview(content, urls):
    """Attach the first usable oEmbed preview among ``urls`` to ``content``.

    For each url in turn: a cache entry modified within the last 7 days is
    reused; otherwise the oEmbed markup is fetched, its fixed width is
    replaced by 100% and its height attribute removed, and a new cache entry
    is created. The content row is updated with the chosen entry, which is
    returned. Urls whose fetch fails or yields nothing are skipped. Returns
    False when no url produced a preview.
    """
    for url in urls:
        # See first if recently cached already
        fresh_since = now() - datetime.timedelta(days=7)
        if OEmbedCache.objects.filter(url=url, modified__gte=fresh_since).exists():
            oembed = OEmbedCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(oembed=oembed)
            return oembed

        # Fetch oembed
        if url.startswith("https://twitter.com/"):
            # This probably has little effect since we fetch these on the backend...
            # But, DNT is always good to communicate if possible :)
            options = {"dnt": "true"}
        else:
            options = {}
        try:
            oembed = PyEmbed(discoverer=OEmbedDiscoverer()).embed(url, **options)
        except (PyEmbedError, PyEmbedDiscoveryError, PyEmbedConsumerError, ValueError):
            continue
        if not oembed:
            continue

        # Ensure width is 100% not fixed
        oembed = re.sub(r'width="[0-9]*"', 'width="100%"', oembed)
        oembed = re.sub(r'height="[0-9]*"', "", oembed)
        try:
            with transaction.atomic():
                oembed = OEmbedCache.objects.create(url=url, oembed=oembed)
        except IntegrityError:
            # Some other process got ahead of us
            oembed = OEmbedCache.objects.get(url=url)
            Content.objects.filter(id=content.id).update(oembed=oembed)
            return oembed

        Content.objects.filter(id=content.id).update(oembed=oembed)
        return oembed
    return False
models.py 文件源码 项目:django-codenerix-products 作者: centrologic 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_recommended_products(cls, lang, family=None, category=None, subcategory=None):
        """Return the recommended products as a list of dicts.

        The base selection is rows flagged ``most_sold`` or whose product has
        a principal image. Each of ``family``, ``category`` and
        ``subcategory`` that is not None narrows the selection to products
        with that value. ``lang`` selects the language-specific relations
        used for the slug, name, brand name and meta title columns.

        Every returned dict gets a ``'new'`` entry: 1 when the row was created
        no more than ``settings.CDNX_PRODUCTS_NOVELTY_DAYS`` days ago, else 0.
        """
        products = []
        query = Q(most_sold=True) | Q(product__products_image__principal=True)
        if family is not None:
            # Fixed: this filter previously compared the family against the
            # `category` argument, so the `family` value was never used.
            query &= Q(product__family=family)
        if category is not None:
            query &= Q(product__category=category)
        if subcategory is not None:
            query &= Q(product__subcategory=subcategory)
        for product in cls.query_or(
            query,
            "{}__slug".format(lang),
            "offer",
            "created",
            "offer",
            "pk",
            "product__{}__name".format(lang),
            "product__model",
            "product__brand__{}__name".format(lang),
            "product__products_image__image",
            "{}__meta_title".format(lang),
            slug="{}__slug".format(lang),
            meta_title="{}__meta_title".format(lang),
            image="product__products_image__image",
            name="product__{}__name".format(lang),
            pop_annotations=True
        ):
            # Flag products still inside the configured novelty window.
            product['new'] = 1 if (timezone.now() - product['created']).days <= settings.CDNX_PRODUCTS_NOVELTY_DAYS else 0
            products.append(product)

        return products
models.py 文件源码 项目:django-codenerix-products 作者: centrologic 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_products(cls, lang, family=None, category=None, subcategory=None, brand=None):
        """Return products that have a principal image, as a list of dicts.

        Each of ``family``, ``category``, ``subcategory`` and ``brand`` that
        is not None narrows the selection to products with that value.
        ``lang`` selects the language-specific relations used for the slug,
        name, brand name and meta title columns.

        Every returned dict gets two extra entries: ``'price'`` (the
        ``'price_total'`` from the row's ``calculate_price()``) and ``'new'``
        (1 when the row was created no more than
        ``settings.CDNX_PRODUCTS_NOVELTY_DAYS`` days ago, else 0).
        """
        query = Q(product__products_image__principal=True)

        # Optional filters, applied in a fixed order when a value is given.
        optional_filters = (
            ("product__family", family),
            ("product__category", category),
            ("product__subcategory", subcategory),
            ("product__brand", brand),
        )
        for lookup, value in optional_filters:
            if value is not None:
                query &= Q(**{lookup: value})

        slug_path = "{}__slug".format(lang)
        name_path = "product__{}__name".format(lang)
        meta_title_path = "{}__meta_title".format(lang)
        image_path = "product__products_image__image"

        rows = cls.query_or(
            query,
            slug_path,
            "offer",
            "created",
            "offer",
            "pk",
            "product__tax__tax",
            name_path,
            "product__model",
            "product__brand__{}__name".format(lang),
            image_path,
            meta_title_path,
            slug=slug_path,
            meta_title=meta_title_path,
            image=image_path,
            name=name_path,
            pop_annotations=True
        )

        products = []
        for product in rows:
            prices = cls.objects.get(pk=product['pk']).calculate_price()
            product['price'] = prices['price_total']
            age_days = (timezone.now() - product['created']).days
            is_new = age_days <= settings.CDNX_PRODUCTS_NOVELTY_DAYS
            product['new'] = 1 if is_new else 0
            products.append(product)

        return products
storage.py 文件源码 项目:django-override-storage 作者: danifus 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _save(self, name, content):
        """Store ``content`` in the cache under a free name and return that name.

        The content is read in full and, when the result has an ``encode``
        method, encoded so that the cache holds bytes like a file on disk
        would. While holding the writer lock, ``name`` is replaced through
        ``get_available_name`` until it is not already present in the cache.
        """
        payload = content.read()
        try:
            payload = payload.encode()
        except AttributeError:
            # Nothing to encode: the payload is stored as read.
            pass

        with self._lock.writer():
            while name in self.cache:
                name = self.get_available_name(name)
            self.cache[name] = FakeContent(payload, now())
        return name
views.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def post_list(request):
    """Render the list of posts published up to now, ordered by publication date."""
    published = Post.objects.filter(published_date__lte=timezone.now())
    posts = published.order_by('published_date')
    context = {'posts': posts}
    return render(request, 'blog/post_list.html', context)
models.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def publish(self):
        """Set ``published_date`` to the current time and save the instance."""
        published_at = timezone.now()
        self.published_date = published_at
        self.save()
user_time_entry_manager.py 文件源码 项目:timeblob 作者: dkieffer 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def start_current(self, **data):
        """Start a new time entry for this manager's user and return it.

        Any entry currently running is stopped first; having none running is
        not an error. The new entry starts at the current time and receives
        ``data`` as additional constructor keyword arguments.
        """
        try:
            self.stop_current()
        except exceptions.NoCurrentEntry:
            # Nothing was running, so there is nothing to stop.
            pass

        new_entry = TimeEntry(
            user=self.user,
            start=timezone.now(),
            **data
        )
        new_entry.save()
        return new_entry
user_time_entry_manager.py 文件源码 项目:timeblob 作者: dkieffer 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def stop_current(self):
        """Stop the entry returned by ``self.current()`` and return it.

        The entry's ``stop`` is set to the current time and the entry is
        saved. Whatever ``self.current()`` raises propagates to the caller.
        """
        running = self.current()
        stopped_at = timezone.now()
        running.stop = stopped_at
        running.save()
        return running


问题


面经


文章

微信
公众号

扫码关注公众号