homeassistant/custom_components/frigate/views.py

580 lines
19 KiB
Python
Raw Permalink Normal View History

2025-01-10 21:08:35 -08:00
"""Frigate HTTP views."""
from __future__ import annotations
import asyncio
from collections.abc import Mapping
import datetime
from http import HTTPStatus
from ipaddress import ip_address
import logging
from typing import Any, Optional, cast
import aiohttp
from aiohttp import hdrs, web
from aiohttp.web_exceptions import HTTPBadGateway, HTTPUnauthorized
import jwt
from multidict import CIMultiDict
from yarl import URL
from custom_components.frigate.api import FrigateApiClient
from custom_components.frigate.const import (
ATTR_CLIENT,
ATTR_CLIENT_ID,
ATTR_CONFIG,
ATTR_MQTT,
CONF_NOTIFICATION_PROXY_ENABLE,
CONF_NOTIFICATION_PROXY_EXPIRE_AFTER_SECONDS,
DOMAIN,
)
from homeassistant.components.http import KEY_AUTHENTICATED, HomeAssistantView
from homeassistant.components.http.auth import DATA_SIGN_SECRET, SIGN_QUERY_PARAM
from homeassistant.components.http.const import KEY_HASS
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
_LOGGER: logging.Logger = logging.getLogger(__name__)
def get_default_config_entry(hass: HomeAssistant) -> ConfigEntry | None:
"""Get the default Frigate config entry.
This is for backwards compatibility for when only a single instance was
supported. If there's more than one instance configured, then there is no
default and the user must specify explicitly which instance they want.
"""
frigate_entries = hass.config_entries.async_entries(DOMAIN)
if len(frigate_entries) == 1:
return frigate_entries[0]
return None
def get_frigate_instance_id(config: dict[str, Any]) -> str | None:
"""Get the Frigate instance id from a Frigate configuration."""
# Use the MQTT client_id as a way to separate the frigate instances, rather
# than just using the config_entry_id, in order to make URLs maximally
# relatable/findable by the user. The MQTT client_id value is configured by
# the user in their Frigate configuration and will be unique per Frigate
# instance (enforced in practice on the Frigate/MQTT side).
return cast(Optional[str], config.get(ATTR_MQTT, {}).get(ATTR_CLIENT_ID))
def get_config_entry_for_frigate_instance_id(
hass: HomeAssistant, frigate_instance_id: str
) -> ConfigEntry | None:
"""Get a ConfigEntry for a given frigate_instance_id."""
for config_entry in hass.config_entries.async_entries(DOMAIN):
config = hass.data[DOMAIN].get(config_entry.entry_id, {}).get(ATTR_CONFIG, {})
if config and get_frigate_instance_id(config) == frigate_instance_id:
return config_entry
return None
def get_client_for_frigate_instance_id(
hass: HomeAssistant, frigate_instance_id: str
) -> FrigateApiClient | None:
"""Get a client for a given frigate_instance_id."""
config_entry = get_config_entry_for_frigate_instance_id(hass, frigate_instance_id)
if config_entry:
return cast(
FrigateApiClient,
hass.data[DOMAIN].get(config_entry.entry_id, {}).get(ATTR_CLIENT),
)
return None
def get_frigate_instance_id_for_config_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
) -> ConfigEntry | None:
"""Get a frigate_instance_id for a ConfigEntry."""
config = hass.data[DOMAIN].get(config_entry.entry_id, {}).get(ATTR_CONFIG, {})
return get_frigate_instance_id(config) if config else None
def async_setup(hass: HomeAssistant) -> None:
"""Set up the views."""
session = async_get_clientsession(hass)
hass.http.register_view(JSMPEGProxyView(session))
hass.http.register_view(MSEProxyView(session))
hass.http.register_view(WebRTCProxyView(session))
hass.http.register_view(NotificationsProxyView(session))
hass.http.register_view(SnapshotsProxyView(session))
hass.http.register_view(RecordingProxyView(session))
hass.http.register_view(ThumbnailsProxyView(session))
hass.http.register_view(VodProxyView(session))
hass.http.register_view(VodSegmentProxyView(session))
# These proxies are inspired by:
# - https://github.com/home-assistant/supervisor/blob/main/supervisor/api/ingress.py
class ProxyView(HomeAssistantView): # type: ignore[misc]
"""HomeAssistant view."""
requires_auth = True
def __init__(self, websession: aiohttp.ClientSession):
"""Initialize the frigate clips proxy view."""
self._websession = websession
def _get_config_entry_for_request(
self, request: web.Request, frigate_instance_id: str | None
) -> ConfigEntry | None:
"""Get a ConfigEntry for a given request."""
hass = request.app[KEY_HASS]
if frigate_instance_id:
return get_config_entry_for_frigate_instance_id(hass, frigate_instance_id)
return get_default_config_entry(hass)
def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
raise NotImplementedError # pragma: no cover
def _permit_request(
self, request: web.Request, config_entry: ConfigEntry, **kwargs: Any
) -> bool:
"""Determine whether to permit a request."""
return True
async def get(
self,
request: web.Request,
**kwargs: Any,
) -> web.Response | web.StreamResponse | web.WebSocketResponse:
"""Route data to service."""
try:
return await self._handle_request(request, **kwargs)
except aiohttp.ClientError as err:
_LOGGER.debug("Reverse proxy error for %s: %s", request.rel_url, err)
raise HTTPBadGateway() from None
@staticmethod
def _get_query_params(request: web.Request) -> Mapping[str, str]:
"""Get the query params to send upstream."""
return {k: v for k, v in request.query.items() if k != "authSig"}
async def _handle_request(
self,
request: web.Request,
frigate_instance_id: str | None = None,
**kwargs: Any,
) -> web.Response | web.StreamResponse:
"""Handle route for request."""
config_entry = self._get_config_entry_for_request(request, frigate_instance_id)
if not config_entry:
return web.Response(status=HTTPStatus.BAD_REQUEST)
if not self._permit_request(request, config_entry, **kwargs):
return web.Response(status=HTTPStatus.FORBIDDEN)
full_path = self._create_path(**kwargs)
if not full_path:
return web.Response(status=HTTPStatus.NOT_FOUND)
url = str(URL(config_entry.data[CONF_URL]) / full_path)
data = await request.read()
source_header = _init_header(request)
async with self._websession.request(
request.method,
url,
headers=source_header,
params=self._get_query_params(request),
allow_redirects=False,
data=data,
) as result:
headers = _response_header(result)
# Stream response
response = web.StreamResponse(status=result.status, headers=headers)
response.content_type = result.content_type
try:
await response.prepare(request)
async for data in result.content.iter_any():
await response.write(data)
except (aiohttp.ClientError, aiohttp.ClientPayloadError) as err:
_LOGGER.debug("Stream error for %s: %s", request.rel_url, err)
except ConnectionResetError:
# Connection is reset/closed by peer.
pass
return response
class SnapshotsProxyView(ProxyView):
"""A proxy for snapshots."""
url = "/api/frigate/{frigate_instance_id:.+}/snapshot/{eventid:.*}"
extra_urls = ["/api/frigate/snapshot/{eventid:.*}"]
name = "api:frigate:snapshots"
def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return f"api/events/{kwargs['eventid']}/snapshot.jpg"
class RecordingProxyView(ProxyView):
"""A proxy for recordings."""
url = "/api/frigate/{frigate_instance_id:.+}/recording/{camera:.+}/start/{start:[.0-9]+}/end/{end:[.0-9]*}"
extra_urls = [
"/api/frigate/recording/{camera:.+}/start/{start:[.0-9]+}/end/{end:[.0-9]*}"
]
name = "api:frigate:recording"
def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return (
f"api/{kwargs['camera']}/start/{kwargs['start']}"
+ f"/end/{kwargs['end']}/clip.mp4"
)
class ThumbnailsProxyView(ProxyView):
"""A proxy for snapshots."""
url = "/api/frigate/{frigate_instance_id:.+}/thumbnail/{eventid:.*}"
name = "api:frigate:thumbnails"
def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return f"api/events/{kwargs['eventid']}/thumbnail.jpg"
class NotificationsProxyView(ProxyView):
"""A proxy for notifications."""
url = "/api/frigate/{frigate_instance_id:.+}/notifications/{event_id}/{path:.*}"
extra_urls = ["/api/frigate/notifications/{event_id}/{path:.*}"]
name = "api:frigate:notification"
requires_auth = False
def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
path, event_id = kwargs["path"], kwargs["event_id"]
if path == "thumbnail.jpg":
return f"api/events/{event_id}/thumbnail.jpg"
if path == "snapshot.jpg":
return f"api/events/{event_id}/snapshot.jpg"
if path.endswith("clip.mp4"):
return f"api/events/{event_id}/clip.mp4"
return None
def _permit_request(
self, request: web.Request, config_entry: ConfigEntry, **kwargs: Any
) -> bool:
"""Determine whether to permit a request."""
is_notification_proxy_enabled = bool(
config_entry.options.get(CONF_NOTIFICATION_PROXY_ENABLE, True)
)
# If proxy is disabled, immediately reject
if not is_notification_proxy_enabled:
return False
# Authenticated requests are always allowed.
if request[KEY_AUTHENTICATED]:
return True
# If request is not authenticated, check whether it is expired.
notification_expiration_seconds = int(
config_entry.options.get(CONF_NOTIFICATION_PROXY_EXPIRE_AFTER_SECONDS, 0)
)
# If notification events never expire, immediately permit.
if notification_expiration_seconds == 0:
return True
try:
event_id_timestamp = int(kwargs["event_id"].partition(".")[0])
event_datetime = datetime.datetime.fromtimestamp(
event_id_timestamp, tz=datetime.timezone.utc
)
now_datetime = datetime.datetime.now(tz=datetime.timezone.utc)
expiration_datetime = event_datetime + datetime.timedelta(
seconds=notification_expiration_seconds
)
# Otherwise, permit only if notification event is not expired
return now_datetime.timestamp() <= expiration_datetime.timestamp()
except ValueError:
_LOGGER.warning(
"The event id %s does not have a valid format.", kwargs["event_id"]
)
return False
class VodProxyView(ProxyView):
"""A proxy for vod playlists."""
url = "/api/frigate/{frigate_instance_id:.+}/vod/{path:.+}/{manifest:.+}.m3u8"
extra_urls = ["/api/frigate/vod/{path:.+}/{manifest:.+}.m3u8"]
name = "api:frigate:vod:manifest"
@staticmethod
def _get_query_params(request: web.Request) -> Mapping[str, str]:
"""Get the query params to send upstream."""
return request.query
def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return f"vod/{kwargs['path']}/{kwargs['manifest']}.m3u8"
class VodSegmentProxyView(ProxyView):
"""A proxy for vod segments."""
url = "/api/frigate/{frigate_instance_id:.+}/vod/{path:.+}/{segment:.+}.{extension:(ts|m4s|mp4)}"
extra_urls = ["/api/frigate/vod/{path:.+}/{segment:.+}.{extension:(ts|m4s|mp4)}"]
name = "api:frigate:vod:segment"
requires_auth = False
def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return f"vod/{kwargs['path']}/{kwargs['segment']}.{kwargs['extension']}"
async def _async_validate_signed_manifest(self, request: web.Request) -> bool:
"""Validate the signature for the manifest of this segment."""
hass = request.app[KEY_HASS]
secret = hass.data.get(DATA_SIGN_SECRET)
signature = request.query.get(SIGN_QUERY_PARAM)
if signature is None:
_LOGGER.warning("Missing authSig query parameter on VOD segment request.")
return False
try:
claims = jwt.decode(
signature, secret, algorithms=["HS256"], options={"verify_iss": False}
)
except jwt.InvalidTokenError:
_LOGGER.warning("Invalid JWT token for VOD segment request.")
return False
# Check that the base path is the same as what was signed
check_path = request.path.rsplit("/", maxsplit=1)[0]
if not claims["path"].startswith(check_path):
_LOGGER.warning("%s does not start with %s", claims["path"], check_path)
return False
return True
async def get(
self,
request: web.Request,
**kwargs: Any,
) -> web.Response | web.StreamResponse | web.WebSocketResponse:
"""Route data to service."""
if not await self._async_validate_signed_manifest(request):
raise HTTPUnauthorized()
return await super().get(request, **kwargs)
class WebsocketProxyView(ProxyView):
"""A simple proxy for websockets."""
async def _proxy_msgs(
self,
ws_in: aiohttp.ClientWebSocketResponse | web.WebSocketResponse,
ws_out: aiohttp.ClientWebSocketResponse | web.WebSocketResponse,
) -> None:
async for msg in ws_in:
try:
if msg.type == aiohttp.WSMsgType.TEXT:
await ws_out.send_str(msg.data)
elif msg.type == aiohttp.WSMsgType.BINARY:
await ws_out.send_bytes(msg.data)
elif msg.type == aiohttp.WSMsgType.PING:
await ws_out.ping()
elif msg.type == aiohttp.WSMsgType.PONG:
await ws_out.pong()
except ConnectionResetError:
return
async def _handle_request(
self,
request: web.Request,
frigate_instance_id: str | None = None,
**kwargs: Any,
) -> web.Response | web.StreamResponse:
"""Handle route for request."""
config_entry = self._get_config_entry_for_request(request, frigate_instance_id)
if not config_entry:
return web.Response(status=HTTPStatus.BAD_REQUEST)
if not self._permit_request(request, config_entry, **kwargs):
return web.Response(status=HTTPStatus.FORBIDDEN)
full_path = self._create_path(**kwargs)
if not full_path:
return web.Response(status=HTTPStatus.NOT_FOUND)
req_protocols = []
if hdrs.SEC_WEBSOCKET_PROTOCOL in request.headers:
req_protocols = [
str(proto.strip())
for proto in request.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
]
ws_to_user = web.WebSocketResponse(
protocols=req_protocols, autoclose=False, autoping=False
)
await ws_to_user.prepare(request)
# Preparing
url = str(URL(config_entry.data[CONF_URL]) / full_path)
source_header = _init_header(request)
# Support GET query
if request.query_string:
url = f"{url}?{request.query_string}"
async with self._websession.ws_connect(
url,
headers=source_header,
protocols=req_protocols,
autoclose=False,
autoping=False,
) as ws_to_frigate:
await asyncio.wait(
[
asyncio.create_task(self._proxy_msgs(ws_to_frigate, ws_to_user)),
asyncio.create_task(self._proxy_msgs(ws_to_user, ws_to_frigate)),
],
return_when=asyncio.tasks.FIRST_COMPLETED,
)
return ws_to_user
class JSMPEGProxyView(WebsocketProxyView):
"""A proxy for JSMPEG websocket."""
url = "/api/frigate/{frigate_instance_id:.+}/jsmpeg/{path:.+}"
extra_urls = ["/api/frigate/jsmpeg/{path:.+}"]
name = "api:frigate:jsmpeg"
def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return f"live/jsmpeg/{kwargs['path']}"
class MSEProxyView(WebsocketProxyView):
"""A proxy for MSE websocket."""
url = "/api/frigate/{frigate_instance_id:.+}/mse/{path:.+}"
extra_urls = ["/api/frigate/mse/{path:.+}"]
name = "api:frigate:mse"
def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return f"live/mse/{kwargs['path']}"
class WebRTCProxyView(WebsocketProxyView):
"""A proxy for WebRTC websocket."""
url = "/api/frigate/{frigate_instance_id:.+}/webrtc/{path:.+}"
extra_urls = ["/api/frigate/webrtc/{path:.+}"]
name = "api:frigate:webrtc"
def _create_path(self, **kwargs: Any) -> str | None:
"""Create path."""
return f"live/webrtc/{kwargs['path']}"
def _init_header(request: web.Request) -> CIMultiDict | dict[str, str]:
"""Create initial header."""
headers = {}
# filter flags
for name, value in request.headers.items():
if name in (
hdrs.CONTENT_LENGTH,
hdrs.CONTENT_ENCODING,
hdrs.SEC_WEBSOCKET_EXTENSIONS,
hdrs.SEC_WEBSOCKET_PROTOCOL,
hdrs.SEC_WEBSOCKET_VERSION,
hdrs.SEC_WEBSOCKET_KEY,
hdrs.HOST,
hdrs.AUTHORIZATION,
):
continue
headers[name] = value
# Set X-Forwarded-For
forward_for = request.headers.get(hdrs.X_FORWARDED_FOR)
assert request.transport
connected_ip = ip_address(request.transport.get_extra_info("peername")[0])
if forward_for:
forward_for = f"{forward_for}, {connected_ip!s}"
else:
forward_for = f"{connected_ip!s}"
headers[hdrs.X_FORWARDED_FOR] = forward_for
# Set X-Forwarded-Host
forward_host = request.headers.get(hdrs.X_FORWARDED_HOST)
if not forward_host:
forward_host = request.host
headers[hdrs.X_FORWARDED_HOST] = forward_host
# Set X-Forwarded-Proto
forward_proto = request.headers.get(hdrs.X_FORWARDED_PROTO)
if not forward_proto:
forward_proto = request.url.scheme
headers[hdrs.X_FORWARDED_PROTO] = forward_proto
return headers
def _response_header(response: aiohttp.ClientResponse) -> dict[str, str]:
"""Create response header."""
headers = {}
for name, value in response.headers.items():
if name in (
hdrs.TRANSFER_ENCODING,
# Removing Content-Length header for streaming responses
# prevents seeking from working for mp4 files
# hdrs.CONTENT_LENGTH,
hdrs.CONTENT_TYPE,
hdrs.CONTENT_ENCODING,
# Strips inbound CORS response headers since the aiohttp_cors
# library will assert that they are not already present for CORS
# requests.
hdrs.ACCESS_CONTROL_ALLOW_ORIGIN,
hdrs.ACCESS_CONTROL_ALLOW_CREDENTIALS,
hdrs.ACCESS_CONTROL_EXPOSE_HEADERS,
):
continue
headers[name] = value
return headers