from math import exp, ceil, floor
from pyworkforce.utils import ParameterGrid
from joblib import Parallel, delayed
# [docs] -- presumably a leftover documentation-link marker from page extraction; not executable code
class ErlangC:
    """
    Staffing calculator for an Erlang C queue system.

    Given the arrival volume and handling time of an interval, it derives the
    traffic intensity and answers questions such as "how likely is a
    transaction to wait?" or "how many positions are needed for a target
    service level?". Implementation inspired by:
    https://lucidmanager.org/data-science/call-centre-workforce-planning-erlang-c-in-r/

    Parameters
    ----------
    transactions: float,
        Total number of transactions arriving in the interval.
    aht: float,
        Average handling time of a transaction (minutes).
    asa: float,
        The required average speed of answer (minutes).
    interval: int,
        Interval length, in minutes.
    shrinkage: float,
        Fraction of time that an operator unit is not available.
        Must lie in the half-open interval [0, 1).

    Raises
    ------
    ValueError
        If ``transactions``, ``aht``, ``asa`` or ``interval`` is not strictly
        positive, or if ``shrinkage`` is outside [0, 1).
    """

    def __init__(self, transactions: float, aht: float, asa: float,
                 interval: int, shrinkage=0.0,
                 **kwargs):
        # Extra keyword arguments are accepted and ignored.
        # The four magnitudes share one rule (strictly positive) and are
        # checked in signature order, so the first offending one is reported.
        strictly_positive = (
            ("transactions", transactions),
            ("aht", aht),
            ("asa", asa),
            ("interval", interval),
        )
        for label, value in strictly_positive:
            if value <= 0:
                raise ValueError("{} can't be smaller or equals than 0".format(label))

        if shrinkage >= 1 or shrinkage < 0:
            raise ValueError("shrinkage must be between in the interval [0,1)")

        self.n_transactions = transactions
        self.aht = aht
        self.interval = interval
        self.asa = asa
        self.shrinkage = shrinkage
        # Traffic intensity: arrival rate per minute times handling time.
        self.intensity = (self.n_transactions / self.interval) * self.aht

    def _productive_positions(self, positions: int, scale_positions: bool = False):
        """
        Resolve and validate the number of positions actually handling work.

        When ``scale_positions`` is true, ``positions`` is reduced by the
        shrinkage fraction and rounded down; otherwise it is used as given.

        Raises
        ------
        ValueError
            If the resulting count is not positive, or does not exceed the
            traffic intensity.
        """
        effective = floor((1 - self.shrinkage) * positions) if scale_positions else positions

        if effective <= 0:
            raise ValueError("productive positions must be greater than 0")
        if effective <= self.intensity:
            raise ValueError("positions must be greater than traffic intensity")

        return effective

    def waiting_probability(self, positions: int, scale_positions: bool = False):
        """
        Returns the probability that a transaction waits in queue.

        Parameters
        ----------
        positions: int,
            The number of positions available to handle transactions.
            Productive positions must be greater than traffic intensity.
        scale_positions: bool, default=False
            Set to True when ``positions`` includes shrinkage.

        Raises
        ------
        ValueError
            If the productive positions are not positive or do not exceed the
            traffic intensity.
        """
        servers = self._productive_positions(positions, scale_positions)

        # Build the inverse of the Erlang B value one server at a time.
        inverse_b = 1
        for server in range(1, servers + 1):
            inverse_b = 1 + (inverse_b * server / self.intensity)
        blocking = 1 / inverse_b

        # Convert the Erlang B value into the Erlang C waiting probability.
        numerator = servers * blocking
        denominator = servers - self.intensity * (1 - blocking)
        return numerator / denominator

    def service_level(self, positions: int, scale_positions: bool = False):
        """
        Returns the expected service level for a number of positions.

        Parameters
        ----------
        positions: int,
            The number of positions available to handle transactions.
            Productive positions must be greater than traffic intensity.
        scale_positions: bool, default = False
            Set to True when ``positions`` includes shrinkage.

        Raises
        ------
        ValueError
            If the productive positions are not positive or do not exceed the
            traffic intensity.
        """
        servers = self._productive_positions(positions, scale_positions)
        # ``servers`` is already net of shrinkage, so it must not be scaled again.
        wait_chance = self.waiting_probability(servers, scale_positions=False)

        spare_capacity = servers - self.intensity
        answer_ratio = self.asa / self.aht
        late_share = wait_chance * exp(-spare_capacity * answer_ratio)

        # Never report a negative service level.
        return max(0, 1 - late_share)

    def achieved_occupancy(self, positions: int, scale_positions: bool = False):
        """
        Returns the expected occupancy of positions.

        Parameters
        ----------
        positions: int,
            The number of raw positions.
            Productive positions must be greater than traffic intensity.
        scale_positions: bool, default=False
            Set to True when ``positions`` includes shrinkage.

        Raises
        ------
        ValueError
            If the productive positions are not positive or do not exceed the
            traffic intensity.
        """
        servers = self._productive_positions(positions, scale_positions)
        return self.intensity / servers

    def required_positions(self, service_level: float, max_occupancy: float = 1.0):
        """
        Computes the required positions for a target service level.

        Starting just above the traffic intensity, positions are added one at
        a time until the target service level is met. If the resulting
        occupancy exceeds ``max_occupancy``, the count is raised to the
        smallest one that respects the cap and the metrics are recomputed.

        Parameters
        ----------
        service_level: float,
            Target service level, between 0 and 1.
        max_occupancy: float,
            The maximum fraction of time that a transaction can occupy a position.
            Must be greater than 0 and less than or equal to 1.

        Returns
        -------
        dict with the keys:

        raw_positions: int,
            Required positions before applying shrinkage.
        positions: int,
            Positions needed after applying shrinkage.
        service_level: float,
            Fraction of transactions expected to reach a position before the target ASA.
        occupancy: float,
            Expected occupancy of positions.
        waiting_probability: float,
            Probability that a transaction waits in queue.

        Raises
        ------
        ValueError
            If ``service_level`` or ``max_occupancy`` is out of range.
        """
        if service_level > 1 or service_level < 0:
            raise ValueError("service_level must be between 0 and 1")
        if max_occupancy > 1 or max_occupancy < 0:
            raise ValueError("max_occupancy must be between 0 and 1")
        if max_occupancy == 0:
            raise ValueError("max_occupancy must be greater than 0")

        # First candidate: the traffic intensity plus one, rounded.
        candidate = round(self.intensity + 1)
        reached_level = self.service_level(candidate, scale_positions=False)
        while reached_level < service_level:
            candidate += 1
            reached_level = self.service_level(candidate, scale_positions=False)

        occupancy = self.achieved_occupancy(candidate, scale_positions=False)
        raw_positions = ceil(candidate)

        if occupancy > max_occupancy:
            # The service-level answer keeps positions too busy: staff for the
            # occupancy cap instead and refresh the dependent metrics.
            raw_positions = ceil(self.intensity / max_occupancy)
            occupancy = self.achieved_occupancy(raw_positions)
            reached_level = self.service_level(raw_positions)

        wait_chance = self.waiting_probability(positions=raw_positions)
        # Inflate the productive head-count to compensate for shrinkage.
        scheduled_positions = ceil(raw_positions / (1 - self.shrinkage))

        return {"raw_positions": raw_positions,
                "positions": scheduled_positions,
                "service_level": reached_level,
                "occupancy": occupancy,
                "waiting_probability": wait_chance}
# [docs] -- presumably a leftover documentation-link marker from page extraction; not executable code
class MultiErlangC:
    """
    Runs Erlang C calculations over multiple parameter combinations.

    This class uses joblib's ``Parallel`` to evaluate every combination from
    ``param_grid`` and the method-specific argument grid. Its interface is
    inspired by scikit-learn's grid search utilities.

    Parameters
    ----------
    param_grid: dict,
        Dictionary with :class:`ErlangC` initialization parameters. Each key must be an
        expected parameter, and each value must be a list of options to iterate over.
        example: {"transactions": [100, 200], "aht": [3], "interval": [30], "asa": [20 / 60], "shrinkage": [0.3]}
    n_jobs: int, default=2
        Maximum number of concurrently running jobs.
        If -1 all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging.
        For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs but one are used.
        None is a marker for 'unset' that will be interpreted as n_jobs=1 (sequential execution)
        unless the call is performed under a parallel_backend() context manager that sets another value for n_jobs.
    pre_dispatch: {"all", int, or expression}, default='2 * n_jobs'
        Number of task batches to pre-dispatch. Default is ``2*n_jobs``.
        See joblib's documentation for more details: https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html

    Attributes
    ----------
    waiting_probability_params: list[tuple],
        Parameters used for each ``waiting_probability`` result, in result order.
    service_level_params: list[tuple],
        Parameters used for each ``service_level`` result, in result order.
    achieved_occupancy_params: list[tuple],
        Parameters used for each ``achieved_occupancy`` result, in result order.
    required_positions_params: list[tuple],
        Parameters used for each ``required_positions`` result, in result order.

    Each ``*_params`` attribute is ``None`` until the matching method has been
    called; every entry is an ``(erlang_params, method_arguments)`` pair.
    """

    def __init__(self, param_grid: dict, n_jobs: int = 2, pre_dispatch: str = '2 * n_jobs'):
        self.param_grid = param_grid
        self.n_jobs = n_jobs
        self.pre_dispatch = pre_dispatch
        # Expanded once here; every method call reuses the same list.
        self.param_list = list(ParameterGrid(self.param_grid))
        self.waiting_probability_params = None
        self.service_level_params = None
        self.achieved_occupancy_params = None
        self.required_positions_params = None

    def _combinations(self, arguments_grid):
        """
        Pair every ErlangC parameter set with every method-argument set.

        The outer loop runs over ``param_list`` and the inner loop over the
        expanded ``arguments_grid``, which is also the order of the results.
        """
        arguments_list = list(ParameterGrid(arguments_grid))
        return [(erlang_params, method_arguments)
                for erlang_params in self.param_list
                for method_arguments in arguments_list]

    def _evaluate(self, method_name, combinations):
        """
        Call ``ErlangC(**params).<method_name>(**arguments)`` for each pair in
        ``combinations`` through joblib and return the list of results.

        Raises
        ------
        ValueError
            If no result was produced, or the number of results does not
            match the number of combinations.
        """
        results = Parallel(n_jobs=self.n_jobs,
                           pre_dispatch=self.pre_dispatch)(
            delayed(getattr(ErlangC(**erlang_params), method_name))(**method_arguments)
            for erlang_params, method_arguments in combinations)
        self._check_solutions(results, len(combinations))
        return results

    def waiting_probability(self, arguments_grid):
        """
        Returns the probability of waiting in the queue.

        Returns a list with the solution to all the possible combinations from
        the arguments_grid and the ErlangC param_grid. The matching parameters
        are stored in ``waiting_probability_params``.

        Parameters
        ----------
        arguments_grid: dict,
            Dictionary with the ``ErlangC.waiting_probability`` parameters,
            each key of the dictionary must be the expected parameter and
            the value must be a list with the different options to iterate
            example: {"positions": [10, 20, 30], "scale_positions": [True, False]}
        """
        self.waiting_probability_params = self._combinations(arguments_grid)
        return self._evaluate("waiting_probability", self.waiting_probability_params)

    def service_level(self, arguments_grid):
        """
        Returns the expected service level given a number of positions.

        Returns a list with the solution to all the possible combinations from
        the arguments_grid and the ErlangC param_grid. The matching parameters
        are stored in ``service_level_params``.

        Parameters
        ----------
        arguments_grid: dict,
            Dictionary with the ``ErlangC.service_level`` parameters,
            each key of the dictionary must be the expected parameter and
            the value must be a list with the different options to iterate
            example: {"positions": [10, 20, 30], "scale_positions": [True, False]}
        """
        self.service_level_params = self._combinations(arguments_grid)
        return self._evaluate("service_level", self.service_level_params)

    def achieved_occupancy(self, arguments_grid):
        """
        Returns the expected occupancy of positions.

        Returns a list with the solution to all the possible combinations from
        the arguments_grid and the ErlangC param_grid. The matching parameters
        are stored in ``achieved_occupancy_params``.

        Parameters
        ----------
        arguments_grid: dict,
            Dictionary with the ``ErlangC.achieved_occupancy`` parameters,
            each key of the dictionary must be the expected parameter and
            the value must be a list with the different options to iterate
            example: {"positions": [10, 20, 30], "scale_positions": [True, False]}
        """
        self.achieved_occupancy_params = self._combinations(arguments_grid)
        return self._evaluate("achieved_occupancy", self.achieved_occupancy_params)

    def required_positions(self, arguments_grid):
        """
        Computes the required positions for every parameter combination.

        Returns a list with the solution to all the possible combinations from
        the arguments_grid and the ErlangC param_grid. The matching parameters
        are stored in ``required_positions_params``.

        Parameters
        ----------
        arguments_grid: dict,
            Dictionary with the ``ErlangC.required_positions`` parameters,
            each key of the dictionary must be the expected parameter and
            the value must be a list with the different options to iterate
            example: {"service_level": [0.85, 0.9], "max_occupancy": [0.8, 0.95]}
        """
        self.required_positions_params = self._combinations(arguments_grid)
        return self._evaluate("required_positions", self.required_positions_params)

    def _check_solutions(self, solutions, combinations):
        """
        Checks the integrity of the solution in terms of dimensions.

        Raises
        ------
        ValueError
            If ``solutions`` is empty, or its length differs from the
            expected number of ``combinations``.
        """
        if len(solutions) < 1:  # noqa
            raise ValueError("Could not find any solution, make sure the param_grid is defined correctly")
        if len(solutions) != combinations:
            raise ValueError('Inconsistent results. Expected {} '
                             'solutions, got {}'
                             .format(combinations,
                                     len(solutions)))  # noqa