diff --git a/platypush/plugins/media/search/local.py b/platypush/plugins/media/search/local/__init__.py similarity index 62% rename from platypush/plugins/media/search/local.py rename to platypush/plugins/media/search/local/__init__.py index 21603dc1b..628fc7e09 100644 --- a/platypush/plugins/media/search/local.py +++ b/platypush/plugins/media/search/local/__init__.py @@ -1,27 +1,28 @@ import datetime import os import re +import threading import time -from sqlalchemy import ( - create_engine, - Column, - Integer, - String, - DateTime, - PrimaryKeyConstraint, - ForeignKey, -) -from sqlalchemy.orm import sessionmaker, scoped_session +from sqlalchemy import create_engine from sqlalchemy.sql.expression import func -from platypush.common.db import declarative_base from platypush.config import Config from platypush.plugins.media import MediaPlugin from platypush.plugins.media.search import MediaSearcher -Base = declarative_base() -Session = scoped_session(sessionmaker()) +from .db import ( + Base, + MediaDirectory, + MediaFile, + MediaFileToken, + MediaToken, + Session, +) + +from .metadata import get_metadata + +_db_lock = threading.RLock() class LocalMediaSearcher(MediaSearcher): @@ -37,7 +38,7 @@ class LocalMediaSearcher(MediaSearcher): def __init__(self, dirs, *args, **kwargs): super().__init__(*args, **kwargs) self.dirs = dirs - db_dir = os.path.join(Config.get('workdir'), 'media') + db_dir = os.path.join(Config.get_workdir(), 'media') os.makedirs(db_dir, exist_ok=True) self.db_file = os.path.join(db_dir, 'media.db') self._db_engine = None @@ -142,6 +143,7 @@ class LocalMediaSearcher(MediaSearcher): ).delete(synchronize_session='fetch') return + media_files = [] stored_file_records = { f.path: f for f in self._get_file_records(dir_record, session) } @@ -162,7 +164,7 @@ class LocalMediaSearcher(MediaSearcher): ) and not MediaPlugin.is_audio_file(filename): continue - self.logger.debug('Syncing item {}'.format(filepath)) + self.logger.info('Scanning item %s', filepath) tokens = [ _.lower() for _ in re.split(self._filename_separators, filename.strip()) @@ -170,9 +172,9 @@ class LocalMediaSearcher(MediaSearcher): token_records = self._sync_token_records(session, *tokens) file_record = MediaFile.build(directory_id=dir_record.id, path=filepath) - session.add(file_record) session.commit() + file_record = ( session.query(MediaFile) .filter_by(directory_id=dir_record.id, path=filepath) @@ -185,6 +187,8 @@ class LocalMediaSearcher(MediaSearcher): ) session.add(file_token) + media_files.append(file_record) + # stored_file_records should now only contain the records of the files # that have been removed from the directory if stored_file_records: @@ -198,7 +202,7 @@ class LocalMediaSearcher(MediaSearcher): MediaFile.id.in_([record.id for record in stored_file_records.values()]) ).delete(synchronize_session='fetch') - dir_record.last_indexed_at = datetime.datetime.now() + dir_record.last_indexed_at = datetime.datetime.now() # type: ignore self.logger.info( 'Scanned {} in {} seconds'.format( media_dir, int(time.time() - index_start_time) @@ -207,7 +211,32 @@ class LocalMediaSearcher(MediaSearcher): session.commit() - def search(self, query, **kwargs): + # Start the metadata scan in a separate thread + threading.Thread( + target=self._metadata_scan_thread, args=(media_files,), daemon=True + ).start() + + def _metadata_scan_thread(self, records): + """ + Thread that will scan the media files in the given paths and update + their metadata. + """ + paths = [record.path for record in records] + metadata = get_metadata(*paths) + session = self._get_db_session() + + for record, data in zip(records, metadata): + record = session.merge(record) + record.duration = data.get('duration') # type: ignore + record.width = data.get('width') # type: ignore + record.height = data.get('height') # type: ignore + record.image = data.get('image') # type: ignore + record.created_at = data.get('created_at') # type: ignore + session.add(record) + + session.commit() + + def search(self, query, **_): """ Searches in the configured media directories given a query. It uses the built-in SQLite index if available. If any directory has changed since @@ -218,123 +247,46 @@ class LocalMediaSearcher(MediaSearcher): session = self._get_db_session() results = {} - for media_dir in self.dirs: - self.logger.info('Searching {} for "{}"'.format(media_dir, query)) - dir_record = self._get_or_create_dir_entry(session, media_dir) + with _db_lock: + for media_dir in self.dirs: + self.logger.info('Searching {} for "{}"'.format(media_dir, query)) + dir_record = self._get_or_create_dir_entry(session, media_dir) - if self._has_directory_changed_since_last_indexing(dir_record): - self.logger.info( - '{} has changed since last indexing, '.format(media_dir) - + 're-indexing' - ) + if self._has_directory_changed_since_last_indexing(dir_record): + self.logger.info( + '{} has changed since last indexing, '.format(media_dir) + + 're-indexing' + ) - self.scan(media_dir, session=session, dir_record=dir_record) + self.scan(media_dir, session=session, dir_record=dir_record) - query_tokens = [ - _.lower() for _ in re.split(self._filename_separators, query.strip()) - ] + query_tokens = [ + _.lower() + for _ in re.split(self._filename_separators, query.strip()) + ] - for file_record in ( - session.query(MediaFile.path) - .join(MediaFileToken) - .join(MediaToken) - .filter(MediaToken.token.in_(query_tokens)) - .group_by(MediaFile.path) - .having(func.count(MediaFileToken.token_id) >= len(query_tokens)) - ): - if os.path.isfile(file_record.path): - results[file_record.path] = { - 'url': 'file://' + file_record.path, - 'title': os.path.basename(file_record.path), - 'size': os.path.getsize(file_record.path), - } + for file_record in session.query(MediaFile).where( + MediaFile.id.in_( + session.query(MediaFile.id) + .join(MediaFileToken) + .join(MediaToken) + .filter(MediaToken.token.in_(query_tokens)) + .group_by(MediaFile.id) + .having( + func.count(MediaFileToken.token_id) >= len(query_tokens) + ) + ) + ): + if os.path.isfile(file_record.path): + results[file_record.path] = { + 'url': 'file://' + file_record.path, + 'title': os.path.basename(file_record.path), + 'size': os.path.getsize(file_record.path), + 'duration': file_record.duration, + 'width': file_record.width, + 'height': file_record.height, + 'image': file_record.image, + 'created_at': file_record.created_at, + } return results.values() - - -# --- Table definitions - - -class MediaDirectory(Base): - """Models the MediaDirectory table""" - - __tablename__ = 'MediaDirectory' - __table_args__ = {'sqlite_autoincrement': True} - - id = Column(Integer, primary_key=True) - path = Column(String) - last_indexed_at = Column(DateTime) - - @classmethod - def build(cls, path, last_indexed_at=None, id=None): - record = cls() - record.id = id - record.path = path - record.last_indexed_at = last_indexed_at - return record - - -class MediaFile(Base): - """Models the MediaFile table""" - - __tablename__ = 'MediaFile' - __table_args__ = {'sqlite_autoincrement': True} - - id = Column(Integer, primary_key=True) - directory_id = Column( - Integer, ForeignKey('MediaDirectory.id', ondelete='CASCADE'), nullable=False - ) - path = Column(String, nullable=False, unique=True) - indexed_at = Column(DateTime) - - @classmethod - def build(cls, directory_id, path, indexed_at=None, id=None): - record = cls() - record.id = id - record.directory_id = directory_id - record.path = path - record.indexed_at = indexed_at or datetime.datetime.now() - return record - - -class MediaToken(Base): - """Models the MediaToken table""" - - __tablename__ = 'MediaToken' - __table_args__ = {'sqlite_autoincrement': True} - - id = Column(Integer, primary_key=True) - token = Column(String, nullable=False, unique=True) - - @classmethod - def build(cls, token, id=None): - record = cls() - record.id = id - record.token = token - return record - - -class MediaFileToken(Base): - """Models the MediaFileToken table""" - - __tablename__ = 'MediaFileToken' - - file_id = Column( - Integer, ForeignKey('MediaFile.id', ondelete='CASCADE'), nullable=False - ) - token_id = Column( - Integer, ForeignKey('MediaToken.id', ondelete='CASCADE'), nullable=False - ) - - __table_args__ = (PrimaryKeyConstraint(file_id, token_id), {}) - - @classmethod - def build(cls, file_id, token_id, id=None): - record = cls() - record.id = id - record.file_id = file_id - record.token_id = token_id - return record - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/search/local/db.py b/platypush/plugins/media/search/local/db.py new file mode 100644 index 000000000..3aa15aebb --- /dev/null +++ b/platypush/plugins/media/search/local/db.py @@ -0,0 +1,111 @@ +import datetime + +from sqlalchemy import ( + Float, + Column, + Integer, + String, + DateTime, + PrimaryKeyConstraint, + ForeignKey, +) +from sqlalchemy.orm import sessionmaker, scoped_session + +from platypush.common.db import declarative_base + +Base = declarative_base() +Session = scoped_session(sessionmaker()) + + +class MediaDirectory(Base): + """Models the MediaDirectory table""" + + __tablename__ = 'MediaDirectory' + __table_args__ = {'sqlite_autoincrement': True} + + id = Column(Integer, primary_key=True) + path = Column(String) + last_indexed_at = Column(DateTime) + + @classmethod + def build(cls, path, last_indexed_at=None, id=None): + record = cls() + record.id = id + record.path = path + record.last_indexed_at = last_indexed_at + return record + + +class MediaFile(Base): + """Models the MediaFile table""" + + __tablename__ = 'MediaFile' + __table_args__ = {'sqlite_autoincrement': True} + + id = Column(Integer, primary_key=True) + directory_id = Column( + Integer, ForeignKey('MediaDirectory.id', ondelete='CASCADE'), nullable=False + ) + path = Column(String, nullable=False, unique=True) + duration = Column(Float) + width = Column(Integer) + height = Column(Integer) + image = Column(String) + created_at = Column(DateTime) + indexed_at = Column(DateTime) + + @classmethod + def build(cls, directory_id, path, indexed_at=None, id=None, **kwargs): + record = cls() + record.id = id + record.directory_id = directory_id + record.path = path + record.indexed_at = indexed_at or datetime.datetime.now() + + for k, v in kwargs.items(): + setattr(record, k, v) + + return record + + +class MediaToken(Base): + """Models the MediaToken table""" + + __tablename__ = 'MediaToken' + __table_args__ = {'sqlite_autoincrement': True} + + id = Column(Integer, primary_key=True) + token = Column(String, nullable=False, unique=True) + + @classmethod + def build(cls, token, id=None): + record = cls() + record.id = id + record.token = token + return record + + +class MediaFileToken(Base): + """Models the MediaFileToken table""" + + __tablename__ = 'MediaFileToken' + + file_id = Column( + Integer, ForeignKey('MediaFile.id', ondelete='CASCADE'), nullable=False + ) + token_id = Column( + Integer, ForeignKey('MediaToken.id', ondelete='CASCADE'), nullable=False + ) + + __table_args__ = (PrimaryKeyConstraint(file_id, token_id), {}) + + @classmethod + def build(cls, file_id, token_id, id=None): + record = cls() + record.id = id + record.file_id = file_id + record.token_id = token_id + return record + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/search/local/metadata.py b/platypush/plugins/media/search/local/metadata.py new file mode 100644 index 000000000..574590d7b --- /dev/null +++ b/platypush/plugins/media/search/local/metadata.py @@ -0,0 +1,86 @@ +import datetime +import json +import logging +import multiprocessing +import os +import subprocess +from concurrent.futures import ProcessPoolExecutor + +from dateutil.parser import isoparse +import shutil + +logger = logging.getLogger(__name__) + + +def get_file_metadata(path: str): + """ + Retrieves the metadata of a media file using ffprobe. + """ + logger.info('Retrieving metadata for %s', path) + + with subprocess.Popen( + [ + 'ffprobe', + '-v', + 'quiet', + '-print_format', + 'json', + '-show_format', + '-show_streams', + path, + ], + stdout=subprocess.PIPE, + ) as ffprobe: + ret = json.loads(ffprobe.communicate()[0]) + + video_stream = next( + iter( + [ + stream + for stream in ret.get('streams', []) + if stream.get('codec_type') == 'video' + ] + ), + {}, + ) + + creation_time = ret.get('format', {}).get('tags', {}).get('creation_time') + if creation_time: + try: + creation_time = isoparse(creation_time) + except ValueError: + creation_time = None + + if not creation_time: + creation_time = datetime.datetime.fromtimestamp(os.path.getctime(path)) + + return { + 'duration': ret.get('format', {}).get('duration'), + 'width': video_stream.get('width'), + 'height': video_stream.get('height'), + 'created_at': creation_time, + } + + +def get_metadata(*paths: str): + """ + Retrieves the metadata of media files using ffprobe. + """ + logger.info('Retrieving metadata for %d media files', len(paths)) + try: + assert shutil.which( + 'ffprobe' + ), 'ffprobe not found in PATH. Install ffmpeg to retrieve local media metadata.' + + # Run ffprobe in parallel + with ProcessPoolExecutor( + max_workers=multiprocessing.cpu_count() * 2 + ) as executor: + futures = [executor.submit(get_file_metadata, path) for path in paths] + results = [future.result() for future in futures] + + logger.info('Retrieved metadata for %d media files', len(results)) + return results + except Exception as e: + logger.error('Failed to retrieve media metadata: %s: %s', type(e).__name__, e) + return [{} for _ in paths]