python类now()的实例源码

utils.py 文件源码 项目:stregsystemet 作者: f-klubben 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def make_inactive_productlist_query(queryset):
    """Return the products of ``queryset`` that are currently inactive.

    A product is selected when it falls outside the "active and not yet
    deactivated" set, or when the number of its sales with a timestamp
    after ``start_date`` has reached its ``quantity`` (out of stock).
    """
    current_time = timezone.now()

    # Everything that is NOT (active and with an unset/future deactivation
    # date) is definitively inactive.
    not_yet_deactivated = (
        Q(deactivate_date=None) | Q(deactivate_date__gte=current_time)
    )
    inactive_ids = (
        queryset
        .exclude(Q(active=True) & not_yet_deactivated)
        .values("id")
    )

    # Products whose sales since start_date have used up the quantity.
    sold_out_ids = (
        queryset
        .filter(sale__timestamp__gt=F("start_date"))
        .annotate(c=Count("sale__id"))
        .filter(c__gte=F("quantity"))
        .values("id")
    )

    return queryset.filter(Q(id__in=inactive_ids) | Q(id__in=sold_out_ids))
models.py 文件源码 项目:Bitpoll 作者: fsinfuhh 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def send(self, request):
        """Mail the invitation to the invited user, at most once every 12 hours.

        The mail body is rendered with the invited user's language active.
        The previously active language is always restored afterwards, even
        when rendering or sending raises.  Failures and the rate limit are
        reported on ``request`` through the messages framework.

        :param request: current request; used as the target for messages.
        """
        # TODO: make the 12 hour timedelta configurable
        if self.last_email and self.last_email + timedelta(hours=12) >= now():
            # Translate the template first, then fill in the placeholder;
            # formatting inside _() would look up a msgid that never exists.
            messages.error(
                request, _("You have send an Email for {} in the last 12 Hours").format(self.user.username)
            )
            return

        old_lang = translation.get_language()
        translation.activate(self.user.language)
        try:
            link = reverse('poll_vote', args=(self.poll.url,))  # TODO: link to the poll itself or to the vote?
            email_content = render_to_string('invitations/mail_invite.txt', {
                'receiver': self.user.username,
                'creator': self.creator.username,
                'link': link
            })
            send_mail("Invitation to vote on {}".format(self.poll.title), email_content, None, [self.user.email])
        except SMTPRecipientsRefused:
            # Switch back before building the message so the error is shown
            # in the language that was active for the request.
            translation.activate(old_lang)
            messages.error(
                request, _("The mail server had an error sending the notification to {}").format(self.user.username)
            )
        else:
            self.last_email = now()
            self.save()
        finally:
            # Never leave the invited user's language active for this thread.
            translation.activate(old_lang)
views.py 文件源码 项目:stregsystemet 作者: f-klubben 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def razzia_wizard(request):
    """Show the razzia wizard form, or redirect to ``razzia_view`` on POST.

    On POST the submitted start/end date parts, the product list and the
    title are packed into the query string of the ``razzia_view`` URL.
    Otherwise the wizard template is rendered with two date pickers whose
    defaults are 180 days ago (start) and now (end).

    The date parts are read with ``request.POST[...]`` and ``int(...)``, so a
    missing or non-numeric part raises, as before.
    """
    if request.method == 'POST':
        # NOTE(review): 'products' and 'razzia_title' are interpolated
        # without URL-escaping; consider urlencode if they may contain
        # '&' or '#'.
        query = "?start={0}-{1}-{2}&end={3}-{4}-{5}&products={6}&username=&razzia_title={7}".format(
            int(request.POST['start_year']),
            int(request.POST['start_month']),
            int(request.POST['start_day']),
            int(request.POST['end_year']),
            int(request.POST['end_month']),
            int(request.POST['end_day']),
            request.POST.get('products'),
            request.POST.get('razzia_title'))
        return redirect(reverse("razzia_view") + query)

    current_time = timezone.now()
    # Fix: the original subtracted timedelta(days=-180), which put the
    # suggested start 180 days in the FUTURE (after the end date and possibly
    # outside the selectable years).
    suggested_start_date = current_time - datetime.timedelta(days=180)
    suggested_end_date = current_time

    selectable_years = list(range(2000, current_time.year + 1))
    start_date_picker = fields.DateField(
        widget=extras.SelectDateWidget(years=list(selectable_years)))
    end_date_picker = fields.DateField(
        widget=extras.SelectDateWidget(years=list(selectable_years)))

    return render(request, 'admin/stregsystem/razzia/wizard.html',
                  {
                      'start_date_picker': start_date_picker.widget.render("start", suggested_start_date),
                      'end_date_picker': end_date_picker.widget.render("end", suggested_end_date)},
                  )
