core_lib/exec/
pipeline.rs1use std::{
2 collections::{HashMap, VecDeque},
3 sync::Arc,
4};
5
6use anyhow::Result;
7use tokio::sync::Mutex;
8
9use crate::{
10 config::parser::check_dependency_graph,
11 core::watcher::WatchContext,
12 exec::{
13 PipeRegistry,
14 metrics::ExecMetrics,
15 runner::{JobNode, build_dependency_graph, run_step},
16 },
17 notifications::sender::{discord_send_failure, discord_send_succes},
18};
19
20pub async fn run_pipeline(ctx: Arc<WatchContext>) -> Result<()> {
21 let metrics = Arc::new(tokio::sync::Mutex::new(ExecMetrics::new(
22 &ctx.id,
23 &ctx.repo.name,
24 ctx.logger.clone(),
25 )));
26
27 let pipe_registry = Arc::new(Mutex::new(PipeRegistry {
28 pipes_register: HashMap::new(),
29 }));
30
31 check_dependency_graph(&ctx.config)?;
32 let graph_map = build_dependency_graph(&ctx.config)?;
33 let graph: Arc<Mutex<HashMap<String, JobNode>>> = Arc::new(Mutex::new(graph_map));
34
35 let ready_queue = Arc::new(Mutex::new(VecDeque::new()));
36 initialize_ready_queue(&graph, &ready_queue).await;
37
38 loop {
39 let ready_jobs = drain_ready_queue(&ready_queue).await;
40 if ready_jobs.is_empty() {
41 break;
42 }
43
44 let handles: Vec<_> = ready_jobs
46 .into_iter()
47 .map(|job_name| {
48 let graph_clone = Arc::clone(&graph);
49 let ready_clone = Arc::clone(&ready_queue);
50 let ctx_clone = Arc::clone(&ctx);
51 let metrics_clone = Arc::clone(&metrics);
52 let pipe_registry_clone = Arc::clone(&pipe_registry);
53
54 tokio::spawn(run_job(
55 job_name,
56 graph_clone,
57 ready_clone,
58 ctx_clone,
59 metrics_clone,
60 pipe_registry_clone,
61 ))
62 })
63 .collect();
64
65 wait_jobs(handles, &metrics, &ctx).await?;
66 }
67
68 finalize_pipeline(&metrics, &ctx).await?;
69 Ok(())
70}
71
72async fn initialize_ready_queue(
74 graph: &Arc<Mutex<HashMap<String, JobNode>>>,
75 ready_queue: &Arc<Mutex<VecDeque<String>>>,
76) {
77 let graph_guard = graph.lock().await;
78 let mut ready_guard = ready_queue.lock().await;
79 for (name, node) in graph_guard.iter() {
80 if node.remaining_dependencies == 0 {
81 ready_guard.push_back(name.clone());
82 }
83 }
84}
85
86async fn drain_ready_queue(ready_queue: &Arc<Mutex<VecDeque<String>>>) -> Vec<String> {
88 let mut queue = ready_queue.lock().await;
89 queue.drain(..).collect()
90}
91
92async fn run_job(
94 job_name: String,
95 graph: Arc<Mutex<HashMap<String, JobNode>>>,
96 ready_queue: Arc<Mutex<VecDeque<String>>>,
97 ctx: Arc<WatchContext>,
98 metrics: Arc<Mutex<ExecMetrics>>,
99 pipe_registry: Arc<Mutex<PipeRegistry>>,
100) -> Result<bool> {
101 let (job_arc, dependents) = {
102 let g = graph.lock().await;
103 let node = g.get(&job_name).unwrap();
104 (Arc::clone(&node.job), node.dependents.clone())
105 };
106
107 {
109 let mut m = metrics.lock().await;
110 m.job_started(&job_name);
111 }
112 ctx.logger.job_start(&job_name).await?;
113
114 let output_strategy = ctx.config.drop_strategy(&job_name, &ctx)?;
115 for step in &job_arc.steps {
116 if let Err(e) = run_step(
117 &ctx,
118 step,
119 &job_arc.env,
120 &output_strategy,
121 Arc::clone(&pipe_registry),
122 )
123 .await
124 {
125 handle_job_failure(&ctx, &metrics, &job_name, e).await?;
126 return Err(anyhow::anyhow!("Job failed: {job_name}"));
127 }
128 }
129
130 {
132 let mut m = metrics.lock().await;
133 m.job_finished(&job_name, true);
134 }
135
136 ctx.logger
137 .info(&format!("Job {job_name} succeeded"))
138 .await?;
139 update_dependents(&graph, &ready_queue, &dependents).await;
140 ctx.logger.job_end(&job_name).await?;
141 Ok(true)
142}
143
144async fn update_dependents(
145 graph: &Arc<Mutex<HashMap<String, JobNode>>>,
146 ready_queue: &Arc<Mutex<VecDeque<String>>>,
147 dependents: &[String],
148) {
149 let mut g = graph.lock().await;
150 for dep_name in dependents {
151 let dep_node = g.get_mut(dep_name).unwrap();
152 if dep_node.remaining_dependencies != usize::MAX {
153 dep_node.remaining_dependencies -= 1;
154 if dep_node.remaining_dependencies == 0 {
155 ready_queue.lock().await.push_back(dep_name.clone());
156 }
157 }
158 }
159}
160
161async fn handle_job_failure(
163 ctx: &Arc<WatchContext>,
164 metrics: &Arc<Mutex<ExecMetrics>>,
165 job_name: &str,
166 error: anyhow::Error,
167) -> Result<()> {
168 let mut m = metrics.lock().await;
169 m.job_finished(job_name, false);
170
171 let need_notif_on_failure = ctx
172 .config
173 .pipeline
174 .notifications
175 .as_ref()
176 .map(|notif| notif.on.contains(&"failure".to_string()))
177 .unwrap_or(false);
178
179 if need_notif_on_failure {
180 let err = error.to_string();
181 let lines: Vec<&str> = err.lines().collect();
182 let first_line = lines.first().unwrap_or(&"");
183 let second_line = lines.get(1).unwrap_or(&"");
184 discord_send_failure(
185 ctx,
186 &format!(
187 "**Job** `{job_name}` **failed**
188 {first_line}
189 **Error:** `{second_line}`",
190 ),
191 &m,
192 )
193 .await?;
194 }
195
196 ctx.logger.error(&format!("Job {job_name} failed")).await?;
197 Ok(())
198}
199
200async fn wait_jobs(
201 handles: Vec<tokio::task::JoinHandle<Result<bool, anyhow::Error>>>,
202 metrics: &Arc<Mutex<ExecMetrics>>,
203 ctx: &Arc<WatchContext>,
204) -> Result<()> {
205 for h in handles {
206 match h.await {
207 Ok(inner) => match inner {
208 Ok(_) => {}
209 Err(e) => {
210 let mut m = metrics.lock().await;
211 m.finalize();
212 m.save().await.ok();
213 ctx.logger.error(&format!("Pipeline failed: {e}")).await?;
214 return Err(anyhow::anyhow!("Pipeline failed: {e}"));
215 }
216 },
217 Err(e) => {
218 let mut m = metrics.lock().await;
219 m.finalize();
220 m.save().await.ok();
221 ctx.logger.error(&format!("Pipeline failed: {e}")).await?;
222 return Err(anyhow::anyhow!("Pipeline failed: {e}"));
223 }
224 }
225 }
226 Ok(())
227}
228
229async fn finalize_pipeline(
230 metrics: &Arc<Mutex<ExecMetrics>>,
231 ctx: &Arc<WatchContext>,
232) -> Result<()> {
233 let mut m = metrics.lock().await;
234 m.finalize();
235 m.save().await?;
236
237 let need_notif_on_success = ctx
238 .config
239 .pipeline
240 .notifications
241 .as_ref()
242 .map(|notif| notif.on.contains(&"success".to_string()))
243 .unwrap_or(false);
244
245 if need_notif_on_success {
246 discord_send_succes(ctx, &m).await?;
247 }
248
249 Ok(())
250}