python类utc()的实例源码

operations.py 文件源码 项目:Plamber 作者: OlegKlimenko 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def value_to_db_datetime(self, value):
        if value is None:
            return None
        # MySQL doesn't support tz-aware times
        if timezone.is_aware(value):
            if settings.USE_TZ:
                value = value.astimezone(timezone.utc).replace(tzinfo=None)
            else:
                raise ValueError(
                    "MySQL backend does not support timezone-aware times."
                )
        if not self.connection.features.supports_microsecond_precision:
            value = value.replace(microsecond=0)
        if not self.connection.use_pure:
            return datetime_to_mysql(value)
        return self.connection.converter.to_mysql(value)
test_models.py 文件源码 项目:parkkihubi 作者: City-of-Helsinki 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_parking_str(parking_factory):
    parking = parking_factory(
        time_start=datetime.datetime(2014, 1, 1, 6, 0, 0, tzinfo=utc),
        time_end=datetime.datetime(2016, 1, 1, 7, 0, 0, tzinfo=utc),
        registration_number='ABC-123',
    )
    assert all(str(parking).count(val) == 1 for val in ('2014', '2016', '8', '9', 'ABC-123'))

    parking = parking_factory(
        time_start=datetime.datetime(2016, 1, 1, 6, 0, 0, tzinfo=utc),
        time_end=datetime.datetime(2016, 1, 1, 7, 0, 0, tzinfo=utc),
        registration_number='ABC-123',
    )
    assert all(str(parking).count(val) == 1 for val in ('2016', '8', '9', 'ABC-123'))

    parking = parking_factory(
        time_start=datetime.datetime(2016, 1, 1, 6, 0, 0, tzinfo=utc),
        time_end=None,
        registration_number='ABC-123',
    )
    assert all(str(parking).count(val) == 1 for val in ('2016', '8', 'ABC-123'))
parse_date.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def render(self, datestring):
        """Parses a date-like input string into a timezone aware Python
        datetime.
        """
        formats = ["%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%d %H:%M:%S.%f",
                   "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"]
        if datestring:
            for format in formats:
                try:
                    parsed = datetime.strptime(datestring, format)
                    if not timezone.is_aware(parsed):
                        parsed = timezone.make_aware(parsed, timezone.utc)
                    return parsed
                except Exception:
                    pass
        return None
base.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def adapt_datetime_warn_on_aware_datetime(value, conv):
    # Remove this function and rely on the default adapter in Django 2.0.
    if settings.USE_TZ and timezone.is_aware(value):
        warnings.warn(
            "The MySQL database adapter received an aware datetime (%s), "
            "probably from cursor.execute(). Update your code to pass a "
            "naive datetime in the database connection's time zone (UTC by "
            "default).", RemovedInDjango20Warning)
        # This doesn't account for the database connection's timezone,
        # which isn't known. (That's why this adapter is deprecated.)
        value = value.astimezone(timezone.utc).replace(tzinfo=None)
    return Thing2Literal(value.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)

# MySQLdb-1.2.1 returns TIME columns as timedelta -- they are more like
# timedelta in terms of actual behavior as they are signed and include days --
# and Django expects time, so we still need to override that. We also need to
# add special handling for SafeText and SafeBytes as MySQLdb's type
# checking is too tight to catch those (see Django ticket #6052).
models.py 文件源码 项目:planb 作者: ossobv 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def clone(self, **override):
        # See: https://github.com/django/django/commit/a97ecfdea8
        copy = self.__class__.objects.get(pk=self.pk)
        copy.pk = None
        copy.date_complete = datetime(1970, 1, 2, tzinfo=timezone.utc)
        copy.failure_datetime = None
        copy.queued = copy.running = False
        copy.complete_duration = 0
        copy.backup_size_mb = 0

        # Use the overrides.
        for key, value in override.items():
            setattr(copy, key, value)

        copy.save()
        return copy