views.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def nodeinfo_view(request):
    """Build the NodeInfo document for the current site and return it as JSON.

    When ``settings.SOCIALHOME_STATISTICS`` is enabled, the ``usage`` section
    carries the user total, the users whose ``last_login`` falls within the
    last 180 / 30 days, and the counts of local posts and local comments.
    Otherwise ``usage`` only holds an empty ``users`` mapping.
    """
    site = Site.objects.get_current()

    if not settings.SOCIALHOME_STATISTICS:
        usage = {"users": {}}
    else:
        total_users = User.objects.count()
        half_year_users = User.objects.filter(
            last_login__gte=now() - datetime.timedelta(days=180)
        ).count()
        month_users = User.objects.filter(
            last_login__gte=now() - datetime.timedelta(days=30)
        ).count()
        local_posts = Content.objects.filter(
            author__user__isnull=False, content_type=ContentType.CONTENT
        ).count()
        local_comments = Content.objects.filter(
            author__user__isnull=False, content_type=ContentType.REPLY
        ).count()
        usage = {
            "users": {
                "total": total_users,
                "activeHalfyear": half_year_users,
                "activeMonth": month_users,
            },
            "localPosts": local_posts,
            "localComments": local_comments,
        }

    document = NodeInfo(
        software={"name": "socialhome", "version": version},
        protocols={"inbound": ["diaspora"], "outbound": ["diaspora"]},
        services={"inbound": [], "outbound": []},
        open_registrations=settings.ACCOUNT_ALLOW_REGISTRATION,
        usage=usage,
        metadata={"nodeName": site.name}
    )
    return JsonResponse(document.doc)
test_views.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_view_responds_stats_on(self):
        """The ``usage`` section of the NodeInfo response matches the database counts."""
        self.get(NODEINFO_DOCUMENT_PATH)
        self.response_200()

        reported_usage = json.loads(decode_if_bytes(self.last_response.content))["usage"]

        # Recompute every figure with the same queries the assertion compares against.
        expected_users = {
            "total": User.objects.count(),
            "activeHalfyear": User.objects.filter(last_login__gte=now() - datetime.timedelta(days=180)).count(),
            "activeMonth": User.objects.filter(last_login__gte=now() - datetime.timedelta(days=30)).count(),
        }
        expected_usage = {
            "users": expected_users,
            "localPosts": Content.objects.filter(
                author__user__isnull=False, content_type=ContentType.CONTENT).count(),
            "localComments": Content.objects.filter(
                author__user__isnull=False, content_type=ContentType.REPLY).count(),
        }

        self.assertEqual(reported_usage, expected_usage)
mentions.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 159 收藏 0 点赞 0 评论 0
def mention_event(project, group, tenant, event=None):
    """Record that ``group`` (optionally one ``event``) was just mentioned for ``tenant``.

    The mention is added to the sorted set stored under ``get_key(tenant)``,
    scored by the current timestamp, and a JSON payload describing it is
    stored under ``"<key>:<member>"``.  Both keys get an expiry of
    ``RECENT_HOURS + 1`` hours.  The sorted set is then trimmed by age and
    capped to the ``MAX_RECENT`` highest-scored members.

    :param project: object whose ``id`` is stored in the payload.
    :param group: object whose ``id`` is stored and used in the member name.
    :param tenant: passed to ``get_key`` to derive the sorted-set key.
    :param event: optional object whose ``id`` is stored; ``None`` is
        recorded as ``'-'`` in the member name and ``null`` in the payload.
    """
    ts = to_timestamp(timezone.now())
    # Named ``member`` rather than ``id`` to avoid shadowing the builtin.
    member = '%s/%s' % (group.id, event.id if event is not None else '-')
    item = json.dumps(
        {
            'project': project.id,
            'group': group.id,
            'event': event.id if event is not None else None,
            'last_mentioned': ts,
        }
    )

    seconds_per_hour = 60 * 60
    expires = (RECENT_HOURS + 1) * seconds_per_hour
    with cluster.map() as client:
        key = get_key(tenant)
        client.zadd(key, ts, member)
        client.expire(key, expires)
        client.setex('%s:%s' % (key, member), expires, item)
        # Fix: the cutoff is compared against time.time() (seconds), so the
        # window must be RECENT_HOURS converted to seconds.  The original used
        # RECENT_HOURS * 60, which trimmed entries after that many *minutes*
        # and disagreed with the expiry computed above.
        client.zremrangebyscore(key, '-inf', time.time() - (RECENT_HOURS * seconds_per_hour))
        # Keep only the MAX_RECENT members with the highest scores.
        client.zremrangebyrank(key, 0, -MAX_RECENT - 1)
