
    wiQ                         d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
mZmZmZmZ d dlmZ d dlmZ d dlmZ dZdZdZd	Zd
ZdZdZ G d d          ZdS )    N)AnyCallableDictListOptionalTupleUnion)RUNNING_WINDOWS)WorkerMapParams)DelayedKeyboardInterrupt c                      e Zd ZdZdZdej        j        dede	ddfdZ
de	fd	Zd\d
Zd\dZd\dZ	 	 d]dededee         de	deeef         f
dZdeeef         fdZd\dZd\dZd\dZd\dZd\dZd\dZd\dZde	fdZd^dee         dedee         ddfdZd_ded e d!ed"e!fd#Z"d^dee         defd$Z#dedefd%Z$deddfd&Z%ded'e	ddfd(Z&dedej'        fd)Z(dede	fd*Z)dededdfd+Z*dedefd,Z+dee         d-e,eee         e	ef                  ddfd.Z-d`d0e	d1ee         defd2Z.deddfd3Z/deddfd4Z0d5e1ddfd6Z2deddfd7Z3de	fd8Z4d1ee         de	fd9Z5defd:Z6d\d;Z7de	fd<Z8d\d=Z9d\d>Z:d\d?Z;deddfd@Z<d\dAZ=de,e         fdBZ>d\dCZ?deddfdDZ@deddfdEZAdede	fdFZBdadGe	ddfdHZCdadGe	ddfdIZDdJeEjF        ddfdKZGd\dLZHdbdMejI        dNe	ddfdOZJeKdbdMejI        dNe	ddfdP            ZLdeddfdQZMdeddfdRZNdeddfdSZOdeddfdTZPdeddfdUZQdeddfdVZRded1ede	fdWZSded1ede	fdXZTded1ede	fdYZUeKdZed1ede	fd[            ZVdS )cWorkerCommsa  
    Class that contains all the inter-process communication objects (locks, events, queues, etc.) and functionality to
    interact with them, except for the worker insights comms.

    Contains:
    - Progress bar comms
    - Tasks & (exit) results comms
    - Exception handling comms
    - Terminating and restarting comms
    
    General overview of how the comms work:
    - When ``map`` or ``imap`` is used, the workers need to return the ``idx`` of the task they just completed. This is
        needed to return the results in order. This is communicated by using the ``_keep_order`` boolean value.
    - The main process assigns tasks to the workers by using their respective task queue (``_task_queues``). When no
        tasks have been completed yet, the main process assigns tasks in order. To determine which worker to assign the
        next task to, the main process uses the ``_task_idx`` counter. When tasks have been completed, the main process
        assigns the next task to the worker that completed the last task. This is communicated by using the
        ``_last_completed_task_worker_id`` deque.
    - Each worker keeps track of whether it is running a task by using the ``_worker_running_task`` boolean value. This
        is used by the main process in case a worker needs to be interrupted (due to an exception somewhere else). 
        When a worker is not busy with any task at the moment, the worker will exit itself because of the 
        ``_exception_thrown`` event that is set in such cases. However, when it is running a task, we want to interrupt
        it. The RLock object is used to ensure that there are no race conditions when accessing the 
        ``_worker_running_task`` boolean value. E.g., when getting and setting the value without another process
        doing something in between.
    - Each worker also keeps track of which job it is working on by using the ``_worker_working_on_job`` array. This is
        needed to assess whether a certain task times out, and we need to know which job to set to failed.
    - The workers communicate their results to the main process by using the results queue (``_results_queue``). Each
        worker keeps track of how many results it has added to the queue (``_results_added``), and the main process 
        keeps track of how many results it has received from each worker (``_results_received``). This is used by the
        workers to know when they can safely exit.
    - Workers can request a restart when a maximum lifespan is configured and reached. This is done by setting the
        ``_worker_restart_array`` boolean array. The main process listens to this array and restarts the worker when
        needed. The ``_worker_restart_condition`` is used to signal the main process that a worker needs to be 
        restarted.
    - The ``_workers_dead`` array is used to keep track of which workers are alive and which are not. Sometimes, a
        worker can be terminated by the OS (e.g., OOM), which we want to pick up on. The main process checks regularly
        whether a worker is still alive according to the OS and according to the worker itself. If the OS says it's 
        dead, but the value in ``_workers_dead`` is still False, we know something went wrong.
    - The ``_workers_time_task_started`` array is used to keep track of when a worker started a task. This is used by
        the main process to check whether a worker times out. 
    - Exceptions are communicated by using the ``_exception_thrown`` event. Both the main process as the workers can set
        this event. The main process will set this when, for example, a timeout has been reached when running a ``map``
        task. The source of the exception is stored in the ``_exception_job_id`` value, which is used by the main 
        process to obtain the exception and raise accordingly.
    - The workers communicate every 0.1 seconds how many tasks they have completed. This is used by the main process to
        update the progress bar. The workers communicate this by using the ``_tasks_completed_array`` array. The 
        ``_progress_bar_last_updated`` datetime object is used to keep track of when the last update was sent. The
        ``_progress_bar_shutdown`` boolean value is used to signal the progress bar handler thread to shut down. The
        ``_progress_bar_complete`` event is used to signal the main process and workers that the progress bar is 
        complete and that it's safe to exit.
    g?ctxn_jobsorder_tasksreturnNc                    || _         || _        || _        d| _        | j                             t
          j        dd          | _        g | _        d| _	        g | _
        t          j                    | _        d| _        d| _        g | _        d| _        d| _        | j                             | j                                                   | _        d| _        d| _        | j                                         | _        | j                                         | _        d| _        | j                             t
          j        dd          | _        d| _        d| _        d| _        d| _         dS )z
        :param ctx: Multiprocessing context
        :param n_jobs: Number of workers
        :param order_tasks: Whether to provide tasks to the workers in order, such that worker 0 will get chunk 0,
            worker 1 will get chunk 1, etc.
        FTlockN)!r   r   r   _initializedValuectypesc_bool_keep_order_task_queues	_task_idx_worker_running_taskcollectionsdeque_last_completed_task_worker_id_worker_working_on_job_results_queue_results_added_results_received_worker_restart_array	ConditionLock_worker_restart_condition_workers_dead_workers_time_task_startedexception_lockEvent_exception_thrown_exception_job_id_kill_signal_received_tasks_completed_array_progress_bar_last_updated_progress_bar_shutdown_progress_bar_complete)selfr   r   r   s       c/var/www/development/aibuddy-work/election-extract/venv/lib/python3.11/site-packages/mpire/comms.py__init__zWorkerComms.__init__Z   sE    &!  8>>&-T>JJ 57(,46!.9.?.A.A+:>#
 ;?)+59 :>")-););DHMMOO)L)L& 26
 ?C' #hmmoo!%!1!159%)X^^FM5t^%T%T" ;?#;?':>#:>###    c                     | j         S )zB
        :return: Whether the comms have been initialized
        r   r;   s    r<   is_initializedzWorkerComms.is_initialized   s       r>   c                     d| _         dS )zm
        Resets initialization state. Note: doesn't actually reset the comms, just resets the state.
        FNr@   rA   s    r<   resetzWorkerComms.reset   s     "r>   c                 .     fdt           j                  D              _         fdt           j                  D              _         j                            d j        d           _         j                                         _        d t           j                  D              _	         j                            d j         j        
                                           _         j                            t          j         j        d           _         j                            t          j         j        d           _        dg j        z   j        dd<    j                            d	 j        d
