core_lib/exec/
mod.rs

1#![allow(dead_code)]
2use std::{collections::HashMap, fs::File, process::Stdio, sync::Arc};
3
4use anyhow::Result;
5use tempfile::NamedTempFile;
6use tokio::{process::Command, sync::Mutex};
7
8pub mod command;
9pub mod container;
10pub mod metrics;
11pub mod pipeline;
12pub mod runner;
13
14#[allow(clippy::enum_variant_names)]
15pub enum OutpuStrategy {
16    ToFiles {
17        stdout: File,
18        stderr: File,
19    },
20    ToPipeOut {
21        cmd: String,
22        target: String,
23        stdout: File,
24        stderr: File,
25    },
26    ToPipeIn {
27        target: String,
28        stdout: File,
29        stderr: File,
30    },
31}
32
33pub enum CMDManage {
34    Default,
35    PipeIn,
36    PipeOut,
37}
38#[derive(Debug)]
39pub struct PipeRegistry {
40    pub pipes_register: HashMap<String, NamedTempFile>,
41}
42
43impl OutpuStrategy {
44    async fn configure(
45        &self,
46        cmd: &mut Command,
47        current: Vec<std::string::String>,
48        reg: Arc<Mutex<PipeRegistry>>,
49    ) -> Result<CMDManage> {
50        match self {
51            OutpuStrategy::ToFiles { stdout, stderr } => {
52                cmd.stdout(Stdio::from(stdout.try_clone()?));
53                cmd.stderr(Stdio::from(stderr.try_clone()?));
54                Ok(CMDManage::Default)
55            }
56            OutpuStrategy::ToPipeOut { cmd: c, target, .. }
57                if current == shell_words::split(c)? =>
58            {
59                // write in output file
60                let tmpfile = tempfile::NamedTempFile::new()?;
61                cmd.stdout(Stdio::from(tmpfile.reopen()?));
62                cmd.stderr(Stdio::from(tmpfile.reopen()?));
63                reg.lock()
64                    .await
65                    .pipes_register
66                    .insert(target.into(), tmpfile);
67                Ok(CMDManage::PipeOut)
68            }
69            OutpuStrategy::ToPipeOut {
70                cmd: _,
71                target: _,
72                stdout,
73                stderr,
74            } => {
75                cmd.stdout(Stdio::from(stdout.try_clone()?));
76                cmd.stderr(Stdio::from(stderr.try_clone()?));
77                Ok(CMDManage::Default)
78            }
79            OutpuStrategy::ToPipeIn {
80                target,
81                stdout,
82                stderr,
83            } if current == shell_words::split(target)? => {
84                // read in tmp file
85                if let Some(pipe_path) = {
86                    let registry: tokio::sync::MutexGuard<'_, PipeRegistry> = reg.lock().await;
87                    registry
88                        .pipes_register
89                        .get(target)
90                        .map(|tmpfile| tmpfile.path().to_path_buf())
91                } {
92                    let file = File::open(pipe_path)?;
93                    cmd.stdin(Stdio::from(file));
94                } else {
95                    cmd.stdin(Stdio::null());
96                }
97                cmd.stdout(Stdio::from(stdout.try_clone()?));
98                cmd.stderr(Stdio::from(stderr.try_clone()?));
99                Ok(CMDManage::PipeIn)
100            }
101            OutpuStrategy::ToPipeIn { stdout, stderr, .. } => {
102                cmd.stdout(Stdio::from(stdout.try_clone()?));
103                cmd.stderr(Stdio::from(stderr.try_clone()?));
104                Ok(CMDManage::Default)
105            }
106        }
107    }
108}