core_lib/exec/
container.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use anyhow::Result;
5use bollard::Docker;
6use bollard::models::ContainerCreateBody;
7use bollard::query_parameters::{
8 CreateContainerOptionsBuilder, CreateImageOptionsBuilder, LogsOptionsBuilder,
9 RemoveContainerOptionsBuilder, StartContainerOptions,
10};
11use futures_util::stream::StreamExt;
12
13use tokio::fs::OpenOptions;
14use tokio::io::AsyncWriteExt;
15use tokio::time::timeout;
16
17use crate::core::id::short_id;
18use crate::log::logger::Logger;
19
20async fn ensure_image(docker: &Docker, image: &str, logger: &Logger) -> Result<()> {
21 let image_options = CreateImageOptionsBuilder::default()
22 .from_image(image)
23 .build();
24 let mut stream = docker.create_image(Some(image_options), None, None);
25
26 let mut last = String::new();
27 while let Some(status) = stream.next().await {
28 match status {
29 Ok(info) => {
30 if let Some(s) = info.status {
31 if last != s {
32 logger.info(&format!("Pulling {image}: {s}")).await?;
33 }
34 last = s.clone();
35 }
36 }
37 Err(e) => {
38 return Err(anyhow::anyhow!("Failed to pull image {image}: {e}"));
39 }
40 }
41 }
42 Ok(())
43}
44
45pub async fn contain_cmd(
46 image: &str,
47 cmd: Vec<String>,
48 env: Option<HashMap<String, String>>,
49 dir: &str,
50 log_path: &str,
51 logger: &Logger,
52 timeout_secs: Option<u64>,
53) -> Result<()> {
54 logger.info("Building image").await?;
55 let docker = Docker::connect_with_local_defaults()?;
56 ensure_image(&docker, image, logger).await?;
57
58 let create_option = CreateContainerOptionsBuilder::default()
59 .name(&format!("fleet-job-{}", short_id()))
60 .build();
61
62 let container_config = ContainerCreateBody {
63 image: Some(image.to_string()),
64 cmd: Some(cmd),
65 env: env.map(|m| m.iter().map(|(k, v)| format!("{k}={v}")).collect()),
66 attach_stdout: Some(true),
67 attach_stderr: Some(true),
68 tty: Some(false),
69 host_config: Some(bollard::models::HostConfig {
70 binds: Some(vec![format!("{dir}:/app",)]),
71 ..Default::default()
72 }),
73 working_dir: Some("/app".to_string()),
74 ..Default::default()
75 };
76
77 let container = docker
78 .create_container(Some(create_option), container_config)
79 .await?;
80
81 docker
82 .start_container(&container.id, None::<StartContainerOptions>)
83 .await?;
84
85 let mut log_file = OpenOptions::new()
86 .create(true)
87 .append(true)
88 .open(log_path)
89 .await?;
90
91 let logs_options = LogsOptionsBuilder::default()
92 .follow(true)
93 .stdout(true)
94 .stderr(true)
95 .build();
96
97 let mut log_stream = docker.logs(&container.id, Some(logs_options));
98
99 let logs_future = async {
100 while let Some(log) = log_stream.next().await {
101 match log? {
102 bollard::container::LogOutput::StdOut { message }
103 | bollard::container::LogOutput::StdErr { message } => {
104 log_file.write_all(&message).await?;
105 }
106 _ => {}
107 }
108 }
109 Ok::<(), anyhow::Error>(())
110 };
111
112 let result = if let Some(secs) = timeout_secs {
113 match timeout(Duration::from_secs(secs), logs_future).await {
115 Ok(inner) => inner,
116 Err(_) => {
117 logger
118 .error(&format!(
119 "Container execution timed out after {secs} seconds"
120 ))
121 .await?;
122 Err(anyhow::anyhow!(
123 "Container execution timed out after {secs} seconds"
124 ))
125 }
126 }
127 } else {
128 logs_future.await
130 };
131
132 let remove_options = RemoveContainerOptionsBuilder::default().force(true).build();
133 docker
134 .remove_container(&container.id, Some(remove_options))
135 .await?;
136 result
137}