homeassistant/custom_components/sonoff/core/ewelink/base.py
2025-01-10 21:08:35 -08:00

69 lines
2.1 KiB
Python

import asyncio
import time
from typing import Callable, Optional, TypedDict
from aiohttp import ClientSession
SIGNAL_CONNECTED = "connected"
SIGNAL_UPDATE = "update"
class XDevice(TypedDict, total=False):
deviceid: str
extra: dict
name: str
params: dict
brandName: Optional[str]
productModel: Optional[str]
online: Optional[bool] # required for cloud
apikey: Optional[str] # required for cloud
local: Optional[bool] # required for local
localtype: Optional[str] # exist for local DIY device type
host: Optional[str] # required for local
devicekey: Optional[str] # required for encrypted local devices (not DIY)
local_ts: Optional[float] # time of last local msg from device
params_bulk: Optional[dict] # helper for send_bulk commands
pow_ts: Optional[float] # required for pow devices with cloud connection
parent: Optional[dict]
class XRegistryBase:
dispatcher: dict[str, list[Callable]] = None
_sequence: int = 0
def __init__(self, session: ClientSession):
self.dispatcher = {}
self.session = session
@staticmethod
def sequence() -> str:
"""Return sequnce counter in ms. Always unique."""
t = int(time.time()) * 1000
if t > XRegistryBase._sequence:
XRegistryBase._sequence = t
else:
XRegistryBase._sequence += 1
return str(XRegistryBase._sequence)
def dispatcher_connect(self, signal: str, target: Callable) -> Callable:
targets = self.dispatcher.setdefault(signal, [])
if target not in targets:
targets.append(target)
return lambda: targets.remove(target)
def dispatcher_send(self, signal: str, *args, **kwargs):
if not self.dispatcher.get(signal):
return
for handler in self.dispatcher[signal]:
handler(*args, **kwargs)
async def dispatcher_wait(self, signal: str):
event = asyncio.Event()
disconnect = self.dispatcher_connect(signal, lambda: event.set())
await event.wait()
disconnect()