Added support for streaming audio from an input source over HTTP

This commit is contained in:
Fabio Manganiello 2019-07-23 18:22:00 +02:00
parent 426f064459
commit 43ca3a6f94
8 changed files with 316 additions and 77 deletions

View file

@ -0,0 +1,74 @@
import os
import tempfile
from flask import Response, Blueprint, request
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import authenticate, send_request
sound = Blueprint('sound', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
sound,
]
# Generates the .wav file header for a given set of samples and specs
# noinspection PyRedundantParentheses
def gen_header(sample_rate, sample_width, channels):
datasize = int(2000 * 1e6) # Arbitrary data size for streaming
o = bytes("RIFF", ' ascii') # (4byte) Marks file as RIFF
o += (datasize + 36).to_bytes(4, 'little') # (4byte) File size in bytes
o += bytes("WAVE", 'ascii') # (4byte) File type
o += bytes("fmt ", 'ascii') # (4byte) Format Chunk Marker
o += (16).to_bytes(4, 'little') # (4byte) Length of above format data
o += (1).to_bytes(2, 'little') # (2byte) Format type (1 - PCM)
o += channels.to_bytes(2, 'little') # (2byte)
o += sample_rate.to_bytes(4, 'little') # (4byte)
o += (sample_rate * channels * sample_width // 8).to_bytes(4, 'little') # (4byte)
o += (channels * sample_width // 8).to_bytes(2, 'little') # (2byte)
o += sample_width.to_bytes(2, 'little') # (2byte)
o += bytes("data", 'ascii') # (4byte) Data Chunk Marker
o += datasize.to_bytes(4, 'little') # (4byte) Data size in bytes
return o
def audio_feed(device, fifo, sample_rate, blocksize, latency, channels):
send_request(action='sound.stream_recording', device=device, sample_rate=sample_rate,
dtype='int16', fifo=fifo, blocksize=blocksize, latency=latency,
channels=channels)
try:
with open(fifo, 'rb') as f:
send_header = True
while True:
audio = f.read(blocksize)
if audio:
if send_header:
audio = gen_header(sample_rate=sample_rate, sample_width=16, channels=channels) + audio
send_header = False
yield audio
finally:
send_request(action='sound.stop_recording')
@sound.route('/sound/stream', methods=['GET'])
@authenticate()
def get_sound_feed():
device = request.args.get('device')
sample_rate = request.args.get('sample_rate', 44100)
blocksize = request.args.get('blocksize', 512)
latency = request.args.get('latency', 0)
channels = request.args.get('channels', 1)
fifo = request.args.get('fifo', os.path.join(tempfile.gettempdir(), 'inputstream'))
return Response(audio_feed(device=device, fifo=fifo, sample_rate=sample_rate,
blocksize=blocksize, latency=latency, channels=channels),
mimetype='audio/x-wav;codec=pcm')
# vim:sw=4:ts=4:et:

View file

@ -1,10 +1,9 @@
import importlib import importlib
import json
import logging import logging
import os import os
from functools import wraps from functools import wraps
from flask import request, redirect, Response from flask import abort, request, redirect, Response
from redis import Redis from redis import Redis
# NOTE: The HTTP service will *only* work on top of a Redis bus. The default # NOTE: The HTTP service will *only* work on top of a Redis bus. The default
@ -77,7 +76,7 @@ def get_websocket_port():
return http_conf.get('websocket_port', HttpBackend._DEFAULT_WEBSOCKET_PORT) return http_conf.get('websocket_port', HttpBackend._DEFAULT_WEBSOCKET_PORT)
def send_message(msg): def send_message(msg, wait_for_response=True):
msg = Message.build(msg) msg = Message.build(msg)
if isinstance(msg, Request): if isinstance(msg, Request):
@ -88,7 +87,7 @@ def send_message(msg):
bus().post(msg) bus().post(msg)
if isinstance(msg, Request): if isinstance(msg, Request) and wait_for_response:
response = get_message_response(msg) response = get_message_response(msg)
logger().debug('Processing response on the HTTP backend: {}'. logger().debug('Processing response on the HTTP backend: {}'.
format(response)) format(response))
@ -96,7 +95,7 @@ def send_message(msg):
return response return response
def send_request(action, **kwargs): def send_request(action, wait_for_response=True, **kwargs):
msg = { msg = {
'type': 'request', 'type': 'request',
'action': action 'action': action
@ -105,12 +104,11 @@ def send_request(action, **kwargs):
if kwargs: if kwargs:
msg['args'] = kwargs msg['args'] = kwargs
return send_message(msg) return send_message(msg, wait_for_response=wait_for_response)
def _authenticate_token(): def _authenticate_token():
token = Config.get('token') token = Config.get('token')
user_token = None
if 'X-Token' in request.headers: if 'X-Token' in request.headers:
user_token = request.headers['X-Token'] user_token = request.headers['X-Token']
@ -154,7 +152,6 @@ def _authenticate_session():
def _authenticate_csrf_token(): def _authenticate_csrf_token():
user_manager = UserManager() user_manager = UserManager()
user_session_token = None user_session_token = None
user = None
if 'X-Session-Token' in request.headers: if 'X-Session-Token' in request.headers:
user_session_token = request.headers['X-Session-Token'] user_session_token = request.headers['X-Session-Token']
@ -207,7 +204,6 @@ def authenticate(redirect_page='', skip_auth_methods=None, check_csrf_token=Fals
return redirect('/login?redirect=' + redirect_page, 307) return redirect('/login?redirect=' + redirect_page, 307)
# CSRF token check # CSRF token check
csrf_check_ok = True
if check_csrf_token: if check_csrf_token:
csrf_check_ok = _authenticate_csrf_token() csrf_check_ok = _authenticate_csrf_token()
if not csrf_check_ok: if not csrf_check_ok:

View file

@ -0,0 +1,15 @@
@import 'common/vars';
.sound {
height: 90%;
margin-top: 7%;
overflow: hidden;
display: flex;
flex-direction: column;
align-items: center;
.sound-container {
margin-bottom: 1em;
}
}

View file

@ -41,7 +41,7 @@ Vue.component('camera', {
this.streaming = false; this.streaming = false;
this.capturing = true; this.capturing = true;
this.$refs.frame.setAttribute('src', '/camera/' + this.deviceId + '/frame'); this.$refs.frame.setAttribute('src', '/camera/' + this.deviceId + '/frame?t=' + (new Date()).getTime());
}, },
onFrameLoaded: function(event) { onFrameLoaded: function(event) {

View file

@ -0,0 +1,22 @@
Vue.component('sound', {
template: '#tmpl-sound',
props: ['config'],
data: function() {
return {
bus: new Vue({}),
recording: false,
};
},
methods: {
startRecording: function() {
this.recording = true;
},
stopRecording: function() {
this.recording = false;
},
},
});

View file

@ -9,6 +9,7 @@
'music.mpd': 'fa fa-music', 'music.mpd': 'fa fa-music',
'music.snapcast': 'fa fa-volume-up', 'music.snapcast': 'fa fa-volume-up',
'sensors': 'fas fa-thermometer-half', 'sensors': 'fas fa-thermometer-half',
'sound': 'fas fa-microphone',
'switches': 'fa fa-toggle-on', 'switches': 'fa fa-toggle-on',
'tts': 'fa fa-comment', 'tts': 'fa fa-comment',
'tts.google': 'fa fa-comment', 'tts.google': 'fa fa-comment',

View file

@ -0,0 +1,21 @@
<script type="text/x-template" id="tmpl-sound">
<div class="sound">
<div class="sound-container">
<audio autoplay preload="none" ref="player" v-if="recording">
<source src="/sound/stream" type="audio/x-wav;codec=pcm">
Your browser does not support audio elements
</audio>
</div>
<div class="controls">
<button type="button" @click="startRecording" v-if="!recording">
<i class="fa fa-play"></i>&nbsp; Start streaming audio
</button>
<button type="button" @click="stopRecording" v-else>
<i class="fa fa-stop"></i>&nbsp; Stop streaming audio
</button>
</div>
</div>
</script>

View file

@ -2,10 +2,9 @@
.. moduleauthor:: Fabio Manganiello <blacklight86@gmail.com> .. moduleauthor:: Fabio Manganiello <blacklight86@gmail.com>
""" """
import json
import math
import os import os
import queue import queue
import stat
import tempfile import tempfile
import time import time
@ -15,24 +14,22 @@ from threading import Thread, Event, RLock
from .core import Sound, Mix from .core import Sound, Mix
from platypush.context import get_bus from platypush.context import get_bus
from platypush.message.event.sound import SoundPlaybackStartedEvent, \ from platypush.message.event.sound import \
SoundPlaybackPausedEvent, SoundPlaybackStoppedEvent, \ SoundRecordingStartedEvent, SoundRecordingStoppedEvent
SoundRecordingStartedEvent, SoundRecordingPausedEvent, \
SoundRecordingStoppedEvent
from platypush.plugins import Plugin, action from platypush.plugins import Plugin, action
class PlaybackState(Enum): class PlaybackState(Enum):
STOPPED='STOPPED', STOPPED = 'STOPPED',
PLAYING='PLAYING', PLAYING = 'PLAYING',
PAUSED='PAUSED' PAUSED = 'PAUSED'
class RecordingState(Enum): class RecordingState(Enum):
STOPPED='STOPPED', STOPPED = 'STOPPED',
RECORDING='RECORDING', RECORDING = 'RECORDING',
PAUSED='PAUSED' PAUSED = 'PAUSED'
class SoundPlugin(Plugin): class SoundPlugin(Plugin):
@ -56,10 +53,12 @@ class SoundPlugin(Plugin):
""" """
_STREAM_NAME_PREFIX = 'platypush-stream-' _STREAM_NAME_PREFIX = 'platypush-stream-'
_default_input_stream_fifo = os.path.join(tempfile.gettempdir(), 'inputstream')
# noinspection PyProtectedMember
def __init__(self, input_device=None, output_device=None, def __init__(self, input_device=None, output_device=None,
input_blocksize=Sound._DEFAULT_BLOCKSIZE, input_blocksize=Sound._DEFAULT_BLOCKSIZE,
output_blocksize=Sound._DEFAULT_BLOCKSIZE, *args, **kwargs): output_blocksize=Sound._DEFAULT_BLOCKSIZE, **kwargs):
""" """
:param input_device: Index or name of the default input device. Use :param input_device: Index or name of the default input device. Use
:meth:`platypush.plugins.sound.query_devices` to get the :meth:`platypush.plugins.sound.query_devices` to get the
@ -82,7 +81,7 @@ class SoundPlugin(Plugin):
:type output_blocksize: int :type output_blocksize: int
""" """
super().__init__(*args, **kwargs) super().__init__(**kwargs)
self.input_device = input_device self.input_device = input_device
self.output_device = output_device self.output_device = output_device
@ -101,7 +100,8 @@ class SoundPlugin(Plugin):
self.stream_index_to_name = {} self.stream_index_to_name = {}
self.completed_callback_events = {} self.completed_callback_events = {}
def _get_default_device(self, category): @staticmethod
def _get_default_device(category):
""" """
Query the default audio devices. Query the default audio devices.
@ -110,8 +110,7 @@ class SoundPlugin(Plugin):
""" """
import sounddevice as sd import sounddevice as sd
return sd.query_hostapis()[0].get('default_' + category.lower() + return sd.query_hostapis()[0].get('default_' + category.lower() + '_device')
'_device')
@action @action
def query_devices(self, category=None): def query_devices(self, category=None):
@ -167,7 +166,8 @@ class SoundPlugin(Plugin):
is_raw_stream = streamtype == sd.RawOutputStream is_raw_stream = streamtype == sd.RawOutputStream
def audio_callback(outdata, frames, time, status): # noinspection PyUnusedLocal
def audio_callback(outdata, frames, frame_time, status):
if self._get_playback_state(stream_index) == PlaybackState.STOPPED: if self._get_playback_state(stream_index) == PlaybackState.STOPPED:
raise sd.CallbackStop raise sd.CallbackStop
@ -202,7 +202,6 @@ class SoundPlugin(Plugin):
return audio_callback return audio_callback
@action @action
def play(self, file=None, sound=None, device=None, blocksize=None, def play(self, file=None, sound=None, device=None, blocksize=None,
bufsize=None, samplerate=None, channels=None, stream_name=None, bufsize=None, samplerate=None, channels=None, stream_name=None,
@ -294,14 +293,18 @@ class SoundPlugin(Plugin):
if not channels: if not channels:
channels = f.channels if f else 1 channels = f.channels if f else 1
mix = None
with self.playback_state_lock: with self.playback_state_lock:
stream_index, is_new_stream = self._get_or_allocate_stream_index( stream_index, is_new_stream = self._get_or_allocate_stream_index(
stream_index=stream_index, stream_name=stream_name) stream_index=stream_index, stream_name=stream_name)
if sound: if sound and stream_index in self.stream_mixes:
mix = self.stream_mixes[stream_index] mix = self.stream_mixes[stream_index]
mix.add(sound) mix.add(sound)
if not mix:
return None, "Unable to allocate the stream"
self.logger.info(('Starting playback of {} to sound device [{}] ' + self.logger.info(('Starting playback of {} to sound device [{}] ' +
'on stream [{}]').format( 'on stream [{}]').format(
file or sound, device, stream_index)) file or sound, device, stream_index))
@ -321,11 +324,10 @@ class SoundPlugin(Plugin):
else: else:
duration = mix.duration() duration = mix.duration()
blocktime = float(blocksize / samplerate) blocktime = float(blocksize / samplerate)
next_t = min(t+blocktime, duration) \ next_t = min(t + blocktime, duration) \
if duration is not None else t+blocktime if duration is not None else t + blocktime
data = mix.get_wave(t_start=t, t_end=next_t, data = mix.get_wave(t_start=t, t_end=next_t, samplerate=samplerate)
samplerate=samplerate)
t = next_t t = next_t
if duration is not None and t >= duration: if duration is not None and t >= duration:
@ -333,7 +335,6 @@ class SoundPlugin(Plugin):
q.put_nowait(data) # Pre-fill the audio queue q.put_nowait(data) # Pre-fill the audio queue
stream = self.active_streams[stream_index] stream = self.active_streams[stream_index]
completed_callback_event = self.completed_callback_events[stream_index] completed_callback_event = self.completed_callback_events[stream_index]
@ -367,8 +368,8 @@ class SoundPlugin(Plugin):
else: else:
duration = mix.duration() duration = mix.duration()
blocktime = float(blocksize / samplerate) blocktime = float(blocksize / samplerate)
next_t = min(t+blocktime, duration) \ next_t = min(t + blocktime, duration) \
if duration is not None else t+blocktime if duration is not None else t + blocktime
data = mix.get_wave(t_start=t, t_end=next_t, data = mix.get_wave(t_start=t, t_end=next_t,
samplerate=samplerate) samplerate=samplerate)
@ -400,10 +401,115 @@ class SoundPlugin(Plugin):
self.stop_playback([stream_index]) self.stop_playback([stream_index])
@action
def stream_recording(self, device=None, fifo=None, duration=None, sample_rate=None,
dtype='float32', blocksize=None, latency=0, channels=1):
"""
Return audio data from an audio source
:param device: Input device (default: default configured device or system default audio input if not configured)
:type device: int or str
:param fifo: Path of the FIFO that will be used to exchange audio samples (default: /tmp/inputstream)
:type fifo: str
:param duration: Recording duration in seconds (default: record until stop event)
:type duration: float
:param sample_rate: Recording sample rate (default: device default rate)
:type sample_rate: int
:param dtype: Data type for the audio samples. Supported types:
'float64', 'float32', 'int32', 'int16', 'int8', 'uint8'. Default: float32
:type dtype: str
:param blocksize: Audio block size (default: configured `input_blocksize` or 2048)
:type blocksize: int
:param latency: Device latency in seconds (default: 0)
:type latency: float
:param channels: Number of channels (default: 1)
:type channels: int
"""
import sounddevice as sd
self.recording_paused_changed.clear()
if device is None:
device = self.input_device
if device is None:
device = self._get_default_device('input')
if sample_rate is None:
dev_info = sd.query_devices(device, 'input')
sample_rate = int(dev_info['default_samplerate'])
if blocksize is None:
blocksize = self.input_blocksize
if not fifo:
fifo = self._default_input_stream_fifo
q = queue.Queue()
# noinspection PyUnusedLocal
def audio_callback(indata, frames, time_duration, status):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
if status:
self.logger.warning('Recording callback status: {}'.format(str(status)))
q.put(indata.copy())
def streaming_thread():
try:
with sd.InputStream(samplerate=sample_rate, device=device,
channels=channels, callback=audio_callback,
dtype=dtype, latency=latency, blocksize=blocksize):
with open(fifo, 'wb') as audio_queue:
self.start_recording()
get_bus().post(SoundRecordingStartedEvent())
self.logger.info('Started recording from device [{}]'.format(device))
recording_started_time = time.time()
while self._get_recording_state() != RecordingState.STOPPED \
and (duration is None or
time.time() - recording_started_time < duration):
while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait()
get_args = {
'block': True,
'timeout': max(0, duration - (time.time() - recording_started_time)),
} if duration is not None else {}
data = q.get(**get_args)
if not len(data):
continue
audio_queue.write(data)
except queue.Empty:
self.logger.warning('Recording timeout: audio callback failed?')
finally:
self.stop_recording()
get_bus().post(SoundRecordingStoppedEvent())
if os.path.exists(fifo):
if stat.S_ISFIFO(os.stat(fifo).st_mode):
self.logger.info('Removing previous input stream FIFO {}'.format(fifo))
os.unlink(fifo)
else:
raise RuntimeError('{} exists and is not a FIFO. Please remove it or rename it'.format(fifo))
os.mkfifo(fifo, 0o644)
Thread(target=streaming_thread).start()
@action @action
def record(self, outfile=None, duration=None, device=None, sample_rate=None, def record(self, outfile=None, duration=None, device=None, sample_rate=None,
blocksize=None, latency=0, channels=1, subtype='PCM_24'): format=None, blocksize=None, latency=0, channels=1, subtype='PCM_24'):
""" """
Records audio to a sound file (support formats: wav, raw) Records audio to a sound file (support formats: wav, raw)
@ -419,6 +525,9 @@ class SoundPlugin(Plugin):
:param sample_rate: Recording sample rate (default: device default rate) :param sample_rate: Recording sample rate (default: device default rate)
:type sample_rate: int :type sample_rate: int
:param format: Audio format (default: WAV)
:type format: str
:param blocksize: Audio block size (default: configured `input_blocksize` or 2048) :param blocksize: Audio block size (default: configured `input_blocksize` or 2048)
:type blocksize: int :type blocksize: int
@ -432,23 +541,22 @@ class SoundPlugin(Plugin):
:type subtype: str :type subtype: str
""" """
def recording_thread(outfile, duration, device, sample_rate, blocksize, def recording_thread(outfile, duration, device, sample_rate, format,
latency, channels, subtype): blocksize, latency, channels, subtype):
import sounddevice as sd import sounddevice as sd
self.recording_paused_changed.clear() self.recording_paused_changed.clear()
if outfile: if outfile:
outfile = os.path.abspath(os.path.expanduser(outfile)) outfile = os.path.abspath(os.path.expanduser(outfile))
if os.path.isfile(outfile):
self.logger.info('Removing existing audio file {}'.format(outfile))
os.unlink(outfile)
else: else:
outfile = tempfile.NamedTemporaryFile( outfile = tempfile.NamedTemporaryFile(
prefix='recording_', suffix='.wav', delete=False, prefix='recording_', suffix='.wav', delete=False,
dir=tempfile.gettempdir()).name dir=tempfile.gettempdir()).name
if os.path.isfile(outfile):
self.logger.info('Removing existing audio file {}'.format(outfile))
os.unlink(outfile)
if device is None: if device is None:
device = self.input_device device = self.input_device
if device is None: if device is None:
@ -463,7 +571,7 @@ class SoundPlugin(Plugin):
q = queue.Queue() q = queue.Queue()
def audio_callback(indata, frames, time, status): def audio_callback(indata, frames, duration, status):
while self._get_recording_state() == RecordingState.PAUSED: while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait() self.recording_paused_changed.wait()
@ -471,15 +579,19 @@ class SoundPlugin(Plugin):
self.logger.warning('Recording callback status: {}'.format( self.logger.warning('Recording callback status: {}'.format(
str(status))) str(status)))
q.put(indata.copy()) q.put({
'timestamp': time.time(),
'frames': frames,
'time': duration,
'data': indata.copy()
})
try: try:
import soundfile as sf import soundfile as sf
import numpy import numpy
with sf.SoundFile(outfile, mode='x', samplerate=sample_rate, with sf.SoundFile(outfile, mode='w', samplerate=sample_rate,
channels=channels, subtype=subtype) as f: format=format, channels=channels, subtype=subtype) as f:
with sd.InputStream(samplerate=sample_rate, device=device, with sd.InputStream(samplerate=sample_rate, device=device,
channels=channels, callback=audio_callback, channels=channels, callback=audio_callback,
latency=latency, blocksize=blocksize): latency=latency, blocksize=blocksize):
@ -498,30 +610,30 @@ class SoundPlugin(Plugin):
get_args = { get_args = {
'block': True, 'block': True,
'timeout': max(0, duration - (time.time() - 'timeout': max(0, duration - (time.time() - recording_started_time)),
recording_started_time))
} if duration is not None else {} } if duration is not None else {}
data = q.get(**get_args) data = q.get(**get_args)
f.write(data) if data and time.time() - data.get('timestamp') <= 1.0:
# Only write the block if the latency is still acceptable
f.write(data['data'])
f.flush() f.flush()
except queue.Empty as e: except queue.Empty:
self.logger.warning('Recording timeout: audio callback failed?') self.logger.warning('Recording timeout: audio callback failed?')
finally: finally:
self.stop_recording() self.stop_recording()
get_bus().post(SoundRecordingStoppedEvent(filename=outfile)) get_bus().post(SoundRecordingStoppedEvent(filename=outfile))
Thread(target=recording_thread, args=( Thread(target=recording_thread,
outfile, duration, device, sample_rate, blocksize, latency, channels, args=(
subtype)).start() outfile, duration, device, sample_rate, format, blocksize, latency, channels, subtype)
).start()
@action @action
def recordplay(self, duration=None, input_device=None, output_device=None, def recordplay(self, duration=None, input_device=None, output_device=None,
sample_rate=None, blocksize=None, latency=0, channels=1, sample_rate=None, blocksize=None, latency=0, channels=1, dtype=None):
dtype=None):
""" """
Records audio and plays it on an output sound device (audio pass-through) Records audio and plays it on an output sound device (audio pass-through)
@ -571,6 +683,7 @@ class SoundPlugin(Plugin):
if blocksize is None: if blocksize is None:
blocksize = self.output_blocksize blocksize = self.output_blocksize
# noinspection PyUnusedLocal
def audio_callback(indata, outdata, frames, time, status): def audio_callback(indata, outdata, frames, time, status):
while self._get_recording_state() == RecordingState.PAUSED: while self._get_recording_state() == RecordingState.PAUSED:
self.recording_paused_changed.wait() self.recording_paused_changed.wait()
@ -581,7 +694,6 @@ class SoundPlugin(Plugin):
outdata[:] = indata outdata[:] = indata
stream_index = None stream_index = None
try: try:
@ -617,7 +729,6 @@ class SoundPlugin(Plugin):
self.stop_playback([stream_index]) self.stop_playback([stream_index])
self.stop_recording() self.stop_recording()
@action @action
def query_streams(self): def query_streams(self):
""" """
@ -638,12 +749,11 @@ class SoundPlugin(Plugin):
stream['playback_state'] = self.playback_state[i].name stream['playback_state'] = self.playback_state[i].name
stream['name'] = self.stream_index_to_name.get(i) stream['name'] = self.stream_index_to_name.get(i)
if i in self.stream_mixes: if i in self.stream_mixes:
stream['mix'] = { j: sound for j, sound in stream['mix'] = {j: sound for j, sound in
enumerate(list(self.stream_mixes[i])) } enumerate(list(self.stream_mixes[i]))}
return streams return streams
def _get_or_allocate_stream_index(self, stream_index=None, stream_name=None, def _get_or_allocate_stream_index(self, stream_index=None, stream_name=None,
completed_callback_event=None): completed_callback_event=None):
stream = None stream = None
@ -673,7 +783,7 @@ class SoundPlugin(Plugin):
stream_index = None stream_index = None
with self.playback_state_lock: with self.playback_state_lock:
for i in range(len(self.active_streams)+1): for i in range(len(self.active_streams) + 1):
if i not in self.active_streams: if i not in self.active_streams:
stream_index = i stream_index = i
break break