Compare commits

...

5 commits

Author SHA1 Message Date
ef4d0bd38c
[media] Support for generic media downloads.
All checks were successful
continuous-integration/drone/push Build is passing
2024-07-15 04:09:54 +02:00
bd01827b52
[Automatic] Updated components cache 2024-07-15 04:09:54 +02:00
f64d47565d
[Media UI] Support for generic media download. 2024-07-15 04:09:54 +02:00
79ba8deb71
[media] Added support for yt-dlp-compatible URLs to media.download.
Also, added `MediaDownloadEvent`s to keep track of the state of the
download.
2024-07-15 04:09:53 +02:00
84e06e30fe
[core] New architecture for the Redis bus.
- Use pubsub pattern rather than `rpush`/`blpop` - it saves memory, it's
  faster, and it decreases the risk of deadlocks.

- Use a connection pool.

- Propagate `PLATYPUSH_REDIS_QUEUE` environment variable so any
  subprocesses can access it.
2024-07-15 04:09:53 +02:00
32 changed files with 1587 additions and 261 deletions

View file

@ -181,6 +181,8 @@ class Application:
or os.environ.get('PLATYPUSH_REDIS_QUEUE') or os.environ.get('PLATYPUSH_REDIS_QUEUE')
or RedisBus.DEFAULT_REDIS_QUEUE or RedisBus.DEFAULT_REDIS_QUEUE
) )
os.environ['PLATYPUSH_REDIS_QUEUE'] = self.redis_queue
self.config_file = config_file or os.environ.get('PLATYPUSH_CONFIG') self.config_file = config_file or os.environ.get('PLATYPUSH_CONFIG')
self.verbose = verbose self.verbose = verbose
self.db_engine = db or os.environ.get('PLATYPUSH_DB') self.db_engine = db or os.environ.get('PLATYPUSH_DB')

View file

@ -1,24 +1,57 @@
from multiprocessing import Lock
from platypush.bus.redis import RedisBus from platypush.bus.redis import RedisBus
from platypush.context import get_bus
from platypush.config import Config from platypush.config import Config
from platypush.context import get_backend
from platypush.message import Message from platypush.message import Message
from platypush.message.request import Request from platypush.message.request import Request
from platypush.utils import get_redis_conf, get_message_response from platypush.utils import get_message_response
from .logger import logger from .logger import logger
_bus = None
class BusWrapper: # pylint: disable=too-few-public-methods
"""
Lazy singleton wrapper for the bus object.
"""
def __init__(self):
self._redis_queue = None
self._bus = None
self._bus_lock = Lock()
@property
def bus(self) -> RedisBus:
"""
Lazy getter/initializer for the bus object.
"""
with self._bus_lock:
if not self._bus:
self._bus = get_bus()
bus_: RedisBus = self._bus # type: ignore
return bus_
def post(self, msg):
"""
Send a message to the bus.
:param msg: The message to send.
"""
try:
self.bus.post(msg)
except Exception as e:
logger().exception(e)
_bus = BusWrapper()
def bus(): def bus():
""" """
Lazy getter/initializer for the bus object. Lazy getter/initializer for the bus object.
""" """
global _bus # pylint: disable=global-statement return _bus.bus
if _bus is None:
redis_queue = get_backend('http').bus.redis_queue # type: ignore
_bus = RedisBus(**get_redis_conf(), redis_queue=redis_queue)
return _bus
def send_message(msg, wait_for_response=True): def send_message(msg, wait_for_response=True):

View file

@ -27,6 +27,7 @@
:selected-channel="selectedChannel" :selected-channel="selectedChannel"
@add-to-playlist="$emit('add-to-playlist', $event)" @add-to-playlist="$emit('add-to-playlist', $event)"
@back="back" @back="back"
@download="$emit('download', $event)"
@path-change="$emit('path-change', $event)" @path-change="$emit('path-change', $event)"
@play="$emit('play', $event)" @play="$emit('play', $event)"
/> />
@ -48,6 +49,7 @@ export default {
'add-to-playlist', 'add-to-playlist',
'back', 'back',
'create-playlist', 'create-playlist',
'download',
'path-change', 'path-change',
'play', 'play',
'remove-from-playlist', 'remove-from-playlist',

View file

@ -0,0 +1,321 @@
<template>
<Loading v-if="loading" />
<div class="media-downloads fade-in" v-else>
<div class="no-content" v-if="!Object.keys(downloads).length">No media downloads in progress</div>
<div class="no-content" v-else-if="!Object.keys(filteredDownloads).length">No media downloads match the filter</div>
<div class="items" v-else>
<div class="row item"
:class="{selected: selectedItem === i}"
:key="i"
v-for="(media, i) in filteredDownloads"
@click="selectedItem = i"
>
<div class="col-8 left side">
<i class="icon fa" :class="{
'fa-check': media.state.toLowerCase() === 'completed',
'fa-play': media.state.toLowerCase() === 'downloading',
'fa-pause': media.state.toLowerCase() === 'paused',
'fa-times': media.state.toLowerCase() === 'cancelled',
'fa-stop': media.state.toLowerCase() === 'idle',
'fa-hourglass-half': media.state.toLowerCase() === 'started',
}" />
<div class="title" v-text="media.path || media.url" />
</div>
<div class="col-2 right side">
<span v-text="displayProgress[i]" />
</div>
<div class="col-2 right side">
<Dropdown title="Actions" icon-class="fa fa-ellipsis-h" @click="selectedItem = i">
<DropdownItem icon-class="fa fa-play" text="Play"
@click="$emit('play', {url: `file:///${media.path}`})"
v-if="media.state.toLowerCase() === 'completed'" />
<DropdownItem icon-class="fa fa-pause" text="Pause download" @click="pause(media)"
v-if="media.state.toLowerCase() === 'downloading' || media.state.toLowerCase() === 'started'" />
<DropdownItem icon-class="fa fa-rotate-left" text="Resume download" @click="resume(media)"
v-if="media.state.toLowerCase() === 'paused'" />
<DropdownItem icon-class="fa fa-eraser" text="Clear from queue" @click="clear(media)"
v-if="media.state.toLowerCase() === 'completed'" />
<DropdownItem icon-class="fa fa-stop" text="Cancel" @click="cancel(media)"
v-if="media.state.toLowerCase() !== 'completed' && media.state.toLowerCase() !== 'cancelled'" />
<DropdownItem icon-class="fa fa-trash" text="Remove file" @click="onDeleteSelected(media)"
v-if="media.state.toLowerCase() === 'completed' || media.state.toLowerCase() === 'cancelled'" />
<DropdownItem icon-class="fa fa-info" text="Media info" @click="$refs.mediaInfo.isVisible = true" />
</Dropdown>
</div>
</div>
</div>
<Modal ref="mediaInfo" title="Media info" width="80%">
<div class="modal-body media-info" v-if="selectedItem != null && downloads[selectedItem]">
<div class="row" v-if="downloads[selectedItem].name">
<div class="attr">Path</div>
<div class="value" v-text="downloads[selectedItem].path" />
</div>
<div class="row" v-if="downloads[selectedItem].url">
<div class="attr">Remote URL</div>
<div class="value">
<a :href="downloads[selectedItem].url" target="_blank" v-text="downloads[selectedItem].url" />
</div>
</div>
<div class="row" v-if="downloads[selectedItem].path">
<div class="attr">Local URL</div>
<div class="value">
<a :href="localURL(downloads[selectedItem])"
target="_blank" v-text="downloads[selectedItem].path" />
</div>
</div>
<div class="row" v-if="downloads[selectedItem].state">
<div class="attr">State</div>
<div class="value" v-text="downloads[selectedItem].state" />
</div>
<div class="row" v-if="downloads[selectedItem].progress != null">
<div class="attr">Progress</div>
<div class="value" v-text="displayProgress[selectedItem]" />
</div>
<div class="row" v-if="downloads[selectedItem].size != null">
<div class="attr">Size</div>
<div class="value" v-text="convertSize(downloads[selectedItem].size)" />
</div>
<div class="row" v-if="downloads[selectedItem].started_at">
<div class="attr">Started</div>
<div class="value" v-text="formatDateTime(downloads[selectedItem].started_at)" />
</div>
<div class="row" v-if="downloads[selectedItem].ended_at">
<div class="attr">Ended</div>
<div class="value" v-text="formatDateTime(downloads[selectedItem].ended_at)" />
</div>
</div>
</Modal>
<ConfirmDialog
ref="deleteConfirmDialog"
title="Delete file"
@input="rm"
@close="mediaToDelete = null"
>
Are you sure you want to delete the downloaded file?
</ConfirmDialog>
</div>
</template>
<script>
import ConfirmDialog from "@/components/elements/ConfirmDialog";
import Loading from "@/components/Loading";
import Utils from "@/Utils";
import MediaUtils from "@/components/Media/Utils.vue"
import Modal from "@/components/Modal";
import Dropdown from "@/components/elements/Dropdown";
import DropdownItem from "@/components/elements/DropdownItem";
export default {
mixins: [Utils, MediaUtils],
emits: [
'play',
'refresh',
],
components: {
ConfirmDialog,
Dropdown,
DropdownItem,
Loading,
Modal,
},
props: {
downloads: {
type: Object,
default: () => ({}),
},
pluginName: {
type: String,
required: true,
},
filter: {
type: String,
default: '',
},
},
data() {
return {
loading: false,
selectedItem: null,
mediaToDelete: null,
}
},
computed: {
relativeFiles() {
if (this.selectedItem == null || !this.downloads[this.selectedItem]?.files?.length)
return []
return this.downloads[this.selectedItem].files.map((file) => file.split('/').pop())
},
displayProgress() {
return Object.values(this.downloads).reduce((acc, value) => {
let progress = this.round(value.progress, 2)
let percent = progress != null ? `${progress}%` : 'N/A'
if (value.state.toLowerCase() === 'completed')
percent = '100%'
acc[value.path] = percent
return acc
}, {})
},
filteredDownloads() {
const filter = (this.filter || '').trim().toLowerCase()
let downloads = Object.values(this.downloads)
if (filter?.length) {
downloads = downloads.filter((download) => {
return download.path.toLowerCase().includes(filter) ||
download.url.toLowerCase().includes(filter)
})
}
return downloads.reduce((acc, download) => {
acc[download.path] = download
return acc
}, {})
},
},
methods: {
async run(action, media) {
this.loading = true
try {
await this.request(
`${this.pluginName}.${action}`,
{path: media.path}
)
} finally {
this.loading = false
}
},
async pause(media) {
await this.run('pause_download', media)
},
async resume(media) {
await this.run('resume_download', media)
},
async clear(media) {
await this.run('clear_downloads', media)
if (this.downloads[media.path])
delete this.downloads[media.path]
},
async cancel(media) {
await this.run('cancel_download', media)
},
async rm() {
const media = this.mediaToDelete
if (!media)
return
try {
await this.request('file.unlink', {file: media.path})
} finally {
await this.clear(media)
}
},
localURL(media) {
return `${window.location.origin}/file?path=${encodeURIComponent(media.path)}`
},
onDeleteSelected(media) {
this.mediaToDelete = media
this.$refs.deleteConfirmDialog.show()
},
}
}
</script>
<style lang="scss" scoped>
@import "src/style/items";
.media-downloads {
height: 100%;
background: $background-color;
.no-content {
height: 100%;
}
.items {
display: flex;
flex-direction: column;
height: 100%;
flex: 1;
overflow-y: auto;
}
}
:deep(.modal-body) {
.row {
display: flex;
border-bottom: $default-border;
padding: .5em .25em;
border-radius: .5em;
&:hover {
background-color: $hover-bg;
}
.attr {
@extend .col-3;
display: inline-flex;
}
.value {
@extend .col-9;
display: inline-flex;
justify-content: right;
&.nowrap {
overflow: hidden;
white-space: nowrap;
text-overflow: clip;
}
}
}
}
:deep(.modal-body) {
.dropdown-container {
.row {
box-shadow: none;
border: none;
}
button {
border: none;
background: none;
&:hover {
color: $default-hover-fg;
}
}
}
}
</style>

