def value_to_db_datetime(self, value):
    """Convert a datetime into the representation passed to the MySQL driver.

    ``None`` is passed through unchanged. An aware value is converted to
    naive UTC when ``settings.USE_TZ`` is enabled; otherwise a ``ValueError``
    is raised. Microseconds are zeroed when the connection's features report
    no microsecond precision.
    """
    if value is None:
        return None

    # MySQL doesn't support tz-aware times
    if timezone.is_aware(value):
        if not settings.USE_TZ:
            raise ValueError(
                "MySQL backend does not support timezone-aware times."
            )
        value = value.astimezone(timezone.utc).replace(tzinfo=None)

    connection = self.connection
    if not connection.features.supports_microsecond_precision:
        value = value.replace(microsecond=0)

    # The pure-Python connection converts through its own converter object;
    # otherwise the module-level datetime_to_mysql helper is used.
    if connection.use_pure:
        return connection.converter.to_mysql(value)
    return datetime_to_mysql(value)
python类utc()的实例源码
def test_parking_str(parking_factory):
    """Check that str(parking) contains each expected fragment exactly once.

    Three scenarios are covered: start and end in different years, start and
    end on the same day, and an open-ended parking without an end time.
    """
    scenarios = (
        (
            datetime.datetime(2014, 1, 1, 6, 0, 0, tzinfo=utc),
            datetime.datetime(2016, 1, 1, 7, 0, 0, tzinfo=utc),
            ('2014', '2016', '8', '9', 'ABC-123'),
        ),
        (
            datetime.datetime(2016, 1, 1, 6, 0, 0, tzinfo=utc),
            datetime.datetime(2016, 1, 1, 7, 0, 0, tzinfo=utc),
            ('2016', '8', '9', 'ABC-123'),
        ),
        (
            datetime.datetime(2016, 1, 1, 6, 0, 0, tzinfo=utc),
            None,
            ('2016', '8', 'ABC-123'),
        ),
    )
    for begins, finishes, fragments in scenarios:
        parking = parking_factory(
            time_start=begins,
            time_end=finishes,
            registration_number='ABC-123',
        )
        assert all(str(parking).count(val) == 1 for val in fragments)
def render(self, datestring):
    """Parse a date-like input string into a timezone-aware datetime.

    The supported layouts are tried in order and the first one that matches
    wins. A naive result is made aware in UTC.

    Returns ``None`` when ``datestring`` is empty or matches no layout.
    """
    layouts = (
        "%Y-%m-%dT%H:%M:%S.%f",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
    )
    if not datestring:
        return None

    for layout in layouts:
        try:
            parsed = datetime.strptime(datestring, layout)
        except (ValueError, TypeError):
            # ValueError: the text does not match this layout.
            # TypeError: the input is not a string at all.
            # Only these are skipped so real programming errors still surface.
            continue
        if not timezone.is_aware(parsed):
            parsed = timezone.make_aware(parsed, timezone.utc)
        return parsed
    return None
def adapt_datetime_warn_on_aware_datetime(value, conv):
    """Adapt a datetime to a MySQL literal, warning when it is aware.

    When ``settings.USE_TZ`` is enabled and ``value`` is aware, a
    ``RemovedInDjango20Warning`` is issued and the value is converted to
    naive UTC before being formatted.
    """
    # Remove this function and rely on the default adapter in Django 2.0.
    if settings.USE_TZ and timezone.is_aware(value):
        # The offending value is interpolated into the message; previously the
        # "%s" placeholder was emitted verbatim.
        warnings.warn(
            "The MySQL database adapter received an aware datetime (%s), "
            "probably from cursor.execute(). Update your code to pass a "
            "naive datetime in the database connection's time zone (UTC by "
            "default)." % (value,), RemovedInDjango20Warning)
        # This doesn't account for the database connection's timezone,
        # which isn't known. (That's why this adapter is deprecated.)
        value = value.astimezone(timezone.utc).replace(tzinfo=None)
    return Thing2Literal(value.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)
