core_lib/daemon/
server.rs

1#![allow(dead_code)]
2use std::sync::Arc;
3
4use crate::{
5    config::ProjectConfig,
6    core::{
7        id::short_id,
8        manager::get_watch_ctx,
9        state::{AppState, get_id_by_name, get_name_by_id},
10        watcher::{WatchContext, WatchContextBuilder},
11    },
12    daemon::utiles::extract_repo_path,
13    exec::{metrics::ExecMetrics, pipeline::run_pipeline},
14    git::repo::Repo,
15    log::logger::Logger,
16};
17use anyhow::Result;
18use serde::{Deserialize, Serialize};
19use tokio::{
20    fs::{File, OpenOptions},
21    io::{AsyncWriteExt, WriteHalf},
22    net::UnixStream,
23};
24
25#[derive(Serialize, Deserialize, Debug, PartialEq)]
26#[serde(tag = "action")]
27pub enum DaemonRequest {
28    #[serde(rename = "add_watch")]
29    AddWatch {
30        project_dir: String,
31        // use Box (clippy)
32        repo: Box<Repo>,
33        config: Box<ProjectConfig>,
34    },
35
36    #[serde(rename = "run_pipeline")]
37    RunPipeline {
38        id: String,
39    },
40
41    #[serde(rename = "stop_watch")]
42    StopWatch {
43        id: String,
44    },
45
46    #[serde(rename = "up_watch")]
47    UpWatch {
48        id: String,
49    },
50
51    #[serde(rename = "rm_watch")]
52    RmWatch {
53        id: String,
54    },
55
56    #[serde(rename = "list_watch")]
57    ListWatches {
58        all: bool,
59    },
60
61    #[serde(rename = "logs_watch")]
62    LogsWatches {
63        id: String,
64        f: bool,
65    },
66
67    None,
68}
69
70#[derive(Serialize, Deserialize, Debug, PartialEq)]
71pub struct WatchInfo {
72    pub branch: String,
73    pub project_dir: String,
74    pub short_commit: String,
75    pub short_url: String,
76    pub repo_name: String,
77    pub id: String,
78    pub paused: bool,
79}
80
81#[derive(Serialize, Deserialize, Debug, PartialEq)]
82pub enum DaemonResponse {
83    Success(String),
84    Error(String),
85    ListWatches(Vec<WatchInfo>),
86    LogWatch(String, bool),
87    Ignore,
88    None,
89}
90
91pub async fn get_log_file(ctx: &WatchContext) -> Result<File> {
92    let log_path = ctx.log_path();
93    println!("log path => {}", log_path.to_str().unwrap());
94    let log_file = OpenOptions::new()
95        .create(true)
96        .append(true)
97        .open(&log_path)
98        .await?;
99    Ok(log_file)
100}
101
102async fn get_logs_by_id(id: &str) -> Result<String> {
103    let log_path = WatchContext::log_path_by_id(id);
104
105    // let file = File::open(&log_path).await?;
106    // let mut reader = BufReader::new(file);
107    // let mut contents = String::new();
108    // reader.read_to_string(&mut contents).await?;
109    match log_path.to_str() {
110        Some(p) => Ok(String::from(p)),
111        None => Err(anyhow::anyhow!("Failed to find log path")),
112    }
113}
114
115/// Handles a daemon request and sends the resulting [`DaemonResponse`] back to the client.
116/// All errors inside handlers are mapped to `DaemonResponse::Error`.
117pub async fn handle_request(
118    req: DaemonRequest,
119    state: Arc<AppState>,
120    stream: &mut WriteHalf<UnixStream>,
121) -> Result<(), anyhow::Error> {
122    let response = match req {
123        DaemonRequest::AddWatch {
124            project_dir,
125            repo,
126            config,
127        } => handle_add_watch(state, project_dir, *repo, *config).await?,
128
129        DaemonRequest::StopWatch { id } => handle_stop_watch(state, id).await,
130
131        DaemonRequest::UpWatch { id } => handle_up_watch(state, id).await,
132
133        DaemonRequest::RmWatch { id } => handle_rm_watch(state, id).await,
134
135        DaemonRequest::ListWatches { all } => handle_list_watches(state, all).await,
136
137        DaemonRequest::LogsWatches { id, f } => handle_logs_watches(id, f).await,
138
139        DaemonRequest::RunPipeline { id } => {
140            handle_run_pipeline(&id, state, stream).await?;
141            DaemonResponse::Ignore
142        }
143        DaemonRequest::None => DaemonResponse::None,
144    };
145
146    if response != DaemonResponse::Ignore {
147        send_response(stream, response).await?;
148    }
149    Ok(())
150}
151
152async fn handle_run_pipeline(
153    id: &str,
154    state: Arc<AppState>,
155    stream: &mut WriteHalf<UnixStream>,
156) -> anyhow::Result<()> {
157    if let Some(ctx) = get_watch_ctx(&state, id).await {
158        send_response(
159            stream,
160            DaemonResponse::Success(format!("Pipeline {id} has been runed")),
161        )
162        .await?;
163        match run_pipeline(Arc::new(ctx)).await {
164            Ok(_) => {
165                println!("[{id}] ✅ Update succeeded");
166            }
167            Err(e) => {
168                eprintln!("[{id}] ❌ Update failed => {e}");
169            }
170        }
171    }
172    Ok(())
173}
174
175/// Registers a new watch, updates the application state, and returns a response.
176/// Any error will be converted to `DaemonResponse::Error`.
177async fn handle_add_watch(
178    state: Arc<AppState>,
179    project_dir: String,
180    repo: Repo,
181    config: ProjectConfig,
182) -> anyhow::Result<DaemonResponse> {
183    let mut guard = state.watches.write().await;
184    let existing_id = guard
185        .iter()
186        .find(|(_, ctx)| ctx.project_dir == project_dir)
187        .map(|(id, _ctx)| id.clone());
188
189    let id = existing_id.unwrap_or_else(short_id);
190
191    let ctx = WatchContextBuilder::new(repo, config, project_dir, id.clone())
192        .build()
193        .await?;
194
195    let result = async {
196        let logger = Logger::new(&ctx.log_path()).await?;
197        {
198            // delete the projects with the same project_dir, before saving the new one
199            guard.retain(|_, existing_ctx| existing_ctx.project_dir != ctx.project_dir);
200            AppState::add_watch(&ctx).await?;
201            guard.insert(id.clone(), ctx);
202        }
203        logger
204            .info(&format!("Project registered with ID : {}", &id))
205            .await?;
206        Ok::<_, anyhow::Error>(())
207    }
208    .await;
209
210    match result {
211        Ok(_) => Ok(DaemonResponse::Success(format!(
212            "📌 Project registered with ID: {id}"
213        ))),
214        Err(e) => Ok(DaemonResponse::Error(format!("Failed to add watch: {e}"))),
215    }
216}
217
218/// Stops a watch by ID if it exists in the application state.
219pub async fn handle_stop_watch(state: Arc<AppState>, id: String) -> DaemonResponse {
220    match async {
221        let mut guard = state.watches.write().await;
222        if let Some(w) = guard.get_mut(&id) {
223            w.stop();
224            AppState::add_watch(w).await?;
225            Ok::<_, anyhow::Error>(format!("🛑 Watch stopped for ID: {id}"))
226        } else {
227            Err(anyhow::anyhow!("⚠ ID not found: {}", id))
228        }
229    }
230    .await
231    {
232        Ok(msg) => DaemonResponse::Success(msg),
233        Err(e) => DaemonResponse::Error(format!("Failed to stop watch: {e}")),
234    }
235}
236
237/// Run a watch by ID if it exists in the application state.
238pub async fn handle_up_watch(state: Arc<AppState>, id: String) -> DaemonResponse {
239    match async {
240        let mut guard = state.watches.write().await;
241        if let Some(w) = guard.get_mut(&id) {
242            w.run();
243            AppState::add_watch(w).await?;
244            Ok::<_, anyhow::Error>(format!("🟢 Watch up for ID: {id}"))
245        } else {
246            Err(anyhow::anyhow!("⚠ ID not found: {}", id))
247        }
248    }
249    .await
250    {
251        Ok(msg) => DaemonResponse::Success(msg),
252        Err(e) => DaemonResponse::Error(format!("Failed to stop watch: {e}")),
253    }
254}
255
256/// Rm a watch by ID if it exists in the application state.
257pub async fn handle_rm_watch(state: Arc<AppState>, id: String) -> DaemonResponse {
258    match async {
259        let mut guard = state.watches.write().await;
260        if let Some(w) = guard.remove(&id) {
261            ExecMetrics::rm_metrics_by_id(&id)?; // remove metrics file
262            Logger::rm_logs_by_id(&id)?; // remove log file 
263            AppState::remove_watch_by_id(&id).await?; // remove this watch in watches.json
264            Ok::<_, anyhow::Error>(format!("Project: {} was deleted", w.repo.name))
265        } else {
266            Err(anyhow::anyhow!("⚠ ID not found: {}", id))
267        }
268    }
269    .await
270    {
271        Ok(msg) => DaemonResponse::Success(msg),
272        Err(e) => DaemonResponse::Error(format!("Failed to stop watch: {e}")),
273    }
274}
275
276/// Returns a list of all current watches as a [`DaemonResponse::ListWatches`].
277pub async fn handle_list_watches(state: Arc<AppState>, all: bool) -> DaemonResponse {
278    match async {
279        let guard = state.watches.read().await;
280        let result: Result<Vec<WatchInfo>, anyhow::Error> = guard
281            .iter()
282            .filter(|(_, ctx)| all || !ctx.paused) // if all = true everything pass, else only if paused is false they can pass
283            .map(|(id, ctx)| {
284                let short_commit = ctx
285                    .repo
286                    .branches
287                    .last_commit
288                    .chars()
289                    .take(8)
290                    .collect::<String>();
291                let short_url = extract_repo_path(&ctx.repo.remote)?;
292                let short_branch = if ctx.repo.branches.name.len() > 12 {
293                    format!("{}...", &ctx.repo.branches.name[..9])
294                } else {
295                    ctx.repo.branches.name.clone()
296                };
297                Ok(WatchInfo {
298                    branch: short_branch,
299                    project_dir: ctx.project_dir.clone(),
300                    short_commit,
301                    short_url,
302                    repo_name: ctx.repo.name.clone(),
303                    id: id.clone(),
304                    paused: ctx.paused,
305                })
306            })
307            .collect();
308        Ok::<_, anyhow::Error>(DaemonResponse::ListWatches(result?))
309    }
310    .await
311    {
312        Ok(resp) => resp,
313        Err(e) => DaemonResponse::Error(format!("Failed to list watches: {e}")),
314    }
315}
316
317/// Fetches logs for a given watch by ID or name.
318/// If the watch is not found, sends an error directly to the client.
319/// Returns `None` if an error was already sent to the stream.
320async fn handle_logs_watches(id: String, follow: bool) -> DaemonResponse {
321    match async {
322        let id = match get_name_by_id(&id).await {
323            Ok(Some(_)) => id,
324            Err(_) | Ok(None) => match get_id_by_name(&id).await? {
325                Some(uuid) => uuid,
326                None => anyhow::bail!("No repo with this name exists"),
327            },
328        };
329        let logs = get_logs_by_id(&id).await?;
330        Ok::<_, anyhow::Error>(DaemonResponse::LogWatch(logs, follow))
331    }
332    .await
333    {
334        Ok(resp) => resp,
335        Err(e) => DaemonResponse::Error(format!("Failed to fetch logs: {e}")),
336    }
337}
338
339async fn send_response(
340    stream: &mut WriteHalf<UnixStream>,
341    response: DaemonResponse,
342) -> Result<(), anyhow::Error> {
343    let response_str = serde_json::to_string(&response)? + "\n";
344    stream.write_all(response_str.as_bytes()).await?;
345    stream.flush().await?;
346    Ok(())
347}
348
349async fn send_error_response(
350    stream: &mut WriteHalf<UnixStream>,
351    message: &str,
352) -> Result<(), anyhow::Error> {
353    let response = DaemonResponse::Error(message.to_string());
354    let response_str = serde_json::to_string(&response)? + "\n";
355    stream.write_all(response_str.as_bytes()).await?;
356    stream.flush().await?;
357    Ok(())
358}