def _apply(self, value):
if len(value) < self.min_length:
#
# Note that we do not pad the value:
# - It is not clear to which end(s) we should add the
# padding.
# - It is not clear what the padding value(s) should be.
# - We should keep this filter's behavior consistent with
# that of MaxLength.
#
return self._invalid_value(
value = value,
reason = self.CODE_TOO_SHORT,
template_vars = {
'length': len(value),
'min': self.min_length,
},
)
return value
Python ``min()`` usage examples — source code collection
def test_can_issue_daily(self):
    """
    Tests can_issue_daily method
    Decides if the daily report surveys can be sent out or not

    Exercises three cases: no daily report issued yet (allowed), the
    send time still in the future (blocked), and a daily report already
    created (blocked).
    """
    self.assertTrue(self.report.can_issue_daily())  # if daily report isn't issued, should be True
    self.report.survey_send_time = time.max  # latest possible time of day, so send time is still ahead
    self.assertFalse(self.report.can_issue_daily())  # if hour is later than current time, don't issue
    self.report.survey_send_time = time.min  # earliest time of day, so the send time has passed
    self.report.get_daily()  # create daily report
    self.assertFalse(self.report.can_issue_daily(),
                     "Assumed after daily report is created the daily report has already been sent")
def test_can_issue_summary(self):
    """
    Tests can_issue_summary
    Decides if summary for daily report can be sent out or not based on if it already has and if the daily report has been created

    Walks through three states: daily report missing (blocked), daily
    report present but summary not yet submitted (allowed), and summary
    already submitted (blocked again).
    """
    daily = self.report.get_daily()
    daily.delete()  # start from a clean state with no daily report
    self.assertFalse(self.report.can_issue_summary(),
                     "Daily report hasn't been created yet, summary should not be submitted")
    daily = self.report.get_daily()
    self.report.summary_send_time = time.min  # earliest time of day, so the send time has passed
    self.assertTrue(self.report.can_issue_summary(),
                    "Daily report created but summary hasn't been submitted. Should be allowed")
    daily.summary_submitted = datetime.now()
    daily.save()
    self.assertFalse(self.report.can_issue_summary(),
                     "Daily report created and summary been submitted. No more summaries should be allowed")
def limit_remaining(self):
    """Return how many more posts/comments this user may make today.

    Returns ``False`` when no limit is configured (a limit of 0 means
    "unlimited").
    """
    limit = self.profile('limit_post')
    # If False is returned, no post limit is assumed.
    if limit == 0:
        return False
    today = timezone.datetime.today()
    window = (timezone.datetime.combine(today, time.min),
              timezone.datetime.combine(today, time.max))
    # Content created today either by this account or from its IP address.
    created_by_me_today = (
        Q(creator=self.id, created__range=window) |
        Q(creator__addr=self.addr, created__range=window)
    )
    recent_posts = (Post.real.filter(created_by_me_today).count() +
                    Comment.real.filter(created_by_me_today).count())
    # Posts remaining
    return int(limit) - recent_posts
