def _to_seconds(td):
    """Return the length of a timedelta in whole seconds.

    Only the ``days`` and ``seconds`` fields are read; ``microseconds`` is
    not part of the result.
    """
    seconds_per_day = 24 * 60 * 60
    return td.days * seconds_per_day + td.seconds
python类timedelta()的实例源码
def utcoffset(self, dt, is_dst=None):
    """Return the UTC offset that applies to ``dt``.

    See datetime.tzinfo.utcoffset. Pass ``is_dst`` to resolve wall-clock
    times that are ambiguous during a DST transition.

    >>> from pytz import timezone
    >>> tz = timezone('America/St_Johns')
    >>> ambiguous = datetime(2009, 10, 31, 23, 30)

    >>> tz.utcoffset(ambiguous, is_dst=False)
    datetime.timedelta(-1, 73800)

    >>> tz.utcoffset(ambiguous, is_dst=True)
    datetime.timedelta(-1, 77400)

    >>> try:
    ...     tz.utcoffset(ambiguous)
    ... except AmbiguousTimeError:
    ...     print('Ambiguous')
    Ambiguous
    """
    if dt is None:
        return None
    if dt.tzinfo is self:
        # dt already carries this very tzinfo instance: report its offset.
        return self._utcoffset
    # dt is not attached to this instance: localize it first, then read the
    # offset from whichever tzinfo localize() put on the result.
    localized = self.localize(dt, is_dst)
    return localized.tzinfo._utcoffset
def dayList(backlog_days, forced_dates=None):
    """Build the sorted list of day strings that should be analyzed.

    The result holds one entry per backlog day, produced by
    ``beforeTodayString(days=n)`` for ``n`` in ``0 .. backlog_days - 1``,
    plus every entry of ``forced_dates`` that begins with a
    ``digits-digits-digits`` pattern and is not already in the list.

    :param backlog_days: number of backlog days to include
    :param forced_dates: optional iterable of extra day strings; ``None`` or
        an empty value adds nothing
    :return: list of day strings, sorted ascending as strings
    """
    import re

    # range() rather than xrange(): xrange does not exist on Python 3, while
    # range drives this loop identically on Python 2.
    days_to_analyze = [
        beforeTodayString(days=daysback) for daysback in range(backlog_days)
    ]
    for anaday in forced_dates or []:
        # re.match only anchors at the start, so trailing text is accepted.
        if re.match(r"\d+-\d+-\d+", anaday) and anaday not in days_to_analyze:
            days_to_analyze.append(anaday)
    days_to_analyze.sort()
    return days_to_analyze
def beforeTodayString(days=0, weeks=0):
    """Return the UTC date ``days`` days and ``weeks`` weeks before now.

    The date is formatted as ``YYYY-MM-DD``.
    """
    from datetime import datetime, timedelta

    now_utc = datetime.utcnow()
    target = now_utc - timedelta(days=days, weeks=weeks)
    return target.strftime('%Y-%m-%d')
def dayStringAdd(anaday, days=0, weeks=0):
    """Shift a ``YYYY-MM-DD`` day string by ``days`` days and ``weeks`` weeks.

    The input is parsed with the ``%Y-%m-%d`` format and the shifted date is
    returned in the same format.
    """
    from datetime import datetime, timedelta

    parsed = datetime.strptime(anaday, '%Y-%m-%d')
    shifted = parsed + timedelta(days=days, weeks=weeks)
    return shifted.strftime('%Y-%m-%d')
def startScheduler():
    """Create the database schema, seed default roles and start the scheduler.

    Steps, in order:

    1. ``db.create_all()`` is called.
    2. If no role named ``admin`` exists, the ``admin`` and ``user`` roles
       are added and committed.
    3. The local time zone is looked up through ``tzlocal``; if that fails
       or yields the placeholder zone ``"local"``, an error is logged and
       UTC is used instead.
    4. A ``BackgroundScheduler`` is started that runs ``notify.task`` every
       ``config.SCAN_INTERVAL`` seconds, first firing two seconds from now.
    """
    db.create_all()

    # create default roles!
    admin_exists = db.session.query(models.Role).filter(models.Role.name == "admin").first()
    if not admin_exists:
        admin_role = models.Role(name='admin', description='Administrator Role')
        user_role = models.Role(name='user', description='User Role')
        db.session.add(admin_role)
        db.session.add(user_role)
        db.session.commit()

    try:
        import tzlocal
        tz = tzlocal.get_localzone()
        logger.info("local timezone: %s", tz)
    except Exception:
        # Best effort: tzlocal may be missing or unable to detect the zone.
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        tz = None

    # Not every tzinfo implementation has a ``zone`` attribute, so read it
    # defensively instead of risking an AttributeError here.
    if not tz or getattr(tz, "zone", None) == "local":
        logger.error('Local timezone name could not be determined. Scheduler will display times in UTC for any log '
                     'messages. To resolve this set up /etc/timezone with correct time zone name.')
        tz = pytz.utc

    # in debug mode this is executed twice :(
    # DONT run flask in auto reload mode when testing this!
    scheduler = BackgroundScheduler(logger=sched_logger, timezone=tz)
    first_run = datetime.datetime.now(tz) + datetime.timedelta(seconds=2)
    scheduler.add_job(notify.task, 'interval', seconds=config.SCAN_INTERVAL, max_instances=1,
                      start_date=first_run)
    scheduler.start()
    # NOTE(review): without a ``global sched`` statement this only binds a
    # local name; if a module-level ``sched`` was intended, confirm and
    # declare it global.
    sched = scheduler
    # notify.task()
def every(self, get_coro_or_fut, interval: _timedelta_cls, start_at: datetime,
          count=float('inf'), loop=None):
    """
    Run a job at (start_at, start_at + interval, start_at + 2 * interval, ...).

    When ``start_at`` lies in the past (after rounding the difference to
    whole seconds), it is pushed forward by a whole number of intervals.

    :param get_coro_or_fut: a callable which returns a co-routine or a future
    :param interval: a datetime.timedelta object giving the gap between
                     consecutive job executions
    :param start_at: a datetime.datetime object giving the time at which
                     this schedule starts
    :param count: the number of times to execute this job; by default the
                  job is executed indefinitely
    :param loop: event loop, if the future produced by get_coro_or_fut is
                 hooked up with a custom event loop
    :return: JobSchedule object, so that the user can control the schedule
             through it
    :raises AScheduleException: if ``interval`` is negative or ``count`` is
                                not positive
    """
    interval_seconds = interval.total_seconds()
    if interval_seconds < 0:
        raise AScheduleException(
            "error: invalid interval ({} seconds).".format(interval_seconds))
    if count <= 0:
        raise AScheduleException("error: invalid count({}).".format(count))

    lag = start_at - datetime.now()
    if round(lag.total_seconds()) < 0:
        # Skip the executions that would already have happened.
        missed_intervals = ceil((-lag) / interval)
        start_at += interval * missed_intervals

    # The first delay is measured from now; each later delay is one interval.
    first_delay = round((start_at - datetime.now()).total_seconds())
    period = round(interval.total_seconds())
    later_delays = (
        period
        for _ in itertools.takewhile(lambda idx: idx < (count - 1), itertools.count())
    )
    intervals = itertools.chain([first_delay], later_delays)

    schedule = JobSchedule(get_coro_or_fut, intervals, loop=loop)
    self.add_schedule(schedule)
    return schedule
