87 lines
2.4 KiB
Python
87 lines
2.4 KiB
Python
import logging
|
|
import threading
|
|
|
|
from queue import Queue
|
|
from typing import Callable, Dict
|
|
|
|
from flask import Flask, Response
|
|
|
|
|
|
class Endpoint:
|
|
def __init__(self, action):
|
|
self.action = action
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
return Response(self.action(*args, **kwargs), status=200, headers={})
|
|
|
|
|
|
class Server:
|
|
def __init__(self,
|
|
host: str = '0.0.0.0',
|
|
port: int = 8080,
|
|
endpoint: str = '/stream.mp3',
|
|
debug: bool = False):
|
|
self.host = host
|
|
self.port = port
|
|
self.debug = debug
|
|
self.endpoint = endpoint
|
|
self.app = Flask(__name__)
|
|
self.logger = logging.getLogger(self.__class__.__name__)
|
|
self.logger.setLevel(logging.DEBUG if self.debug else logging.INFO)
|
|
|
|
self._audio_lock = threading.RLock()
|
|
self._stream_queues: Dict[int, Queue] = {}
|
|
self._next_queue_id = 1
|
|
|
|
self.add_endpoints()
|
|
self.thread = threading.Thread(target=self.app.run,
|
|
kwargs=dict(host=self.host, port=self.port,
|
|
debug=debug, use_reloader=False))
|
|
|
|
def __enter__(self):
|
|
self.thread.start()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
pass # TODO find a clean way to get Flask to stop
|
|
|
|
def process_audio(self, audio: bytes):
|
|
with self._audio_lock:
|
|
for q in self._stream_queues.values():
|
|
q.put(audio)
|
|
|
|
def add_endpoints(self):
|
|
self.add_endpoint(self.endpoint, self.stream())
|
|
|
|
def add_endpoint(self, endpoint: str, handler: Callable):
|
|
self.app.add_url_rule(endpoint, endpoint, handler)
|
|
|
|
def _get_feed(self):
|
|
with self._audio_lock:
|
|
queue_id = self._next_queue_id
|
|
self._stream_queues[queue_id] = Queue()
|
|
self._next_queue_id += 1
|
|
q = self._stream_queues[queue_id]
|
|
|
|
try:
|
|
while True:
|
|
audio = q.get()
|
|
if not audio:
|
|
continue
|
|
|
|
self.logger.debug('Got audio sample of length {}'.format(len(audio)))
|
|
yield audio
|
|
finally:
|
|
with self._audio_lock:
|
|
del self._stream_queues[queue_id]
|
|
|
|
def stream(self):
|
|
def endpoint():
|
|
return Response(self._get_feed(),
|
|
mimetype='audio/mpeg')
|
|
|
|
return endpoint
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|