homeassistant/custom_components/frigate/number.py

243 lines
7.4 KiB
Python
Raw Permalink Normal View History

2025-01-10 21:08:35 -08:00
"""Number platform for frigate."""
from __future__ import annotations
import logging
from typing import Any
from homeassistant.components.mqtt import async_publish
from homeassistant.components.number import NumberEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity import DeviceInfo, EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import (
FrigateMQTTEntity,
ReceiveMessage,
get_cameras,
get_friendly_name,
get_frigate_device_identifier,
get_frigate_entity_unique_id,
)
from .const import (
ATTR_CONFIG,
DOMAIN,
MAX_CONTOUR_AREA,
MAX_THRESHOLD,
MIN_CONTOUR_AREA,
MIN_THRESHOLD,
NAME,
)
from .icons import ICON_SPEEDOMETER
_LOGGER: logging.Logger = logging.getLogger(__name__)
CAMERA_FPS_TYPES = ["camera", "detection", "process", "skipped"]
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Sensor entry setup."""
frigate_config = hass.data[DOMAIN][entry.entry_id][ATTR_CONFIG]
entities = []
# add motion configurations for cameras
for cam_name in get_cameras(frigate_config):
entities.extend(
[FrigateMotionContourArea(entry, frigate_config, cam_name, False)]
)
entities.extend(
[FrigateMotionThreshold(entry, frigate_config, cam_name, False)]
)
async_add_entities(entities)
class FrigateMotionContourArea(FrigateMQTTEntity, NumberEntity): # type: ignore[misc]
"""FrigateMotionContourArea class."""
_attr_entity_category = EntityCategory.CONFIG
_attr_name = "Contour area"
_attr_native_min_value = MIN_CONTOUR_AREA
_attr_native_max_value = MAX_CONTOUR_AREA
_attr_native_step = 1
def __init__(
self,
config_entry: ConfigEntry,
frigate_config: dict[str, Any],
cam_name: str,
default_enabled: bool,
) -> None:
"""Construct a FrigateNumber."""
self._frigate_config = frigate_config
self._cam_name = cam_name
self._attr_native_value = float(
self._frigate_config["cameras"][self._cam_name]["motion"]["contour_area"]
)
self._command_topic = (
f"{self._frigate_config['mqtt']['topic_prefix']}"
f"/{self._cam_name}/motion_contour_area/set"
)
self._attr_entity_registry_enabled_default = default_enabled
super().__init__(
config_entry,
frigate_config,
{
"state_topic": {
"msg_callback": self._state_message_received,
"qos": 0,
"topic": (
f"{self._frigate_config['mqtt']['topic_prefix']}"
f"/{self._cam_name}/motion_contour_area/state"
),
},
},
)
@callback # type: ignore[misc]
def _state_message_received(self, msg: ReceiveMessage) -> None:
"""Handle a new received MQTT state message."""
try:
self._attr_native_value = float(msg.payload)
except (TypeError, ValueError):
pass
self.async_write_ha_state()
@property
def unique_id(self) -> str:
"""Return a unique ID to use for this entity."""
return get_frigate_entity_unique_id(
self._config_entry.entry_id,
"number",
f"{self._cam_name}_contour_area",
)
@property
def device_info(self) -> DeviceInfo:
"""Get device information."""
return {
"identifiers": {
get_frigate_device_identifier(self._config_entry, self._cam_name)
},
"via_device": get_frigate_device_identifier(self._config_entry),
"name": get_friendly_name(self._cam_name),
"model": self._get_model(),
"configuration_url": f"{self._config_entry.data.get(CONF_URL)}/cameras/{self._cam_name}",
"manufacturer": NAME,
}
async def async_set_native_value(self, value: float) -> None:
"""Update motion contour area."""
await async_publish(
self.hass,
self._command_topic,
int(value),
0,
False,
)
@property
def icon(self) -> str:
"""Return the icon of the number."""
return ICON_SPEEDOMETER
class FrigateMotionThreshold(FrigateMQTTEntity, NumberEntity): # type: ignore[misc]
"""FrigateMotionThreshold class."""
_attr_entity_category = EntityCategory.CONFIG
_attr_name = "Threshold"
_attr_native_min_value = MIN_THRESHOLD
_attr_native_max_value = MAX_THRESHOLD
_attr_native_step = 1
def __init__(
self,
config_entry: ConfigEntry,
frigate_config: dict[str, Any],
cam_name: str,
default_enabled: bool,
) -> None:
"""Construct a FrigateMotionThreshold."""
self._frigate_config = frigate_config
self._cam_name = cam_name
self._attr_native_value = float(
self._frigate_config["cameras"][self._cam_name]["motion"]["threshold"]
)
self._command_topic = (
f"{frigate_config['mqtt']['topic_prefix']}"
f"/{self._cam_name}/motion_threshold/set"
)
self._attr_entity_registry_enabled_default = default_enabled
super().__init__(
config_entry,
frigate_config,
{
"state_topic": {
"msg_callback": self._state_message_received,
"qos": 0,
"topic": (
f"{self._frigate_config['mqtt']['topic_prefix']}"
f"/{self._cam_name}/motion_threshold/state"
),
},
},
)
@callback # type: ignore[misc]
def _state_message_received(self, msg: ReceiveMessage) -> None:
"""Handle a new received MQTT state message."""
try:
self._attr_native_value = float(msg.payload)
except (TypeError, ValueError):
pass
self.async_write_ha_state()
@property
def unique_id(self) -> str:
"""Return a unique ID to use for this entity."""
return get_frigate_entity_unique_id(
self._config_entry.entry_id,
"number",
f"{self._cam_name}_threshold",
)
@property
def device_info(self) -> DeviceInfo:
"""Get device information."""
return {
"identifiers": {
get_frigate_device_identifier(self._config_entry, self._cam_name)
},
"via_device": get_frigate_device_identifier(self._config_entry),
"name": get_friendly_name(self._cam_name),
"model": self._get_model(),
"configuration_url": f"{self._config_entry.data.get(CONF_URL)}/cameras/{self._cam_name}",
"manufacturer": NAME,
}
async def async_set_native_value(self, value: float) -> None:
"""Update motion threshold."""
await async_publish(
self.hass,
self._command_topic,
int(value),
0,
False,
)
@property
def icon(self) -> str:
"""Return the icon of the number."""
return ICON_SPEEDOMETER