def every_random_interval(job, interval: timedelta, loop=None):
    """
    Run the job once, at a random moment, inside each consecutive window of
    length ``interval``.

    example:
        run a job every day at random time
        run a job every hour at random time

    :param job: a callable (co-routine function) which returns
                a co-routine, a future or an awaitable
    :param interval: a datetime.timedelta giving the length of each window
    :param loop: io loop, if the provided job is a custom future linked up
                 with a different event loop.
    :return: schedule object, so it can be cancelled at will by the user via
             aschedule.cancel(schedule)
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    start = loop.time()

    def random_delays():
        # Window k begins at ``start + k * interval``. Pick a random
        # whole-second offset inside it and yield the rounded number of
        # seconds between the current loop time and that moment.
        window = 0
        while True:
            offset = random.randrange(round(interval.total_seconds()))
            fire_at = start + interval.total_seconds() * window + offset
            yield round(fire_at - loop.time())
            window += 1

    schedule = JobSchedule(job, random_delays(), loop=loop)
    # register with default_schedule_manager, so that the user can
    # aschedule.cancel it
    default_schedule_manager.add_schedule(schedule)
    return schedule
def every_day(job, loop=None):
    """Schedule ``job`` through ``every`` with a one-day timedelta."""
    one_day = timedelta(days=1)
    return every(job, timedelta=one_day, loop=loop)
def every_week(job, loop=None):
    """Schedule ``job`` through ``every`` with a seven-day timedelta."""
    one_week = timedelta(days=7)
    return every(job, timedelta=one_week, loop=loop)
def _every_weekday(job, weekday, loop=None):
    """Schedule ``job`` through ``every`` with a seven-day timedelta.

    The schedule starts at whatever ``_nearest_weekday(weekday)`` returns.
    """
    one_week = timedelta(days=7)
    first_run = _nearest_weekday(weekday)
    return every(job, timedelta=one_week, start_at=first_run, loop=loop)
def set_to_default():
    """Store the default value for each known setting, then commit.

    NOTE(review): ``set`` is presumably a module-level setter that shadows
    the builtin -- confirm against the rest of the file.
    """
    import datetime

    # (key, default value) pairs, applied in this order.
    defaults = (
        (NICETIES_OPEN, datetime.timedelta(days=14)),
        (CLOSING_TIME, datetime.time(18, 0)),
        (CLOSING_BUFFER, datetime.timedelta(minutes=30)),
        (CACHE_TIMEOUT, datetime.timedelta(days=7)),
        (INCLUDE_FACULTY, False),
        (INCLUDE_RESIDENTS, False),
    )
    for key, value in defaults:
        set(key, value)
    db.session.commit()
def memorized_timedelta(seconds):
    """Return the shared timedelta for ``seconds``, creating it on first use.

    Instances are kept in ``_timedelta_cache`` so that each distinct value
    exists only once.
    """
    try:
        cached = _timedelta_cache[seconds]
    except KeyError:
        # First request for this value: build it and remember it.
        cached = _timedelta_cache[seconds] = timedelta(seconds=seconds)
    return cached
def memorized_datetime(seconds):
    """Return the shared datetime ``seconds`` after ``_epoch``, creating it on first use.

    Instances are kept in ``_datetime_cache`` so that each distinct value
    exists only once.
    """
    try:
        cached = _datetime_cache[seconds]
    except KeyError:
        # NB. datetime.utcfromtimestamp(seconds) is avoided on purpose: it
        # fails with negative values under Windows (Bug #90096)
        cached = _datetime_cache[seconds] = _epoch + timedelta(seconds=seconds)
    return cached
def _to_seconds(td):
    """Express a timedelta as a whole number of seconds.

    The ``microseconds`` field is not read, so any sub-second part is left
    out of the result.
    """
    day_part = td.days * 24 * 60 * 60
    return day_part + td.seconds
def utcoffset(self, dt, is_dst=None):
    """Look up the offset from UTC for ``dt`` (see datetime.tzinfo.utcoffset).

    ``is_dst`` may be supplied to settle which offset applies when ``dt``
    is ambiguous because of a DST transition.

    >>> from pytz import timezone
    >>> tz = timezone('America/St_Johns')
    >>> ambiguous = datetime(2009, 10, 31, 23, 30)

    >>> tz.utcoffset(ambiguous, is_dst=False)
    datetime.timedelta(-1, 73800)

    >>> tz.utcoffset(ambiguous, is_dst=True)
    datetime.timedelta(-1, 77400)

    >>> try:
    ...     tz.utcoffset(ambiguous)
    ... except AmbiguousTimeError:
    ...     print('Ambiguous')
    Ambiguous
    """
    if dt is None:
        return None
    if dt.tzinfo is not self:
        # Not attached to this instance yet: localize, then use the offset
        # of the tzinfo that localize() attached.
        owner = self.localize(dt, is_dst).tzinfo
    else:
        owner = self
    return owner._utcoffset
def __init__(self, offset=0):
    """Store ``offset``, given in hours, as a timedelta on ``self.ZERO``."""
    shift = datetime.timedelta(hours=offset)
    self.ZERO = shift
def _total_seconds(td):
    """Return the duration of ``td`` in whole seconds, rounded down.

    Written by hand because Python 2.6 doesn't have a total_seconds()
    method on timedelta objects.
    """
    micros_per_second = 1000000
    whole_seconds = td.seconds + td.days * 86400
    total_micros = whole_seconds * micros_per_second + td.microseconds
    return total_micros // micros_per_second
def to_date_model(date):
    """Raise ValueError listing the kinds of input that are accepted.

    NOTE(review): as visible here the function raises unconditionally; the
    conversion branches appear to be missing from this excerpt -- confirm
    against the full source.
    """
    message = "given date must be: a) ISO format string, b) datetime.date object, c) datetime.timedelta object, or d) dateutil.relativedelta object"
    raise ValueError(message)
    # date obj converts to absolute date
def _total_seconds(td):
    """Compute how many whole seconds ``td`` spans, flooring any fraction.

    Python 2.6 doesn't have a total_seconds() method on timedelta objects,
    hence this helper.
    """
    scale = 1000000  # microseconds per second
    as_microseconds = (td.days * 86400 + td.seconds) * scale + td.microseconds
    return as_microseconds // scale