def timezone_offset(timezone):
    """Return a display label pairing a tz name with its current UTC offset.

    Takes a tz name like 'US/Eastern' and returns 'US/Eastern (UTC-05:00)'.
    Intended to be used by the timezone notification page fragment. The
    offset is taken from the current instant converted into the zone, so DST
    is reflected automatically ('US/Eastern (UTC-04:00)' while DST is in
    effect).

    Args:
        timezone: a tz database name understood by ``pytz.timezone``.

    Returns:
        The string 'UTC' when ``timezone`` is exactly 'UTC'; otherwise
        '<timezone> (UTC<sign><HH>:<MM>)' with an explicit '+' or '-' sign
        and zero-padded hours and minutes.

    Raises:
        pytz.UnknownTimeZoneError: propagated from ``pytz.timezone`` when
            the name is not recognised.
    """
    if timezone == 'UTC':
        return 'UTC'

    # Convert the real current instant into the target zone. An aware
    # conversion is never ambiguous or non-existent, so no DST retry is
    # needed here.
    zone = pytz.timezone(timezone)
    local_now = datetime.datetime.now(pytz.utc).astimezone(zone)
    seconds = local_now.utcoffset().total_seconds()

    # Split the magnitude, not the signed value: divmod floors toward minus
    # infinity, so -03:30 would otherwise come out as hours=-4, minutes=30.
    sign = '-' if seconds < 0 else '+'
    hours, remainder = divmod(int(abs(seconds)), 60 * 60)
    minutes = remainder // 60

    return '%s (UTC%s%02d:%02d)' % (timezone, sign, hours, minutes)
python类AmbiguousTimeError()的实例源码
test_timezones.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def test_tz_localize_dti(self):
    """Check tz_localize on a millisecond index, including DST failures.

    A naive index localized to US/Eastern must hold the same underlying
    values as the equivalent UTC index, and converting it to US/Pacific must
    not change those values. Ranges touching 2:00 on 11/6/2011 and on
    3/13/2011 must raise AmbiguousTimeError and NonExistentTimeError
    respectively.
    """
    naive = DatetimeIndex(start='1/1/2005', end='1/1/2005 0:00:30.256',
                          freq='L')
    eastern = naive.tz_localize(self.tzstr('US/Eastern'))
    expected_utc = DatetimeIndex(start='1/1/2005 05:00',
                                 end='1/1/2005 5:00:30.256', freq='L',
                                 tz='utc')
    self.assert_numpy_array_equal(eastern.values, expected_utc.values)

    # Converting between zones must leave the stored values untouched.
    pacific = eastern.tz_convert(self.tzstr('US/Pacific'))
    self.assert_numpy_array_equal(pacific.values, expected_utc.values)

    # Range ending at 2:00 on 11/6/2011: localizing is ambiguous.
    fall_back = DatetimeIndex(start='11/6/2011 1:59', end='11/6/2011 2:00',
                              freq='L')
    tz_name = self.tzstr('US/Eastern')
    with self.assertRaises(pytz.AmbiguousTimeError):
        fall_back.tz_localize(tz_name)

    # Range ending at 2:00 on 3/13/2011: that wall time does not exist.
    spring_forward = DatetimeIndex(start='3/13/2011 1:59',
                                   end='3/13/2011 2:00', freq='L')
    tz_name = self.tzstr('US/Eastern')
    with self.assertRaises(pytz.NonExistentTimeError):
        spring_forward.tz_localize(tz_name)
test_timezones.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_with_tz_ambiguous_times(self):
    """Exercise date_range / tz_localize around the 2011 US DST changes."""
    eastern = self.tz('US/Eastern')

    # March 13, 2011, spring forward, skip from 2 AM to 3 AM
    across_gap = date_range(datetime(2011, 3, 13, 1, 30), periods=3,
                            freq=datetools.Hour())
    with self.assertRaises(pytz.NonExistentTimeError):
        across_gap.tz_localize(eastern)

    # Starting after the transition, building the range must simply succeed.
    date_range(datetime(2011, 3, 13, 3, 30), periods=3,
               freq=datetools.Hour(), tz=eastern)

    # November 6, 2011, fall back, repeat 2 AM hour
    across_repeat = date_range(datetime(2011, 11, 6, 1, 30), periods=3,
                               freq=datetools.Hour())
    with self.assertRaises(pytz.AmbiguousTimeError):
        across_repeat.tz_localize(eastern)

    # UTC is OK: construction over the same day must not raise.
    date_range(datetime(2011, 3, 13), periods=48,
               freq=datetools.Minute(30), tz=pytz.utc)
def _CombineDateAndTime(date, time, tzinfo):
    """Build a datetime from separate date and time objects, pytz-aware.

    Comparable to ``datetime.combine``, except that the timezone is applied
    through ``tzinfo.localize`` so that pytz zones resolve correctly.

    Arguments:
      date: a datetime.date object, in any timezone
      time: a datetime.time object, in any timezone; only hour, minute and
        second are used
      tzinfo: a pytz timezone object, or None

    Returns:
      a datetime.datetime object, in the timezone 'tzinfo' (naive when
      'tzinfo' is None)
    """
    combined = datetime.datetime(
        year=date.year, month=date.month, day=date.day,
        hour=time.hour, minute=time.minute, second=time.second)
    if tzinfo is None:
        return combined

    try:
        return tzinfo.localize(combined, is_dst=None)
    except AmbiguousTimeError:
        # The wall time occurs twice; keep whichever reading is earlier.
        dst_reading = tzinfo.localize(combined, is_dst=True)
        std_reading = tzinfo.localize(combined, is_dst=False)
        return min(dst_reading, std_reading)
    except NonExistentTimeError:
        # The wall time was skipped; step forward a minute at a time until
        # a wall time that exists is reached.
        one_minute = datetime.timedelta(minutes=1)
        while True:
            combined += one_minute
            try:
                return tzinfo.localize(combined, is_dst=None)
            except NonExistentTimeError:
                continue
def _CombineDateAndTime(date, time, tzinfo):
    """Build a datetime from separate date and time objects, pytz-aware.

    Comparable to ``datetime.combine``, except that the timezone is applied
    through ``tzinfo.localize`` so that pytz zones resolve correctly.

    Arguments:
      date: a datetime.date object, in any timezone
      time: a datetime.time object, in any timezone; only hour, minute and
        second are used
      tzinfo: a pytz timezone object, or None

    Returns:
      a datetime.datetime object, in the timezone 'tzinfo' (naive when
      'tzinfo' is None)
    """
    combined = datetime.datetime(
        year=date.year, month=date.month, day=date.day,
        hour=time.hour, minute=time.minute, second=time.second)
    if tzinfo is None:
        return combined

    try:
        return tzinfo.localize(combined, is_dst=None)
    except AmbiguousTimeError:
        # The wall time occurs twice; keep whichever reading is earlier.
        dst_reading = tzinfo.localize(combined, is_dst=True)
        std_reading = tzinfo.localize(combined, is_dst=False)
        return min(dst_reading, std_reading)
    except NonExistentTimeError:
        # The wall time was skipped; step forward a minute at a time until
        # a wall time that exists is reached.
        one_minute = datetime.timedelta(minutes=1)
        while True:
            combined += one_minute
            try:
                return tzinfo.localize(combined, is_dst=None)
            except NonExistentTimeError:
                continue
def parse_time(input_time):
    """Parse an Atom time stamp.

    ``input_time`` is a sequence whose last three items are dropped; the
    remaining items are passed positionally to ``timezone.datetime``
    (presumably a ``time.struct_time`` from a feed parser; confirm against
    the caller). The value is made aware in ``timezone.utc`` and returned
    converted to the module-level ``TZ``.
    """
    naive = timezone.datetime(*input_time[:-3])
    try:
        aware = timezone.make_aware(naive, timezone.utc)
    except (pytz.NonExistentTimeError, pytz.AmbiguousTimeError):
        # Retry one hour later if the wall time could not be localized.
        # NOTE(review): localizing to UTC should never hit a DST gap or
        # overlap, so this branch looks unreachable; confirm before
        # removing it.
        shifted = naive + datetime.timedelta(hours=1)
        aware = timezone.make_aware(shifted, timezone.utc)
    return aware.astimezone(TZ)
def test_end_dst_double_occurrence(self):
    """A 01:30 schedule starting 2015-10-25 in London must be ambiguous.

    The schedule starts at midnight (localized with is_dst=True) and asking
    it for its first occurrence is expected to raise AmbiguousTimeError.
    """
    london = pytz.timezone("Europe/London")
    naive_midnight = dt.datetime.strptime('2015-10-25T00:00:00',
                                          "%Y-%m-%dT%H:%M:%S")
    aware_midnight = london.localize(naive_midnight, is_dst=True)
    schedule = tzcron.Schedule("30 1 * * * *", london, aware_midnight)
    with self.assertRaises(pytz.AmbiguousTimeError):
        next(schedule)
test_timezones.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_ambiguous_infer(self):
    """Check ``ambiguous='infer'`` (and deprecated ``infer_dst``) handling."""
    # November 6, 2011, fall back, repeat 2 AM hour
    # With no repeated hours, we cannot infer the transition
    eastern = self.tz('US/Eastern')
    naive_hours = date_range(datetime(2011, 11, 6, 0), periods=5,
                             freq=datetools.Hour())
    with self.assertRaises(pytz.AmbiguousTimeError):
        naive_hours.tz_localize(eastern)

    # With repeated hours, we can infer the transition
    expected = date_range(datetime(2011, 11, 6, 0), periods=5,
                          freq=datetools.Hour(), tz=eastern)
    stamps = ['11/06/2011 00:00', '11/06/2011 01:00', '11/06/2011 01:00',
              '11/06/2011 02:00', '11/06/2011 03:00']
    naive_index = DatetimeIndex(stamps)
    inferred = naive_index.tz_localize(eastern, ambiguous='infer')
    self.assert_numpy_array_equal(expected, inferred)

    # The old keyword must still work, but emit a FutureWarning.
    with tm.assert_produces_warning(FutureWarning):
        inferred_legacy = naive_index.tz_localize(eastern, infer_dst=True)
    self.assert_numpy_array_equal(expected, inferred_legacy)

    # The constructor accepts the same option.
    from_constructor = DatetimeIndex(stamps, tz=eastern, ambiguous='infer')
    self.assert_numpy_array_equal(expected, from_constructor)

    # When there is no dst transition, nothing special happens
    summer_hours = date_range(datetime(2011, 6, 1, 0), periods=10,
                              freq=datetools.Hour())
    plain = summer_hours.tz_localize(eastern)
    plain_inferred = summer_hours.tz_localize(eastern, ambiguous='infer')
    self.assert_numpy_array_equal(plain, plain_inferred)
    with tm.assert_produces_warning(FutureWarning):
        plain_legacy = summer_hours.tz_localize(eastern, infer_dst=True)
    self.assert_numpy_array_equal(plain, plain_legacy)
def _CombineDateAndTime(date, time, tzinfo):
    """Build a datetime from separate date and time objects, pytz-aware.

    Comparable to ``datetime.combine``, except that the timezone is applied
    through ``tzinfo.localize`` so that pytz zones resolve correctly.

    Arguments:
      date: a datetime.date object, in any timezone
      time: a datetime.time object, in any timezone; only hour, minute and
        second are used
      tzinfo: a pytz timezone object, or None

    Returns:
      a datetime.datetime object, in the timezone 'tzinfo' (naive when
      'tzinfo' is None)
    """
    combined = datetime.datetime(
        year=date.year, month=date.month, day=date.day,
        hour=time.hour, minute=time.minute, second=time.second)
    if tzinfo is None:
        return combined

    try:
        return tzinfo.localize(combined, is_dst=None)
    except AmbiguousTimeError:
        # The wall time occurs twice; keep whichever reading is earlier.
        dst_reading = tzinfo.localize(combined, is_dst=True)
        std_reading = tzinfo.localize(combined, is_dst=False)
        return min(dst_reading, std_reading)
    except NonExistentTimeError:
        # The wall time was skipped; step forward a minute at a time until
        # a wall time that exists is reached.
        one_minute = datetime.timedelta(minutes=1)
        while True:
            combined += one_minute
            try:
                return tzinfo.localize(combined, is_dst=None)
            except NonExistentTimeError:
                continue
def _CombineDateAndTime(date, time, tzinfo):
    """Build a datetime from separate date and time objects, pytz-aware.

    Comparable to ``datetime.combine``, except that the timezone is applied
    through ``tzinfo.localize`` so that pytz zones resolve correctly.

    Arguments:
      date: a datetime.date object, in any timezone
      time: a datetime.time object, in any timezone; only hour, minute and
        second are used
      tzinfo: a pytz timezone object, or None

    Returns:
      a datetime.datetime object, in the timezone 'tzinfo' (naive when
      'tzinfo' is None)
    """
    combined = datetime.datetime(
        year=date.year, month=date.month, day=date.day,
        hour=time.hour, minute=time.minute, second=time.second)
    if tzinfo is None:
        return combined

    try:
        return tzinfo.localize(combined, is_dst=None)
    except AmbiguousTimeError:
        # The wall time occurs twice; keep whichever reading is earlier.
        dst_reading = tzinfo.localize(combined, is_dst=True)
        std_reading = tzinfo.localize(combined, is_dst=False)
        return min(dst_reading, std_reading)
    except NonExistentTimeError:
        # The wall time was skipped; step forward a minute at a time until
        # a wall time that exists is reached.
        one_minute = datetime.timedelta(minutes=1)
        while True:
            combined += one_minute
            try:
                return tzinfo.localize(combined, is_dst=None)
            except NonExistentTimeError:
                continue