z  d           _         j                                          j                            ddd           _        d j        _         j                            d j         j        
                                           _        t1          j                     _         j                            t          j        dd           _         j                                         _                                          d _        dS )a  
        Initialize/Reset comms containers.

        Threading doesn't have a JoinableQueue, so the threading context returns a multiprocessing.JoinableQueue
        instead. However, in the (unlikely) scenario that threading does get one, we explicitly switch to a
        multiprocessing.JoinableQueue for both the exception queue and progress bar tasks completed queue, because the
        progress bar handler needs process-aware objects.
        c                 B    g | ]}j                                         S  )r   JoinableQueue.0_r;   s     r<   
<listcomp>z*WorkerComms.init_comms.<locals>.<listcomp>   s'    RRR!TX3355RRRr>   c                     g | ]@}j                             t          j        d j                                                   AS )Fr   )r   r   r   r    RLockrI   s     r<   rL   z*WorkerComms.init_comms.<locals>.<listcomp>   sH     %
 %
 %
LMDHNN6=%dhnn6F6FNGG%
 %
 %
r>   iTr   c                     g | ]}d S )r   rG   )rJ   rK   s     r<   rL   z*WorkerComms.init_comms.<locals>.<listcomp>   s    ===Qq===r>   LNd   r   F)ranger   r"   r$   r   Arrayr(   rH   r)   r*   rN   r+   r   r    r,   r0   r1   r4   clearr   r5   r6   valuer7   timer8   r9   r3   r:   reset_progressr   rA   s   `r<   
init_commszWorkerComms.init_comms   s0    SRRRuT[?Q?QRRR%
 %
 %
 %
QVW[WbQcQc%
 %
 %
! '+hnnS$+Dn&Q&Q# #h4466==%*<*<===!%T[tx~~GWGW!X!X &*X^^FM4;UY^%Z%Z"!X^^FM4;T^RR!% 4111*.(..dkAoTX.*Y*Y' 	$$&&&!%QT!B!B+0"( '+hnnS$+DHNNL\L\n&]&]#*.)++'&*hnnV]EPTn&U&U#&*hnn&6&6# r>   c                     d| _         | j                                         dg| j        z  | j        dd<   |                                  |                                  dS )zG
        Resets the task_idx and last_completed_task_worker_id
        r   N)r#   r'   rV   r   r7   clear_progress_bar_shutdownclear_progress_bar_completerA   s    r<   rY   zWorkerComms.reset_progress   sd     +11333*+t{):#AAA&((***((*****r>   F	worker_idprogress_bar_last_updatedprogress_bar_n_tasks_completedforce_updatec                     |s|dz  }t          j                     }|s||z
  | j        k    rJ| j                                        5  | j        |xx         |z  cc<   ddd           n# 1 swxY w Y   |}d}||fS )a  
        Signal that we've completed a task every 0.1 seconds, for the progress bar

        :param worker_id: Worker ID
        :param progress_bar_last_updated: Last time the progress bar update was send
        :param progress_bar_n_tasks_completed: Number of tasks completed since last update
        :param force_update: Whether to force an update
        :return: Tuple containing new last updated time and number of tasks completed since last update
           Nr   )rX   progress_bar_update_intervalr7   get_lock)r;   r^   r_   r`   ra   nows         r<   task_completed_progress_barz'WorkerComms.task_completed_progress_bar   s      	0*a/* ikk 	/C";;t?```,5577 Y Y+I666:XX666Y Y Y Y Y Y Y Y Y Y Y Y Y Y Y(+%-.*(*HHHs   A&&A*-A*c                 X   t          j                     | j        z
  }|| j        k     rt          j        | j        |z
             |                                 sN|                                 s:| j        j        s.t          | j	                  }t          j                     | _        |S t          S )z
        Obtain the number of tasks completed by the workers. As the progress bar handler lives inside a thread we don't
        poll continuously, but every 0.1 seconds.

        :return: The number of tasks done or a poison pill
        )rX   r8   rd   sleepexception_thrownkill_signal_receivedr9   rW   sumr7   POISON_PILL)r;   	time_diffn_tasks_completeds      r<    get_tasks_completed_progress_barz,WorkerComms.get_tasks_completed_progress_bar   s     IKK$"AA	t888Jt89DEEE ((** 	%43L3L3N3N 	%.4	% #D$? @ @.2ikkD+$$r>   c                     d| j         _        dS )zH
        Signals the progress bar handling process to shut down
        TNr9   rW   rA   s    r<   signal_progress_bar_shutdownz(WorkerComms.signal_progress_bar_shutdown  s     -1#)))r>   c                 0    | j         d| j         _        dS dS )z9
        Clears the progress bar shutdown signal
        NFrr   rA   s    r<   r\   z'WorkerComms.clear_progress_bar_shutdown  s&     &205D'--- 32r>   c                 8    | j                                          dS )z:
        Signal that the progress bar is complete
        N)r:   setrA   s    r<   signal_progress_bar_completez(WorkerComms.signal_progress_bar_complete  s     	#'')))))r>   c                 J    | j         | j                                          dS dS )z9
        Clear that the progress bar is complete
        N)r:   rV   rA   s    r<   r]   z'WorkerComms.clear_progress_bar_complete  s0     &2'--///// 32r>   c                 J    | j         | j                                          dS dS )z;
        Waits until the progress bar is completed
        N)r:   waitrA   s    r<   #wait_until_progress_bar_is_completez/WorkerComms.wait_until_progress_bar_is_complete!  s0     &2',,..... 32r>   c                     d| j         _        dS )z8
        Set that we need to keep order in mind
        TNr!   rW   rA   s    r<   signal_keep_orderzWorkerComms.signal_keep_order,  s     "&r>   c                     d| j         _        dS )z;
        Forget that we need to keep order in mind
        FNr}   rA   s    r<   clear_keep_orderzWorkerComms.clear_keep_order2  s     "'r>   c                     | j         j        S )z@
        :return: Whether we need to keep order in mind
        r}   rA   s    r<   