def _daysplitinterval(starttime, endtime):
"""
Splits an interval into a list of dates, start times and end times
Parameters
----------
starttime : datetime
Start date/time of interval
endtime : datetime
End date/time of interval
Returns
-------
intervals : list
A list of lists. Each item in the sublists consists of the date,
start time, and end time for the particular date.
"""
assert starttime < endtime, 'Start datetime must be before end datetime'
out = []
starttime_orig = starttime
while starttime.date() <= endtime.date():
if starttime.date() == starttime_orig.date():
stime = starttime.time()
else:
stime = time.min
if starttime.date() == endtime.date():
etime = endtime.time()
else:
etime = time.max
out.append([starttime.date(), stime, etime])
starttime += timedelta(days=1)
return out
def _load(probe, starttime, endtime, instrument, product_id, cdfkeys):
    """Load Cluster probe data over [starttime, endtime], one CDF file per day.

    Missing local files are downloaded first (best-effort; failed days are
    skipped), each day's CDF is converted to a DataFrame, and the
    concatenated result is trimmed back to the requested interval.

    :raises RuntimeError: if no data could be loaded for any day.
    """
    daylist = helper._daysplitinterval(starttime, endtime)
    data = []
    for day in daylist:
        date = day[0]  # NOTE(review): shadows any module-level `date` name
        year = str(date.year)
        month = str(date.month).zfill(2)
        # Rebinds the loop variable `day`; safe because `day` is not read
        # again this iteration, but easy to trip over when editing.
        day = str(date.day).zfill(2)
        local_dir = os.path.join(cluster_dir,
                                 'c' + probe,
                                 instrument,
                                 year)
        local_fname = 'C' + probe + '_' + product_id + '__' +\
            year + month + day + '.cdf'
        # If we don't have local file download it
        if not os.path.exists(os.path.join(local_dir, local_fname)):
            thisstart = datetime.combine(date, time.min)
            thisend = datetime.combine(date, time.max)
            try:
                _download(probe, thisstart, thisend, instrument, product_id)
            except Exception as err:
                # Best-effort: report and skip days that fail to download.
                print(str(err), '\n')
                continue
        from pycdf import pycdf
        cdf = pycdf.CDF(os.path.join(local_dir, local_fname))
        # The cdfkeys entry whose value is 'Time' names the CDF time
        # variable, which becomes the DataFrame index.
        for key, value in cdfkeys.items():
            if value == 'Time':
                index_key = key
                break
        # NOTE(review): if no cdfkeys value equals 'Time', `index_key` is
        # unbound here and the next line raises NameError — confirm callers
        # always include a 'Time' mapping.
        data.append(helper.cdf2df(cdf, index_key, cdfkeys))
    if len(data) == 0:
        raise RuntimeError('No data available to download during requested '
                           'times')
    return helper.timefilter(data, starttime, endtime)
def create_future_events(self, date_stop=None):
    """Generate and persist future events on each enabled weekday.

    :param date_stop: optional horizon override; the earlier of this and
        ``self.date_stop`` (ignoring None) is used.
    :returns: list of the newly created, validated and saved events.
    :raises ValidationError: when no stop date is available at all.
    """
    if not self.date_stop and not date_stop:
        raise ValidationError(_("Stop date should be specified."))
    # Use the earliest non-None stop date among the override and the field.
    date_stop = min(filter(None, [date_stop, self.date_stop]))
    # Start generating from now or from the configured start, whichever is later.
    current_date = max(self.time_start, timezone.now())
    last_event = self.events.order_by('-date_start').first()
    if last_event:
        # Resume the day after the most recent already-generated event.
        current_date = max(current_date, last_event.date_start + timedelta(days=1))
    current_date = make_aware(datetime.combine(current_date, time.min))
    added_events = []
    for day in iter_daterange(current_date, date_stop):
        # Skip weekdays that are disabled on this schedule (on_day0..on_day6).
        if not getattr(self, 'on_day%d' % day.weekday()):
            continue
        event = self.gen_future_event(day)
        event.full_clean()
        event.save(force_insert=True)
        added_events.append(event)
    return added_events
