Commit 7352d5f0 by fwkz

Fixing: ThreadPoolExecutor hangs while there is more worker threads than scheduled jobs.

parent 4bb7dd44
from __future__ import print_function
from __future__ import absolute_import from __future__ import absolute_import
import threading import threading
...@@ -41,38 +40,47 @@ class ThreadPoolExecutor(object): ...@@ -41,38 +40,47 @@ class ThreadPoolExecutor(object):
def __init__(self, threads): def __init__(self, threads):
self.threads = threads self.threads = threads
self.workers = [] self.workers = []
self.worker = None self.started_workers = []
self.monitor_worker = None
self.start_time = None self.start_time = None
def __enter__(self): def __enter__(self):
self.workers = [] workers = []
data_producing.set() data_producing.set()
for worker_id in xrange(int(self.threads)): for worker_id in xrange(int(self.threads)):
worker = WorkerThread( worker = WorkerThread(
name='worker-{}'.format(worker_id), name='worker-{}'.format(worker_id),
) )
worker.start() workers.append(worker)
self.workers.append(worker)
self.worker = worker self.monitor_worker = worker
self.workers = iter(workers)
self.start_time = time.time() self.start_time = time.time()
return self return self
def __exit__(self, *args): def __exit__(self, *args):
data_producing.clear() data_producing.clear()
try: try:
while self.worker.isAlive(): while self.monitor_worker.is_alive():
self.worker.join(1) self.monitor_worker.join(1)
except KeyboardInterrupt: except KeyboardInterrupt:
utils.print_info() utils.print_info()
utils.print_status("Waiting for already scheduled jobs to finish...") utils.print_status("Waiting for already scheduled jobs to finish...")
data_queue.queue.clear() data_queue.queue.clear()
finally: finally:
for worker in self.workers: for worker in self.started_workers:
worker.join() worker.join()
data_queue.unfinished_tasks = 0 data_queue.unfinished_tasks = 0
utils.print_status('Elapsed time: ', time.time() - self.start_time, 'seconds') utils.print_status('Elapsed time: ', time.time() - self.start_time, 'seconds')
def submit(self, *args): def submit(self, *args):
try:
worker = next(self.workers)
except StopIteration:
pass
else:
worker.start()
self.started_workers.append(worker)
data_queue.put(args) data_queue.put(args)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment