forked from platypush/platypush
More robust mechanism for websocket message send section locking
This commit is contained in:
parent
ca030c9b25
commit
c5adc141ea
1 changed files with 12 additions and 6 deletions
|
@ -225,12 +225,18 @@ class HttpBackend(Backend):
|
||||||
raise TimeoutError('Websocket on address {} not ready to receive data'.format(addr))
|
raise TimeoutError('Websocket on address {} not ready to receive data'.format(addr))
|
||||||
|
|
||||||
def _release_websocket_lock(self, ws):
|
def _release_websocket_lock(self, ws):
|
||||||
addr = ws.local_address
|
try:
|
||||||
if addr in self._websocket_locks:
|
acquire_ok = self._websocket_lock.acquire(timeout=self._websocket_lock_timeout)
|
||||||
try:
|
if not acquire_ok:
|
||||||
|
raise TimeoutError('Websocket lock acquire timeout')
|
||||||
|
|
||||||
|
addr = ws.remote_address
|
||||||
|
if addr in self._websocket_locks:
|
||||||
self._websocket_locks[addr].release()
|
self._websocket_locks[addr].release()
|
||||||
except RuntimeError:
|
except Exception as e:
|
||||||
pass
|
self.logger.warning('Unhandled exception while releasing websocket lock: {}'.format(str(e)))
|
||||||
|
finally:
|
||||||
|
self._websocket_lock.release()
|
||||||
|
|
||||||
def notify_web_clients(self, event):
|
def notify_web_clients(self, event):
|
||||||
""" Notify all the connected web clients (over websocket) of a new event """
|
""" Notify all the connected web clients (over websocket) of a new event """
|
||||||
|
@ -246,8 +252,8 @@ class HttpBackend(Backend):
|
||||||
self._release_websocket_lock(ws)
|
self._release_websocket_lock(ws)
|
||||||
|
|
||||||
loop = get_or_create_event_loop()
|
loop = get_or_create_event_loop()
|
||||||
|
|
||||||
wss = self.active_websockets.copy()
|
wss = self.active_websockets.copy()
|
||||||
|
|
||||||
for _ws in wss:
|
for _ws in wss:
|
||||||
try:
|
try:
|
||||||
loop.run_until_complete(send_event(_ws))
|
loop.run_until_complete(send_event(_ws))
|
||||||
|
|
Loading…
Reference in a new issue