def apply(self, other):
    """Shift ``other`` by ``self.n`` quarters onto a first business day.

    The quarter grid is anchored on ``self.startingMonth``.  The result keeps
    the time-of-day fields of ``other`` and uses the day returned by
    ``_get_firstbday`` for the month that was landed on.
    """
    periods = self.n

    weekday, _ = tslib.monthrange(other.year, other.month)
    first_bday = _get_firstbday(weekday)

    # Months elapsed since the most recent quarter-start month (0, 1 or 2).
    months_since = (other.month - self.startingMonth) % 3
    if periods <= 0 and months_since != 0:
        # Negate so that the subtraction below rolls forward, not back.
        months_since -= 3

    if months_since == 0:
        if periods <= 0 and other.day > first_bday:
            # In an anchor month but past its first business day:
            # roll forward.
            periods += 1
        elif periods > 0 and other.day < first_bday:
            # In an anchor month but before its first business day:
            # pretend to roll back.
            periods -= 1

    # Land in the target month, then snap to its first business day.
    shifted = other + relativedelta(months=3 * periods - months_since)
    weekday, _ = tslib.monthrange(shifted.year, shifted.month)
    return datetime(shifted.year, shifted.month, _get_firstbday(weekday),
                    shifted.hour, shifted.minute, shifted.second,
                    shifted.microsecond)
python类year()的实例源码
offsets.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
offsets.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def onOffset(self, dt):
    """Return whether ``dt`` is the last day of ``self.month``.

    When ``self.normalize`` is set, a ``dt`` that fails ``_is_normalized``
    is rejected outright.
    """
    if self.normalize and not _is_normalized(dt):
        return False
    # monthrange yields (weekday_of_first, days_in_month); only the length
    # of the anchor month in dt's year is needed here.
    last_day = tslib.monthrange(dt.year, self.month)[1]
    return self.month == dt.month and dt.day == last_day
offsets.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 38
收藏 0
点赞 0
评论 0
def apply(self, other):
    """Move ``other`` by ``self.n`` steps on the "first day of ``self.month``" grid.

    * ``n > 0``: advance to the n-th anchor strictly after ``other``.
    * ``n < 0``: go back to the |n|-th anchor strictly before ``other``.
    * ``n == 0``: roll forward to the next anchor unless ``other`` already
      sits in the anchor month on day 1, in which case ``other`` itself is
      returned unchanged.

    Anchors are ``datetime`` objects carrying the time-of-day of ``other``.
    """
    anchor_month = self.month
    periods = self.n

    def _anchor(year):
        # First day of the anchor month in ``year``, time copied from other.
        return datetime(year, anchor_month, 1, other.hour, other.minute,
                        other.second, other.microsecond)

    if periods > 0:
        # The anchor of the current year is still ahead when we are in an
        # earlier month, so it counts as the first step.
        still_ahead = other.month < anchor_month
        return _anchor(other.year + periods - (1 if still_ahead else 0))

    if periods < 0:
        # On or before this year's anchor, the first step back lands in
        # the previous year.
        at_or_before = other.month < anchor_month or (
            other.month == anchor_month and other.day == 1)
        return _anchor(other.year + periods + (0 if at_or_before else 1))

    # n == 0, roll forward
    if other.month == anchor_month and not other.day > 1:
        return other
    return _anchor(other.year + (1 if other.month >= anchor_month else 0))
offsets.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def onOffset(self, dt):
    """Return whether the calendar day of ``dt`` is a year end.

    The time-of-day part of ``dt`` is dropped before comparing against
    ``self.get_year_end``.  For the ``"nearest"`` variation the year end
    computed one month earlier is also accepted.
    """
    if self.normalize and not _is_normalized(dt):
        return False

    day = datetime(dt.year, dt.month, dt.day)
    matches_own_year_end = self.get_year_end(day) == day
    if self.variation != "nearest":
        return matches_own_year_end

    # We have to check the year end of "this" cal year AND the previous
    one_month_back = day - relativedelta(months=1) if not matches_own_year_end else None
    return matches_own_year_end or self.get_year_end(one_month_back) == day
offsets.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def _get_year_end_last(self, dt):
current_year = datetime(
dt.year, self.startingMonth, 1, tzinfo=dt.tzinfo)
return current_year + self._offset_lwom
offsets.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def apply(self, other):
    """Walk ``other`` across ``self.n`` quarter boundaries.

    Positive ``n`` walks forward; anything else walks backward ``-n`` times
    (so ``n == 0`` performs no step).  Quarter lengths, in weeks, come from
    ``self.get_weeks``; ``self._offset`` supplies the anchor the quarters
    are counted from.

    Returns a ``datetime`` made of the final date and the time-of-day
    fields of the original ``other``.
    """
    # Keep the input around: its time-of-day is restored at the very end.
    base = other
    n = self.n
    if n > 0:
        while n > 0:
            if not self._offset.onOffset(other):
                # Between anchors: count quarters from the previous anchor.
                qtr_lens = self.get_weeks(other)
                start = other - self._offset
            else:
                # Exactly on an anchor: count quarters of the following span.
                start = other
                qtr_lens = self.get_weeks(other + self._offset)
            # Accumulate quarter lengths until a boundary is strictly later
            # than the current position; that boundary is one step.
            # NOTE(review): n only decreases when such a boundary is found;
            # confirm get_weeks always spans far enough, otherwise the
            # while-loop repeats with unchanged state.
            for weeks in qtr_lens:
                start += relativedelta(weeks=weeks)
                if start > other:
                    other = start
                    n -= 1
                    break
    else:
        # Backward walk: mirror of the branch above, counting |n| steps.
        n = -n
        while n > 0:
            if not self._offset.onOffset(other):
                # Between anchors: count quarters back from the next anchor.
                qtr_lens = self.get_weeks(other)
                end = other + self._offset
            else:
                end = other
                qtr_lens = self.get_weeks(other)
            # Peel quarter lengths off the end until a boundary is strictly
            # earlier than the current position.
            for weeks in reversed(qtr_lens):
                end -= relativedelta(weeks=weeks)
                if end < other:
                    other = end
                    n -= 1
                    break
    # Rebuild the result from the landed date plus the original time fields.
    other = datetime(other.year, other.month, other.day,
                     base.hour, base.minute, base.second, base.microsecond)
    return other
offsets.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def onOffset(self, dt):
    """Return whether the calendar day of ``dt`` equals ``easter(dt.year)``.

    When ``self.normalize`` is set, a ``dt`` that fails ``_is_normalized``
    is rejected outright.
    """
    if self.normalize and not _is_normalized(dt):
        return False
    # Compare dates only; any time-of-day on dt is ignored.
    calendar_day = date(dt.year, dt.month, dt.day)
    return calendar_day == easter(dt.year)
# ---------------------------------------------------------------------
# Ticks
offsets.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def _get_firstbday(wkday):
"""
wkday is the result of monthrange(year, month)
If it's a saturday or sunday, increment first business day to reflect this
"""
first = 1
if wkday == 5: # on Saturday
first = 3
elif wkday == 6: # on Sunday
first = 2
return first
def from_datetime(date):
    """Build a QDate from the year, month and day of a datetime.date."""
    year, month, day = date.year, date.month, date.day
    return QDate(year, month, day)
def to_datetime(qdate):
    """Build a datetime.date from the day(), month() and year() of a QDate."""
    day = qdate.day()
    month = qdate.month()
    year = qdate.year()
    return date(year, month, day)
def _get_custom_select(params):
select = {}
group_by_date_field = params.get('group_by_date_field')
time_range = params.get('time_range')
if time_range == 'Day':
select['day'] = 'date(%s)' % group_by_date_field
elif time_range == 'Month':
select['month'] = "extract(month FROM %s)" % group_by_date_field
select['year'] = "extract(year FROM %s)" % group_by_date_field
elif time_range == 'Year':
select['year'] = "extract(year FROM %s)" % group_by_date_field
elif time_range == 'Week':
select['week'] = "date_trunc('week', %s)" % group_by_date_field
return select
def _format_data(data, params):
    """Rewrite each row of ``data`` in place into label/values form.

    The date parts found under the key(s) for ``params['time_range']`` are
    popped from each row and replaced by a ``'label'`` of the form
    ``YYYY-MM-DD``; the row's ``'value'`` is moved under ``'values'``.
    Afterwards the rows are handed to ``_sort_data_by_date`` and
    ``_fill_empty_date``.
    """
    time_range = params.get('time_range')
    # Parts a given time range does not supply default to 1 (and, once
    # set, carry over to later rows).
    day = month = 1
    for item in data:
        if time_range in ('Day', 'Week'):
            # Both ranges store a date-like object under their own key.
            stamp = item.pop('day' if time_range == 'Day' else 'week')
            day, month, year = stamp.day, stamp.month, stamp.year
        elif time_range == 'Month':
            month = int(item.pop('month'))
            year = int(item.pop('year'))
        elif time_range == 'Year':
            year = int(item.pop('year'))
        # NOTE(review): for any other time_range ``year`` is never bound
        # and the next line raises UnboundLocalError on the first row.
        item['label'] = "%d-%02d-%02d" % (year, month, day)
        item['values'] = {'value': item.pop('value')}
    _sort_data_by_date(data)
    _fill_empty_date(data, params)
