def test_iter_quarters():
    """Check that ``iter_quarters`` is a generator of (start, end) windows.

    The expected window boundaries are the dates listed below, each carrying
    the time-of-day and tzinfo of ``start``; the final window closes at ``end``.
    """
    start = timezone.make_aware(datetime(2015, 11, 30, 1, 2, 3))
    end = timezone.make_aware(datetime(2017, 2, 28, 11, 22, 33))

    quarters = iter_quarters(start, end)
    assert type(quarters) is types.GeneratorType

    clock = start.timetz()
    boundary_days = [
        (2015, 11, 30),
        (2016, 2, 29),  # leap!
        (2016, 5, 30),
        (2016, 8, 30),
        (2016, 11, 30),
        (2017, 2, 28),
    ]
    starts = [
        datetime.combine(datetime(*ymd).date(), clock)
        for ymd in boundary_days
    ]
    # Each window ends where the next one begins; the last one ends at ``end``.
    expected = list(zip(starts, starts[1:] + [end]))
    assert list(quarters) == expected
# Example source code for Python's combine()
def test_iter_years():
    """Check that ``iter_years`` is a generator of (start, end) windows.

    The expected window boundaries are the dates listed below, each carrying
    the time-of-day and tzinfo of ``start``; the final window closes at ``end``.
    """
    start = timezone.make_aware(datetime(2016, 2, 29, 1, 2, 3))
    end = timezone.make_aware(datetime(2019, 2, 28, 11, 22, 33))

    years = iter_years(start, end)
    assert type(years) is types.GeneratorType

    clock = start.timetz()
    boundary_days = [
        (2016, 2, 29),  # leap!
        (2017, 2, 28),
        (2018, 2, 28),
        (2019, 2, 28),
    ]
    starts = [
        datetime.combine(datetime(*ymd).date(), clock)
        for ymd in boundary_days
    ]
    # Each window ends where the next one begins; the last one ends at ``end``.
    expected = list(zip(starts, starts[1:] + [end]))
    assert list(years) == expected
def format_date(utc, isoformat=False):
    """Parse Twitter's UTC date string and format it in the ``Local`` timezone.

    :param utc: timestamp text in the layout ``'%a %b %d %H:%M:%S +0000 %Y'``;
        the literal ``+0000`` is rewritten to ``UTC`` before parsing.
    :param isoformat: when true, return ``datetime.isoformat()`` output,
        otherwise ``'%Y-%m-%d %H:%M:%S %Z'``.
    :return: the formatted timestamp as a string.
    :raises ValueError: if ``utc`` does not match the expected layout.
    """
    parsed = datetime.strptime(utc.replace('+0000', 'UTC'),
                               '%a %b %d %H:%M:%S %Z %Y')
    # Attach the UTC tzinfo directly instead of rebuilding the value through
    # datetime.combine(); the date and time fields are left untouched.
    aware = parsed.replace(tzinfo=UTC)
    # Convert to localtime
    local = aware.astimezone(Local)
    if isoformat:
        return local.isoformat()
    return local.strftime('%Y-%m-%d %H:%M:%S %Z')
def _get_day_attack_schedule(self):
    """
    Return a list of random datetimes according to the planner args.

    Reads ``min_time``, ``max_time`` (both ``"%H:%M"`` strings) and ``times``
    from ``self.planner_config["args"]``, anchors both times on today's date
    and draws one ``random.uniform`` value for every ``(start, end)`` pair
    produced by ``self._split_date_range``.
    """
    planner_args = self.planner_config["args"]
    # Take a single snapshot of "today" so both ends of the window share the
    # same date even when the call happens right around midnight.
    today = datetime.today().date()
    start_time = datetime.strptime(planner_args["min_time"], "%H:%M").time()
    start_date = datetime.combine(today, start_time)
    end_time = datetime.strptime(planner_args["max_time"], "%H:%M").time()
    end_date = datetime.combine(today, end_time)
    random.seed()
    return [
        random.uniform(start, end)
        for start, end in self._split_date_range(
            start_date, end_date, planner_args["times"])
    ]
def _validate_mindate(self, min_date, field, value):
    """ {'type': ['date', 'datetime']} """
    # NOTE(review): the docstring above is kept byte-identical on purpose; it
    # looks like a rule-argument schema that a validation framework may read
    # at runtime - confirm before rewording it.
    #
    # Reports an error through self._error when the parsed value lies before
    # ``min_date``. A value recognised by self._dateisrange is split on "/"
    # and every part is checked separately.
    #
    # Remarks
    # -------
    # the yaml-reader prepares a datetime.date objects when possible,
    # the dwca-reader is not doing this, so compatibility need to be better
    # ensured
    if self._dateisrange(value):
        for valdate in value.split("/"):
            self._validate_mindate(min_date, field, valdate)
        return

    # convert schema info to datetime to enable comparison; a datetime is
    # already comparable and must not be passed through combine(), which
    # would silently reset its time-of-day to midnight
    if isinstance(min_date, date) and not isinstance(min_date, datetime):
        min_date = datetime.combine(min_date, datetime.min.time())

    # try to parse the datetime-format
    event_date = self._parse_date(field, value)
    if event_date and event_date < min_date:
        self._error(field, "date is before min limit " +
                    min_date.date().isoformat())
def _validate_maxdate(self, max_date, field, value):
    """ {'type': ['date', 'datetime']} """
    # NOTE(review): the docstring above is kept byte-identical on purpose; it
    # looks like a rule-argument schema that a validation framework may read
    # at runtime - confirm before rewording it.
    #
    # Reports an error through self._error when the parsed value lies after
    # ``max_date``. A value recognised by self._dateisrange is split on "/"
    # and every part is checked separately.
    #
    # Remarks
    # -------
    # the yaml-reader prepares a datetime.date objects when possible,
    # the dwca-reader is not doing this, so compatibility need to be better
    # ensured
    if self._dateisrange(value):
        for valdate in value.split("/"):
            self._validate_maxdate(max_date, field, valdate)
        return

    # convert schema info to datetime to enable comparison; a datetime is
    # already comparable and must not be passed through combine(), which
    # would silently reset its time-of-day to midnight
    if isinstance(max_date, date) and not isinstance(max_date, datetime):
        max_date = datetime.combine(max_date, datetime.min.time())

    # try to parse the datetime-format
    event_date = self._parse_date(field, value)
    if event_date and event_date > max_date:
        self._error(field, "date is after max limit " +
                    max_date.date().isoformat())
def test_combine(self):
    """Exercise ``self.theclass.combine`` with valid and invalid arguments."""
    cls = self.theclass
    day = date(2002, 3, 4)
    clock = time(18, 45, 3, 1234)
    want = cls(2002, 3, 4, 18, 45, 3, 1234)
    combine = cls.combine

    # Positional and keyword spellings must give the same result.
    got = combine(day, clock)
    self.assertEqual(got, want)
    got = combine(time=clock, date=day)
    self.assertEqual(got, want)

    # Splitting the result and recombining it is a round trip.
    self.assertEqual(day, got.date())
    self.assertEqual(clock, got.time())
    self.assertEqual(got, combine(got.date(), got.time()))

    # Every malformed call must be rejected with TypeError.
    bad_argument_sets = (
        (),                # need an arg
        (day,),            # need two args
        (clock, day),      # args reversed
        (day, clock, 1),   # too many args
        ("date", "time"),  # wrong types
        (day, "time"),     # wrong type
        ("date", clock),   # wrong type
    )
    for args in bad_argument_sets:
        self.assertRaises(TypeError, combine, *args)
def calculate_mean_departure_datetime(departure_datetimes):
    """
    Average the time-of-day of several departures.

    Only hour, minute and second of each value take part in the mean; the
    result is placed on the date of the first element and truncated to whole
    seconds.

    :param departure_datetimes: [datetime]
    :return: mean_departure_datetime: datetime
    """
    count = len(departure_datetimes)
    seconds_since_midnight = sum(
        moment.hour * 3600 + moment.minute * 60 + moment.second
        for moment in departure_datetimes
    )
    mean_seconds = int(seconds_since_midnight / count)
    # Break the whole-second mean back down into clock fields.
    whole_minutes, seconds = divmod(mean_seconds, 60)
    hours, minutes = divmod(whole_minutes, 60)
    first_day = departure_datetimes[0].date()
    return datetime.combine(first_day, time(hours, minutes, seconds))
def get_task_instances(
        self, session, start_date=None, end_date=None, state=None):
    """Query this DAG's task instances inside an execution-date window.

    :param session: object whose ``query()`` builds the lookup
    :param start_date: inclusive lower bound; when falsy, midnight of the day
        30 days before ``timezone.utcnow()`` is used
    :param end_date: inclusive upper bound; when falsy, ``timezone.utcnow()``
    :param state: when truthy, keep only task instances in this state
    :return: the query result, ordered by execution date
    """
    TI = TaskInstance
    if not start_date:
        thirty_days_back = timezone.utcnow() - timedelta(30)
        start_date = datetime.combine(
            thirty_days_back.date(), datetime.min.time())
    if not end_date:
        end_date = timezone.utcnow()

    query = session.query(TI).filter(
        TI.dag_id == self.dag_id,
        TI.execution_date >= start_date,
        TI.execution_date <= end_date,
        TI.task_id.in_([t.task_id for t in self.tasks]),
    )
    if state:
        query = query.filter(TI.state == state)
    return query.order_by(TI.execution_date).all()
# File: feature_engineering.py
# Project: smart-battery-for-smart-energy-usage
# Author: AnatolyPavlov
# Project source
# File source
# Views: 31
# Favorites: 0
# Likes: 0
# Comments: 0
def transform_inverse(self, df):
# Builds a single-column DataFrame whose index pairs each day returned by
# extract_days(df) with each time-of-day found in the price file's index.
# NOTE(review): the indentation of this block was lost, so the nesting of the
# loops and of the if/else below cannot be confirmed from here - check it
# against the original file before changing the logic.
# Load the price table from self.path_to_price; its index comes from the
# 'Unnamed: 0' column and is parsed as dates.
price = pd.read_csv(self.path_to_price, parse_dates=True, index_col='Unnamed: 0')
#
index = []
values = []
days = extract_days(df)
# Time-of-day of every entry in the price index (the enumerate counter is unused).
time_intervs_in_day = [d.time() for i, d in enumerate(price.index)]
for day in days:
# Row cursor into df; it is reset to 0 for every day.
i = 0
for time_intv in time_intervs_in_day:
# Compare the interval with the time-of-day of df's row i.
if time_intv <= df.index[i].time():
values.append(df.values[i][0])
index.append(datetime.combine(day, time_intv))
else:
# Advance the cursor, then append the same (value, timestamp) pair as above.
i+=1
values.append(df.values[i][0])
index.append(datetime.combine(day, time_intv))
# The output column keeps the name of df's first column.
df_out = pd.DataFrame(values, columns=[df.columns[0]], index=index)
return df_out
def test_combine(self):
    """Exercise ``self.theclass.combine`` with valid and invalid arguments."""
    cls = self.theclass
    day = date(2002, 3, 4)
    clock = time(18, 45, 3, 1234)
    want = cls(2002, 3, 4, 18, 45, 3, 1234)
    combine = cls.combine

    # Positional and keyword spellings must give the same result.
    got = combine(day, clock)
    self.assertEqual(got, want)
    got = combine(time=clock, date=day)
    self.assertEqual(got, want)

    # Splitting the result and recombining it is a round trip.
    self.assertEqual(day, got.date())
    self.assertEqual(clock, got.time())
    self.assertEqual(got, combine(got.date(), got.time()))

    # Every malformed call must be rejected with TypeError.
    bad_argument_sets = (
        (),                # need an arg
        (day,),            # need two args
        (clock, day),      # args reversed
        (day, clock, 1),   # too many args
        ("date", "time"),  # wrong types
    )
    for args in bad_argument_sets:
        self.assertRaises(TypeError, combine, *args)
def test_combine(self):
    """Exercise ``self.theclass.combine`` with valid and invalid arguments."""
    cls = self.theclass
    day = date(2002, 3, 4)
    clock = time(18, 45, 3, 1234)
    want = cls(2002, 3, 4, 18, 45, 3, 1234)
    combine = cls.combine

    # Positional and keyword spellings must give the same result.
    got = combine(day, clock)
    self.assertEqual(got, want)
    got = combine(time=clock, date=day)
    self.assertEqual(got, want)

    # Splitting the result and recombining it is a round trip.
    self.assertEqual(day, got.date())
    self.assertEqual(clock, got.time())
    self.assertEqual(got, combine(got.date(), got.time()))

    # Every malformed call must be rejected with TypeError.
    bad_argument_sets = (
        (),                # need an arg
        (day,),            # need two args
        (clock, day),      # args reversed
        (day, clock, 1),   # too many args
        ("date", "time"),  # wrong types
    )
    for args in bad_argument_sets:
        self.assertRaises(TypeError, combine, *args)
