def test_stop_entry(self):
    """Stopping the running entry returns it with a stop time and duration.

    Saves a running ``TimeEntry`` for the test user, POSTs to the
    ``api:time-entry-stop`` endpoint and checks the description, start,
    stop and duration fields of the response.
    """
    DESCRIPTION = 'EXAMPLE'
    startTime = timezone.now()
    currentEntry = TimeEntry(user=self.TestUser, description=DESCRIPTION, start=startTime)
    currentEntry.save()

    url = reverse("api:time-entry-stop")
    data = {}
    response = self.client.post(url, data)

    # Assert the status before touching the payload, so a failed request is
    # reported as a status mismatch rather than as an error raised while
    # parsing the response body.
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    response_start_time = dateparse.parse_datetime(response.data['start'])
    response_stop_time = dateparse.parse_datetime(response.data['stop'])

    self.assertEqual(response.data['description'], DESCRIPTION)
    self.assertEqual(response_start_time, startTime)
    self.assertGreater(response_stop_time, response_start_time)
    # The reported duration must be the stop/start difference in seconds.
    self.assertEqual(
        response.data['duration'],
        (response_stop_time - response_start_time).total_seconds(),
    )
python类parse_datetime()的实例源码
def update_segment(lsegment, segments):
    """Return the ``Segment`` row for ``lsegment``, creating it when missing.

    The raw segment record is looked up with ``find(lsegment, segments)`` and
    its fields are used as the lookup for ``Segment.objects.get_or_create``.
    Departure and arrival strings are parsed and localized with ``UTC``.
    """
    raw = find(lsegment, segments)

    # Build the lookup one field at a time, in the same order the related
    # objects were originally fetched.
    lookup = {}
    lookup['departure_place'] = Place.objects.get(pk=raw['OriginStation'])
    lookup['arrival_place'] = Place.objects.get(pk=raw['DestinationStation'])
    lookup['departure'] = UTC.localize(
        parse_datetime(raw['DepartureDateTime']), is_dst=True)
    lookup['arrival'] = UTC.localize(
        parse_datetime(raw['ArrivalDateTime']), is_dst=True)
    lookup['duration'] = raw['Duration']
    lookup['directionality'] = raw['Directionality']
    lookup['journey_mode'] = JourneyMode.objects.get_or_create(
        name=raw['JourneyMode'])[0]
    lookup['flight_number'] = raw['FlightNumber']
    lookup['carrier'] = Carrier.objects.get(pk=raw['Carrier'])
    lookup['operating_carrier'] = Carrier.objects.get(pk=raw['OperatingCarrier'])

    segment, _created = Segment.objects.get_or_create(**lookup)
    return segment
sync_deleted_instances_fix.py 文件源码
项目:fieldsight-kobocat
作者: awemulya
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def handle(self, *args, **kwargs):
    """Rebuild SQL ``deleted_at`` values from the Mongo instance records.

    First clears ``deleted_at`` on the SQL rows selected by the ``exclude``
    below, then walks every Mongo record carrying a non-null ``_deleted_at``
    and copies that timestamp onto the matching downloadable SQL instance.
    """
    # Reset all sql deletes to None
    stale = Instance.objects.exclude(
        deleted_at=None, xform__downloadable=True)
    stale.update(deleted_at=None)

    # Get all mongo deletes
    mongo_query = {
        "$and": [
            {"_deleted_at": {"$exists": True}},
            {"_deleted_at": {"$ne": None}},
        ]
    }
    mongo_instances = settings.MONGO_DB.instances

    for record in mongo_instances.find(mongo_query):
        # update sql instance with deleted_at datetime from mongo
        try:
            instance = Instance.objects.get(
                uuid=record["_uuid"], xform__downloadable=True)
        except Instance.DoesNotExist:
            # No downloadable SQL counterpart for this record; skip it.
            continue

        deleted_at = parse_datetime(record["_deleted_at"])
        # Naive timestamps are interpreted as UTC.
        if not timezone.is_aware(deleted_at):
            deleted_at = timezone.make_aware(deleted_at, timezone.utc)
        instance.set_deleted(deleted_at)
def dispatch(self, request, *args, **kwargs):
    '''
    Proceed only when the session carries valid, unexpired registration data;
    otherwise redirect back to the 'registration' URL.

    Three checks are made in order: the session key exists, the referenced
    TemporaryRegistration can be loaded, and the stored expiry timestamp
    parses and is not in the past.
    '''
    def back_to_start():
        return HttpResponseRedirect(reverse('registration'))

    if REG_VALIDATION_STR not in request.session:
        return back_to_start()

    session_data = self.request.session[REG_VALIDATION_STR]

    try:
        self.temporaryRegistration = TemporaryRegistration.objects.get(
            id=session_data.get('temporaryRegistrationId')
        )
    except ObjectDoesNotExist:
        messages.error(request,_('Invalid registration identifier passed to sign-up form.'))
        return back_to_start()

    expiry = parse_datetime(session_data.get('temporaryRegistrationExpiry',''))

    # A missing or unparseable expiry is treated the same as an expired one.
    if not expiry or expiry < timezone.now():
        messages.info(request,_('Your registration session has expired. Please try again.'))
        return back_to_start()

    return super(StudentInfoView,self).dispatch(request,*args,**kwargs)
def dispatch(self,request,*args,**kwargs):
    '''
    Load the private-lesson data stored in the session by the prior view.

    Redirects to 'bookPrivateLesson' when the lesson cannot be loaded or when
    the stored expiry is missing, unparseable or already past. Otherwise sets
    self.lesson and self.payAtDoor and continues with normal dispatch.
    '''
    def back_to_booking():
        return HttpResponseRedirect(reverse('bookPrivateLesson'))

    lessonSession = request.session.get(PRIVATELESSON_VALIDATION_STR,{})

    try:
        lesson_id = lessonSession.get('lesson')
        self.lesson = PrivateLessonEvent.objects.get(id=lesson_id)
    except (ValueError, ObjectDoesNotExist):
        messages.error(request,_('Invalid lesson identifier passed to sign-up form.'))
        return back_to_booking()

    expiry = parse_datetime(lessonSession.get('expiry',''))
    session_is_live = bool(expiry) and not (expiry < timezone.now())
    if not session_is_live:
        messages.info(request,_('Your registration session has expired. Please try again.'))
        return back_to_booking()

    self.payAtDoor = lessonSession.get('payAtDoor',False)
    return super(PrivateLessonStudentInfoView,self).dispatch(request,*args,**kwargs)
def deserialize_instance(model, data):
    """Build an unsaved ``model`` instance from a dict of serialized values.

    ``None`` values are skipped. Values whose key names a date/time or binary
    model field are converted back from their serialized form; every other
    value, including keys that are not model fields, is assigned unchanged.
    """
    def decode_binary(raw):
        return force_bytes(base64.b64decode(force_bytes(raw)))

    instance = model()
    for name, value in data.items():
        if value is None:
            continue
        try:
            field = model._meta.get_field(name)
            # DateTimeField is tested before DateField on purpose: the
            # checks run in this order and the first match wins.
            if isinstance(field, DateTimeField):
                value = dateparse.parse_datetime(value)
            elif isinstance(field, TimeField):
                value = dateparse.parse_time(value)
            elif isinstance(field, DateField):
                value = dateparse.parse_date(value)
            elif isinstance(field, BinaryField):
                value = decode_binary(value)
        except FieldDoesNotExist:
            # Not a model field: keep the raw value.
            pass
        setattr(instance, name, value)
    return instance
def __init__(self, manifest, manifest_url):
    """Populate this dict from an image ``manifest`` and its source URL.

    ``uuid`` and ``name`` are required manifest keys; the remaining entries
    fall back to defaults when absent. ``download_size`` is the size of the
    first entry in ``manifest['files']``, or 0 when that is unavailable.
    """
    dict.__init__(self)
    lookup = manifest.get

    self['manifest'] = manifest
    self['manifest_url'] = manifest_url
    self['uuid'] = manifest['uuid']
    self['name'] = manifest['name']
    self['version'] = lookup('version', '')
    # An empty or unparseable timestamp is stored as ''.
    published = parse_datetime(lookup('published_at', ''))
    self['created'] = published or ''
    self['ostype'] = Image.os_to_ostype(manifest)
    self['desc'] = lookup('description', '')
    self['homepage'] = lookup('homepage')
    self['size'] = lookup('image_size', Image.DEFAULT_SIZE)
    self['state'] = lookup('state', '')

    try:
        self['download_size'] = manifest['files'][0]['size']
    except (KeyError, IndexError):
        self['download_size'] = 0