test_models.py 文件源码 项目:arxiv-vanity 作者: arxiv-vanity 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_not_expired(self):
        paper = create_paper()
        render1 = create_render(paper=paper)
        render1.created_at = datetime.datetime(1900, 1, 1).replace(tzinfo=timezone.utc)
        render1.save()
        render2 = create_render(paper=paper)

        # haven't updated expired status yet
        qs = Render.objects.not_expired()
        self.assertIn(render1, qs)
        self.assertIn(render2, qs)

        # batch job which updates the expired flag
        Render.objects.update_is_expired()

        # render1 should have expired now
        qs = Render.objects.not_expired()
        self.assertNotIn(render1, qs)
        self.assertIn(render2, qs)
state_changes.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_request_state_from_pond_blocks(request_state, acceptability_threshold, request_blocks):
    active_blocks = False
    future_blocks = False
    now = timezone.now()
    for block in request_blocks:
        start_time = dateutil.parser.parse(block['start']).replace(tzinfo=timezone.utc)
        end_time = dateutil.parser.parse(block['end']).replace(tzinfo=timezone.utc)
        # mark a block as complete if a % of the total exposures of all its molecules are complete
        completion_percent = exposure_completion_percentage_from_pond_block(block)
        if isclose(acceptability_threshold, completion_percent) or completion_percent >= acceptability_threshold:
            return 'COMPLETED'
        if (not block['canceled'] and not any(molecule['failed'] for molecule in block['molecules'])
                and start_time < now < end_time):
            active_blocks = True
        if now < start_time:
            future_blocks = True

    if not (future_blocks or active_blocks):
        return 'FAILED'

    return request_state
test_api.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        super().setUp()
        self.proposal = mixer.blend(Proposal)
        self.user = mixer.blend(User)
        mixer.blend(Profile, user=self.user)
        self.client.force_login(self.user)
        self.semester = mixer.blend(
            Semester, id='2016B', start=datetime(2016, 9, 1, tzinfo=timezone.utc),
            end=datetime(2016, 12, 31, tzinfo=timezone.utc)
        )
        self.time_allocation_1m0_sbig = mixer.blend(
            TimeAllocation, proposal=self.proposal, semester=self.semester,
            telescope_class='1m0', instrument_name='1M0-SCICAM-SBIG', std_allocation=100.0, std_time_used=0.0,
            too_allocation=10, too_time_used=0.0, ipp_limit=10.0, ipp_time_available=5.0
        )

        self.time_allocation_2m0_floyds = mixer.blend(
            TimeAllocation, proposal=self.proposal, semester=self.semester,
            telescope_class='2m0', instrument_name='2M0-FLOYDS-SCICAM', std_allocation=100.0, std_time_used=0.0,
            too_allocation=10, too_time_used=0.0, ipp_limit=10.0, ipp_time_available=5.0
        )

        self.membership = mixer.blend(Membership, user=self.user, proposal=self.proposal)
        self.generic_payload = copy.deepcopy(generic_payload)
        self.generic_payload['proposal'] = self.proposal.id
test_api.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def setUp(self):
        super().setUp()
        self.proposal = mixer.blend(Proposal)
        self.user = mixer.blend(User)
        self.client.force_login(self.user)

        mixer.blend(Membership, user=self.user, proposal=self.proposal)

        semester = mixer.blend(
            Semester,
            id='2016B',
            start=datetime(2016, 9, 1, tzinfo=timezone.utc),
            end=datetime(2016, 12, 31, tzinfo=timezone.utc),
        )

        self.time_allocation_1m0 = mixer.blend(
            TimeAllocation, proposal=self.proposal, semester=semester,
            telescope_class='1m0', std_allocation=100.0, std_time_used=0.0,
            too_allocation=10, too_time_used=0.0, ipp_limit=10.0,
            ipp_time_available=5.0
        )
        self.generic_payload = copy.deepcopy(generic_payload)
        self.generic_payload['proposal'] = self.proposal.id
