def late(date):
    """Return a datetime for 23:59:59 on the calendar day of ``date``.

    Only ``date.year``, ``date.month`` and ``date.day`` are read. No
    ``tzinfo`` is passed to the constructor.
    """
    end_of_day = (23, 59, 59)
    return timezone.datetime(date.year, date.month, date.day, *end_of_day)
# Example source code for Python's datetime() class
def first_of_month(date):
    """Return a datetime for 23:59:59 on day 1 of the month of ``date``.

    Only ``date.year`` and ``date.month`` are read. No ``tzinfo`` is passed
    to the constructor.
    """
    return timezone.datetime(
        year=date.year, month=date.month, day=1,
        hour=23, minute=59, second=59
    )
def daily(request):
    """Render the daily sales report page.

    The template context contains:

    * ``latest_sales`` -- the 7 most recent sales by timestamp.
    * ``top_today`` -- the 7 products with the most sales since the start
      of the current day (``current_date``).
    * ``revenue_day`` -- summed sale price over the last 24 hours
      (``0.0`` when the sum is empty or zero).
    * ``revenue_month`` -- summed sale price over the last 30 days
      (``0.0`` when the sum is empty or zero).
    * ``top_month_category`` -- the 7 categories with the most sales over
      the last 30 days.

    ``request``, ``current_date``, ``startTime_day`` and ``startTime_month``
    are passed along as well, so the template sees exactly the names it
    received when the context was built from ``locals()``.
    """
    # Start of the current day. Microseconds must be cleared as well,
    # otherwise the cutoff lands a fraction of a second after midnight and
    # sales stamped inside that fraction are dropped from "today".
    current_date = timezone.now().replace(
        hour=0, minute=0, second=0, microsecond=0)

    latest_sales = (Sale.objects
                    .prefetch_related('product', 'member')
                    .order_by('-timestamp')[:7])

    top_today = (Product.objects
                 .filter(sale__timestamp__gt=current_date)
                 .annotate(Count('sale'))
                 .order_by('-sale__count')[:7])

    startTime_day = timezone.now() - datetime.timedelta(hours=24)
    # aggregate() yields None for an empty set; fall back to 0.0.
    revenue_day = (Sale.objects
                   .filter(timestamp__gt=startTime_day)
                   .aggregate(Sum("price"))
                   ["price__sum"]) or 0.0

    startTime_month = timezone.now() - datetime.timedelta(days=30)
    revenue_month = (Sale.objects
                     .filter(timestamp__gt=startTime_month)
                     .aggregate(Sum("price"))
                     ["price__sum"]) or 0.0

    top_month_category = (Category.objects
                          .filter(product__sale__timestamp__gt=startTime_month)
                          .annotate(sale=Count("product__sale"))
                          .order_by("-sale")[:7])

    # Explicit context instead of locals(): same keys, but nothing leaks
    # into the template by accident when a local is added later.
    context = {
        'request': request,
        'current_date': current_date,
        'latest_sales': latest_sales,
        'top_today': top_today,
        'startTime_day': startTime_day,
        'revenue_day': revenue_day,
        'startTime_month': startTime_month,
        'revenue_month': revenue_month,
        'top_month_category': top_month_category,
    }
    return render(request, 'admin/stregsystem/report/daily.html', context)
def sales_api(request):
    """Return per-day sale counts and revenue for the last 30 days as JSON.

    The response object has three parallel lists: ``day`` (today first,
    going back 29 days), ``sales`` (number of sales on that day) and
    ``revenue`` (summed price passed through ``money``). Days without any
    sale get ``0`` in both ``sales`` and ``revenue``.
    """
    window_start = timezone.now() - datetime.timedelta(days=30)
    per_day_rows = (Sale.objects
                    .filter(timestamp__gt=window_start)
                    .annotate(day=TruncDay('timestamp'))
                    .values('day')
                    .annotate(c=Count('*'))
                    .annotate(r=Sum('price')))

    # Index the aggregated rows by calendar date for direct lookup.
    totals_by_date = {
        row["day"].date(): (row["c"], money(row["r"])) for row in per_day_rows
    }

    today = timezone.now().date()
    days = [today - datetime.timedelta(days=offset) for offset in range(0, 30)]

    # Missing days fall back to a zero count and zero revenue.
    totals = [totals_by_date.get(day, (0, 0)) for day in days]

    payload = {
        "day": days,
        "sales": [count for count, _ in totals],
        "revenue": [amount for _, amount in totals],
    }
    return JsonResponse(payload)