def date_condition(date_or_str, **kwargs):
    """ Has the given date been reached, i.e. is the current time at or past it?

    date_or_str is either a date/datetime object or an ISO 8601 string.
    Strings are parsed with dateparse.parse_datetime; a non-string argument
    is compared as-is. If the comparison with the current time raises
    TypeError the result is False. Extra keyword arguments are ignored. """
    try:
        target = dateparse.parse_datetime(date_or_str)
    except TypeError:
        # Not a string: compare the object that was passed in.
        target = date_or_str

    current = timezone.now()
    try:
        return current >= target
    except TypeError:
        return False
def datetime_diff_seconds(older_time, newer_time=None):
    """
    Compute the number of seconds from older_time to newer_time.

    Both arguments are ISO-formatted datetime strings. When newer_time is
    not given (or is empty), the string returned by
    localized_datetime_string_now() is used instead. Parsed values without
    timezone information are treated as UTC.
    """
    start = parse_datetime(older_time)
    if not start.tzinfo:
        # if no timezone set (naive datetime) we'll assume it to be UTC.
        start = pytz.UTC.localize(start)

    if not newer_time:
        newer_time = localized_datetime_string_now()
    end = parse_datetime(newer_time)
    if not end.tzinfo:
        end = pytz.UTC.localize(end)

    elapsed = end - start
    return elapsed.total_seconds()
def test_earliest_start_date(self):
    """The earliest course-run start in the program is the one returned."""
    course_runs = [
        {"start": "2016-01-01T00:00:00Z"},
        {"start": "2017-01-01T00:00:00Z"},
    ]
    program = {"courses": [{"course_runs": course_runs}]}

    earliest = get_earliest_start_date_from_program(program)

    assert earliest == parse_datetime('2016-01-01 00:00:00+0000')
def comp_sync():
    """Sync stored games with the fixtures reported by the Fox Sports API.

    For every round from ``get_current_round()`` up to ``ROUNDS`` the round's
    fixtures are fetched, and each matching stored ``Game`` gets its start
    time and stadium updated when the API reports a different value. Every
    game is saved after being checked. Rounds are paced so that consecutive
    fetches start at least five seconds apart.

    Raises:
        Game.DoesNotExist: if the API returns a fixture that is not stored.
        requests.exceptions.Timeout: if the API does not answer in time.
    """
    # NOTE(review): the API user key is embedded in this URL; consider moving
    # it to settings so it is not committed with the source.
    url_template = (
        "http://api.stats.foxsports.com.au/3.0/api/sports/league/series/1/seasons/115/rounds/"
        "%s"
        "/fixturesandresultswithbyes.json?userkey=A00239D3-45F6-4A0A-810C-54A347F144C2"
    )
    min_interval = 5  # seconds between the start of consecutive fetches
    request_timeout = 30  # seconds; without it a stalled request blocks forever

    # `round_number` rather than `round`, which would shadow the builtin.
    for round_number in range(get_current_round(), ROUNDS + 1):
        logger.info("Fetching round %s/%s...", round_number, ROUNDS)
        start = datetime.now()
        r = requests.get(url_template % round_number, timeout=request_timeout)
        logger.info("%s", r.text)

        for game in json.loads(r.text):
            logger.info("%s", game["fixture_id"])
            stored_game = Game.objects.get(fixture_id=game["fixture_id"])
            logger.info("Syncing game %s vs. %s", str(stored_game.home_team), str(stored_game.away_team))

            # Parse once and reuse for both the comparison and the update.
            match_start = parse_datetime(game["match_start_date"])
            if stored_game.start_time != match_start:
                logger.info("Start time has changed... updating")
                stored_game.start_time = match_start

            venue_name = game["venue"]["name"]
            if stored_game.stadium != venue_name:
                logger.info("Venue has changed... updating")
                stored_game.stadium = venue_name

            stored_game.save()

        # Throttle: wait out the remainder of the minimum interval.
        elapsed = (datetime.now() - start).total_seconds()
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)
payments.py 文件源码
项目:money-to-prisoners-send-money
作者: ministryofjustice
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def get_govuk_capture_time(self, govuk_payment):
    """Return a UTC capture datetime derived from the settlement summary.

    Uses ``capture_submit_time`` (falling back to the current time when it is
    missing or unparseable) converted to UTC, but keeps the result on the
    reported ``captured_date``: if the submit time falls on an earlier day the
    start of ``captured_date`` is returned, if on a later day the end of it.

    Raises:
        GovUkPaymentStatusException: when ``captured_date`` is not available,
            or a ``KeyError``/``TypeError`` occurs while reading the summary.
    """
    try:
        summary = govuk_payment['settlement_summary']
        submitted_at = parse_datetime(summary.get('capture_submit_time', ''))
        captured_on = parse_date(summary.get('captured_date', ''))

        if captured_on is not None:
            submitted_at = (submitted_at or timezone.now()).astimezone(timezone.utc)
            submitted_on = submitted_at.date()

            if submitted_on == captured_on:
                return submitted_at

            # Clamp to the nearest edge of the captured day.
            edge = time.min if submitted_on < captured_on else time.max
            return datetime.combine(captured_on, edge).replace(tzinfo=timezone.utc)
    except (KeyError, TypeError):
        pass

    raise GovUkPaymentStatusException(
        'Capture date not yet available for payment %s' % govuk_payment['reference']
    )
