- the 0.1.0 version of the timeoutpool package implementing a process pool with timeout is released
- 100% test coverage
- 10.0 PEP8 conformancy (by pylint)
The package implements a distributed processing pool with the possibility to time-out each job independently.
The use-case to be solved by the package: there is an algorithm to be executed (potentially on various datasets) with various parameterizations. The goal is to discover the operation/limitations of the algorithm under various circumstances. Some parameterizations or some special datasets might lead to extremely long runtimes. The user is interested in the outcome of those cases when the algorithm finishes in N seconds.
In order to utilize all available resources (usually more than one CPU cores), some multiprocessing solution is needed, which continouosly monitors the execution time of the jobs, and if the runtime of a job exceeds the timeout limit, shuts it down and starts the next job immediately.
For various reasons, we found that this functionality is not available out-of-the-box in the popular distributed computing toolkits of Python, like multiprocessing, joblib, concurrent.futures. Although these packages have some timeout functionalities, they are not applied at the job/process level, but for an entire set or pool of jobs, timing out the entire batch even if one job times out.
The timeoutpool package implements the aforementioned functionality in terms of threads from the threading package and processes from multiprocessing package. The user can specify a list of jobs to be executed (through an iterator or generator), the number of worker processes (N) and the timeout limit (T). The TimeoutPool object then starts N threads pooling the jobs to N worker processes, each monitored by watchdog threads. If the runtime of a job exceeds T, the process is killed and None is returned or some predefined method is executed.
For a complete documentation see https://timeoutpool.readthedocs.io/en/latest/
In this example the TimeoutJobBase interface is used to execute jobs in parallel, each subject to timeout.
First we import the neccessary classes and the time module.
import time
from timeoutpool import TimeoutPool, TimeoutJobBase
Creating a job which has one argument: the sleep time. The function execute implements the job to be carried out, in this case sleeping until a specific time. The return value of the execute function is the result of the job, in this case a dictionary, however, any return type can be used. The function timeout is called when the function execute is timed out. The role of the function timeout is to return some default result for the timed-out job.
class SleepJob(TimeoutJobBase):
def __init__(self, sleep):
"""
Constructor of the job
Args:
sleep (float): the time to sleep
"""
TimeoutJobBase.__init__(self)
self.sleep = sleep
def execute(self):
"""
The job to be executed.
Returns:
dict: the result of the job
"""
begin = time.time()
time.sleep(self.sleep)
return {'sleep': self.sleep, 'slept': f'{time.time() - begin:.2f}'}
def timeout(self):
"""
The default result returned when the job times out.
Returns:
dict: the default result
"""
return {'sleep': self.sleep, 'slept': None}
Then, 4 jobs are instantiated. Alternatively, one could use a generator as well instead of a list.
jobs = [SleepJob(sleep) for sleep in range(1, 5)]
The TimeoutPool is created to execute jobs, in this case using 2 worker processes, each one subject to timeout with 2.5 seconds.
tpool = TimeoutPool(n_jobs=2, timeout=2.5)
Finally, the jobs are executed in parallel calling the execute method of the pool object.
begin = time.time()
results = tpool.execute(jobs)
print(results)
# [{'sleep': 1, 'slept': '1.00'}, {'sleep': 2, 'slept': '2.00'},
# {'sleep': 3, 'slept': None}, {'sleep': 4, 'slept': None}]
print(f'runtime: {time.time() - begin:.2f}')
# runtime 4.52
What happens here is that there are 2 worker processes, the first two jobs start executing the first two jobs, with sleep parameter 1 and 2, these are finishing successfully as the sleep times are less than the timeout 2.5s. As soon as the first job finishes after about 1s, another process is started to execute the job with the sleep parameter 3s, and similarly, after 2s when the second job finishes successfully, the fourth job is started with the sleep parameter 4s. Both of these jobs time out after 2.5s, therefore, the complete runtime must be around 4.5 seconds. The results reflect that the first two jobs returned the dictionary by the function execute, while the jobs that timed out returned the dictionary by the function timeout.
The timeoutpool package also offers an apply-like interface, that do not need the definition of job classes. The drawback of this approach is that when timeout happens, the return value for the job is None, which, in many cases is less useful than returning similar objects for both successful and timed out jobs.
In this example, we illustrate the use of the apply interface through the same sleeping job example.
Again, we import the neccessary class and the time module.
import time
from timeoutpool import TimeoutPool
In this case the jobs are represented in a list, like a list of sleep parameters. Altenatively, one could use a generator as well.
jobs = list(range(1, 5))
Then, we define the job as a function which can take one arbitrary parameter.
def job(sleep):
"""
The job to be executed
Returns:
dict: the result of the job
"""
begin = time.time()
time.sleep(sleep)
return {'sleep': sleep, 'slept': f'{time.time() - begin:.2f}'}
The TimeoutPool is created to execute jobs, in this case using 2 worker processes, each one subject to timeout with 2.5 seconds.
tpool = TimeoutPool(n_jobs=2, timeout=2.5)
Finally, the jobs are executed in parallel calling the apply method of the pool object.
begin = time.time()
results = tpool.apply(job, jobs)
print(results)
# [{'sleep': 1, 'slept': '1.00'}, {'sleep': 2, 'slept': '2.00'},
# None, None]
print(f'runtime: {time.time() - begin:.2f}')
# runtime 4.52
What happens in the background is similar to the case of using the SleepJob, however, in this case, the jobs timed out return simply None.
In this example we collect the PIDs (process ID) of the processes started, and use a generator to specify the jobs.
First, the neccessary modules are imported.
import os
from timeoutpool import TimeoutPool
Then, the job function (worker) is defined. Note that the job function always gets the job specification as an argument, therefore, even if it does not use its argument, it needs to be able to get it.
def worker(*_args):
return os.getpid()
Finally, the TimeoutPool is instantiated and the parallel execution of jobs is tarted.
tpool = TimeoutPool(n_jobs=2, timeout=1)
results = tpool.apply(worker, range(0, 10))
print(results)
# [29644, 29647, 29651, 29654, 29657, 29660, 29663, 29666, 29669, 29672]
Naturally, all process IDs are different as each job is executed in a separate process.
In this example random forests with various parameterizations are fitted to a dummy dataset. This example is closely related to the use case the package is designed for. Various parameterizations of a random forest can lead to extremely long, unacceptable execution times. Testing the classifier with various parameterizations and a timeout limit can rationalize the evaluation of the classifier on a dataset.
First, we import the necessary packages.
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from timeoutpool import TimeoutJobBase, TimeoutPool
The dummy dataset consists of 1000 rows of 5 random features.
N = 1000
N_DIM = 5
X = np.random.random_sample((N, N_DIM))
y = np.random.randint(2, size=(N))
We intend to test 6 parameterizations. Note that the larger the depth parameter is and the more estimators are used, the longer it takes to fit the forest. These 6 parameterizations are on the edge of 1 second, however, on larger datasets with even more estimators and deeper trees one can expect that some parameterizations can fit in seconds, others in hours. For the experiment, the number of jobs the random forest uses is set to 1 in order to avoid to inefficiency of excessive parallelization.
rf_params = [
{'max_depth': 3, 'n_estimators': 10, 'n_jobs': 1, 'random_state': 5},
{'max_depth': 3, 'n_estimators': 100, 'n_jobs': 1, 'random_state': 5},
{'max_depth': 3, 'n_estimators': 1000, 'n_jobs': 1, 'random_state': 5},
{'max_depth': 5, 'n_estimators': 10, 'n_jobs': 1, 'random_state': 5},
{'max_depth': 5, 'n_estimators': 100, 'n_jobs': 1, 'random_state': 5},
{'max_depth': 5, 'n_estimators': 1000, 'n_jobs': 1, 'random_state': 5}]
The RFJob object carries out the fitting and evaluation. The use of the timeout function enables the returning of some well-formed responses. The benefit of the approach over the .apply interface is that timeout would return None when the operation times out, which would require further operations to figure out which jobs have timed out.
class RFJob(TimeoutJobBase):
"""
A job fitting and evaluating a random forest parameterization
"""
def __init__(self, params):
"""
The constructor of the job
Args:
params (dict): a random forest parameterization
"""
self.params = params
def execute(self):
"""
Executes the job
Returns:
dict: the result of the evaluation
"""
rf = RandomForestClassifier(**self.params)
preds = rf.fit(X, y).predict_proba(X)
return {'auc': roc_auc_score(y, preds[:, 1]),
'timeout': False,
'params': self.params}
def timeout(self):
"""
The method called when a job times out
Returns:
dict: the timeout result
"""
return {'auc': -1,
'timeout': True,
'params': self.params}
Finally, the TimeoutPool is instantiated and the jobs are executed through a generator. The timeout value of 1s is for the sake of a quick example, in real problems many more parameterizations and higher timeout limit values could be used.
tpool = TimeoutPool(n_jobs=2, timeout=1)
results = tpool.execute((RFJob(param) for param in rf_params))
pd.set_option('display.max_colwidth', 200)
print(pd.DataFrame(results))
# auc timeout params
#0 0.643166 False {'max_depth': 3, 'n_estimators': 10, 'n_jobs': 1, 'random_state': 5}
#1 0.711018 False {'max_depth': 3, 'n_estimators': 100, 'n_jobs': 1, 'random_state': 5}
#2 -1.000000 True {'max_depth': 3, 'n_estimators': 1000, 'n_jobs': 1, 'random_state': 5}
#3 0.753508 False {'max_depth': 5, 'n_estimators': 10, 'n_jobs': 1, 'random_state': 5}
#4 0.842643 False {'max_depth': 5, 'n_estimators': 100, 'n_jobs': 1, 'random_state': 5}
#5 -1.000000 True {'max_depth': 5, 'n_estimators': 1000, 'n_jobs': 1, 'random_state': 5}
Due to the structured outcome in the timeout cases, the results can be summarized easily. The entire operation took less then 3 seconds (executing 2 jobs in parallel 3 times), even though fitting with 1000 estimators could take several seconds.
Any contribution is welcome!
- fork the GitHub repository at http://github.com/gykovacs/timeoutpool,
- open a discussion on GitHub,
- or contact me at [email protected].