test_oauth2_models.py 文件源码 项目:django-tdd-restful-api 作者: elastic7327 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_create_oauth2_token(self):
        """An access token can be created for a superuser on a public, password-grant application."""
        superuser = mixer.blend('auth.User', is_staff=True, is_superuser=True)
        application = Application.objects.create(
                name='SuperAPI OAUTH2 APP',
                user=superuser,
                client_type=Application.CLIENT_PUBLIC,
                authorization_grant_type=Application.GRANT_PASSWORD,
        )
        assert Application.objects.count() == 1, "Should be equal"

        token_prefix = get_random_string(length=16)

        token_fields = {
                'user': superuser,
                'scope': 'read write',
                # The token expires five minutes from now.
                'expires': timezone.now() + timedelta(minutes=5),
                'token': f'{token_prefix}---{superuser.username}',
                'application': application,
        }
        access_token = AccessToken.objects.create(**token_fields)

        assert access_token is not None, "??? ???"
models.py 文件源码 项目:django_pipedrive 作者: MasAval 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_modifications(cls, instance, previous, current):
        """Create one ``FieldModification`` per field that differs between two snapshots.

        A key missing from either snapshot reads as ``None``.  A row is
        created for every key of ``previous`` whose value differs from the
        one in ``current``, and for every key of ``current`` that ``previous``
        does not contain.  All rows from one call share the same ``created``
        timestamp and point at ``instance``.
        """
        before = defaultdict(lambda: None, previous)
        after = defaultdict(lambda: None, current)

        # Keys known before whose value changed (absent in ``after`` == None).
        changed = {name for name in before if before[name] != after[name]}
        # Keys that only exist in the new snapshot.
        added = {name for name in after if name not in before}

        stamp = timezone.now()

        for name in changed | added:
            FieldModification.objects.create(
                field_name=name,
                previous_value=before[name],
                current_value=after[name],
                content_object=instance,
                created=stamp,
            )
utils.py 文件源码 项目:stregsystemet 作者: f-klubben 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def make_active_productlist_query(queryset):
    """Return the products of ``queryset`` that are currently active and in stock.

    Candidates are products flagged ``active`` whose ``deactivate_date`` is
    unset or not yet reached.  A candidate is dropped when it has a
    ``start_date`` and the number of its sales after that date has reached
    its ``quantity``.
    """
    current_time = timezone.now()

    # Products that MIGHT be active; stock is checked below.
    still_listed = Q(active=True) & (
        Q(deactivate_date=None) | Q(deactivate_date__gte=current_time)
    )
    candidates = queryset.filter(still_listed)

    # Candidates whose sales since start_date have used up the quantity.
    sold_out_ids = (
        candidates
        .filter(sale__timestamp__gt=F("start_date"))
        .annotate(c=Count("sale__id"))
        .filter(c__gte=F("quantity"))
        .values("id")
    )

    # Keep every candidate except those with a start date that are sold out.
    return candidates.exclude(
        Q(start_date__isnull=False) & Q(id__in=sold_out_ids)
    )
test_tasks.py 文件源码 项目:kuzgun.io 作者: yigitgenc 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_update_and_stop_seeding_that_seeding_return_none(self, mock_get_torrent, mock_hset):
        """A seeding torrent created 24 hours and one second ago: the task returns None and stores rate_upload 0."""
        seeding_torrent = self.torrent(
            status='seeding',
            progress=Decimal('100.00'),
            ratio=Decimal('9.99'),
            rateUpload=10500,
            rateDownload=105000,
            stop=mock_stop
        )
        mock_get_torrent.return_value = seeding_torrent

        # Back-date the model by 24 hours and one second.
        age_offset = timezone.timedelta(hours=-24, seconds=-1)
        self.torrent_model.created = timezone.now() + age_offset
        self.torrent_model.save()

        result = update_and_stop_seeding(self.torrent_model.pk)
        self.assertIsNone(result)

        mock_get_torrent.assert_called_with(self.torrent_model.hash)
        expected_key = 'torrent:{}'.format(self.torrent_model.pk)
        mock_hset.assert_called_with(expected_key, 'rate_upload', 0)
h1sync.py 文件源码 项目:tts-bug-bounty-dashboard 作者: 18F 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _sync_report(self, h1_report, now):
            """Create or update the local ``Report`` for ``h1_report``, then sync its bounties and activities.

            The row is matched on ``id=h1_report.id``; every other field is
            written through ``defaults``.  ``now`` is stored as
            ``last_synced_at``.  When the report has no structured scope the
            three scope-derived fields receive that falsy scope value itself.
            """
            self.stdout.write(f"Synchronizing #{h1_report.id}.")
            scope = h1_report.structured_scope

            # Field values copied from the remote report.
            field_values = {
                "title": h1_report.title,
                "created_at": h1_report.created_at,
                "triaged_at": h1_report.triaged_at,
                "closed_at": h1_report.closed_at,
                "disclosed_at": h1_report.disclosed_at,
                "state": h1_report.state,
                "issue_tracker_reference_url": h1_report.issue_tracker_reference_url or "",
                "weakness": h1_report.weakness.name if h1_report.weakness else "",
                "asset_identifier": scope and scope.asset_identifier,
                "asset_type": scope and scope.asset_type,
                "is_eligible_for_bounty": scope and scope.eligible_for_bounty,
                "last_synced_at": now,
            }
            report, _created = Report.objects.update_or_create(
                id=h1_report.id,
                defaults=field_values,
            )

            self._sync_bounties(report, h1_report)
            self._sync_activities(report, h1_report)
views.py 文件源码 项目:MetaCI 作者: SalesforceFoundation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def build_rebuild(request, build_id):
    """Queue a rebuild of the given build and redirect to the build page.

    Responds 404 when the build does not exist and 403 when the requesting
    user is not staff.  Otherwise a ``Rebuild`` with status ``'queued'`` is
    saved, a restart line is appended to the build log, and the rebuild is
    set as the build's current rebuild.
    """
    build = get_object_or_404(Build, id=build_id)

    if not request.user.is_staff:
        return HttpResponseForbidden(
            'You are not authorized to rebuild builds')

    rebuild = Rebuild(build=build, user=request.user, status='queued')
    rebuild.save()

    restart_line = '\n=== Build restarted at {} by {} ===\n'.format(
        timezone.now(), request.user.username)
    # An empty or missing log starts from the empty string.
    build.log = (build.log or '') + restart_line
    build.current_rebuild = rebuild
    build.save()

    return HttpResponseRedirect('/builds/{}'.format(build.id))
metaci_scheduled_jobs.py 文件源码 项目:MetaCI 作者: SalesforceFoundation 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        """Ensure the enabled repeatable job for ``check_waiting_builds`` exists and report the outcome.

        The job is looked up by callable, name, queue and ``enabled=True``;
        when it has to be created it is scheduled from now with a one-minute
        interval.
        """
        job, created = RepeatableJob.objects.get_or_create(
            callable='metaci.build.tasks.check_waiting_builds',
            enabled=True,
            name='check_waiting_builds',
            queue='short',
            defaults={
                'interval': 1,
                'interval_unit': 'minutes',
                'scheduled_time': timezone.now(),
            }
        )

        if created:
            message = 'Created job check_waiting_builds with id {}'.format(job.id)
        else:
            state = 'enabled' if job.enabled else 'disabled'
            message = 'Scheduled job check_waiting_builds with id {} already exists and is {}.'.format(job.id, state)

        self.stdout.write(self.style.SUCCESS(message))