def _CombineDateAndTime(date, time, tzinfo):
    """Build a datetime from separate date and time objects, pytz-aware.

    Comparable to ``datetime.combine``, except that the timezone is applied
    through ``tzinfo.localize`` so that pytz zones resolve correctly.

    Arguments:
      date: a datetime.date object, in any timezone
      time: a datetime.time object, in any timezone; only hour, minute and
        second are used
      tzinfo: a pytz timezone object, or None

    Returns:
      a datetime.datetime object, in the timezone 'tzinfo' (naive when
      'tzinfo' is None)
    """
    combined = datetime.datetime(
        year=date.year, month=date.month, day=date.day,
        hour=time.hour, minute=time.minute, second=time.second)
    if tzinfo is None:
        return combined

    try:
        return tzinfo.localize(combined, is_dst=None)
    except AmbiguousTimeError:
        # The wall time occurs twice; keep whichever reading is earlier.
        dst_reading = tzinfo.localize(combined, is_dst=True)
        std_reading = tzinfo.localize(combined, is_dst=False)
        return min(dst_reading, std_reading)
    except NonExistentTimeError:
        # The wall time was skipped; step forward a minute at a time until
        # a wall time that exists is reached.
        one_minute = datetime.timedelta(minutes=1)
        while True:
            combined += one_minute
            try:
                return tzinfo.localize(combined, is_dst=None)
            except NonExistentTimeError:
                continue
frequencies.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def infer_freq(index, warn=True):
    """
    Infer the most likely frequency given the input index. If the frequency is
    uncertain, a warning will be printed.

    Parameters
    ----------
    index : DatetimeIndex or TimedeltaIndex
        if passed a Series will use the values of the series (NOT THE INDEX)
    warn : boolean, default True

    Returns
    -------
    freq : string or None
        None if no discernible frequency

    Raises
    ------
    TypeError
        if the index is not datetime-like
    ValueError
        if there are less than three values.
    """
    import pandas as pd

    # A Series contributes its values, provided their dtype could hold
    # datetimes or timedeltas.
    if isinstance(index, com.ABCSeries):
        series_values = index._values
        convertible = (com.is_datetime64_dtype(series_values) or
                       com.is_timedelta64_dtype(series_values) or
                       series_values.dtype == object)
        if not convertible:
            raise TypeError("cannot infer freq from a non-convertible "
                            "dtype on a Series of {0}".format(index.dtype))
        index = series_values

    if com.is_period_arraylike(index):
        raise TypeError("PeriodIndex given. Check the `freq` attribute "
                        "instead of using infer_freq.")
    if isinstance(index, pd.TimedeltaIndex):
        return _TimedeltaFrequencyInferer(index, warn=warn).get_freq()

    if not isinstance(index, pd.DatetimeIndex):
        if isinstance(index, pd.Index):
            # Numeric indexes are rejected outright; any other Index is
            # unwrapped to its raw values before conversion.
            if isinstance(index, (pd.Int64Index, pd.Float64Index)):
                raise TypeError("cannot infer freq from a non-convertible "
                                "index type {0}".format(type(index)))
            index = index.values
        try:
            index = pd.DatetimeIndex(index)
        except AmbiguousTimeError:
            # Fall back to building the index from the integer ``asi8``
            # representation when the direct conversion is ambiguous.
            index = pd.DatetimeIndex(index.asi8)

    return _FrequencyInferer(index, warn=warn).get_freq()