def timetable_entries_which_violate_constraints(self):
    """Collect the timetable entries that overlap a participant's constraint.

    Each constraint's start and end times are placed on the date of
    ``self.start`` in the current timezone. Only participations of the
    constraint's user in this meeting that are not ignored for optimization
    and whose entry has a timetable index are inspected.

    :return: list of entries, one per overlapping participation
    """
    meeting_day = self.start.date()

    def on_meeting_day(clock_time):
        # Aware datetime for ``clock_time`` on the meeting's start date.
        return timezone.make_aware(
            datetime.combine(meeting_day, clock_time),
            timezone.get_current_timezone())

    violating_entries = []
    for constraint in self.constraints.all():
        window_start = on_meeting_day(constraint.start_time)
        window_end = on_meeting_day(constraint.end_time)
        participations = Participation.objects.filter(
            entry__meeting=self,
            user=constraint.user,
            ignored_for_optimization=False,
            entry__timetable_index__isnull=False)
        for participation in participations:
            start = participation.entry.start
            end = participation.entry.end
            # Overlap: the window starts inside the entry, ends inside the
            # entry, or covers the entry completely.
            if (start <= window_start < end
                    or start < window_end <= end
                    or (window_start <= start and window_end >= end)):
                violating_entries.append(participation.entry)
    return violating_entries
def clean(self):
    '''
    Only allow submission if there are not already slots in the submitted window.

    A slot counts as conflicting when it belongs to the chosen instructor and
    its start time lies after (window start minus the configured
    ``privateLessons__lessonLengthInterval`` minutes) and before the window end.

    NOTE(review): the original description also promised a check that only
    rooms associated with the chosen location are allowed; no such check is
    implemented in this method.

    Raises ValidationError (code ``invalid``) when a conflicting slot exists.
    '''
    super(SlotCreationForm, self).clean()

    cleaned = self.cleaned_data
    startDate = cleaned.get('startDate')
    endDate = cleaned.get('endDate')
    startTime = cleaned.get('startTime')
    endTime = cleaned.get('endTime')
    instructor = cleaned.get('instructorId')

    # A field that failed its own validation is missing from cleaned_data, so
    # .get() yields None and datetime.combine() would raise TypeError. The
    # field-level error is already recorded; skip the overlap check.
    if any(part is None for part in (startDate, endDate, startTime, endTime)):
        return

    window_start = ensure_localtime(datetime.combine(startDate, startTime))
    window_end = ensure_localtime(datetime.combine(endDate, endTime))
    lesson_length = timedelta(
        minutes=getConstant('privateLessons__lessonLengthInterval'))

    existingSlots = InstructorAvailabilitySlot.objects.filter(
        instructor=instructor,
        startTime__gt=window_start - lesson_length,
        startTime__lt=window_end,
    )

    if existingSlots.exists():
        raise ValidationError(_('Newly created slots cannot overlap existing slots for this instructor.'),code='invalid')
def test_combine(self):
    """Exercise ``self.theclass.combine`` with valid and invalid arguments."""
    cls = self.theclass
    day = date(2002, 3, 4)
    clock = time(18, 45, 3, 1234)
    want = cls(2002, 3, 4, 18, 45, 3, 1234)
    combine = cls.combine

    # Positional and keyword spellings must give the same result.
    got = combine(day, clock)
    self.assertEqual(got, want)
    got = combine(time=clock, date=day)
    self.assertEqual(got, want)

    # Splitting the result and recombining it is a round trip.
    self.assertEqual(day, got.date())
    self.assertEqual(clock, got.time())
    self.assertEqual(got, combine(got.date(), got.time()))

    # Every malformed call must be rejected with TypeError.
    bad_argument_sets = (
        (),                # need an arg
        (day,),            # need two args
        (clock, day),      # args reversed
        (day, clock, 1),   # too many args
        ("date", "time"),  # wrong types
        (day, "time"),     # wrong type
        ("date", clock),   # wrong type
    )
    for args in bad_argument_sets:
        self.assertRaises(TypeError, combine, *args)
def shouldStartAfter(self):
    """Return the ISO-formatted moment after which the auction should start.

    Returns ``None`` when ``endDate`` is already set, when the parent tender
    has lots, when the tender status is not one of the three statuses below,
    or when none of the branches produces a candidate moment.
    """
    if self.endDate:
        return
    tender = self.__parent__
    schedulable_statuses = (
        'active.tendering',
        'active.pre-qualification.stand-still',
        'active.auction',
    )
    if tender.lots or tender.status not in schedulable_statuses:
        return

    start_after = None
    if tender.status == 'active.tendering' and tender.tenderPeriod.endDate:
        start_after = calculate_business_date(
            tender.tenderPeriod.endDate, TENDERING_AUCTION, tender)
    elif self.startDate and get_now() > calc_auction_end_time(tender.numberOfBids, self.startDate):
        start_after = calc_auction_end_time(tender.numberOfBids, self.startDate)
    elif tender.qualificationPeriod and tender.qualificationPeriod.endDate:
        # Midnight three days after each complaint decision, in the
        # decision's own tzinfo.
        decision_dates = []
        for qualification in tender.qualifications:
            for complaint in qualification.complaints:
                if not complaint.dateDecision:
                    continue
                midnight = time(0, tzinfo=complaint.dateDecision.tzinfo)
                decision_dates.append(datetime.combine(
                    complaint.dateDecision.date() + timedelta(days=3),
                    midnight))
        decision_dates.append(tender.qualificationPeriod.endDate)
        start_after = max(decision_dates)

    if start_after:
        return rounding_shouldStartAfter(start_after, tender).isoformat()
