1#![allow(dead_code)]
2use core::f32;
3use std::{collections::HashMap, path::PathBuf, time::Duration};
4
5use chrono::{DateTime, Utc};
6use dirs::home_dir;
7use serde::{Deserialize, Serialize};
8use tokio::{
9 fs::{self},
10 io::AsyncWriteExt,
11 time::sleep,
12};
13
14use crate::log::logger::Logger;
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
17pub enum JobStatus {
18 Pending,
19 Running,
20 Succeeded,
21 Failed,
22 Skipped,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct JobMetrics {
27 pub name: String,
28 pub status: JobStatus,
29 pub started_at: DateTime<Utc>,
30 pub finished_at: Option<DateTime<Utc>>,
31 pub duration_ms: Option<u128>,
32 pub cpu_usage: f32,
33 pub mem_usage: f32,
34 pub mem_usage_kb: u64,
35 pub max_cpu: f32,
36 pub max_mem: f32,
37 #[serde(skip)]
38 pub buf: Vec<(f32, u64)>,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ExecMetrics {
43 pub project_id: String,
44 pub project_name: String,
45
46 pub started_at: DateTime<Utc>,
47 pub finished_at: Option<DateTime<Utc>>,
48 pub duration_ms: Option<u128>,
49
50 pub cpu_usage: f32,
51 pub mem_usage: f32,
52 pub mem_usage_kb: u64,
53 pub max_cpu: f32,
54 pub max_mem: f32,
55
56 pub jobs: HashMap<String, JobMetrics>,
57
58 #[serde(skip, default = "Logger::placeholder")]
59 pub logger: Logger,
60}
61
62impl ExecMetrics {
63 pub fn new(project_id: &str, project_name: &str, logger: Logger) -> Self {
64 Self {
65 project_id: project_id.to_string(),
66 project_name: project_name.to_string(),
67 started_at: Utc::now(),
68 finished_at: None,
69 duration_ms: None,
70 cpu_usage: 0.0,
71 mem_usage: 0.0,
72 mem_usage_kb: 0,
73 max_cpu: 0.0,
74 max_mem: 0.0,
75 jobs: std::collections::HashMap::new(),
76 logger,
77 }
78 }
79
80 pub fn sys_push(&mut self, name: &str, cpu: f32, mem: u64) {
81 if let Some(j) = self.jobs.get_mut(name) {
82 j.buf.push((cpu, mem));
83 }
84 }
85
86 pub fn finalize(&mut self) {
88 let end = Utc::now();
89 self.finished_at = Some(end);
90 self.duration_ms = Some(
91 end.signed_duration_since(self.started_at)
92 .num_milliseconds() as u128,
93 );
94 let v: Vec<(f32, f32, u64)> = self
95 .jobs
96 .values()
97 .map(|v| (v.cpu_usage, v.mem_usage, v.mem_usage_kb))
98 .collect();
99
100 if !v.is_empty() {
101 let mut sys = sysinfo::System::new_all();
102 sys.refresh_memory();
103 let cpu_sum: f32 = v.iter().map(|(cpu, _, _)| *cpu).sum();
104 let cpu_count = v.len() as f32;
105 let average_cpu = cpu_sum / cpu_count;
106
107 let max_cpu = v
108 .iter()
109 .map(|(cpu, _, _)| *cpu)
110 .fold(0.0_f32, |a, b| a.max(b));
111
112 let mem_usage_kb: Vec<u64> = v.iter().map(|(_, _, mem)| *mem).collect();
113 let avg_mem_usage_kb = mem_usage_kb.iter().sum::<u64>();
114
115 let mem_percentages: Vec<f32> = v.iter().map(|(_, mem, _)| *mem).collect(); let average_mem_percent: f32 =
117 mem_percentages.iter().sum::<f32>() / mem_percentages.len() as f32;
118 let max_mem_percent: f32 = mem_percentages.iter().fold(0.0, |a, b| a.max(*b));
119
120 self.cpu_usage = average_cpu;
121 self.mem_usage_kb = avg_mem_usage_kb;
122 self.mem_usage = average_mem_percent;
123 self.max_cpu = max_cpu;
124 self.max_mem = max_mem_percent;
125 } else {
126 self.cpu_usage = 0.0;
127 self.mem_usage_kb = 0;
128 self.max_cpu = 0.0;
129 self.max_mem = 0.0;
130 }
131 }
132
133 pub fn job_started(&mut self, name: &str) {
135 self.jobs.insert(
136 name.to_string(),
137 JobMetrics {
138 name: name.to_string(),
139 status: JobStatus::Running,
140 started_at: Utc::now(),
141 finished_at: None,
142 duration_ms: None,
143 cpu_usage: 0.0,
144 mem_usage: 0.0,
145 mem_usage_kb: 0,
146 max_cpu: 0.0,
147 max_mem: 0.0,
148 buf: Vec::new(),
149 },
150 );
151 }
152
153 pub fn job_finished(&mut self, name: &str, ok: bool) {
154 if let Some(j) = self.jobs.get_mut(name) {
155 if !j.buf.is_empty() {
156 let mut sys = sysinfo::System::new_all();
157 sys.refresh_memory();
158 let total_memory_kb = sys.total_memory() as f32;
159 let cpu_sum: f32 = j.buf.iter().map(|(value, _)| *value).sum();
160 let count = j.buf.len() as f32;
161
162 let average_cpu = cpu_sum / count;
163 let max_cpu = j.buf.iter().map(|(v, _)| *v).fold(0.0_f32, |a, b| a.max(b));
164
165 let mem_usage_kb: Vec<u64> = j.buf.iter().map(|(_, mem)| *mem).collect();
166 let avg_mem_usage_kb = mem_usage_kb.iter().sum::<u64>();
167
168 let mem_percentages: Vec<f32> = j
169 .buf
170 .iter()
171 .map(|(_, mem)| (*mem as f32 / total_memory_kb) * 100.0)
172 .collect();
173 let average_mem_percent: f32 =
174 mem_percentages.iter().sum::<f32>() / mem_percentages.len() as f32;
175 let max_mem_percent: f32 = mem_percentages.iter().fold(0.0, |a, b| a.max(*b));
176
177 j.cpu_usage = average_cpu;
178 j.mem_usage_kb = avg_mem_usage_kb;
179 j.mem_usage = average_mem_percent;
180 j.max_mem = max_mem_percent;
181 j.max_cpu = max_cpu;
182 }
183 let end = Utc::now();
184 j.finished_at = Some(end);
185 j.duration_ms =
186 Some(end.signed_duration_since(j.started_at).num_milliseconds() as u128);
187 j.status = if ok {
188 JobStatus::Succeeded
189 } else {
190 JobStatus::Failed
191 }
192 }
193 }
194
195 pub async fn ensure_metrics_dir() -> anyhow::Result<PathBuf> {
200 let home = home_dir().ok_or_else(|| anyhow::anyhow!("Failed to find HOME directory"))?;
201 let dir = home.join(".fleet").join("metrics");
202 if !fs::try_exists(&dir).await? {
203 fs::create_dir_all(&dir).await?;
204 }
205 Ok(dir)
206 }
207
208 pub fn get_metrics_path_by_id(id: &str) -> anyhow::Result<PathBuf> {
209 let home = home_dir().ok_or_else(|| anyhow::anyhow!("Failed to find HOME directory"))?;
210 let dir = home
211 .join(".fleet")
212 .join("metrics")
213 .join(id.to_owned() + ".ndjson");
214 Ok(dir)
215 }
216
217 pub fn rm_metrics_by_id(id: &str) -> anyhow::Result<()> {
218 let path = ExecMetrics::get_metrics_path_by_id(id)?;
219 if path.exists() {
220 std::fs::remove_file(path)?;
221 }
222 Ok(())
223 }
224
225 pub async fn open_metrics_file(project_id: &str) -> anyhow::Result<tokio::fs::File> {
226 let dir = Self::ensure_metrics_dir().await?;
227 let path = dir.join(format!("{project_id}.ndjson"));
228 let file = tokio::fs::OpenOptions::new()
229 .write(true)
230 .truncate(true)
231 .create(true)
232 .open(path)
233 .await?;
234 Ok(file)
235 }
236
237 pub async fn save(&self) -> anyhow::Result<()> {
238 let mut file = Self::open_metrics_file(&self.project_id).await?;
239 let line = serde_json::to_string(self)?;
240 file.write_all(line.as_bytes()).await?;
241 file.write_all(b"\n").await?;
242 file.flush().await?;
243 Ok(())
244 }
245}
246
247pub async fn monitor_process(pid: u32) -> (f32, u64) {
248 let mut sys = sysinfo::System::new_all();
249 let mut samples_cpu = vec![];
250 let mut max_mem = 0;
251
252 loop {
253 sys.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
254 if let Some(proc) = sys.process(sysinfo::Pid::from(pid as usize)) {
255 samples_cpu.push(proc.cpu_usage());
256 max_mem = max_mem.max(proc.memory());
257 } else {
258 break;
260 }
261 sleep(Duration::from_millis(100)).await;
262 }
263
264 let avg_cpu = if samples_cpu.is_empty() {
265 0.0
266 } else {
267 samples_cpu.iter().sum::<f32>() / samples_cpu.len() as f32
268 };
269
270 (avg_cpu, max_mem)
271}