import numpy as np
import pandas as pd
from ortools.sat.python import cp_model
from pyworkforce.scheduling.base import BaseShiftScheduler
[docs]
class MinAbsDifference(BaseShiftScheduler):
def __init__(self, num_days: int,
periods: int,
shifts_coverage: dict,
required_resources: list,
max_period_concurrency: int,
max_shift_concurrency: int,
max_search_time: float = 120.0,
num_search_workers=2,
*args, **kwargs):
"""
Minimizes the total absolute difference between required resources per
period and resources scheduled by the solver.
Parameters
----------
num_days: int,
Number of days to schedule.
periods: int,
Number of working periods in a day.
shifts_coverage: dict,
Dictionary of the form ``{"shift_name": shift_array}``, where each
``shift_array`` has length ``periods`` and uses 1 when the shift
covers a period, otherwise 0.
required_resources: list,
Array of size ``[days, periods]``.
max_period_concurrency: int,
Maximum resources allowed in any period and day.
max_shift_concurrency: int,
Maximum resources allowed in the same shift.
max_search_time: float, default = 240
Maximum time, in seconds, to search for a solution.
num_search_workers: int, default = 2
Number of workers used to search for a solution.
"""
super().__init__(num_days,
periods,
shifts_coverage,
required_resources,
max_period_concurrency,
max_shift_concurrency,
max_search_time,
num_search_workers)
[docs]
def solve(self):
"""
Runs the optimization solver
Returns
-------
solution: dict,
Dictionary with optimization status, scheduled resources by day and
shift, and final objective value.
"""
sch_model = cp_model.CpModel()
# Resources: Number of resources assigned in day d to shift s
resources = np.empty(shape=(self.num_days, self.num_shifts), dtype='object')
# transition resources: Variable to change domain coordinates from min |x-a|
# to min t, s.t t>= x-a and t>= a-x
transition_resources = np.empty(shape=(self.num_days, self.num_periods), dtype='object')
# Resources
for d in range(self.num_days):
for s in range(self.num_shifts):
resources[d][s] = sch_model.NewIntVar(0, self.max_shift_concurrency, f'resources_d{d}s{s}')
for d in range(self.num_days):
for p in range(self.num_periods):
transition_resources[d][p] = sch_model.NewIntVar(-self.max_period_concurrency,
self.max_period_concurrency,
f'transition_resources_d{d}p{p}')
# Constrains
# transition must be between x-a and a-x
for d in range(self.num_days):
for p in range(self.num_periods):
sch_model.Add(transition_resources[d][p] >= (
sum(resources[d][s] * self.shifts_coverage_matrix[s][p] for s in range(self.num_shifts)) -
self.required_resources[d][p]))
sch_model.Add(transition_resources[d][p] >= (self.required_resources[d][p]
- sum(resources[d][s] * self.shifts_coverage_matrix[s][p]
for s in range(self.num_shifts))))
# Total programmed resources must be less than or equal to max_period_concurrency for each day and period
for d in range(self.num_days):
for p in range(self.num_periods):
sch_model.Add(
sum(resources[d][s] * self.shifts_coverage_matrix[s][p] for s in range(self.num_shifts)) <=
self.max_period_concurrency)
# Objective Function: Minimize the absolute value of the difference between required and shifted resources
sch_model.Minimize(
sum(transition_resources[d][p] for d in range(self.num_days) for p in range(self.num_periods)))
self.solver.parameters.max_time_in_seconds = self.max_search_time
self.solver.num_search_workers = self.num_search_workers
self.status = self.solver.Solve(sch_model)
if self.status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
resources_shifts = []
for d in range(self.num_days):
for s in range(self.num_shifts):
resources_shifts.append({
"day": d,
"shift": self.shifts[s],
"resources": self.solver.Value(resources[d][s])})
solution = {"status": self.solver.StatusName(self.status),
"cost": self.solver.ObjectiveValue(),
"resources_shifts": resources_shifts}
else:
solution = {"status": self.solver.StatusName(self.status),
"cost": -1,
"resources_shifts": [{'day': -1, 'shift': 'Unknown', 'resources': -1}]}
return solution
[docs]
class MinRequiredResources(BaseShiftScheduler):
def __init__(self, num_days: int,
periods: int,
shifts_coverage: dict,
required_resources: list,
max_period_concurrency: int,
max_shift_concurrency: int,
cost_dict: dict = None,
max_search_time: float = 240.0,
num_search_workers: int = 2,
*args, **kwargs):
"""
Minimizes the weighted number of scheduled resources while ensuring that
every period has at least the required number of resources.
Parameters
----------
num_days: int,
Number of days to schedule.
periods: int,
Number of working periods in a day.
shifts_coverage: dict,
Dictionary of the form ``{"shift_name": shift_array}``, where each
``shift_array`` has length ``periods`` and uses 1 when the shift
covers a period, otherwise 0.
required_resources: list,
Array of size ``[days, periods]``.
max_period_concurrency: int,
Maximum resources allowed in any period and day.
max_shift_concurrency: int,
Maximum resources allowed in the same shift.
cost_dict: dict, default = None
Dictionary of the form ``{shift: cost_value}``. It must contain the
same shifts as ``shifts_coverage``.
max_search_time: float, default = 240
Maximum time, in seconds, to search for a solution.
num_search_workers: int, default = 2
Number of workers used to search for a solution.
"""
super().__init__(num_days,
periods,
shifts_coverage,
required_resources,
max_period_concurrency,
max_shift_concurrency,
max_search_time,
num_search_workers)
if cost_dict is None:
self.cost_dict = dict.fromkeys(self.shifts, 1)
else:
self.cost_dict = cost_dict
if set(sorted(self.shifts)) == set(sorted(list(self.cost_dict.keys()))):
self.df_cost_matrix = pd.DataFrame.from_records([self.cost_dict])
else:
raise KeyError('cost_dict must have the same keys as shifts_coverage')
[docs]
def solve(self):
"""
Runs the optimization solver
Returns
-------
solution: dict,
Dictionary with the status on the optimization, the resources to schedule per day and the
final value of the cost function
"""
sch_model = cp_model.CpModel()
# Resources: Number of resources assigned in day d to shift s
resources = np.empty(shape=(self.num_days, self.num_shifts), dtype='object')
# Resources
for d in range(self.num_days):
for s in range(self.num_shifts):
resources[d][s] = sch_model.NewIntVar(0, self.max_shift_concurrency, f'resources_d{d}s{s}')
# Constrains
# Total programmed resources in day d and period p, must be greater or equals that required resources in d, p
for d in range(self.num_days):
for p in range(self.num_periods):
sch_model.Add(sum(resources[d][s] * self.shifts_coverage_matrix[s][p]
for s in range(self.num_shifts)) >= self.required_resources[d][p])
# Total programmed resources must be less than or equal to max_period_concurrency for each day and period
for d in range(self.num_days):
for p in range(self.num_periods):
sch_model.Add(
sum(resources[d][s] * self.shifts_coverage_matrix[s][p]
for s in range(self.num_shifts)) <= self.max_period_concurrency)
# Objective Function: Minimize the total shifted resources
sch_model.Minimize(sum(resources[d][s] * self.df_cost_matrix[self.shifts[s]].item()
for d in range(self.num_days)
for s in range(self.num_shifts)))
self.solver.parameters.max_time_in_seconds = self.max_search_time
self.solver.num_search_workers = self.num_search_workers
self.status = self.solver.Solve(sch_model)
if self.status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
resources_shifts = []
for d in range(self.num_days):
for s in range(self.num_shifts):
resources_shifts.append({
"day": d,
"shift": self.shifts[s],
"resources": self.solver.Value(resources[d][s])})
solution = {"status": self.solver.StatusName(self.status),
"cost": self.solver.ObjectiveValue(),
"resources_shifts": resources_shifts}
else:
solution = {"status": self.solver.StatusName(self.status),
"cost": -1,
"resources_shifts": [{'day': -1, 'shift': 'Unknown', 'resources': -1}]}
return solution