def format_date(val, fmt='%m-%d-%Y'):
    """
    Parse a date string into a ``datetime`` object.

    :param val: the date string to parse
    :param fmt: the ``strptime`` format that ``val`` is expected to follow
    :return: the parsed ``datetime``, or ``None`` when parsing fails (the
        failure is logged as a warning instead of being raised)
    """
    try:
        return datetime.strptime(val, fmt)
    except Exception as exc:
        # Best effort: report the problem and let the caller deal with None.
        log.warning("Problem formatting date: {} - {} due: {}"
                    .format(val, fmt, exc))
        return None
python类date()的实例源码
test_dashboard_projects_week.py 文件源码
项目:FRG-Crowdsourcing
作者: 97amarnathk
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_format_updated_projects(self):
    """format_update_projects returns one row describing the updated project."""
    # A project whose ``updated`` value is in 2014; the assertions below
    # expect it to be absent from the result.
    stale_project = ProjectFactory.create(updated=date(2014, 11, 24))  # noqa: F841
    fresh_project = ProjectFactory.create()
    fresh_project.name = 'NewNewNew'
    ProjectRepository(db).update(fresh_project)
    update_projects_week()
    today = datetime.utcnow().strftime('%Y-%m-%d')

    rows = format_update_projects()
    assert len(rows) == 1, rows
    row = rows[0]
    assert row['day'] == today, row['day']

    # The single row must carry the updated project's and its owner's details.
    assert row['id'] == fresh_project.id
    assert row['short_name'] == fresh_project.short_name
    assert row['p_name'] == fresh_project.name
    assert row['email_addr'] == fresh_project.owner.email_addr
    assert row['owner_id'] == fresh_project.owner.id
    assert row['u_name'] == fresh_project.owner.name
def default(self, o):
    """Return a serializable stand-in for ``o``.

    Conversions performed here, checked in this order:

    * ``date`` instances become ``http_date(o.timetuple())``
    * ``uuid.UUID`` instances become their ``str()`` form
    * objects exposing ``__html__`` become ``text_type(o.__html__())``

    Everything else is handed to ``_json.JSONEncoder.default``, which is
    expected to raise :exc:`TypeError`.

    Subclasses may override this method to support further types. For
    example, to support arbitrary iterators::

        def default(self, o):
            try:
                iterable = iter(o)
            except TypeError:
                pass
            else:
                return list(iterable)
            return JSONEncoder.default(self, o)
    """
    if isinstance(o, date):
        return http_date(o.timetuple())
    elif isinstance(o, uuid.UUID):
        return str(o)
    elif hasattr(o, '__html__'):
        return text_type(o.__html__())
    else:
        return _json.JSONEncoder.default(self, o)
def get_month_to_message_statistic(self, message_statistic):
    """
    Maps each month between the month of the first message and the month of
    the last message, inclusive, to the sum of the values of a message
    statistic over all messages from that month.

    The first and last entries of self.messages are used as the bounds of
    the month range, so a message whose month falls outside those bounds
    raises KeyError.

    Args:
        message_statistic: A function mapping a Message object to an int or
            a float.
    Returns:
        month_to_message_statistic: A dict mapping a string formatted with
            self.MONTH_FORMAT (per the original docs, 'YYYY-MM') for each
            month between the month of the first message and the month of
            the last message to the sum of the values of message_statistic
            over all messages in self.messages from that month. Months
            without messages map to 0. An empty dict is returned when there
            are no messages at all.
    """
    # Without messages there is no first/last month to span.
    if not self.messages:
        return {}

    first_ts = self.messages[0].timestamp
    last_ts = self.messages[-1].timestamp
    # Snap both ends to the first day of their month so the monthly
    # recurrence includes the last month regardless of the day of month.
    first_month = datetime(first_ts.year, first_ts.month, 1)
    last_month = datetime(last_ts.year, last_ts.month, 1)

    # Pre-fill every month in the range with 0 so that months without any
    # message still appear in the result.
    month_to_message_statistic = {
        month.strftime(self.MONTH_FORMAT): 0
        for month in rrule(MONTHLY, dtstart=first_month, until=last_month)
    }
    for message in self.messages:
        month_str = message.timestamp.strftime(self.MONTH_FORMAT)
        month_to_message_statistic[month_str] += message_statistic(message)
    return month_to_message_statistic
