Compare commits

...

8 Commits

Author SHA1 Message Date
Fabio Manganiello 575635fd6b
Defined `set` as a base method for all plugins that implement writeable entities 2023-02-11 04:04:21 +01:00
Fabio Manganiello 4365352331
[WIP] s/set_value/set/g for entities 2023-02-11 03:57:23 +01:00
Fabio Manganiello b0cc80ceb0
Rewriting `bluetooth.ble` plugin to use `bleak` instead of `gattlib`. 2023-02-10 17:40:20 +01:00
Fabio Manganiello f30e077a5a
Support for custom Bluetooth adapter on `switchbot.bluetooth`. 2023-02-08 23:01:05 +01:00
Fabio Manganiello 8469a1027f
Migrated/refactored `switchbot.bluetooth` integration.
- Out `gattlib` + `pybluez`, in `bleak`. It's not platform-dependent, it doesn't
  require libboost and other heavy build dependencies, and it doesn't require the
  user that runs the service from having special privileges to access raw
  Bluetooth sockets.

- Better integration with Platypush native entities. The devices are now mapped
  to write-only `EnumSwitch` entities, and the status returns the serialized
  representation of those entities instead of the previous intermediate
  representation.
2023-02-08 22:42:00 +01:00
Fabio Manganiello 35719b0da9
Let `publish_entities` return the list of transformed_entities 2023-02-08 02:09:34 +01:00
Fabio Manganiello e04870209e
More LINT fixes 2023-02-08 01:50:54 +01:00
Fabio Manganiello a98a5f0980
typo fix 2023-02-08 01:09:25 +01:00
50 changed files with 743 additions and 634 deletions

View File

@ -13,7 +13,6 @@ Backends
platypush/backend/bluetooth.fileserver.rst
platypush/backend/bluetooth.pushserver.rst
platypush/backend/bluetooth.scanner.rst
platypush/backend/bluetooth.scanner.ble.rst
platypush/backend/button.flic.rst
platypush/backend/camera.pi.rst
platypush/backend/chat.telegram.rst
@ -74,6 +73,5 @@ Backends
platypush/backend/weather.openweathermap.rst
platypush/backend/websocket.rst
platypush/backend/wiimote.rst
platypush/backend/zigbee.mqtt.rst
platypush/backend/zwave.rst
platypush/backend/zwave.mqtt.rst

View File

@ -296,6 +296,7 @@ autodoc_mock_imports = [
'aiofiles',
'aiofiles.os',
'async_lru',
'bleak',
]
sys.path.insert(0, os.path.abspath('../..'))

View File

@ -11,6 +11,7 @@ Events
platypush/events/application.rst
platypush/events/assistant.rst
platypush/events/bluetooth.rst
platypush/events/bluetooth.ble.rst
platypush/events/button.flic.rst
platypush/events/camera.rst
platypush/events/chat.slack.rst

View File

@ -1,5 +0,0 @@
``bluetooth.scanner.ble``
===========================================
.. automodule:: platypush.backend.bluetooth.scanner.ble
:members:

View File

@ -1,5 +0,0 @@
``zigbee.mqtt``
=================================
.. automodule:: platypush.backend.zigbee.mqtt
:members:

View File

@ -0,0 +1,5 @@
``bluetooth.ble``
=================
.. automodule:: platypush.message.event.bluetooth.ble
:members:

View File

@ -1,33 +0,0 @@
from typing import Optional
from platypush.backend.bluetooth.scanner import BluetoothScannerBackend
class BluetoothBleScannerBackend(BluetoothScannerBackend):
"""
This backend periodically scans for available bluetooth low-energy devices and returns events when a devices enter
or exits the range.
Triggers:
* :class:`platypush.message.event.bluetooth.BluetoothDeviceFoundEvent` when a new bluetooth device is found.
* :class:`platypush.message.event.bluetooth.BluetoothDeviceLostEvent` when a bluetooth device is lost.
Requires:
* The :class:`platypush.plugins.bluetooth.BluetoothBlePlugin` plugin working.
"""
def __init__(self, interface: Optional[int] = None, scan_duration: int = 10, **kwargs):
"""
:param interface: Bluetooth adapter name to use (default configured on the ``bluetooth.ble`` plugin if None).
:param scan_duration: How long the scan should run (default: 10 seconds).
"""
super().__init__(plugin='bluetooth.ble', plugin_args={
'interface': interface,
'duration': scan_duration,
}, **kwargs)
# vim:sw=4:ts=4:et:

View File

@ -1,10 +0,0 @@
manifest:
events:
platypush.message.event.bluetooth.BluetoothDeviceFoundEvent: when a new bluetooth
device is found.
platypush.message.event.bluetooth.BluetoothDeviceLostEvent: when a bluetooth device
is lost.
install:
pip: []
package: platypush.backend.bluetooth.scanner.ble
type: backend

View File

@ -1 +1 @@
<!doctype html><html lang="en"><head><meta charset="utf-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width,initial-scale=1"><link rel="icon" href="/favicon.ico"><link rel="stylesheet" href="/fonts/poppins.css"><title>platypush</title><script defer="defer" type="module" src="/static/js/chunk-vendors.95bedba1.js"></script><script defer="defer" type="module" src="/static/js/app.c6ad79d3.js"></script><link href="/static/css/chunk-vendors.0fcd36f0.css" rel="stylesheet"><link href="/static/css/app.d7cb662c.css" rel="stylesheet"><script defer="defer" src="/static/js/chunk-vendors-legacy.79dede0c.js" nomodule></script><script defer="defer" src="/static/js/app-legacy.0a704e98.js" nomodule></script></head><body><noscript><strong>We're sorry but platypush doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id="app"></div></body></html>
<!doctype html><html lang="en"><head><meta charset="utf-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width,initial-scale=1"><link rel="icon" href="/favicon.ico"><link rel="stylesheet" href="/fonts/poppins.css"><title>platypush</title><script defer="defer" type="module" src="/static/js/chunk-vendors.95bedba1.js"></script><script defer="defer" type="module" src="/static/js/app.9397ac28.js"></script><link href="/static/css/chunk-vendors.0fcd36f0.css" rel="stylesheet"><link href="/static/css/app.d7cb662c.css" rel="stylesheet"><script defer="defer" src="/static/js/chunk-vendors-legacy.79dede0c.js" nomodule></script><script defer="defer" src="/static/js/app-legacy.22d80610.js" nomodule></script></head><body><noscript><strong>We're sorry but platypush doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id="app"></div></body></html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,2 +0,0 @@
"use strict";(self["webpackChunkplatypush"]=self["webpackChunkplatypush"]||[]).push([[1897,3459],{3459:function(e,t,l){l.r(t),l.d(t,{default:function(){return v}});var a=l(6252),n=l(3577),o=l(3540);const i={key:0,src:o,class:"loading"},s={key:1,class:"fas fa-circle-exclamation error"};function c(e,t,l,o,c,u){const r=(0,a.up)("Icon");return(0,a.wg)(),(0,a.iD)("div",{class:(0,n.C_)(["entity-icon-container",{"with-color-fill":!!u.colorFill}]),style:(0,n.j5)(u.colorFillStyle)},[l.loading?((0,a.wg)(),(0,a.iD)("img",i)):l.error?((0,a.wg)(),(0,a.iD)("i",s)):((0,a.wg)(),(0,a.j4)(r,(0,n.vs)((0,a.dG)({key:2},u.computedIconNormalized)),null,16))],6)}var u=l(1478),r={name:"EntityIcon",components:{Icon:u.Z},props:{loading:{type:Boolean,default:!1},error:{type:Boolean,default:!1},entity:{type:Object,required:!0},icon:{type:Object,default:()=>{}},hasColorFill:{type:Boolean,default:!1}},data(){return{component:null,modalVisible:!1}},computed:{computedIcon(){let e={...this.entity?.meta?.icon||{}};return Object.keys(this.icon||{}).length&&(e=this.icon),{...e}},colorFill(){return this.hasColorFill&&this.computedIcon.color},colorFillStyle(){return this.colorFill&&!this.error?{background:this.colorFill}:{}},computedIconNormalized(){const e={...this.computedIcon};return this.colorFill&&delete e.color,e},type(){let e=this.entity.type||"";return e.charAt(0).toUpperCase()+e.slice(1)}}},d=l(3744);const p=(0,d.Z)(r,[["render",c],["__scopeId","data-v-4fad24e6"]]);var v=p},1897:function(e,t,l){l.r(t),l.d(t,{default:function(){return b}});var a=l(6252),n=l(3577),o=l(9963);const i={class:"entity switch-container"},s={class:"col-1 icon"},c={class:"col-s-8 col-m-9 label"},u=["textContent"],r={class:"col-s-3 col-m-2 buttons pull-right"},d=["textContent"],p={class:"row"},v={class:"input"},h=["disabled"],y={key:0,value:"",selected:""},g=["value","selected","textContent"];function f(e,t,l,f,m,w){const _=(0,a.up)("EntityIcon");return(0,a.wg)(),(0,a.iD)("div",i,[(0,a._)("div",{class:(0,n.C_)(["head",{collapsed:e.collapsed}])},[(0,a._)("div",s,[(0,a.Wm)(_,{entity:e.value,loading:e.loading,error:e.error},null,8,["entity","loading","error"])]),(0,a._)("div",c,[(0,a._)("div",{class:"name",textContent:(0,n.zw)(e.value.name)},null,8,u)]),(0,a._)("div",r,[w.hasValues?((0,a.wg)(),(0,a.iD)("button",{key:0,onClick:t[0]||(t[0]=(0,o.iM)((t=>e.collapsed=!e.collapsed),["stop"]))},[(0,a._)("i",{class:(0,n.C_)(["fas",{"fa-angle-up":!e.collapsed,"fa-angle-down":e.collapsed}])},null,2)])):(0,a.kq)("",!0),null!=e.value?.value?((0,a.wg)(),(0,a.iD)("span",{key:1,class:"value",textContent:(0,n.zw)(e.value.values[e.value.value]||e.value.value)},null,8,d)):(0,a.kq)("",!0)])],2),e.collapsed?(0,a.kq)("",!0):((0,a.wg)(),(0,a.iD)("div",{key:0,class:"body",onClick:t[2]||(t[2]=(0,o.iM)(((...e)=>w.prevent&&w.prevent(...e)),["stop"]))},[(0,a._)("div",p,[(0,a._)("div",v,[(0,a._)("select",{onInput:t[1]||(t[1]=(...e)=>w.setValue&&w.setValue(...e)),ref:"values",disabled:e.loading},[e.value.is_write_only?((0,a.wg)(),(0,a.iD)("option",y,"--")):(0,a.kq)("",!0),((0,a.wg)(!0),(0,a.iD)(a.HY,null,(0,a.Ko)(w.displayValues,((t,l)=>((0,a.wg)(),(0,a.iD)("option",{value:l,selected:l==e.value.value,key:l,textContent:(0,n.zw)(t)},null,8,g)))),128))],40,h)])])]))])}var m=l(7909),w=l(3459),_={name:"EnumSwitch",components:{EntityIcon:w["default"]},mixins:[m["default"]],computed:{hasValues(){return!!Object.values(this?.value?.values||{}).length},displayValues(){return this.value?.values instanceof Array?this.value.values.reduce(((e,t)=>(e[t]=t,e)),{}):this.value?.values||{}}},methods:{prevent(e){return e.stopPropagation(),!1},async setValue(e){if(e.target.value?.length){if(this.$emit("loading",!0),this.value.is_write_only){const e=this;setTimeout((()=>{e.$refs.values.value=""}),1e3)}try{await this.request("entities.execute",{id:this.value.id,action:"set_value",data:e.target.value})}finally{this.$emit("loading",!1)}}}}},k=l(3744);const C=(0,k.Z)(_,[["render",f],["__scopeId","data-v-3404ecf7"]]);var b=C},3540:function(e,t,l){e.exports=l.p+"static/img/spinner.c0bee445.gif"}}]);
//# sourceMappingURL=1897.d577146b.js.map

