diff --git a/README.md b/README.md index 77bffba..490e663 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,12 @@ # micstream + Stream an audio input device over HTTP as mp3 + +## Requirements + +`ffmpeg` and `lame` installed on the system, e.g.: + +```bash +[sudo] apt-get install ffmpeg lame +``` + diff --git a/micstream/__init__.py b/micstream/__init__.py new file mode 100644 index 0000000..ad5bbe9 --- /dev/null +++ b/micstream/__init__.py @@ -0,0 +1,5 @@ +from micstream.audio import AudioSource +from micstream.server import Server + + +# vim:sw=4:ts=4:et: diff --git a/micstream/__main__.py b/micstream/__main__.py new file mode 100644 index 0000000..a4b24af --- /dev/null +++ b/micstream/__main__.py @@ -0,0 +1,27 @@ +import logging +import sys + +from micstream import AudioSource, Server + + +def init_logging(): + logging.basicConfig(level=logging.INFO, + stream=sys.stdout, + format='[%(asctime)s] %(name)s|%(levelname)-8s|%(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + + +def main(): + init_logging() + + with AudioSource('plughw:3,0') as source, \ + Server() as server: + for sample in source: + server.process_audio(sample) + + +if __name__ == '__main__': + main() + + +# vim:sw=4:ts=4:et: diff --git a/micstream/audio.py b/micstream/audio.py new file mode 100644 index 0000000..50429c4 --- /dev/null +++ b/micstream/audio.py @@ -0,0 +1,102 @@ +import logging +import os +import signal +import subprocess +import sys +import time + + +class AudioSource: + def __init__(self, + device: str, + audio_system: str = 'alsa', + sample_rate: int = 44100, + bitrate: int = 128, + channels: int = 1, + ffmpeg_bin: str = 'ffmpeg', + bufsize: int = 8192, + debug: bool = False): + self.ffmpeg_bin = ffmpeg_bin + self.debug = debug + self.bufsize = bufsize + self.devnull = None + self.ffmpeg = None + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.setLevel(logging.DEBUG if self.debug else logging.INFO) + self.ffmpeg_args = ( + ffmpeg_bin, '-f', audio_system, '-i', device, '-vn', '-acodec', 'libmp3lame', + '-b:a', str(bitrate) + 'k', '-ac', str(channels), '-ar', str(sample_rate), + '-f', 'mp3', '-') + + self._ffmpeg_start_time = None + self._first_sample_time = None + self.latency = 0. + + def __iter__(self): + return self + + def __next__(self) -> bytes: + if not self.ffmpeg or self.ffmpeg.poll() is not None: + raise StopIteration + + while True: + data = self.ffmpeg.stdout.read(self.bufsize) + if not data: + break + + if not self._first_sample_time: + self._first_sample_time = time.time() + self.latency = self._first_sample_time - self._ffmpeg_start_time + self.logger.info('Estimated latency: {} msec'.format(int(self.latency * 1000))) + + if time.time() - self._first_sample_time >= self.latency: + return data + + raise StopIteration + + def __enter__(self): + kwargs = dict(stdout=subprocess.PIPE) + if not self.debug: + self.devnull = open(os.devnull, 'w') + kwargs['stderr'] = self.devnull + + self.logger.info('Running FFmpeg: {}'.format(' '.join(self.ffmpeg_args))) + self.ffmpeg = subprocess.Popen(self.ffmpeg_args, **kwargs) + self._ffmpeg_start_time = time.time() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.ffmpeg: + self.ffmpeg.terminate() + try: + self.ffmpeg.wait(timeout=5) + except subprocess.TimeoutExpired: + self.logger.warning('FFmpeg process termination timeout') + + if self.ffmpeg.poll() is None: + self.ffmpeg.kill() + + self.ffmpeg.wait() + self.ffmpeg = None + + if self.devnull: + self.devnull.close() + self.devnull = None + + self._ffmpeg_start_time = None + self._first_sample_time = None + + def pause(self): + if not self.ffmpeg: + return + + self.ffmpeg.send_signal(signal.SIGSTOP) + + def resume(self): + if not self.ffmpeg: + return + + self.ffmpeg.send_signal(signal.SIGCONT) + + +# vim:sw=4:ts=4:et: diff --git a/micstream/server.py b/micstream/server.py new file mode 100644 index 0000000..0e164ba --- /dev/null +++ b/micstream/server.py @@ -0,0 +1,86 @@ +import logging +import threading + +from queue import Queue +from typing import Callable, Dict + +from flask import Flask, Response + + +class Endpoint: + def __init__(self, action): + self.action = action + + def __call__(self, *args, **kwargs): + return Response(self.action(*args, **kwargs), status=200, headers={}) + + +class Server: + def __init__(self, + host: str = '0.0.0.0', + port: int = 8080, + endpoint: str = '/stream.mp3', + debug: bool = False): + self.host = host + self.port = port + self.debug = debug + self.endpoint = endpoint + self.app = Flask(__name__) + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.setLevel(logging.DEBUG if self.debug else logging.INFO) + + self._audio_lock = threading.RLock() + self._stream_queues: Dict[int, Queue] = {} + self._next_queue_id = 1 + + self.add_endpoints() + self.thread = threading.Thread(target=self.app.run, + kwargs=dict(host=self.host, port=self.port, + debug=debug, use_reloader=False)) + + def __enter__(self): + self.thread.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass # TODO find a clean way to get Flask to stop + + def process_audio(self, audio: bytes): + with self._audio_lock: + for q in self._stream_queues.values(): + q.put(audio) + + def add_endpoints(self): + self.add_endpoint(self.endpoint, self.stream()) + + def add_endpoint(self, endpoint: str, handler: Callable): + self.app.add_url_rule(endpoint, endpoint, handler) + + def _get_feed(self): + with self._audio_lock: + queue_id = self._next_queue_id + self._stream_queues[queue_id] = Queue() + self._next_queue_id += 1 + q = self._stream_queues[queue_id] + + try: + while True: + audio = q.get() + if not audio: + continue + + self.logger.debug('Got audio sample of length {}'.format(len(audio))) + yield audio + finally: + with self._audio_lock: + del self._stream_queues[queue_id] + + def stream(self): + def endpoint(): + return Response(self._get_feed(), + mimetype='audio/mpeg') + + return endpoint + + +# vim:sw=4:ts=4:et: