"""Metrics database manager."""
import logging
from python_template_server.db import BaseDatabaseManager
from sqlmodel import Field, Session, SQLModel, col, select
from pi_dashboard.models import DashboardDatabaseConfig, DatabaseAction, SystemMetrics, current_timestamp_int
logger = logging.getLogger(__name__)
# Database table models
class SystemMetricsDB(SQLModel, table=True):
    """Table model for one system metrics sample, stored in the ``metrics`` table."""

    __tablename__ = "metrics"

    id: int | None = Field(default=None, primary_key=True)
    cpu_usage: float = Field(..., description="CPU usage percentage")
    memory_usage: float = Field(..., description="Memory usage percentage")
    disk_usage: float = Field(..., description="Disk usage percentage")
    uptime: int = Field(..., description="System uptime in seconds")
    temperature: float = Field(..., description="System temperature in Celsius")
    timestamp: int = Field(..., description="Unix timestamp of when the metrics were collected")

    @classmethod
    def from_system_metrics(cls, system_metrics: SystemMetrics) -> "SystemMetricsDB":
        """Build a table row from a SystemMetrics.

        The ``id`` of ``system_metrics`` is not copied; the new row keeps the default ``id`` of None.

        :param SystemMetrics system_metrics: The metrics to copy the measurements from
        :return SystemMetricsDB: A new, unsaved row holding the same measurements
        """
        column_values = {
            "cpu_usage": system_metrics.cpu_usage,
            "memory_usage": system_metrics.memory_usage,
            "disk_usage": system_metrics.disk_usage,
            "uptime": system_metrics.uptime,
            "temperature": system_metrics.temperature,
            "timestamp": system_metrics.timestamp,
        }
        return cls(**column_values)

    def to_system_metrics(self) -> SystemMetrics:
        """Convert this row into a SystemMetrics, including its ``id``.

        :return SystemMetrics: A SystemMetrics carrying every column of this row
        """
        row_values = {
            "id": self.id,
            "cpu_usage": self.cpu_usage,
            "memory_usage": self.memory_usage,
            "disk_usage": self.disk_usage,
            "uptime": self.uptime,
            "temperature": self.temperature,
            "timestamp": self.timestamp,
        }
        return SystemMetrics(**row_values)
