core_lib/exec/
metrics.rs

1#![allow(dead_code)]
2use core::f32;
3use std::{collections::HashMap, path::PathBuf, time::Duration};
4
5use chrono::{DateTime, Utc};
6use dirs::home_dir;
7use serde::{Deserialize, Serialize};
8use tokio::{
9    fs::{self},
10    io::AsyncWriteExt,
11    time::sleep,
12};
13
14use crate::log::logger::Logger;
15
/// Lifecycle state of a single job inside a pipeline run.
///
/// Serialized by variant name through serde.
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
17pub enum JobStatus {
    /// Never assigned in the visible code; presumably a job that is queued but not
    /// started yet — TODO confirm against callers.
18    Pending,
    /// Assigned by `ExecMetrics::job_started`.
19    Running,
    /// Assigned by `ExecMetrics::job_finished` when it is called with `ok == true`.
20    Succeeded,
    /// Assigned by `ExecMetrics::job_finished` when it is called with `ok == false`.
21    Failed,
    /// Never assigned in the visible code; presumably a job that was deliberately not
    /// run — TODO confirm against callers.
22    Skipped,
23}
24
/// Timing and resource figures recorded for one job.
///
/// Entries are created by `ExecMetrics::job_started`, fed with samples through
/// `ExecMetrics::sys_push`, and completed by `ExecMetrics::job_finished`.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct JobMetrics {
    /// Job name; `ExecMetrics::job_started` also uses it as the key in `ExecMetrics::jobs`.
27    pub name: String,
    /// Current lifecycle state of the job.
28    pub status: JobStatus,
    /// Wall-clock time (UTC) at which the job was registered.
29    pub started_at: DateTime<Utc>,
    /// Wall-clock time (UTC) at which the job finished; `None` until `job_finished` runs.
30    pub finished_at: Option<DateTime<Utc>>,
    /// Milliseconds between `started_at` and `finished_at`; `None` until `job_finished` runs.
31    pub duration_ms: Option<u128>,
    /// Mean of the CPU samples collected for the job (0.0 when no sample was pushed).
32    pub cpu_usage: f32,
    /// Mean memory usage as a percentage of the total system memory reported by `sysinfo`.
33    pub mem_usage: f32,
    /// Raw-memory aggregate of the collected samples, filled in by `job_finished`.
    /// NOTE(review): the unit is whatever callers pass to `sys_push`; the `_kb` suffix
    /// suggests kilobytes — TODO confirm.
34    pub mem_usage_kb: u64,
    /// Highest CPU sample collected for the job.
35    pub max_cpu: f32,
    /// Highest memory sample, as a percentage of the total system memory.
36    pub max_mem: f32,
    /// Raw `(cpu, mem)` samples appended by `ExecMetrics::sys_push`.
    /// Excluded from (de)serialization, so it is never persisted.