View File

@ -1,2 +1,2 @@
"use strict";(self["webpackChunkplatypush"]=self["webpackChunkplatypush"]||[]).push([[1897,3459],{3459:function(e,t,n){n.r(t),n.d(t,{default:function(){return f}});var l=n(6252),o=n(3577),i=n(3540),a={key:0,src:i,class:"loading"},u={key:1,class:"fas fa-circle-exclamation error"};function r(e,t,n,i,r,s){var c=(0,l.up)("Icon");return(0,l.wg)(),(0,l.iD)("div",{class:(0,o.C_)(["entity-icon-container",{"with-color-fill":!!s.colorFill}]),style:(0,o.j5)(s.colorFillStyle)},[n.loading?((0,l.wg)(),(0,l.iD)("img",a)):n.error?((0,l.wg)(),(0,l.iD)("i",u)):((0,l.wg)(),(0,l.j4)(c,(0,o.vs)((0,l.dG)({key:2},s.computedIconNormalized)),null,16))],6)}var s=n(4648),c=(n(7941),n(7042),n(1478)),d={name:"EntityIcon",components:{Icon:c.Z},props:{loading:{type:Boolean,default:!1},error:{type:Boolean,default:!1},entity:{type:Object,required:!0},icon:{type:Object,default:function(){}},hasColorFill:{type:Boolean,default:!1}},data:function(){return{component:null,modalVisible:!1}},computed:{computedIcon:function(){var e,t,n=(0,s.Z)({},(null===(e=this.entity)||void 0===e||null===(t=e.meta)||void 0===t?void 0:t.icon)||{});return Object.keys(this.icon||{}).length&&(n=this.icon),(0,s.Z)({},n)},colorFill:function(){return this.hasColorFill&&this.computedIcon.color},colorFillStyle:function(){return this.colorFill&&!this.error?{background:this.colorFill}:{}},computedIconNormalized:function(){var e=(0,s.Z)({},this.computedIcon);return this.colorFill&&delete e.color,e},type:function(){var e=this.entity.type||"";return e.charAt(0).toUpperCase()+e.slice(1)}}},v=n(3744);const p=(0,v.Z)(d,[["render",r],["__scopeId","data-v-4fad24e6"]]);var f=p},1897:function(e,t,n){n.r(t),n.d(t,{default:function(){return x}});n(8309),n(1539),n(3948);var l=n(6252),o=n(3577),i=n(9963),a={class:"entity switch-container"},u={class:"col-1 icon"},r={class:"col-s-8 col-m-9 label"},s=["textContent"],c={class:"col-s-3 col-m-2 buttons pull-right"},d=["textContent"],v={class:"row"},p={class:"input"},f=["disabled"],h={key:0,value:"",selected:""},y=["value","selected","textContent"];function g(e,t,n,g,m,w){var k,_=(0,l.up)("EntityIcon");return(0,l.wg)(),(0,l.iD)("div",a,[(0,l._)("div",{class:(0,o.C_)(["head",{collapsed:e.collapsed}])},[(0,l._)("div",u,[(0,l.Wm)(_,{entity:e.value,loading:e.loading,error:e.error},null,8,["entity","loading","error"])]),(0,l._)("div",r,[(0,l._)("div",{class:"name",textContent:(0,o.zw)(e.value.name)},null,8,s)]),(0,l._)("div",c,[w.hasValues?((0,l.wg)(),(0,l.iD)("button",{key:0,onClick:t[0]||(t[0]=(0,i.iM)((function(t){return e.collapsed=!e.collapsed}),["stop"]))},[(0,l._)("i",{class:(0,o.C_)(["fas",{"fa-angle-up":!e.collapsed,"fa-angle-down":e.collapsed}])},null,2)])):(0,l.kq)("",!0),null!=(null===(k=e.value)||void 0===k?void 0:k.value)?((0,l.wg)(),(0,l.iD)("span",{key:1,class:"value",textContent:(0,o.zw)(e.value.values[e.value.value]||e.value.value)},null,8,d)):(0,l.kq)("",!0)])],2),e.collapsed?(0,l.kq)("",!0):((0,l.wg)(),(0,l.iD)("div",{key:0,class:"body",onClick:t[2]||(t[2]=(0,i.iM)((function(){return w.prevent&&w.prevent.apply(w,arguments)}),["stop"]))},[(0,l._)("div",v,[(0,l._)("div",p,[(0,l._)("select",{onInput:t[1]||(t[1]=function(){return w.setValue&&w.setValue.apply(w,arguments)}),ref:"values",disabled:e.loading},[e.value.is_write_only?((0,l.wg)(),(0,l.iD)("option",h,"--")):(0,l.kq)("",!0),((0,l.wg)(!0),(0,l.iD)(l.HY,null,(0,l.Ko)(w.displayValues,(function(t,n){return(0,l.wg)(),(0,l.iD)("option",{value:n,selected:n==e.value.value,key:n,textContent:(0,o.zw)(t)},null,8,y)})),128))],40,f)])])]))])}var m=n(8534),w=(n(5666),n(2479),n(7909)),k=n(3459),_={name:"EnumSwitch",components:{EntityIcon:k["default"]},mixins:[w["default"]],computed:{hasValues:function(){var e;return!!Object.values((null===this||void 0===this||null===(e=this.value)||void 0===e?void 0:e.values)||{}).length},displayValues:function(){var e,t;return(null===(e=this.value)||void 0===e?void 0:e.values)instanceof Array?this.value.values.reduce((function(e,t){return e[t]=t,e}),{}):(null===(t=this.value)||void 0===t?void 0:t.values)||{}}},methods:{prevent:function(e){return e.stopPropagation(),!1},setValue:function(e){var t=this;return(0,m.Z)(regeneratorRuntime.mark((function n(){var l,o;return regeneratorRuntime.wrap((function(n){while(1)switch(n.prev=n.next){case 0:if(null!==(l=e.target.value)&&void 0!==l&&l.length){n.next=2;break}return n.abrupt("return");case 2:return t.$emit("loading",!0),t.value.is_write_only&&(o=t,setTimeout((function(){o.$refs.values.value=""}),1e3)),n.prev=4,n.next=7,t.request("entities.execute",{id:t.value.id,action:"set_value",data:e.target.value});case 7:return n.prev=7,t.$emit("loading",!1),n.finish(7);case 10:case"end":return n.stop()}}),n,null,[[4,,7,10]])})))()}}},b=n(3744);const C=(0,b.Z)(_,[["render",g],["__scopeId","data-v-3404ecf7"]]);var x=C},3540:function(e,t,n){e.exports=n.p+"static/img/spinner.c0bee445.gif"}}]);
//# sourceMappingURL=1897-legacy.657985da.js.map
"use strict";(self["webpackChunkplatypush"]=self["webpackChunkplatypush"]||[]).push([[3398,3459],{3459:function(e,t,n){n.r(t),n.d(t,{default:function(){return f}});var l=n(6252),o=n(3577),i=n(3540),a={key:0,src:i,class:"loading"},u={key:1,class:"fas fa-circle-exclamation error"};function r(e,t,n,i,r,s){var c=(0,l.up)("Icon");return(0,l.wg)(),(0,l.iD)("div",{class:(0,o.C_)(["entity-icon-container",{"with-color-fill":!!s.colorFill}]),style:(0,o.j5)(s.colorFillStyle)},[n.loading?((0,l.wg)(),(0,l.iD)("img",a)):n.error?((0,l.wg)(),(0,l.iD)("i",u)):((0,l.wg)(),(0,l.j4)(c,(0,o.vs)((0,l.dG)({key:2},s.computedIconNormalized)),null,16))],6)}var s=n(4648),c=(n(7941),n(7042),n(1478)),d={name:"EntityIcon",components:{Icon:c.Z},props:{loading:{type:Boolean,default:!1},error:{type:Boolean,default:!1},entity:{type:Object,required:!0},icon:{type:Object,default:function(){}},hasColorFill:{type:Boolean,default:!1}},data:function(){return{component:null,modalVisible:!1}},computed:{computedIcon:function(){var e,t,n=(0,s.Z)({},(null===(e=this.entity)||void 0===e||null===(t=e.meta)||void 0===t?void 0:t.icon)||{});return Object.keys(this.icon||{}).length&&(n=this.icon),(0,s.Z)({},n)},colorFill:function(){return this.hasColorFill&&this.computedIcon.color},colorFillStyle:function(){return this.colorFill&&!this.error?{background:this.colorFill}:{}},computedIconNormalized:function(){var e=(0,s.Z)({},this.computedIcon);return this.colorFill&&delete e.color,e},type:function(){var e=this.entity.type||"";return e.charAt(0).toUpperCase()+e.slice(1)}}},v=n(3744);const p=(0,v.Z)(d,[["render",r],["__scopeId","data-v-4fad24e6"]]);var f=p},3398:function(e,t,n){n.r(t),n.d(t,{default:function(){return x}});n(8309),n(1539),n(3948);var l=n(6252),o=n(3577),i=n(9963),a={class:"entity switch-container"},u={class:"col-1 icon"},r={class:"col-s-8 col-m-9 label"},s=["textContent"],c={class:"col-s-3 col-m-2 buttons pull-right"},d=["textContent"],v={class:"row"},p={class:"input"},f=["disabled"],h={key:0,value:"",selected:""},y=["value","selected","textContent"];function g(e,t,n,g,m,w){var k,_=(0,l.up)("EntityIcon");return(0,l.wg)(),(0,l.iD)("div",a,[(0,l._)("div",{class:(0,o.C_)(["head",{collapsed:e.collapsed}])},[(0,l._)("div",u,[(0,l.Wm)(_,{entity:e.value,loading:e.loading,error:e.error},null,8,["entity","loading","error"])]),(0,l._)("div",r,[(0,l._)("div",{class:"name",textContent:(0,o.zw)(e.value.name)},null,8,s)]),(0,l._)("div",c,[w.hasValues?((0,l.wg)(),(0,l.iD)("button",{key:0,onClick:t[0]||(t[0]=(0,i.iM)((function(t){return e.collapsed=!e.collapsed}),["stop"]))},[(0,l._)("i",{class:(0,o.C_)(["fas",{"fa-angle-up":!e.collapsed,"fa-angle-down":e.collapsed}])},null,2)])):(0,l.kq)("",!0),null!=(null===(k=e.value)||void 0===k?void 0:k.value)?((0,l.wg)(),(0,l.iD)("span",{key:1,class:"value",textContent:(0,o.zw)(e.value.values[e.value.value]||e.value.value)},null,8,d)):(0,l.kq)("",!0)])],2),e.collapsed?(0,l.kq)("",!0):((0,l.wg)(),(0,l.iD)("div",{key:0,class:"body",onClick:t[2]||(t[2]=(0,i.iM)((function(){return w.prevent&&w.prevent.apply(w,arguments)}),["stop"]))},[(0,l._)("div",v,[(0,l._)("div",p,[(0,l._)("select",{onInput:t[1]||(t[1]=function(){return w.setValue&&w.setValue.apply(w,arguments)}),ref:"values",disabled:e.loading},[e.value.is_write_only?((0,l.wg)(),(0,l.iD)("option",h,"--")):(0,l.kq)("",!0),((0,l.wg)(!0),(0,l.iD)(l.HY,null,(0,l.Ko)(w.displayValues,(function(t,n){return(0,l.wg)(),(0,l.iD)("option",{value:n,selected:n==e.value.value,key:n,textContent:(0,o.zw)(t)},null,8,y)})),128))],40,f)])])]))])}var m=n(8534),w=(n(5666),n(2479),n(7909)),k=n(3459),_={name:"EnumSwitch",components:{EntityIcon:k["default"]},mixins:[w["default"]],computed:{hasValues:function(){var e;return!!Object.values((null===this||void 0===this||null===(e=this.value)||void 0===e?void 0:e.values)||{}).length},displayValues:function(){var e,t;return(null===(e=this.value)||void 0===e?void 0:e.values)instanceof Array?this.value.values.reduce((function(e,t){return e[t]=t,e}),{}):(null===(t=this.value)||void 0===t?void 0:t.values)||{}}},methods:{prevent:function(e){return e.stopPropagation(),!1},setValue:function(e){var t=this;return(0,m.Z)(regeneratorRuntime.mark((function n(){var l,o;return regeneratorRuntime.wrap((function(n){while(1)switch(n.prev=n.next){case 0:if(null!==(l=e.target.value)&&void 0!==l&&l.length){n.next=2;break}return n.abrupt("return");case 2:return t.$emit("loading",!0),t.value.is_write_only&&(o=t,setTimeout((function(){o.$refs.values.value=""}),1e3)),n.prev=4,n.next=7,t.request("entities.execute",{id:t.value.id,action:"set",value:e.target.value});case 7:return n.prev=7,t.$emit("loading",!1),n.finish(7);case 10:case"end":return n.stop()}}),n,null,[[4,,7,10]])})))()}}},b=n(3744);const C=(0,b.Z)(_,[["render",g],["__scopeId","data-v-ba5c657c"]]);var x=C},3540:function(e,t,n){e.exports=n.p+"static/img/spinner.c0bee445.gif"}}]);
//# sourceMappingURL=3398-legacy.f60988fb.js.map