# MySQLdb-1.2.1 returns TIME columns as timedelta -- they are more like
# timedelta in terms of actual behavior as they are signed and include days --
# and Django expects time, so we still need to override that. We also need to
# add special handling for SafeText and SafeBytes as MySQLdb's type
# checking is too tight to catch those (see Django ticket #6052).
def clone(self, **override):
    """Save and return a copy of this object with its run state reset.

    A fresh instance is loaded by primary key, its ``pk`` is cleared, the
    completion/failure/queue fields are reset, and then every keyword in
    ``override`` is applied as an attribute before ``save()`` is called.
    """
    # See: https://github.com/django/django/commit/a97ecfdea8
    duplicate = self.__class__.objects.get(pk=self.pk)

    resets = (
        ('pk', None),
        ('date_complete', datetime(1970, 1, 2, tzinfo=timezone.utc)),
        ('failure_datetime', None),
        ('queued', False),
        ('running', False),
        ('complete_duration', 0),
        ('backup_size_mb', 0),
    )
    for attribute, initial in resets:
        setattr(duplicate, attribute, initial)

    # Use the overrides.
    for attribute, replacement in override.items():
        setattr(duplicate, attribute, replacement)

    duplicate.save()
    return duplicate
def test_not_expired(self):
    """not_expired() drops an old render only after update_is_expired() runs."""
    paper = create_paper()

    old_render = create_render(paper=paper)
    old_render.created_at = datetime.datetime(1900, 1, 1, tzinfo=timezone.utc)
    old_render.save()

    new_render = create_render(paper=paper)

    # haven't updated expired status yet
    before_update = Render.objects.not_expired()
    self.assertIn(old_render, before_update)
    self.assertIn(new_render, before_update)

    # batch job which updates the expired flag
    Render.objects.update_is_expired()

    # old_render should have expired now
    after_update = Render.objects.not_expired()
    self.assertNotIn(old_render, after_update)
    self.assertIn(new_render, after_update)
def get_request_state_from_pond_blocks(request_state, acceptability_threshold, request_blocks):
    """Derive a request state from its pond blocks.

    Returns ``'COMPLETED'`` as soon as one block's completion percentage is
    close to or above ``acceptability_threshold``. Returns ``'FAILED'`` when
    no block is currently active or still in the future. Otherwise the
    incoming ``request_state`` is returned unchanged.
    """
    now = timezone.now()
    has_active = False
    has_future = False

    for block in request_blocks:
        begins = dateutil.parser.parse(block['start']).replace(tzinfo=timezone.utc)
        finishes = dateutil.parser.parse(block['end']).replace(tzinfo=timezone.utc)

        # mark a block as complete if a % of the total exposures of all its molecules are complete
        completed = exposure_completion_percentage_from_pond_block(block)
        if isclose(acceptability_threshold, completed) or completed >= acceptability_threshold:
            return 'COMPLETED'

        # A block is active when it is not canceled, none of its molecules
        # failed, and the current time falls strictly inside its window.
        if not block['canceled']:
            any_failed = any(molecule['failed'] for molecule in block['molecules'])
            if not any_failed and begins < now < finishes:
                has_active = True

        if now < begins:
            has_future = True

    if has_future or has_active:
        return request_state
    return 'FAILED'
def setUp(self):
    """Create a proposal, a logged-in user with a profile, a semester, two
    time allocations, a membership, and a request payload for the proposal.
    """
    super().setUp()
    self.proposal = mixer.blend(Proposal)
    self.user = mixer.blend(User)
    mixer.blend(Profile, user=self.user)
    self.client.force_login(self.user)

    self.semester = mixer.blend(
        Semester,
        id='2016B',
        start=datetime(2016, 9, 1, tzinfo=timezone.utc),
        end=datetime(2016, 12, 31, tzinfo=timezone.utc),
    )

    # Both allocations share the same time budget; only the telescope class
    # and the instrument differ.
    budget = dict(
        std_allocation=100.0, std_time_used=0.0,
        too_allocation=10, too_time_used=0.0,
        ipp_limit=10.0, ipp_time_available=5.0,
    )
    self.time_allocation_1m0_sbig = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=self.semester,
        telescope_class='1m0', instrument_name='1M0-SCICAM-SBIG', **budget
    )
    self.time_allocation_2m0_floyds = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=self.semester,
        telescope_class='2m0', instrument_name='2M0-FLOYDS-SCICAM', **budget
    )

    self.membership = mixer.blend(Membership, user=self.user, proposal=self.proposal)

    payload = copy.deepcopy(generic_payload)
    payload['proposal'] = self.proposal.id
    self.generic_payload = payload