def test_invoice_get_rates_paidtask_rates(member):
    """Tests that `Invoice.get_rates()` returns the rates set for users in their
    `PaidTask` entries.
    """
    USER_RATE_ONE = 0.5
    USER_RATE_TWO = 0.2

    # Set some user rate
    member.hourly_rate = USER_RATE_ONE
    member.save()

    # Plain decimal literals: zero-padded ones such as `04` are a
    # SyntaxError on Python 3 (and octal on Python 2).
    month = timezone.datetime(2014, 4, 1)

    paid_task_kwargs = {
        'rate': USER_RATE_ONE,
        'datetime': month,
        'user': member,
        'task_type': PaidTaskTypes.HOURLY_WORK,
    }
    PaidTaskFactory(**paid_task_kwargs)

    invoice = Invoice(member, FAKE_CONFIG, month=month)

    # Set user rate to something else to ensure we get the recorded rates
    member.hourly_rate = USER_RATE_TWO
    member.save()

    # Only the third element of the returned triple is checked here.
    _, _, hourly_rate = invoice.get_rates()
    assert hourly_rate == USER_RATE_ONE
def test_aggregate_states_1(self):
    """Check the AVAILABLE states reported for 2016-10-01 to 2016-10-02.

    Both telescope keys must be present in the result, with one expected
    AVAILABLE entry under ``self.tk1`` and two under ``self.tk2``.
    """
    def utc(*fields):
        # UTC-aware datetime from positional date/time fields.
        return datetime(*fields, tzinfo=timezone.utc)

    def available(telescope, begins, ends):
        # Expected state dict for an "available for scheduling" window.
        return {'telescope': telescope,
                'event_type': 'AVAILABLE',
                'event_reason': 'Available for scheduling',
                'start': begins,
                'end': ends}

    states = TelescopeStates(datetime(2016, 10, 1), datetime(2016, 10, 2)).get()

    for key in (self.tk1, self.tk2):
        self.assertIn(key, states)

    expectations = (
        (self.tk1, available('tst.doma.1m0a',
                             utc(2016, 10, 1, 18, 24, 58),
                             utc(2016, 10, 1, 20, 44, 58))),
        (self.tk2, available('tst.domb.1m0a',
                             utc(2016, 10, 1, 18, 30, 0),
                             utc(2016, 10, 1, 19, 24, 59))),
        (self.tk2, available('tst.domb.1m0a',
                             utc(2016, 10, 1, 20, 24, 59),
                             utc(2016, 10, 1, 20, 44, 58))),
    )
    for key, expected_state in expectations:
        self.assertIn(expected_state, states[key])
def test_telescope_availability_limits_interval(self, mock_intervals):
    """Check per-day availability against the mocked 18:30-21:00 intervals.

    The expected value for each telescope is its available seconds on
    2016-10-01 divided by the length of the 18:30-21:00 interval; it is
    compared with the value at ``[0][1]`` of that telescope's result.
    """
    def utc(*fields):
        # UTC-aware datetime from positional date/time fields.
        return datetime(*fields, tzinfo=timezone.utc)

    def seconds_between(begin, finish):
        return (finish - begin).total_seconds()

    mock_intervals.return_value = [
        (utc(year, month, day, 18, 30, 0), utc(year, month, day, 21, 0, 0))
        for year, month, day in ((2016, 9, 30), (2016, 10, 1), (2016, 10, 2))
    ]

    availability = get_telescope_availability_per_day(
        utc(2016, 9, 30), utc(2016, 10, 2))

    for key in (self.tk1, self.tk2):
        self.assertIn(key, availability)

    interval_start = datetime(2016, 10, 1, 18, 30, 0)
    interval_end = datetime(2016, 10, 1, 21, 0, 0)
    interval_length = seconds_between(interval_start, interval_end)

    doma_seconds = seconds_between(interval_start, datetime(2016, 10, 1, 20, 44, 58))
    self.assertAlmostEqual(doma_seconds / interval_length,
                           availability[self.tk1][0][1])

    # domb has two separate available windows inside the interval.
    domb_seconds = seconds_between(interval_start, datetime(2016, 10, 1, 19, 24, 59))
    domb_seconds += seconds_between(datetime(2016, 10, 1, 20, 24, 59),
                                    datetime(2016, 10, 1, 20, 44, 58))
    self.assertAlmostEqual(domb_seconds / interval_length,
                           availability[self.tk2][0][1])
def test_telescope_availability_combine(self, mock_intervals):
    """Check that combining availabilities averages the two telescopes.

    The combined key is built from ``self.tk1``'s site, an empty
    observatory, and its telescope name minus the last character. The
    expected value is the mean of the two per-telescope fractions and is
    compared with the value at ``[0][1]`` of the combined entry.
    """
    def utc(*fields):
        # UTC-aware datetime from positional date/time fields.
        return datetime(*fields, tzinfo=timezone.utc)

    def seconds_between(begin, finish):
        return (finish - begin).total_seconds()

    mock_intervals.return_value = [
        (utc(year, month, day, 18, 30, 0), utc(year, month, day, 21, 0, 0))
        for year, month, day in ((2016, 9, 30), (2016, 10, 1), (2016, 10, 2))
    ]

    availability = get_telescope_availability_per_day(
        utc(2016, 9, 30), utc(2016, 10, 2))

    for key in (self.tk1, self.tk2):
        self.assertIn(key, availability)

    combined = combine_telescope_availabilities_by_site_and_class(availability)
    combined_key = TelescopeKey(self.tk1.site, '', self.tk1.telescope[:-1])
    self.assertIn(combined_key, combined)

    interval_start = datetime(2016, 10, 1, 18, 30, 0)
    interval_end = datetime(2016, 10, 1, 21, 0, 0)
    interval_length = seconds_between(interval_start, interval_end)

    doma_seconds = seconds_between(interval_start, datetime(2016, 10, 1, 20, 44, 58))
    doma_fraction = doma_seconds / interval_length

    # domb has two separate available windows inside the interval.
    domb_seconds = seconds_between(interval_start, datetime(2016, 10, 1, 19, 24, 59))
    domb_seconds += seconds_between(datetime(2016, 10, 1, 20, 24, 59),
                                    datetime(2016, 10, 1, 20, 44, 58))
    domb_fraction = domb_seconds / interval_length

    expected_mean = (doma_fraction + domb_fraction) / 2.0
    self.assertAlmostEqual(expected_mean, combined[combined_key][0][1])