tests.py 文件源码 项目:Peru 作者: ESEGroup 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def teste_cenario_1(self):
        """Scenario 1: short title, start now, end in 10 days -- ``publicar()`` must return None."""
        inicio = timezone.now()
        fim = inicio + datetime.timedelta(days=10)

        # Persist the fixtures, then read them back the way the test did originally.
        Usuario(nome="Test User").save()
        Localidade(nome="Web").save()
        anunciante = Usuario.objects.get(nome="Test User")
        localidade = Localidade.objects.get(nome="Web")

        anuncio = Anuncio(
            anunciante=anunciante,
            titulo="Choppada Engenharia Eletrônica",
            descricao="",
            data_inicio=inicio,
            data_fim=fim,
            localidade=localidade,
        )
        self.assertIs(anuncio.publicar(), None)

    ####################################################
    #Cenário 2:
    #
    #Título: Choppada de  Engenharia Eletrônica, de Engenharia de Controle e Automação, de Engenharia de  Computação e Informação, de Engenharia de Produção, de Engenharia Metalúrgica, de Psicologia e de Ciências Sociais (inválido)
    #Data Inicio: data atual (válido)
    #Data fim: data atual + 10 dias (válido)
    ####################################################
tests.py 文件源码 项目:Peru 作者: ESEGroup 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def teste_cenario_2(self):
        """Scenario 2: over-long title with start now and end in 10 days -- ``publicar()`` must not return None."""
        Usuario(nome="Test User").save()
        Localidade(nome="Web").save()

        anunciante = Usuario.objects.get(nome="Test User")
        localidade = Localidade.objects.get(nome="Web")
        inicio = timezone.now()
        fim = inicio + datetime.timedelta(days=10)
        titulo = "Choppada de  Engenharia Eletrônica, de Engenharia de Controle e Automação, de Engenharia de  Computação e Informação, de Engenharia de Produção, de Engenharia Metalúrgica, de Psicologia e de Ciências Sociais"

        anuncio = Anuncio(
            anunciante=anunciante,
            titulo=titulo,
            data_inicio=inicio,
            data_fim=fim,
            localidade=localidade,
        )
        self.assertIsNot(anuncio.publicar(), None)

    ####################################################
    #Cenário 3:
    #
    #Título: Choppada Engenharia Eletrônica (válido)
    #Data Inicio: em branco (inválido)
    #Data fim: data atual + 10 dias (válido)
    ####################################################
tests.py 文件源码 项目:Peru 作者: ESEGroup 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def teste_cenario_3(self):
        """Scenario 3: valid title, start date left blank, end in 10 days -- ``publicar()`` must not return None.

        The original test reused the over-long (invalid) title from
        scenario 2, so it would pass on the title alone and never exercised
        the missing start date described for this scenario.
        """
        c_user = Usuario(nome="Test User")
        c_user.save()
        c_local = Localidade(nome="Web")
        c_local.save()

        anunciante = Usuario.objects.get(nome="Test User")
        localidade = Localidade.objects.get(nome="Web")
        fim = timezone.now() + datetime.timedelta(days=10)
        # Valid title, so the only invalid input is the absent data_inicio.
        titulo = "Choppada Engenharia Eletrônica"
        anuncio = Anuncio(anunciante=anunciante, titulo=titulo, data_fim=fim, localidade=localidade)
        self.assertIsNot(anuncio.publicar(), None)

    ####################################################
    #Cenário 4:
    #
    #Título: Choppada Engenharia Eletrônica (válido)
    #Data Inicio: em branco (inválido)
    #Data fim: data atual + 10 dias (válido)
    ####################################################
valid_parking.py 文件源码 项目:parkkihubi 作者: City-of-Helsinki 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def filter_time(self, queryset, name, value):
        """
        Filter to the parkings valid at the given time stamp.

        When ``value`` is empty the current time is used.  If no parking
        is valid at that time, fall back to parkings that start before it
        and end after ``time - get_time_old_parkings_visible()``, and
        return only the one with the latest ending time.

        :type queryset: parkings.models.ParkingQuerySet
        :type name: str
        :type value: datetime.datetime
        """
        moment = value or timezone.now()

        currently_valid = queryset.valid_at(moment)
        if currently_valid:
            return currently_valid

        # Nothing valid right now: look back over the visibility window.
        window_start = moment - get_time_old_parkings_visible()
        recently_valid = queryset.starts_before(moment).ends_after(window_start)
        return recently_valid.order_by('-time_end')[:1]