payments.py — file source
Project: money-to-prisoners-send-money
Author: ministryofjustice
Project source | File source
Reads: 25 | Favorites: 0 | Likes: 0 | Comments: 0
def get_govuk_capture_time(self, govuk_payment):
    """Determine when a GOV.UK payment was captured, as a UTC datetime.

    Reconciles the (possibly missing) ``capture_submit_time`` timestamp
    with the ``captured_date`` from the payment's settlement summary,
    clamping the timestamp into the captured day when they disagree.

    :raises GovUkPaymentStatusException: when the capture date is not yet
        available or the payment payload is malformed.
    """
    try:
        capture_submit_time = parse_datetime(
            govuk_payment['settlement_summary'].get('capture_submit_time', '')
        )
        captured_date = parse_date(
            govuk_payment['settlement_summary'].get('captured_date', '')
        )
        if captured_date is not None:
            # Missing submit time defaults to "now"; normalise to UTC either way.
            capture_submit_time = (
                capture_submit_time or timezone.now()
            ).astimezone(timezone.utc)
            if capture_submit_time.date() < captured_date:
                # Submitted before the captured day: clamp to start of that day.
                return datetime.combine(
                    captured_date, time.min
                ).replace(tzinfo=timezone.utc)
            elif capture_submit_time.date() > captured_date:
                # Submitted after the captured day: clamp to end of that day.
                return datetime.combine(
                    captured_date, time.max
                ).replace(tzinfo=timezone.utc)
            else:
                return capture_submit_time
    except (KeyError, TypeError):
        # Malformed payload; fall through to the error below.
        pass
    raise GovUkPaymentStatusException(
        'Capture date not yet available for payment %s' % govuk_payment['reference']
    )
def get_queryset(self):
    """returns actions

    Returns actions overlapping the period given by the ``start`` and
    ``end`` GET parameters (inclusive, expanded to whole days), further
    narrowed by the action-type and in-charge lookups.

    :raises ParseError: when the period is missing or unparseable.
    """
    start_date = self.request.GET.get('start')
    end_date = self.request.GET.get('end')
    if not start_date or not end_date:
        raise ParseError("Period frame missing")
    queryset = self.model.objects.all()
    queryset = self._apply_in_action_type_lookup(queryset)
    queryset = self._apply_in_charge_lookup(queryset)
    try:
        start_date = self._parse_date(start_date)
        end_date = self._parse_date(end_date)
    except ValueError:
        raise ParseError("Invalid period frame")
    # Expand the date bounds to cover the whole days.
    start_datetime = datetime.combine(start_date, time.min)
    end_datetime = datetime.combine(end_date, time.max)
    if end_datetime < start_datetime:
        return self.model.objects.none()
    # An action overlaps the period if any of the following holds:
    queryset = queryset.filter(
        Q(planned_date__lte=start_datetime, end_datetime__gte=end_datetime) |  # starts before, ends after period
        Q(planned_date__gte=start_datetime, end_datetime__lte=end_datetime) |  # starts and ends during period
        Q(planned_date__lte=start_datetime, end_datetime__gte=start_datetime) |  # starts before, ends during
        Q(planned_date__lte=end_datetime, end_datetime__gte=end_datetime) |  # starts during period, ends after
        Q(
            planned_date__gte=start_datetime,
            end_datetime__isnull=True,
            planned_date__lte=end_datetime
        )  # no end, starts during period
    )
    return queryset
def _get_datetimes(self):
    """Return the (start, end) datetimes spanned by the selected value."""
    start_date, end_date = get_date_bounds(self.value)
    start_dt = datetime.combine(start_date, time.min)
    end_dt = datetime.combine(end_date, time.max)
    return start_dt, end_dt
def form_valid(self, form):
    """create a new sale

    Creates an Action of the analysis code's action type, planned at
    midnight of the chosen date, then attaches a single SaleItem with the
    given amount and VAT rate to the action's sale (when one exists).
    """
    analysis_code = form.cleaned_data['analysis_code']
    # Sales are planned at midnight on the selected day.
    planned_date = datetime.combine(form.cleaned_data['date'], time.min)
    amount = form.cleaned_data['amount']
    vat_rate = form.cleaned_data['vat_rate']
    action = Action.objects.create(type=analysis_code.action_type, planned_date=planned_date)
    # NOTE(review): `action.sale` is presumably populated automatically on
    # Action creation (e.g. via a signal) — confirm against the Action model.
    if action.sale:
        action.sale.analysis_code = analysis_code
        action.sale.save()
        SaleItem.objects.create(
            sale=action.sale, pre_tax_price=amount, text=analysis_code.name, vat_rate=vat_rate, quantity=Decimal(1)
        )
    return super(AddExtraSaleView, self).form_valid(form)
def queryset(self, request, queryset):
    """Filter the queryset to the relative date window chosen in the admin."""
    # using daystart and dayend because I can't directly filter using start_date.day
    daystart = timezone.make_aware(datetime.combine(timezone.localdate(), time.min))
    dayend = timezone.make_aware(datetime.combine(timezone.localdate(), time.max))
    # Day offsets applied to (daystart, dayend) for each filter choice.
    windows = {
        'today': (0, 0),
        'tomorrow': (1, 1),
        'week': (0, 7),
        'month': (0, 30),
        'last_week': (-7, 0),
    }
    choice = self.value()
    if choice in windows:
        start_shift, end_shift = windows[choice]
        daystart += timedelta(days=start_shift)
        dayend += timedelta(days=end_shift)
        return queryset.filter(start_date__gte=daystart,
                               start_date__lte=dayend)
    # Any other value falls through, returning None so all rows are shown.
def download_files(start_date, number_of_days, lon, lat, target_directory):
    """Download daily ocean-colour granules from NASA OceanColor.

    For each day starting at *start_date*, queries the browse CGI for the
    bounding box (lon/lat appear to be (west, east) / (south, north)
    pairs — confirm against callers) and saves every linked
    ``LAC_OC.nc`` file into ``<target_directory>/originals``.
    """
    print('Downloading files...')
    if not os.path.exists(f'{target_directory}/originals'):
        os.makedirs(f'{target_directory}/originals')
    target = '%s/originals/{}' % target_directory
    pathlib.Path(target_directory).mkdir(parents=True, exist_ok=True)
    start_datetime = datetime.combine(start_date, time.min)
    first_day = int(start_datetime.timestamp() / 86400)  # days since epoch
    BASE_URL = 'https://oceancolor.gsfc.nasa.gov/cgi/browse.pl'
    # NOTE(review): GET_FILE_URL appears unused in this function.
    GET_FILE_URL = 'https://oceandata.sci.gsfc.nasa.gov/cgi/getfile/'
    url = BASE_URL + '?sub=level1or2list&sen=am&per=DAY&day={}&prm=CHL&n={}&s={}&w={}&e={}'
    for d in range(first_day, first_day + number_of_days):
        _url = url.format(d, lat[1], lat[0], lon[0], lon[1])
        _data = requests.get(_url)
        # A truthy Response means a non-error HTTP status.
        if _data:
            content = _data.content
            all_a_href = re.findall(r'(?<=<a href=")[^"]*', str(content))
            for a_href in all_a_href:
                # if 'getfile' in a_href and any((True for x in ['OC', 'SST'] if x in a_href)):
                if 'file' in a_href:
                    try:
                        response = requests.get(BASE_URL + a_href, timeout=(3, 60))
                        for link in re.findall(r'(?<=<a href=")[^"]*', str(response.content)):
                            if 'LAC_OC.nc' in link:
                                filename = link.split('/')[-1]
                                r = requests.get(link)
                                # Skip files downloaded on a previous run.
                                if not os.path.exists(target.format(filename)):
                                    with open(target.format(filename), 'wb') as f:
                                        f.write(r.content)
                                    print('downloaded file {}'.format(filename))
                    except Exception as e:
                        print('Failed to download file due to: {}'.format(e))
    print('Done downloading files...')
