Source code for pyworkforce.queuing.erlang

from math import exp, ceil, floor
from pyworkforce.utils import ParameterGrid
from joblib import Parallel, delayed


class ErlangC:
    """
    Computes the number of positions required to handle transactions in an
    Erlang C queue system.

    Implementation inspired by:
    https://lucidmanager.org/data-science/call-centre-workforce-planning-erlang-c-in-r/

    Parameters
    ----------
    transactions: float,
        Total number of transactions arriving in the interval.
    aht: float,
        Average handling time of a transaction (minutes).
    asa: float,
        The required average speed of answer (minutes).
    interval: int,
        Interval length, in minutes.
    shrinkage: float,
        Fraction of time that an operator unit is not available.
        Must lie in the interval [0, 1).
    **kwargs:
        Extra keyword arguments are accepted and ignored.

    Attributes
    ----------
    intensity: float,
        Traffic intensity, computed as ``(transactions / interval) * aht``.

    Raises
    ------
    ValueError
        If ``transactions``, ``aht``, ``asa`` or ``interval`` is not strictly
        positive, or if ``shrinkage`` is outside [0, 1).
    """

    # Decimal digits kept when positions are scaled by shrinkage. Binary
    # floating point turns exact results into values such as
    # 30.000000000000004 or 1.9999999999999996; rounding before ceil/floor
    # keeps that noise from shifting the result by a whole position.
    _SCALING_DIGITS = 9

    def __init__(self, transactions: float, aht: float, asa: float,
                 interval: int, shrinkage=0.0, **kwargs):
        if transactions <= 0:
            raise ValueError("transactions can't be smaller or equals than 0")
        if aht <= 0:
            raise ValueError("aht can't be smaller or equals than 0")
        if asa <= 0:
            raise ValueError("asa can't be smaller or equals than 0")
        if interval <= 0:
            raise ValueError("interval can't be smaller or equals than 0")
        if shrinkage < 0 or shrinkage >= 1:
            raise ValueError("shrinkage must be between in the interval [0,1)")

        self.n_transactions = transactions
        self.aht = aht
        self.interval = interval
        self.asa = asa
        self.intensity = (self.n_transactions / self.interval) * self.aht
        self.shrinkage = shrinkage

    def _productive_positions(self, positions: int, scale_positions: bool = False):
        """
        Returns the validated number of positions that actually handle work.

        Parameters
        ----------
        positions: int,
            The number of positions supplied by the caller.
        scale_positions: bool, default=False
            When True, ``positions`` is reduced by the shrinkage factor
            (rounded down to a whole position) before validation.

        Raises
        ------
        ValueError
            If the productive positions are not greater than 0, or are not
            greater than the traffic intensity.
        """
        if scale_positions:
            scaled = (1 - self.shrinkage) * positions
            # Round away float noise first, otherwise e.g. (1 - 0.8) * 10
            # evaluates to 1.9999999999999996 and would floor to 1.
            productive_positions = floor(round(scaled, self._SCALING_DIGITS))
        else:
            productive_positions = positions

        if productive_positions <= 0:
            raise ValueError("productive positions must be greater than 0")
        if productive_positions <= self.intensity:
            raise ValueError("positions must be greater than traffic intensity")
        return productive_positions

    def waiting_probability(self, positions: int, scale_positions: bool = False):
        """
        Returns the probability that a transaction waits in queue.

        Parameters
        ----------
        positions: int,
            The number of positions available to handle transactions.
            Productive positions must be greater than traffic intensity.
        scale_positions: bool, default=False
            Set to True when ``positions`` includes shrinkage.

        Raises
        ------
        ValueError
            If the productive positions are not greater than 0 or not
            greater than the traffic intensity.
        """
        productive_positions = self._productive_positions(positions, scale_positions)

        # Build the inverse of the Erlang B value one position at a time.
        erlang_b_inverse = 1
        for position in range(1, productive_positions + 1):
            erlang_b_inverse = 1 + (erlang_b_inverse * position / self.intensity)

        erlang_b = 1 / erlang_b_inverse
        return productive_positions * erlang_b / (
            productive_positions - self.intensity * (1 - erlang_b)
        )

    def service_level(self, positions: int, scale_positions: bool = False):
        """
        Returns the expected service level for a number of positions.

        Parameters
        ----------
        positions: int,
            The number of positions available to handle transactions.
            Productive positions must be greater than traffic intensity.
        scale_positions: bool, default=False
            Set to True when ``positions`` includes shrinkage.

        Raises
        ------
        ValueError
            If the productive positions are not greater than 0 or not
            greater than the traffic intensity.
        """
        productive_positions = self._productive_positions(positions, scale_positions)
        # Shrinkage (if any) has already been applied above, so the waiting
        # probability is requested for the productive positions as-is.
        probability_wait = self.waiting_probability(productive_positions, scale_positions=False)
        exponential = exp(-(productive_positions - self.intensity) * (self.asa / self.aht))
        return max(0, 1 - (probability_wait * exponential))

    def achieved_occupancy(self, positions: int, scale_positions: bool = False):
        """
        Returns the expected occupancy of positions.

        Parameters
        ----------
        positions: int,
            The number of raw positions.
            Productive positions must be greater than traffic intensity.
        scale_positions: bool, default=False
            Set to True when ``positions`` includes shrinkage.

        Raises
        ------
        ValueError
            If the productive positions are not greater than 0 or not
            greater than the traffic intensity.
        """
        productive_positions = self._productive_positions(positions, scale_positions)
        return self.intensity / productive_positions

    def required_positions(self, service_level: float, max_occupancy: float = 1.0):
        """
        Computes the required positions for a target service level.

        Parameters
        ----------
        service_level: float,
            Target service level, between 0 and 1.
        max_occupancy: float,
            The maximum fraction of time that a transaction can occupy a
            position. Must be greater than 0 and less than or equal to 1.

        Returns
        -------
        dict with the keys:

        raw_positions: int,
            Required positions before applying shrinkage.
        positions: int,
            Positions needed after applying shrinkage.
        service_level: float,
            Fraction of transactions expected to reach a position before the
            target ASA.
        occupancy: float,
            Expected occupancy of positions.
        waiting_probability: float,
            Probability that a transaction waits in queue.

        Raises
        ------
        ValueError
            If ``service_level`` is outside [0, 1] or ``max_occupancy`` is
            outside (0, 1].
        """
        if service_level < 0 or service_level > 1:
            raise ValueError("service_level must be between 0 and 1")
        if max_occupancy < 0 or max_occupancy > 1:
            raise ValueError("max_occupancy must be between 0 and 1")
        if max_occupancy == 0:
            raise ValueError("max_occupancy must be greater than 0")

        # Start from the first whole number of positions above the traffic
        # intensity and add positions until the target is reached.
        raw_positions = round(self.intensity + 1)
        achieved_service_level = self.service_level(raw_positions, scale_positions=False)
        while achieved_service_level < service_level:
            raw_positions += 1
            achieved_service_level = self.service_level(raw_positions, scale_positions=False)

        achieved_occupancy = self.achieved_occupancy(raw_positions, scale_positions=False)

        # The occupancy cap wins over the service level target: add positions
        # until occupancy fits, then report the figures for that count.
        if achieved_occupancy > max_occupancy:
            raw_positions = ceil(self.intensity / max_occupancy)
            achieved_occupancy = self.achieved_occupancy(raw_positions)
            achieved_service_level = self.service_level(raw_positions)

        waiting_probability = self.waiting_probability(positions=raw_positions)

        # Round away float noise before ceil, otherwise e.g. 21 / (1 - 0.3)
        # evaluates to 30.000000000000004 and would demand 31 positions.
        scaled = raw_positions / (1 - self.shrinkage)
        positions = ceil(round(scaled, self._SCALING_DIGITS))

        return {"raw_positions": raw_positions,
                "positions": positions,
                "service_level": achieved_service_level,
                "occupancy": achieved_occupancy,
                "waiting_probability": waiting_probability}