def shouldStartAfter(self):
    """Return the ISO-formatted moment after which the lot auction should start.

    Returns ``None`` when ``endDate`` is already set, when the tender status
    is not one of the three statuses below, when the parent lot is not
    ``'active'``, or when none of the branches produces a candidate moment.
    """
    if self.endDate:
        return
    tender = get_tender(self)
    lot = self.__parent__
    schedulable_statuses = (
        'active.tendering',
        'active.pre-qualification.stand-still',
        'active.auction',
    )
    if tender.status not in schedulable_statuses or lot.status != 'active':
        return

    start_after = None
    if tender.status == 'active.tendering' and tender.tenderPeriod.endDate:
        start_after = calculate_business_date(
            tender.tenderPeriod.endDate, TENDERING_AUCTION, tender)
    elif self.startDate and get_now() > calc_auction_end_time(lot.numberOfBids, self.startDate):
        start_after = calc_auction_end_time(lot.numberOfBids, self.startDate)
    elif tender.qualificationPeriod and tender.qualificationPeriod.endDate:
        # Midnight three days after each complaint decision, in the
        # decision's own tzinfo.
        decision_dates = []
        for qualification in tender.qualifications:
            for complaint in qualification.complaints:
                if not complaint.dateDecision:
                    continue
                midnight = time(0, tzinfo=complaint.dateDecision.tzinfo)
                decision_dates.append(datetime.combine(
                    complaint.dateDecision.date() + timedelta(days=3),
                    midnight))
        decision_dates.append(tender.qualificationPeriod.endDate)
        start_after = max(decision_dates)

    if start_after:
        return rounding_shouldStartAfter(start_after, tender).isoformat()
def _load(probe, starttime, endtime, instrument, product_id, cdfkeys):
    """Load one CDF file per day of the interval and filter the result by time.

    For each day from ``helper._daysplitinterval`` the expected local file is
    ``<cluster_dir>/c<probe>/<instrument>/<year>/C<probe>_<product_id>__<yyyymmdd>.cdf``.
    A missing file triggers ``_download`` for that whole day; a day whose
    download raises is reported with ``print`` and skipped.

    :raises RuntimeError: when no day produced any data.
    :return: ``helper.timefilter`` applied to the collected per-day results.
    """
    frames = []
    for interval in helper._daysplitinterval(starttime, endtime):
        this_date = interval[0]
        year = str(this_date.year)
        month = str(this_date.month).zfill(2)
        day_of_month = str(this_date.day).zfill(2)

        local_dir = os.path.join(cluster_dir, 'c' + probe, instrument, year)
        local_fname = ('C' + probe + '_' + product_id + '__' +
                       year + month + day_of_month + '.cdf')

        # If we don't have local file download it
        if not os.path.exists(os.path.join(local_dir, local_fname)):
            day_start = datetime.combine(this_date, time.min)
            day_end = datetime.combine(this_date, time.max)
            try:
                _download(probe, day_start, day_end, instrument, product_id)
            except Exception as err:
                print(str(err), '\n')
                continue

        from pycdf import pycdf
        cdf = pycdf.CDF(os.path.join(local_dir, local_fname))
        # The column mapped to 'Time' is passed to cdf2df as the index key.
        for key, value in cdfkeys.items():
            if value == 'Time':
                index_key = key
                break
        frames.append(helper.cdf2df(cdf, index_key, cdfkeys))

    if not frames:
        raise RuntimeError('No data available to download during requested '
                           'times')
    return helper.timefilter(frames, starttime, endtime)