def monthdelta(date, delta):
    """Return ``date`` shifted by ``delta`` calendar months.

    ``delta`` may be negative.  If the original day does not exist in the
    target month (e.g. Jan 31 -> Feb), the day is clamped to the last day
    of that month.

    The month length comes from ``calendar.monthrange`` so that the full
    Gregorian leap-year rule is honoured: 2000 is a leap year, 1900 is not.
    """
    import calendar

    # Work with a zero-based month index so divmod handles year carry in
    # both directions (floor division keeps negative deltas correct).
    year_carry, month_index = divmod(date.month - 1 + delta, 12)
    y = date.year + year_carry
    m = month_index + 1
    d = min(date.day, calendar.monthrange(y, m)[1])
    return date.replace(day=d, month=m, year=y)
def time_filter_between(period, m, df):
    """Build a predicate selecting objects that fall on the same day/month as ``m``.

    ``df`` is called on each object to obtain something with ``year``,
    ``month`` and ``day`` attributes.

    * ``period == 'm'``: match on year, month and day.
    * ``period == 'y'``: match on year and month.
    * any other ``period``: the predicate returns ``None``.

    The second ``return`` that followed each branch could never execute and
    has been removed.
    """
    def _filter(o):
        if period == 'm':
            return df(o).year == m.year and df(o).month == m.month and df(o).day == m.day
        if period == 'y':
            return df(o).year == m.year and df(o).month == m.month
    return _filter
offsets.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def apply(self, other):
    """Move ``other`` by ``self.n`` steps on the "last day of ``self.month``" grid.

    * ``n > 0``: step forward n times to the next month-end anchor.
    * ``n < 0``: step backward |n| times to the previous anchor.
    * ``n == 0``: roll forward to the next anchor unless ``other`` already
      lies on the last day of the anchor month, in which case ``other``
      itself is returned unchanged.

    Anchors are ``datetime`` objects carrying the time-of-day of the value
    they were stepped from.
    """
    anchor_month = self.month

    def _anchor(year, stamp):
        # Last day of the anchor month in ``year``, time copied from stamp.
        last_day = tslib.monthrange(year, anchor_month)[1]
        return datetime(year, anchor_month, last_day,
                        stamp.hour, stamp.minute, stamp.second,
                        stamp.microsecond)

    def _forward(stamp):
        if stamp.month == anchor_month:
            # Inside the anchor month: this year's anchor is still ahead
            # unless we are already standing on it.
            month_len = tslib.monthrange(stamp.year, anchor_month)[1]
            year = stamp.year if stamp.day != month_len else stamp.year + 1
        elif stamp.month < anchor_month:
            year = stamp.year
        else:
            year = stamp.year + 1
        return _anchor(year, stamp)

    def _backward(stamp):
        # Only months after the anchor month have this year's anchor behind.
        year = stamp.year if stamp.month > anchor_month else stamp.year - 1
        return _anchor(year, stamp)

    periods = self.n
    if periods == 0:
        # n == 0, roll forward
        month_len = None
        if other.month == anchor_month:
            month_len = tslib.monthrange(other.year, other.month)[1]
        if month_len is None or other.day < month_len:
            return _forward(other)
        return other

    step = _forward if periods > 0 else _backward
    remaining = abs(periods)
    result = other
    while remaining > 0:
        result = step(result)
        remaining -= 1
    return result