View file

@ -22,6 +22,14 @@
</form> </form>
</div> </div>
<div class="col-s-8 col-m-7 left side" v-else-if="selectedView === 'downloads'">
<form @submit.prevent="$emit('filter-downloads', downloadFilter)">
<label class="search-box">
<input type="search" placeholder="Filter" v-model="downloadFilter">
</label>
</form>
</div>
<div class="col-s-8 col-m-7 left side" v-else-if="selectedView === 'browser'"> <div class="col-s-8 col-m-7 left side" v-else-if="selectedView === 'browser'">
<label class="search-box"> <label class="search-box">
<input type="search" placeholder="Filter" :value="browserFilter" @change="$emit('filter', $event.target.value)" <input type="search" placeholder="Filter" :value="browserFilter" @change="$emit('filter', $event.target.value)"
@ -65,6 +73,7 @@ export default {
components: {Players}, components: {Players},
emits: [ emits: [
'filter', 'filter',
'filter-downloads',
'play-url', 'play-url',
'player-status', 'player-status',
'search', 'search',
@ -119,6 +128,7 @@ export default {
filterVisible: false, filterVisible: false,
query: '', query: '',
torrentURL: '', torrentURL: '',
downloadFilter: '',
} }
}, },

View file

@ -1,15 +1,27 @@
<template> <template>
<keep-alive> <keep-alive>
<div class="media-plugin fade-in"> <div class="media-plugin fade-in">
<MediaView :plugin-name="pluginName" :status="selectedPlayer?.status || {}" :track="selectedPlayer?.status || {}" <MediaView :plugin-name="pluginName"
:buttons="mediaButtons" @play="pause" @pause="pause" @stop="stop" @set-volume="setVolume" :status="selectedPlayer?.status || {}"
@seek="seek" @search="search" @mute="toggleMute" @unmute="toggleMute"> :track="selectedPlayer?.status || {}"
:buttons="mediaButtons"
@play="pause"
@pause="pause"
@stop="stop"
@set-volume="setVolume"
@seek="seek"
@search="search"
@mute="toggleMute"
@unmute="toggleMute"
>
<main> <main>
<div class="nav-container from tablet" :style="navContainerStyle"> <div class="nav-container from tablet" :style="navContainerStyle">
<Nav :selected-view="selectedView" <Nav :selected-view="selectedView"
:torrent-plugin="torrentPlugin" :torrent-plugin="torrentPlugin"
:download-icon-class="downloadIconClass"
@input="onNavInput" @input="onNavInput"
@toggle="forceShowNav = !forceShowNav" /> @toggle="forceShowNav = !forceShowNav"
/>
</div> </div>
<div class="view-container"> <div class="view-container">
@ -20,6 +32,7 @@
:selected-item="selectedItem" :selected-item="selectedItem"
:selected-subtitles="selectedSubtitles" :selected-subtitles="selectedSubtitles"
:browser-filter="browserFilter" :browser-filter="browserFilter"
:downloads-filter="downloadsFilter"
:show-nav-button="!forceShowNav" :show-nav-button="!forceShowNav"
ref="header" ref="header"
@search="search" @search="search"
@ -29,8 +42,10 @@
@show-subtitles="showSubtitlesModal = !showSubtitlesModal" @show-subtitles="showSubtitlesModal = !showSubtitlesModal"
@play-url="showPlayUrlModal" @play-url="showPlayUrlModal"
@filter="browserFilter = $event" @filter="browserFilter = $event"
@filter-downloads="downloadsFilter = $event"
@toggle-nav="forceShowNav = !forceShowNav" @toggle-nav="forceShowNav = !forceShowNav"
@source-toggle="sources[$event] = !sources[$event]" /> @source-toggle="sources[$event] = !sources[$event]"
/>
<div class="body-container" :class="{'expanded-header': $refs.header?.filterVisible}"> <div class="body-container" :class="{'expanded-header': $refs.header?.filterVisible}">
<Results :results="results" <Results :results="results"
@ -44,21 +59,32 @@
@play="play" @play="play"
@view="view" @view="view"
@download="download" @download="download"
v-if="selectedView === 'search'" /> v-if="selectedView === 'search'"
/>
<Transfers :plugin-name="torrentPlugin" <TorrentTransfers :plugin-name="torrentPlugin"
:is-media="true" :is-media="true"
@play="play" @play="play"
v-else-if="selectedView === 'torrents'" /> v-else-if="selectedView === 'torrents'"
/>
<MediaDownloads :plugin-name="pluginName"
:downloads="downloads"
:filter="downloadsFilter"
@play="play"
v-else-if="selectedView === 'downloads'"
/>
<Browser :filter="browserFilter" <Browser :filter="browserFilter"
:selected-playlist="selectedPlaylist" :selected-playlist="selectedPlaylist"
:selected-channel="selectedChannel" :selected-channel="selectedChannel"
@add-to-playlist="addToPlaylistItem = $event" @add-to-playlist="addToPlaylistItem = $event"
@back="selectedResult = null" @back="selectedResult = null"
@download="download"
@path-change="browserFilter = ''" @path-change="browserFilter = ''"
@play="play($event)" @play="play($event)"
v-else-if="selectedView === 'browser'" /> v-else-if="selectedView === 'browser'"
/>
</div> </div>
</div> </div>
</main> </main>
@ -100,13 +126,14 @@ import Utils from "@/Utils";
import Browser from "@/components/panels/Media/Browser"; import Browser from "@/components/panels/Media/Browser";
import Header from "@/components/panels/Media/Header"; import Header from "@/components/panels/Media/Header";
import MediaDownloads from "@/components/panels/Media/Downloads";
import MediaUtils from "@/components/Media/Utils"; import MediaUtils from "@/components/Media/Utils";
import MediaView from "@/components/Media/View"; import MediaView from "@/components/Media/View";
import Nav from "@/components/panels/Media/Nav"; import Nav from "@/components/panels/Media/Nav";
import PlaylistAdder from "@/components/panels/Media/PlaylistAdder"; import PlaylistAdder from "@/components/panels/Media/PlaylistAdder";
import Results from "@/components/panels/Media/Results"; import Results from "@/components/panels/Media/Results";
import Subtitles from "@/components/panels/Media/Subtitles"; import Subtitles from "@/components/panels/Media/Subtitles";
import Transfers from "@/components/panels/Torrent/Transfers"; import TorrentTransfers from "@/components/panels/Torrent/Transfers";
import UrlPlayer from "@/components/panels/Media/UrlPlayer"; import UrlPlayer from "@/components/panels/Media/UrlPlayer";
export default { export default {
@ -115,13 +142,14 @@ export default {
components: { components: {
Browser, Browser,
Header, Header,
MediaDownloads,
MediaView, MediaView,
Modal, Modal,
Nav, Nav,
PlaylistAdder, PlaylistAdder,
Results, Results,
Subtitles, Subtitles,
Transfers, TorrentTransfers,
UrlPlayer, UrlPlayer,
}, },
@ -157,6 +185,7 @@ export default {
awaitingPlayTorrent: null, awaitingPlayTorrent: null,
urlPlay: null, urlPlay: null,
browserFilter: null, browserFilter: null,
downloadsFilter: null,
addToPlaylistItem: null, addToPlaylistItem: null,
torrentPlugin: null, torrentPlugin: null,
torrentPlugins: [ torrentPlugins: [
@ -169,6 +198,8 @@ export default {
'youtube': true, 'youtube': true,
'torrent': true, 'torrent': true,
}, },
downloads: {},
} }
}, },
@ -220,6 +251,28 @@ export default {
return this.results[this.selectedResult] return this.results[this.selectedResult]
}, },
hasPendingDownloads() {
return Object.values(this.downloads).some((download) => {
return !['completed', 'cancelled'].includes(download.state.toLowerCase())
})
},
allDownloadsCompleted() {
return Object.values(this.downloads).length && Object.values(this.downloads).every((download) => {
return ['completed', 'cancelled'].includes(download.state.toLowerCase())
})
},
downloadIconClass() {
if (this.hasPendingDownloads)
return 'glow loop'
if (this.allDownloadsCompleted)
return 'completed'
return ''
},
}, },
methods: { methods: {
@ -291,8 +344,11 @@ export default {
}, },
async download(item) { async download(item) {
if (item?.type === 'torrent') { switch (item.type) {
await this.downloadTorrent(item) case 'torrent':
return await this.downloadTorrent(item)
case 'youtube':
return await this.downloadYoutube(item)
} }
}, },
@ -385,7 +441,32 @@ export default {
return return
} }
return await this.request(`${torrentPlugin}.download`, {torrent: item?.url || item}) if (!item?.url) {
this.notify({
text: 'No torrent URL available',
error: true,
})
return
}
return await this.request(`${torrentPlugin}.download`, {torrent: item.url || item})
},
async downloadYoutube(item) {
if (!item?.url) {
this.notify({
text: 'No YouTube URL available',
error: true,
})
return
}
await this.request(
`${this.pluginName}.download`,
{url: item.url},
)
}, },
async selectSubtitles(item) { async selectSubtitles(item) {
@ -456,15 +537,100 @@ export default {
} }
}, },
async refreshDownloads() {
this.downloads = (await this.request(`${this.pluginName}.get_downloads`)).reduce((acc, download) => {
acc[download.path] = download
return acc
}, {})
},
onNavInput(event) { onNavInput(event) {
this.selectedView = event this.selectedView = event
if (event === 'search') { if (event === 'search') {
this.selectedResult = null this.selectedResult = null
} }
}, },
onDownloadStarted(event) {
this.downloads[event.path] = event
this.notify({
title: 'Media download started',
html: `Saving <b>${event.resource}</b> to <b>${event.path}</b>`,
image: {
iconClass: 'fa fa-download',
}
})
},
onDownloadCompleted(event) {
this.downloads[event.path] = event
this.downloads[event.path].progress = 100
this.notify({
title: 'Media download completed',
html: `Saved <b>${event.resource}</b> to <b>${event.path}</b>`,
image: {
iconClass: 'fa fa-check',
}
})
},
onDownloadError(event) {
this.downloads[event.path] = event
this.notify({
title: 'Media download error',
html: `Error downloading ${event.resource}: <b>${event.error}</b>`,
error: true,
image: {
iconClass: 'fa fa-exclamation-triangle',
}
})
},
onDownloadCancelled(event) {
this.downloads[event.path] = event
this.notify({
title: 'Media download cancelled',
html: `Cancelled download of <b>${event.resource}</b>`,
image: {
iconClass: 'fa fa-times',
}
})
},
onDownloadPaused(event) {
this.downloads[event.path] = event
this.notify({
title: 'Media download paused',
html: `Paused download of <b>${event.resource}</b>`,
image: {
iconClass: 'fa fa-pause',
}
})
},
onDownloadResumed(event) {
this.downloads[event.path] = event
this.notify({
title: 'Media download resumed',
html: `Resumed download of <b>${event.resource}</b>`,
image: {
iconClass: 'fa fa-play',
}
})
},
onDownloadProgress(event) {
this.downloads[event.path] = event
},
onDownloadClear(event) {
if (event.path in this.downloads)
delete this.downloads[event.path]
},
}, },
mounted() { async mounted() {
this.$watch(() => this.selectedPlayer, (player) => { this.$watch(() => this.selectedPlayer, (player) => {
if (player) if (player)
this.refresh() this.refresh()
@ -480,27 +646,55 @@ export default {
}) })
this.torrentPlugin = this.getTorrentPlugin() this.torrentPlugin = this.getTorrentPlugin()
this.subscribe(this.onTorrentQueued,'notify-on-torrent-queued', this.subscribe(this.onTorrentQueued,'on-torrent-queued',
'platypush.message.event.torrent.TorrentQueuedEvent') 'platypush.message.event.torrent.TorrentQueuedEvent')
this.subscribe(this.onTorrentMetadata,'on-torrent-metadata', this.subscribe(this.onTorrentMetadata,'on-torrent-metadata',
'platypush.message.event.torrent.TorrentDownloadedMetadataEvent') 'platypush.message.event.torrent.TorrentDownloadedMetadataEvent')
this.subscribe(this.onTorrentDownloadStart,'notify-on-torrent-download-start', this.subscribe(this.onTorrentDownloadStart,'on-torrent-download-start',
'platypush.message.event.torrent.TorrentDownloadStartEvent') 'platypush.message.event.torrent.TorrentDownloadStartEvent')
this.subscribe(this.onTorrentDownloadCompleted,'notify-on-torrent-download-completed', this.subscribe(this.onTorrentDownloadCompleted,'on-torrent-download-completed',
'platypush.message.event.torrent.TorrentDownloadCompletedEvent') 'platypush.message.event.torrent.TorrentDownloadCompletedEvent')
this.subscribe(this.onDownloadStarted,'on-download-started',
'platypush.message.event.media.MediaDownloadStartedEvent')
this.subscribe(this.onDownloadCompleted,'on-download-completed',
'platypush.message.event.media.MediaDownloadCompletedEvent')
this.subscribe(this.onDownloadError,'on-download-error',
'platypush.message.event.media.MediaDownloadErrorEvent')
this.subscribe(this.onDownloadCancelled,'on-download-cancelled',
'platypush.message.event.media.MediaDownloadCancelledEvent')
this.subscribe(this.onDownloadPaused,'on-download-paused',
'platypush.message.event.media.MediaDownloadPausedEvent')
this.subscribe(this.onDownloadResumed,'on-download-resumed',
'platypush.message.event.media.MediaDownloadResumedEvent')
this.subscribe(this.onDownloadProgress,'on-download-progress',
'platypush.message.event.media.MediaDownloadProgressEvent')
this.subscribe(this.onDownloadClear,'on-download-clear',
'platypush.message.event.media.MediaDownloadClearEvent')
if ('media.plex' in this.$root.config) if ('media.plex' in this.$root.config)
this.sources.plex = true this.sources.plex = true
if ('media.jellyfin' in this.$root.config) if ('media.jellyfin' in this.$root.config)
this.sources.jellyfin = true this.sources.jellyfin = true
await this.refreshDownloads()
}, },
destroy() { destroy() {
this.unsubscribe('notify-on-torrent-queued') this.unsubscribe('on-torrent-queued')
this.unsubscribe('on-torrent-metadata') this.unsubscribe('on-torrent-metadata')
this.unsubscribe('notify-on-torrent-download-start') this.unsubscribe('on-torrent-download-start')
this.unsubscribe('notify-on-torrent-download-completed') this.unsubscribe('on-torrent-download-completed')
this.unsubscribe('on-download-started')
this.unsubscribe('on-download-completed')
this.unsubscribe('on-download-error')
this.unsubscribe('on-download-cancelled')
this.unsubscribe('on-download-paused')
this.unsubscribe('on-download-resumed')
this.unsubscribe('on-download-progress')
this.unsubscribe('on-download-clear')
}, },
} }
</script> </script>

