Commit 9f7eb9a3 by fwkz

Starting workers before data producing. Worker threads will do the job while…

Starting workers before data producing. Worker threads will do the job while data is being produced.
parent ad38942a
...@@ -14,6 +14,7 @@ from .exceptions import StopThreadPoolExecutor ...@@ -14,6 +14,7 @@ from .exceptions import StopThreadPoolExecutor
data_queue = queue.Queue() data_queue = queue.Queue()
data_producing = threading.Event()
class WorkerThread(threading.Thread): class WorkerThread(threading.Thread):
...@@ -22,7 +23,7 @@ class WorkerThread(threading.Thread): ...@@ -22,7 +23,7 @@ class WorkerThread(threading.Thread):
self.name = name self.name = name
def run(self): def run(self):
while not data_queue.empty(): while data_producing.is_set() or not data_queue.empty():
record = data_queue.get() record = data_queue.get()
target = record[0] target = record[0]
args = record[1:] args = record[1:]
...@@ -40,24 +41,28 @@ class ThreadPoolExecutor(object): ...@@ -40,24 +41,28 @@ class ThreadPoolExecutor(object):
def __init__(self, threads): def __init__(self, threads):
self.threads = threads self.threads = threads
self.workers = [] self.workers = []
self.worker = None
self.start_time = None
def __enter__(self): def __enter__(self):
self.workers = [] self.workers = []
data_producing.set()
for worker_id in xrange(int(self.threads)): for worker_id in xrange(int(self.threads)):
worker = WorkerThread( worker = WorkerThread(
name='worker-{}'.format(worker_id), name='worker-{}'.format(worker_id),
) )
worker.start()
self.workers.append(worker) self.workers.append(worker)
self.worker = worker
self.start_time = time.time()
return self return self
def __exit__(self, *args): def __exit__(self, *args):
for worker in self.workers: data_producing.clear()
worker.start()
start = time.time()
try: try:
while worker.isAlive(): while self.worker.isAlive():
worker.join(1) self.worker.join(1)
except KeyboardInterrupt: except KeyboardInterrupt:
utils.print_info() utils.print_info()
utils.print_status("Waiting for already scheduled jobs to finish...") utils.print_status("Waiting for already scheduled jobs to finish...")
...@@ -67,7 +72,7 @@ class ThreadPoolExecutor(object): ...@@ -67,7 +72,7 @@ class ThreadPoolExecutor(object):
worker.join() worker.join()
data_queue.unfinished_tasks = 0 data_queue.unfinished_tasks = 0
utils.print_status('Elapsed time: ', time.time() - start, 'seconds') utils.print_status('Elapsed time: ', time.time() - self.start_time, 'seconds')
def submit(self, *args): def submit(self, *args):
data_queue.put(args) data_queue.put(args)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment