homeassistant/custom_components/sonoff/core/ewelink/local.py
2025-01-10 21:08:35 -08:00

254 lines
7.9 KiB
Python

"""This registry can read data from LAN devices and send commands to them.
For non DIY devices data will be encrypted with devicekey. The registry cannot
decode such messages by itself because it does not manage the list of known
devices and their devicekey.
"""
import asyncio
import base64
import ipaddress
import json
import logging
import aiohttp
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Random import get_random_bytes
from zeroconf import Zeroconf, ServiceStateChange
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo
from .base import SIGNAL_CONNECTED, SIGNAL_UPDATE, XDevice, XRegistryBase
_LOGGER = logging.getLogger(__name__)
# some venv users don't have Crypto.Util.Padding
# I don't know why pycryptodome is not installed on their systems
# https://github.com/AlexxIT/SonoffLAN/issues/129
def pad(data_to_pad: bytes, block_size: int):
padding_len = block_size - len(data_to_pad) % block_size
padding = bytes([padding_len]) * padding_len
return data_to_pad + padding
def unpad(padded_data: bytes, block_size: int):
padding_len = padded_data[-1]
return padded_data[:-padding_len]
def encrypt(payload: dict, devicekey: str):
devicekey = devicekey.encode("utf-8")
hash_ = MD5.new()
hash_.update(devicekey)
key = hash_.digest()
iv = get_random_bytes(16)
plaintext = json.dumps(payload["data"]).encode("utf-8")
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
padded = pad(plaintext, AES.block_size)
ciphertext = cipher.encrypt(padded)
payload["encrypt"] = True
payload["data"] = base64.b64encode(ciphertext).decode("utf-8")
payload["iv"] = base64.b64encode(iv).decode("utf-8")
return payload
def decrypt(payload: dict, devicekey: str):
devicekey = devicekey.encode("utf-8")
hash_ = MD5.new()
hash_.update(devicekey)
key = hash_.digest()
cipher = AES.new(key, AES.MODE_CBC, iv=base64.b64decode(payload["iv"]))
ciphertext = base64.b64decode(payload["data"])
padded = cipher.decrypt(ciphertext)
return unpad(padded, AES.block_size)
class XRegistryLocal(XRegistryBase):
browser: AsyncServiceBrowser = None
online: bool = False
def start(self, zeroconf: Zeroconf):
self.browser = AsyncServiceBrowser(
zeroconf, "_ewelink._tcp.local.", [self._handler1]
)
self.online = True
self.dispatcher_send(SIGNAL_CONNECTED)
async def stop(self):
if not self.online:
return
self.online = False
await self.browser.async_cancel()
def _handler1(
self,
zeroconf: Zeroconf,
service_type: str,
name: str,
state_change: ServiceStateChange,
):
"""Step 1. Receive change event from zeroconf."""
if state_change == ServiceStateChange.Removed:
return
asyncio.create_task(self._handler2(zeroconf, service_type, name))
async def _handler2(self, zeroconf: Zeroconf, service_type: str, name: str):
"""Step 2. Request additional info about add and update event from device."""
deviceid = name[8:18]
try:
info = AsyncServiceInfo(service_type, name)
if not await info.async_request(zeroconf, 3000) or not info.properties:
_LOGGER.debug(f"{deviceid} <= Local0 | Can't get zeroconf info")
return
# support update with empty host and host without port
for addr in info.addresses:
# zeroconf lib should return IPv4, but better check anyway
addr = ipaddress.IPv4Address(addr)
host = f"{addr}:{info.port}" if info.port else str(addr)
break
else:
if info.server and info.port:
host = f"{info.server}:{info.port}"
else:
host = None
data = {
k.decode(): v.decode() if isinstance(v, bytes) else v
for k, v in info.properties.items()
}
self._handler3(deviceid, host, data)
except Exception as e:
_LOGGER.debug(f"{deviceid} <= Local0 | Zeroconf error", exc_info=e)
def _handler3(self, deviceid: str, host: str, data: dict):
"""Step 3. Process new data from device."""
raw = "".join([data[f"data{i}"] for i in range(1, 5, 1) if f"data{i}" in data])
msg = {
"deviceid": deviceid,
"subdevid": data["id"],
"localtype": data["type"],
"seq": data.get("seq"),
}
if host:
msg["host"] = host
if data.get("encrypt"):
msg["data"] = raw
msg["iv"] = data["iv"]
else:
msg["params"] = json.loads(raw)
self.dispatcher_send(SIGNAL_UPDATE, msg)
async def send(
self,
device: XDevice,
params: dict = None,
command: str = None,
sequence: str = None,
timeout: int = 5,
):
# known commands for DIY: switch, startup, pulse, sledonline
# other commands: switch, switches, transmit, dimmable, light, fan
if command is None:
if params is None:
return "noquery"
command = next(iter(params))
payload = {
"sequence": sequence or self.sequence(),
"deviceid": device["deviceid"],
"selfApikey": "123",
"data": params or {},
}
if "devicekey" in device:
payload = encrypt(payload, device["devicekey"])
log = f"{device['deviceid']} => Local4 | {device.get('host', '')} | {params}"
try:
host = device["host"]
if ":" not in host:
host += ":8081" # default port, some devices may have another
# noinspection HttpUrlsUsage
r = await self.session.post(
f"http://{host}/zeroconf/{command}",
json=payload,
headers={"Connection": "close"},
timeout=timeout,
)
try:
resp: dict = await r.json()
if resp["error"] == 0:
_LOGGER.debug(f"{log} <= {resp}")
if "iv" in resp:
msg = {
"deviceid": device["deviceid"],
"localtype": device["localtype"],
"seq": resp["seq"],
"data": resp["data"],
"iv": resp["iv"],
}
if params and params.get("subDevId"):
msg["subdevid"] = params["subDevId"]
self.dispatcher_send(SIGNAL_UPDATE, msg)
return "online"
else:
_LOGGER.debug(f"{log} <= {resp}")
return "error"
except Exception as e:
_LOGGER.debug(f"{log} !! Can't read JSON {e}")
return "error"
except asyncio.TimeoutError:
_LOGGER.debug(f"{log} !! Timeout {timeout}")
return "timeout"
except aiohttp.ClientConnectorError as e:
_LOGGER.debug(f"{log} !! Can't connect: {e}")
return "E#CON"
except (
aiohttp.ClientOSError,
aiohttp.ServerDisconnectedError,
asyncio.CancelledError,
) as e:
_LOGGER.debug(log, exc_info=e)
return "E#COS"
except Exception as e:
_LOGGER.error(log, exc_info=e)
return "E#???"
@staticmethod
def decrypt_msg(msg: dict, devicekey: str = None) -> dict:
data = decrypt(msg, devicekey)
# Fix Sonoff RF Bridge sintax bug
if data and data.startswith(b'{"rf'):
data = data.replace(b'"="', b'":"')
# fix https://github.com/AlexxIT/SonoffLAN/issues/1160
data = data.rstrip(b"\x02")
return json.loads(data)