core_lib/exec/
runner.rs

1#![allow(dead_code)]
2use std::{collections::HashMap, sync::Arc};
3
4use anyhow::Result;
5use tokio::sync::Mutex;
6
7use crate::{
8    config::{Cmd, Job, ProjectConfig},
9    core::watcher::WatchContext,
10    exec::{
11        OutpuStrategy, PipeRegistry,
12        command::{CommandOutput, exec_background, exec_timeout},
13        container::contain_cmd,
14    },
15    log::logger::Logger,
16};
17
18const DEFAULT_TIMEOUT: u64 = 300;
19pub struct JobNode {
20    pub job: Arc<Job>,
21    pub depend_on: Vec<String>, // names of jobs this job depends on
22    pub remaining_dependencies: usize,
23    pub dependents: Vec<String>, // names of jobs that depend on this one
24}
25
26pub fn build_dependency_graph(config: &ProjectConfig) -> Result<HashMap<String, JobNode>> {
27    let mut graph = HashMap::new();
28
29    for (name, job) in config.pipeline.jobs.iter() {
30        if job.needs.contains(name) {
31            return Err(anyhow::anyhow!("Job: {} cannot depend on itself", name));
32        }
33
34        let job_needs = if !job.pipe.is_empty() && !job.needs.contains(&job.pipe) {
35            let mut j = job.needs.clone();
36            j.push(job.pipe.clone());
37            j
38        } else {
39            job.needs.clone()
40        };
41
42        let node = JobNode {
43            job: Arc::new(job.clone()),
44            depend_on: job_needs.clone(),
45            remaining_dependencies: job_needs.len(),
46            dependents: vec![],
47        };
48        graph.insert(name.clone(), node);
49    }
50
51    for name in graph.keys().cloned().collect::<Vec<_>>() {
52        let depend_on = graph[&name].depend_on.clone();
53        for dep in depend_on {
54            if let Some(dep_node) = graph.get_mut(&dep) {
55                dep_node.dependents.push(name.clone());
56            } else {
57                anyhow::bail!("Job '{}' depends on unknown job '{}'", name, dep);
58            }
59        }
60    }
61    Ok(graph)
62}
63
64pub async fn run_step(
65    ctx: &WatchContext,
66    step: &Cmd,
67    env: &Option<HashMap<String, String>>,
68    output_strategy: &OutpuStrategy,
69    pipe_registry: Arc<Mutex<PipeRegistry>>,
70) -> Result<Option<CommandOutput>> {
71    let parts = shell_words::split(&step.cmd)?;
72
73    if let Some(container) = &step.container {
74        contain_cmd(
75            container,
76            parts,
77            env.clone(),
78            &ctx.project_dir,
79            &ctx.log_path().display().to_string(),
80            &ctx.logger,
81            Some(ctx.config.timeout.unwrap_or(DEFAULT_TIMEOUT)),
82        )
83        .await?;
84    } else if step.blocking {
85        background_process(ctx, parts, &ctx.logger, env.clone()).await?;
86    } else {
87        return Ok(Some(
88            timeout_process(
89                ctx,
90                parts,
91                &ctx.logger,
92                env.clone(),
93                ctx.config.timeout.unwrap_or(DEFAULT_TIMEOUT),
94                output_strategy,
95                pipe_registry,
96            )
97            .await?,
98        ));
99    }
100    Ok(None)
101}
102
103async fn background_process(
104    ctx: &WatchContext,
105    parts: Vec<String>,
106    logger: &Logger,
107    env: Option<HashMap<String, String>>,
108) -> Result<(), anyhow::Error> {
109    match exec_background(parts.clone(), ctx, logger, env).await {
110        Ok(_) => {}
111        Err(e) => {
112            return Err(e);
113        }
114    };
115    Ok(())
116}
117
118async fn timeout_process(
119    ctx: &WatchContext,
120    parts: Vec<String>,
121    logger: &Logger,
122    env: Option<HashMap<String, String>>,
123    default_timeout: u64,
124    output_strategy: &OutpuStrategy,
125    pipe_registry: Arc<Mutex<PipeRegistry>>,
126) -> Result<CommandOutput, anyhow::Error> {
127    match exec_timeout(
128        parts.clone(),
129        ctx,
130        logger,
131        default_timeout,
132        env,
133        output_strategy,
134        pipe_registry,
135    )
136    .await
137    {
138        Ok(o) => Ok(o),
139        Err(e) => Err(e),
140    }
141}