models.py 文件源码 项目:byro 作者: byro 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def send(self):
        """Queue this mail for delivery and mark it as sent.

        The comma-separated ``to``, ``cc`` and ``bcc`` fields are split into
        address lists (blank entries dropped, surrounding whitespace
        stripped) and handed to ``mail_send_task`` asynchronously together
        with subject, text and reply-to.  ``sent`` is then set to the current
        time and the object is saved.

        :raises Exception: if the mail has already been sent.
        """
        if self.sent:
            raise Exception('This mail has been sent already. It cannot be sent again.')

        from byro.mails.send import mail_send_task

        def split_addresses(raw):
            # ''.split(',') yields [''], which would pass a single blank
            # address downstream; drop empty entries instead.
            return [part.strip() for part in raw.split(',') if part.strip()]

        mail_send_task.apply_async(
            kwargs={
                'to': split_addresses(self.to),
                'subject': self.subject,
                'body': self.text,
                'sender': self.reply_to,
                'cc': split_addresses(self.cc or ''),
                'bcc': split_addresses(self.bcc or ''),
            }
        )

        self.sent = now()
        self.save()
billing.py 文件源码 项目:dj-paypal 作者: HearthSim 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def execute(self):
        """
        Execute this PreparedBillingAgreement.

        Creates and executes the matching BillingAgreement, assigns it this
        object's user, links it as ``executed_agreement`` and returns it.
        """
        # The execution time is persisted before anything else, outside the
        # transaction: if the execution below fails, executed_at stays set
        # while executed_agreement remains unset.
        self.executed_at = now()
        self.save()

        with transaction.atomic():
            agreement = BillingAgreement.execute(self.id)
            agreement.user = self.user
            agreement.save()

            self.executed_agreement = agreement
            self.save()

        return agreement
authentication.py 文件源码 项目:kel-api 作者: kelproject 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_identity(self, token):
        """
        Look up ``token`` on the identity service and create/update the local user.

        Sends ``token`` as ``access_token`` to ``<IDENTITY_URL>/tokeninfo/``.
        Returns ``None`` when the service answers with a non-OK status.
        Otherwise the user named in the payload is created if missing, or
        has ``last_login`` set to the current time if it already exists,
        and is returned.

        Network errors from ``requests`` (now including a timeout) propagate
        to the caller.
        """
        identity_url = settings.KEL["IDENTITY_URL"]
        logger.info("checking identity server %s", identity_url)
        # Fix: requests has no default timeout, so an unresponsive identity
        # server would block this call (and its worker) indefinitely.
        timeout_seconds = 10
        resp = requests.get(
            "{}/tokeninfo/".format(identity_url),
            params={"access_token": token},
            timeout=timeout_seconds,
        )
        if not resp.ok:
            return None
        payload = resp.json()
        username = payload["user"]["username"]
        with transaction.atomic():
            user = next(iter(User.objects.filter(username=username)), None)
            if user is None:
                user = User.objects.create(username=username)
            else:
                user.last_login = timezone.now()
                user.save()
        return user
reports.py 文件源码 项目:django-souvenirs 作者: appsembler 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def customer_monthly_usage(subscription_start, start=None, end=None):
    """Yield monthly usage records for a subscription, each tagged with labels.

    Months are always enumerated from ``subscription_start`` (``end``
    defaults to the current time) so the 1-based month number stays stable;
    records whose period ends at or before ``start`` are skipped.  ``start``
    defaults to ``subscription_start``.  Each yielded record gains a
    ``labels`` entry with ``year_month``, ``year_quarter`` and ``year``.
    """
    report_start = subscription_start if start is None else start

    # Enumeration must begin at subscription_start regardless of ``start``.
    months = iter_months(start=subscription_start,
                         end=end or timezone.now())

    for m, usage in enumerate(usage_for_periods(months), 1):
        if usage['period']['end'] > report_start:
            usage.update(
                labels={
                    'year_month': label_year_month_m(m),
                    'year_quarter': label_year_quarter_m(m),
                    'year': label_year_m(m),
                },
            )
            yield usage
reports.py 文件源码 项目:django-souvenirs 作者: appsembler 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def customer_quarterly_usage(subscription_start, start=None, end=None):
    """Yield quarterly usage records for a subscription, each tagged with labels.

    Quarters are always enumerated from ``subscription_start`` (``end``
    defaults to the current time) so the 1-based quarter number stays
    stable; records whose period ends at or before ``start`` are skipped.
    ``start`` defaults to ``subscription_start``.  Each yielded record gains
    a ``labels`` entry with ``year_quarter`` and ``year``.
    """
    report_start = subscription_start if start is None else start

    # Enumeration must begin at subscription_start regardless of ``start``.
    quarters = iter_quarters(start=subscription_start,
                             end=end or timezone.now())

    for q, usage in enumerate(usage_for_periods(quarters), 1):
        if usage['period']['end'] > report_start:
            usage.update(
                labels={
                    'year_quarter': label_year_quarter_q(q),
                    'year': label_year_q(q),
                },
            )
            yield usage
reports.py 文件源码 项目:django-souvenirs 作者: appsembler 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def customer_yearly_usage(subscription_start, start=None, end=None):
    """Yield yearly usage records for a subscription, each tagged with a year label.

    Years are always enumerated from ``subscription_start`` (``end``
    defaults to the current time) so the 1-based year number stays stable;
    records whose period ends at or before ``start`` are skipped.  ``start``
    defaults to ``subscription_start``.  Each yielded record gains a
    ``labels`` entry with ``year``.
    """
    report_start = subscription_start if start is None else start

    # Enumeration must begin at subscription_start regardless of ``start``.
    years = iter_years(start=subscription_start,
                       end=end or timezone.now())

    for y, usage in enumerate(usage_for_periods(years), 1):
        if usage['period']['end'] > report_start:
            usage.update(
                labels={
                    'year': label_year_y(y),
                },
            )
            yield usage
sync.py 文件源码 项目:serverthrallapi 作者: NullSoldier 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def post(self, request, server_id):
        """Accept a sync payload for a server and queue the matching background tasks.

        Responses:
          * 400 when the ``private_secret`` query parameter is missing, or
            when the body is not a JSON object;
          * 404 when no server matches ``server_id`` and the secret;
          * 200 otherwise, after queueing the character/clan sync tasks for
            the keys present in the body, stamping ``last_sync`` and
            queueing the history clean-up.
        """
        if 'private_secret' not in request.GET:
            return HttpResponse('missing required param private_secret', status=400)

        server = (Server.objects
            .filter(id=server_id, private_secret=request.GET['private_secret'])
            .first())

        if server is None:
            return HttpResponse('server does not exist', status=404)

        # Fix: a malformed body used to raise out of json.loads and surface
        # as a server error; report it as a client error instead.
        try:
            data = json.loads(request.body)
        except ValueError:
            return HttpResponse('request body is not valid JSON', status=400)
        if not isinstance(data, dict):
            return HttpResponse('request body must be a JSON object', status=400)

        if 'characters' in data:
            sync_characters_task.delay(server.id, data['characters'], request.GET)

        if 'clans' in data:
            sync_clans_task.delay(server.id, data['clans'])

        server.last_sync = timezone.now()
        server.save()

        delete_old_history.delay()
        return HttpResponse(status=200)
models.py 文件源码 项目:django-virtual-pos 作者: intelligenia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def save(self, *args, **kwargs):
        """
        Save the object to the database after refreshing its timestamps.

        The last-update datetime is set on every save; the creation
        datetime is set only when the object has no id yet.
        """
        # datetime.datetime.now() yields a naive local-time value.
        # NOTE(review): the original comment described it as UTC -- confirm
        # what localize_datetime expects.
        naive_now = datetime.datetime.now()

        is_new = not self.id
        if is_new:
            self.creation_datetime = localize_datetime(naive_now)
        self.last_update_datetime = localize_datetime(naive_now)

        # Delegate the actual persistence to the parent class.
        super(VPOSPaymentOperation, self).save(*args, **kwargs)


####################################################################
####################################################################

# Excepción para indicar que la operación charge ha devuelto una respuesta incorrecta o de fallo
models.py 文件源码 项目:django-virtual-pos 作者: intelligenia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def save(self, *args, **kwargs):
        """
        Save the object to the database after refreshing its timestamps.

        The last-update datetime is set on every save; the creation
        datetime is set only when the object has no id yet.
        """
        # datetime.datetime.now() yields a naive local-time value.
        # NOTE(review): the original comment described it as UTC -- confirm
        # what localize_datetime expects.
        naive_now = datetime.datetime.now()

        is_new = not self.id
        if is_new:
            self.creation_datetime = localize_datetime(naive_now)
        self.last_update_datetime = localize_datetime(naive_now)

        # Delegate the actual persistence to the parent class.
        super(VPOSRefundOperation, self).save(*args, **kwargs)