def get_time_interval_to_message_statistic(self, time_interval,
                                           message_statistic):
    """
    Dispatches to the per-interval aggregation method for time_interval.

    The method is looked up on self by name: spaces in time_interval are
    replaced with underscores and the result is wrapped as
    'get_<interval>_to_message_statistic'.

    Args:
        time_interval: One of the values in self.TIME_INTERVALS (per the
            original docs: 'minute in hour', 'minute in day', 'hour',
            'date', 'week', 'month', 'year').
        message_statistic: A function mapping a Message object to an int or
            a float.
    Returns:
        Whatever the selected method returns for message_statistic; per the
        original docs, a dict mapping each value of the time interval to
        the sum of message_statistic over the messages in that interval.
    Raises:
        ValueError: If time_interval is not in self.TIME_INTERVALS.
    """
    if time_interval not in self.TIME_INTERVALS:
        raise ValueError('time_interval must be in {}'.format(self.TIME_INTERVALS))
    method_name = 'get_{}_to_message_statistic'.format(
        time_interval.replace(' ', '_'))
    return getattr(self, method_name)(message_statistic)
def test_calendary_weekday_returns_ordinal(self):
    """The third Tuesday of July 2016 is returned as ('Tuesday', 2016-07-19)."""
    result = Calendary(2016).weekday('Tuesday', month=7, ordinal=3)
    expected = ('Tuesday', datetime.date(year=2016, month=7, day=19))
    assert result == expected
def test_calendary_weekday_returns_right_day_for_year(self):
    """With no month given, the first Monday of 1947 is ('Monday', 1947-01-06)."""
    result = Calendary(1947).weekday('Monday', ordinal=1)
    expected = ('Monday', datetime.date(year=1947, month=1, day=6))
    assert result == expected
def test_weekday_with_multiple_inputs_returns_multiple_days(self):
    """Passing weekdays (0, 1) for July 2016 yields both Mondays and Tuesdays."""
    matches = Calendary(2016).weekday((0, 1), month=7)
    # Each entry is indexed as (weekday name, date); split the dates by name.
    monday_dates = [entry[1] for entry in matches if entry[0] == 'Monday']
    tuesday_dates = [entry[1] for entry in matches if entry[0] == 'Tuesday']
    assert datetime.date(year=2016, month=7, day=18) in monday_dates
    assert datetime.date(year=2016, month=7, day=19) in tuesday_dates
def Log(self, eventType, dataList):
    '''
    Append one entry to the log file.

    Each entry is a single line of tab-separated fields ending in a newline:
        timestamp, event, data1, data2, <etc>
    where:
        timestamp = integer seconds since the UNIX epoch
        event = string identifying the event
        data1..n = the strings in dataList, as appropriate for the event type

    To keep log files from growing without bound, the log file name stored
    in the settings is passed through datetime.strftime(), so any format
    codes in it are expanded against the current date/time (which allows,
    for example, one log file per month).
    '''
    timestamp = int(time())
    moment = datetime.fromtimestamp(timestamp)

    # With no configured name, default to a year-and-month file name and
    # store that default back into the settings.
    nameTemplate = self.settings.logFilePath
    if not nameTemplate:
        nameTemplate = "%Y-%m.txt"
        self.settings.logFilePath = nameTemplate

    # Format codes are expanded after GetPath has resolved the name.
    logPath = moment.strftime(self.GetPath(nameTemplate))
    with open(logPath, "a+t") as logFile:
        logFile.write("{0}\t{1}\t".format(timestamp, eventType))
        logFile.write("\t".join(dataList))
        logFile.write("\n")
def is_due_date(date):
    """Return True when ``date`` is a Friday on the 15th through 21st of its month.

    ``weekday()`` numbers Monday as 0, so 4 is Friday; a Friday in that day
    range is the third Friday of the month.
    """
    return 15 <= date.day <= 21 and date.weekday() == 4
def gen_due_date(year, month):
    """Return the due date of the given month, i.e. its third Friday.

    The date is computed directly instead of scanning day by day, so the
    result always lies inside the requested month (on the 15th through
    21st) and is never ``None``.

    :param year: calendar year
    :param month: calendar month, 1-12
    :return: ``datetime.date`` of the third Friday of that month
    :raises ValueError: if ``year``/``month`` do not form a valid date
    """
    first_of_month = datetime.date(year, month, 1)
    # weekday() numbers Monday as 0, so Friday is 4. The modulo gives the
    # number of days from the 1st to the first Friday (0 when the 1st is
    # itself a Friday).
    days_to_first_friday = (4 - first_of_month.weekday()) % 7
    # The third Friday is two weeks after the first one.
    return first_of_month + datetime.timedelta(days=days_to_first_friday + 14)
