MultiThreadingHelper

psengine.helpers.helpers.MultiThreadingHelper

Multithreading class.

multithread_it staticmethod

multithread_it(
    max_workers: int,
    func: Callable,
    *,
    iterator: Iterable,
    **kwargs,
) -> list

Multithreading helper for I/O-bound operations.

The method can be used as shown in the example below: starting from single-threaded code, the loop is replaced by a single call to multithread_it.

PARAMETER DESCRIPTION
max_workers

Number of threads to use.

TYPE: int

func

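Function to be executed in parallel. It is called once per element of iterator, with the element as its only positional argument and any extra keyword arguments (**kwargs) forwarded to every call.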

TYPE: Callable

iterator

The elements to be dispatched to the threads, one call to func per element. Any iterable is accepted, not only a list.

TYPE: Iterable

Example
def _lookup_alert(self, alert_id, index, total_num_of_alerts):
    ...

def all_alerts(self, alert_ids_to_fetch):
    res = []
    for index, alert_id in enumerate(alert_ids_to_fetch):
        res.append(self._lookup_alert(alert_id, index, len(alert_ids_to_fetch)))
    return res

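It can be rewritten as:

# multithread_it calls func(element, **kwargs), so the worker receives only
# the element positionally; the index argument is dropped here.
def _lookup_alert(self, alert_id, total_num_of_alerts):
    ...

def all_alerts(self, alert_ids_to_fetch):
    results = MultiThreadingHelper.multithread_it(
        self.max_workers,
        self._lookup_alert,
        iterator=alert_ids_to_fetch,
        total_num_of_alerts=len(alert_ids_to_fetch),
    )
    return results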
RETURNS DESCRIPTION
list

List of the values returned by func, in the same order as the elements of iterator.
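
The call convention described above can be tried without installing the package. The sketch below is a rough illustration, not the library itself: `multithread_it` is a local stand-in that copies the method body from the source listing, and `lookup_alert`, `lookup_indexed`, and the alert IDs are hypothetical names invented for the example. It also shows one possible way to keep a per-element index, by iterating over `enumerate(...)` pairs and unpacking them inside the worker.

```python
from collections.abc import Callable, Iterable
from concurrent.futures import ThreadPoolExecutor


def multithread_it(max_workers: int, func: Callable, *, iterator: Iterable, **kwargs) -> list:
    """Local stand-in that mirrors the body of MultiThreadingHelper.multithread_it."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, element, **kwargs) for element in iterator]
    return [f.result() for f in futures]


def lookup_alert(alert_id: str, total_num_of_alerts: int) -> dict:
    # Hypothetical worker; a real one would perform an I/O call here.
    return {"id": alert_id, "total": total_num_of_alerts}


def lookup_indexed(pair: tuple, total_num_of_alerts: int) -> str:
    # Each element is an (index, alert_id) pair produced by enumerate().
    index, alert_id = pair
    return f"{index + 1}/{total_num_of_alerts}: {alert_id}"


alert_ids = ["a1", "a2", "a3"]

# The element is passed positionally; total_num_of_alerts goes to every call.
results = multithread_it(
    2, lookup_alert, iterator=alert_ids, total_num_of_alerts=len(alert_ids)
)
print([r["id"] for r in results])  # ['a1', 'a2', 'a3'] (submission order)

# Passing the index along by dispatching enumerate() pairs instead of bare IDs.
progress = multithread_it(
    2, lookup_indexed, iterator=enumerate(alert_ids), total_num_of_alerts=len(alert_ids)
)
print(progress)  # ['1/3: a1', '2/3: a2', '3/3: a3']
```

Results come back in submission order because the futures are collected in a list and read in that order, regardless of which thread finishes first.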

Source code in psengine/helpers/helpers.py
@staticmethod
def multithread_it(
    max_workers: Annotated[int, Doc('Number of threads to use.')],
    func: Annotated[Callable, Doc('Function to be executed in parallel.')],
    *,
    iterator: Annotated[Iterable, Doc('The list of elements to be dispatched to the threads.')],
    **kwargs,
) -> Annotated[list, Doc('List of objects returned by the calling function.')]:
    """Multithreading helper for I/O Operations.

    The class can be used in the following way. Given a single thread code like:

    Example:
        ```python
        def _lookup_alert(self, alert_id, index, total_num_of_alerts):
            ...

        def all_alerts(self, alerts):
            res = []
            for index, alert_id in enumerate(alert_ids_to_fetch):
                res.append(self._lookup_alert(alert_id, index, len(alert_ids_to_fetch)))
        ```

        It can be rewritten like:

        ```python
        def _lookup_alert(self, alert_id, index, total_num_of_alerts):
            ...

        def all_alerts(self, alerts):
            results = MultiThreadingHelper.multithread_it(
                self.max_workers,
                self._lookup_alert,
                iterator=alert_ids_to_fetch,
                total_num_of_alerts=len(alert_ids_to_fetch)
            )
        ```
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, element, **kwargs) for element in iterator]

    return [f.result() for f in futures]
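
Because the listing collects results with `f.result()`, an exception raised inside `func` is re-raised in the caller, and the results of the other elements are then lost. The sketch below illustrates that behaviour and one possible workaround. It assumes a local copy of the method body so it runs standalone; `fetch` and `safe` are hypothetical helpers, not part of psengine.

```python
from concurrent.futures import ThreadPoolExecutor


def multithread_it(max_workers, func, *, iterator, **kwargs):
    """Local stand-in that mirrors the body of MultiThreadingHelper.multithread_it."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, element, **kwargs) for element in iterator]
    return [f.result() for f in futures]


def fetch(item: int) -> int:
    # Hypothetical worker that fails for one specific element.
    if item == 2:
        raise ValueError(f"cannot fetch {item}")
    return item * 10


def safe(func):
    """Wrap func so that a failure is returned as a value instead of raised."""
    def wrapper(element, **kwargs):
        try:
            return func(element, **kwargs)
        except Exception as exc:
            return exc
    return wrapper


# Without the wrapper, the worker's exception surfaces in the calling thread.
try:
    multithread_it(2, fetch, iterator=[1, 2, 3])
    raised = False
except ValueError:
    raised = True

# With the wrapper, every element yields either a value or an exception object.
outcomes = multithread_it(2, safe(fetch), iterator=[1, 2, 3])
values = [o for o in outcomes if not isinstance(o, Exception)]
errors = [o for o in outcomes if isinstance(o, Exception)]
print(raised, values, len(errors))  # True [10, 30] 1
```

Returning the exception object is only one design choice; a worker could equally log the failure and return `None`, depending on what the caller needs to do with partial results.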