test_api.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        super().setUp()
        self.proposal = mixer.blend(Proposal)
        self.user = mixer.blend(User)
        self.client.force_login(self.user)

        semester = mixer.blend(Semester, id='2016B', start=datetime(2016, 9, 1, tzinfo=timezone.utc),
                               end=datetime(2016, 12, 31, tzinfo=timezone.utc)
                               )
        self.time_allocation_1m0_sbig = mixer.blend(
            TimeAllocation, proposal=self.proposal, semester=semester,
            telescope_class='1m0', instrument_name='1M0-SCICAM-SBIG', std_allocation=100.0,
            std_time_used=0.0, too_allocation=10, too_time_used=0.0, ipp_limit=10.0,
            ipp_time_available=5.0
        )

        self.time_allocation_2m0_floyds = mixer.blend(
            TimeAllocation, proposal=self.proposal, semester=semester,
            telescope_class='2m0', instrument_name='2M0-FLOYDS-SCICAM', std_allocation=100.0, std_time_used=0.0,
            too_allocation=10, too_time_used=0.0, ipp_limit=10.0, ipp_time_available=5.0
        )

        mixer.blend(Membership, user=self.user, proposal=self.proposal)
        self.generic_payload = copy.deepcopy(generic_payload)
        self.generic_payload['proposal'] = self.proposal.id
test_api.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
        super().setUp()
        self.proposal = mixer.blend(Proposal, id='temp')
        self.semester = mixer.blend(Semester, id='2016B', start=datetime(2016, 9, 1, tzinfo=timezone.utc),
                                    end=datetime(2016, 12, 31, tzinfo=timezone.utc))

        self.time_allocation_1m0_sbig = mixer.blend(
            TimeAllocation, proposal=self.proposal, semester=self.semester, telescope_class='1m0',
            instrument_name='1M0-SCICAM-SBIG', std_allocation=100.0, std_time_used=0.0,
            too_allocation=10.0, too_time_used=0.0, ipp_limit=10.0, ipp_time_available=1.0
        )
        self.time_allocation_0m4_sbig = mixer.blend(
            TimeAllocation, proposal=self.proposal, semester=self.semester, telescope_class='0m4',
            instrument_name='0M4-SCICAM-SBIG', std_allocation=100.0, std_time_used=0.0,
            too_allocation=10.0, too_time_used=0.0, ipp_limit=10.0, ipp_time_available=1.0
        )
        self.user = mixer.blend(User)
        mixer.blend(Membership, user=self.user, proposal=self.proposal)
        self.client.force_login(self.user)
        self.generic_payload = copy.deepcopy(generic_payload)
test_request_utils.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_request_intervals_for_one_week(self):
        intervals = get_rise_set_intervals(self.request.as_dict)

        truth_intervals = [(datetime(2016, 10, 1, 0, 0, tzinfo=timezone.utc),
                            datetime(2016, 10, 1, 3, 20, 31, 366820, tzinfo=timezone.utc)),
                           (datetime(2016, 10, 1, 19, 13, 14, 944205, tzinfo=timezone.utc),
                            datetime(2016, 10, 2, 3, 19, 9, 181040, tzinfo=timezone.utc)),
                           (datetime(2016, 10, 2, 19, 9, 19, 241762, tzinfo=timezone.utc),
                            datetime(2016, 10, 3, 3, 17, 47, 117329, tzinfo=timezone.utc)),
                           (datetime(2016, 10, 3, 19, 5, 23, 539011, tzinfo=timezone.utc),
                            datetime(2016, 10, 4, 3, 16, 25, 202612, tzinfo=timezone.utc)),
                           (datetime(2016, 10, 4, 19, 1, 27, 835928, tzinfo=timezone.utc),
                            datetime(2016, 10, 5, 3, 15, 3, 464340, tzinfo=timezone.utc)),
                           (datetime(2016, 10, 5, 18, 57, 32, 132481, tzinfo=timezone.utc),
                            datetime(2016, 10, 6, 3, 12, 5, 895932, tzinfo=timezone.utc)),
                           (datetime(2016, 10, 6, 18, 53, 36, 428629, tzinfo=timezone.utc),
                            datetime(2016, 10, 7, 3, 8, 10, 183626, tzinfo=timezone.utc)),
                           (datetime(2016, 10, 7, 18, 49, 40, 724307, tzinfo=timezone.utc),
                            datetime(2016, 10, 8, 0, 0, tzinfo=timezone.utc))]

        self.assertEqual(intervals, truth_intervals)