def setUp(self):
    """Start the ConfigDB and telescope-state patchers and load fixture data.

    Three patchers are started: ``ConfigDB._get_configdb_data`` (returns an
    empty dict), ``ConfigDB.get_instrument_types_per_telescope`` (returns a
    fixed telescope -> instrument-types mapping) and
    ``TelescopeStates._get_es_data`` (returns the JSON fixture read from
    disk).

    NOTE(review): the patchers are only started here; the matching stop
    calls are presumably in a tearDown outside this chunk -- confirm.
    """
    self.configdb_null_patcher = patch('valhalla.common.configdb.ConfigDB._get_configdb_data')
    self.configdb_null_patcher.start().return_value = {}

    self.configdb_patcher = patch('valhalla.common.configdb.ConfigDB.get_instrument_types_per_telescope')
    self.mock_configdb = self.configdb_patcher.start()

    # (site, observatory, telescope, instrument types) rows; every row
    # carries its own list object.
    instrument_rows = (
        ('coj', 'clma', '2m0a', ['2M0-FLOYDS-SCICAM', '2M0-SCICAM-SPECTRAL']),
        ('coj', 'doma', '1m0a', ['1M0-SCICAM-SINISTRO']),
        ('coj', 'domb', '1m0a', ['1M0-SCICAM-SINISTRO']),
        ('cpt', 'domb', '1m0a', ['1M0-SCICAM-SINISTRO']),
        ('cpt', 'domc', '1m0a', ['1M0-SCICAM-SINISTRO']),
        ('elp', 'doma', '1m0a', ['1M0-SCICAM-SINISTRO']),
        ('lsc', 'domb', '1m0a', ['1M0-SCICAM-SINISTRO']),
        ('lsc', 'domc', '1m0a', ['1M0-SCICAM-SINISTRO']),
        ('ogg', 'clma', '0m4b', ['0M4-SCICAM-SBIG']),
        ('ogg', 'clma', '2m0a', ['2M0-FLOYDS-SCICAM']),
        ('sqa', 'doma', '0m8a', ['0M8-SCICAM-SBIG', '0M8-NRES-SCICAM']),
    )
    self.mock_configdb.return_value = {
        TelescopeKey(site=site, observatory=observatory, telescope=telescope): instruments
        for site, observatory, telescope, instruments in instrument_rows
    }

    with open('valhalla/common/test_data/es_telescope_states_data.txt', 'r') as fixture:
        self.es_output = json.load(fixture)

    self.start = datetime(2016, 10, 1, tzinfo=timezone.utc)
    self.end = datetime(2016, 10, 10, tzinfo=timezone.utc)
    self.short_end = datetime(2016, 10, 4, tzinfo=timezone.utc)

    self.es_patcher = patch('valhalla.common.telescope_states.TelescopeStates._get_es_data')
    self.mock_es = self.es_patcher.start()
    self.mock_es.return_value = self.es_output
def test_get_site_rise_set_intervals_should_not_return_an_interval(self):
    """Site 'bpl' must yield a falsy result for 2017-05-05 to 2017-05-06 (UTC)."""
    window = {
        'start': timezone.datetime(year=2017, month=5, day=5, tzinfo=timezone.utc),
        'end': timezone.datetime(year=2017, month=5, day=6, tzinfo=timezone.utc),
    }
    intervals = rise_set_utils.get_site_rise_set_intervals(site_code='bpl', **window)
    self.assertFalse(intervals)
def test_if_url_mapping_is_removed(self):
    """A URL mapping last used on 1999-12-10 is gone after the command runs."""
    last_used = timezone.datetime(1999, 12, 10, 22, 11, tzinfo=pytz.utc)
    usage_log = factories.LastUsageLogFactory(used_date=last_used)
    factories.UrlMappingFactory(last_usage=usage_log, id=199)

    management.call_command('remove_expired_redirects')

    still_present = models.UrlMapping.objects.filter(pk=199).exists()
    self.assertFalse(still_present)
def test_if_regexp_mapping_is_removed(self):
    """A regexp mapping last used on 1999-12-10 is gone after the command runs."""
    last_used = timezone.datetime(1999, 12, 10, 22, 11, tzinfo=pytz.utc)
    usage_log = factories.LastUsageLogFactory(used_date=last_used)
    factories.UrlRegexpMappingFactory(last_usage=usage_log, id=233)

    management.call_command('remove_expired_redirects')

    still_present = models.UrlRegexpMapping.objects.filter(pk=233).exists()
    self.assertFalse(still_present)
