EdgeWatch/collect_metrics.py
Richard Nixon adf6a7a1ce feat(config): typed settings via pydantic-settings
Replace the scattered os.getenv() + int()/float() pattern with a
BaseSettings class in each of the two modules. Wins:

  - bad config now fails at import with a readable pydantic error
    (WS_PORT=abc no longer produces a ValueError stack from inside
    main()); ports are bounded to [1, 65535], cpu_alert_th to [0, 100],
    backoff_min/interval to >= 1,
  - .env loading moves into pydantic-settings (env_file in
    SettingsConfigDict), so the manual load_dotenv() call is gone,
  - every callback now reads from a single ``settings`` instance, so
    runtime overrides are possible (tests use monkeypatch on
    backend.settings instead of patching module-level constants).

The ws_token test now patches backend.settings.ws_auth_token
rather than the old WS_AUTH_TOKEN module constant; the contract is
unchanged, so all 55 tests still pass.

Pydantic stack pinned: pydantic==2.13.4, pydantic-core==2.46.4,
pydantic-settings==2.14.1 (plus annotated-types and typing-inspection
as transitives). pip-audit remains clean.
2026-05-17 17:19:50 +01:00

123 lines
3.6 KiB
Python

import asyncio
import json
import logging
import ssl
import subprocess
import time
from typing import Optional

import psutil
from gmqtt import Client as MQTTClient
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
# Level names that logging.basicConfig(level=...) accepts as strings
# (after upper-casing). Validating against them here turns a bad LOG_LEVEL
# into a pydantic ValidationError at import, instead of a bare ValueError
# from logging.basicConfig().
_LOG_LEVEL_NAMES = frozenset(
    {"CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG", "NOTSET"}
)


class AgentSettings(BaseSettings):
    """Typed configuration for the edge-device agent.

    Values are read from the process environment and from a ``.env`` file.
    NOTE(review): a relative ``env_file`` is resolved against the current
    working directory; confirm that is intended for deployed agents.
    Unknown keys are ignored rather than rejected (``extra="ignore"``).
    """

    model_config = SettingsConfigDict(
        env_file=".env", env_file_encoding="utf-8", extra="ignore"
    )

    # Used both as the MQTT client id and inside the publish topic
    # ``devices/<device_id>/metrics``. MQTT forbids the wildcard characters
    # '+' and '#' in topic names that are published to, so reject them here.
    device_id: str = Field(default="device-iot-001", pattern=r"^[^+#]+$")
    # Broker host name. The empty default is not connectable, so this must be
    # supplied via environment/.env for the agent to run.
    mqtt_broker: str = ""
    mqtt_port: int = Field(default=8883, ge=1, le=65535)
    # TLS material: CA bundle used to verify the broker, plus the client
    # certificate/key pair presented to it.
    ca_cert: str = "AmazonRootCA1.pem"
    client_cert: str = "device.cert.pem"
    client_key: str = "device.private.key"
    # Seconds to sleep between published samples.
    interval: int = Field(default=10, ge=1)
    # Local CPU percentage above which a warning is logged each sample.
    agent_cpu_warn: float = Field(default=90.0, ge=0, le=100)
    # Case-insensitive logging level name; stored upper-cased.
    log_level: str = "INFO"

    @field_validator("log_level")
    @classmethod
    def normalise_log_level(cls, value: str) -> str:
        """Upper-case *value* and reject level names ``logging`` would refuse.

        Raises:
            ValueError: if the name is not a standard logging level; pydantic
                reports it as a ValidationError for ``log_level``.
        """
        level = value.upper()
        if level not in _LOG_LEVEL_NAMES:
            raise ValueError(
                f"unknown log level {value!r}; expected one of "
                + ", ".join(sorted(_LOG_LEVEL_NAMES))
            )
        return level
# Configuration is loaded and validated exactly once, at import time, so an
# invalid value stops the module from importing at all.
settings = AgentSettings()

# Root logging setup; the configured level name is matched case-insensitively.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    level=settings.log_level.upper(),
)
log = logging.getLogger("edgewatch.agent")

# MQTT topic that every metrics sample from this device is published to.
TOPIC = f"devices/{settings.device_id}/metrics"

# Handle on the agent's own process, sampled each cycle to report the
# agent's CPU and memory overhead.
process = psutil.Process()
# Upper bound, in seconds, on a single nvidia-smi invocation. collect_metrics()
# is called from the asyncio loop in main(), so a hung nvidia-smi (e.g. a
# wedged driver) would otherwise stall the whole agent indefinitely.
_GPU_QUERY_TIMEOUT_S = 5.0


def _parse_gpu_utilization(csv_text: str) -> Optional[float]:
    """Return the mean GPU utilization found in nvidia-smi CSV output.

    *csv_text* is the decoded output of ``nvidia-smi
    --query-gpu=utilization.gpu,memory.used,memory.total
    --format=csv,noheader,nounits``: one ``util, used, total`` line per GPU.
    Averaging across lines keeps single-GPU hosts unchanged and stops
    multi-GPU hosts from being reported as "no GPU".

    Returns:
        The mean of the first column, or ``None`` when there are no data
        rows or any row is malformed / non-numeric (e.g. ``[N/A]``).
    """
    utilizations = []
    for line in csv_text.splitlines():
        if not line.strip():
            continue  # tolerate blank/trailing lines
        fields = line.split(",")
        if len(fields) != 3:
            return None
        try:
            utilizations.append(float(fields[0]))
        except ValueError:
            return None
    if not utilizations:
        return None
    return sum(utilizations) / len(utilizations)


def _read_gpu_percent() -> Optional[float]:
    """Query nvidia-smi for GPU utilization; ``None`` if it is unavailable."""
    try:
        raw = subprocess.check_output(
            [
                "nvidia-smi",
                "--query-gpu=utilization.gpu,memory.used,memory.total",
                "--format=csv,noheader,nounits",
            ],
            timeout=_GPU_QUERY_TIMEOUT_S,
        )
        text = raw.decode()
    except (OSError, subprocess.SubprocessError, UnicodeDecodeError) as exc:
        # No nvidia-smi binary, non-zero exit, timeout, or undecodable output:
        # treat all of these as "no GPU available" (best-effort metric).
        log.debug("gpu query unavailable: %s", exc)
        return None
    return _parse_gpu_utilization(text)


def collect_metrics():
    """Collect CPU, RAM, disk, GPU (if available) and agent self-metrics.

    Returns a dict with keys ``device_id``, ``timestamp`` (int Unix seconds),
    ``cpu_percent``, ``mem_percent``, ``disk_percent`` (for ``/``),
    ``gpu_percent`` (mean across GPUs, or ``None``), ``agent_cpu_percent``
    and ``agent_mem_mb`` (agent RSS in MiB).

    Both CPU figures use psutil's non-blocking ``cpu_percent(interval=None)``,
    which measures since the previous call; the very first call returns 0.0.
    """
    cpu = psutil.cpu_percent(interval=None)
    mem = psutil.virtual_memory().percent
    disk = psutil.disk_usage("/").percent
    gpu = _read_gpu_percent()
    agent_mem = process.memory_info().rss / (1024 * 1024)  # bytes -> MiB
    agent_cpu = process.cpu_percent(interval=None)
    return {
        "device_id": settings.device_id,
        "timestamp": int(time.time()),
        "cpu_percent": cpu,
        "mem_percent": mem,
        "disk_percent": disk,
        "gpu_percent": gpu,
        "agent_cpu_percent": agent_cpu,
        "agent_mem_mb": agent_mem,
    }
async def main():
    """Connect to the MQTT broker over TLS and publish metrics until cancelled.

    Every ``settings.interval`` seconds a sample from ``collect_metrics()`` is
    published as JSON to ``TOPIC`` (QoS 1, not retained). A warning is logged
    whenever the sample's CPU usage exceeds ``settings.agent_cpu_warn``.

    Cancellation is logged and then re-raised, so callers (including
    ``asyncio.run``) observe it. Once connected, the MQTT session is always
    closed on the way out.
    """
    client = MQTTClient(settings.device_id)

    def on_connect(_client, _flags, _rc, _properties):
        log.info("agent connected device=%s", settings.device_id)

    def on_disconnect(_client, _packet, _exc=None):
        log.warning("agent disconnected device=%s", settings.device_id)

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect

    # PROTOCOL_TLS_CLIENT turns on certificate verification and hostname
    # checking; the CA bundle verifies the broker and the cert/key pair is
    # presented to it as this device's client certificate.
    ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_ctx.load_verify_locations(settings.ca_cert)
    ssl_ctx.load_cert_chain(certfile=settings.client_cert,
                            keyfile=settings.client_key)

    # psutil's non-blocking cpu_percent() reports usage since its previous
    # call, and the first Process.cpu_percent() call returns a meaningless
    # 0.0. Take the baselines now so the first published sample spans the
    # connect phase instead of reporting 0.0.
    psutil.cpu_percent(interval=None)
    process.cpu_percent(interval=None)

    await client.connect(settings.mqtt_broker, settings.mqtt_port, ssl=ssl_ctx)
    try:
        while True:
            # NOTE(review): collect_metrics() is synchronous and blocks the
            # event loop while it runs (it shells out to nvidia-smi). Moving
            # it to a worker thread needs care: psutil may track
            # cpu_percent(interval=None) baselines per calling thread.
            metrics = collect_metrics()
            if metrics["cpu_percent"] > settings.agent_cpu_warn:
                log.warning("local CPU warn device=%s cpu=%.1f",
                            settings.device_id, metrics["cpu_percent"])
            log.debug("agent self_cpu=%.1f self_mem_mb=%.2f",
                      metrics["agent_cpu_percent"], metrics["agent_mem_mb"])
            client.publish(TOPIC, json.dumps(metrics), qos=1, retain=False)
            await asyncio.sleep(settings.interval)
    except asyncio.CancelledError:
        log.info("agent cancelled device=%s", settings.device_id)
        # Propagate: swallowing cancellation makes the task look like it
        # finished normally, and asyncio.run() then cannot re-raise Ctrl-C.
        raise
    finally:
        await client.disconnect()
if __name__ == "__main__":
    # Script entry point: build the agent coroutine and drive it on a fresh
    # event loop. Nothing here runs when the module is merely imported.
    agent = main()
    asyncio.run(agent)