test_telescope_states.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_telescope_availability_spans_interval(self, mock_intervals):
        mock_intervals.return_value = [(datetime(2016, 9, 30, 18, 30, 0, tzinfo=timezone.utc),
                                        datetime(2016, 9, 30, 21, 0, 0, tzinfo=timezone.utc)),
                                       (datetime(2016, 10, 1, 18, 30, 0, tzinfo=timezone.utc),
                                        datetime(2016, 10, 1, 19, 0, 0, tzinfo=timezone.utc)),
                                       (datetime(2016, 10, 1, 19, 10, 0, tzinfo=timezone.utc),
                                        datetime(2016, 10, 1, 19, 20, 0, tzinfo=timezone.utc)),
                                       (datetime(2016, 10, 2, 18, 30, 0, tzinfo=timezone.utc),
                                        datetime(2016, 10, 2, 21, 0, 0, tzinfo=timezone.utc))]
        start = datetime(2016, 9, 30, tzinfo=timezone.utc)
        end = datetime(2016, 10, 2, tzinfo=timezone.utc)
        telescope_availability = get_telescope_availability_per_day(start, end)

        self.assertIn(self.tk1, telescope_availability)
        self.assertIn(self.tk2, telescope_availability)

        doma_available_time = (datetime(2016, 10, 1, 19, 0, 0) - datetime(2016, 10, 1, 18, 30, 0)).total_seconds()
        doma_available_time += (datetime(2016, 10, 1, 19, 20, 0) - datetime(2016, 10, 1, 19, 10, 0)).total_seconds()
        doma_total_time = doma_available_time

        doma_expected_availability = doma_available_time / doma_total_time
        self.assertAlmostEqual(doma_expected_availability, telescope_availability[self.tk1][0][1])

        domb_expected_availability = 1.0
        self.assertAlmostEqual(domb_expected_availability, telescope_availability[self.tk2][0][1])
downtimedb.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _order_downtime_by_resource(raw_downtime_intervals):
        ''' Puts the raw downtime interval sets into a dictionary by resource
        '''
        downtime_intervals = {}
        for interval in raw_downtime_intervals:
            resource = '.'.join([interval['telescope'], interval['observatory'], interval['site']])
            if resource not in downtime_intervals:
                downtime_intervals[resource] = []
            start = datetime.strptime(interval['start'], DOWNTIME_DATE_FORMAT).replace(tzinfo=timezone.utc)
            end = datetime.strptime(interval['end'], DOWNTIME_DATE_FORMAT).replace(tzinfo=timezone.utc)
            downtime_intervals[resource].append({'type': 'start', 'time': start})
            downtime_intervals[resource].append({'type': 'end', 'time': end})

        for resource in downtime_intervals:
            downtime_intervals[resource] = Intervals(downtime_intervals[resource])

        return downtime_intervals
telescope_states.py 文件源码 项目:valhalla 作者: LCOGT 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, start, end, telescopes=None, sites=None, instrument_types=None):
        try:
            self.es = Elasticsearch([settings.ELASTICSEARCH_URL])
        except LocationValueError:
            logger.error('Could not find host. Make sure ELASTICSEARCH_URL is set.')
            raise ImproperlyConfigured('ELASTICSEARCH_URL')

        self.instrument_types = instrument_types
        self.available_telescopes = self._get_available_telescopes()

        sites = list({tk.site for tk in self.available_telescopes}) if not sites else sites
        telescopes = list({tk.telescope for tk in self.available_telescopes if tk.site in sites}) \
            if not telescopes else telescopes

        self.start = start.replace(tzinfo=timezone.utc).replace(microsecond=0)
        self.end = end.replace(tzinfo=timezone.utc).replace(microsecond=0)
        cached_event_data = cache.get('tel_event_data')
        if cached_event_data:
            self.event_data = cached_event_data
        else:
            self.event_data = self._get_es_data(sites, telescopes)
            cache.set('tel_event_data', self.event_data, 1800)
operations.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def value_to_db_datetime(self, value):
        if value is None:
            return None
        # MySQL doesn't support tz-aware times
        if timezone.is_aware(value):
            if settings.USE_TZ:
                value = value.astimezone(timezone.utc).replace(tzinfo=None)
            else:
                raise ValueError(
                    "MySQL backend does not support timezone-aware times."
                )
        if not self.connection.features.supports_microsecond_precision:
            value = value.replace(microsecond=0)
        if not self.connection.use_pure:
            return datetime_to_mysql(value)
        return self.connection.converter.to_mysql(value)