########################################################################################################################
########################################################################################################################
####################################################### TPV Ceca #######################################################
########################################################################################################################
########################################################################################################################
imagemodels.py 文件源码 项目:ecs_sclm 作者: meaningful 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def save(self, *args, **kwargs):
            """Fill in ``date_taken`` when it is unset, then save.

            The EXIF ``DateTimeOriginal`` value (``"Y:M:D h:m:s"``) is used
            when it can be parsed, made timezone-aware in the current
            timezone if ``settings.USE_TZ`` is on.  Any failure while reading
            or parsing it is ignored and the current time is used instead.
            """
            if self.date_taken is None:
                try:
                    exif_date = self.exif.get('DateTimeOriginal', None)
                    if exif_date is not None:
                        date_part, time_part = exif_date.split(" ")
                        year, month, day = date_part.split(':')
                        hour, minute, second = time_part.split(':')
                        taken = datetime(
                            int(year), int(month), int(day),
                            int(hour), int(minute), int(second))
                        if getattr(settings, "USE_TZ", False):
                            taken = make_aware(taken, get_current_timezone())
                        self.date_taken = taken
                except Exception:
                    # Best effort only: unreadable EXIF data falls through to
                    # the current-time default below.
                    pass
            if self.date_taken is None:
                self.date_taken = now()
            super(Image, self).save(*args, **kwargs)
db.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def has_key(self, key, version=None):
        """Return True when the cache table holds a row for ``key`` whose ``expires`` is later than now."""
        key = self.make_key(key, version=version)
        self.validate_key(key)

        connection = connections[router.db_for_read(self.cache_model_class)]
        quoted_table = connection.ops.quote_name(self._table)

        # UTC when USE_TZ is on, local time otherwise; microseconds are dropped.
        current = datetime.utcnow() if settings.USE_TZ else datetime.now()
        current = current.replace(microsecond=0)

        query = (
            "SELECT cache_key FROM %s "
            "WHERE cache_key = %%s and expires > %%s" % quoted_table
        )
        params = [key, connection.ops.adapt_datetimefield_value(current)]

        with connection.cursor() as cursor:
            cursor.execute(query, params)
            row = cursor.fetchone()
            return row is not None
db.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _cull(self, db, cursor, now):
        """Remove expired rows and, if the table is still too large, cull the lowest keys.

        With a cull frequency of 0 the whole cache is cleared.  Otherwise
        rows with ``expires`` before ``now`` are deleted; if more than
        ``_max_entries`` rows remain, the backend's culling query is asked
        for a cut-off key (passing ``num // _cull_frequency``) and every row
        with a smaller ``cache_key`` is deleted.

        :param db: alias of the database connection to use.
        :param cursor: open cursor on that connection.
        :param now: current time used as the expiry threshold.
        """
        if self._cull_frequency == 0:
            self.clear()
            return

        connection = connections[db]
        table = connection.ops.quote_name(self._table)
        cursor.execute("DELETE FROM %s WHERE expires < %%s" % table,
                       [connection.ops.adapt_datetimefield_value(now)])
        cursor.execute("SELECT COUNT(*) FROM %s" % table)
        num = cursor.fetchone()[0]
        if num > self._max_entries:
            cull_num = num // self._cull_frequency
            cursor.execute(
                connection.ops.cache_key_culling_sql() % table,
                [cull_num])
            last_cache_key = cursor.fetchone()
            # Fix: the culling query can return no row (e.g. the table was
            # emptied in the meantime); indexing None raised TypeError.
            if last_cache_key:
                cursor.execute("DELETE FROM %s "
                               "WHERE cache_key < %%s" % table,
                               [last_cache_key[0]])
base.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_expiry_age(self, **kwargs):
        """Return the number of seconds until the session expires.

        Accepts optional ``modification`` and ``expiry`` keyword arguments
        giving the session's modification time and expiry.  A falsy expiry
        yields ``settings.SESSION_COOKIE_AGE``; a non-datetime expiry is
        returned unchanged; a datetime expiry yields its distance from
        ``modification`` in whole seconds.
        """
        if 'modification' in kwargs:
            modification = kwargs['modification']
        else:
            modification = timezone.now()

        # "expiry=None passed" and "expiry not passed" must stay distinct so
        # that self.load() is never triggered when expiry is provided.
        if 'expiry' in kwargs:
            expiry = kwargs['expiry']
        else:
            expiry = self.get('_session_expiry')

        if not expiry:   # Covers both None and 0
            return settings.SESSION_COOKIE_AGE
        if isinstance(expiry, datetime):
            remaining = expiry - modification
            return remaining.days * 86400 + remaining.seconds
        return expiry


问题


面经


文章

微信
公众号

扫码关注公众号