def to_json(self):
    """Return a dict describing this object, ready for JSON serialisation.

    Start and end are built by combining ``self.date`` with the respective
    time and passing the result through ``datetime_to_utc``; every speaker is
    rendered with its own ``to_json()``.
    """
    speakers = [speaker.to_json() for speaker in self.speakers.all()]
    starts_at = datetime.combine(self.date, self.startTime)
    ends_at = datetime.combine(self.date, self.endTime)
    payload = {
        'title': self.title,
        'shortTitle': self.shortTitle,
        'subTitle': self.subTitle,
        'startTime': datetime_to_utc(starts_at),
        'endTime': datetime_to_utc(ends_at),
        'desc': self.description,
        'img': self.image.url,
        'location': self.location,
        'speakers': speakers,
        'registrationUrl': self.registrationUrl,
        'prettyUrl': self.prettyUrl,
    }
    return payload
# File: test_timeseries.py
# Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
# Author: SignalMedia
# Project source
# File source
# Views: 33
# Favorites: 0
# Likes: 0
# Comments: 0
def test_NaT_methods(self):
    """Check how datetime-style methods behave when called on ``NaT``.

    Methods that ``NaT`` does not expose are skipped.
    """
    # GH 9513
    raise_methods = [
        'astimezone', 'combine', 'ctime', 'dst', 'fromordinal',
        'fromtimestamp', 'isocalendar', 'strftime', 'strptime', 'time',
        'timestamp', 'timetuple', 'timetz', 'toordinal', 'tzname',
        'utcfromtimestamp', 'utcnow', 'utcoffset', 'utctimetuple',
    ]
    nat_methods = ['date', 'now', 'replace', 'to_datetime', 'today']
    nan_methods = ['weekday', 'isoweekday']

    def exposed(names):
        # Lazily yield only the names that NaT actually provides.
        return (name for name in names if hasattr(NaT, name))

    # These must raise ValueError when called.
    for name in exposed(raise_methods):
        self.assertRaises(ValueError, getattr(NaT, name))

    # These must return NaN.
    for name in exposed(nan_methods):
        self.assertTrue(np.isnan(getattr(NaT, name)()))

    # These must return NaT itself.
    for name in exposed(nat_methods):
        self.assertIs(getattr(NaT, name)(), NaT)

    # GH 12300
    self.assertEqual(NaT.isoformat(), 'NaT')
# File: test_timeseries.py
# Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
# Author: SignalMedia
# Project source
# File source
# Views: 36
# Favorites: 0
# Likes: 0
# Comments: 0
def test_class_ops_pytz(self):
    """Compare ``Timestamp`` class-level constructors with ``datetime``'s.

    Both sides are compared after converting ``Timestamp(...).value`` to
    whole seconds by truncation. Skipped when pytz is unavailable.
    """
    tm._skip_if_no_pytz()
    from pytz import timezone

    def assert_same_second(left, right):
        left_seconds = int(Timestamp(left).value / 1e9)
        right_seconds = int(Timestamp(right).value / 1e9)
        self.assertEqual(left_seconds, right_seconds)

    assert_same_second(Timestamp.now(), datetime.now())
    assert_same_second(Timestamp.now('UTC'), datetime.now(timezone('UTC')))
    assert_same_second(Timestamp.utcnow(), datetime.utcnow())
    assert_same_second(Timestamp.today(), datetime.today())

    current_time = calendar.timegm(datetime.now().utctimetuple())
    assert_same_second(Timestamp.utcfromtimestamp(current_time),
                       datetime.utcfromtimestamp(current_time))
    assert_same_second(Timestamp.fromtimestamp(current_time),
                       datetime.fromtimestamp(current_time))

    # combine(): the time part is taken ten minutes after the date part.
    date_component = datetime.utcnow()
    time_component = (date_component + timedelta(minutes=10)).time()
    assert_same_second(Timestamp.combine(date_component, time_component),
                       datetime.combine(date_component, time_component))
# File: test_timeseries.py
# Project: PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
# Author: SignalMedia
# Project source
# File source
# Views: 36
# Favorites: 0
# Likes: 0
# Comments: 0
def test_class_ops_dateutil(self):
    """Compare ``Timestamp`` class-level constructors with ``datetime``'s.

    Both sides are compared after converting ``Timestamp(...).value`` to
    seconds and rounding with ``np.round``. Skipped when dateutil is
    unavailable.
    """
    tm._skip_if_no_dateutil()
    from dateutil.tz import tzutc

    def assert_same_second(left, right):
        left_seconds = int(np.round(Timestamp(left).value / 1e9))
        right_seconds = int(np.round(Timestamp(right).value / 1e9))
        self.assertEqual(left_seconds, right_seconds)

    assert_same_second(Timestamp.now(), datetime.now())
    assert_same_second(Timestamp.now('UTC'), datetime.now(tzutc()))
    assert_same_second(Timestamp.utcnow(), datetime.utcnow())
    assert_same_second(Timestamp.today(), datetime.today())

    current_time = calendar.timegm(datetime.now().utctimetuple())
    assert_same_second(Timestamp.utcfromtimestamp(current_time),
                       datetime.utcfromtimestamp(current_time))
    assert_same_second(Timestamp.fromtimestamp(current_time),
                       datetime.fromtimestamp(current_time))

    # combine(): the time part is taken ten minutes after the date part.
    date_component = datetime.utcnow()
    time_component = (date_component + timedelta(minutes=10)).time()
    assert_same_second(Timestamp.combine(date_component, time_component),
                       datetime.combine(date_component, time_component))