def _process_task(self, task):
    """Compute shifts for the schedule named by ``task`` and upload them.

    Resolves the organization, location, role and schedule from the ids in
    ``task.data``, derives the remaining demand, runs the ``Splitter`` over
    it, and creates one shift on the role for every shift the splitter
    returns.
    """
    # 1. Fetch schedule
    self.org = self.client.get_organization(task.data.get("organization_id"))
    self.loc = self.org.get_location(task.data.get("location_id"))
    self.role = self.loc.get_role(task.data.get("role_id"))
    self.sched = self.role.get_schedule(task.data.get("schedule_id"))

    self._compute_demand()
    self._subtract_existing_shifts_from_demand()

    # Run the calculation
    min_length = self.sched.data.get("min_shift_length_hour")
    max_length = self.sched.data.get("max_shift_length_hour")
    splitter = Splitter(self.demand, min_length, max_length)
    splitter.calculate()
    splitter.efficiency()

    # Naive because these are day/hour offsets, not yet datetimes
    planned_shifts = splitter.get_shifts()
    logger.info("Starting upload of %s shifts", len(planned_shifts))

    week_start = self._get_local_start_time()
    for shift in planned_shifts:
        logger.debug("Processing shift %s", shift)

        # Daylight savings time matters here: the day offset is applied and
        # the result is rounded back to midnight.
        # NOTE(review): the earlier comment spoke of overshooting by two
        # extra hours before rounding; no such padding is visible in this
        # method, so confirm that normalize_to_midnight covers it.
        day_offset = timedelta(days=shift["day"])
        start_day = normalize_to_midnight(deepcopy(week_start) + day_offset)

        # Beware of time changes - duplicate times are possible
        try:
            start = start_day.replace(hour=shift["start"])
        except pytz.AmbiguousTimeError:
            # Arbitrarily pick one reading. Minor tech debt.
            # NOTE(review): the standard ``datetime.replace`` neither raises
            # AmbiguousTimeError nor accepts ``is_dst``; verify what type
            # start_day actually is.
            start = start_day.replace(hour=shift["start"], is_dst=False)

        stop = start + timedelta(hours=shift["length"])

        # Convert to the ISO strings that are sent to the API
        # (presumably default_tz is UTC, given the variable names; confirm).
        utc_start_str = start.astimezone(self.default_tz).isoformat()
        utc_stop_str = stop.astimezone(self.default_tz).isoformat()

        logger.info("Creating shift with start %s stop %s", start, stop)
        self.role.create_shift(start=utc_start_str, stop=utc_stop_str)
def _subtract_existing_shifts_from_demand(self):
    """Lower ``self.demand`` by the staffing that existing shifts provide.

    Fetches the role's shifts in a window around the local start time, then
    walks every (day, hour) bucket of the demand grid, counts the shifts
    overlapping that hour and subtracts the count, never going below zero.
    ``self.demand`` is replaced by the adjusted copy.
    """
    logger.info("Starting demand: %s", self.demand)
    remaining = deepcopy(self.demand)

    # Widen the search by the maximum shift length on both sides of the
    # 1 week window.
    lookback = timedelta(hours=config.MAX_SHIFT_LENGTH_HOURS)
    search_start = (self._get_local_start_time() - lookback).astimezone(
        self.default_tz)
    lookahead = timedelta(days=7, hours=config.MAX_SHIFT_LENGTH_HOURS)
    search_end = (self._get_local_start_time() + lookahead).astimezone(
        self.default_tz)

    shifts = self.role.get_shifts(start=search_start, end=search_end)
    logger.info("Checking %s shifts for existing demand", len(shifts))

    # Search hour by hour throughout the weeks
    for day in range(len(self.demand)):
        start_day = normalize_to_midnight(
            self._get_local_start_time() + timedelta(days=day))

        for start in range(len(self.demand[0])):
            # Beware of time changes - duplicate times are possible
            # NOTE(review): the standard ``datetime.replace`` and
            # ``timedelta`` neither raise AmbiguousTimeError nor accept
            # ``is_dst``; verify what type start_day actually is.
            try:
                start_hour = deepcopy(start_day).replace(hour=start)
            except pytz.AmbiguousTimeError:
                # Arbitrarily pick one reading.
                start_hour = deepcopy(start_day).replace(
                    hour=start, is_dst=False)
            try:
                stop_hour = start_hour + timedelta(hours=1)
            except pytz.AmbiguousTimeError:
                stop_hour = start_hour + timedelta(hours=1, is_dst=False)

            # Count the shifts that overlap this one-hour bucket
            current_staffing_level = 0
            for shift in shifts:
                shift_start = iso8601.parse_date(
                    shift.data.get("start")).replace(tzinfo=self.default_tz)
                shift_stop = iso8601.parse_date(
                    shift.data.get("stop")).replace(tzinfo=self.default_tz)

                spans_bucket = (shift_start <= start_hour and
                                shift_stop > stop_hour)
                starts_inside = (shift_start >= start_hour and
                                 shift_start < stop_hour)
                ends_inside = (shift_stop > start_hour and
                               shift_stop <= stop_hour)
                if spans_bucket or starts_inside or ends_inside:
                    # increment staffing level during that bucket
                    current_staffing_level += 1

            logger.debug("Current staffing level at day %s time %s is %s",
                         day, start, current_staffing_level)

            remaining[day][start] -= current_staffing_level
            # demand cannot be less than zero
            if remaining[day][start] < 0:
                remaining[day][start] = 0

    logger.info("Demand minus existing shifts: %s", remaining)
    self.demand = remaining