def _apply(self, value):
    """Parse *value* into a datetime normalised to UTC.

    Accepts an existing datetime, a date (promoted to midnight), or a
    string parseable by :py:func:`dateutil.parser.parse`.  Values without
    timezone info are assumed to be in ``self.timezone``.  Returns a naive
    UTC datetime when ``self.naive`` is set, otherwise an aware one.
    """
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, date):
        # Promote a bare date to midnight.
        # http://stackoverflow.com/a/1937636
        parsed = datetime.combine(value, time.min)
    else:
        try:
            #
            # It's a shame we can't pass ``tzinfos`` to
            # :py:meth:`dateutil_parse.parse`; ``tzinfos`` only has
            # effect if we also specify ``ignoretz = True``, which
            # we definitely don't want to do here!
            #
            # https://dateutil.readthedocs.org/en/latest/parser.html#dateutil.parser.parse
            #
            parsed = dateutil_parse(value)
        except ValueError:
            return self._invalid_value(
                value = value,
                reason = self.CODE_INVALID,
                exc_info = True,
            )
    if not parsed.tzinfo:
        # Naive values are assumed to be in the filter's configured timezone.
        parsed = parsed.replace(tzinfo=self.timezone)
    # Always convert to UTC.
    aware_result = parsed.astimezone(utc)
    return (
        aware_result.replace(tzinfo=None)
        if self.naive
        else aware_result
    )
def _reporthook(blocknum, blocksize, totalsize):
readsofar = blocknum * blocksize
if totalsize > 0:
percent = min(100, readsofar * 1e2 / totalsize)
s = "\r%5.1f%% %*d / %d" % (
percent, len(str(totalsize)), readsofar, totalsize)
sys.stderr.write(s)
# Near the end
if readsofar >= totalsize:
sys.stderr.write("\n")
# Total size is unknown
else:
sys.stderr.write("\rRead %d" % (readsofar,))
def start_time(self, time_part=time.min):
    """Return ``start_date`` combined with *time_part*, as a UTC-aware datetime."""
    combined = datetime.combine(self.start_date, time_part)
    return combined.replace(tzinfo=timezone.utc)
def set_data_time(self, season, cpp):
    """ Calculate and set data_times based off actual data and season. """
    # min/max data times come back from the cpp layer as unix timestamps
    # (presumably; confirm `cpp.min_max_data_time()`'s contract).
    self.min_data_time, self.max_data_time = [from_unix(d) for d in cpp.min_max_data_time()]
    if not self.min_data_time and not self.max_data_time:
        # No data at all: anchor every time to the season's start.
        self.min_data_time = self.max_data_time = self.data_time = season.start_time()
    if season.is_open():
        self.data_time = self.max_data_time
    else:
        # A closed season never reports data past its end time.
        self.data_time = min(self.max_data_time, season.end_time())
0002_regenerate_missing_gas_data.py — file source
Project: dsmr-reader
Author: dennissiemensma
Project source | File source
Reads: 20 | Favorites: 0 | Likes: 0 | Comments: 0
def regenerate_missing_data(apps, schema_editor):
    """Data migration: rebuild statistics from the first hour missing gas data.

    Finds the earliest HourStatistics row without gas data (from 2016 on),
    deletes all hour/day statistics from that day's local midnight onwards,
    and re-runs the stats analysis once per day up to now.
    """
    GasConsumption = apps.get_model('dsmr_consumption', 'GasConsumption')
    HourStatistics = apps.get_model('dsmr_stats', 'HourStatistics')
    DayStatistics = apps.get_model('dsmr_stats', 'DayStatistics')
    # Skip when there were no gas readings at all.
    if not GasConsumption.objects.exists():
        return
    try:
        # Check for any missing gas data.
        first_missing_gas_stat = HourStatistics.objects.filter(
            gas__isnull=True,
            hour_start__gte=timezone.make_aware(timezone.datetime(2016, 1, 1, 12))
        ).order_by('hour_start')[0]
    except IndexError:
        # Nothing is missing: no regeneration required.
        return
    print('')
    # Rewind to local midnight of the first affected day.
    target_hour = timezone.localtime(first_missing_gas_stat.hour_start)
    day_start = timezone.make_aware(timezone.datetime.combine(target_hour, time.min))
    print('Deleting statistics starting from: {}'.format(day_start))
    HourStatistics.objects.filter(hour_start__gte=day_start).delete()
    DayStatistics.objects.filter(day__gte=day_start.date()).delete()
    days_diff = (timezone.now() - day_start).days
    import dsmr_stats.services
    for x in range(1, days_diff + 1):
        # Just call analyze for each day. If we missed a day or so, the backend will regenerate it.
        print('Regenerating day: {} / {}'.format(x, days_diff))
        dsmr_stats.services.analyze()