keep_orderzWorkerComms.keep_order8  s     %%r>   job_idtaskc                     |                      |          }t                      5  |||fn|}| j        |                             |d           ddd           dS # 1 swxY w Y   dS )a  
        Add a task to the queue so a worker can process it.

        :param job_id: Job ID or None
        :param task: A tuple of arguments to pass to a worker, which acts upon it
        :param worker_id: If provided, give the task to the worker ID
        NTblock)_get_task_worker_idr   r"   put)r;   r   r   r^   s       r<   add_taskzWorkerComms.add_taskB  s     ,,Y77	%'' 	? 	?%+%7FD>>TDi(,,T,>>>	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	? 	?s   +AA #A rG   funcargskwargsc                     |i }|                                  }|                     dt          |           |                     ||||ff|           dS )ao  
        Add a task to the queue so a worker can process it. First though, add an APPLY_PILL such that the worker knows
        it needs to treat this task differently.

        :param func: Function to apply
        :param job_id: Job ID
        :param args: Arguments to pass to the function
        :param kwargs: Keyword arguments to pass to the function
        N)r   r   
APPLY_PILL)r;   r   r   r   r   r^   s         r<   add_apply_taskzWorkerComms.add_apply_taskO  sZ     >F,,..	dJ	222ftdF^4i@@@@@r>   c                     |G| j         s| j        s | j        | j        z  }| xj        dz  c_        n| j                                        }|S )a^  
        Get the worker ID for the next task.

        When a worker ID is not present, we first check if we need to pass on the tasks in order. If not, we check
        whether we got results already. If so, we give the next task to the worker who completed that task. Otherwise,
        we decide based on order

        :return: Worker ID
        Nrc   )r   r'   r#   r   popleftr;   r^   s     r<   r   zWorkerComms._get_task_worker_id`  s[      Jt'J J NT[8	!# ?GGII	r>   c                     |                                  sL	 | j        |                             dd          S # t          j        $ r Y nw xY w|                                  LdS )z
        Obtain new chunk of tasks. Occasionally we check if an exception has been thrown. If so, we should quit.

        :param worker_id: Worker ID
        :return: Chunk of tasks or None when an exception was thrown
        T{Gz?r   timeoutN)rj   r"   getqueueEmptyr   s     r<   get_taskzWorkerComms.get_tasks  s}     '')) 	(377dD7QQQ;    '')) 	
 ts   !8 A
	A
c                 D    | j         |                                          dS )zY
        Signal that we've completed a task

        :param worker_id: Worker ID
        N)r"   	task_doner   s     r<   r   zWorkerComms.task_done  s$     	)$..00000r>   runningc                 *    || j         |         _        dS )z
        Set the task the worker is currently running

        :param worker_id: Worker ID
        :param running: Whether the worker is running a task
        Nr$   rW   )r;   r^   r   s      r<   set_worker_running_taskz#WorkerComms.set_worker_running_task  s     6=!),222r>   c                 @    | j         |                                         S )zy
        Obtain the lock for the worker running task

        :param worker_id: Worker ID
        :return: RLock
        )r$   re   r   s     r<   get_worker_running_task_lockz(WorkerComms.get_worker_running_task_lock  s     (3<<>>>r>   c                 &    | j         |         j        S )z
        Obtain whether the worker is running a task

        :param worker_id: Worker ID
        :return: Whether the worker is running a task
        r   r   s     r<   get_worker_running_taskz#WorkerComms.get_worker_running_task  s     (399r>   c                     || j         |<   dS )z
        Signal that the worker is working on the job ID

        :param worker_id: Worker ID
        :param job_id: Job ID
        Nr(   )r;   r^   r   s      r<   signal_worker_working_on_jobz(WorkerComms.signal_worker_working_on_job  s     28#I...r>   c                     | j         |         S )zy
        Obtain the job ID the worker is working on

        :param worker_id: Worker ID
        :return: Job ID
        r   r   s     r<   get_worker_working_on_jobz%WorkerComms.get_worker_working_on_job  s     *955r>   resultsc                 l    || j         |xx         dz  cc<   | j                            ||f           dS )z
        Add results to the results queue

        :param worker_id: Worker ID
        :param results: A list of tuples of job ID, success bool, and output from the worker
        Nrc   )r*   r)   r   )r;   r^   r   s      r<   add_resultszWorkerComms.add_results  sK      	***a/***G 455555r>   Tr   r   c                    	 t                      5  | j                            ||          \  }}| j                                         |`| j                                        5  | j        |xx         dz  cc<   ddd           n# 1 swxY w Y   | j                            |           |cddd           S # 1 swxY w Y   dS # t          $ r ddt          fgcY S w xY w)a$  
        Obtain the next result from the results queue

        :param block: Whether to block (wait for results)
        :param timeout: How long to wait for results in case ``block==True``
        :return: The next result from the queue, which is the result of calling the function
        r   Nrc   )
r   r)   r   r   r+   re   r'   appendEOFErrorrm   )r;   r   r   r^   r   s        r<   get_resultszWorkerComms.get_results  s   	/)++  %)%8%<%<5RY%<%Z%Z"	7#--///(/88:: ? ?.y999Q>999? ? ? ? ? ? ? ? ? ? ? ? ? ? ?7>>yIII                   	/ 	/ 	/4-....	/sY   C AB9$B:B9B
	
B9B
	B9,C 9B==C  B=C CCc                     d| j         |<   dS )zi
        Reset the number of results received from a worker

        :param worker_id: Worker ID
        r   N)r+   r   s     r<   reset_results_receivedz"WorkerComms.reset_results_received  s     -.y)))r>   c                     | j         |         | j        |         k    r2t          j        d           | j         |         | j        |         k    0dS dS )z
        Wait for the main process to receive all the results from a specific worker

        :param worker_id: Worker ID
        r   N)r+   r*   rX   ri   r   s     r<   wait_for_all_results_receivedz)WorkerComms.wait_for_all_results_received  sX     $Y/43Fy3QQQJt $Y/43Fy3QQQQQQQr>   
map_paramsc                     t          | j                  D ]5}|                     dt          |           |                     d||           6dS )zc
        Submits new map params for each worker

        :param map_params: New map params
        N)rT   r   r   NEW_MAP_PARAMS_PILL)r;   r   r^   s      r<   add_new_map_paramszWorkerComms.add_new_map_params  sW     t{++ 	7 	7IMM$ 3Y???MM$
I6666	7 	7r>   c                 P    || j         _        | j                                         dS )zf
        Set the exception event

        :param job_id: Job ID which triggered the exception
        N)r5   rW   r4   rv   )r;   r   s     r<   signal_exception_thrownz#WorkerComms.signal_exception_thrown  s*     (.$""$$$$$r>   c                 4    | j                                         S )zP
        :return: Whether an exception was thrown by one of the workers
        )r4   is_setrA   s    r<   rj   zWorkerComms.exception_thrown  s     %,,...r>   c                 8    | j                             |          S )z
        Waits until the exception thrown event is set

        :param timeout: How long to wait before giving up
        :return: True when exception was thrown, False if timeout was reached
        r   )r4   rz   )r;   r   s     r<   wait_for_exception_thrownz%WorkerComms.wait_for_exception_thrown  s     %**7*;;;r>   c                     | j         j        S )z?
        :return: Job ID which triggered the exception
        )r5   rW   rA   s    r<   get_exception_thrown_job_idz'WorkerComms.get_exception_thrown_job_id  s     %++r>   c                     d| j         _        dS )z4
        Set the kill signal received event
        TNr6   rW   rA   s    r<   signal_kill_signal_receivedz'WorkerComms.signal_kill_signal_received  s     ,0"(((r>   c                     | j         j        S )zS
        :return: Whether a kill signal was received in one of the workers
        r   rA   s    r<   rk   z WorkerComms.kill_signal_received  s     )//r>   c                 l    t          | j                  D ]}|                     dt          |           dS )z7
        'Tell' the workers their job is done.
        N)rT   r   r   rm   r   s     r<   insert_poison_pillzWorkerComms.insert_poison_pill#  s@     t{++ 	8 	8IMM$Y7777	8 	8r>   c                 D    |                      dddt          fg           dS )zF
        'Tell' the apply results listener their job is done.
        NT)r   rm   rA   s    r<   #insert_poison_pill_results_listenerz/WorkerComms.insert_poison_pill_results_listener*  s*     	t[ 9:;;;;;r>   c                 l    t          | j                  D ]}|                     dt          |           dS )z
        When ``keep_alive=True``, the workers should stay alive, but they need to wrap up their work (like sending the
        latest progress bar update)
        N)rT   r   r   NON_LETHAL_POISON_PILLr   s     r<   insert_non_lethal_poison_pillz)WorkerComms.insert_non_lethal_poison_pill0  sE    
 t{++ 	C 	CIMM$ 6	BBBB	C 	Cr>   c                     d| j         |<   | j        5  | j                                         ddd           dS # 1 swxY w Y   dS )zx
        Signal to the main process that this worker needs to be restarted

        :param worker_id: Worker ID
        TN)r,   r/   notifyr   s     r<   signal_worker_restartz!WorkerComms.signal_worker_restart8  s     15"9-+ 	4 	4*11333	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4s   9= =c                 x    | j         5  | j                                          ddd           dS # 1 swxY w Y   dS )z
        Signal the condition primitive, such that the worker restart handler thread can continue. This is useful when
        an exception has been thrown and the thread needs to exit.
        N)r/   r   rA   s    r<   signal_worker_restart_conditionz+WorkerComms.signal_worker_restart_conditionB  s    
 + 	4 	4*11333	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4s   /33c                       fd} j         5   |            }|s# j                                           |            }|cddd           S # 1 swxY w Y   dS )a  
        Obtain the worker IDs that need to be restarted. Blocks until at least one worker needs to be restarted.
        It returns an empty list when an exception has been thrown (which also notifies the worker_done_condition)

        :return: List of worker IDs
        c                  @    d t           j                  D             S )Nc                     g | ]	\  }}||
S rG   rG   )rJ   r^   restarts      r<   rL   zQWorkerComms.get_worker_restarts.<locals>._get_worker_restarts.<locals>.<listcomp>R  s#    iii"4)WahiIiiir>   )	enumerater,   rA   s   r<   _get_worker_restartsz=WorkerComms.get_worker_restarts.<locals>._get_worker_restartsQ  s"    ii	$B\8]8]iiiir>   N)r/   rz   )r;   r   
worker_idss   `  r<   get_worker_restartszWorkerComms.get_worker_restartsJ  s    	j 	j 	j 	j 	j + 	 	 .-//J 4.335551133
	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   1AAAc                     d| j         |<   dS )z_
        Worker has been restarted, reset signal.

        :param worker_id: Worker ID
        FN)r,   r   s     r<   reset_worker_restartz WorkerComms.reset_worker_restart^  s     16"9---r>   c                     d| j         |<   dS )zV
        Indicate that a worker is alive

        :param worker_id: Worker ID
        FNr0   r   s     r<   signal_worker_alivezWorkerComms.signal_worker_alivef  s     ).9%%%r>   c                     d| j         |<   dS )zU
`       Indicate that a worker is dead

        :param worker_id: Worker ID
        TNr   r   s     r<   signal_worker_deadzWorkerComms.signal_worker_deadn  s     )-9%%%r>   c                     | j         |          S )z
        Check whether the worker is alive

        :param worker_id: Worker ID
        :return: Whether the worker is alive
        r   r   s     r<   is_worker_alivezWorkerComms.is_worker_alivev  s     %i000r>   
keep_alivec                     | j                                          |s4| j                                          | j                                          dS dS )zs
        Join results and exit results queues

        :param keep_alive: Whether to keep the queues alive
        N)r)   joinclosejoin_threadr;   r   s     r<   join_results_queueszWorkerComms.join_results_queues  s[     	  """ 	.%%'''++-----	. 	.r>   c                 t    d | j         D              |s$d | j         D              d | j         D              dS dS )z_
        Join task queues

        :param keep_alive: Whether to keep the queues alive
        c                 6    g | ]}|                                 S rG   )r   rJ   qs     r<   rL   z0WorkerComms.join_task_queues.<locals>.<listcomp>  s     ---a---r>   c                 6    g | ]}|                                 S rG   )r   r   s     r<   rL   z0WorkerComms.join_task_queues.<locals>.<listcomp>  s     2221QWWYY222r>   c                 6    g | ]}|                                 S rG   )r   r   s     r<   rL   z0WorkerComms.join_task_queues.<locals>.<listcomp>  s     888Q]]__888r>   N)r"   r   s     r<   join_task_queueszWorkerComms.join_task_queues  sa     	.-4,---- 	922 1222288d&7888888	9 	9r>   dont_wait_eventc                     d}	 	 |                      d           |                                 d}-# t          j        t          f$ r |r|                                 Y dS Y dS w xY w)a  
        Drain the results queue without blocking. This is done when terminating workers, while they could still be busy
        putting something in the queues. This function will always be called from within a thread.

        :param dont_wait_event: Event object to indicate whether other termination threads should continue. I.e., when
            we set it to False, threads should wait.
        FTr   N)r   rV   r   r   OSErrorrv   )r;   r   got_resultss      r<   $drain_results_queue_terminate_workerz0WorkerComms.drain_results_queue_terminate_worker  s     	&#  u ---%%'''"# W% 	& 	& 	& &##%%%%%%& & &	&s   .2 ,A%$A%c                 b      fd j         D                                    j                   dS )z0
        Drain tasks and results queues
        c                 :    g | ]}                     |          S rG   )drain_and_join_queue)rJ   r   r;   s     r<   rL   z,WorkerComms.drain_queues.<locals>.<listcomp>  s'    AAA!	"	"1	%	%AAAr>   N)r"   r   r)   rA   s   `r<   drain_queueszWorkerComms.drain_queues  s@     	BAAAt/@AAAA!!$"566666r>   r   r   c                    t           r|                     ||           dS 	 | j                            | j        ||f          }|                                 |                    d           |                                r(|                                 |                                 |r*|                                 |	                                 dS dS # t          $ r Y dS w xY w)ay  
        Drains a queue completely, such that it is joinable. If a timeout is reached, we give up and terminate. So far,
        I've only seen it happen when an exception is thrown when using spawn as start method, and even then it only
        happens once every 1000 runs or so.

        :param q: Queue to join
        :param join: Whether to join the queue or not
        )targetr      r   N)r
   _drain_and_join_queuer   Processstartr   is_alive	terminater   r   r   )r;   r   r   processs       r<   r   z WorkerComms.drain_and_join_queue  s      	&&q$/////(**$2LTUW[S\*]]Q'''##%% #%%'''LLNNN $GGIIIMMOOOOO$ $    s   B3C 
C&%C&c                    | dS d}	 	 |                                   |dz  }# t          t          f$ r Y nw xY w	 |                                 r|dk    r6|                     dd           |dz  }|                                 0|dk    6n(# t          t
          t          j        t          f$ r Y nw xY w|rW	 |                                  | 	                                 | 
                                 dS # t          t          f$ r Y dS w xY wdS )z
        Drains a queue completely, such that it is joinable

        :param q: Queue to join
        :param join: Whether to join the queue or not
        Nr   Trc   g      ?r   )r   r   
ValueErroremptyr   r   r   r   r   r   r   )r   r   ns      r<   r   z!WorkerComms._drain_and_join_queue  sP    9F 	Q $ 	 	 	D		ggii 166D#...Q ggii 166 5;
; 	 	 	D	  				Z(   	 	s0   # 77AB "B10B17<C5 5D
	D
c                 B    t          j                     | j        |dz  <   dS )zs
        Sets the worker_init started timestamp for a specific worker

        :param worker_id: Worker ID
        rS   NrX   r1   r   s     r<   signal_worker_init_startedz&WorkerComms.signal_worker_init_started  s!     :>'	A666r>   c                 H    t          j                     | j        |dz  dz   <   dS )zl
        Sets the task started timestamp for a specific worker

        :param worker_id: Worker ID
        rS   rc   Nr	  r   s     r<   signal_worker_task_startedz&WorkerComms.signal_worker_task_started  '     >BY[['	A(9:::r>   c                 H    t          j                     | j        |dz  dz   <   dS )zs
        Sets the worker_exit started timestamp for a specific worker

        :param worker_id: Worker ID
        rS      Nr	  r   s     r<   signal_worker_exit_startedz&WorkerComms.signal_worker_exit_started  r  r>   c                      d| j         |dz  <   dS )zu
        Resets the worker_init started timestamp for a specific worker

        :param worker_id: Worker ID
        r   rS   Nr1   r   s     r<   signal_worker_init_completedz(WorkerComms.signal_worker_init_completed  s     :;'	A666r>   c                 &    d| j         |dz  dz   <   dS )zn
        Resets the task started timestamp for a specific worker

        :param worker_id: Worker ID
        r   rS   rc   Nr  r   s     r<   signal_worker_task_completedz(WorkerComms.signal_worker_task_completed        >?'	A(9:::r>   c                 &    d| j         |dz  dz   <   dS )zu
        Resets the worker_exit started timestamp for a specific worker

        :param worker_id: Worker ID
        r   rS   r  Nr  r   s     r<   signal_worker_exit_completedz(WorkerComms.signal_worker_exit_completed  r  r>   c                 N    | j         |dz           }|                     ||          S )z
        Checks whether a worker_init takes longer than the timeout value

        :param worker_id: Worker ID
        :param timeout: Timeout in seconds
        :return: True when time has expired, False otherwise
        rS   r1   _has_worker_timed_outr;   r^   r   started_times       r<   has_worker_init_timed_outz%WorkerComms.has_worker_init_timed_out'  s+     6y1}E)),@@@r>   c                 T    | j         |dz  dz            }|                     ||          S )z
        Checks whether a worker task takes longer than the timeout value

        :param worker_id: Worker ID
        :param timeout: Timeout in seconds
        :return: True when time has expired, False otherwise
        rS   rc   r  r  s       r<   has_worker_task_timed_outz%WorkerComms.has_worker_task_timed_out2  0     6y1}q7HI)),@@@r>   c                 T    | j         |dz  dz            }|                     ||          S )z
        Checks whether a worker_exit takes longer than the timeout value

        :param worker_id: Worker ID
        :param timeout: Timeout in seconds
        :return: True when time has expired, False otherwise
        rS   r  r  r  s       r<   has_worker_exit_timed_outz%WorkerComms.has_worker_exit_timed_out=  r!  r>   r  c                 F    | dk    rdnt          j                     | z
  |k    S )z
        Checks whether time has passed beyond the timeout

        :param started_time: Timestamp
        :param timeout: Timeout in seconds
        :return: True when time has expired, False otherwise
        g        F)rX   )r  r   s     r<   r  z!WorkerComms._has_worker_timed_outH  s)     %++uu$)++2LQX1XXr>   )r   N)NF)N)rG   N)TN)F)T)W__name__
__module____qualname____doc__rd   mpcontextBaseContextintboolr=   rB   rD   rZ   rY   floatr   r   rg   r	   strrp   rs   r\   rw   r]   r{   r~   r   r   r   r   r   r   r   r   r   r   r   rN   r   r   r   r   r   r   r   r   r   r   r   r   rj   r   r   r   rk   r   r   r   r   r   r   r   r   r   r   r   r   	threadingr3   r   r   rH   r   staticmethodr   r
  r  r  r  r  r  r  r   r#  r  rG   r>   r<   r   r       s       3 3l $' 5?BJ2 5?C 5?d 5?W[ 5? 5? 5? 5?v! ! ! ! !" " " "'! '! '! '!R+ + + + UY9>I IS IUZ IDLSMI26ICHPSCTI I I I4%S/    *1 1 1 16 6 6 6* * * *0 0 0 0/ / / /& & & &' ' ' '&D & & & &? ?x} ?C ?HSM ?]a ? ? ? ?A AS A A ATX A A A A" Xc] c    &# #    13 14 1 1 1 1= =t = = = = =?c ?bh ? ? ? ?: : : : : :8c 83 84 8 8 8 863 63 6 6 6 6	6Xc] 	6T%QTW[]`H`Ba=b 	6gk 	6 	6 	6 	6/ / /x /RU / / / /*. . . . . .s t    7_ 7 7 7 7 7%c %d % % % %/$ / / / /<% <T < < < <,S , , , ,0 0 0 00d 0 0 0 08 8 8 8< < < <C C C C4s 4t 4 4 4 44 4 4 4T#Y    (6 6 6 6.S .T . . . .-C -D - - - -1 1 1 1 1 1	. 	.d 	.t 	. 	. 	. 	.	9 	94 	9D 	9 	9 	9 	9&IO &X\ & & & &(7 7 7 7 b&6 d d    : # #!1 # # # # # \#REC ED E E E EIC ID I I I IIC ID I I I I;c ;d ; ; ; ;?c ?d ? ? ? ??c ?d ? ? ? ?	A3 	A 	A4 	A 	A 	A 	A	A3 	A 	A4 	A 	A 	A 	A	A3 	A 	A4 	A 	A 	A 	A YE YE Yd Y Y Y \Y Y Yr>   r   )r%   r   multiprocessingr)  r   r0  rX   typingr   r   r   r   r   r   r	   mpire.contextr
   mpire.paramsr   mpire.signalr   rm   r   r   r   MAIN_PROCESS	INIT_FUNC	EXIT_FUNCr   rG   r>   r<   <module>r:     s%                  D D D D D D D D D D D D D D D D D D ) ) ) ) ) ) ( ( ( ( ( ( 1 1 1 1 1 1      
 		qY qY qY qY qY qY qY qY qY qYr>   