View File

@ -0,0 +1,2 @@
"use strict";(self["webpackChunkplatypush"]=self["webpackChunkplatypush"]||[]).push([[3398,3459],{3459:function(e,t,l){l.r(t),l.d(t,{default:function(){return v}});var n=l(6252),a=l(3577),o=l(3540);const i={key:0,src:o,class:"loading"},s={key:1,class:"fas fa-circle-exclamation error"};function c(e,t,l,o,c,u){const r=(0,n.up)("Icon");return(0,n.wg)(),(0,n.iD)("div",{class:(0,a.C_)(["entity-icon-container",{"with-color-fill":!!u.colorFill}]),style:(0,a.j5)(u.colorFillStyle)},[l.loading?((0,n.wg)(),(0,n.iD)("img",i)):l.error?((0,n.wg)(),(0,n.iD)("i",s)):((0,n.wg)(),(0,n.j4)(r,(0,a.vs)((0,n.dG)({key:2},u.computedIconNormalized)),null,16))],6)}var u=l(1478),r={name:"EntityIcon",components:{Icon:u.Z},props:{loading:{type:Boolean,default:!1},error:{type:Boolean,default:!1},entity:{type:Object,required:!0},icon:{type:Object,default:()=>{}},hasColorFill:{type:Boolean,default:!1}},data(){return{component:null,modalVisible:!1}},computed:{computedIcon(){let e={...this.entity?.meta?.icon||{}};return Object.keys(this.icon||{}).length&&(e=this.icon),{...e}},colorFill(){return this.hasColorFill&&this.computedIcon.color},colorFillStyle(){return this.colorFill&&!this.error?{background:this.colorFill}:{}},computedIconNormalized(){const e={...this.computedIcon};return this.colorFill&&delete e.color,e},type(){let e=this.entity.type||"";return e.charAt(0).toUpperCase()+e.slice(1)}}},d=l(3744);const p=(0,d.Z)(r,[["render",c],["__scopeId","data-v-4fad24e6"]]);var v=p},3398:function(e,t,l){l.r(t),l.d(t,{default:function(){return C}});var n=l(6252),a=l(3577),o=l(9963);const i={class:"entity switch-container"},s={class:"col-1 icon"},c={class:"col-s-8 col-m-9 label"},u=["textContent"],r={class:"col-s-3 col-m-2 buttons pull-right"},d=["textContent"],p={class:"row"},v={class:"input"},h=["disabled"],y={key:0,value:"",selected:""},g=["value","selected","textContent"];function f(e,t,l,f,m,w){const k=(0,n.up)("EntityIcon");return(0,n.wg)(),(0,n.iD)("div",i,[(0,n._)("div",{class:(0,a.C_)(["head",{collapsed:e.collapsed}])},[(0,n._)("div",s,[(0,n.Wm)(k,{entity:e.value,loading:e.loading,error:e.error},null,8,["entity","loading","error"])]),(0,n._)("div",c,[(0,n._)("div",{class:"name",textContent:(0,a.zw)(e.value.name)},null,8,u)]),(0,n._)("div",r,[w.hasValues?((0,n.wg)(),(0,n.iD)("button",{key:0,onClick:t[0]||(t[0]=(0,o.iM)((t=>e.collapsed=!e.collapsed),["stop"]))},[(0,n._)("i",{class:(0,a.C_)(["fas",{"fa-angle-up":!e.collapsed,"fa-angle-down":e.collapsed}])},null,2)])):(0,n.kq)("",!0),null!=e.value?.value?((0,n.wg)(),(0,n.iD)("span",{key:1,class:"value",textContent:(0,a.zw)(e.value.values[e.value.value]||e.value.value)},null,8,d)):(0,n.kq)("",!0)])],2),e.collapsed?(0,n.kq)("",!0):((0,n.wg)(),(0,n.iD)("div",{key:0,class:"body",onClick:t[2]||(t[2]=(0,o.iM)(((...e)=>w.prevent&&w.prevent(...e)),["stop"]))},[(0,n._)("div",p,[(0,n._)("div",v,[(0,n._)("select",{onInput:t[1]||(t[1]=(...e)=>w.setValue&&w.setValue(...e)),ref:"values",disabled:e.loading},[e.value.is_write_only?((0,n.wg)(),(0,n.iD)("option",y,"--")):(0,n.kq)("",!0),((0,n.wg)(!0),(0,n.iD)(n.HY,null,(0,n.Ko)(w.displayValues,((t,l)=>((0,n.wg)(),(0,n.iD)("option",{value:l,selected:l==e.value.value,key:l,textContent:(0,a.zw)(t)},null,8,g)))),128))],40,h)])])]))])}var m=l(7909),w=l(3459),k={name:"EnumSwitch",components:{EntityIcon:w["default"]},mixins:[m["default"]],computed:{hasValues(){return!!Object.values(this?.value?.values||{}).length},displayValues(){return this.value?.values instanceof Array?this.value.values.reduce(((e,t)=>(e[t]=t,e)),{}):this.value?.values||{}}},methods:{prevent(e){return e.stopPropagation(),!1},async setValue(e){if(e.target.value?.length){if(this.$emit("loading",!0),this.value.is_write_only){const e=this;setTimeout((()=>{e.$refs.values.value=""}),1e3)}try{await this.request("entities.execute",{id:this.value.id,action:"set",value:e.target.value})}finally{this.$emit("loading",!1)}}}}},_=l(3744);const b=(0,_.Z)(k,[["render",f],["__scopeId","data-v-ba5c657c"]]);var C=b},3540:function(e,t,l){e.exports=l.p+"static/img/spinner.c0bee445.gif"}}]);
//# sourceMappingURL=3398.4f840e02.js.map