operations.py 文件源码 项目:importacsv 作者: rasertux 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def value_to_db_datetime(self, value):
        if value is None:
            return None
        # MySQL doesn't support tz-aware times
        if timezone.is_aware(value):
            if settings.USE_TZ:
                value = value.astimezone(timezone.utc).replace(tzinfo=None)
            else:
                raise ValueError(
                    "MySQL backend does not support timezone-aware times."
                )
        if not self.connection.features.supports_microsecond_precision:
            value = value.replace(microsecond=0)
        if not self.connection.use_pure:
            return datetime_to_mysql(value)
        return self.connection.converter.to_mysql(value)
events.py 文件源码 项目:wagtail_room_booking 作者: Tamriel 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _occurrences_after_generator(self, after=None, tzinfo=pytz.utc):
        """
        returns a generator that produces unpresisted occurrences after the
        datetime ``after``.
        """

        if after is None:
            after = timezone.now()
        rule = self.get_rrule_object()
        if rule is None:
            if self.end > after:
                yield self._create_occurrence(self.start, self.end)
            raise StopIteration
        date_iter = iter(rule)
        difference = self.end - self.start
        while True:
            o_start = next(date_iter)
            if self.end_recurring_period and o_start > self.end_recurring_period:
                raise StopIteration
            o_end = o_start + difference
            if o_end > after:
                yield self._create_occurrence(o_start, o_end)
pybase.py 文件源码 项目:python-ibmdb-django 作者: ibmdb 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _format_parameters( self, parameters ):
        parameters = list( parameters )
        for index in range( len( parameters ) ):
            # With raw SQL queries, datetimes can reach this function
            # without being converted by DateTimeField.get_db_prep_value.
            if settings.USE_TZ and isinstance( parameters[index], datetime.datetime ):
                param = parameters[index]
                if timezone.is_naive( param ):
                    warnings.warn(u"Received a naive datetime (%s)"
                              u" while time zone support is active." % param,
                              RuntimeWarning)
                    default_timezone = timezone.get_default_timezone()
                    param = timezone.make_aware( param, default_timezone )
                param = param.astimezone(timezone.utc).replace(tzinfo=None)
                parameters[index] = param
        return tuple( parameters )

    # Over-riding this method to modify SQLs which contains format parameter to qmark.
operations.py 文件源码 项目:python-ibmdb-django 作者: ibmdb 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def value_to_db_datetime( self, value ):
        if value is None:
            return None

        if( djangoVersion[0:2] <= ( 1, 3 ) ):
            #DB2 doesn't support time zone aware datetime
            if ( value.tzinfo is not None ):
                raise ValueError( "Timezone aware datetime not supported" )
            else:
                return value
        else:
            if is_aware(value):
                if settings.USE_TZ:
                    value = value.astimezone( utc ).replace( tzinfo=None )
                else:
                    raise ValueError( "Timezone aware datetime not supported" )
            return unicode( value )
password_recovery.py 文件源码 项目:django-open-volunteering-platform 作者: OpenVolunteeringPlatform 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create(self, request, *args, **kwargs):
    email = request.data.get('email', None)

    try:
      user = self.get_queryset().get(email__iexact=email)
    except:
      user = None

    if user:
      # Allow only 5 requests per hour
      limit = 5
      now = timezone.now()
      to_check = (now - relativedelta(hours=1)).replace(tzinfo=timezone.utc)
      tokens = models.PasswordRecoveryToken.objects.filter(user=user, created_date__gte=to_check, channel__slug=request.channel)

      if tokens.count() >= limit:
        will_release = tokens.order_by('-created_date')[limit-1].created_date + relativedelta(hours=1)
        seconds = abs((will_release - now).seconds)
        return response.Response({'success': False, 'message': 'Five tokens generated last hour.', 'try_again_in': seconds}, status=status.HTTP_429_TOO_MANY_REQUESTS)

      token = models.PasswordRecoveryToken.objects.create(user=user, object_channel=request.channel)

    return response.Response({'success': True, 'message': 'Token requested successfully(if user exists).'})
