Don't raise a pytest warning upon the asyncio "No event loop" warning

This commit is contained in:
Fabio Manganiello 2022-04-27 23:25:14 +02:00
parent 371fd7e46b
commit 820a1c8184
Signed by untrusted user: blacklight
GPG Key ID: D90FBA7F76362774
2 changed files with 18 additions and 11 deletions

View File

@ -26,14 +26,14 @@ main_bus = None
def register_backends(bus=None, global_scope=False, **kwargs):
""" Initialize the backend objects based on the configuration and returns
"""Initialize the backend objects based on the configuration and returns
a name -> backend_instance map.
Params:
bus -- If specific (it usually should), the messages processed by the
backends will be posted on this bus.
kwargs -- Any additional key-value parameters required to initialize the backends
"""
"""
global main_bus
if bus:
@ -57,8 +57,7 @@ def register_backends(bus=None, global_scope=False, **kwargs):
b = getattr(module, cls_name)(bus=bus, **cfg, **kwargs)
backends[name] = b
except AttributeError as e:
logger.warning('No such class in {}: {}'.format(
module.__name__, cls_name))
logger.warning('No such class in {}: {}'.format(module.__name__, cls_name))
raise RuntimeError(e)
return backends
@ -74,15 +73,15 @@ def register_plugins(bus=None):
def get_backend(name):
""" Returns the backend instance identified by name if it exists """
"""Returns the backend instance identified by name if it exists"""
global backends
return backends.get(name)
def get_plugin(plugin_name, reload=False):
""" Registers a plugin instance by name if not registered already, or
returns the registered plugin instance"""
"""Registers a plugin instance by name if not registered already, or
returns the registered plugin instance"""
global plugins
global plugins_init_locks
@ -104,8 +103,9 @@ def get_plugin(plugin_name, reload=False):
cls_name += token.title()
cls_name += 'Plugin'
plugin_conf = Config.get_plugins()[plugin_name] \
if plugin_name in Config.get_plugins() else {}
plugin_conf = (
Config.get_plugins()[plugin_name] if plugin_name in Config.get_plugins() else {}
)
if 'disabled' in plugin_conf:
if plugin_conf['disabled'] is True:
@ -120,7 +120,9 @@ def get_plugin(plugin_name, reload=False):
try:
plugin_class = getattr(plugin, cls_name)
except AttributeError as e:
logger.warning('No such class in {}: {} [error: {}]'.format(plugin_name, cls_name, str(e)))
logger.warning(
'No such class in {}: {} [error: {}]'.format(plugin_name, cls_name, str(e))
)
raise RuntimeError(e)
with plugins_init_locks[plugin_name]:
@ -137,13 +139,14 @@ def get_bus() -> Bus:
return main_bus
from platypush.bus.redis import RedisBus
return RedisBus()
def get_or_create_event_loop():
try:
loop = asyncio.get_event_loop()
except RuntimeError:
except (DeprecationWarning, RuntimeError):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

View File

@ -2,3 +2,7 @@
skip-string-normalization = true
skip-numeric-underscore-normalization = true
[tool.pytest.ini_options]
filterwarnings = [
'ignore:There is no current event loop:DeprecationWarning',
]