Source code for pyhdtoolkit.utils.executors

"""
.. _utils-executors:

Executors Utilities
-------------------

A module providing two classes to execute function and argument pairs through either a
multiprocessing approach or a multithreading approach. Before using these, here are a few
things to keep in mind:

* Only one thread can execute Python bytecode at any given time in a CPython process because of the GIL.
* Multiprocessing is parallelism. Multithreading is concurrency.
* Multiprocessing is for increasing speed. Multithreading is for hiding latency.
* Multiprocessing is best for computations. Multithreading is best for IO.
* If you have CPU-heavy tasks, use multiprocessing with `n_processes = n_cores` and never more. Never.
* If you have IO-heavy tasks, use multithreading with `n_threads = m * n_cores` with `m` a number
  bigger than 1 that you can tweak on your own. Try many values and choose the one with the best
  speedup because there isn't a general rule.

For instance, before Python 3.8 the default value of `m` in `~concurrent.futures.ThreadPoolExecutor`
was 5, which I think is quite arbitrary. Since Python 3.8 the default number of workers is
`min(32, os.cpu_count() + 4)`.
"""

from concurrent import futures
from typing import Callable, List

from loguru import logger


class MultiProcessor:
    """
    .. versionadded:: 0.2.0

    A class to easily wrap a multi-processing context manager call to a function.

    .. important::
        * Reminder: multiprocessing is good for cpu-heavy tasks.
        * Reminder: only picklable objects can be executed and returned.

    Example:
        .. code-block:: python

            Processor = MultiProcessor()
            results_one_tuple_per_run = Processor.execute_function(
                func=your_cpu_heavy_function,
                func_args=list_of_args_for_each_call,
                n_processes=some_int_up_to_you,
            )
    """

    @staticmethod
    def execute_function(func: Callable, func_args: list, n_processes: int) -> List[tuple]:
        """
        .. versionadded:: 0.2.0

        Executes the *function* with the provided arguments *func_args* as multiple processes.

        .. warning::
            Do not fire up more processes than you have cores! Never!

        Args:
            func (Callable): the function to call.
            func_args (list): list of the different parameters for each call. If *function*
                takes more than one parameter, wrap them up in tuples, e.g.:
                `[(params, run, one), (params, run, two), (params, run, three)]`.
            n_processes (int): the number of processes to fire up. No more than your number
                of cores! If *n_processes* is `None` or not given,
                `~concurrent.futures.ProcessPoolExecutor` will default it to the number
                of processors on the machine.

        Returns:
            A list of tuples, each `tuple` being the returned value(s) of *function* for
            the given call, for instance `[(results, run, one), (results, run, two),
            (results, run, three)]`.

        Example:
            .. code-block:: python

                MultiProcessor.execute_function(
                    func=np.square, func_args=list(range(6)), n_processes=2
                )
                # returns [0, 1, 4, 9, 16, 25]
        """
        logger.debug(f"Starting multiprocessing with {n_processes} processes")
        with futures.ProcessPoolExecutor(n_processes) as ex:
            results = ex.map(func, func_args)
        logger.debug(f"All {n_processes} processes finished")
        return list(results)


class MultiThreader:
    """
    .. versionadded:: 0.2.0

    A class to easily wrap a multi-threading context manager call to a function.

    .. important::
        Reminder: multithreading is good for IO-heavy tasks.

    Example:
        .. code-block:: python

            Threader = MultiThreader()
            results_one_tuple_per_run = Threader.execute_function(
                func=your_io_heavy_function,
                func_args=list_of_args_for_each_call,
                n_threads=some_int_up_to_you,
            )
    """

    @staticmethod
    def execute_function(func: Callable, func_args: list, n_threads: int) -> List[tuple]:
        """
        .. versionadded:: 0.2.0

        Executes the *function* with the provided arguments *func_args* as multiple threads.

        .. note::
            Remember: there is no point in having more threads than the number of calls to
            be executed, the excess threads would be idle and you would lose the time spent
            to fire them up.

        Args:
            func (Callable): the function to call.
            func_args (list): list of the different parameters for each call. If *function*
                takes more than one parameter, wrap them up in tuples, e.g.:
                `[(params, run, one), (params, run, two), (params, run, three)]`.
            n_threads (int): the number of threads to fire up. No more than your number of
                tasks. If *n_threads* is `None` or not given,
                `~concurrent.futures.ThreadPoolExecutor` will default it to the number of
                processors on the machine multiplied by 5 (Python versions before 3.8) or to
                `min(32, os.cpu_count() + 4)` (Python 3.8 and later), assuming that it is
                often used to overlap I/O instead of CPU work and the number of workers
                should be higher than the number of workers for a
                `~concurrent.futures.ProcessPoolExecutor`.

        Returns:
            A list of tuples, each `tuple` being the returned value(s) of *function* for
            the given call, for instance `[(results, run, one), (results, run, two),
            (results, run, three)]`.

        Example:
            .. code-block:: python

                MultiThreader.execute_function(
                    func=np.square, func_args=list(range(6)), n_threads=2
                )
                # returns [0, 1, 4, 9, 16, 25]
        """
        logger.debug(f"Starting multithreading with {n_threads} threads")
        with futures.ThreadPoolExecutor(n_threads) as ex:
            results = ex.map(func, func_args)
        logger.debug(f"All {n_threads} threads finished")
        return list(results)
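
The docstrings above say to wrap multiple parameters in tuples, which means the called function receives each tuple as a single argument and has to unpack it itself. The following is a minimal sketch of that pattern, not part of the module: `scaled_sum` and `run_threaded` are hypothetical names, and `run_threaded` only mirrors the body of `MultiThreader.execute_function` using `concurrent.futures` directly so the snippet runs without the package installed.

```python
from concurrent import futures


def scaled_sum(args: tuple) -> int:
    """Hypothetical worker: unpacks one (a, b, scale) tuple itself."""
    a, b, scale = args
    return (a + b) * scale


def run_threaded(func, func_args, n_threads):
    """Hypothetical stand-in mirroring MultiThreader.execute_function."""
    with futures.ThreadPoolExecutor(n_threads) as ex:
        # map passes each element of func_args to func as ONE argument
        # and yields the results in the same order as the inputs.
        return list(ex.map(func, func_args))


calls = [(1, 2, 10), (3, 4, 10), (5, 6, 10)]
results = run_threaded(scaled_sum, calls, n_threads=3)
print(results)  # [30, 70, 110]
```

Because `map` preserves input order, the i-th result always corresponds to the i-th tuple in `func_args`, regardless of which thread finished first.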