def _subtract_existing_shifts_from_demand(self):
logger.info("Starting demand: %s", self.demand)
demand_copy = deepcopy(self.demand)
search_start = (self._get_local_start_time() - timedelta(
hours=config.MAX_SHIFT_LENGTH_HOURS)).astimezone(self.default_tz)
# 1 week
search_end = (self._get_local_start_time() + timedelta(
days=7, hours=config.MAX_SHIFT_LENGTH_HOURS)
).astimezone(self.default_tz)
shifts = self.role.get_shifts(start=search_start, end=search_end)
logger.info("Checking %s shifts for existing demand", len(shifts))
# Search hour by hour throughout the weeks
for day in range(len(self.demand)):
start_day = normalize_to_midnight(self._get_local_start_time() +
timedelta(days=day))
for start in range(len(self.demand[0])):
# Beware of time changes - duplicate times are possible
try:
start_hour = deepcopy(start_day).replace(hour=start)
except pytz.AmbiguousTimeError:
# Randomly pick one - cause phucket. Welcome to chomp.
start_hour = deepcopy(start_day).replace(
hour=start, is_dst=False)
try:
stop_hour = start_hour + timedelta(hours=1)
except pytz.AmbiguousTimeError:
stop_hour = start_hour + timedelta(hours=1, is_dst=False)
# Find shift
current_staffing_level = 0
for shift in shifts:
shift_start = iso8601.parse_date(
shift.data.get("start")).replace(
tzinfo=self.default_tz)
shift_stop = iso8601.parse_date(
shift.data.get("stop")).replace(tzinfo=self.default_tz)
if ((shift_start <= start_hour and shift_stop > stop_hour)
or
(shift_start >= start_hour and shift_start < stop_hour)
or
(shift_stop > start_hour and shift_stop <= stop_hour)):
# increment staffing level during that bucket
current_staffing_level += 1
logger.debug("Current staffing level at day %s time %s is %s",
day, start, current_staffing_level)
demand_copy[day][start] -= current_staffing_level
# demand cannot be less than zero
if demand_copy[day][start] < 0:
demand_copy[day][start] = 0
logger.info("Demand minus existing shifts: %s", demand_copy)
self.demand = demand_copy
评论列表
文章目录