from math import exp, ceil, floor
from pyworkforce.utils import ParameterGrid
from joblib import Parallel, delayed
[docs]class ErlangC:
"""
Computes the number of positions required to attend a number of transactions in a
queue system based on erlangc.rst. Implementation inspired on:
https://lucidmanager.org/data-science/call-centre-workforce-planning-erlang-c-in-r/
Parameters
----------
transactions: float,
The number of total transactions that comes in an interval.
aht: float,
Average handling time of a transaction (minutes).
asa: float,
The required average speed of answer (minutes).
interval: int,
Interval length (minutes) where the transactions come in
shrinkage: float,
Percentage of time that an operator unit is not available.
"""
def __init__(self, transactions: float, aht: float, asa: float,
interval: int, shrinkage=0.0,
**kwargs):
if transactions <= 0:
raise ValueError("transactions can't be smaller or equals than 0")
if aht <= 0:
raise ValueError("aht can't be smaller or equals than 0")
if asa <= 0:
raise ValueError("asa can't be smaller or equals than 0")
if interval <= 0:
raise ValueError("interval can't be smaller or equals than 0")
if shrinkage < 0 or shrinkage >= 1:
raise ValueError("shrinkage must be between in the interval [0,1)")
self.n_transactions = transactions
self.aht = aht
self.interval = interval
self.asa = asa
self.intensity = (self.n_transactions / self.interval) * self.aht
self.shrinkage = shrinkage
[docs] def waiting_probability(self, positions: int, scale_positions: bool = False):
"""
Returns the probability of waiting in the queue
Parameters
----------
positions: int,
The number of positions to attend the transactions.
scale_positions: bool, default=False
Set it to True if the positions were calculated using shrinkage.
"""
if scale_positions:
productive_positions = floor((1 - self.shrinkage) * positions)
else:
productive_positions = positions
erlang_b_inverse = 1
for position in range(1, productive_positions + 1):
erlang_b_inverse = 1 + (erlang_b_inverse * position / self.intensity)
erlang_b = 1 / erlang_b_inverse
return productive_positions * erlang_b / (productive_positions - self.intensity * (1 - erlang_b))
[docs] def service_level(self, positions: int, scale_positions: bool = False):
"""
Returns the expected service level given a number of positions
Parameters
----------
positions: int,
The number of positions attending.
scale_positions: bool, default = False
Set it to True if the positions were calculated using shrinkage.
"""
if scale_positions:
productive_positions = floor((1 - self.shrinkage) * positions)
else:
productive_positions = positions
probability_wait = self.waiting_probability(productive_positions, scale_positions=False)
exponential = exp(-(productive_positions - self.intensity) * (self.asa / self.aht))
return max(0, 1 - (probability_wait * exponential))
[docs] def achieved_occupancy(self, positions: int, scale_positions: bool = False):
"""
Returns the expected occupancy of positions
Parameters
----------
positions: int,
The number of raw positions
scale_positions: bool, default=False
Set it to True if the positions were calculated using shrinkage.
"""
if scale_positions:
productive_positions = floor((1 - self.shrinkage) * positions)
else:
productive_positions = positions
return self.intensity / productive_positions
[docs] def required_positions(self, service_level: float, max_occupancy: float = 1.0):
"""
Computes the requirements using erlangc.rst
Parameters
----------
service_level: float,
Target service level
max_occupancy: float,
The maximum fraction of time that a transaction can occupy a position
Returns
-------
raw_positions: int,
The required positions assuming shrinkage = 0
positions: int,
The number of positions needed to ensure the required service level
service_level: float,
The fraction of transactions that are expected to be assigned to a position,
before the asa time
occupancy: float,
The expected occupancy of positions
waiting_probability: float,
The probability of a transaction waiting in the queue
"""
if service_level < 0 or service_level > 1:
raise ValueError("service_level must be between 0 and 1")
if max_occupancy < 0 or max_occupancy > 1:
raise ValueError("max_occupancy must be between 0 and 1")
positions = round(self.intensity + 1)
achieved_service_level = self.service_level(positions, scale_positions=False)
while achieved_service_level < service_level:
positions += 1
achieved_service_level = self.service_level(positions, scale_positions=False)
achieved_occupancy = self.achieved_occupancy(positions, scale_positions=False)
raw_positions = ceil(positions)
if achieved_occupancy > max_occupancy:
raw_positions = ceil(self.intensity / max_occupancy)
achieved_occupancy = self.achieved_occupancy(raw_positions)
achieved_service_level = self.service_level(raw_positions)
waiting_probability = self.waiting_probability(positions=raw_positions)
positions = ceil(raw_positions / (1 - self.shrinkage))
return {"raw_positions": raw_positions,
"positions": positions,
"service_level": achieved_service_level,
"occupancy": achieved_occupancy,
"waiting_probability": waiting_probability}
[docs]class MultiErlangC:
"""
This class uses the erlangc.rst class using joblib's Parallel,
allowing to run multiple scenarios at once.
It finds solutions iterating over all possible combinations provided by the users,
inspired how Sklearn's Grid Search works
Parameters
----------
param_grid: dict,
Dictionary with the erlangc.rst.__init__ parameters, each key of the dictionary must be the
expected parameter and the value must be a list with the different options to iterate
example: {"transactions": [100, 200], "aht": [3], "interval": [30], "asa": [20 / 60], "shrinkage": [0.3]}
n_jobs: int, default=2
The maximum number of concurrently running jobs.
If -1 all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging.
For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs but one are used.
None is a marker for ‘unset’ that will be interpreted as n_jobs=1 (sequential execution)
unless the call is performed under a parallel_backend() context manager that sets another value for n_jobs.
pre_dispatch: {"all", int, or expression}, default='2 * n_jobs'
The number of batches (of tasks) to be pre-dispatched. Default is ‘2*n_jobs’.
See joblib's documentation for more details: https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html
Attributes
----------
waiting_probability_params: list[tuple],
Each tuple of the list represents the used parameters in param_grid for ErlangC and
arguments_grid for waiting_probability method,corresponding to the same order returned
by the MultiErlangC.waiting_probability method.
service_level_params: list[tuple],
Each tuple of the list represents the used parameters in param_grid for ErlangC and
arguments_grid for service_level method,corresponding to the same order returned
by the MultiErlangC.service_level method.
achieved_occupancy_params: list[tuple],
Each tuple of the list represents the used parameters in param_grid for ErlangC and
arguments_grid for achieved_occupancy method,corresponding to the same order returned
by the MultiErlangC.achieved_occupancy method.
required_positions_params: list[tuple],
Each tuple of the list represents the used parameters in param_grid for ErlangC and
arguments_grid for required_positions method,corresponding to the same order returned
by the MultiErlangC.required_positions method.
"""
def __init__(self, param_grid: dict, n_jobs: int = 2, pre_dispatch: str = '2 * n_jobs'):
self.param_grid = param_grid
self.n_jobs = n_jobs
self.pre_dispatch = pre_dispatch
self.param_list = list(ParameterGrid(self.param_grid))
self.waiting_probability_params = None
self.service_level_params = None
self.achieved_occupancy_params = None
self.required_positions_params = None
[docs] def waiting_probability(self, arguments_grid):
"""
Returns the probability of waiting in the queue
Returns a list with the solution to all the possible combinations from the arguments_grid
and the erlangc.rst param_grid
Parameters
----------
arguments_grid: dict,
Dictionary with the erlangc.rst.waiting_probability parameters,
each key of the dictionary must be the expected parameter and
the value must be a list with the different options to iterate
example: {"positions": [10, 20, 30], "scale_positions": [True, False]}
"""
arguments_list = list(ParameterGrid(arguments_grid))
self.waiting_probability_params = [(erlang_params, wait_params)
for erlang_params in self.param_list
for wait_params in arguments_list]
combinations = len(self.param_list) * len(arguments_list)
results = Parallel(n_jobs=self.n_jobs,
pre_dispatch=self.pre_dispatch)(delayed(ErlangC(**params).waiting_probability)(**arguments)
for params in self.param_list
for arguments in arguments_list)
self._check_solutions(results, combinations)
return results
[docs] def service_level(self, arguments_grid):
"""
Returns the expected service level given a number of positions
Returns a list with the solution to all the possible combinations from the arguments_grid
and the erlangc.rst param_grid
Parameters
----------
arguments_grid: dict,
Dictionary with the erlangc.rst.service_level parameters,
each key of the dictionary must be the expected parameter and
the value must be a list with the different options to iterate
example: {"positions": [10, 20, 30], "scale_positions": [True, False]}
"""
arguments_list = list(ParameterGrid(arguments_grid))
self.service_level_params = [(erlang_params, sl_params)
for erlang_params in self.param_list
for sl_params in arguments_list]
combinations = len(self.param_list) * len(arguments_list)
results = Parallel(n_jobs=self.n_jobs,
pre_dispatch=self.pre_dispatch)(delayed(ErlangC(**params).service_level)(**arguments)
for params in self.param_list
for arguments in arguments_list)
self._check_solutions(results, combinations)
return results
[docs] def achieved_occupancy(self, arguments_grid):
"""
Returns the expected occupancy of positions
Returns a list with the solution to all the possible combinations from the arguments_grid
and the erlangc.rst param_grid
Parameters
----------
arguments_grid: dict,
Dictionary with the erlangc.rst.achieved_occupancy parameters,
each key of the dictionary must be the expected parameter and
the value must be a list with the different options to iterate
example: {"positions": [10, 20, 30], "scale_positions": [True, False]}
"""
arguments_list = list(ParameterGrid(arguments_grid))
combinations = len(self.param_list) * len(arguments_list)
results = Parallel(n_jobs=self.n_jobs,
pre_dispatch=self.pre_dispatch)(delayed(ErlangC(**params).achieved_occupancy)(**arguments)
for params in self.param_list
for arguments in arguments_list)
self._check_solutions(results, combinations)
return results
[docs] def required_positions(self, arguments_grid):
"""
Computes the requirements using MultiErlangC
Returns a list with the solution to all the possible combinations from the arguments_grid and the erlangc.rst param_grid
Parameters
----------
arguments_grid: dict,
Dictionary with the erlangc.rst.achieved_occupancy parameters,
each key of the dictionary must be the expected parameter and
the value must be a list with the different options to iterate
example: {"service_level": [0.85, 0.9], "max_occupancy": [0.8, 0.95]}
"""
arguments_list = list(ParameterGrid(arguments_grid))
combinations = len(self.param_list) * len(arguments_list)
results = Parallel(n_jobs=self.n_jobs,
pre_dispatch=self.pre_dispatch)(delayed(ErlangC(**params).required_positions)(**arguments)
for params in self.param_list
for arguments in arguments_list)
self._check_solutions(results, combinations)
return results
def _check_solutions(self, solutions, combinations):
"""
Checks the integrity of the solution in terms of dimensions
"""
if len(solutions) < 1: # noqa
raise ValueError("Could not find any solution, make sure the param_grid is defined correctly")
if len(solutions) != combinations:
raise ValueError('Inconsistent results. Expected {} '
'solutions, got {}'
.format(len(self.param_list),
len(solutions))) # noqa