Lock timeout and reset in case a plugin takes too long to initialize or the lock is incorrectly marked as locked in multiprocess context

This commit is contained in:
Fabio Manganiello 2018-09-18 20:25:31 +02:00
parent db5d3e6e3f
commit 6bbae19d39

View file

@ -2,6 +2,7 @@ import importlib
import logging import logging
from threading import Lock from threading import Lock
from contextlib import contextmanager
from ..config import Config from ..config import Config
@ -20,6 +21,13 @@ plugins_init_locks = {}
# Reference to the main application bus # Reference to the main application bus
main_bus = None main_bus = None
@contextmanager
def acquire_timeout(lock, timeout):
acquired = lock.acquire(timeout=timeout)
yield acquired
if acquired:
lock.release()
def register_backends(bus=None, global_scope=False, **kwargs): def register_backends(bus=None, global_scope=False, **kwargs):
""" Initialize the backend objects based on the configuration and returns """ Initialize the backend objects based on the configuration and returns
a name -> backend_instance map. a name -> backend_instance map.
@ -98,7 +106,11 @@ def get_plugin(plugin_name, reload=False):
logger.warning('No such class in {}: {}'.format(plugin_name, cls_name)) logger.warning('No such class in {}: {}'.format(plugin_name, cls_name))
raise RuntimeError(e) raise RuntimeError(e)
with plugins_init_locks[plugin_name]: with acquire_timeout(plugins_init_locks[plugin_name], 5) as acquired:
if not acquired:
logger.warning('Lock expired on get_plugin({}), resetting it'.format(plugin_name))
plugins_init_locks[plugin_name] = Lock()
if plugins.get(plugin_name) and not reload: if plugins.get(plugin_name) and not reload:
return plugins[plugin_name] return plugins[plugin_name]
plugins[plugin_name] = plugin_class(**plugin_conf) plugins[plugin_name] = plugin_class(**plugin_conf)