test: add pytest suite for validation, alerting and WS auth

48 tests, ~0.1s total. Each case targets a specific bug class so the
file reads as a contract:

  - test_validation.py
      accepts well-formed device ids, rejects whitespace, slashes,
      colons, newlines, zero-width spaces, oversize values, HTML.
  - test_alert_predicate.py
      threshold boundary, bool-vs-int trap, NaN / inf / out-of-range,
      non-numeric payloads, per-device cooldown window.
  - test_active_snapshot.py
      recent vs stale, the "<= prune_seconds" boundary (inclusive),
      missing last_seen treated as ancient, empty state.
  - test_ws_token.py
      open mode, missing/wrong/empty/extra-param query strings, plus
      the happy path with the correct token.

conftest.py stubs MQTT_BROKER and prepends the repo root to sys.path
so `import backend` works without a .env file. Dev deps split into
requirements-dev.txt to keep the runtime image lean.
This commit is contained in:
authentik Default Admin 2026-05-17 14:45:00 +00:00
parent 725d1a543e
commit c4329a9b9b
8 changed files with 224 additions and 0 deletions

5
.gitignore vendored
View file

@ -1,7 +1,12 @@
.vscode
.venv
.env
.env.*
!env_sample
*.key
*.pem
*.crt
.DS_Store
__pycache__/
*.pyc
.pytest_cache/

3
requirements-dev.txt Normal file
View file

@ -0,0 +1,3 @@
-r requirements.txt
pytest==8.4.2
pytest-asyncio==1.2.0

0
tests/__init__.py Normal file
View file

12
tests/conftest.py Normal file
View file

@ -0,0 +1,12 @@
"""Test bootstrap: put the repo root on sys.path and stub MQTT_BROKER.
backend.py calls load_dotenv() and reads env at import time. Setting
MQTT_BROKER here avoids surprises when the developer has no .env file.
"""
import os
import sys
from pathlib import Path
os.environ.setdefault("MQTT_BROKER", "test.example.invalid")
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

View file

@ -0,0 +1,47 @@
"""Tests for the pruning rule used by active_snapshot()."""
import pytest
from backend import filter_active
def test_returns_recent_devices():
state = {"a": {"cpu": 10}, "b": {"cpu": 20}}
seen = {"a": 100.0, "b": 99.0}
assert filter_active(state, seen, now=110.0, prune_seconds=30) == state
def test_drops_stale_devices():
state = {"fresh": {}, "stale": {}}
seen = {"fresh": 100.0, "stale": 50.0}
assert filter_active(state, seen, now=110.0, prune_seconds=30) == {"fresh": {}}
def test_boundary_inclusive():
# The rule is "<= prune_seconds": at exactly the limit the device is
# still active. This is the documented contract; a refactor that
# tightens to "<" should fail this test loudly.
state = {"a": {}}
seen = {"a": 70.0}
assert filter_active(state, seen, now=100.0, prune_seconds=30) == {"a": {}}
def test_missing_last_seen_is_treated_as_ancient():
# An id in `state` without an entry in `seen` defaults to 0, which is
# always older than now - prune_seconds. This guards against the
# crashy alternative (KeyError) if state/seen ever drift apart.
state = {"a": {}}
seen = {}
assert filter_active(state, seen, now=100.0, prune_seconds=30) == {}
def test_empty_state_returns_empty():
assert filter_active({}, {}, now=100.0, prune_seconds=30) == {}
@pytest.mark.parametrize("prune", [0, 1, 1_000_000])
def test_prune_seconds_is_honored(prune):
state = {"a": {}}
seen = {"a": 0.0}
result = filter_active(state, seen, now=prune, prune_seconds=prune)
assert result == {"a": {}}

View file

