1#![allow(dead_code)]
2use std::{collections::HashMap, sync::Arc};
3
4use anyhow::Result;
5use tokio::sync::Mutex;
6
7use crate::{
8 config::{Cmd, Job, ProjectConfig},
9 core::watcher::WatchContext,
10 exec::{
11 OutpuStrategy, PipeRegistry,
12 command::{CommandOutput, exec_background, exec_timeout},
13 container::contain_cmd,
14 },
15 log::logger::Logger,
16};
17
/// Timeout handed to the executors when `ctx.config.timeout` is not set.
///
/// NOTE(review): the unit is presumably seconds — confirm against `exec_timeout`
/// and `contain_cmd`, which receive this value.
const DEFAULT_TIMEOUT: u64 = 300;
19pub struct JobNode {
20 pub job: Arc<Job>,
21 pub depend_on: Vec<String>, pub remaining_dependencies: usize,
23 pub dependents: Vec<String>, }
25
26pub fn build_dependency_graph(config: &ProjectConfig) -> Result<HashMap<String, JobNode>> {
27 let mut graph = HashMap::new();
28
29 for (name, job) in config.pipeline.jobs.iter() {
30 if job.needs.contains(name) {
31 return Err(anyhow::anyhow!("Job: {} cannot depend on itself", name));
32 }
33
34 let job_needs = if !job.pipe.is_empty() && !job.needs.contains(&job.pipe) {
35 let mut j = job.needs.clone();
36 j.push(job.pipe.clone());
37 j
38 } else {
39 job.needs.clone()
40 };
41
42 let node = JobNode {
43 job: Arc::new(job.clone()),
44 depend_on: job_needs.clone(),
45 remaining_dependencies: job_needs.len(),
46 dependents: vec![],
47 };
48 graph.insert(name.clone(), node);
49 }
50
51 for name in graph.keys().cloned().collect::<Vec<_>>() {
52 let depend_on = graph[&name].depend_on.clone();
53 for dep in depend_on {
54 if let Some(dep_node) = graph.get_mut(&dep) {
55 dep_node.dependents.push(name.clone());
56 } else {
57 anyhow::bail!("Job '{}' depends on unknown job '{}'", name, dep);
58 }
59 }
60 }
61 Ok(graph)
62}
63
64pub async fn run_step(
65 ctx: &WatchContext,
66 step: &Cmd,
67 env: &Option<HashMap<String, String>>,
68 output_strategy: &OutpuStrategy,
69 pipe_registry: Arc<Mutex<PipeRegistry>>,
70) -> Result<Option<CommandOutput>> {
71 let parts = shell_words::split(&step.cmd)?;
72
73 if let Some(container) = &step.container {
74 contain_cmd(
75 container,
76 parts,
77 env.clone(),
78 &ctx.project_dir,
79 &ctx.log_path().display().to_string(),
80 &ctx.logger,
81 Some(ctx.config.timeout.unwrap_or(DEFAULT_TIMEOUT)),
82 )
83 .await?;
84 } else if step.blocking {
85 background_process(ctx, parts, &ctx.logger, env.clone()).await?;
86 } else {
87 return Ok(Some(
88 timeout_process(
89 ctx,
90 parts,
91 &ctx.logger,
92 env.clone(),
93 ctx.config.timeout.unwrap_or(DEFAULT_TIMEOUT),
94 output_strategy,
95 pipe_registry,
96 )
97 .await?,
98 ));
99 }
100 Ok(None)
101}
102
103async fn background_process(
104 ctx: &WatchContext,
105 parts: Vec<String>,
106 logger: &Logger,
107 env: Option<HashMap<String, String>>,
108) -> Result<(), anyhow::Error> {
109 match exec_background(parts.clone(), ctx, logger, env).await {
110 Ok(_) => {}
111 Err(e) => {
112 return Err(e);
113 }
114 };
115 Ok(())
116}
117
118async fn timeout_process(
119 ctx: &WatchContext,
120 parts: Vec<String>,
121 logger: &Logger,
122 env: Option<HashMap<String, String>>,
123 default_timeout: u64,
124 output_strategy: &OutpuStrategy,
125 pipe_registry: Arc<Mutex<PipeRegistry>>,
126) -> Result<CommandOutput, anyhow::Error> {
127 match exec_timeout(
128 parts.clone(),
129 ctx,
130 logger,
131 default_timeout,
132 env,
133 output_strategy,
134 pipe_registry,
135 )
136 .await
137 {
138 Ok(o) => Ok(o),
139 Err(e) => Err(e),
140 }
141}