def setUp(self):
    """Create a proposal with a logged-in member, a semester, a single 1m0
    time allocation, and a request payload for the proposal.
    """
    super().setUp()
    self.proposal = mixer.blend(Proposal)
    self.user = mixer.blend(User)
    self.client.force_login(self.user)
    mixer.blend(Membership, user=self.user, proposal=self.proposal)

    semester_start = datetime(2016, 9, 1, tzinfo=timezone.utc)
    semester_end = datetime(2016, 12, 31, tzinfo=timezone.utc)
    current_semester = mixer.blend(
        Semester, id='2016B', start=semester_start, end=semester_end
    )

    self.time_allocation_1m0 = mixer.blend(
        TimeAllocation,
        proposal=self.proposal,
        semester=current_semester,
        telescope_class='1m0',
        std_allocation=100.0,
        std_time_used=0.0,
        too_allocation=10,
        too_time_used=0.0,
        ipp_limit=10.0,
        ipp_time_available=5.0,
    )

    payload = copy.deepcopy(generic_payload)
    payload['proposal'] = self.proposal.id
    self.generic_payload = payload
def setUp(self):
    """Create a proposal, a logged-in user, a semester, two time allocations,
    the user's membership, and a request payload for the proposal.
    """
    super().setUp()
    self.proposal = mixer.blend(Proposal)
    self.user = mixer.blend(User)
    self.client.force_login(self.user)

    current_semester = mixer.blend(
        Semester,
        id='2016B',
        start=datetime(2016, 9, 1, tzinfo=timezone.utc),
        end=datetime(2016, 12, 31, tzinfo=timezone.utc),
    )

    # Both allocations share the same time budget; only the telescope class
    # and the instrument differ.
    budget = dict(
        std_allocation=100.0, std_time_used=0.0,
        too_allocation=10, too_time_used=0.0,
        ipp_limit=10.0, ipp_time_available=5.0,
    )
    self.time_allocation_1m0_sbig = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=current_semester,
        telescope_class='1m0', instrument_name='1M0-SCICAM-SBIG', **budget
    )
    self.time_allocation_2m0_floyds = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=current_semester,
        telescope_class='2m0', instrument_name='2M0-FLOYDS-SCICAM', **budget
    )

    mixer.blend(Membership, user=self.user, proposal=self.proposal)

    payload = copy.deepcopy(generic_payload)
    payload['proposal'] = self.proposal.id
    self.generic_payload = payload
def setUp(self):
    """Create the 'temp' proposal, a semester, 1m0 and 0m4 SBIG time
    allocations, a logged-in member, and a copy of the generic payload.
    """
    super().setUp()
    self.proposal = mixer.blend(Proposal, id='temp')
    self.semester = mixer.blend(
        Semester,
        id='2016B',
        start=datetime(2016, 9, 1, tzinfo=timezone.utc),
        end=datetime(2016, 12, 31, tzinfo=timezone.utc),
    )

    # Both allocations share the same time budget; only the telescope class
    # and the instrument differ.
    budget = dict(
        std_allocation=100.0, std_time_used=0.0,
        too_allocation=10.0, too_time_used=0.0,
        ipp_limit=10.0, ipp_time_available=1.0,
    )
    self.time_allocation_1m0_sbig = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=self.semester,
        telescope_class='1m0', instrument_name='1M0-SCICAM-SBIG', **budget
    )
    self.time_allocation_0m4_sbig = mixer.blend(
        TimeAllocation, proposal=self.proposal, semester=self.semester,
        telescope_class='0m4', instrument_name='0M4-SCICAM-SBIG', **budget
    )

    self.user = mixer.blend(User)
    mixer.blend(Membership, user=self.user, proposal=self.proposal)
    self.client.force_login(self.user)
    self.generic_payload = copy.deepcopy(generic_payload)
def test_request_intervals_for_one_week(self):
    """The rise/set intervals computed for the request match the expected
    list of eight (start, end) pairs between 2016-10-01 and 2016-10-08.
    """
    utc_zone = timezone.utc
    expected = [
        (datetime(2016, 10, 1, 0, 0, tzinfo=utc_zone),
         datetime(2016, 10, 1, 3, 20, 31, 366820, tzinfo=utc_zone)),
        (datetime(2016, 10, 1, 19, 13, 14, 944205, tzinfo=utc_zone),
         datetime(2016, 10, 2, 3, 19, 9, 181040, tzinfo=utc_zone)),
        (datetime(2016, 10, 2, 19, 9, 19, 241762, tzinfo=utc_zone),
         datetime(2016, 10, 3, 3, 17, 47, 117329, tzinfo=utc_zone)),
        (datetime(2016, 10, 3, 19, 5, 23, 539011, tzinfo=utc_zone),
         datetime(2016, 10, 4, 3, 16, 25, 202612, tzinfo=utc_zone)),
        (datetime(2016, 10, 4, 19, 1, 27, 835928, tzinfo=utc_zone),
         datetime(2016, 10, 5, 3, 15, 3, 464340, tzinfo=utc_zone)),
        (datetime(2016, 10, 5, 18, 57, 32, 132481, tzinfo=utc_zone),
         datetime(2016, 10, 6, 3, 12, 5, 895932, tzinfo=utc_zone)),
        (datetime(2016, 10, 6, 18, 53, 36, 428629, tzinfo=utc_zone),
         datetime(2016, 10, 7, 3, 8, 10, 183626, tzinfo=utc_zone)),
        (datetime(2016, 10, 7, 18, 49, 40, 724307, tzinfo=utc_zone),
         datetime(2016, 10, 8, 0, 0, tzinfo=utc_zone)),
    ]
    computed = get_rise_set_intervals(self.request.as_dict)
    self.assertEqual(computed, expected)
def test_telescope_availability_spans_interval(self, mock_intervals):
    """Availability is reported for both telescopes over a multi-day span."""
    utc_zone = timezone.utc
    mock_intervals.return_value = [
        (datetime(2016, 9, 30, 18, 30, 0, tzinfo=utc_zone),
         datetime(2016, 9, 30, 21, 0, 0, tzinfo=utc_zone)),
        (datetime(2016, 10, 1, 18, 30, 0, tzinfo=utc_zone),
         datetime(2016, 10, 1, 19, 0, 0, tzinfo=utc_zone)),
        (datetime(2016, 10, 1, 19, 10, 0, tzinfo=utc_zone),
         datetime(2016, 10, 1, 19, 20, 0, tzinfo=utc_zone)),
        (datetime(2016, 10, 2, 18, 30, 0, tzinfo=utc_zone),
         datetime(2016, 10, 2, 21, 0, 0, tzinfo=utc_zone)),
    ]
    start = datetime(2016, 9, 30, tzinfo=utc_zone)
    end = datetime(2016, 10, 2, tzinfo=utc_zone)

    telescope_availability = get_telescope_availability_per_day(start, end)

    self.assertIn(self.tk1, telescope_availability)
    self.assertIn(self.tk2, telescope_availability)

    # The two windows on 2016-10-01 form both the available and the total
    # time, so the expected ratio for the first telescope works out to 1.0.
    first_window = datetime(2016, 10, 1, 19, 0, 0) - datetime(2016, 10, 1, 18, 30, 0)
    second_window = datetime(2016, 10, 1, 19, 20, 0) - datetime(2016, 10, 1, 19, 10, 0)
    doma_available_time = first_window.total_seconds()
    doma_available_time += second_window.total_seconds()
    doma_total_time = doma_available_time
    doma_expected_availability = doma_available_time / doma_total_time
    self.assertAlmostEqual(doma_expected_availability, telescope_availability[self.tk1][0][1])

    domb_expected_availability = 1.0
    self.assertAlmostEqual(domb_expected_availability, telescope_availability[self.tk2][0][1])
def _order_downtime_by_resource(raw_downtime_intervals):
''' Puts the raw downtime interval sets into a dictionary by resource
'''
downtime_intervals = {}
for interval in raw_downtime_intervals:
resource = '.'.join([interval['telescope'], interval['observatory'], interval['site']])
if resource not in downtime_intervals:
downtime_intervals[resource] = []
start = datetime.strptime(interval['start'], DOWNTIME_DATE_FORMAT).replace(tzinfo=timezone.utc)
end = datetime.strptime(interval['end'], DOWNTIME_DATE_FORMAT).replace(tzinfo=timezone.utc)
downtime_intervals[resource].append({'type': 'start', 'time': start})
downtime_intervals[resource].append({'type': 'end', 'time': end})
for resource in downtime_intervals:
downtime_intervals[resource] = Intervals(downtime_intervals[resource])
return downtime_intervals
def __init__(self, start, end, telescopes=None, sites=None, instrument_types=None):
    """Set up the Elasticsearch client, the time window and the event data.

    ``sites`` and ``telescopes`` default to those found among the available
    telescopes. ``start`` and ``end`` are stamped as UTC and truncated to
    whole seconds. Event data comes from the cache when present; otherwise
    it is fetched and cached for 1800 seconds.

    Raises ``ImproperlyConfigured`` when the Elasticsearch host cannot be
    located.
    """
    try:
        self.es = Elasticsearch([settings.ELASTICSEARCH_URL])
    except LocationValueError:
        logger.error('Could not find host. Make sure ELASTICSEARCH_URL is set.')
        raise ImproperlyConfigured('ELASTICSEARCH_URL')

    self.instrument_types = instrument_types
    self.available_telescopes = self._get_available_telescopes()

    if not sites:
        sites = list({tk.site for tk in self.available_telescopes})
    if not telescopes:
        telescopes = list({tk.telescope for tk in self.available_telescopes if tk.site in sites})

    self.start = start.replace(tzinfo=timezone.utc, microsecond=0)
    self.end = end.replace(tzinfo=timezone.utc, microsecond=0)

    # NOTE(review): the cache key does not include sites/telescopes, so cached
    # data is reused regardless of the requested filters; confirm intended.
    event_data = cache.get('tel_event_data')
    if not event_data:
        event_data = self._get_es_data(sites, telescopes)
        cache.set('tel_event_data', event_data, 1800)
    self.event_data = event_data