@ -0,0 +1,63 @@
"""Tests for the Slack alert predicate.
These tests document the *intent* of the guards. Each case is named
after the bug that would slip through if the guard were removed.
"""
import math
import pytest
from backend import should_alert
THRESHOLD = 90
COOLDOWN = 60
def test_fires_above_threshold():
assert should_alert(95.0, "d", now=1000.0, last_alert_map={}, cooldown=COOLDOWN, threshold=THRESHOLD)
def test_silent_below_threshold():
assert not should_alert(80.0, "d", now=1000.0, last_alert_map={}, cooldown=COOLDOWN, threshold=THRESHOLD)
def test_fires_at_boundary():
assert should_alert(THRESHOLD, "d", now=1000.0, last_alert_map={}, cooldown=COOLDOWN, threshold=THRESHOLD)
def test_rejects_bool_true():
# Python quirk: True == 1 and isinstance(True, int) is True. The guard
# must refuse it explicitly; otherwise a payload {"cpu_percent": true}
# would skip the numeric range check.
assert not should_alert(True, "d", now=1000.0, last_alert_map={}, cooldown=COOLDOWN, threshold=THRESHOLD)
@pytest.mark.parametrize("bad", [math.nan, math.inf, -math.inf, -1, 101, 1e308])
def test_rejects_out_of_range_or_nonfinite(bad):
assert not should_alert(bad, "d", now=1000.0, last_alert_map={}, cooldown=COOLDOWN, threshold=THRESHOLD)
@pytest.mark.parametrize("bad", [None, "95", b"95", [95], {"v": 95}])
def test_rejects_non_numeric(bad):
assert not should_alert(bad, "d", now=1000.0, last_alert_map={}, cooldown=COOLDOWN, threshold=THRESHOLD)
def test_cooldown_silences_repeat_alerts():
last = {"d": 950.0}
# Only 50s passed, cooldown is 60s -> still silent.
assert not should_alert(95.0, "d", now=1000.0, last_alert_map=last, cooldown=COOLDOWN, threshold=THRESHOLD)
def test_cooldown_releases_after_window():
last = {"d": 900.0}
# 100s passed > 60s cooldown -> fires again.
assert should_alert(95.0, "d", now=1000.0, last_alert_map=last, cooldown=COOLDOWN, threshold=THRESHOLD)
def test_cooldown_is_per_device():
# device A is in cooldown but device B should still alert.
last = {"a": 999.0}
assert not should_alert(95.0, "a", now=1000.0, last_alert_map=last, cooldown=COOLDOWN, threshold=THRESHOLD)
assert should_alert(95.0, "b", now=1000.0, last_alert_map=last, cooldown=COOLDOWN, threshold=THRESHOLD)

43
tests/test_validation.py Normal file
View file

@ -0,0 +1,43 @@
"""Tests for device_id validation.
Each case is a regression for a specific attack/mistake that the
backend has to refuse before storing state or rendering Slack mrkdwn.
"""
import pytest
from backend import validate_device_id
@pytest.mark.parametrize(
"value",
[
"device-001",
"edge_node.42",
"A",
"a" * 64, # boundary: exactly 64 chars
"01234567-89ab-cdef",
],
)
def test_accepts_well_formed_ids(value):
assert validate_device_id(value) == value
@pytest.mark.parametrize(
"value",
[
None,
"",
123,
b"device-001", # bytes, not str
"device 001", # space
"device/001", # slash
"device:001", # colon
"device\n<!channel>", # Slack mrkdwn injection vector
"device" + "" * 3, # zero-width spaces would fool human review
"a" * 65, # boundary: just over 64
"<a href='x'>",
],
)
def test_rejects_malformed_ids(value):
assert validate_device_id(value) is None

51
tests/test_ws_token.py Normal file
View file

@ -0,0 +1,51 @@
"""Tests for WebSocket shared-token authentication.
We avoid spinning up a real WS server: ``_ws_token_ok`` only needs an
object that exposes ``.request.path``. A small dummy is enough.
"""
from types import SimpleNamespace
import pytest
import backend
def _fake_ws(target: str):
"""Mimic websockets.ServerConnection.request.path for the assertion."""
return SimpleNamespace(request=SimpleNamespace(path=target))
def test_open_mode_accepts_any_handshake(monkeypatch):
# Empty WS_AUTH_TOKEN = open mode (dev-friendly default).
monkeypatch.setattr(backend, "WS_AUTH_TOKEN", "")
assert backend._ws_token_ok(_fake_ws("/")) is True
assert backend._ws_token_ok(_fake_ws("/?token=anything")) is True
def test_rejects_missing_token(monkeypatch):
monkeypatch.setattr(backend, "WS_AUTH_TOKEN", "s3cret")
assert backend._ws_token_ok(_fake_ws("/")) is False
def test_rejects_wrong_token(monkeypatch):
monkeypatch.setattr(backend, "WS_AUTH_TOKEN", "s3cret")
assert backend._ws_token_ok(_fake_ws("/?token=wrong")) is False
def test_accepts_correct_token(monkeypatch):
monkeypatch.setattr(backend, "WS_AUTH_TOKEN", "s3cret")
assert backend._ws_token_ok(_fake_ws("/?token=s3cret")) is True
def test_accepts_token_with_extra_query_params(monkeypatch):
monkeypatch.setattr(backend, "WS_AUTH_TOKEN", "s3cret")
assert backend._ws_token_ok(_fake_ws("/?foo=bar&token=s3cret&x=1")) is True
def test_rejects_empty_token_value(monkeypatch):
# ?token= with no value should never match a configured secret,
# otherwise compare_digest("", "") would let everyone in if
# WS_AUTH_TOKEN were ever accidentally set to "".
monkeypatch.setattr(backend, "WS_AUTH_TOKEN", "s3cret")
assert backend._ws_token_ok(_fake_ws("/?token=")) is False