def handle(self, *args, **kwargs):
    """Copy Mongo ``_deleted_at`` timestamps onto the SQL instances.

    ``deleted_at`` is first cleared on the SQL rows selected by the
    ``exclude`` below. Each Mongo record with a non-null ``_deleted_at`` is
    then matched by uuid to a downloadable SQL instance, which is marked
    deleted at that time; records with no SQL match are skipped.
    """
    # Reset all sql deletes to None
    Instance.objects.exclude(
        deleted_at=None, xform__downloadable=True
    ).update(deleted_at=None)

    # Get all mongo deletes
    has_deleted_at = {"_deleted_at": {"$exists": True}}
    deleted_at_not_null = {"_deleted_at": {"$ne": None}}
    deleted_records = settings.MONGO_DB.instances.find(
        {"$and": [has_deleted_at, deleted_at_not_null]})

    for record in deleted_records:
        # update sql instance with deleted_at datetime from mongo
        try:
            sql_instance = Instance.objects.get(
                uuid=record["_uuid"], xform__downloadable=True)
        except Instance.DoesNotExist:
            continue

        when = parse_datetime(record["_deleted_at"])
        if timezone.is_aware(when):
            sql_instance.set_deleted(when)
        else:
            # Naive timestamps are interpreted as UTC.
            sql_instance.set_deleted(timezone.make_aware(when, timezone.utc))
def deserialize_instance(model, data):
    """Recreate a ``model`` instance from its serialized ``data`` mapping.

    Entries whose value is ``None`` are not set. For keys that name a model
    field, datetime, time, date and binary values are converted from their
    serialized representation; all other values are set as given.
    """
    # Checked in order; DateTimeField must come before DateField so that the
    # more specific field type is matched first.
    converters = (
        (DateTimeField, lambda raw: dateparse.parse_datetime(raw)),
        (TimeField, lambda raw: dateparse.parse_time(raw)),
        (DateField, lambda raw: dateparse.parse_date(raw)),
        (BinaryField, lambda raw: force_bytes(base64.b64decode(force_bytes(raw)))),
    )

    obj = model()
    for key, raw_value in data.items():
        if raw_value is not None:
            value = raw_value
            try:
                field = model._meta.get_field(key)
                for field_type, convert in converters:
                    if isinstance(field, field_type):
                        value = convert(raw_value)
                        break
            except FieldDoesNotExist:
                # Key is not a model field: assign the raw value.
                pass
            setattr(obj, key, value)
    return obj
def test_has_current_entry(self):
    '''The current-entry endpoint returns the running entry.'''
    started_at = timezone.now()
    TimeEntry(user=self.TestUser, description="Example", start=started_at).save()

    response = self.client.get(reverse("api:time-entry-current"), {})

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    payload = response.data
    self.assertEqual(payload['description'], "Example")
    self.assertEqual(dateparse.parse_datetime(payload['start']), started_at)
    # A running entry has no stop time but already a positive duration.
    self.assertEqual(payload['stop'], None)
    self.assertGreater(payload['duration'], 0)