def value_to_db_datetime(self, value):
    """Convert a datetime into the representation passed to the MySQL driver.

    ``None`` is passed through unchanged. An aware value is converted to
    naive UTC when ``settings.USE_TZ`` is enabled; otherwise a ``ValueError``
    is raised. Microseconds are zeroed when the connection's features report
    no microsecond precision.
    """
    if value is None:
        return None

    # MySQL doesn't support tz-aware times
    if timezone.is_aware(value):
        if not settings.USE_TZ:
            raise ValueError(
                "MySQL backend does not support timezone-aware times."
            )
        value = value.astimezone(timezone.utc).replace(tzinfo=None)

    connection = self.connection
    if not connection.features.supports_microsecond_precision:
        value = value.replace(microsecond=0)

    # The pure-Python connection converts through its own converter object;
    # otherwise the module-level datetime_to_mysql helper is used.
    if connection.use_pure:
        return connection.converter.to_mysql(value)
    return datetime_to_mysql(value)
def value_to_db_datetime(self, value):
    """Convert a datetime into the representation passed to the MySQL driver.

    ``None`` is passed through unchanged. An aware value is converted to
    naive UTC when ``settings.USE_TZ`` is enabled; otherwise a ``ValueError``
    is raised. Microseconds are zeroed when the connection's features report
    no microsecond precision.
    """
    if value is None:
        return None

    # MySQL doesn't support tz-aware times
    if timezone.is_aware(value):
        if not settings.USE_TZ:
            raise ValueError(
                "MySQL backend does not support timezone-aware times."
            )
        value = value.astimezone(timezone.utc).replace(tzinfo=None)

    connection = self.connection
    if not connection.features.supports_microsecond_precision:
        value = value.replace(microsecond=0)

    # The pure-Python connection converts through its own converter object;
    # otherwise the module-level datetime_to_mysql helper is used.
    if connection.use_pure:
        return connection.converter.to_mysql(value)
    return datetime_to_mysql(value)
def _occurrences_after_generator(self, after=None, tzinfo=pytz.utc):
    """
    Returns a generator that produces unpersisted occurrences ending after
    the datetime ``after`` (defaults to the current time).

    Without a recurrence rule at most one occurrence is produced. With a
    rule, occurrences are produced until the rule is exhausted or a start
    falls after ``end_recurring_period``.
    """
    if after is None:
        after = timezone.now()
    rule = self.get_rrule_object()
    if rule is None:
        if self.end > after:
            yield self._create_occurrence(self.start, self.end)
        # A plain return ends the generator. Raising StopIteration inside a
        # generator is turned into RuntimeError by PEP 479 (Python 3.7+).
        return

    difference = self.end - self.start
    # Iterating with ``for`` also ends cleanly when the rule runs out; a bare
    # next() would leak StopIteration out of the generator body.
    for o_start in rule:
        if self.end_recurring_period and o_start > self.end_recurring_period:
            return
        o_end = o_start + difference
        if o_end > after:
            yield self._create_occurrence(o_start, o_end)
