# -*- coding: utf-8 -*-
"""
The processpool submodule contains a custom process-pool, with some additional features not present in standard
Python processpool management, e.g. timeouts.
"""
from time import sleep
import datetime
import traceback
import heapq
from multiprocessing import Process, Pipe, cpu_count
from multiprocessing.pool import Pool
exception_debugging = False
[docs]class Future(object):
command = None
args = None
kwargs = None
value = None
error = None
process = None
pool = None
priority = 0
status = None
timeout = 0
started_at = None
def __init__(self):
pass
def __lt__(self, other):
# dummy
return self.priority < other.priority
[docs] def wait(self, time=None):
pass
[docs] def fail(self):
self.status, (self.value, self.error) = True, (None, None)
if self.process:
self.process.terminate()
self.pool.report_broken_process(self.process)
else:
self.pool.future_became_ready(self)
[docs] def ready(self):
if self.status:
return self.status
if self.process is None:
# not scheduled yet
return False
if self.timeout > 0:
now = datetime.datetime.now()
if (now - self.started_at).total_seconds() > self.timeout:
# we reached a hard timeout
try:
self.process.terminate()
# kill(self.process.pid, SIGINT)
sleep(0.25)
except Exception as e:
print(repr(e))
# self.process.terminate()
self.pool.report_broken_process(self.process)
self.status, (self.value, self.error) = \
True, (None, RuntimeError('Process took longer than specified timeout and was terminated.'))
return
if not self.process.is_alive():
exitcode = self.process.exitcode
self.pool.report_broken_process(self.process)
self.status, (self.value, self.error) = \
True, (None, RuntimeError('Process trying to work on this future died. Exitcode: %d' % (exitcode,)))
return
self.status, (self.value, self.error) = self.process.ready()
if self.status:
self.pool.future_became_ready(self)
return self.status
[docs] def get(self):
not_ready = True
while not_ready:
not_ready = not self.ready()
if self.error:
raise self.error
else:
return self.value
[docs] def dispatch(self):
self.started_at = datetime.datetime.now()
self.process.dispatch()
[docs]class WrappedException(RuntimeError):
exception = None
message = None
def __init__(self, exception, message=''):
self.exception = exception
self.message = message
[docs]class FutureProcess(Process):
STARTUP = 0
RUN = 1
STOP = 2
def __init__(self):
super(FutureProcess, self).__init__()
self.future = None
self.pipe_parent_end, self.pipe_child_end = Pipe()
[docs] def run(self):
while True:
command_type, command, args, kwargs = self.pipe_child_end.recv()
if command_type == FutureProcess.STOP:
break
if command_type == FutureProcess.STARTUP and command is None:
continue
result = None
exc = None
try:
result = command(*args, **kwargs)
except Exception as e:
exc = WrappedException(e, traceback.format_exc())
if command_type == FutureProcess.STARTUP:
continue
self.pipe_child_end.send((result, exc,))
[docs] def send_command(self, *args):
self.pipe_parent_end.send(args)
[docs] def dispatch(self):
self.send_command(FutureProcess.RUN, self.future.command, self.future.args, self.future.kwargs)
[docs] def ready(self):
if self.pipe_parent_end.poll():
return True, self.pipe_parent_end.recv()
else:
return False, (None, None,)
[docs]class SimpleProcessPool(object):
[docs] def new_process(self):
p = FutureProcess()
p.start()
p.send_command(*self.startup_message)
return p
[docs] def fill_pool(self):
for _ in range(self.count - (len(self.waiting_processes) + len(self.active_processes))):
self.waiting_processes.add(self.new_process())
def __init__(self, processes=0, initializer=None, initargs=None, initkwargs=None, future_timeout=0):
if initargs is None:
initargs = ()
if initkwargs is None:
initkwargs = {}
self.startup_message = (FutureProcess.STARTUP, initializer, initargs, initkwargs)
if processes == 0:
processes = cpu_count()
self.future_timeout = future_timeout
self.count = processes
self.waiting_processes = set()
self.active_processes = set()
self.fill_pool()
self.active_futures = set()
self.waiting_futures = [] # priority queue
self.closing = False
[docs] def close(self):
self.closing = True
self.schedule()
[docs] def report_broken_process(self, p):
f = p.future
if p in self.active_processes:
self.active_processes.remove(p)
if p in self.waiting_processes:
# print("found a process where it does not belong", p)
self.waiting_processes.remove(p)
if f in self.active_futures:
self.active_futures.remove(f)
if f in self.waiting_futures:
# remove one entry, rebuild
self.waiting_futures.remove(f)
heapq.heapify(self.waiting_futures)
# if f in self.waiting_futures:
# #print("found a future where it does not belong", f)
# self.waiting_futures.remove(f)
self.fill_pool()
self.schedule()
[docs] def apply(self, command, *args, **kwargs):
return self.advanced_apply(command=command, args=args, kwargs=kwargs)
[docs] def advanced_apply(self, command=None, priority=0, args=None, kwargs=None):
if args is None:
args = tuple()
if kwargs is None:
kwargs = {}
f = Future()
f.command = command
f.args = args
f.kwargs = kwargs
f.priority = priority
f.timeout = self.future_timeout
f.pool = self
# self.waiting_futures.add(f)
heapq.heappush(self.waiting_futures, f)
self.schedule()
return f
# ugly signature
[docs] def apply_async(self, fun, args=None, kwargs=None):
if args is None:
args = {}
if kwargs is None:
kwargs = {}
return self.apply(fun, *args, **kwargs)
[docs] def future_became_ready(self, f):
if f in self.active_futures:
self.active_futures.remove(f)
if f.process in self.active_processes:
self.active_processes.remove(f.process)
if f.process:
self.waiting_processes.add(f.process)
self.schedule()
[docs] def schedule(self):
for f in self.active_futures.copy():
f.ready()
while len(self.waiting_processes) > 0:
if len(self.waiting_futures) == 0:
break
# f = self.waiting_futures.pop()
f = heapq.heappop(self.waiting_futures)
p = self.waiting_processes.pop()
f.process = p
p.future = f
self.active_processes.add(p)
self.active_futures.add(f)
f.dispatch()
if self.closing:
while len(self.waiting_processes) > 0:
p = self.waiting_processes.pop()
p.send_command(FutureProcess.STOP, None, [], {})
self.active_processes.add(p)
[docs]class DuckTypedApplyResult(object):
def __init__(self, callable_):
self.value = None
self.called = False
self.callable = callable_
# noinspection PyMethodMayBeStatic
[docs] def ready(self):
return True
[docs] def wait(self, timeout):
pass
[docs] def fail(self):
self.value = None
self.called = True
[docs] def get(self):
if not self.called:
self.value = None
try:
self.value = self.callable()
except Exception as e:
if exception_debugging:
raise
raise WrappedException(e, traceback.format_exc())
return self.value
# noinspection PyAbstractClass,PyUnusedLocal
[docs]class NormalPool(Pool):
[docs] def advanced_apply(self, command, args, **kwargs):
return self.apply(func=command, args=args)
# noinspection PyUnusedLocal
[docs]class InProcessFakePool(object):
[docs] @staticmethod
def advanced_apply(command, args, **kwargs):
def _bind_it(inner_args):
def _perform():
return command(*inner_args)
return _perform
return DuckTypedApplyResult(_bind_it(args))