class MultiErlangC:
    """
    Runs Erlang C calculations over multiple parameter combinations.

    This class uses joblib's ``Parallel`` to evaluate every combination from
    ``param_grid`` and the method-specific argument grid. Its interface is
    inspired by scikit-learn's grid search utilities.

    Parameters
    ----------
    param_grid: dict,
        Dictionary with :class:`ErlangC` initialization parameters. Each key
        must be an expected parameter, and each value must be a list of
        options to iterate over.
        example: {"transactions": [100, 200], "aht": [3], "interval": [30],
        "asa": [20 / 60], "shrinkage": [0.3]}
    n_jobs: int, default=2
        Maximum number of concurrently running jobs. If -1 all CPUs are used.
        If 1 is given, no parallel computing code is used at all, which is
        useful for debugging. For n_jobs below -1, (n_cpus + 1 + n_jobs) are
        used. Thus for n_jobs = -2, all CPUs but one are used. None is a
        marker for 'unset' that will be interpreted as n_jobs=1 (sequential
        execution) unless the call is performed under a parallel_backend()
        context manager that sets another value for n_jobs.
    pre_dispatch: {"all", int, or expression}, default='2 * n_jobs'
        Number of task batches to pre-dispatch. Default is ``2*n_jobs``.
        See joblib's documentation for more details:
        https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html

    Attributes
    ----------
    waiting_probability_params: list[tuple],
        Parameters used for each ``waiting_probability`` result, in result
        order. None until ``waiting_probability`` is called.
    service_level_params: list[tuple],
        Parameters used for each ``service_level`` result, in result order.
        None until ``service_level`` is called.
    achieved_occupancy_params: list[tuple],
        Parameters used for each ``achieved_occupancy`` result, in result
        order. None until ``achieved_occupancy`` is called.
    required_positions_params: list[tuple],
        Parameters used for each ``required_positions`` result, in result
        order. None until ``required_positions`` is called.
    """

    def __init__(self, param_grid: dict, n_jobs: int = 2, pre_dispatch: str = '2 * n_jobs'):
        self.param_grid = param_grid
        self.n_jobs = n_jobs
        self.pre_dispatch = pre_dispatch
        self.param_list = list(ParameterGrid(self.param_grid))
        self.waiting_probability_params = None
        self.service_level_params = None
        self.achieved_occupancy_params = None
        self.required_positions_params = None

    def waiting_probability(self, arguments_grid):
        """
        Returns the probability of waiting in the queue.

        Returns a list with the solution to all the possible combinations
        from the arguments_grid and the ErlangC param_grid.

        Parameters
        ----------
        arguments_grid: dict,
            Dictionary with the ``ErlangC.waiting_probability`` parameters,
            each key of the dictionary must be the expected parameter and the
            value must be a list with the different options to iterate
            example: {"positions": [10, 20, 30], "scale_positions": [True, False]}
        """
        arguments_list = list(ParameterGrid(arguments_grid))
        self.waiting_probability_params = self._pair_params(arguments_list)
        return self._run("waiting_probability", arguments_list)

    def service_level(self, arguments_grid):
        """
        Returns the expected service level given a number of positions.

        Returns a list with the solution to all the possible combinations
        from the arguments_grid and the ErlangC param_grid.

        Parameters
        ----------
        arguments_grid: dict,
            Dictionary with the ``ErlangC.service_level`` parameters,
            each key of the dictionary must be the expected parameter and the
            value must be a list with the different options to iterate
            example: {"positions": [10, 20, 30], "scale_positions": [True, False]}
        """
        arguments_list = list(ParameterGrid(arguments_grid))
        self.service_level_params = self._pair_params(arguments_list)
        return self._run("service_level", arguments_list)

    def achieved_occupancy(self, arguments_grid):
        """
        Returns the expected occupancy of positions.

        Returns a list with the solution to all the possible combinations
        from the arguments_grid and the ErlangC param_grid.

        Parameters
        ----------
        arguments_grid: dict,
            Dictionary with the ``ErlangC.achieved_occupancy`` parameters,
            each key of the dictionary must be the expected parameter and the
            value must be a list with the different options to iterate
            example: {"positions": [10, 20, 30], "scale_positions": [True, False]}
        """
        arguments_list = list(ParameterGrid(arguments_grid))
        # Record the parameter pairs like the other methods do, so results
        # can be matched to their inputs.
        self.achieved_occupancy_params = self._pair_params(arguments_list)
        return self._run("achieved_occupancy", arguments_list)

    def required_positions(self, arguments_grid):
        """
        Computes the position requirements for every parameter combination.

        Returns a list with the solution to all the possible combinations
        from the arguments_grid and the ErlangC param_grid.

        Parameters
        ----------
        arguments_grid: dict,
            Dictionary with the ``ErlangC.required_positions`` parameters,
            each key of the dictionary must be the expected parameter and the
            value must be a list with the different options to iterate
            example: {"service_level": [0.85, 0.9], "max_occupancy": [0.8, 0.95]}
        """
        arguments_list = list(ParameterGrid(arguments_grid))
        # Record the parameter pairs like the other methods do, so results
        # can be matched to their inputs.
        self.required_positions_params = self._pair_params(arguments_list)
        return self._run("required_positions", arguments_list)

    def _pair_params(self, arguments_list):
        """
        Pairs every ErlangC parameter set with every method argument set,
        in the same order in which ``_run`` evaluates them.
        """
        return [(erlang_params, method_params)
                for erlang_params in self.param_list
                for method_params in arguments_list]

    def _run(self, method_name, arguments_list):
        """
        Evaluates the ``ErlangC`` method called ``method_name`` for every
        combination of ``self.param_list`` and ``arguments_list``, then
        checks that one result was produced per combination.
        """
        combinations = len(self.param_list) * len(arguments_list)
        results = Parallel(n_jobs=self.n_jobs, pre_dispatch=self.pre_dispatch)(
            delayed(getattr(ErlangC(**params), method_name))(**arguments)
            for params in self.param_list
            for arguments in arguments_list
        )
        self._check_solutions(results, combinations)
        return results

    def _check_solutions(self, solutions, combinations):
        """
        Checks the integrity of the solution in terms of dimensions.

        Raises
        ------
        ValueError
            If ``solutions`` is empty, or its length differs from
            ``combinations``.
        """
        if len(solutions) < 1:
            raise ValueError("Could not find any solution, make sure the param_grid is defined correctly")
        if len(solutions) != combinations:
            raise ValueError('Inconsistent results. Expected {} '
                             'solutions, got {}'
                             .format(combinations, len(solutions)))