Skip to content

Load Balancing

LoadBalancing

Source code in engines/contentFilterEngine/performance_scalability/load_balancing.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class LoadBalancing:
    def __init__(self, num_workers=4):
        """
        Initializes the LoadBalancing with a specified number of worker threads.

        Parameters:
        - num_workers (int): The number of worker threads to spawn.
        """
        self.num_workers = num_workers
        self.task_queue = Queue()
        self.results = []
        self.threads = []
        self._init_workers()
        logger.info(f"LoadBalancing initialized with {self.num_workers} workers.")

    def _init_workers(self):
        """
        Initializes worker threads that continuously process tasks from the queue.
        """
        for i in range(self.num_workers):
            thread = Thread(target=self._worker, name=f"Worker-{i+1}", daemon=True)
            thread.start()
            self.threads.append(thread)
            logger.debug(f"Started {thread.name}.")

    def _worker(self):
        """
        Worker thread that processes tasks from the queue.
        """
        while True:
            func, args, kwargs = self.task_queue.get()
            if func is None:
                # Sentinel found, terminate the thread
                logger.debug(f"{threading.current_thread().name} received sentinel. Exiting.")
                break
            try:
                result = func(*args, **kwargs)
                self.results.append(result)
                logger.debug(f"{threading.current_thread().name} processed a task with result: {result}")
            except Exception as e:
                logger.error(f"Error processing task: {e}")
            finally:
                self.task_queue.task_done()

    def add_task(self, func, *args, **kwargs):
        """
        Adds a new task to the queue.

        Parameters:
        - func (callable): The function to execute.
        - *args: Positional arguments for the function.
        - **kwargs: Keyword arguments for the function.
        """
        self.task_queue.put((func, args, kwargs))
        logger.debug(f"Added task {func.__name__} to the queue.")

    def get_results(self):
        """
        Waits for all tasks to be processed and returns the results.

        Returns:
        - list: A list of results from all tasks.
        """
        self.task_queue.join()
        return self.results

    def shutdown(self):
        """
        Shuts down all worker threads gracefully by sending sentinel tasks.
        """
        for _ in self.threads:
            self.task_queue.put((None, (), {}))  # Sentinel
        for thread in self.threads:
            thread.join()
            logger.debug(f"{thread.name} has terminated.")
        logger.info("LoadBalancing has been shutdown.")

__init__(num_workers=4)

Initializes the LoadBalancing with a specified number of worker threads.

Parameters: - num_workers (int): The number of worker threads to spawn.

Source code in engines/contentFilterEngine/performance_scalability/load_balancing.py
12
13
14
15
16
17
18
19
20
21
22
23
24
def __init__(self, num_workers=4):
    """
    Initializes the LoadBalancing with a specified number of worker threads.

    Parameters:
    - num_workers (int): The number of worker threads to spawn.
    """
    self.num_workers = num_workers
    self.task_queue = Queue()
    self.results = []
    self.threads = []
    self._init_workers()
    logger.info(f"LoadBalancing initialized with {self.num_workers} workers.")

add_task(func, *args, **kwargs)

Adds a new task to the queue.

Parameters: - func (callable): The function to execute. - args: Positional arguments for the function. - *kwargs: Keyword arguments for the function.

Source code in engines/contentFilterEngine/performance_scalability/load_balancing.py
55
56
57
58
59
60
61
62
63
64
65
def add_task(self, func, *args, **kwargs):
    """
    Adds a new task to the queue.

    Parameters:
    - func (callable): The function to execute.
    - *args: Positional arguments for the function.
    - **kwargs: Keyword arguments for the function.
    """
    self.task_queue.put((func, args, kwargs))
    logger.debug(f"Added task {func.__name__} to the queue.")

get_results()

Waits for all tasks to be processed and returns the results.

Returns: - list: A list of results from all tasks.

Source code in engines/contentFilterEngine/performance_scalability/load_balancing.py
67
68
69
70
71
72
73
74
75
def get_results(self):
    """
    Waits for all tasks to be processed and returns the results.

    Returns:
    - list: A list of results from all tasks.
    """
    self.task_queue.join()
    return self.results

shutdown()

Shuts down all worker threads gracefully by sending sentinel tasks.

Source code in engines/contentFilterEngine/performance_scalability/load_balancing.py
77
78
79
80
81
82
83
84
85
86
def shutdown(self):
    """
    Shuts down all worker threads gracefully by sending sentinel tasks.
    """
    for _ in self.threads:
        self.task_queue.put((None, (), {}))  # Sentinel
    for thread in self.threads:
        thread.join()
        logger.debug(f"{thread.name} has terminated.")
    logger.info("LoadBalancing has been shutdown.")