1#![allow(dead_code)]
2use std::{os::unix::fs::PermissionsExt, path::Path, sync::Arc, time::Duration};
3
4use tokio::{
5 io::{AsyncBufReadExt, BufReader, split},
6 net::UnixListener,
7 time::interval,
8};
9
10use crate::{
11 core::{
12 id::format_commit,
13 state::AppState,
14 watcher::{WatchContext, watch_once},
15 },
16 daemon::server::{DaemonRequest, handle_request},
17 exec::pipeline::run_pipeline,
18 git::repo::Repo,
19};
20
21#[doc = include_str!("docs/supervisor_loop.md")]
22pub async fn supervisor_loop(state: Arc<AppState>, interval_secs: u64) {
23 let mut ticker = interval(Duration::from_secs(interval_secs));
24
25 loop {
26 ticker.tick().await;
27
28 let to_update = collect_updates(&state).await;
29 let mut dirty = false;
30
31 for (id, _new_commit) in to_update {
32 if let Some(ctx) = get_watch_ctx(&state, &id).await {
34 match run_pipeline(Arc::new(ctx)).await {
35 Ok(_) => {
36 println!("[{id}] ✅ Update succeeded");
37 dirty = true;
38 }
39 Err(e) => {
40 eprintln!("[{id}] ❌ Update failed => {e}");
41 }
42 }
43 }
44 }
45
46 if dirty && let Err(e) = state.save_to_disk().await {
47 eprintln!("❌ Failed to save state: {e}");
48 }
49 }
50}
51
52async fn collect_updates(state: &Arc<AppState>) -> Vec<(String, String)> {
55 let mut to_update = Vec::new();
56 let mut guard = state.watches.write().await;
57
58 for (id, ctx) in guard.iter_mut() {
59 if ctx.paused {
60 continue;
61 }
62
63 match watch_once(&mut ctx.repo) {
65 Ok(Some(new_commit)) => {
66 println!("[{id}] ✔ OK");
67 ctx.logger
68 .info(&format!(
69 "New commit [{}] from branch {}",
70 format_commit(&ctx.repo.branches.last_commit),
71 ctx.repo.branches.last_name
72 ))
73 .await
74 .ok(); Repo::switch_branch(ctx, &ctx.repo.branches.last_name).ok();
77 to_update.push((id.clone(), new_commit));
78 }
79 Ok(None) => {}
80 Err(e) => eprintln!("[{id}] ❌ Watch failed: {e}"),
81 }
82 }
83
84 to_update
85}
86
87async fn update_commit(state: &Arc<AppState>, id: &str, new_commit: String) -> anyhow::Result<()> {
89 let mut watches_write = state.watches.write().await;
90 if let Some(ctx) = watches_write.get_mut(id) {
91 ctx.repo.branches.last_mut()?.last_commit = new_commit;
92 }
93 Ok(())
94}
95
96pub async fn get_watch_ctx(state: &Arc<AppState>, id: &str) -> Option<WatchContext> {
97 let watches_read: tokio::sync::RwLockReadGuard<
98 '_,
99 std::collections::HashMap<String, WatchContext>,
100 > = state.watches.read().await;
101 watches_read.get(id).cloned()
102}
103
104#[doc = include_str!("docs/start_socket_listener.md")]
105pub async fn start_socket_listener(state: Arc<AppState>) -> anyhow::Result<()> {
106 let sock_path = Path::new("/tmp/fleetd.sock");
107 if sock_path.exists() {
108 std::fs::remove_file(sock_path)?;
109 }
110
111 let listener = UnixListener::bind(sock_path)?;
112 std::fs::set_permissions(sock_path, std::fs::Permissions::from_mode(0o666))?;
113
114 println!("🔌 fleetd is listening on {sock_path:?}");
115
116 loop {
117 let (stream, _) = listener.accept().await?;
118
119 let state = Arc::clone(&state);
120 tokio::spawn(async move {
121 let (read_half, mut write_half) = split(stream);
122 let mut reader = BufReader::new(read_half);
123 let mut buf = String::new();
124 if let Err(e) = reader.read_line(&mut buf).await {
125 eprintln!("❌ Failed to read from stream: {e}");
126 return;
127 }
128
129 let parsed: Result<DaemonRequest, _> = serde_json::from_str(&buf);
130 match parsed {
131 Ok(req) => {
132 if let Err(e) = handle_request(req, state, &mut write_half).await {
133 eprintln!("❌ Request handling failed: {e}");
134 }
135 }
136 Err(e) => eprintln!("❌ JSON parsing error: {e}"),
137 }
138 });
139 }
140}