def test_starts_entry_if_none_exists(self):
    """Starting with no running entry creates a new running entry."""
    DESCRIPTION = 'description'
    before_request = timezone.now()

    response = self.client.post(
        reverse("api:time-entry-start"),
        {'description': DESCRIPTION},
    )

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    payload = response.data
    self.assertEqual(payload['description'], DESCRIPTION)
    # The entry must have started after this test captured its timestamp.
    self.assertGreater(dateparse.parse_datetime(payload['start']), before_request)
    self.assertEqual(payload['stop'], None)
    self.assertGreater(payload['duration'], 0)
def test_start_entry_with_current_running(self):
    """Starting a new entry while one is running stops the running one.

    Saves a running entry, starts a second one through the API, then fetches
    the first entry and checks that it now has a stop time and a duration
    that stays the same across two reads.
    """
    DESCRIPTION = 'EXAMPLE'
    startTime = timezone.now()
    currentEntry = TimeEntry(user=self.TestUser, description=DESCRIPTION, start=startTime)
    currentEntry.save()

    # Start a second entry while the first is still running.
    NEWDESCRIPTION = "NEW DESCRIPTION"
    url = reverse("api:time-entry-start")
    data = {'description': NEWDESCRIPTION }
    response = self.client.post(url, data)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(response.data['description'], NEWDESCRIPTION)
    self.assertEqual(response.data['stop'], None)
    self.assertGreater(dateparse.parse_datetime(response.data['start']), startTime)
    self.assertGreater(response.data['duration'], 0)

    # Fetch the original entry.
    url = reverse("api:time-entry-get", args=(currentEntry.id,))
    response = self.client.get(url, data)

    # Assert the status before parsing the payload, so a failed request is
    # reported as a status mismatch rather than as a parsing error.
    self.assertEqual(response.status_code, status.HTTP_200_OK)

    response_start_time = dateparse.parse_datetime(response.data['start'])
    response_stop_time = dateparse.parse_datetime(response.data['stop'])
    response_duration = response.data['duration']
    self.assertEqual(response.data['description'], DESCRIPTION)
    self.assertEqual(response.data['user'], self.TestUser.id)
    self.assertEqual(response_start_time, startTime)
    self.assertGreater(response_stop_time, response_start_time)
    self.assertEqual(response_duration,(response_stop_time- response_start_time).total_seconds())

    # A second read must report the same duration as the first.
    response = self.client.get(url, data)
    self.assertEqual(response_duration, response.data['duration'], "We checked the duration twice but it changed from %s to %s" % (response_duration, response.data['duration']))
def convert_datetimefield_value(self, value, expression, connection, context):
    """Convert a raw database value into a datetime.

    ``None`` is returned unchanged. Values that are not already
    ``datetime.datetime`` are parsed with ``parse_datetime``; when
    ``settings.USE_TZ`` is set the result is made aware in
    ``self.connection.timezone``.
    """
    if value is None:
        return value

    if not isinstance(value, datetime.datetime):
        value = parse_datetime(value)

    if not settings.USE_TZ:
        return value
    return timezone.make_aware(value, self.connection.timezone)
def parse_datetime(s):
    """Parse ``s`` into a datetime, using ``ciso8601`` when it is available.

    Falsy input (``None``, ``''``, ...) is returned unchanged. Otherwise the
    value is parsed by ``ciso8601.parse_datetime`` if ``ciso8601`` is truthy,
    else by ``dateparse.parse_datetime``.
    """
    if s:
        parser = ciso8601 if ciso8601 else dateparse
        return parser.parse_datetime(s)
    return s
def _date_from_string(self, field_name, val):
    """Set ``field_name`` on this object to ``val`` parsed as a datetime.

    If parsing or assignment raises, the raw ``val`` is stored instead and a
    warning is logged.
    """
    try:
        parsed = dateparse.parse_datetime(val)
        setattr(self, field_name, parsed)
    except Exception as error:
        # Best effort: keep the raw value rather than failing.
        setattr(self, field_name, val)
        logger.warning('can not parse date (raw value used) %s: %s', field_name, error)
def get_legs(result):
    """Store every leg in ``result['Legs']`` together with its relations.

    Each leg is fetched or created as a ``Leg`` row, then its carriers,
    operating carriers, stops (ignoring the value 0) and segments are
    attached. Segments are resolved through ``update_segment`` against
    ``result['Segments']``. Returns nothing.
    """
    raw_legs = result['Legs']
    raw_segments = result['Segments']

    for raw_leg in raw_legs:
        # Assemble the lookup field by field, in the same order the related
        # objects were originally fetched.
        lookup = {}
        lookup['id'] = raw_leg['Id']
        lookup['departure_place'] = Place.objects.get(pk=raw_leg['OriginStation'])
        lookup['arrival_place'] = Place.objects.get(pk=raw_leg['DestinationStation'])
        lookup['departure'] = UTC.localize(parse_datetime(raw_leg['Departure']), is_dst=True)
        lookup['arrival'] = UTC.localize(parse_datetime(raw_leg['Arrival']), is_dst=True)
        lookup['duration'] = raw_leg['Duration']
        lookup['directionality'] = raw_leg['Directionality']
        lookup['journey_mode'] = JourneyMode.objects.get_or_create(
            name=raw_leg['JourneyMode'])[0]
        leg_obj, _created = Leg.objects.get_or_create(**lookup)

        # Read all related id lists up front, before attaching anything.
        carrier_ids = raw_leg['Carriers']
        operating_carrier_ids = raw_leg['OperatingCarriers']
        stop_ids = raw_leg['Stops']
        segment_ids = raw_leg['SegmentIds']

        for carrier_id in carrier_ids:
            leg_obj.carriers.add(Carrier.objects.get(pk=carrier_id))
        for carrier_id in operating_carrier_ids:
            leg_obj.operating_carriers.add(Carrier.objects.get(pk=carrier_id))
        for stop_id in stop_ids:
            if stop_id == 0:
                continue
            leg_obj.stops.add(Place.objects.get(pk=stop_id))
        for segment_id in segment_ids:
            leg_obj.segments.add(update_segment(segment_id, raw_segments))

        leg_obj.save()
def test_clean_dates(self):
    """
    Cleaning models dates follow different type of rules, also depending on event
    """
    event = Event.objects.get(title='PyCon SK 2016')  # PyCon SK 2016 is in the past
    tt = TicketType(title='Test Ticket', price=12, event=event)

    # Past event and no dates raises error
    self.assertRaises(ValidationError, tt.clean)

    # Past event and dates after it raises error
    tt.date_from = now
    tt.date_to = now + 3 * day
    self.assertRaises(ValidationError, tt.clean)
    tt.date_from = parse_datetime('2016-01-11T09:00:00Z')
    self.assertRaises(ValidationError, tt.clean)

    # End date can not be before start date
    tt.date_to = tt.date_from - 3 * day
    self.assertRaises(ValidationError, tt.clean)

    # Correct data in the past before event, SAVE.
    tt.date_to = tt.date_from + 9 * day
    tt.clean()
    tt.save()

    # PyCon SK 2054 is in the future
    future_event = Event.objects.create(
        title='PyCon SK 2054', description='test', event_type=Event.MEETUP, status=Event.PUBLISHED,
        location=event.location, cfp_end=distant_future - 7 * day,
        date_from=distant_future, date_to=distant_future + 7 * day)
    ftt = TicketType(title='Test Future Ticket', price=120, event=future_event)

    # Future event pre-populate the dates
    ftt.clean()
    # assertEqual replaces the deprecated assertEquals alias, which no longer
    # exists in Python 3.12. Dates are compared at whole-second resolution.
    self.assertEqual(abs(ftt.date_from - timezone.now()).seconds, 0)
    self.assertEqual(abs(ftt.date_to - ftt.event.date_to).seconds, 0)
    ftt.save()
def convert_time_from_deployd(d_time):
    """Format a deployd timestamp string as 'YYYY-mm-dd HH:MM:SS' in Asia/Shanghai.

    The parsed value's tzinfo is replaced with UTC before converting to the
    Asia/Shanghai zone. If formatting fails, the error is logged and the
    result of ``orc_convert_time_from_deployd(d_time)`` is returned instead.
    """
    parsed = parse_datetime(d_time)
    shanghai = timezone('Asia/Shanghai')
    utc_zone = timezone('UTC')
    local_time = parsed.replace(tzinfo=utc_zone).astimezone(shanghai)

    try:
        formatted = local_time.strftime('%Y-%m-%d %H:%M:%S')
    except Exception as e:
        logger.error("strftime error:%s d_time:%s", str(e), d_time)
        return orc_convert_time_from_deployd(d_time)
    return formatted