View file

@ -28,23 +28,6 @@
</div> </div>
</div> </div>
<div class="row direct-url" v-else-if="item?.type === 'youtube' && item?.url">
<div class="left side">Direct URL</div>
<div class="right side">
<a :href="youtubeUrl" title="Direct URL" target="_blank" v-if="youtubeUrl">
<i class="fas fa-external-link-alt" />
</a>
<button @click="copyToClipboard(youtubeUrl)" title="Copy URL to clipboard" v-if="youtubeUrl">
<i class="fas fa-clipboard" />
</button>
<Loading v-if="loadingUrl" />
<button @click="getYoutubeUrl" title="Refresh direct URL" v-else>
<i class="fas fa-sync-alt" />
</button>
</div>
</div>
<div class="row" v-if="item?.series"> <div class="row" v-if="item?.series">
<div class="left side">TV Series</div> <div class="left side">TV Series</div>
<div class="right side" v-text="item.series" /> <div class="right side" v-text="item.series" />
@ -185,14 +168,13 @@
<script> <script>
import Utils from "@/Utils"; import Utils from "@/Utils";
import Loading from "@/components/Loading";
import MediaUtils from "@/components/Media/Utils"; import MediaUtils from "@/components/Media/Utils";
import MediaImage from "./MediaImage"; import MediaImage from "./MediaImage";
import Icons from "./icons.json"; import Icons from "./icons.json";
export default { export default {
name: "Info", name: "Info",
components: {Loading, MediaImage}, components: {MediaImage},
mixins: [Utils, MediaUtils], mixins: [Utils, MediaUtils],
emits: ['play'], emits: ['play'],
props: { props: {
@ -255,26 +237,6 @@ export default {
return null return null
}, },
}, },
methods: {
async getYoutubeUrl() {
if (!(this.item?.type === 'youtube' && this.item?.url)) {
return null
}
try {
this.loadingUrl = true
this.youtubeUrl = await this.request(
`${this.pluginName}.get_youtube_url`,
{url: this.item.url},
)
return this.youtubeUrl
} finally {
this.loadingUrl = false
}
},
},
} }
</script> </script>

View file

