Replace the scattered os.getenv() + int()/float() pattern with a
BaseSettings class in both modules. Wins:
- bad config now fails at import with a readable pydantic error
(WS_PORT=abc no longer produces a ValueError traceback from inside
main()); ports are bounded to [1, 65535], cpu_alert_th to [0, 100],
and backoff_min/interval to >= 1,
- .env loading moves into pydantic-settings (env_file in
SettingsConfigDict), so the manual load_dotenv() call is gone,
- every callback now reads from a single ``settings`` instance, so
runtime overrides are possible (tests use monkeypatch on
backend.settings instead of patching module-level constants).
The ws_token test now patches backend.settings.ws_auth_token
rather than the old WS_AUTH_TOKEN module constant; the contract is
unchanged, so all 55 tests still pass.
Pydantic stack pinned: pydantic==2.13.4, pydantic-core==2.46.4,
pydantic-settings==2.14.1 (plus annotated-types and typing-inspection
as transitives). pip-audit remains clean.
123 lines
3.6 KiB
Python
123 lines
3.6 KiB
Python
import asyncio
import json
import logging
import ssl
import subprocess
import time
from typing import Optional

import psutil
from gmqtt import Client as MQTTClient
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
|
|
|
|
|
|
class AgentSettings(BaseSettings):
    """Typed configuration for the edge-device agent.

    Each field is read from the environment variable of the same name
    (pydantic-settings matches names case-insensitively by default), then
    from a ``.env`` file in the working directory, then from the defaults
    below. Out-of-range or malformed values raise a pydantic
    ``ValidationError`` when the class is instantiated.
    """

    model_config = SettingsConfigDict(
        env_file=".env", env_file_encoding="utf-8", extra="ignore"
    )

    # MQTT client id, also used as the {device_id} segment of the topic.
    device_id: str = "device-iot-001"
    # Broker host name. Empty by default, so it has to be configured.
    mqtt_broker: str = ""
    mqtt_port: int = Field(default=8883, ge=1, le=65535)
    # CA bundle, client certificate and client private key used for the
    # TLS connection to the broker.
    ca_cert: str = "AmazonRootCA1.pem"
    client_cert: str = "device.cert.pem"
    client_key: str = "device.private.key"
    # Seconds to sleep between published metric snapshots.
    interval: int = Field(default=10, ge=1)
    # System CPU percentage above which a local warning is logged.
    agent_cpu_warn: float = Field(default=90.0, ge=0, le=100)
    # Standard logging level name; normalised to upper case by the validator.
    log_level: str = "INFO"

    @field_validator("log_level", mode="before")
    @classmethod
    def validate_log_level(cls, value):
        """Normalise *value* to an upper-case ``logging`` level name.

        Unknown names are rejected here, so a typo in ``LOG_LEVEL`` fails with
        a pydantic error when settings load rather than with a bare
        ``ValueError`` from ``logging.basicConfig()``.

        Raises:
            ValueError: if the name is not a registered logging level.
        """
        level = str(value).strip().upper()
        # getLevelName() returns the numeric level for a registered name and
        # a "Level <name>" string for anything it does not recognise.
        if not isinstance(logging.getLevelName(level), int):
            raise ValueError(
                f"unknown log level {value!r}; use one of "
                "DEBUG, INFO, WARNING, ERROR, CRITICAL"
            )
        return level
|
|
|
|
|
# Loaded once at import; invalid configuration raises here, before main().
settings = AgentSettings()

logging.basicConfig(
    level=settings.log_level.upper(),
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger("edgewatch.agent")

# MQTT topic every metric snapshot is published to.
TOPIC = f"devices/{settings.device_id}/metrics"

# Handle on this process, used to measure the agent's own overhead.
process = psutil.Process()

# With interval=None, psutil's cpu_percent() reports usage since the previous
# call, and the very first call returns a meaningless 0.0. Prime both
# counters now so the first published snapshot carries real values.
psutil.cpu_percent(interval=None)
process.cpu_percent(interval=None)
|
|
|
|
|
|
def _read_gpu_percent(timeout: float) -> Optional[float]:
    """Return mean GPU utilisation reported by ``nvidia-smi``, or ``None``.

    With ``--format=csv,noheader`` nvidia-smi prints one line per GPU, so
    every line is parsed and averaged. A single-GPU host gets that GPU's
    reading unchanged.
    """
    try:
        out = subprocess.check_output(
            [
                "nvidia-smi",
                "--query-gpu=utilization.gpu",
                "--format=csv,noheader,nounits",
            ],
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
        readings = [
            float(line.split(",", 1)[0])
            for line in out.decode().splitlines()
            if line.strip()
        ]
    except (OSError, subprocess.SubprocessError, ValueError):
        # Best effort. Any of these means "no GPU reading":
        # - nvidia-smi is missing or not executable (OSError);
        # - it exits non-zero or overruns the timeout (SubprocessError);
        # - it prints a non-numeric value such as "[N/A]" (ValueError).
        return None
    if not readings:
        return None
    return sum(readings) / len(readings)


def collect_metrics(*, gpu_timeout: float = 5.0) -> dict:
    """Collect CPU, RAM, Disk, GPU (if available) and agent self-metrics.

    Args:
        gpu_timeout: Seconds to wait for ``nvidia-smi`` before giving up on
            the GPU reading. The call is synchronous, so this also bounds how
            long a hung GPU driver can stall the caller.

    Returns:
        A dict with these keys:

        - ``device_id``
        - ``timestamp``: integer Unix seconds
        - ``cpu_percent``, ``mem_percent``
        - ``disk_percent``: usage of ``/``
        - ``gpu_percent``: mean utilisation across GPUs, or ``None`` when no
          reading is available
        - ``agent_cpu_percent``, ``agent_mem_mb``: this process's CPU % and
          resident memory in MiB
    """
    cpu = psutil.cpu_percent(interval=None)
    mem = psutil.virtual_memory().percent
    disk = psutil.disk_usage("/").percent
    gpu = _read_gpu_percent(gpu_timeout)

    agent_mem = process.memory_info().rss / (1024 * 1024)  # MiB
    agent_cpu = process.cpu_percent(interval=None)  # %

    return {
        "device_id": settings.device_id,
        "timestamp": int(time.time()),
        "cpu_percent": cpu,
        "mem_percent": mem,
        "disk_percent": disk,
        "gpu_percent": gpu,
        "agent_cpu_percent": agent_cpu,
        "agent_mem_mb": agent_mem,
    }
|
|
|
|
|
|
async def main():
    """Connect to the MQTT broker over TLS and publish metrics until cancelled.

    Every ``settings.interval`` seconds a snapshot from ``collect_metrics()``
    is published as JSON to ``TOPIC`` (QoS 1, not retained). The client is
    disconnected on the way out.

    Raises:
        ValueError: if ``settings.mqtt_broker`` is empty.
        asyncio.CancelledError: re-raised after logging when the task is
            cancelled, so the caller observes the cancellation.
    """
    if not settings.mqtt_broker:
        # mqtt_broker defaults to ""; fail fast with a clear message instead
        # of letting client.connect() try to reach an empty host name.
        raise ValueError(
            "MQTT_BROKER is not set; configure it in the environment or .env"
        )

    client = MQTTClient(settings.device_id)

    def on_connect(_client, _flags, _rc, _properties):
        log.info("agent connected device=%s", settings.device_id)

    def on_disconnect(_client, _packet, _exc=None):
        log.warning("agent disconnected device=%s", settings.device_id)

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect

    # PROTOCOL_TLS_CLIENT enables certificate and hostname verification by
    # default. The client certificate/key pair is presented to the broker.
    ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_ctx.load_verify_locations(settings.ca_cert)
    ssl_ctx.load_cert_chain(certfile=settings.client_cert,
                            keyfile=settings.client_key)

    await client.connect(settings.mqtt_broker, settings.mqtt_port, ssl=ssl_ctx)

    try:
        while True:
            # NOTE(review): collect_metrics() is synchronous (psutil plus an
            # nvidia-smi subprocess) and runs on the event-loop thread, so
            # MQTT I/O is paused while it executes.
            metrics = collect_metrics()

            if metrics["cpu_percent"] > settings.agent_cpu_warn:
                log.warning("local CPU warn device=%s cpu=%.1f",
                            settings.device_id, metrics["cpu_percent"])

            log.debug("agent self_cpu=%.1f self_mem_mb=%.2f",
                      metrics["agent_cpu_percent"], metrics["agent_mem_mb"])

            client.publish(TOPIC, json.dumps(metrics), qos=1, retain=False)

            await asyncio.sleep(settings.interval)
    except asyncio.CancelledError:
        log.info("agent cancelled device=%s", settings.device_id)
        # Re-raise: swallowing CancelledError would make a cancelled task
        # look like a normal return to asyncio.run() and other awaiters.
        raise
    finally:
        await client.disconnect()
|
|
|
|
|
|
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On Ctrl-C asyncio.run() cancels main(), whose finally-block has
        # therefore already run; exit quietly instead of with a traceback.
        log.info("agent stopped device=%s", settings.device_id)
|