def _format_parameters( self, parameters ):
parameters = list( parameters )
for index in range( len( parameters ) ):
# With raw SQL queries, datetimes can reach this function
# without being converted by DateTimeField.get_db_prep_value.
if settings.USE_TZ and isinstance( parameters[index], datetime.datetime ):
param = parameters[index]
if timezone.is_naive( param ):
warnings.warn(u"Received a naive datetime (%s)"
u" while time zone support is active." % param,
RuntimeWarning)
default_timezone = timezone.get_default_timezone()
param = timezone.make_aware( param, default_timezone )
param = param.astimezone(timezone.utc).replace(tzinfo=None)
parameters[index] = param
return tuple( parameters )
# Overriding this method to convert SQL statements that contain format-style parameters to qmark style.
def value_to_db_datetime(self, value):
    """Prepare a datetime value for the database.

    ``None`` passes through. On Django 1.3 and earlier the value is returned
    unchanged and an aware datetime raises ``ValueError``. On later versions
    an aware value is converted to naive UTC when ``settings.USE_TZ`` is
    enabled (otherwise ``ValueError``), and the result is returned as text.
    """
    if value is None:
        return None

    if djangoVersion[0:2] <= (1, 3):
        # DB2 doesn't support time zone aware datetime
        if value.tzinfo is not None:
            raise ValueError("Timezone aware datetime not supported")
        return value

    if is_aware(value):
        if not settings.USE_TZ:
            raise ValueError("Timezone aware datetime not supported")
        value = value.astimezone(utc).replace(tzinfo=None)
    return unicode(value)
password_recovery.py 文件源码
项目:django-open-volunteering-platform
作者: OpenVolunteeringPlatform
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def create(self, request, *args, **kwargs):
    """Request a password recovery token for the e-mail in the request data.

    At most five tokens per hour are issued for a user on the request's
    channel. When that limit is reached, a 429 response carrying the number
    of seconds until the next slot is returned. In every other case the
    same success response is returned, whether or not the user was found.
    """
    email = request.data.get('email', None)
    try:
        user = self.get_queryset().get(email__iexact=email)
    except Exception:
        # Best effort: any lookup failure is treated as "no such user".
        # Unlike a bare ``except:`` this no longer traps SystemExit or
        # KeyboardInterrupt.
        user = None

    if user:
        # Allow only 5 requests per hour
        limit = 5
        now = timezone.now()
        to_check = (now - relativedelta(hours=1)).replace(tzinfo=timezone.utc)
        tokens = models.PasswordRecoveryToken.objects.filter(user=user, created_date__gte=to_check, channel__slug=request.channel)
        if tokens.count() >= limit:
            # The limit-th most recent token frees a slot one hour after it
            # was created.
            will_release = tokens.order_by('-created_date')[limit-1].created_date + relativedelta(hours=1)
            # total_seconds() keeps the sign; ``.seconds`` alone wraps a
            # negative delta around to almost a full day.
            seconds = max(0, int((will_release - now).total_seconds()))
            return response.Response({'success': False, 'message': 'Five tokens generated last hour.', 'try_again_in': seconds}, status=status.HTTP_429_TOO_MANY_REQUESTS)
        models.PasswordRecoveryToken.objects.create(user=user, object_channel=request.channel)

    return response.Response({'success': True, 'message': 'Token requested successfully(if user exists).'})
def jwt_blacklist_set_handler(payload):
    """
    Default implementation that blacklists a jwt token.
    Should return a black listed token or None.

    The token is created from the payload's ``jti`` and ``exp`` claims.
    Any failure while building or saving it yields ``None``.
    """
    try:
        data = {
            'jti': payload.get('jti'),
            'created': now(),
            'expires': datetime.fromtimestamp(payload.get('exp'), tz=utc)
        }
        token = JWTBlacklistToken.objects.create(**data)
    except Exception:
        # ``Exception`` already covers TypeError and IntegrityError, so the
        # previously listed tuple was redundant.
        return None
    else:
        return token
def create_invoices(self, date):
    """Create the invoices still missing for the period containing ``date``.

    Monthly quotas without an invoice between the first day of ``date``'s
    month and the first day of the next month get one created and reported
    on stdout. Yearly quotas without an invoice in ``date``'s year get one
    created.

    Returns the list of invoices created for the yearly quotas only.
    """
    # monthly quotas
    start = make_aware(datetime.datetime(date.year, date.month, 1), utc)
    if date.month == 12:
        # December rolls over to January of the following year.
        year = date.year + 1
        month = 1
    else:
        year = date.year
        month = date.month + 1
    end = make_aware(datetime.datetime(year, month, 1), utc)
    for quota in self.filter(quota_type__periodicity=QuotaType.MONTHLY):
        if not quota.quotainvoice_set.filter(date__range=(start, end)).exists():
            # A single pre-formatted string prints the same text on Python 2
            # and 3; the former print statement is a syntax error on Python 3.
            print("quota necesita invoice: %s" % (quota.create_invoice(date),))

    # yearly quotas
    start = make_aware(datetime.datetime(date.year, 1, 1), utc)
    end = make_aware(datetime.datetime(date.year + 1, 1, 1), utc)
    quotainvoices = []
    for quota in self.filter(quota_type__periodicity=QuotaType.ANUAL):
        if not quota.quotainvoice_set.filter(date__range=(start, end)).exists():
            quotainvoices.append(quota.create_invoice(date))
    return quotainvoices
def needs_invoice(self, obj):
    """Tell whether ``obj`` still lacks an invoice for the current period.

    Returns ``True`` when the quota has never been invoiced, or when its last
    invoice predates the start of the current year (yearly quotas) or the
    current month (monthly quotas). Any other periodicity returns ``False``.
    """
    if obj.last_invoice_date is None:
        return True

    date = now()
    periodicity = obj.quota_type.periodicity
    if periodicity == obj.quota_type.ANUAL:
        period_start = make_aware(datetime.datetime(date.year, 1, 1), utc)
    elif periodicity == obj.quota_type.MONTHLY:
        # This branch used to repeat the ANUAL test and so never ran. It
        # also compared against the start of the *next* month, which any
        # past invoice precedes. The start of the current month mirrors the
        # yearly rule.
        period_start = make_aware(datetime.datetime(date.year, date.month, 1), utc)
    else:
        return False
    return obj.last_invoice_date < period_start
def adapt_datetime_with_timezone_support(value, conv):
    """Adapt a datetime to a MySQL literal, honouring time zone support.

    With ``settings.USE_TZ`` enabled the value is converted to naive UTC.
    A naive input triggers a ``RuntimeWarning`` and is first made aware in
    the default time zone.
    """
    # Equivalent to DateTimeField.get_db_prep_value. Used only by raw SQL.
    if not settings.USE_TZ:
        return Thing2Literal(value.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)

    if timezone.is_naive(value):
        warnings.warn("MySQL received a naive datetime (%s)"
                      " while time zone support is active." % value,
                      RuntimeWarning)
        value = timezone.make_aware(value, timezone.get_default_timezone())

    utc_naive = value.astimezone(timezone.utc).replace(tzinfo=None)
    return Thing2Literal(utc_naive.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)
# MySQLdb-1.2.1 returns TIME columns as timedelta -- they are more like
# timedelta in terms of actual behavior as they are signed and include days --
# and Django expects time, so we still need to override that. We also need to
# add special handling for SafeText and SafeBytes as MySQLdb's type
# checking is too tight to catch those (see Django ticket #6052).
# Finally, MySQLdb always returns naive datetime objects. However, when
# timezone support is active, Django expects timezone-aware datetime objects.
def test_json_stores_user_attribute(self, mock_time):
    """A submission made with digest auth records 'bob' as the submitter,
    both in the instance JSON and in the parsed instance's mongo dict.
    """
    mock_time.return_value = datetime.utcnow().replace(tzinfo=utc)
    self._publish_transportation_form()

    # make account require phone auth
    self.user.profile.require_auth = True
    self.user.profile.save()

    # submit instance with a request user
    survey = self.surveys[0]
    path = os.path.join(
        self.this_directory, 'fixtures', 'transportation', 'instances',
        survey, survey + '.xml')
    auth = DigestAuth(self.login_username, self.login_password)
    self._make_submission(path, auth=auth)

    instances = Instance.objects.filter(xform_id=self.xform).all()
    self.assertTrue(len(instances) > 0)

    for instance in instances:
        self.assertEqual(instance.json[SUBMITTED_BY], 'bob')
        # check that the parsed instance's to_dict_for_mongo also contains
        # the _user key, which is what's used by the JSON REST service
        parsed = ParsedInstance.objects.get(instance=instance)
        self.assertEqual(parsed.to_dict_for_mongo()[SUBMITTED_BY], 'bob')
sync_deleted_instances_fix.py 文件源码
项目:fieldsight-kobocat
作者: awemulya
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def handle(self, *args, **kwargs):
    """Copy ``_deleted_at`` timestamps from Mongo onto the SQL instances.

    The SQL ``deleted_at`` values are cleared first. Then every Mongo record
    with a non-null ``_deleted_at`` is matched by uuid to a downloadable-form
    instance, which is marked deleted at that time. Naive timestamps are
    made aware in UTC. Records without a matching instance are skipped.
    """
    # Reset all sql deletes to None
    Instance.objects.exclude(
        deleted_at=None, xform__downloadable=True).update(deleted_at=None)

    # Get all mongo deletes
    query = json.loads(
        '{"$and": [{"_deleted_at": {"$exists": true}}, '
        '{"_deleted_at": {"$ne": null}}]}'
    )
    for record in settings.MONGO_DB.instances.find(query):
        # update sql instance with deleted_at datetime from mongo
        try:
            instance = Instance.objects.get(
                uuid=record["_uuid"], xform__downloadable=True)
        except Instance.DoesNotExist:
            continue

        deleted_at = parse_datetime(record["_deleted_at"])
        if not timezone.is_aware(deleted_at):
            deleted_at = timezone.make_aware(deleted_at, timezone.utc)
        instance.set_deleted(deleted_at)
def do_password_reset(request, token=None):
    """Handle a password reset link.

    Renders the invalid-token page for a bad or expired signature and raises
    ``Http404`` for an unknown user. Renders the already-used page when the
    password was changed after the token was issued. Otherwise the
    set-password form is shown, or processed when submitted data is valid.
    """
    try:
        email, issued_at = _password_reset_token_factory.parse_token(token)
    except (signing.BadSignature, signing.SignatureExpired):
        return render(request, 'users/password_reset/reset_token_invalid.html', {})

    try:
        user = get_user(email)
    except User.DoesNotExist:
        raise Http404()

    profile = user.profile
    issued_at = datetime.utcfromtimestamp(issued_at).replace(tzinfo=timezone.utc)
    last_change = profile.last_password_change
    if last_change and last_change > issued_at:
        return render(request, 'users/password_reset/token_already_used.html', {})

    form = SetPasswordForm(user, request.POST or None)
    if not form.is_valid():
        context = {
            'user': user,
            'form': form,
        }
        return render(request, 'users/password_reset/reset_form.html', context)

    form.save()
    profile.last_password_change = timezone.now()
    profile.save()
    return render(request, 'users/password_reset/reset_complete.html', {})
def adapt_datetime_with_timezone_support(value, conv):
    """Adapt a datetime to a MySQL literal, honouring time zone support.

    With ``settings.USE_TZ`` enabled the value is converted to naive UTC.
    A naive input triggers a ``RuntimeWarning`` and is first made aware in
    the default time zone.
    """
    # Equivalent to DateTimeField.get_db_prep_value. Used only by raw SQL.
    if not settings.USE_TZ:
        return Thing2Literal(value.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)

    if timezone.is_naive(value):
        warnings.warn("MySQL received a naive datetime (%s)"
                      " while time zone support is active." % value,
                      RuntimeWarning)
        value = timezone.make_aware(value, timezone.get_default_timezone())

    utc_naive = value.astimezone(timezone.utc).replace(tzinfo=None)
    return Thing2Literal(utc_naive.strftime("%Y-%m-%d %H:%M:%S.%f"), conv)
# MySQLdb-1.2.1 returns TIME columns as timedelta -- they are more like
# timedelta in terms of actual behavior as they are signed and include days --
# and Django expects time, so we still need to override that. We also need to
# add special handling for SafeText and SafeBytes as MySQLdb's type
# checking is too tight to catch those (see Django ticket #6052).
# Finally, MySQLdb always returns naive datetime objects. However, when
# timezone support is active, Django expects timezone-aware datetime objects.
def authenticate(self, request, **kwargs):
    """
    Validates a lost_key request by hash, id, and expiration time.

    The hash is read from the query parameters, falling back to the request
    data. The key id is taken from the view's ``pk`` keyword argument.

    Returns a ``(created_by, hash)`` tuple for an unused, unexpired key, or
    ``None`` when the lookup fails.
    """
    user_hash = request.QUERY_PARAMS.get('hash', None)
    if not user_hash:
        user_hash = request.DATA.get('hash')
    lost_key_id = request.parser_context.get('view').kwargs.get('pk')
    try:
        lost_key = LostKey.objects.get(hash=user_hash,
                                       pk=lost_key_id,
                                       expires_at__gte=datetime.utcnow().
                                       replace(tzinfo=utc),
                                       used=False)
        return lost_key.created_by, lost_key.hash
    except Exception:
        # Any lookup failure means "not authenticated". Unlike a bare
        # ``except:`` this no longer traps SystemExit or KeyboardInterrupt.
        return None