1#![allow(dead_code)]
2use std::sync::Arc;
3
4use crate::{
5 config::ProjectConfig,
6 core::{
7 id::short_id,
8 manager::get_watch_ctx,
9 state::{AppState, get_id_by_name, get_name_by_id},
10 watcher::{WatchContext, WatchContextBuilder},
11 },
12 daemon::utiles::extract_repo_path,
13 exec::{metrics::ExecMetrics, pipeline::run_pipeline},
14 git::repo::Repo,
15 log::logger::Logger,
16};
17use anyhow::Result;
18use serde::{Deserialize, Serialize};
19use tokio::{
20 fs::{File, OpenOptions},
21 io::{AsyncWriteExt, WriteHalf},
22 net::UnixStream,
23};
24
25#[derive(Serialize, Deserialize, Debug, PartialEq)]
26#[serde(tag = "action")]
27pub enum DaemonRequest {
28 #[serde(rename = "add_watch")]
29 AddWatch {
30 project_dir: String,
31 repo: Box<Repo>,
33 config: Box<ProjectConfig>,
34 },
35
36 #[serde(rename = "run_pipeline")]
37 RunPipeline {
38 id: String,
39 },
40
41 #[serde(rename = "stop_watch")]
42 StopWatch {
43 id: String,
44 },
45
46 #[serde(rename = "up_watch")]
47 UpWatch {
48 id: String,
49 },
50
51 #[serde(rename = "rm_watch")]
52 RmWatch {
53 id: String,
54 },
55
56 #[serde(rename = "list_watch")]
57 ListWatches {
58 all: bool,
59 },
60
61 #[serde(rename = "logs_watch")]
62 LogsWatches {
63 id: String,
64 f: bool,
65 },
66
67 None,
68}
69
70#[derive(Serialize, Deserialize, Debug, PartialEq)]
71pub struct WatchInfo {
72 pub branch: String,
73 pub project_dir: String,
74 pub short_commit: String,
75 pub short_url: String,
76 pub repo_name: String,
77 pub id: String,
78 pub paused: bool,
79}
80
81#[derive(Serialize, Deserialize, Debug, PartialEq)]
82pub enum DaemonResponse {
83 Success(String),
84 Error(String),
85 ListWatches(Vec<WatchInfo>),
86 LogWatch(String, bool),
87 Ignore,
88 None,
89}
90
91pub async fn get_log_file(ctx: &WatchContext) -> Result<File> {
92 let log_path = ctx.log_path();
93 println!("log path => {}", log_path.to_str().unwrap());
94 let log_file = OpenOptions::new()
95 .create(true)
96 .append(true)
97 .open(&log_path)
98 .await?;
99 Ok(log_file)
100}
101
102async fn get_logs_by_id(id: &str) -> Result<String> {
103 let log_path = WatchContext::log_path_by_id(id);
104
105 match log_path.to_str() {
110 Some(p) => Ok(String::from(p)),
111 None => Err(anyhow::anyhow!("Failed to find log path")),
112 }
113}
114
115pub async fn handle_request(
118 req: DaemonRequest,
119 state: Arc<AppState>,
120 stream: &mut WriteHalf<UnixStream>,
121) -> Result<(), anyhow::Error> {
122 let response = match req {
123 DaemonRequest::AddWatch {
124 project_dir,
125 repo,
126 config,
127 } => handle_add_watch(state, project_dir, *repo, *config).await?,
128
129 DaemonRequest::StopWatch { id } => handle_stop_watch(state, id).await,
130
131 DaemonRequest::UpWatch { id } => handle_up_watch(state, id).await,
132
133 DaemonRequest::RmWatch { id } => handle_rm_watch(state, id).await,
134
135 DaemonRequest::ListWatches { all } => handle_list_watches(state, all).await,
136
137 DaemonRequest::LogsWatches { id, f } => handle_logs_watches(id, f).await,
138
139 DaemonRequest::RunPipeline { id } => {
140 handle_run_pipeline(&id, state, stream).await?;
141 DaemonResponse::Ignore
142 }
143 DaemonRequest::None => DaemonResponse::None,
144 };
145
146 if response != DaemonResponse::Ignore {
147 send_response(stream, response).await?;
148 }
149 Ok(())
150}
151
152async fn handle_run_pipeline(
153 id: &str,
154 state: Arc<AppState>,
155 stream: &mut WriteHalf<UnixStream>,
156) -> anyhow::Result<()> {
157 if let Some(ctx) = get_watch_ctx(&state, id).await {
158 send_response(
159 stream,
160 DaemonResponse::Success(format!("Pipeline {id} has been runed")),
161 )
162 .await?;
163 match run_pipeline(Arc::new(ctx)).await {
164 Ok(_) => {
165 println!("[{id}] ✅ Update succeeded");
166 }
167 Err(e) => {
168 eprintln!("[{id}] ❌ Update failed => {e}");
169 }
170 }
171 }
172 Ok(())
173}
174
175async fn handle_add_watch(
178 state: Arc<AppState>,
179 project_dir: String,
180 repo: Repo,
181 config: ProjectConfig,
182) -> anyhow::Result<DaemonResponse> {
183 let mut guard = state.watches.write().await;
184 let existing_id = guard
185 .iter()
186 .find(|(_, ctx)| ctx.project_dir == project_dir)
187 .map(|(id, _ctx)| id.clone());
188
189 let id = existing_id.unwrap_or_else(short_id);
190
191 let ctx = WatchContextBuilder::new(repo, config, project_dir, id.clone())
192 .build()
193 .await?;
194
195 let result = async {
196 let logger = Logger::new(&ctx.log_path()).await?;
197 {
198 guard.retain(|_, existing_ctx| existing_ctx.project_dir != ctx.project_dir);
200 AppState::add_watch(&ctx).await?;
201 guard.insert(id.clone(), ctx);
202 }
203 logger
204 .info(&format!("Project registered with ID : {}", &id))
205 .await?;
206 Ok::<_, anyhow::Error>(())
207 }
208 .await;
209
210 match result {
211 Ok(_) => Ok(DaemonResponse::Success(format!(
212 "📌 Project registered with ID: {id}"
213 ))),
214 Err(e) => Ok(DaemonResponse::Error(format!("Failed to add watch: {e}"))),
215 }
216}
217
218pub async fn handle_stop_watch(state: Arc<AppState>, id: String) -> DaemonResponse {
220 match async {
221 let mut guard = state.watches.write().await;
222 if let Some(w) = guard.get_mut(&id) {
223 w.stop();
224 AppState::add_watch(w).await?;
225 Ok::<_, anyhow::Error>(format!("🛑 Watch stopped for ID: {id}"))
226 } else {
227 Err(anyhow::anyhow!("⚠ ID not found: {}", id))
228 }
229 }
230 .await
231 {
232 Ok(msg) => DaemonResponse::Success(msg),
233 Err(e) => DaemonResponse::Error(format!("Failed to stop watch: {e}")),
234 }
235}
236
237pub async fn handle_up_watch(state: Arc<AppState>, id: String) -> DaemonResponse {
239 match async {
240 let mut guard = state.watches.write().await;
241 if let Some(w) = guard.get_mut(&id) {
242 w.run();
243 AppState::add_watch(w).await?;
244 Ok::<_, anyhow::Error>(format!("🟢 Watch up for ID: {id}"))
245 } else {
246 Err(anyhow::anyhow!("⚠ ID not found: {}", id))
247 }
248 }
249 .await
250 {
251 Ok(msg) => DaemonResponse::Success(msg),
252 Err(e) => DaemonResponse::Error(format!("Failed to stop watch: {e}")),
253 }
254}
255
256pub async fn handle_rm_watch(state: Arc<AppState>, id: String) -> DaemonResponse {
258 match async {
259 let mut guard = state.watches.write().await;
260 if let Some(w) = guard.remove(&id) {
261 ExecMetrics::rm_metrics_by_id(&id)?; Logger::rm_logs_by_id(&id)?; AppState::remove_watch_by_id(&id).await?; Ok::<_, anyhow::Error>(format!("Project: {} was deleted", w.repo.name))
265 } else {
266 Err(anyhow::anyhow!("⚠ ID not found: {}", id))
267 }
268 }
269 .await
270 {
271 Ok(msg) => DaemonResponse::Success(msg),
272 Err(e) => DaemonResponse::Error(format!("Failed to stop watch: {e}")),
273 }
274}
275
276pub async fn handle_list_watches(state: Arc<AppState>, all: bool) -> DaemonResponse {
278 match async {
279 let guard = state.watches.read().await;
280 let result: Result<Vec<WatchInfo>, anyhow::Error> = guard
281 .iter()
282 .filter(|(_, ctx)| all || !ctx.paused) .map(|(id, ctx)| {
284 let short_commit = ctx
285 .repo
286 .branches
287 .last_commit
288 .chars()
289 .take(8)
290 .collect::<String>();
291 let short_url = extract_repo_path(&ctx.repo.remote)?;
292 let short_branch = if ctx.repo.branches.name.len() > 12 {
293 format!("{}...", &ctx.repo.branches.name[..9])
294 } else {
295 ctx.repo.branches.name.clone()
296 };
297 Ok(WatchInfo {
298 branch: short_branch,
299 project_dir: ctx.project_dir.clone(),
300 short_commit,
301 short_url,
302 repo_name: ctx.repo.name.clone(),
303 id: id.clone(),
304 paused: ctx.paused,
305 })
306 })
307 .collect();
308 Ok::<_, anyhow::Error>(DaemonResponse::ListWatches(result?))
309 }
310 .await
311 {
312 Ok(resp) => resp,
313 Err(e) => DaemonResponse::Error(format!("Failed to list watches: {e}")),
314 }
315}
316
317async fn handle_logs_watches(id: String, follow: bool) -> DaemonResponse {
321 match async {
322 let id = match get_name_by_id(&id).await {
323 Ok(Some(_)) => id,
324 Err(_) | Ok(None) => match get_id_by_name(&id).await? {
325 Some(uuid) => uuid,
326 None => anyhow::bail!("No repo with this name exists"),
327 },
328 };
329 let logs = get_logs_by_id(&id).await?;
330 Ok::<_, anyhow::Error>(DaemonResponse::LogWatch(logs, follow))
331 }
332 .await
333 {
334 Ok(resp) => resp,
335 Err(e) => DaemonResponse::Error(format!("Failed to fetch logs: {e}")),
336 }
337}
338
339async fn send_response(
340 stream: &mut WriteHalf<UnixStream>,
341 response: DaemonResponse,
342) -> Result<(), anyhow::Error> {
343 let response_str = serde_json::to_string(&response)? + "\n";
344 stream.write_all(response_str.as_bytes()).await?;
345 stream.flush().await?;
346 Ok(())
347}
348
349async fn send_error_response(
350 stream: &mut WriteHalf<UnixStream>,
351 message: &str,
352) -> Result<(), anyhow::Error> {
353 let response = DaemonResponse::Error(message.to_string());
354 let response_str = serde_json::to_string(&response)? + "\n";
355 stream.write_all(response_str.as_bytes()).await?;
356 stream.flush().await?;
357 Ok(())
358}