homeassistant/custom_components/frigate/binary_sensor.py

304 lines
9.3 KiB
Python
Raw Permalink Normal View History

2025-01-10 21:08:35 -08:00
"""Binary sensor platform for Frigate."""
from __future__ import annotations
import logging
from typing import Any, cast
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import (
FrigateMQTTEntity,
ReceiveMessage,
decode_if_necessary,
get_cameras,
get_cameras_and_audio,
get_cameras_zones_and_objects,
get_friendly_name,
get_frigate_device_identifier,
get_frigate_entity_unique_id,
get_zones,
)
from .const import ATTR_CONFIG, DOMAIN, NAME
from .icons import get_dynamic_icon_from_type
_LOGGER: logging.Logger = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Binary sensor entry setup."""
frigate_config = hass.data[DOMAIN][entry.entry_id][ATTR_CONFIG]
entities = []
# add object sensors for cameras and zones
entities.extend(
[
FrigateObjectOccupancySensor(entry, frigate_config, cam_name, obj)
for cam_name, obj in get_cameras_zones_and_objects(frigate_config)
]
)
# add audio sensors for cameras
entities.extend(
[
FrigateAudioSensor(entry, frigate_config, cam_name, audio)
for cam_name, audio in get_cameras_and_audio(frigate_config)
]
)
# add generic motion sensors for cameras
entities.extend(
[
FrigateMotionSensor(entry, frigate_config, cam_name)
for cam_name in get_cameras(frigate_config)
]
)
async_add_entities(entities)
class FrigateObjectOccupancySensor(FrigateMQTTEntity, BinarySensorEntity): # type: ignore[misc]
"""Frigate Occupancy Sensor class."""
def __init__(
self,
config_entry: ConfigEntry,
frigate_config: dict[str, Any],
cam_name: str,
obj_name: str,
) -> None:
"""Construct a new FrigateObjectOccupancySensor."""
self._cam_name = cam_name
self._obj_name = obj_name
self._is_on = False
self._frigate_config = frigate_config
super().__init__(
config_entry,
frigate_config,
{
"state_topic": {
"msg_callback": self._state_message_received,
"qos": 0,
"topic": (
f"{self._frigate_config['mqtt']['topic_prefix']}"
f"/{self._cam_name}/{self._obj_name}"
),
"encoding": None,
},
},
)
@callback # type: ignore[misc]
def _state_message_received(self, msg: ReceiveMessage) -> None:
"""Handle a new received MQTT state message."""
try:
self._is_on = int(msg.payload) > 0
except ValueError:
self._is_on = False
self.async_write_ha_state()
@property
def unique_id(self) -> str:
"""Return a unique ID for this entity."""
return get_frigate_entity_unique_id(
self._config_entry.entry_id,
"occupancy_sensor",
f"{self._cam_name}_{self._obj_name}",
)
@property
def device_info(self) -> dict[str, Any]:
"""Return device information."""
return {
"identifiers": {
get_frigate_device_identifier(self._config_entry, self._cam_name)
},
"via_device": get_frigate_device_identifier(self._config_entry),
"name": get_friendly_name(self._cam_name),
"model": self._get_model(),
"configuration_url": f"{self._config_entry.data.get(CONF_URL)}/cameras/{self._cam_name if self._cam_name not in get_zones(self._frigate_config) else ''}",
"manufacturer": NAME,
}
@property
def name(self) -> str:
"""Return the name of the sensor."""
return f"{self._obj_name} occupancy"
@property
def is_on(self) -> bool:
"""Return true if the binary sensor is on."""
return self._is_on
@property
def device_class(self) -> str:
"""Return the device class."""
return cast(str, BinarySensorDeviceClass.OCCUPANCY)
@property
def icon(self) -> str:
"""Return the icon of the sensor."""
return get_dynamic_icon_from_type(self._obj_name, self._is_on)
class FrigateAudioSensor(FrigateMQTTEntity, BinarySensorEntity): # type: ignore[misc]
"""Frigate Audio Sensor class."""
def __init__(
self,
config_entry: ConfigEntry,
frigate_config: dict[str, Any],
cam_name: str,
audio_name: str,
) -> None:
"""Construct a new FrigateAudioSensor."""
self._cam_name = cam_name
self._audio_name = audio_name
self._is_on = False
self._frigate_config = frigate_config
super().__init__(
config_entry,
frigate_config,
{
"state_topic": {
"msg_callback": self._state_message_received,
"qos": 0,
"topic": (
f"{self._frigate_config['mqtt']['topic_prefix']}"
f"/{self._cam_name}/audio/{self._audio_name}"
),
},
},
)
@callback # type: ignore[misc]
def _state_message_received(self, msg: ReceiveMessage) -> None:
"""Handle a new received MQTT state message."""
self._is_on = decode_if_necessary(msg.payload) == "ON"
self.async_write_ha_state()
@property
def unique_id(self) -> str:
"""Return a unique ID for this entity."""
return get_frigate_entity_unique_id(
self._config_entry.entry_id,
"audio_sensor",
f"{self._cam_name}_{self._audio_name}",
)
@property
def device_info(self) -> dict[str, Any]:
"""Return device information."""
return {
"identifiers": {
get_frigate_device_identifier(self._config_entry, self._cam_name)
},
"via_device": get_frigate_device_identifier(self._config_entry),
"name": get_friendly_name(self._cam_name),
"model": self._get_model(),
"configuration_url": f"{self._config_entry.data.get(CONF_URL)}/cameras/{self._cam_name}",
"manufacturer": NAME,
}
@property
def name(self) -> str:
"""Return the name of the sensor."""
return f"{self._audio_name} sound"
@property
def is_on(self) -> bool:
"""Return true if the binary sensor is on."""
return self._is_on
@property
def device_class(self) -> str:
"""Return the device class."""
return cast(str, BinarySensorDeviceClass.SOUND)
@property
def icon(self) -> str:
"""Return the icon of the sensor."""
return get_dynamic_icon_from_type("sound", self._is_on)
class FrigateMotionSensor(FrigateMQTTEntity, BinarySensorEntity): # type: ignore[misc]
"""Frigate Motion Sensor class."""
_attr_name = "Motion"
def __init__(
self,
config_entry: ConfigEntry,
frigate_config: dict[str, Any],
cam_name: str,
) -> None:
"""Construct a new FrigateMotionSensor."""
self._cam_name = cam_name
self._is_on = False
self._frigate_config = frigate_config
super().__init__(
config_entry,
frigate_config,
{
"state_topic": {
"msg_callback": self._state_message_received,
"qos": 0,
"topic": (
f"{self._frigate_config['mqtt']['topic_prefix']}"
f"/{self._cam_name}/motion"
),
},
},
)
@callback # type: ignore[misc]
def _state_message_received(self, msg: ReceiveMessage) -> None:
"""Handle a new received MQTT state message."""
self._is_on = decode_if_necessary(msg.payload) == "ON"
self.async_write_ha_state()
@property
def unique_id(self) -> str:
"""Return a unique ID for this entity."""
return get_frigate_entity_unique_id(
self._config_entry.entry_id,
"motion_sensor",
f"{self._cam_name}",
)
@property
def device_info(self) -> dict[str, Any]:
"""Return device information."""
return {
"identifiers": {
get_frigate_device_identifier(self._config_entry, self._cam_name)
},
"via_device": get_frigate_device_identifier(self._config_entry),
"name": get_friendly_name(self._cam_name),
"model": self._get_model(),
"configuration_url": f"{self._config_entry.data.get(CONF_URL)}/cameras/{self._cam_name if self._cam_name not in get_zones(self._frigate_config) else ''}",
"manufacturer": NAME,
}
@property
def is_on(self) -> bool:
"""Return true if the binary sensor is on."""
return self._is_on
@property
def device_class(self) -> str:
"""Return the device class."""
return cast(str, BinarySensorDeviceClass.MOTION)