core_lib/core/
manager.rs

1#![allow(dead_code)]
2use std::{os::unix::fs::PermissionsExt, path::Path, sync::Arc, time::Duration};
3
4use tokio::{
5    io::{AsyncBufReadExt, BufReader, split},
6    net::UnixListener,
7    time::interval,
8};
9
10use crate::{
11    core::{
12        id::format_commit,
13        state::AppState,
14        watcher::{WatchContext, watch_once},
15    },
16    daemon::server::{DaemonRequest, handle_request},
17    exec::pipeline::run_pipeline,
18    git::repo::Repo,
19};
20
21#[doc = include_str!("docs/supervisor_loop.md")]
22pub async fn supervisor_loop(state: Arc<AppState>, interval_secs: u64) {
23    let mut ticker = interval(Duration::from_secs(interval_secs));
24
25    loop {
26        ticker.tick().await;
27
28        let to_update = collect_updates(&state).await;
29        let mut dirty = false;
30
31        for (id, _new_commit) in to_update {
32            // update_commit(&state, &id, new_commit.clone()).await;
33            if let Some(ctx) = get_watch_ctx(&state, &id).await {
34                match run_pipeline(Arc::new(ctx)).await {
35                    Ok(_) => {
36                        println!("[{id}] ✅ Update succeeded");
37                        dirty = true;
38                    }
39                    Err(e) => {
40                        eprintln!("[{id}] ❌ Update failed => {e}");
41                    }
42                }
43            }
44        }
45
46        if dirty && let Err(e) = state.save_to_disk().await {
47            eprintln!("❌ Failed to save state: {e}");
48        }
49    }
50}
51
52/// Loop through the watches, call `watch_once` on each one,
53/// return the (id, new_commit) to update.
54async fn collect_updates(state: &Arc<AppState>) -> Vec<(String, String)> {
55    let mut to_update = Vec::new();
56    let mut guard = state.watches.write().await;
57
58    for (id, ctx) in guard.iter_mut() {
59        if ctx.paused {
60            continue;
61        }
62
63        // TODO: take Branch in arg for watch_once() branch has to be stored in ctx.config.branches
64        match watch_once(&mut ctx.repo) {
65            Ok(Some(new_commit)) => {
66                println!("[{id}] ✔ OK");
67                ctx.logger
68                    .info(&format!(
69                        "New commit [{}] from branch {}",
70                        format_commit(&ctx.repo.branches.last_commit),
71                        ctx.repo.branches.last_name
72                    ))
73                    .await
74                    .ok(); // ignore log fail
75                //for the moment i ignore the error but in the futur i have to handle it correctly
76                Repo::switch_branch(ctx, &ctx.repo.branches.last_name).ok();
77                to_update.push((id.clone(), new_commit));
78            }
79            Ok(None) => {}
80            Err(e) => eprintln!("[{id}] ❌ Watch failed: {e}"),
81        }
82    }
83
84    to_update
85}
86
87/// Updates the commit stored in the state for a given watch.
88async fn update_commit(state: &Arc<AppState>, id: &str, new_commit: String) -> anyhow::Result<()> {
89    let mut watches_write = state.watches.write().await;
90    if let Some(ctx) = watches_write.get_mut(id) {
91        ctx.repo.branches.last_mut()?.last_commit = new_commit;
92    }
93    Ok(())
94}
95
96pub async fn get_watch_ctx(state: &Arc<AppState>, id: &str) -> Option<WatchContext> {
97    let watches_read: tokio::sync::RwLockReadGuard<
98        '_,
99        std::collections::HashMap<String, WatchContext>,
100    > = state.watches.read().await;
101    watches_read.get(id).cloned()
102}
103
104#[doc = include_str!("docs/start_socket_listener.md")]
105pub async fn start_socket_listener(state: Arc<AppState>) -> anyhow::Result<()> {
106    let sock_path = Path::new("/tmp/fleetd.sock");
107    if sock_path.exists() {
108        std::fs::remove_file(sock_path)?;
109    }
110
111    let listener = UnixListener::bind(sock_path)?;
112    std::fs::set_permissions(sock_path, std::fs::Permissions::from_mode(0o666))?;
113
114    println!("🔌 fleetd is listening on {sock_path:?}");
115
116    loop {
117        let (stream, _) = listener.accept().await?;
118
119        let state = Arc::clone(&state);
120        tokio::spawn(async move {
121            let (read_half, mut write_half) = split(stream);
122            let mut reader = BufReader::new(read_half);
123            let mut buf = String::new();
124            if let Err(e) = reader.read_line(&mut buf).await {
125                eprintln!("❌ Failed to read from stream: {e}");
126                return;
127            }
128
129            let parsed: Result<DaemonRequest, _> = serde_json::from_str(&buf);
130            match parsed {
131                Ok(req) => {
132                    if let Err(e) = handle_request(req, state, &mut write_half).await {
133                        eprintln!("❌ Request handling failed: {e}");
134                    }
135                }
136                Err(e) => eprintln!("❌ JSON parsing error: {e}"),
137            }
138        });
139    }
140}