@ -3,56 +3,42 @@
class="item media-item" class="item media-item"
:class="{selected: selected}" :class="{selected: selected}"
@click.right.prevent="$refs.dropdown.toggle()" @click.right.prevent="$refs.dropdown.toggle()"
v-if="!hidden" v-if="!hidden">
>
<div class="thumbnail"> <div class="thumbnail">
<MediaImage :item="item" @play="play" /> <MediaImage :item="item" @play="$emit('play')" />
</div> </div>
<div class="body"> <div class="body">
<div class="row title"> <div class="row title">
<div class="col-11 left side" v-text="item.title || item.name" @click="select" /> <div class="col-11 left side" v-text="item.title || item.name" @click="$emit('select')" />
<div class="col-1 right side"> <div class="col-1 right side">
<Dropdown title="Actions" icon-class="fa fa-ellipsis-h" ref="dropdown"> <Dropdown title="Actions" icon-class="fa fa-ellipsis-h" ref="dropdown">
<DropdownItem icon-class="fa fa-play" text="Play" @click="play" <DropdownItem icon-class="fa fa-play" text="Play" @click="$emit('play')"
v-if="item.type !== 'torrent' && item.item_type !== 'channel' && item.item_type !== 'playlist'" /> v-if="item.type !== 'torrent'" />
<DropdownItem icon-class="fa fa-download" text="Download" @click="$emit('download')" <DropdownItem icon-class="fa fa-download" text="Download" @click="$emit('download')"
v-if="item.type === 'torrent' && item.item_type !== 'channel' && item.item_type !== 'playlist'" /> v-if="(item.type === 'torrent' || item.type === 'youtube') && item.item_type !== 'channel' && item.item_type !== 'playlist'" />
<DropdownItem icon-class="fa fa-window-maximize" text="View in browser" @click="$emit('view')" <DropdownItem icon-class="fa fa-window-maximize" text="View in browser" @click="$emit('view')"
v-if="item.type === 'file' && item.item_type !== 'channel' && item.item_type !== 'playlist'" /> v-if="item.type === 'file'" />
<DropdownItem icon-class="fa fa-list" text="Add to playlist" @click="$emit('add-to-playlist')" <DropdownItem icon-class="fa fa-info-circle" text="Info" @click="$emit('select')" />
v-if="item.type === 'youtube' && item.item_type !== 'channel' && item.item_type !== 'playlist'" />
<DropdownItem icon-class="fa fa-trash" text="Remove from playlist" @click="$refs.confirmPlaylistRemove.open()"
v-if="playlist && item.item_type !== 'channel' && item.item_type !== 'playlist'" />
<DropdownItem icon-class="fa fa-info-circle" text="Info" @click="select" />
</Dropdown> </Dropdown>
</div> </div>
</div> </div>
<div class="row subtitle" v-if="item.channel_url"> <div class="row subtitle" v-if="item.channel">
<a class="channel" :href="item.channel_url" target="_blank"> <a class="channel" :href="item.channel_url" target="_blank">
<img :src="item.channel_image" class="channel-image" v-if="item.channel_image" /> <img :src="item.channel_image" class="channel-image" v-if="item.channel_image" />
<span class="channel-name" v-text="item.channel" /> <span class="channel-name" v-text="item.channel" />
</a> </a>
</div> </div>
<div class="row subtitle" v-if="item.item_type && item.item_type !== 'video'">
<span class="type" v-text="item.item_type[0].toUpperCase() + item.item_type.slice(1)" />
</div>
<div class="row creation-date" v-if="item.created_at"> <div class="row creation-date" v-if="item.created_at">
{{ formatDateTime(item.created_at, true) }} {{ formatDateTime(item.created_at, true) }}
</div> </div>
</div> </div>
<ConfirmDialog ref="confirmPlaylistRemove" @input="$emit('remove-from-playlist')">
Are you sure you want to remove this item from the playlist?
</ConfirmDialog>
</div> </div>
</template> </template>
<script> <script>
import ConfirmDialog from "@/components/elements/ConfirmDialog";
import Dropdown from "@/components/elements/Dropdown"; import Dropdown from "@/components/elements/Dropdown";
import DropdownItem from "@/components/elements/DropdownItem"; import DropdownItem from "@/components/elements/DropdownItem";
import Icons from "./icons.json"; import Icons from "./icons.json";
@ -60,23 +46,9 @@ import MediaImage from "./MediaImage";
import Utils from "@/Utils"; import Utils from "@/Utils";
export default { export default {
components: {Dropdown, DropdownItem, MediaImage},
mixins: [Utils], mixins: [Utils],
components: { emits: ['play', 'select', 'view', 'download'],
ConfirmDialog,
Dropdown,
DropdownItem,
MediaImage,
},
emits: [
'add-to-playlist',
'download',
'play',
'remove-from-playlist',
'select',
'view',
],
props: { props: {
item: { item: {
type: Object, type: Object,
@ -92,10 +64,6 @@ export default {
type: Boolean, type: Boolean,
default: false, default: false,
}, },
playlist: {
default: null,
},
}, },
data() { data() {
@ -103,21 +71,6 @@ export default {
typeIcons: Icons, typeIcons: Icons,
} }
}, },
methods: {
play() {
if (this.item.item_type === 'playlist' || this.item.item_type === 'channel') {
this.select()
return
}
this.$emit('play');
},
select() {
this.$emit('select');
},
},
} }
</script> </script>
@ -128,16 +81,11 @@ export default {
display: flex; display: flex;
flex-direction: column; flex-direction: column;
align-items: center; align-items: center;
justify-content: space-between;
cursor: initial !important; cursor: initial !important;
margin-bottom: 5px; margin-bottom: 5px;
border: 1px solid transparent; border: 1px solid transparent;
border-bottom: 1px solid transparent !important; border-bottom: 1px solid transparent !important;
@include from($tablet) {
max-height: max(25em, 25%);
}
&.selected { &.selected {
box-shadow: $border-shadow-bottom; box-shadow: $border-shadow-bottom;
background: $selected-bg; background: $selected-bg;
@ -158,7 +106,6 @@ export default {
display: flex; display: flex;
flex-direction: column; flex-direction: column;
align-items: center; align-items: center;
flex: 1;
.row { .row {
width: 100%; width: 100%;

View file

@ -5,7 +5,7 @@
</button> </button>
<li v-for="(view, name) in displayedViews" :key="name" :title="view.displayName" <li v-for="(view, name) in displayedViews" :key="name" :title="view.displayName"
:class="{selected: name === selectedView}" @click="$emit('input', name)"> :class="{selected: name === selectedView, ...customClasses[name]}" @click="$emit('input', name)">
<i :class="view.iconClass" /> <i :class="view.iconClass" />
</li> </li>
</nav> </nav>
@ -28,6 +28,10 @@ export default {
type: String, type: String,
}, },
downloadIconClass: {
type: String,
},
views: { views: {
type: Object, type: Object,
default: () => { default: () => {
@ -42,6 +46,11 @@ export default {
displayName: 'Browser', displayName: 'Browser',
}, },
downloads: {
iconClass: 'fa fa-download',
displayName: 'Downloads',
},
torrents: { torrents: {
iconClass: 'fa fa-magnet', iconClass: 'fa fa-magnet',
displayName: 'Torrents', displayName: 'Torrents',
@ -59,6 +68,15 @@ export default {
return views return views
}, },
customClasses() {
return {
downloads: this.downloadIconClass.split(' ').reduce((acc, cls) => {
acc[cls] = true
return acc
}, {}),
}
},
}, },
} }
</script> </script>
@ -107,12 +125,8 @@ nav {
list-style: none; list-style: none;
padding: .6em; padding: .6em;
opacity: 0.7; opacity: 0.7;
border-radius: 1.2em;
&.selected, margin: 0 0.2em;
&:hover {
border-radius: 1.2em;
margin: 0 0.2em;
}
&:hover { &:hover {
background: $nav-entry-collapsed-hover-bg; background: $nav-entry-collapsed-hover-bg;
@ -122,6 +136,9 @@ nav {
background: $nav-entry-collapsed-selected-bg; background: $nav-entry-collapsed-selected-bg;
} }
&.completed {
color: $ok-fg;
}
} }
} }
</style> </style>

View file

@ -7,6 +7,7 @@ export default {
'add-to-playlist', 'add-to-playlist',
'back', 'back',
'create-playlist', 'create-playlist',
'download',
'path-change', 'path-change',
'play', 'play',
'remove-from-playlist', 'remove-from-playlist',

View file

@ -9,6 +9,7 @@
<div class="body" v-else> <div class="body" v-else>
<Feed :filter="filter" <Feed :filter="filter"
@add-to-playlist="$emit('add-to-playlist', $event)" @add-to-playlist="$emit('add-to-playlist', $event)"
@download="$emit('download', $event)"
@play="$emit('play', $event)" @play="$emit('play', $event)"
v-if="selectedView === 'feed'" v-if="selectedView === 'feed'"
/> />
@ -16,6 +17,7 @@
<Playlists :filter="filter" <Playlists :filter="filter"
:selected-playlist="selectedPlaylist_" :selected-playlist="selectedPlaylist_"
@add-to-playlist="$emit('add-to-playlist', $event)" @add-to-playlist="$emit('add-to-playlist', $event)"
@download="$emit('download', $event)"
@play="$emit('play', $event)" @play="$emit('play', $event)"
@remove-from-playlist="removeFromPlaylist" @remove-from-playlist="removeFromPlaylist"
@select="onPlaylistSelected" @select="onPlaylistSelected"

View file

@ -45,6 +45,7 @@
:filter="filter" :filter="filter"
:selected-result="selectedResult" :selected-result="selectedResult"
ref="results" ref="results"
@download="$emit('download', $event)"
@play="$emit('play', $event)" @play="$emit('play', $event)"
@scroll-end="loadNextPage" @scroll-end="loadNextPage"
@select="selectedResult = $event" @select="selectedResult = $event"
@ -59,7 +60,7 @@ import Results from "@/components/panels/Media/Results";
import Utils from "@/Utils"; import Utils from "@/Utils";
export default { export default {
emits: ['play'], emits: ['download', 'play'],
mixins: [Utils], mixins: [Utils],
components: { components: {
Loading, Loading,

View file

@ -10,6 +10,7 @@
:sources="{'youtube': true}" :sources="{'youtube': true}"
:selected-result="selectedResult" :selected-result="selectedResult"
@add-to-playlist="$emit('add-to-playlist', $event)" @add-to-playlist="$emit('add-to-playlist', $event)"
@download="$emit('download', $event)"
@select="selectedResult = $event" @select="selectedResult = $event"
@play="$emit('play', $event)" @play="$emit('play', $event)"
v-else /> v-else />
@ -26,6 +27,7 @@ export default {
mixins: [Utils], mixins: [Utils],
emits: [ emits: [
'add-to-playlist', 'add-to-playlist',
'download',
'play', 'play',
], ],

View file

@ -50,6 +50,7 @@
:playlist="id" :playlist="id"
:selected-result="selectedResult" :selected-result="selectedResult"
@add-to-playlist="$emit('add-to-playlist', $event)" @add-to-playlist="$emit('add-to-playlist', $event)"
@download="$emit('download', $event)"
@play="$emit('play', $event)" @play="$emit('play', $event)"
@remove-from-playlist="$emit('remove-from-playlist', $event)" @remove-from-playlist="$emit('remove-from-playlist', $event)"
@select="selectedResult = $event" @select="selectedResult = $event"
@ -68,6 +69,7 @@ export default {
mixins: [Utils], mixins: [Utils],
emits: [ emits: [
'add-to-playlist', 'add-to-playlist',
'download',
'play', 'play',
'remove-from-playlist', 'remove-from-playlist',
], ],

View file

@ -31,6 +31,7 @@
:filter="filter" :filter="filter"
:metadata="playlistsById[selectedPlaylist.id] || selectedPlaylist" :metadata="playlistsById[selectedPlaylist.id] || selectedPlaylist"
@add-to-playlist="$emit('add-to-playlist', $event)" @add-to-playlist="$emit('add-to-playlist', $event)"
@download="$emit('download', $event)"
@remove-from-playlist="$emit('remove-from-playlist', {item: $event, playlist_id: selectedPlaylist.id})" @remove-from-playlist="$emit('remove-from-playlist', {item: $event, playlist_id: selectedPlaylist.id})"
@play="$emit('play', $event)" @play="$emit('play', $event)"
/> />
@ -110,6 +111,7 @@ export default {
emits: [ emits: [
'add-to-playlist', 'add-to-playlist',
'create-playlist', 'create-playlist',
'download',
'play', 'play',
'remove-from-playlist', 'remove-from-playlist',
'remove-playlist', 'remove-playlist',

View file

@ -16,6 +16,7 @@
@add-to-queue="$emit('load-tracks', {tracks: $event, play: false})" @add-to-queue="$emit('load-tracks', {tracks: $event, play: false})"
@add-to-queue-and-play="$emit('load-tracks', {tracks: $event, play: true})" @add-to-queue-and-play="$emit('load-tracks', {tracks: $event, play: true})"
@back="$emit('playlist-edit', null)" @back="$emit('playlist-edit', null)"
@download="$emit('download', $event)"
@info="$emit('info', $event)" @info="$emit('info', $event)"
@move="$emit('track-move', {...$event, playlist: editedPlaylist})" @move="$emit('track-move', {...$event, playlist: editedPlaylist})"
@play="$emit('load-tracks', {tracks: [$event], play: true})" @play="$emit('load-tracks', {tracks: [$event], play: true})"
@ -104,6 +105,7 @@ export default {
emits: [ emits: [
'add-to-playlist', 'add-to-playlist',
'download',
'info', 'info',
'load', 'load',
'load-tracks', 'load-tracks',

View file

@ -26,3 +26,30 @@
display: none; display: none;
} }
} }
.glow {
animation-duration: 2s;
-webkit-animation-duration: 2s;
animation-fill-mode: both;
animation-name: glow;
-webkit-animation-name: glow;
}
.loop {
animation-iteration-count: infinite;
-webkit-animation-iteration-count: infinite;
}
@keyframes glow {
0% {opacity: 1; box-shadow: 0 0 5px #fff;}
10% {opacity: 0.9; box-shadow: 0 0 10px $active-glow-fg-1;}
20% {opacity: 0.8; box-shadow: 0 0 20px $active-glow-fg-1;}
30% {opacity: 0.7; box-shadow: 0 0 30px $active-glow-fg-1;}
40% {opacity: 0.6; box-shadow: 0 0 40px $active-glow-fg-1;}
50% {opacity: 0.5; box-shadow: 0 0 50px $active-glow-fg-1;}
60% {opacity: 0.6; box-shadow: 0 0 40px $active-glow-fg-1;}
70% {opacity: 0.7; box-shadow: 0 0 30px $active-glow-fg-1;}
80% {opacity: 0.8; box-shadow: 0 0 20px $active-glow-fg-1;}
90% {opacity: 0.9; box-shadow: 0 0 10px $active-glow-fg-1;}
100% {opacity: 1; box-shadow: 0 0 5px #fff;}
}

View file

@ -87,6 +87,8 @@ $default-link-fg: #5f7869 !default;
/// Active /// Active
$active-glow-bg-1: #d4ffe3 !default; $active-glow-bg-1: #d4ffe3 !default;
$active-glow-bg-2: #9cdfb0 !default; $active-glow-bg-2: #9cdfb0 !default;
$active-glow-fg-1: #32b646 !default;
$active-glow-fg-2: #5f7869 !default;
/// Hover /// Hover
$default-hover-fg: #35b870 !default; $default-hover-fg: #35b870 !default;

View file

@ -3,13 +3,17 @@ export default {
name: "DateTime", name: "DateTime",
methods: { methods: {
formatDate(date, year=false) { formatDate(date, year=false) {
if (typeof date === 'string') if (typeof date === 'number')
date = new Date(date * 1000)
else if (typeof date === 'string')
date = new Date(Date.parse(date)) date = new Date(Date.parse(date))
return date.toDateString().substring(0, year ? 15 : 10) return date.toDateString().substring(0, year ? 15 : 10)
}, },
formatTime(date, seconds=true) { formatTime(date, seconds=true) {
if (typeof date === 'number')
date = new Date(date * 1000)
if (typeof date === 'string') if (typeof date === 'string')
date = new Date(Date.parse(date)) date = new Date(Date.parse(date))
@ -17,6 +21,8 @@ export default {
}, },
formatDateTime(date, year=false, seconds=true, skipTimeIfMidnight=false) { formatDateTime(date, year=false, seconds=true, skipTimeIfMidnight=false) {
if (typeof date === 'number')
date = new Date(date * 1000)
if (typeof date === 'string') if (typeof date === 'string')
date = new Date(Date.parse(date)) date = new Date(Date.parse(date))

View file

@ -114,6 +114,10 @@ export default {
return true return true
}, },
round(value, decimals) {
return Number(Math.round(value+'e'+decimals)+'e-'+decimals);
},
}, },
} }
</script> </script>

View file

@ -1,6 +1,5 @@
import logging import logging
import threading import threading
from typing import Optional
from platypush.bus import Bus from platypush.bus import Bus
from platypush.message import Message from platypush.message import Message
@ -24,25 +23,39 @@ class RedisBus(Bus):
self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE self.redis_queue = redis_queue or self.DEFAULT_REDIS_QUEUE
self.on_message = on_message self.on_message = on_message
self.thread_id = threading.get_ident() self.thread_id = threading.get_ident()
self._pubsub = None
self._pubsub_lock = threading.RLock()
def get(self) -> Optional[Message]: @property
def pubsub(self):
with self._pubsub_lock:
if not self._pubsub:
self._pubsub = self.redis.pubsub()
return self._pubsub
def poll(self):
""" """
Reads one message from the Redis queue Polls the Redis queue for new messages
""" """
try: with self.pubsub as pubsub:
if self.should_stop(): pubsub.subscribe(self.redis_queue)
return None try:
for msg in pubsub.listen():
if msg.get('type') != 'message':
continue
msg = self.redis.blpop(self.redis_queue, timeout=1) if self.should_stop():
if not msg or msg[1] is None: break
return None
msg = msg[1].decode('utf-8') try:
return Message.build(msg) data = msg.get('data', b'').decode('utf-8')
except Exception as e: parsed_msg = Message.build(data)
logger.exception(e) if parsed_msg and self.on_message:
self.on_message(parsed_msg)
return None except Exception as e:
logger.exception(e)
finally:
pubsub.unsubscribe(self.redis_queue)
def post(self, msg): def post(self, msg):
""" """
@ -51,15 +64,13 @@ class RedisBus(Bus):
from redis import exceptions from redis import exceptions
try: try:
return self.redis.rpush(self.redis_queue, str(msg)) self.redis.publish(self.redis_queue, str(msg))
except exceptions.ConnectionError as e: except exceptions.ConnectionError as e:
if not self.should_stop(): if not self.should_stop():
# Raise the exception only if the bus it not supposed to be # Raise the exception only if the bus it not supposed to be
# stopped # stopped
raise e raise e
return None
def stop(self): def stop(self):
super().stop() super().stop()
self.redis.close() self.redis.close()

Binary file not shown.

View file

@ -1,6 +1,7 @@
import asyncio import asyncio
import importlib import importlib
import logging import logging
import os
from dataclasses import dataclass, field from dataclasses import dataclass, field
from threading import RLock from threading import RLock
@ -194,11 +195,20 @@ def get_bus() -> Bus:
Get or register the main application bus. Get or register the main application bus.
""" """
from platypush.bus.redis import RedisBus from platypush.bus.redis import RedisBus
from platypush.utils import get_redis_conf
if _ctx.bus: if _ctx.bus:
return _ctx.bus return _ctx.bus
_ctx.bus = RedisBus() redis_queue = (
os.environ.get('PLATYPUSH_REDIS_QUEUE') or RedisBus.DEFAULT_REDIS_QUEUE
)
_ctx.bus = RedisBus(
redis_queue=redis_queue,
**get_redis_conf(),
)
return _ctx.bus return _ctx.bus

View file

@ -1,11 +1,13 @@
from abc import ABC
from typing import Optional
from platypush.message.event import Event from platypush.message.event import Event
class MediaEvent(Event): class MediaEvent(Event):
"""Base class for media events""" """Base class for media events"""
def __init__(self, player=None, plugin=None, status=None, *args, **kwargs): def __init__(self, *args, player=None, plugin=None, status=None, **kwargs):
super().__init__(player=player, plugin=plugin, status=status, *args, **kwargs) super().__init__(*args, player=player, plugin=plugin, status=status, **kwargs)
class MediaPlayRequestEvent(MediaEvent): class MediaPlayRequestEvent(MediaEvent):
@ -126,4 +128,107 @@ class NewPlayingMediaEvent(MediaEvent):
) )
class MediaDownloadEvent(MediaEvent, ABC):
"""
Base class for media download events.
"""
def __init__(
self,
*args,
plugin: str,
resource: str,
state: str,
path: str,
player: Optional[str] = None,
size: Optional[int] = None,
timeout: Optional[int] = None,
progress: Optional[float] = None,
started_at: Optional[float] = None,
ended_at: Optional[float] = None,
**kwargs
):
"""
:param resource: File name or URI of the downloaded resource
:param url: Alias for resource
:param path: Path where the resource is downloaded
:param state: Download state
:param size: Size of the downloaded resource in bytes
:param timeout: Download timeout in seconds
:param progress: Download progress in percentage, between 0 and 100
:param started_at: Download start time
:param ended_at: Download end time
"""
kwargs.update(
{
"resource": resource,
"path": path,
"url": resource,
"state": state,
"size": size,
"timeout": timeout,
"progress": progress,
"started_at": started_at,
"ended_at": ended_at,
}
)
super().__init__(*args, player=player, plugin=plugin, **kwargs)
class MediaDownloadStartedEvent(MediaDownloadEvent):
"""
Event triggered when a media download is started.
"""
class MediaDownloadProgressEvent(MediaDownloadEvent):
"""
Event triggered when a media download is in progress.
"""
class MediaDownloadCompletedEvent(MediaDownloadEvent):
"""
Event triggered when a media download is completed.
"""
class MediaDownloadErrorEvent(MediaDownloadEvent):
"""
Event triggered when a media download fails.
"""
def __init__(self, error: str, *args, **kwargs):
"""
:param error: Error message.
"""
super().__init__(*args, error=error, **kwargs)
class MediaDownloadPausedEvent(MediaDownloadEvent):
"""
Event triggered when a media download is paused.
"""
class MediaDownloadResumedEvent(MediaDownloadEvent):
"""
Event triggered when a media download is resumed.
"""
class MediaDownloadCancelledEvent(MediaDownloadEvent):
"""
Event triggered when a media download is cancelled.
"""
class MediaDownloadClearEvent(MediaDownloadEvent):
"""
Event triggered when a download is cleared from the queue.
"""
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,5 +1,3 @@
from dataclasses import dataclass
import enum
import functools import functools
import inspect import inspect
import json import json
@ -10,47 +8,35 @@ import subprocess
import tempfile import tempfile
import threading import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Iterable, Optional, List, Dict, Union from typing import (
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
)
import requests import requests
from platypush.config import Config from platypush.config import Config
from platypush.context import get_plugin, get_backend from platypush.context import get_plugin, get_backend
from platypush.plugins import Plugin, action from platypush.message.event.media import MediaEvent
from platypush.utils import get_default_downloads_dir from platypush.plugins import RunnablePlugin, action
from platypush.utils import get_default_downloads_dir, get_plugin_name_by_class
from ._download import (
DownloadState,
DownloadThread,
FileDownloadThread,
YouTubeDownloadThread,
)
from ._resource import MediaResource
from ._state import PlayerState
class PlayerState(enum.Enum): class MediaPlugin(RunnablePlugin, ABC):
"""
Models the possible states of a media player
"""
STOP = 'stop'
PLAY = 'play'
PAUSE = 'pause'
IDLE = 'idle'
@dataclass
class MediaResource:
"""
Models a media resource
"""
resource: str
url: str
title: Optional[str] = None
description: Optional[str] = None
filename: Optional[str] = None
image: Optional[str] = None
duration: Optional[float] = None
channel: Optional[str] = None
channel_url: Optional[str] = None
type: Optional[str] = None
resolution: Optional[str] = None
class MediaPlugin(Plugin, ABC):
""" """
Generic plugin to interact with a media player. Generic plugin to interact with a media player.
@ -170,7 +156,7 @@ class MediaPlugin(Plugin, ABC):
env: Optional[Dict[str, str]] = None, env: Optional[Dict[str, str]] = None,
volume: Optional[Union[float, int]] = None, volume: Optional[Union[float, int]] = None,
torrent_plugin: str = 'torrent', torrent_plugin: str = 'torrent',
youtube_format: Optional[str] = 'best[height<=?1080][ext=mp4]', youtube_format: Optional[str] = 'bv[height<=?1080][ext=mp4]+ba',
youtube_dl: str = 'yt-dlp', youtube_dl: str = 'yt-dlp',
**kwargs, **kwargs,
): ):
@ -213,6 +199,7 @@ class MediaPlugin(Plugin, ABC):
media_dirs = [] media_dirs = []
player = None player = None
player_config = {} player_config = {}
self._download_threads: Dict[Tuple[str, str], DownloadThread] = {}
if self.__class__.__name__ == 'MediaPlugin': if self.__class__.__name__ == 'MediaPlugin':
# Abstract class, initialize with the default configured player # Abstract class, initialize with the default configured player
@ -336,22 +323,23 @@ class MediaPlugin(Plugin, ABC):
) )
elif self._is_youtube_resource(resource): elif self._is_youtube_resource(resource):
info = self._get_youtube_info(resource) info = self._get_youtube_info(resource)
url = info.get('url') if info:
if url: url = info.get('url')
resource = url if url:
self._latest_resource = MediaResource( resource = url
resource=resource, self._latest_resource = MediaResource(
url=resource, resource=resource,
title=info.get('title'), url=resource,
description=info.get('description'), title=info.get('title'),
filename=info.get('filename'), description=info.get('description'),
image=info.get('thumbnail'), filename=info.get('filename'),
duration=float(info.get('duration') or 0) or None, image=info.get('thumbnail'),
channel=info.get('channel'), duration=float(info.get('duration') or 0) or None,
channel_url=info.get('channel_url'), channel=info.get('channel'),
resolution=info.get('resolution'), channel_url=info.get('channel_url'),
type=info.get('extractor'), resolution=info.get('resolution'),
) type=info.get('extractor'),
)
elif resource.startswith('magnet:?'): elif resource.startswith('magnet:?'):
self.logger.info( self.logger.info(
'Downloading torrent %s to %s', resource, self.download_dir 'Downloading torrent %s to %s', resource, self.download_dir
@ -399,8 +387,8 @@ class MediaPlugin(Plugin, ABC):
@action @action
@abstractmethod @abstractmethod
def stop(self, **kwargs): def stop(self, *args, **kwargs): # type: ignore
raise self._NOT_IMPLEMENTED_ERR super().stop()
@action @action
@abstractmethod @abstractmethod
@ -683,7 +671,15 @@ class MediaPlugin(Plugin, ABC):
output = ytdl.communicate()[0].decode().strip() output = ytdl.communicate()[0].decode().strip()
ytdl.wait() ytdl.wait()
stream_url, info = output.split('\n') self.logger.debug('yt-dlp output: %s', output)
lines = output.split('\n')
if not lines:
self.logger.warning('No output from yt-dlp')
return None
stream_url = lines[1] if len(lines) > 2 else lines[0]
info = lines[-1]
return { return {
**json.loads(info), **json.loads(info),
'url': stream_url, 'url': stream_url,
@ -708,15 +704,6 @@ class MediaPlugin(Plugin, ABC):
return None return None
@action
def get_youtube_url(self, url, youtube_format: Optional[str] = None):
youtube_id = self.get_youtube_id(url)
if youtube_id:
url = f'https://www.youtube.com/watch?v={youtube_id}'
return self._get_youtube_info(url, youtube_format=youtube_format).get('url')
return None
@action @action
def get_youtube_info(self, url): def get_youtube_info(self, url):
# Legacy conversion for Mopidy YouTube URIs # Legacy conversion for Mopidy YouTube URIs
@ -772,31 +759,248 @@ class MediaPlugin(Plugin, ABC):
@action @action
def download( def download(
self, url: str, filename: Optional[str] = None, directory: Optional[str] = None self,
url: str,
filename: Optional[str] = None,
directory: Optional[str] = None,
timeout: int = 10,
sync: bool = False,
youtube_format: Optional[str] = None,
): ):
""" """
Download a media URL to a local file on the Platypush host. Download a media URL to a local file on the Platypush host (yt-dlp
required for YouTube URLs).
This action is non-blocking and returns the path to the downloaded file
once the download is initiated.
You can then subscribe to these events to monitor the download progress:
- :class:`platypush.message.event.media.MediaDownloadStartedEvent`
- :class:`platypush.message.event.media.MediaDownloadProgressEvent`
- :class:`platypush.message.event.media.MediaDownloadErrorEvent`
- :class:`platypush.message.event.media.MediaDownloadPausedEvent`
- :class:`platypush.message.event.media.MediaDownloadResumedEvent`
- :class:`platypush.message.event.media.MediaDownloadCancelledEvent`
- :class:`platypush.message.event.media.MediaDownloadCompletedEvent`
:param url: Media URL. :param url: Media URL.
:param filename: Media filename (default: inferred from the URL basename). :param filename: Media filename (default: inferred from the URL basename).
:param directory: Destination directory (default: ``download_dir``). :param directory: Destination directory (default: ``download_dir``).
:param timeout: Network timeout in seconds (default: 10).
:param sync: If set to True, the download will be synchronous and the
action will return only when the download is completed.
:param youtube_format: Override the default YouTube format selection.
:return: The absolute path to the downloaded file. :return: The absolute path to the downloaded file.
""" """
path = self._get_download_path(
url, directory=directory, filename=filename, youtube_format=youtube_format
)
if not filename: if self._is_youtube_resource(url):
filename = url.split('/')[-1] dl_thread = self._download_youtube_url(
url, path, youtube_format=youtube_format
)
else:
dl_thread = self._download_url(url, path, timeout=timeout)
if sync:
dl_thread.join()
return path
@action
def pause_download(self, url: Optional[str] = None, path: Optional[str] = None):
"""
Pause a download in progress.
Either the URL or the path must be specified.
:param url: URL of the download.
:param path: Path of the download (default: any path associated with the URL).
"""
for thread in self._get_downloads(url=url, path=path):
thread.pause()
@action
def resume_download(self, url: Optional[str] = None, path: Optional[str] = None):
"""
Resume a paused download.
Either the URL or the path must be specified.
:param url: URL of the download.
:param path: Path of the download (default: any path associated with the URL).
"""
for thread in self._get_downloads(url=url, path=path):
thread.resume()
@action
def cancel_download(self, url: Optional[str] = None, path: Optional[str] = None):
"""
Cancel a download in progress.
Either the URL or the path must be specified.
:param url: URL of the download.
:param path: Path of the download (default: any path associated with the URL).
"""
for thread in self._get_downloads(url=url, path=path):
thread.stop()
@action
def clear_downloads(self, url: Optional[str] = None, path: Optional[str] = None):
"""
Clear completed/cancelled downloads from the queue.
:param url: URL of the download (default: all downloads).
:param path: Path of the download (default: any path associated with the URL).
"""
threads = (
self._get_downloads(url=url, path=path)
if url
else list(self._download_threads.values())
)
for thread in threads:
if thread.state not in (DownloadState.COMPLETED, DownloadState.CANCELLED):
continue
dl = self._download_threads.pop((thread.url, thread.path), None)
if dl:
dl.clear()
@action
def get_downloads(self, url: Optional[str] = None, path: Optional[str] = None):
"""
Get the download threads.
:param url: URL of the download (default: all downloads).
:param path: Path of the download (default: any path associated with the URL).
:return: .. schema:: media.download.MediaDownloadSchema(many=True)
"""
from platypush.schemas.media.download import MediaDownloadSchema
return MediaDownloadSchema().dump(
(
self._get_downloads(url=url, path=path)
if url
else list(self._download_threads.values())
),
many=True,
)
def _get_downloads(self, url: Optional[str] = None, path: Optional[str] = None):
assert url or path, 'URL or path must be specified'
threads = []
if url and path:
path = os.path.expanduser(path)
thread = self._download_threads.get((url, path))
if thread:
threads = [thread]
elif url:
threads = [
thread
for (url_, _), thread in self._download_threads.items()
if url_ == url
]
elif path:
path = os.path.expanduser(path)
threads = [
thread
for (_, path_), thread in self._download_threads.items()
if path_ == path
]
assert threads, f'No matching downloads found for [url={url}, path={path}]'
return threads
def _get_download_path(
self,
url: str,
directory: Optional[str] = None,
filename: Optional[str] = None,
youtube_format: Optional[str] = None,
) -> str:
if not directory: if not directory:
directory = self.download_dir directory = self.download_dir
path = os.path.join(directory, filename) directory = os.path.expanduser(directory)
youtube_format = youtube_format or self.youtube_format
with requests.get(url, timeout=20, stream=True) as r: if self._is_youtube_resource(url):
r.raise_for_status() with subprocess.Popen(
with open(path, 'wb') as f: [
for chunk in r.iter_content(chunk_size=8192): self._ytdl,
f.write(chunk) *(
[
'-f',
youtube_format,
]
if youtube_format
else []
),
'-O',
'%(title)s.%(ext)s',
url,
],
stdout=subprocess.PIPE,
) as proc:
assert proc.stdout, 'yt-dlp stdout is None'
filename = proc.stdout.read().decode()[:-1]
return path if not filename:
filename = url.split('/')[-1]
return os.path.join(directory, filename)
def _download_url(self, url: str, path: str, timeout: int) -> FileDownloadThread:
download_thread = FileDownloadThread(
url=url,
path=path,
timeout=timeout,
on_start=self._on_download_start,
post_event=self._post_event,
stop_event=self._should_stop,
)
self._start_download(download_thread)
return download_thread
def _download_youtube_url(
self, url: str, path: str, youtube_format: Optional[str] = None
) -> YouTubeDownloadThread:
download_thread = YouTubeDownloadThread(
url=url,
path=path,
ytdl=self._ytdl,
youtube_format=youtube_format or self.youtube_format,
on_start=self._on_download_start,
post_event=self._post_event,
stop_event=self._should_stop,
)
self._start_download(download_thread)
return download_thread
def _on_download_start(self, thread: DownloadThread):
self._download_threads[thread.url, thread.path] = thread
def _start_download(self, thread: DownloadThread):
if (thread.url, thread.path) in self._download_threads:
self.logger.warning(
'A download of %s to %s is already in progress', thread.url, thread.path
)
return
thread.start()
def _post_event(self, event_type: Type[MediaEvent], **kwargs):
evt = event_type(
player=get_plugin_name_by_class(self.__class__), plugin=self, **kwargs
)
self._bus.post(evt)
def is_local(self): def is_local(self):
return self._is_local return self._is_local
@ -820,5 +1024,15 @@ class MediaPlugin(Plugin, ABC):
f.write(content) f.write(content)
return f.name return f.name
def main(self):
self.wait_stop()
__all__ = [
'DownloadState',
'MediaPlugin',
'PlayerState',
]
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,341 @@
from abc import ABC, abstractmethod
from contextlib import suppress
from enum import Enum
import json
import logging
import signal
import subprocess
import threading
import time
from typing import Any, Callable, Optional, Type
import requests
from platypush.message.event.media import (
MediaDownloadCancelledEvent,
MediaDownloadClearEvent,
MediaDownloadCompletedEvent,
MediaDownloadErrorEvent,
MediaDownloadEvent,
MediaDownloadPausedEvent,
MediaDownloadProgressEvent,
MediaDownloadResumedEvent,
MediaDownloadStartedEvent,
)
from platypush.utils import wait_for_either
class DownloadState(Enum):
"""
Enum that represents the status of a download.
"""
IDLE = 'idle'
STARTED = 'started'
DOWNLOADING = 'downloading'
PAUSED = 'paused'
COMPLETED = 'completed'
CANCELLED = 'cancelled'
ERROR = 'error'
class DownloadThread(threading.Thread, ABC):
"""
Thread that downloads a URL to a file.
"""
_progress_update_interval = 1
""" Throttle the progress updates to this interval, in seconds. """
def __init__(
self,
path: str,
url: str,
post_event: Callable,
size: Optional[int] = None,
timeout: Optional[int] = 10,
on_start: Callable[['DownloadThread'], None] = lambda _: None,
on_close: Callable[['DownloadThread'], None] = lambda _: None,
stop_event: Optional[threading.Event] = None,
):
super().__init__(name=f'DownloadThread-{path}')
self.path = path
self.url = url
self.size = size
self.timeout = timeout
self.state = DownloadState.IDLE
self.progress = None
self.started_at = None
self.ended_at = None
self._upstream_stop_event = stop_event or threading.Event()
self._stop_event = threading.Event()
self._post_event = post_event
self._on_start = on_start
self._on_close = on_close
self._paused = threading.Event()
self._downloading = threading.Event()
self._last_progress_update_time = 0
self.logger = logging.getLogger(__name__)
def should_stop(self) -> bool:
return self._stop_event.is_set() or self._upstream_stop_event.is_set()
@abstractmethod
def _run(self) -> bool:
pass
def pause(self):
self.state = DownloadState.PAUSED
self._paused.set()
self._downloading.clear()
self.post_event(MediaDownloadPausedEvent)
def resume(self):
self.state = DownloadState.DOWNLOADING
self._paused.clear()
self._downloading.set()
self.post_event(MediaDownloadResumedEvent)
def run(self):
super().run()
interrupted = False
try:
self.on_start()
interrupted = not self._run()
if interrupted:
self.state = DownloadState.CANCELLED
else:
self.state = DownloadState.COMPLETED
except Exception as e:
self.state = DownloadState.ERROR
self.post_event(MediaDownloadErrorEvent, error=str(e))
self.logger.warning('Error while downloading URL: %s', e)
finally:
self.on_close()
def stop(self):
self.state = DownloadState.CANCELLED
self._stop_event.set()
self._downloading.clear()
def on_start(self):
self.state = DownloadState.STARTED
self.started_at = time.time()
self.post_event(MediaDownloadStartedEvent)
self._on_start(self)
def on_close(self):
self.ended_at = time.time()
if self.state == DownloadState.CANCELLED:
self.post_event(MediaDownloadCancelledEvent)
elif self.state == DownloadState.COMPLETED:
self.post_event(MediaDownloadCompletedEvent)
self._on_close(self)
def clear(self):
if self.state not in (DownloadState.COMPLETED, DownloadState.CANCELLED):
self.logger.info(
'Download thread for %s is still active, stopping', self.url
)
self.stop()
self.join(timeout=10)
self.post_event(MediaDownloadClearEvent)
def post_event(self, event_type: Type[MediaDownloadEvent], **kwargs):
kwargs = {
'resource': self.url,
'path': self.path,
'state': self.state.value,
'size': self.size,
'timeout': self.timeout,
'progress': self.progress,
'started_at': self.started_at,
'ended_at': self.ended_at,
**kwargs,
}
self._post_event(event_type, **kwargs)
def __setattr__(self, name: str, value: Optional[Any], /) -> None:
if name == 'progress' and value is not None:
if value < 0 or value > 100:
self.logger.debug('Invalid progress value:%s', value)
return
prev_progress = getattr(self, 'progress', None)
if prev_progress is None or (
int(prev_progress) != int(value)
and (
time.time() - self._last_progress_update_time
>= self._progress_update_interval
)
):
value = round(value, 2)
self._last_progress_update_time = time.time()
self.post_event(MediaDownloadProgressEvent, progress=value)
super().__setattr__(name, value)
class FileDownloadThread(DownloadThread):
"""
Thread that downloads a generic URL to a file.
"""
def _run(self):
interrupted = False
with requests.get(self.url, timeout=self.timeout, stream=True) as response:
response.raise_for_status()
self.size = int(response.headers.get('Content-Length', 0)) or None
with open(self.path, 'wb') as f:
self.on_start()
for chunk in response.iter_content(chunk_size=8192):
if not chunk or self.should_stop():
interrupted = self.should_stop()
if interrupted:
self.stop()
break
self.state = DownloadState.DOWNLOADING
f.write(chunk)
percent = f.tell() / self.size * 100 if self.size else 0
self.progress = percent
if self._paused.is_set():
wait_for_either(self._downloading, self._stop_event)
return not interrupted
class YouTubeDownloadThread(DownloadThread):
"""
Thread that downloads a YouTube URL to a file.
"""
def __init__(
self, *args, ytdl: str, youtube_format: Optional[str] = None, **kwargs
):
super().__init__(*args, **kwargs)
self._ytdl = ytdl
self._youtube_format = youtube_format
self._proc = None
self._proc_lock = threading.Lock()
def _parse_progress(self, line: str):
try:
progress = json.loads(line)
except json.JSONDecodeError:
return
status = progress.get('status')
if not status:
return
if status == 'finished':
self.progress = 100
return
if status == 'paused':
self.state = DownloadState.PAUSED
elif status == 'downloading':
self.state = DownloadState.DOWNLOADING
self.size = int(progress.get('total_bytes_estimate', 0)) or self.size
if self.size:
downloaded = int(progress.get('downloaded_bytes', 0))
self.progress = (downloaded / self.size) * 100
def _run(self):
ytdl_cmd = [
self._ytdl,
'--newline',
'--progress',
'--progress-delta',
str(self._progress_update_interval),
'--progress-template',
'%(progress)j',
*(['-f', self._youtube_format] if self._youtube_format else []),
self.url,
'-o',
self.path,
]
self.logger.info('Executing command %r', ytdl_cmd)
err = None
with subprocess.Popen(ytdl_cmd, stdout=subprocess.PIPE) as self._proc:
if self._proc.stdout:
for line in self._proc.stdout:
self.logger.debug(
'%s output: %s', self._ytdl, line.decode().strip()
)
self._parse_progress(line.decode())
if self.should_stop():
self.stop()
return self._proc.returncode == 0
if self._paused.is_set():
wait_for_either(self._downloading, self._stop_event)
if self._proc.returncode != 0:
err = self._proc.stderr.read().decode() if self._proc.stderr else None
raise RuntimeError(
f'{self._ytdl} failed with return code {self._proc.returncode}: {err}'
)
return True
def pause(self):
with self._proc_lock:
if self._proc:
self._proc.send_signal(signal.SIGSTOP)
super().pause()
def resume(self):
with self._proc_lock:
if self._proc:
self._proc.send_signal(signal.SIGCONT)
super().resume()
def stop(self):
state = None
with suppress(IOError, OSError), self._proc_lock:
if self._proc:
if self._proc.poll() is None:
self._proc.terminate()
self._proc.wait(timeout=3)
if self._proc.returncode is None:
self._proc.kill()
state = DownloadState.CANCELLED
elif self._proc.returncode != 0:
state = DownloadState.ERROR
else:
state = DownloadState.COMPLETED
self._proc = None
super().stop()
if state:
self.state = state
def on_close(self):
self.stop()
super().on_close()

View file

@ -0,0 +1,21 @@
from dataclasses import dataclass
from typing import Optional
@dataclass
class MediaResource:
"""
Models a media resource
"""
resource: str
url: str
title: Optional[str] = None
description: Optional[str] = None
filename: Optional[str] = None
image: Optional[str] = None
duration: Optional[float] = None
channel: Optional[str] = None
channel_url: Optional[str] = None
type: Optional[str] = None
resolution: Optional[str] = None

View file

@ -0,0 +1,12 @@
import enum
class PlayerState(enum.Enum):
"""
Models the possible states of a media player
"""
STOP = 'stop'
PLAY = 'play'
PAUSE = 'pause'
IDLE = 'idle'

View file

@ -179,21 +179,6 @@ class MediaKodiPlugin(MediaPlugin):
:param resource: URL or path to the media to be played :param resource: URL or path to the media to be played
""" """
youtube_id = self.get_youtube_id(resource)
if youtube_id:
try:
resource = self.get_youtube_url(youtube_id).output
except Exception as e:
self.logger.warning(
'youtube-dl error, falling back to Kodi YouTube plugin: {}'.format(
str(e)
)
)
resource = (
'plugin://plugin.video.youtube/?action=play_video&videoid='
+ youtube_id
)
if resource.startswith('file://'): if resource.startswith('file://'):
resource = resource[7:] resource = resource[7:]
@ -585,7 +570,7 @@ class MediaKodiPlugin(MediaPlugin):
:type position: float :type position: float
:param player_id: ID of the target player (default: configured/current player). :param player_id: ID of the target player (default: configured/current player).
""" """
return self.seek(position=position, player_id=player_id, *args, **kwargs) return self.seek(*args, position=position, player_id=player_id, **kwargs)
@action @action
def back(self, offset=30, player_id=None, *args, **kwargs): def back(self, offset=30, player_id=None, *args, **kwargs):

View file

@ -0,0 +1,65 @@
from marshmallow import fields
from marshmallow.schema import Schema
from platypush.plugins.media import DownloadState
from platypush.schemas import DateTime
class MediaDownloadSchema(Schema):
"""
Media download schema.
"""
url = fields.URL(
required=True,
metadata={
"description": "Download URL",
"example": "https://example.com/video.mp4",
},
)
path = fields.String(
required=True,
metadata={
"description": "Download path",
"example": "/path/to/download/video.mp4",
},
)
state = fields.Enum(
DownloadState,
required=True,
metadata={
"description": "Download state",
},
)
size = fields.Integer(
nullable=True,
metadata={
"description": "Download size (bytes)",
"example": 1024,
},
)
timeout = fields.Integer(
nullable=True,
metadata={
"description": "Download timeout (seconds)",
"example": 60,
},
)
started_at = DateTime(
nullable=True,
metadata={
"description": "Download start time",
},
)
ended_at = DateTime(
nullable=True,
metadata={
"description": "Download end time",
},
)

View file

@ -22,12 +22,14 @@ from threading import Event, Lock as TLock
from typing import Generator, Optional, Tuple, Type, Union from typing import Generator, Optional, Tuple, Type, Union
from dateutil import parser, tz from dateutil import parser, tz
from redis import Redis from redis import ConnectionPool, Redis
from rsa.key import PublicKey, PrivateKey, newkeys from rsa.key import PublicKey, PrivateKey, newkeys
logger = logging.getLogger('utils') logger = logging.getLogger('utils')
Lock = Union[PLock, TLock] # type: ignore Lock = Union[PLock, TLock] # type: ignore
redis_pools: dict[Tuple[str, int], ConnectionPool] = {}
def get_module_and_method_from_action(action): def get_module_and_method_from_action(action):
""" """
@ -608,6 +610,29 @@ def get_enabled_backends() -> dict:
return backends return backends
def get_redis_pool(*args, **kwargs) -> ConnectionPool:
"""
Get a Redis connection pool on the basis of the Redis configuration.
The Redis configuration can be loaded from:
1. The ``redis`` plugin.
2. The ``backend.redis`` configuration (``redis_args`` attribute)
"""
if not (args or kwargs):
kwargs = get_redis_conf()
pool_key = (kwargs.get('host', 'localhost'), kwargs.get('port', 6379))
pool = redis_pools.get(pool_key)
if not pool:
pool = ConnectionPool(*args, **kwargs)
redis_pools[pool_key] = pool
return pool
def get_redis_conf() -> dict: def get_redis_conf() -> dict:
""" """
Get the Redis connection arguments from the configuration. Get the Redis connection arguments from the configuration.
@ -631,10 +656,7 @@ def get_redis(*args, **kwargs) -> Redis:
2. The ``backend.redis`` configuration (``redis_args`` attribute) 2. The ``backend.redis`` configuration (``redis_args`` attribute)
""" """
if not (args or kwargs): return Redis(connection_pool=get_redis_pool(*args, **kwargs))
kwargs = get_redis_conf()
return Redis(*args, **kwargs)
def to_datetime(t: Union[str, int, float, datetime.datetime]) -> datetime.datetime: def to_datetime(t: Union[str, int, float, datetime.datetime]) -> datetime.datetime:

View file

@ -23,3 +23,4 @@ extend-ignore =
W503 W503
SIM104 SIM104
SIM105 SIM105
SIM115