def test_if_used_mapping_is_not_removed(self):
    """A URL mapping last used on 2001-12-10 survives the command."""
    last_used = timezone.datetime(2001, 12, 10, 22, 11, tzinfo=pytz.utc)
    usage_log = factories.LastUsageLogFactory(used_date=last_used)
    factories.UrlMappingFactory(last_usage=usage_log, id=344)

    management.call_command('remove_expired_redirects')

    still_present = models.UrlMapping.objects.filter(pk=344).exists()
    self.assertTrue(still_present)
def test_if_used_regexp_mapping_is_not_removed(self):
    """A regexp mapping last used on 2001-12-10 survives the command."""
    last_used = timezone.datetime(2001, 12, 10, 22, 11, tzinfo=pytz.utc)
    usage_log = factories.LastUsageLogFactory(used_date=last_used)
    factories.UrlRegexpMappingFactory(last_usage=usage_log, id=422)

    management.call_command('remove_expired_redirects')

    still_present = models.UrlRegexpMapping.objects.filter(pk=422).exists()
    self.assertTrue(still_present)
def test_if_removing_regexp_mapping_does_not_remove_generated_mappings(self):
    """A generated mapping survives the command even when its regexp
    mapping was last used on 1999-12-10.
    """
    last_used = timezone.datetime(1999, 12, 10, 22, 11, tzinfo=pytz.utc)
    usage_log = factories.LastUsageLogFactory(used_date=last_used)
    regexp_mapping = factories.UrlRegexpMappingFactory(last_usage=usage_log, id=988)
    factories.RegexpGeneratedMappingFactory(regexp=regexp_mapping, id=1022)

    management.call_command('remove_expired_redirects')

    still_present = models.RegexpGeneratedMapping.objects.filter(pk=1022).exists()
    self.assertTrue(still_present)
def get_default_value_due():
    """Return the current time plus seven days."""
    seven_days = datetime.timedelta(days=7)
    return timezone.now() + seven_days
def get_default_value_mref():
    """Return ``"ONDON"`` followed by the seconds elapsed since 1970-01-01 UTC.

    The seconds value is the ``str()`` of a float, so it normally includes a
    fractional part.
    """
    epoch = timezone.datetime(1970, 1, 1, tzinfo=timezone.utc)
    elapsed_seconds = (timezone.now() - epoch).total_seconds()
    return "ONDON" + str(elapsed_seconds)
def test_syncsession_creation_fails_with_expired_nonce(self):
    """Sync session creation must fail once every nonce is back-dated to 2000-01-01."""
    data = self.get_initial_syncsession_data_for_request()

    back_dated = timezone.datetime(2000, 1, 1, tzinfo=timezone.get_current_timezone())
    Nonce.objects.all().update(timestamp=back_dated)

    self.assertSyncSessionCreationFails(data)
def test_get_video_processing_state_started_at_truncated_microseconds(self):
    """The video detail API must report ``started_at`` without microseconds."""
    factories.VideoFactory(public_id='videoid', title="Some title", owner=self.user)

    # 1516 microseconds on purpose: they must not show up in the response.
    started_at = datetime(
        year=2016, month=1, day=1,
        hour=12, minute=13, second=14, microsecond=1516,
        tzinfo=get_current_timezone()
    )
    processing_states = models.ProcessingState.objects.filter(video__public_id='videoid')
    processing_states.update(started_at=started_at)

    detail_url = reverse('api:v1:video-detail', kwargs={'id': 'videoid'})
    video = self.client.get(detail_url).json()

    # Check that microseconds are truncated
    self.assertEqual('2016-01-01T12:13:14Z', video['processing']['started_at'])
def test_get_for_notification(self):
    """`get_for_notification` returns at least one event, and every returned
    event begins at or after the given date and has no ``notified_at``.
    """
    reference = timezone.datetime(2016, 12, 8, 10, 0, 0, 0, pytz.UTC)
    events = Event.objects.get_for_notification(reference)

    self.assertGreater(events.count(), 0)
    for event in events:
        self.assertGreaterEqual(event.begin, reference)
        self.assertIsNone(event.notified_at)