37    #[serde(skip)]
38    pub buf: Vec<(f32, u64)>,
39}
40
/// Metrics for one pipeline run of a project: overall timing, aggregated resource
/// usage, and the per-job breakdown.
///
/// The aggregated figures are only meaningful after `finalize` has been called.
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ExecMetrics {
    /// Project identifier; `save` also uses it as the stem of the metrics file name.
43    pub project_id: String,
    /// Human-readable project name.
44    pub project_name: String,
45
    /// Wall-clock time (UTC) at which this record was created.
46    pub started_at: DateTime<Utc>,
    /// Wall-clock time (UTC) at which `finalize` ran; `None` before that.
47    pub finished_at: Option<DateTime<Utc>>,
    /// Milliseconds between `started_at` and `finished_at`; `None` before `finalize`.
48    pub duration_ms: Option<u128>,
49
    /// Mean of the per-job `cpu_usage` values, computed by `finalize`.
50    pub cpu_usage: f32,
    /// Mean of the per-job `mem_usage` percentages, computed by `finalize`.
51    pub mem_usage: f32,
    /// Raw-memory aggregate of the per-job `mem_usage_kb` values, computed by `finalize`.
    /// NOTE(review): same unit as `JobMetrics::mem_usage_kb` — TODO confirm it is kilobytes.
52    pub mem_usage_kb: u64,
    /// Highest CPU figure across the jobs, computed by `finalize`.
53    pub max_cpu: f32,
    /// Highest memory percentage across the jobs, computed by `finalize`.
54    pub max_mem: f32,
55
    /// Per-job metrics keyed by job name.
56    pub jobs: HashMap<String, JobMetrics>,
57
    /// Logger handle. It is never serialized; when a record is deserialized the field
    /// is filled by calling `Logger::placeholder`. Not used by the methods visible in
    /// this file.
58    #[serde(skip, default = "Logger::placeholder")]
59    pub logger: Logger,
60}
61
62impl ExecMetrics {
63    pub fn new(project_id: &str, project_name: &str, logger: Logger) -> Self {
64        Self {
65            project_id: project_id.to_string(),
66            project_name: project_name.to_string(),
67            started_at: Utc::now(),
68            finished_at: None,
69            duration_ms: None,
70            cpu_usage: 0.0,
71            mem_usage: 0.0,
72            mem_usage_kb: 0,
73            max_cpu: 0.0,
74            max_mem: 0.0,
75            jobs: std::collections::HashMap::new(),
76            logger,
77        }
78    }
79
80    pub fn sys_push(&mut self, name: &str, cpu: f32, mem: u64) {
81        if let Some(j) = self.jobs.get_mut(name) {
82            j.buf.push((cpu, mem));
83        }
84    }
85
86    /// call at the end of the pipeline
87    pub fn finalize(&mut self) {
88        let end = Utc::now();
89        self.finished_at = Some(end);
90        self.duration_ms = Some(
91            end.signed_duration_since(self.started_at)
92                .num_milliseconds() as u128,
93        );
94        let v: Vec<(f32, f32, u64)> = self
95            .jobs
96            .values()
97            .map(|v| (v.cpu_usage, v.mem_usage, v.mem_usage_kb))
98            .collect();
99
100        if !v.is_empty() {
101            let mut sys = sysinfo::System::new_all();
102            sys.refresh_memory();
103            let cpu_sum: f32 = v.iter().map(|(cpu, _, _)| *cpu).sum();
104            let cpu_count = v.len() as f32;
105            let average_cpu = cpu_sum / cpu_count;
106
107            let max_cpu = v
108                .iter()
109                .map(|(cpu, _, _)| *cpu)
110                .fold(0.0_f32, |a, b| a.max(b));
111
112            let mem_usage_kb: Vec<u64> = v.iter().map(|(_, _, mem)| *mem).collect();
113            let avg_mem_usage_kb = mem_usage_kb.iter().sum::<u64>();
114
115            let mem_percentages: Vec<f32> = v.iter().map(|(_, mem, _)| *mem).collect(); // déjà en %
116            let average_mem_percent: f32 =
117                mem_percentages.iter().sum::<f32>() / mem_percentages.len() as f32;
118            let max_mem_percent: f32 = mem_percentages.iter().fold(0.0, |a, b| a.max(*b));
119
120            self.cpu_usage = average_cpu;
121            self.mem_usage_kb = avg_mem_usage_kb;
122            self.mem_usage = average_mem_percent;
123            self.max_cpu = max_cpu;
124            self.max_mem = max_mem_percent;
125        } else {
126            self.cpu_usage = 0.0;
127            self.mem_usage_kb = 0;
128            self.max_cpu = 0.0;
129            self.max_mem = 0.0;
130        }
131    }
132
133    /// start job (just insert it with Running status)
134    pub fn job_started(&mut self, name: &str) {
135        self.jobs.insert(
136            name.to_string(),
137            JobMetrics {
138                name: name.to_string(),
139                status: JobStatus::Running,
140                started_at: Utc::now(),
141                finished_at: None,
142                duration_ms: None,
143                cpu_usage: 0.0,
144                mem_usage: 0.0,
145                mem_usage_kb: 0,
146                max_cpu: 0.0,
147                max_mem: 0.0,
148                buf: Vec::new(),
149            },
150        );
151    }
152
153    pub fn job_finished(&mut self, name: &str, ok: bool) {
154        if let Some(j) = self.jobs.get_mut(name) {
155            if !j.buf.is_empty() {
156                let mut sys = sysinfo::System::new_all();
157                sys.refresh_memory();
158                let total_memory_kb = sys.total_memory() as f32;
159                let cpu_sum: f32 = j.buf.iter().map(|(value, _)| *value).sum();
160                let count = j.buf.len() as f32;
161
162                let average_cpu = cpu_sum / count;
163                let max_cpu = j.buf.iter().map(|(v, _)| *v).fold(0.0_f32, |a, b| a.max(b));
164
165                let mem_usage_kb: Vec<u64> = j.buf.iter().map(|(_, mem)| *mem).collect();
166                let avg_mem_usage_kb = mem_usage_kb.iter().sum::<u64>();
167
168                let mem_percentages: Vec<f32> = j
169                    .buf
170                    .iter()
171                    .map(|(_, mem)| (*mem as f32 / total_memory_kb) * 100.0)
172                    .collect();
173                let average_mem_percent: f32 =
174                    mem_percentages.iter().sum::<f32>() / mem_percentages.len() as f32;
175                let max_mem_percent: f32 = mem_percentages.iter().fold(0.0, |a, b| a.max(*b));
176
177                j.cpu_usage = average_cpu;
178                j.mem_usage_kb = avg_mem_usage_kb;
179                j.mem_usage = average_mem_percent;
180                j.max_mem = max_mem_percent;
181                j.max_cpu = max_cpu;
182            }
183            let end = Utc::now();
184            j.finished_at = Some(end);
185            j.duration_ms =
186                Some(end.signed_duration_since(j.started_at).num_milliseconds() as u128);
187            j.status = if ok {
188                JobStatus::Succeeded
189            } else {
190                JobStatus::Failed
191            }
192        }
193    }
194
195    // -----------------------
196    // Metrics persistence
197    // -----------------------
198
199    pub async fn ensure_metrics_dir() -> anyhow::Result<PathBuf> {
200        let home = home_dir().ok_or_else(|| anyhow::anyhow!("Failed to find HOME directory"))?;
201        let dir = home.join(".fleet").join("metrics");
202        if !fs::try_exists(&dir).await? {
203            fs::create_dir_all(&dir).await?;
204        }
205        Ok(dir)
206    }
207
208    pub fn get_metrics_path_by_id(id: &str) -> anyhow::Result<PathBuf> {
209        let home = home_dir().ok_or_else(|| anyhow::anyhow!("Failed to find HOME directory"))?;
210        let dir = home
211            .join(".fleet")
212            .join("metrics")
213            .join(id.to_owned() + ".ndjson");
214        Ok(dir)
215    }
216
217    pub fn rm_metrics_by_id(id: &str) -> anyhow::Result<()> {
218        let path = ExecMetrics::get_metrics_path_by_id(id)?;
219        if path.exists() {
220            std::fs::remove_file(path)?;
221        }
222        Ok(())
223    }
224
225    pub async fn open_metrics_file(project_id: &str) -> anyhow::Result<tokio::fs::File> {
226        let dir = Self::ensure_metrics_dir().await?;
227        let path = dir.join(format!("{project_id}.ndjson"));
228        let file = tokio::fs::OpenOptions::new()
229            .write(true)
230            .truncate(true)
231            .create(true)
232            .open(path)
233            .await?;
234        Ok(file)
235    }
236
237    pub async fn save(&self) -> anyhow::Result<()> {
238        let mut file = Self::open_metrics_file(&self.project_id).await?;
239        let line = serde_json::to_string(self)?;
240        file.write_all(line.as_bytes()).await?;
241        file.write_all(b"\n").await?;
242        file.flush().await?;
243        Ok(())
244    }
245}
246
247pub async fn monitor_process(pid: u32) -> (f32, u64) {
248    let mut sys = sysinfo::System::new_all();
249    let mut samples_cpu = vec![];
250    let mut max_mem = 0;
251
252    loop {
253        sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
254        if let Some(proc) = sys.process(sysinfo::Pid::from(pid as usize)) {
255            samples_cpu.push(proc.cpu_usage());
256            max_mem = max_mem.max(proc.memory());
257        } else {
258            // Process finished
259            break;
260        }
261        sleep(Duration::from_millis(100)).await;
262    }
263
264    let avg_cpu = if samples_cpu.is_empty() {
265        0.0
266    } else {
267        samples_cpu.iter().sum::<f32>() / samples_cpu.len() as f32
268    };
269
270    (avg_cpu, max_mem)
271}