To do that, I'll build a simple demonstration that exhibits these behaviors. It must be said that a new feature landed in Python 3.9 that allows cancelling all pending futures on exit. However, that only covers the first item on my list; the rest still need to be addressed, so we'll happily continue making our own Executor.

Most of the work I do involves not only I/O but also a good amount of compute, so I'll use processes to avoid the GIL. The overhead of starting processes is negligible, as these scripts usually run for minutes, not milliseconds.

Stop signals and k8s: 3 ways to handle SIGTERM

When running in a shell, one tends to stop the main process and all its worker processes/threads by pressing CTRL-C, which sends a SIGINT to the main process. Python catches this signal and raises a KeyboardInterrupt.

If you want your application to run in Kubernetes (or other systems that communicate with your process using signals), you'll need to catch SIGTERM, which is k8s' way of saying "please shut down gracefully". If the process is still running after the "grace period" (30 seconds by default), it is killed the hard way with SIGKILL (which cannot be caught). An easy way to handle this is to just catch SIGTERM and raise a KeyboardInterrupt, so that it has the same effect as a SIGINT.

    # Imports inferred from the names used below
    import logging
    from multiprocessing import JoinableQueue
    from multiprocessing.synchronize import Event
    from queue import Empty
    from random import randint
    from time import sleep
    from typing import Optional

    logger = logging.getLogger(__name__)


    def do_task(sleep_sec: int) -> int:
        "Dummy task"
        sleep(sleep_sec)
        logger.info("Slept for %.3f s", sleep_sec)
        return randint(0, 10)


    def worker(q: JoinableQueue, stop_event: Event):
        logger.info("Starting worker.")
        while True:
            if stop_event.is_set():
                logger.info("Worker exiting because of stop_event")
                break
            # We set a timeout so we loop past "stop_event" even if the queue is empty
            try:
                # Timeout value is partly lost in the source ("...01"); 0.01 is assumed
                args = q.get(timeout=0.01)
            except Empty:
                # Run next iteration of loop
                continue
            # Exit if end of queue
            if args is None:
                logger.info("Worker exiting because of None on queue")
                q.task_done()
                break
            # Do the task
            try:
                do_task(*args)
            # Will also catch KeyboardInterrupt
            except:
                logger.exception("Failed to process args %s", args)
                # Can implement some kind of retry handling here
            finally:
                q.task_done()


    def main(
        n_workers: int = 2,
        n_tasks: int = 10,
        max_queue_size: int = 1,
        grace_period: int = 2,
        kill_period: int = 30,
        interrupt: Optional[int] = None,
    ) -> None:
        """
        Run a process pool of workers.

        Args:
            n_workers: Start this many processes
            n_tasks: Launch this many tasks
            max_queue_size: If queue exceeds this size, block when putting items on the queue
            grace_period: Send SIGINT to processes if they don't exit within this time after SIGINT/SIGTERM
            kill_period: Send SIGKILL to processes if they don't exit after this many seconds
            interrupt: If given, send signal SIGTERM to itself after queueing this many tasks
        """
        # The JoinableQueue has an internal counter that increments when an item is put on the queue and
        # decrements when q.task_done() is called. This allows us to wait until it's empty using ...
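
The idea of catching SIGTERM and raising a KeyboardInterrupt can be sketched roughly as follows. This is a minimal illustration for POSIX systems, not the code from the demonstration itself: the names `raise_keyboard_interrupt` and `run_until_interrupted` are hypothetical, and the `time.sleep` call merely stands in for real work. The handler has to be installed from the main thread.

```python
import os
import signal
import time


def raise_keyboard_interrupt(signum, frame):
    """Treat SIGTERM like CTRL-C by raising KeyboardInterrupt."""
    raise KeyboardInterrupt(f"received signal {signum}")


def run_until_interrupted(send_sigterm_to_self: bool = False) -> str:
    """Hypothetical helper: do some 'work' and report how it ended."""
    # Install the handler before any work starts (main thread only).
    previous = signal.signal(signal.SIGTERM, raise_keyboard_interrupt)
    try:
        if send_sigterm_to_self:
            # Simulate an orchestrator asking the process to stop.
            os.kill(os.getpid(), signal.SIGTERM)
        time.sleep(0.5)  # stand-in for real work
        return "finished"
    except KeyboardInterrupt:
        # Same cleanup path for CTRL-C (SIGINT) and SIGTERM.
        return "interrupted"
    finally:
        # Restore whatever handler was installed before.
        signal.signal(signal.SIGTERM, previous)


if __name__ == "__main__":
    print(run_until_interrupted(send_sigterm_to_self=True))
```

With a handler like this in place, any existing `except KeyboardInterrupt` cleanup also runs when the process receives SIGTERM, so only one shutdown path needs to be maintained.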