utils.py 文件源码 项目:graphene-jwt-auth 作者: darwin4031 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def jwt_blacklist_set_handler(payload):
    """
    Default implementation that blacklists a jwt token.
    Should return a black listed token or None.
    """
    try:
        data = {
            'jti': payload.get('jti'),
            'created': now(),
            'expires': datetime.fromtimestamp(payload.get('exp'), tz=utc)
        }
        token = JWTBlacklistToken.objects.create(**data)
    except (TypeError, IntegrityError, Exception):
        return None
    else:
        return token
models.py 文件源码 项目:guifiadmin 作者: guifi-org 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_invoices(self, date):
        # monthly quotas
        start = make_aware(datetime.datetime(date.year, date.month, 1), utc)
        if date.month == 12:
            year = date.year + 1
            month = 1
        else:
            year = date.year
            month = date.month + 1
        end = make_aware(datetime.datetime(year, month, 1), utc)
        for quota in self.filter(quota_type__periodicity=QuotaType.MONTHLY):
            if not quota.quotainvoice_set.filter(date__range=(start, end)).exists():
                print "quota necesita invoice:", quota.create_invoice(date)

        # yearly quotas
        start = make_aware(datetime.datetime(date.year, 1, 1), utc)
        end = make_aware(datetime.datetime(date.year + 1, 1, 1), utc)

        quotainvoices = []
        for quota in self.filter(quota_type__periodicity=QuotaType.ANUAL):
            if not quota.quotainvoice_set.filter(date__range=(start, end)).exists():
                quotainvoice = quota.create_invoice(date)
                quotainvoices.append(quotainvoice)
        return quotainvoices
admin.py 文件源码 项目:guifiadmin 作者: guifi-org 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def needs_invoice(self, obj):
        if obj.last_invoice_date is None:
            return True
        date = now()
        if obj.quota_type.periodicity == obj.quota_type.ANUAL:
            start = make_aware(datetime.datetime(date.year, 1, 1), utc)
            return obj.last_invoice_date < start
        elif obj.quota_type.periodicity == obj.quota_type.ANUAL:
            if date.month == 12:
                year = date.year + 1
                month = 1
            else:
                year = date.year
                month = date.month + 1
            start = make_aware(datetime.datetime(year, month, 1), utc)
            return obj.last_invoice_date < start
        return False
base.py 文件源码 项目:DjangoBlog 作者: 0daybug 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def adapt_datetime_with_timezone_support(value, conv):
    # Equivalent to DateTimeField.get_db_prep_value. Used only by raw SQL.
    if settings.USE_TZ:
        if timezone.is_naive(value):
            warnings.warn("MySQL received a naive datetime (%s)"
                          " while time zone support is active." % value,
                          RuntimeWarning)
            default_timezone = timezone.get_default_timezone()
            value = timezone.make_aware(value, default_timezone)
        value = value.astimezone(timezone.utc).replace(tzinfo=None)
    return Thing2Literal(value.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)

# MySQLdb-1.2.1 returns TIME columns as timedelta -- they are more like
# timedelta in terms of actual behavior as they are signed and include days --
# and Django expects time, so we still need to override that. We also need to
# add special handling for SafeText and SafeBytes as MySQLdb's type
# checking is too tight to catch those (see Django ticket #6052).
# Finally, MySQLdb always returns naive datetime objects. However, when
# timezone support is active, Django expects timezone-aware datetime objects.
test_instance.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_json_stores_user_attribute(self, mock_time):
        mock_time.return_value = datetime.utcnow().replace(tzinfo=utc)
        self._publish_transportation_form()

        # make account require phone auth
        self.user.profile.require_auth = True
        self.user.profile.save()

        # submit instance with a request user
        path = os.path.join(
            self.this_directory, 'fixtures', 'transportation', 'instances',
            self.surveys[0], self.surveys[0] + '.xml')

        auth = DigestAuth(self.login_username, self.login_password)
        self._make_submission(path, auth=auth)

        instances = Instance.objects.filter(xform_id=self.xform).all()
        self.assertTrue(len(instances) > 0)

        for instance in instances:
            self.assertEqual(instance.json[SUBMITTED_BY], 'bob')
            # check that the parsed instance's to_dict_for_mongo also contains
            # the _user key, which is what's used by the JSON REST service
            pi = ParsedInstance.objects.get(instance=instance)
            self.assertEqual(pi.to_dict_for_mongo()[SUBMITTED_BY], 'bob')