def day_statistics(target_date):
    """Convenience wrapper: range statistics covering the single day *target_date*."""
    day_after = target_date + relativedelta(days=1)
    next_day = timezone.datetime.combine(day_after, time.min)
    return range_statistics(start=target_date, end=next_day)
def month_statistics(target_date):
    """Convenience wrapper: range statistics covering *target_date*'s calendar month."""
    start_of_month = timezone.datetime(year=target_date.year, month=target_date.month, day=1)
    month_after = start_of_month + relativedelta(months=1)
    end_of_month = timezone.datetime.combine(month_after, time.min)
    return range_statistics(start=start_of_month, end=end_of_month)
def year_statistics(target_date):
    """Convenience wrapper: range statistics covering *target_date*'s calendar year."""
    start_of_year = timezone.datetime(year=target_date.year, month=1, day=1)
    year_after = start_of_year + relativedelta(years=1)
    end_of_year = timezone.datetime.combine(year_after, time.min)
    return range_statistics(start=start_of_year, end=end_of_year)
def to_excel(self, value):
    """Convert a ``date``/``datetime`` into an Excel serial day number.

    Excel day 1 is 1900-01-01; the fractional part encodes the time of day.

    :param value: a ``datetime``, or a ``date`` (promoted to midnight).
    :raises UnableToParseDatetime:
        If *value* is not a date/datetime, or falls before 1900-01-01.
    """
    # Plain dates (including date subclasses, but NOT datetimes) are
    # promoted to midnight.  ``isinstance`` replaces the old
    # ``type(value) == date`` check, which wrongly rejected subclasses.
    if isinstance(value, date) and not isinstance(value, datetime):
        value = datetime.combine(value, time.min)
    if not isinstance(value, datetime):
        raise UnableToParseDatetime(value=value)
    delta = (value - datetime(year=1900, month=1, day=1))
    value = delta.days + delta.seconds / self.SECONDS_PER_DAY + 2
    # Excel incorrectly assumes 1900 to be a leap year, so serials before
    # the phantom 1900-02-29 (serial 60) are shifted down by one.
    if value < 61:
        if value < 1:
            raise UnableToParseDatetime(value=value)
        value -= 1
    return value
def dateString(dato):
    """
    Return a datetime as a nicely formatted string
    """
    # Drop the time component entirely when it is exactly midnight.
    fmt = '%Y-%m-%d' if dato.time() == time.min else '%Y-%m-%d %H:%M'
    return dato.strftime(fmt)
def clean_planned_date(self):
    """Combine the cleaned date and (optional) time fields into a datetime."""
    chosen_date = self.cleaned_data["date"]
    chosen_time = self.cleaned_data.get("time", None)
    if not chosen_date:
        return None
    # Fall back to midnight when no explicit time was entered.
    return datetime.combine(chosen_date, chosen_time or datetime.min.time())
def _get_dates(self, dates):
"""return dates"""
return [
{'date': datetime.combine(_date, time.min).isoformat()}
for _date in dates
]
def get_date_range(self, **kwargs):
    """Build (first, last) month-start datetimes from the request kwargs."""
    def month_start(year_key, month_key):
        # Midnight on the first day of the requested month.
        first_of_month = date(int(kwargs[year_key]), int(kwargs[month_key]), 1)
        return datetime.combine(first_of_month, time.min)
    return month_start('from_year', 'from_month'), month_start('to_year', 'to_month')