def town_factory(**custom):
    """Build a `Town` from default values overridden by `custom`.

    Required parameters: `dep`, `com` and `nccenr`.

    `depcom` is always derived as `dep + com`; `id` comes from `compute_id`;
    `start_datetime` / `end_datetime` are the start and end dates combined
    with the earliest and latest time of day respectively.
    """
    depcom = custom['dep'] + custom['com']

    params = dict(
        actual='1',
        modification=0,
        ancestors='',
        successors='',
        start_date=START_DATE,
        end_date=END_DATE,
        population='NULL',
        parents='',
    )
    # Caller-supplied values win over the defaults; the derived depcom wins
    # over any depcom the caller passed in.
    params.update(custom, depcom=depcom)

    start_date = params['start_date']
    params['id'] = compute_id(params['depcom'], start_date)
    params['start_datetime'] = datetime.combine(start_date, datetime.min.time())
    params['end_datetime'] = datetime.combine(params['end_date'],
                                              datetime.max.time())
    return Town(**params)
def test_combine(self):
    """Exercise ``self.theclass.combine`` with valid and invalid arguments."""
    cls = self.theclass
    day = date(2002, 3, 4)
    clock = time(18, 45, 3, 1234)
    want = cls(2002, 3, 4, 18, 45, 3, 1234)
    combine = cls.combine

    # Positional and keyword spellings must give the same result.
    got = combine(day, clock)
    self.assertEqual(got, want)
    got = combine(time=clock, date=day)
    self.assertEqual(got, want)

    # Splitting the result and recombining it is a round trip.
    self.assertEqual(day, got.date())
    self.assertEqual(clock, got.time())
    self.assertEqual(got, combine(got.date(), got.time()))

    # Every malformed call must be rejected with TypeError.
    bad_argument_sets = (
        (),                # need an arg
        (day,),            # need two args
        (clock, day),      # args reversed
        (day, clock, 1),   # too many args
        ("date", "time"),  # wrong types
    )
    for args in bad_argument_sets:
        self.assertRaises(TypeError, combine, *args)
def test_combine(self):
    """Exercise ``self.theclass.combine`` with valid and invalid arguments."""
    cls = self.theclass
    day = date(2002, 3, 4)
    clock = time(18, 45, 3, 1234)
    want = cls(2002, 3, 4, 18, 45, 3, 1234)
    combine = cls.combine

    # Positional and keyword spellings must give the same result.
    got = combine(day, clock)
    self.assertEqual(got, want)
    got = combine(time=clock, date=day)
    self.assertEqual(got, want)

    # Splitting the result and recombining it is a round trip.
    self.assertEqual(day, got.date())
    self.assertEqual(clock, got.time())
    self.assertEqual(got, combine(got.date(), got.time()))

    # Every malformed call must be rejected with TypeError.
    bad_argument_sets = (
        (),                # need an arg
        (day,),            # need two args
        (clock, day),      # args reversed
        (day, clock, 1),   # too many args
        ("date", "time"),  # wrong types
        (day, "time"),     # wrong type
        ("date", clock),   # wrong type
    )
    for args in bad_argument_sets:
        self.assertRaises(TypeError, combine, *args)
def test_combine(self):
    """Exercise ``self.theclass.combine`` with valid and invalid arguments."""
    cls = self.theclass
    day = date(2002, 3, 4)
    clock = time(18, 45, 3, 1234)
    want = cls(2002, 3, 4, 18, 45, 3, 1234)
    combine = cls.combine

    # Positional and keyword spellings must give the same result.
    got = combine(day, clock)
    self.assertEqual(got, want)
    got = combine(time=clock, date=day)
    self.assertEqual(got, want)

    # Splitting the result and recombining it is a round trip.
    self.assertEqual(day, got.date())
    self.assertEqual(clock, got.time())
    self.assertEqual(got, combine(got.date(), got.time()))

    # Every malformed call must be rejected with TypeError.
    bad_argument_sets = (
        (),                # need an arg
        (day,),            # need two args
        (clock, day),      # args reversed
        (day, clock, 1),   # too many args
        ("date", "time"),  # wrong types
    )
    for args in bad_argument_sets:
        self.assertRaises(TypeError, combine, *args)