sync_deleted_instances_fix.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle(self, *args, **kwargs):
        # Reset all sql deletes to None
        Instance.objects.exclude(
            deleted_at=None, xform__downloadable=True).update(deleted_at=None)

        # Get all mongo deletes
        query = '{"$and": [{"_deleted_at": {"$exists": true}}, ' \
                '{"_deleted_at": {"$ne": null}}]}'
        query = json.loads(query)
        xform_instances = settings.MONGO_DB.instances
        cursor = xform_instances.find(query)
        for record in cursor:
            # update sql instance with deleted_at datetime from mongo
            try:
                i = Instance.objects.get(
                    uuid=record["_uuid"],  xform__downloadable=True)
            except Instance.DoesNotExist:
                continue
            else:
                deleted_at = parse_datetime(record["_deleted_at"])
                if not timezone.is_aware(deleted_at):
                    deleted_at = timezone.make_aware(
                        deleted_at, timezone.utc)
                i.set_deleted(deleted_at)
views.py 文件源码 项目:ecs 作者: ecs-org 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def do_password_reset(request, token=None):
    try:
        email, timestamp = _password_reset_token_factory.parse_token(token)
    except (signing.BadSignature, signing.SignatureExpired):
        return render(request, 'users/password_reset/reset_token_invalid.html', {})

    try:
        user = get_user(email)
    except User.DoesNotExist:
        raise Http404()
    profile = user.profile
    timestamp = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc)
    if profile.last_password_change and profile.last_password_change > timestamp:
        return render(request, 'users/password_reset/token_already_used.html', {})

    form = SetPasswordForm(user, request.POST or None)
    if form.is_valid():
        form.save()
        profile.last_password_change = timezone.now()
        profile.save()
        return render(request, 'users/password_reset/reset_complete.html', {})
    return render(request, 'users/password_reset/reset_form.html', {
        'user': user,
        'form': form,
    })
base.py 文件源码 项目:trydjango18 作者: lucifer-yqh 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def adapt_datetime_with_timezone_support(value, conv):
    # Equivalent to DateTimeField.get_db_prep_value. Used only by raw SQL.
    if settings.USE_TZ:
        if timezone.is_naive(value):
            warnings.warn("MySQL received a naive datetime (%s)"
                          " while time zone support is active." % value,
                          RuntimeWarning)
            default_timezone = timezone.get_default_timezone()
            value = timezone.make_aware(value, default_timezone)
        value = value.astimezone(timezone.utc).replace(tzinfo=None)
    return Thing2Literal(value.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)

# MySQLdb-1.2.1 returns TIME columns as timedelta -- they are more like
# timedelta in terms of actual behavior as they are signed and include days --
# and Django expects time, so we still need to override that. We also need to
# add special handling for SafeText and SafeBytes as MySQLdb's type
# checking is too tight to catch those (see Django ticket #6052).
# Finally, MySQLdb always returns naive datetime objects. However, when
# timezone support is active, Django expects timezone-aware datetime objects.
authentication.py 文件源码 项目:vaultier 作者: Movile 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def authenticate(self, request, **kwargs):
        """
        Validates a lost_key request by hash, id, and expiration time
        """
        user_hash = request.QUERY_PARAMS.get('hash', None)
        if not user_hash:
            user_hash = request.DATA.get('hash')

        lost_key_id = request.parser_context.get('view').kwargs.get('pk')
        try:
            lost_key = LostKey.objects.get(hash=user_hash,
                                           pk=lost_key_id,
                                           expires_at__gte=datetime.utcnow().
                                           replace(tzinfo=utc),
                                           used=False)

            return lost_key.created_by, lost_key.hash
        except:
            return None


问题


面经


文章

微信
公众号

扫码关注公众号