From b43017ef01f9161c0eec3126349fc0a7ce4ec427 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 15 Apr 2023 01:25:04 +0200 Subject: [PATCH] Refactoring the `system` plugin to support entities. --- platypush/plugins/system/__init__.py | 441 +++++++++++++++++---------- 1 file changed, 276 insertions(+), 165 deletions(-) diff --git a/platypush/plugins/system/__init__.py b/platypush/plugins/system/__init__.py index 085f0526b6..b4b88a5c7e 100644 --- a/platypush/plugins/system/__init__.py +++ b/platypush/plugins/system/__init__.py @@ -2,18 +2,47 @@ import socket from datetime import datetime from typing import Union, List, Optional, Dict +from typing_extensions import override -from platypush.message.response.system import CpuInfoResponse, CpuTimesResponse, CpuResponseList, CpuStatsResponse, \ - CpuFrequencyResponse, VirtualMemoryUsageResponse, SwapMemoryUsageResponse, DiskResponseList, \ - DiskPartitionResponse, DiskUsageResponse, DiskIoCountersResponse, NetworkIoCountersResponse, NetworkResponseList, \ - NetworkConnectionResponse, NetworkAddressResponse, NetworkInterfaceStatsResponse, SensorTemperatureResponse, \ - SensorResponseList, SensorFanResponse, SensorBatteryResponse, ConnectedUserResponseList, ConnectUserResponse, \ - ProcessResponseList, ProcessResponse - -from platypush.plugins import Plugin, action +from platypush.entities import Entity +from platypush.entities.managers import EntityManager +from platypush.entities.system import CpuInfo as CpuInfoModel +from platypush.message.response.system import ( + CpuTimesResponse, + CpuResponseList, + CpuStatsResponse, + CpuFrequencyResponse, + VirtualMemoryUsageResponse, + SwapMemoryUsageResponse, + DiskResponseList, + DiskPartitionResponse, + DiskUsageResponse, + DiskIoCountersResponse, + NetworkIoCountersResponse, + NetworkResponseList, + NetworkConnectionResponse, + NetworkAddressResponse, + NetworkInterfaceStatsResponse, + SensorTemperatureResponse, + SensorResponseList, + SensorFanResponse, + SensorBatteryResponse, + ConnectedUserResponseList, + ConnectUserResponse, + ProcessResponseList, + ProcessResponse, +) +from platypush.plugins import action +from platypush.plugins.sensor import SensorPlugin +from platypush.schemas.system import ( + CpuInfo, + CpuInfoSchema, + SystemInfoSchema, +) -class SystemPlugin(Plugin): +# pylint: disable=too-many-ancestors +class SystemPlugin(SensorPlugin, EntityManager): """ Plugin to get system info. @@ -24,35 +53,40 @@ class SystemPlugin(Plugin): """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.__cpu_info: Optional[CpuInfo] = None + + @staticmethod + def _entity_args(name: str) -> Dict[str, str]: + return { + 'id': f'system:{name}', + 'name': name, + } + + @property + def _cpu_info(self) -> CpuInfo: + from cpuinfo import get_cpu_info + + if not self.__cpu_info: + # The CPU information won't change while the process is running, so + # it makes sense to cache it only once. + self.__cpu_info = CpuInfoSchema().load(get_cpu_info()) # type: ignore + + return self.__cpu_info # type: ignore + @action - def cpu_info(self) -> CpuInfoResponse: + def cpu_info(self): """ Get CPU info. - :return: :class:`platypush.message.response.system.CpuInfoResponse` + :return: .. schema:: system.CpuInfoSchema """ - from cpuinfo import get_cpu_info - info = get_cpu_info() - - return CpuInfoResponse( - arch=info.get('raw_arch_string'), - bits=info.get('bits'), - count=info.get('count'), - vendor_id=info.get('vendor_id'), - brand=info.get('brand'), - hz_advertised=info.get('hz_advertised_raw')[0], - hz_actual=info.get('hz_actual_raw')[0], - stepping=info.get('stepping'), - model=info.get('model'), - family=info.get('family'), - flags=info.get('flags'), - l1_instruction_cache_size=info.get('l1_instruction_cache_size'), - l1_data_cache_size=info.get('l1_data_cache_size'), - l2_cache_size=info.get('l2_cache_size'), - l3_cache_size=info.get('l3_cache_size'), - ) + return CpuInfoSchema().dump(self._cpu_info) @action - def cpu_times(self, per_cpu=False, percent=False) -> Union[CpuTimesResponse, CpuResponseList]: + def cpu_times( + self, per_cpu=False, percent=False + ) -> Union[CpuTimesResponse, CpuResponseList]: """ Get the CPU times stats. @@ -62,25 +96,30 @@ class SystemPlugin(Plugin): """ import psutil - times = psutil.cpu_times_percent(percpu=per_cpu) if percent else \ - psutil.cpu_times(percpu=per_cpu) + times = ( + psutil.cpu_times_percent(percpu=per_cpu) + if percent + else psutil.cpu_times(percpu=per_cpu) + ) if per_cpu: - return CpuResponseList([ - CpuTimesResponse( - user=t.user, - nice=t.nice, - system=t.system, - idle=t.idle, - iowait=t.iowait, - irq=t.irq, - softirq=t.softirq, - steal=t.steal, - guest=t.guest, - guest_nice=t.guest_nice, - ) - for t in times - ]) + return CpuResponseList( + [ + CpuTimesResponse( + user=t.user, + nice=t.nice, + system=t.system, + idle=t.idle, + iowait=t.iowait, + irq=t.irq, + softirq=t.softirq, + steal=t.steal, + guest=t.guest, + guest_nice=t.guest_nice, + ) + for t in times + ] + ) return CpuTimesResponse( user=times.user, @@ -96,7 +135,9 @@ class SystemPlugin(Plugin): ) @action - def cpu_percent(self, per_cpu: bool = False, interval: Optional[float] = None) -> Union[float, List[float]]: + def cpu_percent( + self, per_cpu: bool = False, interval: Optional[float] = None + ) -> Union[float, List[float]]: """ Get the CPU load percentage. @@ -108,10 +149,11 @@ class SystemPlugin(Plugin): :return: float if ``per_cpu=False``, ``list[float]`` otherwise. """ import psutil + percent = psutil.cpu_percent(percpu=per_cpu, interval=interval) if per_cpu: - return [p for p in percent] + return list(percent) # type: ignore return percent @action @@ -121,6 +163,7 @@ class SystemPlugin(Plugin): :return: :class:`platypush.message.response.system.CpuStatsResponse` """ import psutil + stats = psutil.cpu_stats() return CpuStatsResponse( @@ -131,7 +174,9 @@ class SystemPlugin(Plugin): ) @action - def cpu_frequency(self, per_cpu: bool = False) -> Union[CpuFrequencyResponse, CpuResponseList]: + def cpu_frequency( + self, per_cpu: bool = False + ) -> Union[CpuFrequencyResponse, CpuResponseList]: """ Get CPU stats. @@ -139,17 +184,20 @@ class SystemPlugin(Plugin): :return: :class:`platypush.message.response.system.CpuFrequencyResponse` """ import psutil + freq = psutil.cpu_freq(percpu=per_cpu) if per_cpu: - return CpuResponseList([ - CpuFrequencyResponse( - min=f.min, - max=f.max, - current=f.current, - ) - for f in freq - ]) + return CpuResponseList( + [ + CpuFrequencyResponse( + min=f.min, + max=f.max, + current=f.current, + ) + for f in freq + ] + ) return CpuFrequencyResponse( min=freq.min, @@ -163,6 +211,7 @@ class SystemPlugin(Plugin): Get the average load as a vector that represents the load within the last 1, 5 and 15 minutes. """ import psutil + return psutil.getloadavg() @action @@ -172,6 +221,7 @@ class SystemPlugin(Plugin): :return: list of :class:`platypush.message.response.system.VirtualMemoryUsageResponse` """ import psutil + mem = psutil.virtual_memory() return VirtualMemoryUsageResponse( total=mem.total, @@ -193,6 +243,7 @@ class SystemPlugin(Plugin): :return: list of :class:`platypush.message.response.system.SwapMemoryUsageResponse` """ import psutil + mem = psutil.swap_memory() return SwapMemoryUsageResponse( total=mem.total, @@ -210,18 +261,24 @@ class SystemPlugin(Plugin): :return: list of :class:`platypush.message.response.system.DiskPartitionResponse` """ import psutil + parts = psutil.disk_partitions() - return DiskResponseList([ - DiskPartitionResponse( - device=p.device, - mount_point=p.mountpoint, - fstype=p.fstype, - opts=p.opts, - ) for p in parts - ]) + return DiskResponseList( + [ + DiskPartitionResponse( + device=p.device, + mount_point=p.mountpoint, + fstype=p.fstype, + opts=p.opts, + ) + for p in parts + ] + ) @action - def disk_usage(self, path: Optional[str] = None) -> Union[DiskUsageResponse, DiskResponseList]: + def disk_usage( + self, path: Optional[str] = None + ) -> Union[DiskUsageResponse, DiskResponseList]: """ Get the usage of a mounted disk. @@ -234,29 +291,35 @@ class SystemPlugin(Plugin): if path: usage = psutil.disk_usage(path) return DiskUsageResponse( - path=path, - total=usage.total, - used=usage.used, - free=usage.free, - percent=usage.percent, + path=path, + total=usage.total, + used=usage.used, + free=usage.free, + percent=usage.percent, ) else: - disks = {p.mountpoint: psutil.disk_usage(p.mountpoint) - for p in psutil.disk_partitions()} + disks = { + p.mountpoint: psutil.disk_usage(p.mountpoint) + for p in psutil.disk_partitions() + } - return DiskResponseList([ - DiskUsageResponse( - path=path, - total=disk.total, - used=disk.used, - free=disk.free, - percent=disk.percent, - ) for path, disk in disks.items() - ]) + return DiskResponseList( + [ + DiskUsageResponse( + path=path, + total=disk.total, + used=disk.used, + free=disk.free, + percent=disk.percent, + ) + for path, disk in disks.items() + ] + ) @action - def disk_io_counters(self, disk: Optional[str] = None, per_disk: bool = False) -> \ - Union[DiskIoCountersResponse, DiskResponseList]: + def disk_io_counters( + self, disk: Optional[str] = None, per_disk: bool = False + ) -> Union[DiskIoCountersResponse, DiskResponseList]: """ Get the I/O counter stats for the mounted disks. @@ -293,14 +356,14 @@ class SystemPlugin(Plugin): if not per_disk: return _expand_response(None, io) - return DiskResponseList([ - _expand_response(disk, stats) - for disk, stats in io.items() - ]) + return DiskResponseList( + [_expand_response(disk, stats) for disk, stats in io.items()] + ) @action - def net_io_counters(self, nic: Optional[str] = None, per_nic: bool = False) -> \ - Union[NetworkIoCountersResponse, NetworkResponseList]: + def net_io_counters( + self, nic: Optional[str] = None, per_nic: bool = False + ) -> Union[NetworkIoCountersResponse, NetworkResponseList]: """ Get the I/O counters stats for the network interfaces. @@ -336,14 +399,14 @@ class SystemPlugin(Plugin): if not per_nic: return _expand_response(nic, io) - return NetworkResponseList([ - _expand_response(nic, stats) - for nic, stats in io.items() - ]) + return NetworkResponseList( + [_expand_response(nic, stats) for nic, stats in io.items()] + ) - # noinspection PyShadowingBuiltins @action - def net_connections(self, type: Optional[str] = None) -> Union[NetworkConnectionResponse, NetworkResponseList]: + def net_connections( + self, type: Optional[str] = None + ) -> Union[NetworkConnectionResponse, NetworkResponseList]: """ Get the list of active network connections. On macOS this function requires root privileges. @@ -369,24 +432,30 @@ class SystemPlugin(Plugin): :return: List of :class:`platypush.message.response.system.NetworkConnectionResponse`. """ import psutil + conns = psutil.net_connections(kind=type) - return NetworkResponseList([ - NetworkConnectionResponse( - fd=conn.fd, - family=conn.family.name, - type=conn.type.name, - local_address=conn.laddr[0] if conn.laddr else None, - local_port=conn.laddr[1] if len(conn.laddr) > 1 else None, - remote_address=conn.raddr[0] if conn.raddr else None, - remote_port=conn.raddr[1] if len(conn.raddr) > 1 else None, - status=conn.status, - pid=conn.pid, - ) for conn in conns - ]) + return NetworkResponseList( + [ + NetworkConnectionResponse( + fd=conn.fd, + family=conn.family.name, + type=conn.type.name, + local_address=conn.laddr[0] if conn.laddr else None, + local_port=conn.laddr[1] if len(conn.laddr) > 1 else None, + remote_address=conn.raddr[0] if conn.raddr else None, + remote_port=conn.raddr[1] if len(conn.raddr) > 1 else None, + status=conn.status, + pid=conn.pid, + ) + for conn in conns + ] + ) @action - def net_addresses(self, nic: Optional[str] = None) -> Union[NetworkAddressResponse, NetworkResponseList]: + def net_addresses( + self, nic: Optional[str] = None + ) -> Union[NetworkAddressResponse, NetworkResponseList]: """ Get address info associated to the network interfaces. @@ -395,6 +464,7 @@ class SystemPlugin(Plugin): :class:`platypush.message.response.system.NetworkAddressResponse`. """ import psutil + addrs = psutil.net_if_addrs() def _expand_addresses(_nic, _addrs): @@ -402,22 +472,28 @@ class SystemPlugin(Plugin): for addr in _addrs: if addr.family == socket.AddressFamily.AF_INET: - args.update({ - 'ipv4_address': addr.address, - 'ipv4_netmask': addr.netmask, - 'ipv4_broadcast': addr.broadcast, - }) + args.update( + { + 'ipv4_address': addr.address, + 'ipv4_netmask': addr.netmask, + 'ipv4_broadcast': addr.broadcast, + } + ) elif addr.family == socket.AddressFamily.AF_INET6: - args.update({ - 'ipv6_address': addr.address, - 'ipv6_netmask': addr.netmask, - 'ipv6_broadcast': addr.broadcast, - }) + args.update( + { + 'ipv6_address': addr.address, + 'ipv6_netmask': addr.netmask, + 'ipv6_broadcast': addr.broadcast, + } + ) elif addr.family == socket.AddressFamily.AF_PACKET: - args.update({ - 'mac_address': addr.address, - 'mac_broadcast': addr.broadcast, - }) + args.update( + { + 'mac_address': addr.address, + 'mac_broadcast': addr.broadcast, + } + ) if addr.ptp and not args.get('ptp'): args['ptp'] = addr.ptp @@ -430,13 +506,14 @@ class SystemPlugin(Plugin): addr = addrs[0] return _expand_addresses(nic, addr) - return NetworkResponseList([ - _expand_addresses(nic, addr) - for nic, addr in addrs.items() - ]) + return NetworkResponseList( + [_expand_addresses(nic, addr) for nic, addr in addrs.items()] + ) @action - def net_stats(self, nic: Optional[str] = None) -> Union[NetworkInterfaceStatsResponse, NetworkResponseList]: + def net_stats( + self, nic: Optional[str] = None + ) -> Union[NetworkInterfaceStatsResponse, NetworkResponseList]: """ Get stats about the network interfaces. @@ -445,6 +522,7 @@ class SystemPlugin(Plugin): :class:`platypush.message.response.system.NetworkInterfaceStatsResponse`. """ import psutil + stats = psutil.net_if_stats() def _expand_stats(_nic, _stats): @@ -461,16 +539,18 @@ class SystemPlugin(Plugin): assert stats, 'No such network interface: {}'.format(nic) return _expand_stats(nic, stats[0]) - return NetworkResponseList([ - _expand_stats(nic, addr) - for nic, addr in stats.items() - ]) + return NetworkResponseList( + [_expand_stats(nic, addr) for nic, addr in stats.items()] + ) - # noinspection DuplicatedCode @action - def sensors_temperature(self, sensor: Optional[str] = None, fahrenheit: bool = False) \ - -> Union[SensorTemperatureResponse, List[SensorTemperatureResponse], - Dict[str, Union[SensorTemperatureResponse, List[SensorTemperatureResponse]]]]: + def sensors_temperature( + self, sensor: Optional[str] = None, fahrenheit: bool = False + ) -> Union[ + SensorTemperatureResponse, + List[SensorTemperatureResponse], + Dict[str, Union[SensorTemperatureResponse, List[SensorTemperatureResponse]]], + ]: """ Get stats from the temperature sensors. @@ -478,6 +558,7 @@ class SystemPlugin(Plugin): :param fahrenheit: Return the temperature in Fahrenheit (default: Celsius). """ import psutil + stats = psutil.sensors_temperatures(fahrenheit=fahrenheit) if sensor: @@ -524,7 +605,6 @@ class SystemPlugin(Plugin): return ret - # noinspection DuplicatedCode @action def sensors_fan(self, sensor: Optional[str] = None) -> SensorResponseList: """ @@ -534,27 +614,29 @@ class SystemPlugin(Plugin): :return: List of :class:`platypush.message.response.system.SensorFanResponse`. """ import psutil + stats = psutil.sensors_fans() def _expand_stats(name, _stats): - return SensorResponseList([ - SensorFanResponse( - name=name, - current=s.current, - label=s.label, - ) - for s in _stats - ]) + return SensorResponseList( + [ + SensorFanResponse( + name=name, + current=s.current, + label=s.label, + ) + for s in _stats + ] + ) if sensor: stats = [addr for name, addr in stats.items() if name == sensor] assert stats, 'No such sensor name: {}'.format(sensor) return _expand_stats(sensor, stats[0]) - return SensorResponseList([ - _expand_stats(name, stat) - for name, stat in stats.items() - ]) + return SensorResponseList( + [_expand_stats(name, stat) for name, stat in stats.items()] + ) @action def sensors_battery(self) -> SensorBatteryResponse: @@ -563,6 +645,7 @@ class SystemPlugin(Plugin): :return: List of :class:`platypush.message.response.system.SensorFanResponse`. """ import psutil + stats = psutil.sensors_battery() return SensorBatteryResponse( @@ -578,20 +661,22 @@ class SystemPlugin(Plugin): :return: List of :class:`platypush.message.response.system.ConnectUserResponse`. """ import psutil + users = psutil.users() - return ConnectedUserResponseList([ - ConnectUserResponse( - name=u.name, - terminal=u.terminal, - host=u.host, - started=datetime.fromtimestamp(u.started), - pid=u.pid, - ) - for u in users - ]) + return ConnectedUserResponseList( + [ + ConnectUserResponse( + name=u.name, + terminal=u.terminal, + host=u.host, + started=datetime.fromtimestamp(u.started), + pid=u.pid, + ) + for u in users + ] + ) - # noinspection PyShadowingBuiltins @action def processes(self, filter: Optional[str] = '') -> ProcessResponseList: """ @@ -601,6 +686,7 @@ class SystemPlugin(Plugin): :return: List of :class:`platypush.message.response.system.ProcessResponse`. """ import psutil + processes = [psutil.Process(pid) for pid in psutil.pids()] p_list = [] @@ -652,6 +738,7 @@ class SystemPlugin(Plugin): @staticmethod def _get_process(pid: int): import psutil + return psutil.Process(pid) @action @@ -661,6 +748,7 @@ class SystemPlugin(Plugin): :return: ``True`` if the process exists, ``False`` otherwise. """ import psutil + return psutil.pid_exists(pid) @action @@ -696,7 +784,7 @@ class SystemPlugin(Plugin): self._get_process(pid).kill() @action - def wait(self, pid: int, timeout: int = None): + def wait(self, pid: int, timeout: Optional[int] = None): """ Wait for a process to terminate. @@ -705,5 +793,28 @@ class SystemPlugin(Plugin): """ self._get_process(pid).wait(timeout) + @override + @action + def get_measurement(self, *_, **__): + """ + :return: .. schema:: system.SystemInfoSchema + """ + ret = SystemInfoSchema().dump( + { + 'cpu_info': self._cpu_info, + } + ) + + return ret + + @override + def transform_entities(self, entities: dict) -> List[Entity]: + return [ + CpuInfoModel( + **self._entity_args('cpu_info'), + **entities['cpu_info'], + ), + ] + # vim:sw=4:ts=4:et: