def _process_task(self, task):
# 1. Fetch schedule
self.org = self.client.get_organization(
task.data.get("organization_id"))
self.loc = self.org.get_location(task.data.get("location_id"))
self.role = self.loc.get_role(task.data.get("role_id"))
self.sched = self.role.get_schedule(task.data.get("schedule_id"))
self._compute_demand()
self._subtract_existing_shifts_from_demand()
# Run the calculation
s = Splitter(self.demand,
self.sched.data.get("min_shift_length_hour"),
self.sched.data.get("max_shift_length_hour"))
s.calculate()
s.efficiency()
# Naive becuase not yet datetimes
naive_shifts = s.get_shifts()
logger.info("Starting upload of %s shifts", len(naive_shifts))
local_start_time = self._get_local_start_time()
for shift in naive_shifts:
# We have to think of daylight savings time here, so we need to
# guarantee that we don't have any errors. We do this by overshooting
# the timedelta by an extra two hours, then rounding back to midnight.
logger.debug("Processing shift %s", shift)
start_day = normalize_to_midnight(
deepcopy(local_start_time) + timedelta(days=shift["day"]))
# Beware of time changes - duplicate times are possible
try:
start = start_day.replace(hour=shift["start"])
except pytz.AmbiguousTimeError:
# Randomly pick one. Minor tech debt.
start = start_day.replace(hour=shift["start"], is_dst=False)
stop = start + timedelta(hours=shift["length"])
# Convert to the strings we are passing up to the cLoUd
utc_start_str = start.astimezone(self.default_tz).isoformat()
utc_stop_str = stop.astimezone(self.default_tz).isoformat()
logger.info("Creating shift with start %s stop %s", start, stop)
self.role.create_shift(start=utc_start_str, stop=utc_stop_str)
评论列表
文章目录