core_lib/exec/
container.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use anyhow::Result;
5use bollard::Docker;
6use bollard::models::ContainerCreateBody;
7use bollard::query_parameters::{
8    CreateContainerOptionsBuilder, CreateImageOptionsBuilder, LogsOptionsBuilder,
9    RemoveContainerOptionsBuilder, StartContainerOptions,
10};
11use futures_util::stream::StreamExt;
12
13use tokio::fs::OpenOptions;
14use tokio::io::AsyncWriteExt;
15use tokio::time::timeout;
16
17use crate::core::id::short_id;
18use crate::log::logger::Logger;
19
20async fn ensure_image(docker: &Docker, image: &str, logger: &Logger) -> Result<()> {
21    let image_options = CreateImageOptionsBuilder::default()
22        .from_image(image)
23        .build();
24    let mut stream = docker.create_image(Some(image_options), None, None);
25
26    let mut last = String::new();
27    while let Some(status) = stream.next().await {
28        match status {
29            Ok(info) => {
30                if let Some(s) = info.status {
31                    if last != s {
32                        logger.info(&format!("Pulling {image}: {s}")).await?;
33                    }
34                    last = s.clone();
35                }
36            }
37            Err(e) => {
38                return Err(anyhow::anyhow!("Failed to pull image {image}: {e}"));
39            }
40        }
41    }
42    Ok(())
43}
44
45pub async fn contain_cmd(
46    image: &str,
47    cmd: Vec<String>,
48    env: Option<HashMap<String, String>>,
49    dir: &str,
50    log_path: &str,
51    logger: &Logger,
52    timeout_secs: Option<u64>,
53) -> Result<()> {
54    logger.info("Building image").await?;
55    let docker = Docker::connect_with_local_defaults()?;
56    ensure_image(&docker, image, logger).await?;
57
58    let create_option = CreateContainerOptionsBuilder::default()
59        .name(&format!("fleet-job-{}", short_id()))
60        .build();
61
62    let container_config = ContainerCreateBody {
63        image: Some(image.to_string()),
64        cmd: Some(cmd),
65        env: env.map(|m| m.iter().map(|(k, v)| format!("{k}={v}")).collect()),
66        attach_stdout: Some(true),
67        attach_stderr: Some(true),
68        tty: Some(false),
69        host_config: Some(bollard::models::HostConfig {
70            binds: Some(vec![format!("{dir}:/app",)]),
71            ..Default::default()
72        }),
73        working_dir: Some("/app".to_string()),
74        ..Default::default()
75    };
76
77    let container = docker
78        .create_container(Some(create_option), container_config)
79        .await?;
80
81    docker
82        .start_container(&container.id, None::<StartContainerOptions>)
83        .await?;
84
85    let mut log_file = OpenOptions::new()
86        .create(true)
87        .append(true)
88        .open(log_path)
89        .await?;
90
91    let logs_options = LogsOptionsBuilder::default()
92        .follow(true)
93        .stdout(true)
94        .stderr(true)
95        .build();
96
97    let mut log_stream = docker.logs(&container.id, Some(logs_options));
98
99    let logs_future = async {
100        while let Some(log) = log_stream.next().await {
101            match log? {
102                bollard::container::LogOutput::StdOut { message }
103                | bollard::container::LogOutput::StdErr { message } => {
104                    log_file.write_all(&message).await?;
105                }
106                _ => {}
107            }
108        }
109        Ok::<(), anyhow::Error>(())
110    };
111
112    let result = if let Some(secs) = timeout_secs {
113        // w Timeout
114        match timeout(Duration::from_secs(secs), logs_future).await {
115            Ok(inner) => inner,
116            Err(_) => {
117                logger
118                    .error(&format!(
119                        "Container execution timed out after {secs} seconds"
120                    ))
121                    .await?;
122                Err(anyhow::anyhow!(
123                    "Container execution timed out after {secs} seconds"
124                ))
125            }
126        }
127    } else {
128        // w no timemout
129        logs_future.await
130    };
131
132    let remove_options = RemoveContainerOptionsBuilder::default().force(true).build();
133    docker
134        .remove_container(&container.id, Some(remove_options))
135        .await?;
136    result
137}