View File

@ -80,8 +80,8 @@ export default {
try {
await this.request('entities.execute', {
id: this.value.id,
action: 'set_value',
data: +event.target.value,
action: 'set',
value: +event.target.value,
})
} finally {
this.$emit('loading', false)

View File

@ -88,8 +88,8 @@ export default {
try {
await this.request('entities.execute', {
id: this.value.id,
action: 'set_value',
data: event.target.value,
action: 'set',
value: event.target.value,
})
} finally {
this.$emit('loading', false)

View File

@ -2,7 +2,7 @@ import hashlib
import json
import os
import threading
from typing import Optional, List, Callable
from typing import Any, Dict, Optional, List, Callable
import paho.mqtt.client as mqtt
@ -16,6 +16,10 @@ from platypush.plugins.mqtt import MqttPlugin as MQTTPlugin
class MqttClient(mqtt.Client, threading.Thread):
"""
Wrapper class for an MQTT client executed in a separate thread.
"""
def __init__(
self,
*args,
@ -78,7 +82,7 @@ class MqttClient(mqtt.Client, threading.Thread):
def unsubscribe(self, *topics, **kwargs):
"""
Client unsubscription handler.
Client unsubscribe handler.
"""
if not topics:
topics = self.topics
@ -127,9 +131,10 @@ class MqttBackend(Backend):
def __init__(
self,
*args,
host: Optional[str] = None,
port: int = _default_mqtt_port,
topic='platypush_bus_mq',
topic: str = 'platypush_bus_mq',
subscribe_default_topic: bool = True,
tls_cafile: Optional[str] = None,
tls_certfile: Optional[str] = None,
@ -141,7 +146,6 @@ class MqttBackend(Backend):
password: Optional[str] = None,
client_id: Optional[str] = None,
listeners=None,
*args,
**kwargs,
):
"""
@ -202,7 +206,7 @@ class MqttBackend(Backend):
self.tls_insecure = tls_insecure
self.username = username
self.password = password
self.client_id: str = client_id or Config.get('device_id') # type: ignore
self.client_id: str = client_id or Config.get('device_id')
else:
client = get_plugin('mqtt')
assert (
@ -219,14 +223,14 @@ class MqttBackend(Backend):
self.tls_insecure = client.tls_insecure
self.username = client.username
self.password = client.password
self.client_id: str = client_id or client.client_id # type: ignore
self.client_id = client_id or client.client_id
self.topic = '{}/{}'.format(topic, self.device_id)
self.topic = f'{topic}/{self.device_id}'
self.subscribe_default_topic = subscribe_default_topic
self._listeners = {} # client_id -> MqttClient map
self._listeners: Dict[str, MqttClient] = {} # client_id -> MqttClient map
self.listeners_conf = listeners or []
def send_message(self, msg, topic: Optional[str] = None, **kwargs):
def send_message(self, msg, *_, topic: Optional[str] = None, **kwargs):
try:
client = get_plugin('mqtt')
client.send_message(
@ -252,9 +256,8 @@ class MqttBackend(Backend):
return os.path.abspath(os.path.expanduser(path)) if path else path
def add_listeners(self, *listeners):
# noinspection PyShadowingNames,PyUnusedLocal
for i, listener in enumerate(listeners):
host = listener.get('host')
host = listener.get('host', self.host)
if host:
port = listener.get('port', self._default_mqtt_port)
username = listener.get('username')
@ -280,7 +283,7 @@ class MqttBackend(Backend):
topics = listener.get('topics')
if not topics:
self.logger.warning(
'No list of topics specified for listener n.{}'.format(i + 1)
'No list of topics specified for listener n.%d', i + 1
)
continue
@ -308,21 +311,21 @@ class MqttBackend(Backend):
port: int,
topics: Optional[List[str]] = None,
client_id: Optional[str] = None,
on_message: Optional[bool] = None,
on_message: Optional[Callable[[MqttClient, Any, mqtt.MQTTMessage], Any]] = None,
) -> str:
return '{client_id}-{client_hash}'.format(
client_id=client_id or self.client_id,
client_hash=hashlib.sha1(
'|'.join(
[
host,
str(port),
json.dumps(sorted(topics or [])),
str(id(on_message)),
]
).encode()
).hexdigest(),
)
client_id = client_id or self.client_id
client_hash = hashlib.sha1(
'|'.join(
[
host,
str(port),
json.dumps(sorted(topics or [])),
str(id(on_message)),
]
).encode()
).hexdigest()
return f'{client_id}-{client_hash}'
def _get_client(
self,
@ -367,47 +370,45 @@ class MqttBackend(Backend):
on_message=on_message,
)
client.subscribe(*topics)
if topics:
client.subscribe(*topics)
return client
def on_mqtt_message(self):
def handler(client, __, msg):
def handler(client: MqttClient, _, msg: mqtt.MQTTMessage):
data = msg.payload
# noinspection PyBroadException
try:
data = data.decode('utf-8')
data = json.loads(data)
except Exception as e:
self.logger.debug(str(e))
# noinspection PyProtectedMember
self.bus.post(
MQTTMessageEvent(
host=client._host, port=client._port, topic=msg.topic, msg=data
host=client.host, port=client.port, topic=msg.topic, msg=data
)
)
return handler
def on_exec_message(self):
def handler(_, __, msg):
# noinspection PyShadowingNames
def handler(_, __, msg: mqtt.MQTTMessage):
def response_thread(msg):
response = self.get_message_response(msg)
if not response:
return
response_topic = '{}/responses/{}'.format(self.topic, msg.id)
response_topic = f'{self.topic}/responses/{msg.id}'
self.logger.info(
'Processing response on the MQTT topic {}: {}'.format(
response_topic, response
)
'Processing response on the MQTT topic %s: %s',
response_topic,
response,
)
self.send_message(response, topic=response_topic)
msg = msg.payload.decode('utf-8')
# noinspection PyBroadException
try:
msg = json.loads(msg)
msg = Message.build(msg)
@ -417,7 +418,7 @@ class MqttBackend(Backend):
if not msg:
return
self.logger.info('Received message on the MQTT backend: {}'.format(msg))
self.logger.info('Received message on the MQTT backend: %s', msg)
try:
self.on_message(msg)
@ -457,9 +458,10 @@ class MqttBackend(Backend):
client.start()
self.logger.info(
'Initialized MQTT backend on host {}:{}, topic {}'.format(
self.host, self.port, self.topic
)
'Initialized MQTT backend on host %s:%d, topic=%s',
self.host,
self.port,
self.topic,
)
self.add_listeners(*self.listeners_conf)
@ -471,7 +473,7 @@ class MqttBackend(Backend):
try:
listener.stop()
except Exception as e:
self.logger.warning(f'Could not stop MQTT listener: {e}')
self.logger.warning('Could not stop MQTT listener: %s', e)
self.logger.info('MQTT backend terminated')

View File

@ -3,7 +3,11 @@ from typing import Collection, Optional
from ._base import Entity, get_entities_registry, init_entities_db
from ._engine import EntitiesEngine
from ._managers import register_entity_manager, get_plugin_entity_registry
from ._managers import (
EntityManager,
get_plugin_entity_registry,
register_entity_manager,
)
from ._managers.lights import LightEntityManager
from ._managers.sensors import SensorEntityManager
from ._managers.switches import (
@ -50,6 +54,7 @@ __all__ = (
'DimmerEntityManager',
'EntitiesEngine',
'Entity',
'EntityManager',
'EnumSwitchEntityManager',
'LightEntityManager',
'SensorEntityManager',

View File

@ -43,7 +43,7 @@ class EntitiesQueue(Queue):
def put(self, *entities: Entity, block=True, timeout=None):
"""
This methood is called by an entity manager to update and persist the
This method is called by an entity manager to update and persist the
state of some entities.
"""
for entity in entities:

View File

@ -78,7 +78,9 @@ class EntityManager(ABC):
return entities
def publish_entities(self, entities: Optional[Collection[Any]]):
def publish_entities(
self, entities: Optional[Collection[Any]]
) -> Collection[Entity]:
"""
Publishes a list of entities. The downstream consumers include:
@ -99,6 +101,7 @@ class EntityManager(ABC):
)
publish_entities(transformed_entities)
return transformed_entities
def register_entity_manager(cls: Type[EntityManager]):
@ -106,11 +109,7 @@ def register_entity_manager(cls: Type[EntityManager]):
Associates a plugin as a manager for a certain entity type.
You usually don't have to call this method directly.
"""
entity_managers = [
c
for c in inspect.getmro(cls)
if issubclass(c, EntityManager) and c not in {cls, EntityManager}
]
entity_managers = [c for c in inspect.getmro(cls) if issubclass(c, EntityManager)]
plugin_name = get_plugin_name_by_class(cls) or ''
redis = get_redis()

View File

@ -1,9 +1,30 @@
from abc import ABC, abstractmethod
from typing import Any, Optional
from typing_extensions import override
from . import EntityManager
class SwitchEntityManager(EntityManager, ABC):
class WriteableEntityManager(EntityManager, ABC):
"""
Base class for integrations that support entities whose values can be set.
"""
@abstractmethod
def set(self, entity: str, value: Any, attribute: Optional[str] = None, **kwargs):
"""
Set the value of an entity.
:param entity: The entity to set the value for. It's usually the ID of
the entity provided by the plugin.
:param value: The value to set the entity to.
:param attribute: The name of the attribute to set for the entity, if
required by the integration.
"""
raise NotImplementedError()
class SwitchEntityManager(WriteableEntityManager, ABC):
"""
Base class for integrations that support binary switches.
"""
@ -23,31 +44,19 @@ class SwitchEntityManager(EntityManager, ABC):
"""Toggle the state of a device (on->off or off->on)"""
raise NotImplementedError()
class MultiLevelSwitchEntityManager(EntityManager, ABC):
"""
Base class for integrations that support dimmers/multi-level/enum switches.
Don't extend this class directly. Instead, use on of the available
intermediate abstract classes - like ``DimmerEntityManager`` or
``EnumSwitchEntityManager``.
"""
@abstractmethod
def set_value( # pylint: disable=redefined-builtin
self, *_, property=None, data=None, **__
):
"""Set a value"""
raise NotImplementedError()
@override
def set(self, entity: str, value: Any, attribute: Optional[str] = None, **kwargs):
method = self.on if value else self.off
return method(entity, **kwargs)
class DimmerEntityManager(MultiLevelSwitchEntityManager, ABC):
class DimmerEntityManager(WriteableEntityManager, ABC):
"""
Base class for integrations that support dimmers/multi-level switches.
"""
class EnumSwitchEntityManager(MultiLevelSwitchEntityManager, ABC):
class EnumSwitchEntityManager(WriteableEntityManager, ABC):
"""
Base class for integrations that support switches with a pre-defined,
enum-like set of possible values.

View File

@ -1,74 +0,0 @@
from typing import Optional
from platypush.message.event import Event
class BluetoothEvent(Event):
pass
class BluetoothDeviceFoundEvent(Event):
"""
Event triggered when a bluetooth device is found during a scan.
"""
def __init__(self, address: str, name: Optional[str] = None, *args, **kwargs):
super().__init__(*args, address=address, name=name, **kwargs)
class BluetoothDeviceLostEvent(Event):
"""
Event triggered when a bluetooth device previously scanned is lost.
"""
def __init__(self, address: str, name: Optional[str] = None, *args, **kwargs):
super().__init__(*args, address=address, name=name, **kwargs)
class BluetoothDeviceConnectedEvent(Event):
"""
Event triggered on bluetooth device connection
"""
def __init__(self, address: str = None, port: str = None, *args, **kwargs):
super().__init__(*args, address=address, port=port, **kwargs)
class BluetoothDeviceDisconnectedEvent(Event):
"""
Event triggered on bluetooth device disconnection
"""
def __init__(self, address: str = None, port: str = None, *args, **kwargs):
super().__init__(*args, address=address, port=port, **kwargs)
class BluetoothConnectionRejectedEvent(Event):
"""
Event triggered on bluetooth device connection rejected
"""
def __init__(self, address: str = None, port: str = None, *args, **kwargs):
super().__init__(*args, address=address, port=port, **kwargs)
class BluetoothFilePutRequestEvent(Event):
"""
Event triggered on bluetooth device file transfer put request
"""
def __init__(self, address: str = None, port: str = None, *args, **kwargs):
super().__init__(*args, address=address, port=port, **kwargs)
class BluetoothFileGetRequestEvent(Event):
"""
Event triggered on bluetooth device file transfer get request
"""
def __init__(self, address: str = None, port: str = None, *args, **kwargs):
super().__init__(*args, address=address, port=port, **kwargs)
class BluetoothFileReceivedEvent(Event):
"""
Event triggered on bluetooth device file transfer put request
"""
def __init__(self, path: str = None, *args, **kwargs):
super().__init__(*args, path=path, **kwargs)
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,75 @@
from typing import Optional
from platypush.message.event import Event
class BluetoothEvent(Event):
"""
Base class for Bluetooth events.
"""
def __init__(self, address: str, *args, name: Optional[str] = None, **kwargs):
super().__init__(*args, address=address, name=name, **kwargs)
class BluetoothWithPortEvent(BluetoothEvent):
"""
Base class for Bluetooth events that include a communication port.
"""
def __init__(self, *args, port: Optional[str] = None, **kwargs):
super().__init__(*args, port=port, **kwargs)
class BluetoothDeviceFoundEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device is found during a scan.
"""
class BluetoothDeviceLostEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device previously scanned is lost.
"""
class BluetoothDeviceConnectedEvent(BluetoothWithPortEvent):
"""
Event triggered when a Bluetooth device is connected.
"""
class BluetoothDeviceDisconnectedEvent(BluetoothWithPortEvent):
"""
Event triggered when a Bluetooth device is disconnected.
"""
class BluetoothConnectionRejectedEvent(BluetoothWithPortEvent):
"""
Event triggered when a Bluetooth connection is rejected.
"""
class BluetoothFilePutRequestEvent(BluetoothWithPortEvent):
"""
Event triggered when a file put request is received.
"""
class BluetoothFileGetRequestEvent(BluetoothWithPortEvent):
"""
Event triggered when a file get request is received.
"""
class BluetoothFileReceivedEvent(BluetoothEvent):
"""
Event triggered when a file transfer is completed.
"""
def __init__(self, *args, path: str, **kwargs):
super().__init__(*args, path=path, **kwargs)
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,105 @@
from typing import Collection, Optional
from platypush.message.event import Event
class BluetoothEvent(Event):
"""
Base class for Bluetooth Low-Energy device events.
"""
def __init__(
self,
*args,
address: str,
connected: bool,
paired: bool,
trusted: bool,
blocked: bool,
name: Optional[str] = None,
service_uuids: Optional[Collection[str]] = None,
**kwargs
):
"""
:param address: The Bluetooth address of the device.
:param connected: Whether the device is connected.
:param paired: Whether the device is paired.
:param trusted: Whether the device is trusted.
:param blocked: Whether the device is blocked.
:param name: The name of the device.
:param service_uuids: The service UUIDs of the device.
"""
super().__init__(
*args,
address=address,
name=name,
connected=connected,
paired=paired,
blocked=blocked,
trusted=trusted,
service_uuids=service_uuids or [],
**kwargs
)
class BluetoothDeviceFoundEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device is discovered during a scan.
"""
class BluetoothDeviceLostEvent(BluetoothEvent):
"""
Event triggered when a previously discovered Bluetooth device is lost.
"""
class BluetoothDeviceConnectedEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device is connected.
"""
class BluetoothDeviceDisconnectedEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device is disconnected.
"""
class BluetoothDevicePairedEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device is paired.
"""
class BluetoothDeviceUnpairedEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device is unpaired.
"""
class BluetoothDeviceBlockedEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device is blocked.
"""
class BluetoothDeviceUnblockedEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device is unblocked.
"""
class BluetoothDeviceTrustedEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device is trusted.
"""
class BluetoothDeviceUntrustedEvent(BluetoothEvent):
"""
Event triggered when a Bluetooth device is untrusted.
"""
# vim:sw=4:ts=4:et:

View File

@ -206,9 +206,13 @@ class AsyncRunnablePlugin(RunnablePlugin, ABC):
self._loop = asyncio.new_event_loop()
if self._should_start_runner:
self._run_listener()
self.wait_stop()
while not self.should_stop():
try:
self._run_listener()
finally:
self.wait_stop(self.poll_interval)
else:
self.wait_stop()
def stop(self):
if self._loop and self._loop.is_running():

View File

@ -1,275 +1,352 @@
import base64
import os
import subprocess
import sys
import time
from typing import Optional, Dict
from contextlib import asynccontextmanager
from threading import RLock
from typing import AsyncGenerator, Collection, List, Optional, Dict, Type, Union
from uuid import UUID
from platypush.plugins import action
from platypush.plugins.sensor import SensorPlugin
from platypush.message.response.bluetooth import BluetoothScanResponse, BluetoothDiscoverPrimaryResponse, \
BluetoothDiscoverCharacteristicsResponse
from bleak import BleakClient, BleakScanner
from bleak.backends.device import BLEDevice
from typing_extensions import override
from platypush.context import get_bus, get_or_create_event_loop
from platypush.entities import Entity, EntityManager
from platypush.entities.devices import Device
from platypush.message.event.bluetooth.ble import (
BluetoothDeviceBlockedEvent,
BluetoothDeviceConnectedEvent,
BluetoothDeviceDisconnectedEvent,
BluetoothDeviceFoundEvent,
BluetoothDeviceLostEvent,
BluetoothDevicePairedEvent,
BluetoothDeviceTrustedEvent,
BluetoothDeviceUnblockedEvent,
BluetoothDeviceUnpairedEvent,
BluetoothDeviceUntrustedEvent,
BluetoothEvent,
)
from platypush.plugins import AsyncRunnablePlugin, action
UUIDType = Union[str, UUID]
class BluetoothBlePlugin(SensorPlugin):
class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
"""
Bluetooth BLE (low-energy) plugin
Plugin to interact with BLE (Bluetooth Low-Energy) devices.
Note that the support for Bluetooth low-energy devices requires a Bluetooth
adapter compatible with the Bluetooth 5.0 specification or higher.
Requires:
* **pybluez** (``pip install pybluez``)
* **gattlib** (``pip install gattlib``)
* **bleak** (``pip install bleak``)
Note that the support for bluetooth low-energy devices on Linux requires:
* A bluetooth adapter compatible with the bluetooth 5.0 specification or higher;
* To run platypush with root privileges (which is usually a very bad idea), or to set the raw net
capabilities on the Python executable (which is also a bad idea, because any Python script will
be able to access the kernel raw network API, but it's probably better than running a network
server that can execute system commands as root user). If you don't want to set special permissions
on the main Python executable and you want to run the bluetooth LTE plugin then the advised approach
is to install platypush in a virtual environment and set the capabilities on the venv python executable,
or run your platypush instance in Docker.
You can set the capabilities on the Python executable through the following shell command::
[sudo] setcap 'cap_net_raw,cap_net_admin+eip' /path/to/your/python
TODO: Write supported events.
"""
def __init__(self, interface: str = 'hci0', **kwargs):
# Default connection timeout (in seconds)
_default_connect_timeout = 5
def __init__(
self,
interface: Optional[str] = None,
connect_timeout: float = _default_connect_timeout,
device_names: Optional[Dict[str, str]] = None,
service_uuids: Optional[Collection[UUIDType]] = None,
**kwargs,
):
"""
:param interface: Default adapter device to be used (default: 'hci0')
:param interface: Name of the Bluetooth interface to use (e.g. ``hci0``
on Linux). Default: first available interface.
:param connect_timeout: Timeout in seconds for the connection to a
Bluetooth device. Default: 5 seconds.
:param service_uuids: List of service UUIDs to discover. Default: all.
:param device_names: Bluetooth address -> device name mapping. If not
specified, the device's advertised name will be used, or its
Bluetooth address. Example:
.. code-block:: json
{
"00:11:22:33:44:55": "Switchbot",
"00:11:22:33:44:56": "Headphones",
"00:11:22:33:44:57": "Button"
}
"""
super().__init__(**kwargs)
self.interface = interface
self._req_by_addr = {}
@staticmethod
def _get_python_interpreter() -> str:
exe = sys.executable
self._interface = interface
self._connect_timeout = connect_timeout
self._service_uuids = service_uuids
self._scan_lock = RLock()
self._connections: Dict[str, BleakClient] = {}
self._devices: Dict[str, BLEDevice] = {}
self._device_name_by_addr = device_names or {}
self._device_addr_by_name = {
name: addr for addr, name in self._device_name_by_addr.items()
}
while os.path.islink(exe):
target = os.readlink(exe)
if not os.path.isabs(target):
target = os.path.abspath(os.path.join(os.path.dirname(exe), target))
exe = target
return exe
@staticmethod
def _python_has_ble_capabilities(exe: str) -> bool:
getcap = subprocess.Popen(['getcap', exe], stdout=subprocess.PIPE)
output = getcap.communicate()[0].decode().split('\n')
if not output:
return False
caps = output[0]
return ('cap_net_raw+eip' in caps or 'cap_net_raw=eip' in caps) and 'cap_net_admin' in caps
def _check_ble_support(self):
# Check if the script is running as root or if the Python executable
# has 'cap_net_admin,cap_net_raw+eip' capabilities
exe = self._get_python_interpreter()
assert os.getuid() == 0 or self._python_has_ble_capabilities(exe), '''
You are not running platypush as root and the Python interpreter has no
capabilities/permissions to access the BLE stack. Set the permissions on
your Python interpreter through:
[sudo] setcap "cap_net_raw,cap_net_admin+eip" {}'''.format(exe)
@action
def scan(self, interface: Optional[str] = None, duration: int = 10) -> BluetoothScanResponse:
async def _get_device(self, device: str) -> BLEDevice:
"""
Scan for nearby bluetooth low-energy devices
:param interface: Bluetooth adapter name to use (default configured if None)
:param duration: Scan duration in seconds
Utility method to get a device by name or address.
"""
from bluetooth.ble import DiscoveryService
addr = (
self._device_addr_by_name[device]
if device in self._device_addr_by_name
else device
)
if interface is None:
interface = self.interface
if addr not in self._devices:
self.logger.info('Scanning for unknown device "%s"', device)
await self._scan()
self._check_ble_support()
svc = DiscoveryService(interface)
devices = svc.discover(duration)
return BluetoothScanResponse(devices)
dev = self._devices.get(addr)
assert dev is not None, f'Unknown device: "{device}"'
return dev
@action
def get_measurement(self, interface: Optional[str] = None, duration: Optional[int] = 10, *args, **kwargs) \
-> Dict[str, dict]:
"""
Wrapper for ``scan`` that returns bluetooth devices in a format usable by sensor backends.
def _get_device_name(self, device: BLEDevice) -> str:
return (
self._device_name_by_addr.get(device.address)
or device.name
or device.address
)
:param interface: Bluetooth adapter name to use (default configured if None)
:param duration: Scan duration in seconds
:return: Device address -> info map.
"""
devices = self.scan(interface=interface, duration=duration).output
return {device['addr']: device for device in devices}
def _post_event(
self, event_type: Type[BluetoothEvent], device: BLEDevice, **kwargs
):
props = device.details.get('props', {})
get_bus().post(
event_type(
address=device.address,
name=self._get_device_name(device),
connected=props.get('Connected', False),
paired=props.get('Paired', False),
blocked=props.get('Blocked', False),
trusted=props.get('Trusted', False),
service_uuids=device.metadata.get('uuids', []),
**kwargs,
)
)
# noinspection PyArgumentList
@action
def connect(self, device: str, interface: str = None, wait: bool = True, channel_type: str = 'public',
security_level: str = 'low', psm: int = 0, mtu: int = 0, timeout: float = 10.0):
"""
Connect to a bluetooth LE device
def _on_device_event(self, device: BLEDevice, _):
event_types: List[Type[BluetoothEvent]] = []
existing_device = self._devices.get(device.address)
:param device: Device address to connect to
:param interface: Bluetooth adapter name to use (default configured if None)
:param wait: If True then wait for the connection to be established before returning (no timeout)
:param channel_type: Channel type, usually 'public' or 'random'
:param security_level: Security level - possible values: ['low', 'medium', 'high']
:param psm: PSM value (default: 0)
:param mtu: MTU value (default: 0)
:param timeout: Connection timeout if wait is not set (default: 10 seconds)
"""
from gattlib import GATTRequester
if existing_device:
old_props = existing_device.details.get('props', {})
new_props = device.details.get('props', {})
req = self._req_by_addr.get(device)
if req:
if req.is_connected():
self.logger.info('Device {} is already connected'.format(device))
return
if old_props.get('Paired') != new_props.get('Paired'):
event_types.append(
BluetoothDevicePairedEvent
if new_props.get('Paired')
else BluetoothDeviceUnpairedEvent
)
self._req_by_addr[device] = None
if old_props.get('Connected') != new_props.get('Connected'):
event_types.append(
BluetoothDeviceConnectedEvent
if new_props.get('Connected')
else BluetoothDeviceDisconnectedEvent
)
if not interface:
interface = self.interface
if interface:
req = GATTRequester(device, False, interface)
if old_props.get('Blocked') != new_props.get('Blocked'):
event_types.append(
BluetoothDeviceBlockedEvent
if new_props.get('Blocked')
else BluetoothDeviceUnblockedEvent
)
if old_props.get('Trusted') != new_props.get('Trusted'):
event_types.append(
BluetoothDeviceTrustedEvent
if new_props.get('Trusted')
else BluetoothDeviceUntrustedEvent
)
else:
req = GATTRequester(device, False)
event_types.append(BluetoothDeviceFoundEvent)
self.logger.info('Connecting to {}'.format(device))
connect_start_time = time.time()
req.connect(wait, channel_type, security_level, psm, mtu)
self._devices[device.address] = device
if not wait:
while not req.is_connected():
if time.time() - connect_start_time > timeout:
raise TimeoutError('Connection to {} timed out'.format(device))
time.sleep(0.1)
if event_types:
for event_type in event_types:
self._post_event(event_type, device)
self.publish_entities([device])
self.logger.info('Connected to {}'.format(device))
self._req_by_addr[device] = req
@asynccontextmanager
async def _connect(
self,
device: str,
interface: Optional[str] = None,
timeout: Optional[float] = None,
) -> AsyncGenerator[BleakClient, None]:
dev = await self._get_device(device)
async with BleakClient(
dev.address,
adapter=interface or self._interface,
timeout=timeout or self._connect_timeout,
) as client:
self._connections[dev.address] = client
yield client
self._connections.pop(dev.address)
@action
def read(self, device: str, interface: str = None, uuid: str = None, handle: int = None,
binary: bool = False, disconnect_on_recv: bool = True, **kwargs) -> str:
"""
Read a message from a device
:param device: Device address to connect to
:param interface: Bluetooth adapter name to use (default configured if None)
:param uuid: Service UUID. Either the UUID or the device handle must be specified
:param handle: Device handle. Either the UUID or the device handle must be specified
:param binary: Set to true to return data as a base64-encoded binary string
:param disconnect_on_recv: If True (default) disconnect when the response is received
:param kwargs: Extra arguments to be passed to :meth:`connect`
"""
if interface is None:
interface = self.interface
if not (uuid or handle):
raise AttributeError('Specify either uuid or handle')
self.connect(device, interface=interface, **kwargs)
req = self._req_by_addr[device]
if uuid:
data = req.read_by_uuid(uuid)[0]
else:
data = req.read_by_handle(handle)[0]
if binary:
data = base64.encodebytes(data.encode() if isinstance(data, str) else data).decode().strip()
if disconnect_on_recv:
self.disconnect(device)
async def _read(
self,
device: str,
service_uuid: UUIDType,
interface: Optional[str] = None,
connect_timeout: Optional[float] = None,
) -> bytearray:
async with self._connect(device, interface, connect_timeout) as client:
data = await client.read_gatt_char(service_uuid)
return data
async def _write(
self,
device: str,
data: bytes,
service_uuid: UUIDType,
interface: Optional[str] = None,
connect_timeout: Optional[float] = None,
):
async with self._connect(device, interface, connect_timeout) as client:
await client.write_gatt_char(service_uuid, data)
async def _scan(
self,
duration: Optional[float] = None,
service_uuids: Optional[Collection[UUIDType]] = None,
publish_entities: bool = False,
) -> Collection[Entity]:
with self._scan_lock:
timeout = duration or self.poll_interval or 5
devices = await BleakScanner.discover(
adapter=self._interface,
timeout=timeout,
service_uuids=list(
map(str, service_uuids or self._service_uuids or [])
),
detection_callback=self._on_device_event,
)
# TODO Infer type from device.metadata['manufacturer_data']
self._devices.update({dev.address: dev for dev in devices})
if publish_entities:
entities = self.publish_entities(devices)
else:
entities = self.transform_entities(devices)
return entities
@action
def write(self, device: str, data, handle: int = None, interface: str = None, binary: bool = False,
disconnect_on_recv: bool = True, **kwargs) -> str:
def scan(
self,
duration: Optional[float] = None,
service_uuids: Optional[Collection[UUIDType]] = None,
):
"""
Scan for Bluetooth devices nearby.
:param duration: Scan duration in seconds (default: same as the plugin's
`poll_interval` configuration parameter)
:param service_uuids: List of service UUIDs to discover. Default: all.
"""
loop = get_or_create_event_loop()
loop.run_until_complete(
self._scan(duration, service_uuids, publish_entities=True)
)
@action
def read(
self,
device: str,
service_uuid: UUIDType,
interface: Optional[str] = None,
connect_timeout: Optional[float] = None,
) -> str:
"""
Read a message from a device.
:param device: Name or address of the device to read from.
:param service_uuid: Service UUID.
:param interface: Bluetooth adapter name to use (default configured if None).
:param connect_timeout: Connection timeout in seconds (default: same as the
configured `connect_timeout`).
:return: The base64-encoded response received from the device.
"""
loop = get_or_create_event_loop()
data = loop.run_until_complete(
self._read(device, service_uuid, interface, connect_timeout)
)
return base64.b64encode(data).decode()
@action
def write(
self,
device: str,
data: Union[str, bytes],
service_uuid: UUIDType,
interface: Optional[str] = None,
connect_timeout: Optional[float] = None,
):
"""
Writes data to a device
:param device: Device address to connect to
:param data: Data to be written (str or bytes)
:param device: Name or address of the device to read from.
:param data: Data to be written, either as bytes or as a base64-encoded string.
:param service_uuid: Service UUID.
:param interface: Bluetooth adapter name to use (default configured if None)
:param handle: Device handle. Either the UUID or the device handle must be specified
:param binary: Set to true if data is a base64-encoded binary string
:param disconnect_on_recv: If True (default) disconnect when the response is received
:param kwargs: Extra arguments to be passed to :meth:`connect`
:param connect_timeout: Connection timeout in seconds (default: same as the
configured `connect_timeout`).
"""
if interface is None:
interface = self.interface
if binary:
data = base64.decodebytes(data.encode() if isinstance(data, str) else data)
loop = get_or_create_event_loop()
if isinstance(data, str):
data = base64.b64decode(data.encode())
self.connect(device, interface=interface, **kwargs)
req = self._req_by_addr[device]
data = req.write_by_handle(handle, data)[0]
if binary:
data = base64.encodebytes(data.encode() if isinstance(data, str) else data).decode().strip()
if disconnect_on_recv:
self.disconnect(device)
return data
loop.run_until_complete(
self._write(device, data, service_uuid, interface, connect_timeout)
)
@override
@action
def disconnect(self, device: str):
def status(self, *_, **__) -> Collection[Entity]:
"""
Disconnect from a connected device
:param device: Device address
Alias for :meth:`.scan`.
"""
req = self._req_by_addr.get(device)
if not req:
self.logger.info('Device {} not connected'.format(device))
return self.scan().output
req.disconnect()
self.logger.info('Device {} disconnected'.format(device))
@override
def transform_entities(self, entities: Collection[BLEDevice]) -> Collection[Device]:
return [
Device(
id=dev.address,
name=self._get_device_name(dev),
)
for dev in entities
]
@action
def discover_primary(self, device: str, interface: str = None, **kwargs) -> BluetoothDiscoverPrimaryResponse:
"""
Discover the primary services advertised by a LE bluetooth device
@override
async def listen(self):
device_addresses = set()
:param device: Device address to connect to
:param interface: Bluetooth adapter name to use (default configured if None)
:param kwargs: Extra arguments to be passed to :meth:`connect`
"""
if interface is None:
interface = self.interface
while True:
entities = await self._scan()
new_device_addresses = {e.id for e in entities}
missing_device_addresses = device_addresses - new_device_addresses
missing_devices = [
dev
for addr, dev in self._devices.items()
if addr in missing_device_addresses
]
self.connect(device, interface=interface, **kwargs)
req = self._req_by_addr[device]
services = req.discover_primary()
self.disconnect(device)
return BluetoothDiscoverPrimaryResponse(services=services)
for dev in missing_devices:
self._post_event(BluetoothDeviceLostEvent, dev)
self._devices.pop(dev.address, None)
@action
def discover_characteristics(self, device: str, interface: str = None, **kwargs) \
-> BluetoothDiscoverCharacteristicsResponse:
"""
Discover the characteristics of a LE bluetooth device
:param device: Device address to connect to
:param interface: Bluetooth adapter name to use (default configured if None)
:param kwargs: Extra arguments to be passed to :meth:`connect`
"""
if interface is None:
interface = self.interface
self.connect(device, interface=interface, **kwargs)
req = self._req_by_addr[device]
characteristics = req.discover_characteristics()
self.disconnect(device)
return BluetoothDiscoverCharacteristicsResponse(characteristics=characteristics)
device_addresses = new_device_addresses
# vim:sw=4:ts=4:et:

View File

@ -2,7 +2,6 @@ manifest:
events: {}
install:
pip:
- pybluez
- gattlib
- bleak
package: platypush.plugins.bluetooth.ble
type: plugin

View File

@ -33,6 +33,7 @@ from platypush.utils import camel_case_to_snake_case
from ._mappers import DeviceMapper, device_mappers
# pylint: disable=too-many-ancestors
class SmartthingsPlugin(
RunnablePlugin,
DimmerEntityManager,
@ -817,18 +818,18 @@ class SmartthingsPlugin(
:param device: Device ID or name.
:param level: Level, usually a percentage value between 0 and 1.
:param kwarsg: Extra arguments that should be passed to :meth:`.execute`.
:param kwargs: Extra arguments that should be passed to :meth:`.execute`.
"""
return self.set_value(device, Capability.switch_level, level, **kwargs)
def _set_value( # pylint: disable=redefined-builtin
self, device: str, property: Optional[str] = None, data=None, **kwargs
self, device: str, property: Optional[str] = None, value: Any = None, **kwargs
):
if not property:
device, property = self._to_device_and_property(device)
assert property, 'No property name specified'
assert data is not None, 'No value specified'
assert value is not None, 'No value specified'
entity_id = f'{device}:{property}'
entity = self._entities_by_id.get(entity_id)
assert entity, f'No such entity ID: {entity_id}'
@ -837,13 +838,15 @@ class SmartthingsPlugin(
iter([m for m in device_mappers if m.attribute == property]), None
)
assert mapper, f'No mappers found to set {property}={data} on device "{device}"'
assert (
mapper
), f'No mappers found to set {property}={value} on device "{device}"'
assert (
mapper.set_command
), f'The property "{property}" on the device "{device}" cannot be set'
command = (
mapper.set_command(data)
mapper.set_command(value)
if callable(mapper.set_command)
else mapper.set_command
)
@ -852,16 +855,20 @@ class SmartthingsPlugin(
device,
mapper.capability,
command,
args=mapper.set_value_args(data), # type: ignore
args=mapper.set_value_args(value), # type: ignore
**kwargs,
)
return self.status(device)
@action
# pylint: disable=redefined-builtin,arguments-differ
def set_value(
self, device: str, *_, property: Optional[str] = None, data=None, **kwargs
def set(self, entity: str, value: Any, attribute: Optional[str] = None, **kwargs):
super().set(entity, value, attribute, **kwargs)
return self.set_value(entity, property=attribute, value=value, **kwargs)
@action
def set_value( # pylint: disable=redefined-builtin
self, device: str, property: Optional[str] = None, value=None, **kwargs
):
"""
Set the value of a device. It is compatible with the generic
@ -871,10 +878,11 @@ class SmartthingsPlugin(
``device_id:property``.
:param property: Name of the property to be set. If not specified here
then it should be specified on the ``device`` level.
:param data: Value to be set.
:param value: Value to set.
"""
assert device, 'No device specified'
try:
return self._set_value(device, property, data, **kwargs)
return self._set_value(device, property, value, **kwargs)
except Exception as e:
self.logger.exception(e)
raise AssertionError(e) from e

View File

@ -1104,10 +1104,21 @@ class SwitchbotPlugin(
return self._run('post', 'scenes', scenes[0]['id'], 'execute')
@action
# pylint: disable=redefined-builtin,arguments-differ
def set_value(self, device: str, *_, property=None, data=None, **__):
# pylint: disable=redefined-builtin
def set_value(
self, device: str, property: Optional[str] = None, value: Any = None, **__
):
"""
Set the value of a property of a device.
:param device: Device name or ID, or entity (external) ID.
:param property: Property to set. It should be present if you are
passing a root device ID to ``device`` and not an atomic entity in
the format ``<device_id>:<property_name>``.
:param value: Value to set.
"""
entity = self._to_entity(device, property)
assert entity, f'No such entity: "{device}"'
assert entity, f'No such device: "{device}"'
dt = entity.data.get('device_type')
assert dt, f'Could not infer the device type for "{device}"'
@ -1117,7 +1128,11 @@ class SwitchbotPlugin(
assert setter_class, f'No setters found for device type "{device_type}"'
setter = setter_class(entity)
return setter(property=property, value=data)
return setter(property=property, value=value)
@action
def set(self, entity: str, value: Any, attribute: Optional[str] = None, **kwargs):
return self.set_value(entity, property=attribute, value=value, **kwargs)
def _to_entity(
self,

View File

@ -1,12 +1,25 @@
import enum
import time
from typing import Any, Collection, Dict, List, Optional
from typing import Any, Collection, Optional
from uuid import UUID
from bleak.backends.device import BLEDevice
from typing_extensions import override
from platypush.context import get_or_create_event_loop
from platypush.entities import EnumSwitchEntityManager
from platypush.entities.switches import EnumSwitch
from platypush.message.response.bluetooth import BluetoothScanResponse
from platypush.plugins import action
from platypush.plugins.bluetooth.ble import BluetoothBlePlugin
from platypush.plugins.bluetooth.ble import BluetoothBlePlugin, UUIDType
class Command(enum.Enum):
"""
Supported commands.
"""
PRESS = b'\x57\x01\x00'
ON = b'\x57\x01\x01'
OFF = b'\x57\x01\x02'
# pylint: disable=too-many-ancestors
@ -15,202 +28,117 @@ class SwitchbotBluetoothPlugin(BluetoothBlePlugin, EnumSwitchEntityManager):
Plugin to interact with a Switchbot (https://www.switch-bot.com/) device and
programmatically control switches over a Bluetooth interface.
See :class:`platypush.plugins.bluetooth.ble.BluetoothBlePlugin` for how to enable BLE permissions for
the platypush user (a simple solution may be to run it as root, but that's usually NOT a good idea).
Note that this plugin currently only supports Switchbot "bot" devices
(mechanical switch pressers). For support for other devices, you may want
the :class:`platypush.plugins.switchbot.SwitchbotPlugin` integration
(which requires a Switchbot hub).
Requires:
* **pybluez** (``pip install pybluez``)
* **gattlib** (``pip install gattlib``)
* **libboost** (on Debian ```apt-get install libboost-python-dev libboost-thread-dev``)
* **bleak** (``pip install bleak``)
"""
uuid = 'cba20002-224d-11e6-9fb8-0002a5d5c51b'
handle = 0x16
# Map of service names -> UUID prefixes exposed by SwitchBot devices
_uuid_prefixes = {
'tx': '002',
'rx': '003',
'service': 'd00',
}
class Command(enum.Enum):
"""
Base64 encoded commands
"""
# Static list of Bluetooth service UUIDs commonly exposed by SwitchBot
# devices.
_uuids = {
service: UUID(f'cba20{prefix}-224d-11e6-9fb8-0002a5d5c51b')
for service, prefix in _uuid_prefixes.items()
}
# \x57\x01\x00
PRESS = 'VwEA'
# # \x57\x01\x01
ON = 'VwEB'
# # \x57\x01\x02
OFF = 'VwEC'
def __init__(self, *args, **kwargs):
super().__init__(*args, service_uuids=self._uuids.values(), **kwargs)
def __init__(
async def _run(
self,
interface=None,
connect_timeout=None,
scan_timeout=2,
devices=None,
**kwargs
device: str,
command: Command,
service_uuid: UUIDType = _uuids['tx'],
):
"""
:param interface: Bluetooth interface to use (e.g. hci0) default: first available one
:type interface: str
:param connect_timeout: Timeout for the connection to the Switchbot device - default: None
:type connect_timeout: float
:param scan_timeout: Timeout for the scan operations
:type scan_timeout: float
:param devices: Devices to control, as a MAC address -> name map
:type devices: dict
"""
super().__init__(interface=interface, **kwargs)
self.connect_timeout = connect_timeout if connect_timeout else 5
self.scan_timeout = scan_timeout if scan_timeout else 2
self.configured_devices = devices or {}
self.configured_devices_by_name = {
name: addr for addr, name in self.configured_devices.items()
}
def _run(self, device: str, command: Command):
device = self.configured_devices_by_name.get(device, '')
n_tries = 1
try:
self.write(
device,
command.value,
handle=self.handle,
channel_type='random',
binary=True,
)
except Exception as e:
self.logger.exception(e)
n_tries -= 1
if n_tries == 0:
raise e
time.sleep(5)
return self.status(device)
await self._write(device, command.value, service_uuid)
@action
def press(self, device):
def press(self, device: str):
"""
Send a press button command to a device
:param device: Device name or address
:type device: str
"""
return self._run(device, self.Command.PRESS)
loop = get_or_create_event_loop()
return loop.run_until_complete(self._run(device, Command.PRESS))
@action
def toggle(self, device, **_):
return self.press(device)
@action
def on(self, device, **_):
def on(self, device: str, **_):
"""
Send a press-on button command to a device
:param device: Device name or address
:type device: str
"""
return self._run(device, self.Command.ON)
loop = get_or_create_event_loop()
return loop.run_until_complete(self._run(device, Command.ON))
@action
def off(self, device, **_):
def off(self, device: str, **_):
"""
Send a press-off button command to a device
:param device: Device name or address
:type device: str
"""
return self._run(device, self.Command.OFF)
loop = get_or_create_event_loop()
return loop.run_until_complete(self._run(device, Command.OFF))
@action
# pylint: disable=arguments-differ
def set_value(self, device: str, data: str, *_, **__):
def set_value(self, device: Optional[str] = None, value: Optional[str] = None, **_):
"""
Entity-compatible ``set_value`` method to send a command to a device.
Send a command to a device as a value.
:param device: Device name or address
:param data: Command to send. Possible values are:
:param entity: Device name or address
:param value: Command to send. Possible values are:
- ``on``: Press the button and remain in the pressed state.
- ``off``: Release a previously pressed button.
- ``press``: Press and release the button.
"""
if data == 'on':
return self.on(device)
if data == 'off':
return self.off(device)
if data == 'press':
return self.press(device)
assert device, 'No device specified'
if value == 'on':
self.on(device)
if value == 'off':
self.off(device)
if value == 'press':
self.press(device)
self.logger.warning('Unknown command for SwitchBot "%s": "%s"', device, data)
return None
self.logger.warning('Unknown command for SwitchBot "%s": "%s"', device, value)
@action
def scan(
self, interface: Optional[str] = None, duration: int = 10
) -> BluetoothScanResponse:
"""
Scan for available Switchbot devices nearby.
@override
def set(self, entity: str, value: Any, attribute: Optional[str] = None, **kwargs):
return self.set_value(entity, value, **kwargs)
:param interface: Bluetooth interface to scan (default: default configured interface)
:param duration: Scan duration in seconds
"""
compatible_devices: Dict[str, Any] = {}
devices = (
super().scan(interface=interface, duration=duration).devices # type: ignore
)
for dev in devices:
try:
characteristics = [
chrc
for chrc in self.discover_characteristics( # type: ignore
dev['addr'],
channel_type='random',
wait=False,
timeout=self.scan_timeout,
).characteristics
if chrc.get('uuid') == self.uuid
]
if characteristics:
compatible_devices[dev['addr']] = None
except Exception as e:
self.logger.warning('Device scan error: %s', e)
self.publish_entities(compatible_devices)
return BluetoothScanResponse(devices=compatible_devices)
@action
def status(self, *_, **__) -> List[dict]:
self.publish_entities(self.configured_devices)
return [
{
'address': addr,
'id': addr,
'name': name,
'on': False,
}
for addr, name in self.configured_devices.items()
]
def transform_entities(self, entities: Collection[dict]) -> Collection[EnumSwitch]:
@override
def transform_entities(
self, entities: Collection[BLEDevice]
) -> Collection[EnumSwitch]:
devices = super().transform_entities(entities)
return [
EnumSwitch(
id=addr,
name=name,
value='on',
id=dev.id,
name=dev.name,
value=None,
values=['on', 'off', 'press'],
is_write_only=True,
)
for addr, name in entities
for dev in devices
]

View File

@ -2,12 +2,6 @@ manifest:
events: {}
install:
pip:
- pybluez
- gattlib
apt:
- libboost-python-dev
- libboost-thread-dev
pacman:
- boost-libs
- bleak
package: platypush.plugins.switchbot.bluetooth
type: plugin

View File

@ -13,6 +13,7 @@ from typing import (
Type,
Union,
)
from typing_extensions import override
from platypush.entities import (
DimmerEntityManager,
@ -48,6 +49,7 @@ from platypush.plugins import RunnablePlugin
from platypush.plugins.mqtt import MqttPlugin, action
# pylint: disable=too-many-ancestors
class ZigbeeMqttPlugin(
RunnablePlugin,
MqttPlugin,
@ -56,7 +58,7 @@ class ZigbeeMqttPlugin(
LightEntityManager,
SensorEntityManager,
SwitchEntityManager,
): # lgtm [py/missing-call-to-init]
):
"""
This plugin allows you to interact with Zigbee devices over MQTT through any Zigbee sniffer and
`zigbee2mqtt <https://www.zigbee2mqtt.io/>`_.
@ -616,7 +618,6 @@ class ZigbeeMqttPlugin(
"genLevelCtrl",
"touchlink",
"lightingColorCtrl",
"manuSpecificUbisysDimmerSetup"
],
"output": [
"genOta"
@ -925,7 +926,7 @@ class ZigbeeMqttPlugin(
if not exposes:
return {}
# If the device has no queriable properties, don't specify a reply
# If the device has no queryable properties, don't specify a reply
# topic to listen on
req = self._build_device_get_request(exposes)
reply_topic = self._topic(device)
@ -1072,9 +1073,9 @@ class ZigbeeMqttPlugin(
return properties
@action
# pylint: disable=redefined-builtin,arguments-differ
# pylint: disable=redefined-builtin
def set_value(
self, device: str, *_, property: Optional[str] = None, data=None, **kwargs
self, device: str, property: Optional[str] = None, data=None, **kwargs
):
"""
Entity-compatible way of setting a value on a node.
@ -1094,6 +1095,11 @@ class ZigbeeMqttPlugin(
self.device_set(dev, property, data, **kwargs)
@override
@action
def set(self, entity: str, value: Any, attribute: Optional[str] = None, **kwargs):
return self.set_value(entity, data=value, property=attribute, **kwargs)
@action
def device_check_ota_updates(self, device: str, **kwargs) -> dict:
"""
@ -1578,13 +1584,13 @@ class ZigbeeMqttPlugin(
assert device_info, f'No such device: {name}'
name = self._preferred_name(device_info)
prop = self._get_properties(device_info).get(prop)
prop_info = self._get_properties(device_info).get(prop)
option = self._get_options(device_info).get(prop)
if option:
return name, option
assert prop, f'No such property on device {name}: {prop}'
return name, prop
assert prop_info, f'No such property on device {name}: {prop}'
return name, prop_info
@staticmethod
def _is_read_only(feature: dict) -> bool:

View File

@ -325,7 +325,7 @@ class ZwaveBasePlugin(
@abstractmethod
@action
def set_value( # pylint: disable=arguments-differ
def set_value(
self,
data,
value_id: Optional[int] = None,
@ -347,6 +347,12 @@ class ZwaveBasePlugin(
"""
raise NotImplementedError
@action
def set(self, entity: str, value: Any, attribute: Optional[str] = None, **kwargs):
return self.set_value(
value_id=entity, id_on_network=entity, data=value, **kwargs
)
@abstractmethod
@action
def set_value_label(

View File

@ -175,8 +175,7 @@ setup(
'alexa': ['avs @ https://github.com/BlackLight/avs/tarball/master'],
# Support for bluetooth devices
'bluetooth': [
'pybluez',
'gattlib',
'bleak',
'pyobex @ https://github.com/BlackLight/PyOBEX/tarball/master',
],
# Support for TP-Link devices