def dateStringsToQ(self, field_name, date_from_str, date_to_str):
    """
    Build a Django Q object restricting field_name to an inclusive date range.

    date_from_str and date_to_str are date strings; when exactly one of them
    is the empty string, the other one is used for both ends of the range.
    The range runs from 00:00:00 on the start date to 23:59:59 on the end
    date, made timezone-aware in Django's default timezone.

    Returns Q(<field_name>__gte=<date from>, <field_name>__lte=<date to>)
    """
    # one of the values required for the filter is missing, so set
    # it to the one which was supplied
    if date_from_str == '':
        date_from_str = date_to_str
    elif date_to_str == '':
        date_to_str = date_from_str

    range_start = dateparse.parse_datetime(date_from_str + ' 00:00:00')
    range_end = dateparse.parse_datetime(date_to_str + ' 23:59:59')

    default_tz = timezone.get_default_timezone()
    criteria = {
        field_name + '__gte': timezone.make_aware(range_start, default_tz),
        field_name + '__lte': timezone.make_aware(range_end, default_tz),
    }
    return Q(**criteria)
def to_python_datetime(value):
    """Parse ``value`` with ``parse_datetime`` and return the result.

    Raises:
        ValueError: if ``value`` is truthy but the parsed result is falsy.
    """
    parsed = parse_datetime(value)
    if parsed or not value:
        return parsed
    raise ValueError('Can\'t convert "{}" to datetime'.format(value))
def _parse_datetime(self, strdate):
    """Parse ``strdate`` and return it as a timezone-aware datetime.

    A value parsed without tzinfo is localized to the timezone named by
    ``self.event.timezone``; a value that already carries tzinfo is returned
    as parsed. Asserts that parsing produced a result.
    """
    event_tz = pytz.timezone(self.event.timezone)
    parsed = parse_datetime(strdate)
    assert parsed
    return event_tz.localize(parsed) if parsed.tzinfo is None else parsed
def parse_datetime_with_timezone_support(value):
    """Parse ``value``; under ``USE_TZ`` tag a naive result as UTC.

    Returns whatever ``parse_datetime`` returned (possibly ``None``), except
    that a naive datetime gets ``tzinfo=timezone.utc`` when
    ``settings.USE_TZ`` is enabled.
    """
    parsed = parse_datetime(value)
    # Only a naive datetime may have its tzinfo overwritten.
    if parsed is None or not settings.USE_TZ or not timezone.is_naive(parsed):
        return parsed
    return parsed.replace(tzinfo=timezone.utc)
def insert(self, samples):
    """Insert stat samples, logging failures instead of raising them.

    ``samples`` is an iterable of ``(id, datetime-string, value)`` tuples; the
    datetime string is parsed before the tuples are handed to
    ``Stats.insert``. A truthy return value from ``Stats.insert`` is logged
    as outdated samples.

    Database integrity errors are logged and the transaction rolled back;
    any other ``Exception`` is logged with its traceback. Interpreter-exit
    signals (``KeyboardInterrupt``, ``SystemExit``) are no longer swallowed.
    """
    try:
        # `sample_id` rather than `id`, which would shadow the builtin.
        outdated = Stats.insert(
            (sample_id, dateparse.parse_datetime(dt), value)
            for sample_id, dt, value in samples)
    except db.IntegrityError:
        log.error("Duplicate stats insert: " + db.connection.queries[-1]['sql'])
        db.transaction.rollback()  # allow future stats to still work
    except Exception:
        log.error("Error handling stats insert: " + traceback.format_exc())
    else:
        if outdated:
            # Logger.warn is a deprecated alias of Logger.warning.
            log.warning("Outdated samples ignored: {0}".format(outdated))
def setUp(self):
    """Log in a test user and record the fixture directory and submission time."""
    self._create_user_and_login()
    fixture_path_parts = (self.this_directory, 'fixtures', 'csv_export')
    self.fixture_dir = os.path.join(*fixture_path_parts)
    self._submission_time = parse_datetime('2013-02-18 15:54:01Z')
def setUp(self):
    """Run the parent setUp, then record the fixed submission time."""
    super(TestExports, self).setUp()
    fixed_submission_time = parse_datetime('2013-02-18 15:54:01Z')
    self._submission_time = fixed_submission_time