def block(user_id):
    """Handle the user-block form for the user with id ``user_id``.

    On a valid submission the user is blocked through ``lib.user.block``,
    either without an end date (``unlimited`` ticked) or until midnight at
    the start of the submitted date. Success redirects to the user's page; a
    ``ValueError`` from the library is flashed and the form is shown again.
    """
    form = UserBlockForm()
    myUser = User.q.get(user_id)
    if form.validate_on_submit():
        # Only build the end date when one is wanted: for an unlimited block
        # the date field may be empty, and combine() would raise on None.
        if form.unlimited.data:
            end_date = None
        else:
            end_date = datetime.combine(form.date.data, time(0))

        try:
            lib.user.block(
                user=myUser,
                date=end_date,
                reason=form.reason.data,
                processor=current_user)
        except ValueError as e:
            # Exceptions have no ``message`` attribute on Python 3; fall back
            # to the string form there.
            message = e.message if hasattr(e, 'message') else str(e)
            flash(message, 'error')
        else:
            flash(u'Nutzer gesperrt', 'success')
            return redirect(url_for('.user_show', user_id=user_id))
    return render_template('user/user_block.html', form=form, user_id=user_id)
def move_out(user_id):
    """Handle the move-out form for the user with id ``user_id``.

    An unknown user id is flashed as an error and aborts with 404. A valid
    submission moves the user out at midnight of the submitted date and
    redirects to the user's page; otherwise the form is rendered.
    """
    form = UserMoveOutForm()
    myUser = User.q.get(user_id)
    if myUser is None:
        flash(u"Nutzer mit ID %s existiert nicht!" % (user_id,), 'error')
        abort(404)

    if not form.validate_on_submit():
        return render_template('user/user_moveout.html', form=form, user_id=user_id)

    move_out_at = datetime.combine(form.date.data, time(0))
    lib.user.move_out(
        user=myUser,
        date=move_out_at,
        processor=current_user,
        comment=form.comment.data
    )
    flash(u'Nutzer wurde ausgezogen', 'success')
    return redirect(url_for('.user_show', user_id=myUser.id))
def move_out_tmp(user_id):
    """Handle the temporary move-out form for the user with id ``user_id``.

    An unknown user id is flashed as an error and aborts with 404. A valid
    submission calls ``lib.user.move_out_tmp`` for midnight of the submitted
    date and redirects to the page of the user it returns; otherwise the
    form is rendered.
    """
    form = UserMoveOutForm()
    my_user = User.q.get(user_id)
    if my_user is None:
        flash(u"Nutzer mit ID %s existiert nicht!" % (user_id,), 'error')
        abort(404)

    if not form.validate_on_submit():
        return render_template('user/user_moveout.html', form=form, user_id=user_id)

    move_out_at = datetime.combine(form.date.data, time(0))
    changed_user = lib.user.move_out_tmp(
        user=my_user,
        date=move_out_at,
        comment=form.comment.data,
        processor=current_user
    )
    flash(u'Nutzer zieht am %s vorübegehend aus' % form.date.data,
          'success')
    return redirect(url_for('.user_show', user_id=changed_user.id))
def get_task_instances(
        self, session, start_date=None, end_date=None, state=None):
    """Query this DAG's task instances inside an execution-date window.

    :param session: object whose ``query()`` builds the lookup
    :param start_date: inclusive lower bound; when falsy, midnight of the day
        30 days before ``datetime.today()`` is used
    :param end_date: inclusive upper bound; when falsy, ``datetime.now()``
    :param state: when truthy, keep only task instances in this state
    :return: the query result from ``.all()``
    """
    TI = TaskInstance
    if not start_date:
        thirty_days_back = datetime.today() - timedelta(30)
        start_date = datetime.combine(
            thirty_days_back.date(), datetime.min.time())
    if not end_date:
        end_date = datetime.now()

    query = session.query(TI).filter(
        TI.dag_id == self.dag_id,
        TI.execution_date >= start_date,
        TI.execution_date <= end_date,
        TI.task_id.in_([t.task_id for t in self.tasks]),
    )
    if state:
        query = query.filter(TI.state == state)
    return query.all()