# Database manager class
class MetricsDatabaseManager(BaseDatabaseManager):
    """Manager class for metrics database operations.

    Reads and writes ``SystemMetricsDB`` rows and converts them to and from ``SystemMetrics``.
    Sessions are opened on ``self.engine`` (presumably provided by ``BaseDatabaseManager`` - confirm).
    """

    def __init__(self, db_config: DashboardDatabaseConfig) -> None:
        """Initialize the MetricsDatabaseManager with the given database configuration.

        :param DashboardDatabaseConfig db_config: The database configuration, forwarded to the base manager
        """
        # Bare annotation only: narrows the attribute type for type checkers without assigning it.
        # The value itself is presumably set by BaseDatabaseManager.__init__ - confirm.
        self.db_config: DashboardDatabaseConfig
        super().__init__(db_config)

    @property
    def db_url(self) -> str:
        """Get the database URL for the metrics database file."""
        return self.db_config.db_url(self.db_config.metrics_db_filename)  # type: ignore[no-any-return]

    def is_stale(self, entry: SystemMetrics) -> bool:
        """Return True if the metrics entry is older than the specified metrics lifetime.

        :param SystemMetrics entry: The metrics entry to check
        :return bool: True if the entry is stale
        """
        age = current_timestamp_int() - entry.timestamp
        # 86400 = seconds per day; an entry exactly at the lifetime boundary counts as stale.
        return age >= self.db_config.metrics_lifetime_days * 86400

    @staticmethod
    def _downsample(entries: list[SystemMetricsDB], max_data_points: int) -> list[SystemMetricsDB]:
        """Thin out ``entries`` with a fixed stride so that at most ``max_data_points`` remain.

        :param list[SystemMetricsDB] entries: The entries to thin out, in the order they should be kept
        :param int max_data_points: Upper bound (>= 1) on the number of returned entries
        :return list[SystemMetricsDB]: ``entries`` unchanged if already within the bound, else every step-th entry
        """
        if len(entries) <= max_data_points:
            return entries
        # Ceiling division: a floor-divided step (e.g. 150 // 100 == 1) would keep every entry
        # and exceed the cap. With the ceiling, ceil(len / step) <= max_data_points always holds.
        step = -(-len(entries) // max_data_points)
        return entries[::step]

    def _get_all_system_metrics_entries(self, session: Session) -> list[SystemMetrics]:
        """Retrieve all metrics entries from the database, newest first.

        :param Session session: The open session to query with
        :return list[SystemMetrics]: Every stored entry, ordered by descending timestamp
        """
        statement = select(SystemMetricsDB).order_by(col(SystemMetricsDB.timestamp).desc())
        metrics_entries_db = session.exec(statement).all()
        return [metrics_db.to_system_metrics() for metrics_db in metrics_entries_db]

    def _get_system_metrics_by_id(self, session: Session, metrics_id: int) -> SystemMetricsDB | None:
        """Retrieve a SystemMetricsDB by its ID.

        :param Session session: The open session to query with
        :param int metrics_id: The primary key to look up
        :return SystemMetricsDB | None: The matching row, or None if there is none
        """
        statement = select(SystemMetricsDB).where(SystemMetricsDB.id == metrics_id)
        return session.exec(statement).first()

    def _get_system_metrics_by_timestamp(self, session: Session, timestamp: int) -> SystemMetricsDB | None:
        """Retrieve a SystemMetricsDB by its timestamp.

        :param Session session: The open session to query with
        :param int timestamp: The exact timestamp to look up
        :return SystemMetricsDB | None: The first matching row, or None if there is none
        """
        statement = select(SystemMetricsDB).where(SystemMetricsDB.timestamp == timestamp)
        return session.exec(statement).first()

    def _create_system_metrics_entry(self, session: Session, system_metrics: SystemMetrics) -> int | None:
        """Add a new metrics entry to the database.

        If an entry with the same timestamp already exists, nothing is written and the
        existing entry's ID is returned instead.

        :param Session session: The open session to write with
        :param SystemMetrics system_metrics: The metrics to store
        :return int | None: The ID of the new (or already existing) entry
        """
        if existing_entry := self._get_system_metrics_by_timestamp(session=session, timestamp=system_metrics.timestamp):
            logger.warning(
                "Metrics entry with timestamp %d already exists, skipping creation", system_metrics.timestamp
            )
            return existing_entry.id
        metrics_db = SystemMetricsDB.from_system_metrics(system_metrics=system_metrics)
        session.add(metrics_db)
        session.commit()
        # Reload the row after the commit so the ID can be read back from the instance.
        session.refresh(metrics_db)
        return metrics_db.id

    def _delete_system_metrics_entry(self, session: Session, metrics_db: SystemMetricsDB) -> int | None:
        """Delete a metrics entry from the database and commit.

        :param Session session: The open session to write with
        :param SystemMetricsDB metrics_db: The row to delete
        :return int | None: The ID the deleted row had
        """
        # Read the ID first so the return value does not depend on the state of a
        # deleted instance after the commit.
        deleted_id = metrics_db.id
        session.delete(metrics_db)
        session.commit()
        return deleted_id

    def get_all_system_metrics_entries(self) -> list[SystemMetrics]:
        """Public method to retrieve all metrics entries, newest first.

        :return list[SystemMetrics]: Every stored entry, ordered by descending timestamp
        """
        with Session(self.engine) as session:
            return self._get_all_system_metrics_entries(session)

    def get_system_metrics_entries_since(self, last_n_seconds: int, max_data_points: int) -> list[SystemMetrics]:
        """Get metrics entries from the last N seconds with adaptive downsampling.

        Entries are returned oldest first. If more entries match than ``max_data_points``,
        only every step-th entry (starting with the oldest) is kept, so the result never
        exceeds ``max_data_points``; the newest entry is not guaranteed to be included.

        :param int last_n_seconds: Size of the time window, counted back from the current timestamp
        :param int max_data_points: Maximum number of entries to return; must be at least 1
        :return list[SystemMetrics]: The (possibly downsampled) entries in ascending timestamp order
        :raise ValueError: If ``max_data_points`` is less than 1
        """
        if max_data_points < 1:
            # Previously 0 raised ZeroDivisionError and negative values returned a reversed slice.
            raise ValueError(f"max_data_points must be at least 1, got {max_data_points}")
        with Session(self.engine) as session:
            cutoff_time = current_timestamp_int() - last_n_seconds
            statement = (
                select(SystemMetricsDB)
                .where(col(SystemMetricsDB.timestamp) >= cutoff_time)
                .order_by(col(SystemMetricsDB.timestamp).asc())
            )
            metrics_entries_db = list(session.exec(statement).all())
            # Adaptive downsampling if there are more entries than max_data_points
            metrics_entries_db = self._downsample(metrics_entries_db, max_data_points)
            return [metrics_db.to_system_metrics() for metrics_db in metrics_entries_db]

    def cleanup_old_system_metrics(self) -> None:
        """Delete metrics entries that are older than the specified metrics lifetime.

        Stale rows are selected directly in the database and removed in a single commit,
        instead of loading the whole table and committing once per row.
        """
        with Session(self.engine) as session:
            # Same rule as is_stale(): now - timestamp >= lifetime  <=>  timestamp <= now - lifetime.
            cutoff_time = current_timestamp_int() - self.db_config.metrics_lifetime_days * 86400
            statement = select(SystemMetricsDB).where(col(SystemMetricsDB.timestamp) <= cutoff_time)
            stale_entries_db = session.exec(statement).all()
            if not stale_entries_db:
                return
            for metrics_db in stale_entries_db:
                session.delete(metrics_db)
            session.commit()
            logger.info("Deleted %d stale metrics entries", len(stale_entries_db))