def get_due_date(date):
    """Return the first due date that is not before ``date``.

    This is the current month's due date (from ``gen_due_date``) when
    ``date`` is on or before it, otherwise the following month's due date.

    :param date: the reference date; it is compared against the value
        returned by ``gen_due_date``
    :return: the due date of this month or of the next month
    """
    due_date_this_month = gen_due_date(date.year, date.month)
    if date > due_date_this_month:
        # This month's due date has passed; move on to the next month.
        if date.month == 12:
            # December rolls over to January of the FOLLOWING year.
            return gen_due_date(date.year + 1, 1)
        return gen_due_date(date.year, date.month + 1)
    return due_date_this_month
def get_previous_and_next_day(db, message_date):
    """Return the saved days on either side of ``message_date``.

    Two lookups are made through ``db.query_message``: one for messages
    dated before ``message_date`` (ordered by descending id) and one for
    messages dated on or after the following day (ordered by ascending id).
    Each result is reduced to a plain date with ``Exporter.get_message_date``.

    NOTE(review): presumably ``query_message`` returns a single message or a
    falsy value when nothing matches - confirm against its implementation.

    :return: tuple ``(previous_day, following_day)``
    """
    before_clause = "where date < '{}' order by id desc".format(message_date)
    earlier_message = db.query_message(before_clause)

    after_clause = "where date >= '{}' order by id asc".format(
        message_date + timedelta(days=1))
    later_message = db.query_message(after_clause)

    return (Exporter.get_message_date(earlier_message),
            Exporter.get_message_date(later_message))
def get_message_date(message):
    """Return the calendar date of ``message.date`` with any time part dropped.

    :param message: an object with a ``date`` attribute exposing ``year``,
        ``month`` and ``day``; may be falsy (e.g. ``None``)
    :return: a ``date``, or ``None`` when ``message`` is falsy
    """
    if not message:
        return None
    stamp = message.date
    return date(year=stamp.year, month=stamp.month, day=stamp.day)
#endregion
def _build_date(date, kwargs):
    """
    Builds the date argument for event rules.

    Exactly one of ``date`` and ``kwargs`` must be supplied: an explicit
    ``date`` is returned unchanged, otherwise ``kwargs`` is expanded into
    ``datetime.date(**kwargs)``.

    :raises ValueError: if both or neither of the two are given
    """
    if date is not None:
        if kwargs:
            raise ValueError('Cannot pass kwargs and a date')
        return date
    if kwargs:
        return datetime.date(**kwargs)
    raise ValueError('Must pass a date or kwargs')
def _get_open(self, dt, env):
    """
    Return the cached open for ``dt``'s day, refreshing it when the day changes.

    The cached value is the first element of ``env.get_open_and_close(dt)``
    minus one minute. It is recomputed only when nothing is cached yet or
    when the cached value falls on a different calendar date than ``dt``.
    """
    cached = self._dt
    if cached is not None and cached.date() == dt.date():
        return cached
    market_open = env.get_open_and_close(dt)[0]
    self._dt = market_open - datetime.timedelta(minutes=1)
    return self._dt
def _get_close(self, dt, env):
    """
    Return the cached close for ``dt``'s day, refreshing it when the day changes.

    The cached value is the second element of ``env.get_open_and_close(dt)``.
    It is recomputed only when nothing is cached yet or when the cached
    value falls on a different calendar date than ``dt``.
    """
    cached = self._dt
    if cached is not None and cached.date() == dt.date():
        return cached
    self._dt = env.get_open_and_close(dt)[1]
    return self._dt
def should_trigger(self, dt, env):
    """Return True unless the calendar date of ``dt`` is in ``env.early_closes``."""
    is_early_close = dt.date() in env.early_closes
    return not is_early_close
def get_first_trading_day_of_week(self, dt, env):
    """
    Return the date of the earliest trading day in the same week as ``dt``.

    Walks backwards through ``env.previous_trading_day`` for as long as the
    earlier trading day has a smaller weekday number than the day found so
    far; the walk stops once stepping back no longer lowers the weekday
    number.
    """
    first_day = dt
    candidate = env.previous_trading_day(dt)
    while candidate.date().weekday() < first_day.date().weekday():
        # candidate is still earlier in the same week: accept it and look
        # one trading day further back.
        first_day, candidate = candidate, env.previous_trading_day(candidate)
    return first_day.date()
def __init__(self, n):
    """
    Store the offset for this rule.

    :param n: offset; must satisfy ``0 <= n < MAX_WEEK_RANGE``
    :raises: the error built by ``_out_of_range_error(MAX_WEEK_RANGE)`` when
        ``n`` is outside that range
    """
    within_range = 0 <= n < MAX_WEEK_RANGE
    if not within_range:
        raise _out_of_range_error(MAX_WEEK_RANGE)
    # The offset is stored negated.
    self.td_delta = -n
    # No date has been computed yet.
    self.date = None