core_lib/exec/pipeline.rs

use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
};

use anyhow::Result;
use tokio::sync::Mutex;

use crate::{
    config::parser::check_dependency_graph,
    core::watcher::WatchContext,
    exec::{
        PipeRegistry,
        metrics::ExecMetrics,
        runner::{JobNode, build_dependency_graph, run_step},
    },
    notifications::sender::{discord_send_failure, discord_send_succes},
};

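/// Run the full pipeline for a watch context: validate and build the dependency graph,
/// then execute jobs wave by wave, spawning every ready job in parallel until the ready
/// queue stays empty, and finally persist metrics and send notifications.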
pub async fn run_pipeline(ctx: Arc<WatchContext>) -> Result<()> {
    let metrics = Arc::new(Mutex::new(ExecMetrics::new(
        &ctx.id,
        &ctx.repo.name,
        ctx.logger.clone(),
    )));

    let pipe_registry = Arc::new(Mutex::new(PipeRegistry {
        pipes_register: HashMap::new(),
    }));

    check_dependency_graph(&ctx.config)?;
    let graph_map = build_dependency_graph(&ctx.config)?;
    let graph: Arc<Mutex<HashMap<String, JobNode>>> = Arc::new(Mutex::new(graph_map));

    let ready_queue = Arc::new(Mutex::new(VecDeque::new()));
    initialize_ready_queue(&graph, &ready_queue).await;

    // Execute waves of ready jobs until the queue stays empty.
    loop {
        let ready_jobs = drain_ready_queue(&ready_queue).await;
        if ready_jobs.is_empty() {
            break;
        }

        // Spawn the current wave of ready jobs in parallel.
        let handles: Vec<_> = ready_jobs
            .into_iter()
            .map(|job_name| {
                let graph_clone = Arc::clone(&graph);
                let ready_clone = Arc::clone(&ready_queue);
                let ctx_clone = Arc::clone(&ctx);
                let metrics_clone = Arc::clone(&metrics);
                let pipe_registry_clone = Arc::clone(&pipe_registry);

                tokio::spawn(run_job(
                    job_name,
                    graph_clone,
                    ready_clone,
                    ctx_clone,
                    metrics_clone,
                    pipe_registry_clone,
                ))
            })
            .collect();

        wait_jobs(handles, &metrics, &ctx).await?;
    }

    finalize_pipeline(&metrics, &ctx).await?;
    Ok(())
}

/// Seed the ready queue with jobs whose dependencies are already satisfied.
async fn initialize_ready_queue(
    graph: &Arc<Mutex<HashMap<String, JobNode>>>,
    ready_queue: &Arc<Mutex<VecDeque<String>>>,
) {
    let graph_guard = graph.lock().await;
    let mut ready_guard = ready_queue.lock().await;
    for (name, node) in graph_guard.iter() {
        if node.remaining_dependencies == 0 {
            ready_guard.push_back(name.clone());
        }
    }
}

/// Drain the ready queue and return the jobs that were waiting in it.
async fn drain_ready_queue(ready_queue: &Arc<Mutex<VecDeque<String>>>) -> Vec<String> {
    let mut queue = ready_queue.lock().await;
    queue.drain(..).collect()
}

/// Run a single job: execute its steps in order, record metrics, and on success
/// release its dependents into the ready queue.
async fn run_job(
    job_name: String,
    graph: Arc<Mutex<HashMap<String, JobNode>>>,
    ready_queue: Arc<Mutex<VecDeque<String>>>,
    ctx: Arc<WatchContext>,
    metrics: Arc<Mutex<ExecMetrics>>,
    pipe_registry: Arc<Mutex<PipeRegistry>>,
) -> Result<bool> {
    // The job is guaranteed to exist: its name came from the graph's ready queue.
    let (job_arc, dependents) = {
        let g = graph.lock().await;
        let node = g.get(&job_name).unwrap();
        (Arc::clone(&node.job), node.dependents.clone())
    };

    // Record the job start in metrics.
    {
        let mut m = metrics.lock().await;
        m.job_started(&job_name);
    }
    ctx.logger.job_start(&job_name).await?;

    let output_strategy = ctx.config.drop_strategy(&job_name, &ctx)?;
    for step in &job_arc.steps {
        if let Err(e) = run_step(
            &ctx,
            step,
            &job_arc.env,
            &output_strategy,
            Arc::clone(&pipe_registry),
        )
        .await
        {
            handle_job_failure(&ctx, &metrics, &job_name, e).await?;
            return Err(anyhow::anyhow!("Job failed: {job_name}"));
        }
    }

    // Mark the job as finished successfully in metrics.
    {
        let mut m = metrics.lock().await;
        m.job_finished(&job_name, true);
    }

    ctx.logger
        .info(&format!("Job {job_name} succeeded"))
        .await?;
    update_dependents(&graph, &ready_queue, &dependents).await;
    ctx.logger.job_end(&job_name).await?;
    Ok(true)
}

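/// Decrement the remaining-dependency counter of each dependent and push any job that
/// reaches zero onto the ready queue. Counters already set to the usize::MAX sentinel
/// are left untouched.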
async fn update_dependents(
    graph: &Arc<Mutex<HashMap<String, JobNode>>>,
    ready_queue: &Arc<Mutex<VecDeque<String>>>,
    dependents: &[String],
) {
    let mut g = graph.lock().await;
    for dep_name in dependents {
        let dep_node = g.get_mut(dep_name).unwrap();
        if dep_node.remaining_dependencies != usize::MAX {
            dep_node.remaining_dependencies -= 1;
            if dep_node.remaining_dependencies == 0 {
                ready_queue.lock().await.push_back(dep_name.clone());
            }
        }
    }
}

/// Handle a job failure: record metrics, optionally send a Discord notification, and log the error.
async fn handle_job_failure(
    ctx: &Arc<WatchContext>,
    metrics: &Arc<Mutex<ExecMetrics>>,
    job_name: &str,
    error: anyhow::Error,
) -> Result<()> {
    let mut m = metrics.lock().await;
    m.job_finished(job_name, false);

    let need_notif_on_failure = ctx
        .config
        .pipeline
        .notifications
        .as_ref()
        .map(|notif| notif.on.contains(&"failure".to_string()))
        .unwrap_or(false);

    if need_notif_on_failure {
        let err = error.to_string();
        let lines: Vec<&str> = err.lines().collect();
        let first_line = lines.first().unwrap_or(&"");
        let second_line = lines.get(1).unwrap_or(&"");
        discord_send_failure(
            ctx,
            &format!(
                "**Job** `{job_name}` **failed**\n{first_line}\n**Error:** `{second_line}`"
            ),
            &m,
        )
        .await?;
    }

    ctx.logger.error(&format!("Job {job_name} failed")).await?;
    Ok(())
}

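/// Await every spawned job handle. On the first job error or join error, finalize and
/// persist the metrics, log the failure, and abort the pipeline.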
async fn wait_jobs(
    handles: Vec<tokio::task::JoinHandle<Result<bool, anyhow::Error>>>,
    metrics: &Arc<Mutex<ExecMetrics>>,
    ctx: &Arc<WatchContext>,
) -> Result<()> {
    for h in handles {
        // Collapse the join error and the job error into a single failure path.
        let result = match h.await {
            Ok(inner) => inner,
            Err(e) => Err(anyhow::anyhow!(e)),
        };
        if let Err(e) = result {
            let mut m = metrics.lock().await;
            m.finalize();
            m.save().await.ok();
            ctx.logger.error(&format!("Pipeline failed: {e}")).await?;
            return Err(anyhow::anyhow!("Pipeline failed: {e}"));
        }
    }
    Ok(())
}

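/// Finalize and persist the pipeline metrics, then send a Discord success notification
/// if the configuration asks for one.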
async fn finalize_pipeline(
    metrics: &Arc<Mutex<ExecMetrics>>,
    ctx: &Arc<WatchContext>,
) -> Result<()> {
    let mut m = metrics.lock().await;
    m.finalize();
    m.save().await?;

    let need_notif_on_success = ctx
        .config
        .pipeline
        .notifications
        .as_ref()
        .map(|notif| notif.on.contains(&"success".to_string()))
        .unwrap_or(false);

    if need_notif_on_success {
        discord_send_succes(ctx, &m).await?;
    }

    Ok(())
}