1use std::path::{Path, PathBuf};
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Mutex;
4
5use crate::types::{ArtifactDescriptor, ArtifactFormat, ArtifactSubject};
6use crate::{BinocError, BinocResult, DataAccess, ItemRef};
7
/// Filesystem-backed [`DataAccess`] implementation.
///
/// Items are addressed by the physical path stored in `ItemRef::handle`, and
/// published artifacts are written as files beneath `data_root`.
pub struct LocalDataAccess {
    /// Session temp dir owned by this instance: `Some` when built by `new` /
    /// `new_for_diff`, `None` when the caller supplies the data root.
    /// Never read; held only so the `TempDir` guard lives as long as `self`.
    _session_dir: Option<tempfile::TempDir>,
    /// Directory under which the `.artifacts` tree is created; also the value
    /// returned by `DataAccess::data_root`.
    data_root: PathBuf,
    /// When set (see `for_plugin`), `workspace()` and `provide()` create their
    /// directories beneath this path instead of creating fresh temp dirs.
    external_root: Option<PathBuf>,
    /// Source of the `n` in the `ws-{n}` subdirectory names handed out by
    /// `workspace()` when `external_root` is set.
    workspace_counter: AtomicU32,
    /// Temp dirs handed out by `workspace()` (no `external_root`), retained so
    /// their `TempDir` guards are not dropped while `self` is alive.
    workspaces: Mutex<Vec<tempfile::TempDir>>,
    /// Lazily created temp dir that `provide()` writes into when there is no
    /// `external_root`.
    provide_dir: Mutex<Option<tempfile::TempDir>>,
    /// Which physical paths this instance is willing to touch.
    path_policy: PathPolicy,
}
29
/// Decides which physical paths a [`LocalDataAccess`] may register or read.
enum PathPolicy {
    /// No checks: every path is accepted.
    Unrestricted,
    /// Paths must resolve under one of the snapshot roots or under one of the
    /// additional roots recorded at runtime.
    Restricted {
        /// Canonicalized root of the first snapshot (see `new_for_diff`).
        snapshot_a: PathBuf,
        /// Canonicalized root of the second snapshot (see `new_for_diff`).
        snapshot_b: PathBuf,
        /// Further canonicalized roots that are allowed: initially the session
        /// data root, extended as workspaces / provide dirs are created.
        extra_allowed: Mutex<Vec<PathBuf>>,
    },
}
38
/// Returns the directory beneath `data_root` that holds published artifacts
/// (`<data_root>/.artifacts`). The directory is not created here.
fn artifacts_dir(data_root: &Path) -> PathBuf {
    let mut dir = data_root.to_path_buf();
    dir.push(".artifacts");
    dir
}
42
/// Encodes `s` so that it can be used as a single path component.
///
/// ASCII alphanumerics, `-`, `_` and `.` are kept as-is; every other byte is
/// written as `%` followed by two lowercase hex digits (so `/` becomes `%2f`
/// and a literal `%` becomes `%25`).
///
/// A non-empty name consisting solely of dots (`.`, `..`, ...) has its dots
/// escaped as `%2e`. Left unescaped, such a name would be treated as the
/// current/parent directory once joined onto a path, letting the caller step
/// out of the directory the component is meant to live in.
///
/// The empty string is returned unchanged.
fn safe_name(s: &str) -> String {
    // Dot-only names are the one case where an otherwise allowed byte is unsafe.
    let escape_dots = !s.is_empty() && s.bytes().all(|b| b == b'.');

    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        let keep = match b {
            b'.' => !escape_dots,
            b'-' | b'_' => true,
            _ => b.is_ascii_alphanumeric(),
        };
        if keep {
            out.push(char::from(b));
        } else {
            out.push_str(&format!("%{b:02x}"));
        }
    }
    out
}
54
55fn subject_dir_name(subject: ArtifactSubject) -> &'static str {
56 match subject {
57 ArtifactSubject::Left => "left",
58 ArtifactSubject::Right => "right",
59 ArtifactSubject::Pair => "pair",
60 }
61}
62
/// Reports whether `path` is `root` itself or lies beneath it.
///
/// The comparison is purely lexical and works on whole path components, so
/// `/a/bc` is *not* within `/a/b`. Callers are expected to pass already
/// resolved paths; no filesystem access happens here.
fn path_is_within(path: &Path, root: &Path) -> bool {
    path.strip_prefix(root).is_ok()
}
67
68fn item_ref_from_physical(physical: &Path, logical: &str) -> ItemRef {
69 ItemRef {
70 logical_path: logical.to_string(),
71 is_dir: physical.is_dir(),
72 content_hash: None,
73 size: None,
74 media_type: None,
75 handle: physical.to_string_lossy().to_string(),
76 }
77}
78
79impl LocalDataAccess {
80 pub fn new() -> Self {
81 let session = tempfile::tempdir().expect("failed to create session temp dir");
82 let data_root = session.path().to_path_buf();
83 Self {
84 _session_dir: Some(session),
85 data_root,
86 external_root: None,
87 workspace_counter: AtomicU32::new(0),
88 workspaces: Mutex::new(Vec::new()),
89 provide_dir: Mutex::new(None),
90 path_policy: PathPolicy::Unrestricted,
91 }
92 }
93
94 pub fn new_for_diff(snapshot_a: &Path, snapshot_b: &Path) -> BinocResult<Self> {
98 let session = tempfile::tempdir().map_err(BinocError::Io)?;
99 let data_root = session.path().to_path_buf();
100 let snap_a = std::fs::canonicalize(snapshot_a).map_err(BinocError::Io)?;
101 let snap_b = std::fs::canonicalize(snapshot_b).map_err(BinocError::Io)?;
102 let data_root_canon = std::fs::canonicalize(&data_root).map_err(BinocError::Io)?;
103 Ok(Self {
104 _session_dir: Some(session),
105 data_root,
106 external_root: None,
107 workspace_counter: AtomicU32::new(0),
108 workspaces: Mutex::new(Vec::new()),
109 provide_dir: Mutex::new(None),
110 path_policy: PathPolicy::Restricted {
111 snapshot_a: snap_a,
112 snapshot_b: snap_b,
113 extra_allowed: Mutex::new(vec![data_root_canon]),
114 },
115 })
116 }
117
118 pub fn for_plugin(data_root: PathBuf, workspace: PathBuf) -> Self {
122 Self {
123 _session_dir: None,
124 data_root,
125 external_root: Some(workspace),
126 workspace_counter: AtomicU32::new(0),
127 workspaces: Mutex::new(Vec::new()),
128 provide_dir: Mutex::new(None),
129 path_policy: PathPolicy::Unrestricted,
130 }
131 }
132
133 pub fn with_data_root(data_root: PathBuf) -> Self {
136 Self {
137 _session_dir: None,
138 data_root,
139 external_root: None,
140 workspace_counter: AtomicU32::new(0),
141 workspaces: Mutex::new(Vec::new()),
142 provide_dir: Mutex::new(None),
143 path_policy: PathPolicy::Unrestricted,
144 }
145 }
146
147 fn record_allowed_if_restricted(&self, path: &Path) -> BinocResult<()> {
148 if let PathPolicy::Restricted { extra_allowed, .. } = &self.path_policy {
149 let c = std::fs::canonicalize(path).map_err(BinocError::Io)?;
150 extra_allowed.lock().unwrap().push(c);
151 }
152 Ok(())
153 }
154
155 fn enforce_path_policy_resolved(&self, resolved: &Path) -> BinocResult<()> {
156 match &self.path_policy {
157 PathPolicy::Unrestricted => Ok(()),
158 PathPolicy::Restricted {
159 snapshot_a,
160 snapshot_b,
161 extra_allowed,
162 } => {
163 if path_is_within(resolved, snapshot_a) || path_is_within(resolved, snapshot_b) {
164 return Ok(());
165 }
166 let roots = extra_allowed.lock().unwrap();
167 for root in roots.iter() {
168 if path_is_within(resolved, root) {
169 return Ok(());
170 }
171 }
172 Err(BinocError::PathPolicy(format!(
173 "path must stay under snapshot directories or session workspace: {}",
174 resolved.display()
175 )))
176 }
177 }
178 }
179
180 fn enforce_path_policy(&self, physical: &Path) -> BinocResult<()> {
182 let resolved = std::fs::canonicalize(physical).map_err(BinocError::Io)?;
183 self.enforce_path_policy_resolved(&resolved)
184 }
185
186 fn enforce_policy_for_read_path(&self, path: &Path) -> BinocResult<()> {
188 match &self.path_policy {
189 PathPolicy::Unrestricted => Ok(()),
190 PathPolicy::Restricted { .. } => {
191 if let Ok(c) = std::fs::canonicalize(path) {
192 return self.enforce_path_policy_resolved(&c);
193 }
194 let mut probe: Option<&Path> = Some(path);
195 while let Some(p) = probe {
196 if p.as_os_str().is_empty() {
197 break;
198 }
199 if p.exists() {
200 let base = std::fs::canonicalize(p).map_err(BinocError::Io)?;
201 self.enforce_path_policy_resolved(&base)?;
202 return Ok(());
203 }
204 probe = p.parent();
205 }
206 Err(BinocError::PathPolicy(format!(
207 "cannot resolve path under session: {}",
208 path.display()
209 )))
210 }
211 }
212 }
213
214 fn ensure_provide_dir(&self) -> BinocResult<PathBuf> {
215 if let Some(root) = &self.external_root {
216 let d = root.join("_provide");
217 std::fs::create_dir_all(&d).map_err(BinocError::Io)?;
218 self.record_allowed_if_restricted(&d)?;
219 return Ok(d);
220 }
221 let mut guard = self.provide_dir.lock().unwrap();
222 if guard.is_none() {
223 let dir = tempfile::tempdir().map_err(BinocError::Io)?;
224 self.record_allowed_if_restricted(dir.path())?;
225 *guard = Some(dir);
226 }
227 Ok(guard.as_ref().unwrap().path().to_path_buf())
228 }
229}
230
231impl Default for LocalDataAccess {
232 fn default() -> Self {
233 Self::new()
234 }
235}
236
237impl DataAccess for LocalDataAccess {
238 fn read_bytes(&self, item: &ItemRef) -> BinocResult<Vec<u8>> {
239 let p = Path::new(&item.handle);
240 self.enforce_policy_for_read_path(p)?;
241 std::fs::read(p).map_err(BinocError::Io)
242 }
243
244 fn open_read(&self, item: &ItemRef) -> BinocResult<Box<dyn std::io::Read + Send>> {
245 let p = Path::new(&item.handle);
246 self.enforce_policy_for_read_path(p)?;
247 let file = std::fs::File::open(p).map_err(BinocError::Io)?;
248 Ok(Box::new(file))
249 }
250
251 fn local_path(&self, item: &ItemRef) -> BinocResult<PathBuf> {
252 let p = PathBuf::from(&item.handle);
253 self.enforce_policy_for_read_path(&p)?;
254 Ok(p)
255 }
256
257 fn provide(&self, logical_path: &str, content: &[u8]) -> BinocResult<ItemRef> {
258 let dir = self.ensure_provide_dir()?;
259 let safe_name = logical_path.replace(['/', '\\'], "_");
260 let file_path = dir.join(&safe_name);
261 std::fs::write(&file_path, content).map_err(BinocError::Io)?;
262 self.enforce_path_policy(&file_path)?;
263 Ok(item_ref_from_physical(&file_path, logical_path))
264 }
265
266 fn workspace(&self) -> BinocResult<PathBuf> {
267 if let Some(root) = &self.external_root {
268 let n = self.workspace_counter.fetch_add(1, Ordering::Relaxed);
269 let subdir = root.join(format!("ws-{n}"));
270 std::fs::create_dir_all(&subdir).map_err(BinocError::Io)?;
271 self.record_allowed_if_restricted(&subdir)?;
272 return Ok(subdir);
273 }
274 let dir = tempfile::tempdir().map_err(BinocError::Io)?;
275 let path = dir.path().to_path_buf();
276 self.record_allowed_if_restricted(&path)?;
277 self.workspaces.lock().unwrap().push(dir);
278 Ok(path)
279 }
280
281 fn register_local(&self, physical: &Path, logical: &str) -> BinocResult<ItemRef> {
282 self.enforce_path_policy(physical)?;
283 Ok(item_ref_from_physical(physical, logical))
284 }
285
286 fn publish_artifact(
287 &self,
288 format: &ArtifactFormat,
289 subject: ArtifactSubject,
290 producer: &str,
291 data: &[u8],
292 ) -> BinocResult<ArtifactDescriptor> {
293 let id: u64 = rand::random();
294 let dir = artifacts_dir(&self.data_root)
295 .join(safe_name(&format.package))
296 .join(safe_name(&format.name))
297 .join(format!("v{}", format.version))
298 .join(subject_dir_name(subject));
299 std::fs::create_dir_all(&dir).map_err(BinocError::Io)?;
300 let filename = format!("{}-{id:016x}", safe_name(producer));
301 let handle = dir.join(filename).to_string_lossy().to_string();
302 std::fs::write(&handle, data).map_err(BinocError::Io)?;
303 Ok(ArtifactDescriptor {
304 format: format.clone(),
305 subject,
306 producer: producer.to_string(),
307 handle,
308 })
309 }
310
311 fn get_artifact(&self, descriptor: &ArtifactDescriptor) -> BinocResult<Option<Vec<u8>>> {
312 let path = PathBuf::from(&descriptor.handle);
313 self.enforce_policy_for_read_path(&path)?;
314 match std::fs::read(&path) {
315 Ok(data) => Ok(Some(data)),
316 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
317 Err(e) => Err(BinocError::Io(e)),
318 }
319 }
320
321 fn data_root(&self) -> BinocResult<PathBuf> {
322 Ok(self.data_root.clone())
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Published bytes come back unchanged, and the descriptor echoes the
    /// arguments given to `publish_artifact`.
    #[test]
    fn publish_and_get_artifact_round_trip() {
        let access = LocalDataAccess::new();
        let format = ArtifactFormat::new("binoc", "tabular", 1);

        let descriptor = access
            .publish_artifact(&format, ArtifactSubject::Left, "binoc.csv", b"hello world")
            .unwrap();

        assert_eq!(descriptor.format, format);
        assert_eq!(descriptor.subject, ArtifactSubject::Left);
        assert_eq!(descriptor.producer, "binoc.csv");

        let bytes = access.get_artifact(&descriptor).unwrap();
        assert_eq!(bytes, Some(b"hello world".to_vec()));
    }

    /// A descriptor whose handle points at a missing file yields `None`
    /// rather than an error.
    #[test]
    fn get_artifact_missing_returns_none() {
        let access = LocalDataAccess::new();
        let missing = ArtifactDescriptor {
            format: ArtifactFormat::new("nonexistent", "thing", 1),
            subject: ArtifactSubject::Pair,
            producer: "test".into(),
            handle: "/tmp/does-not-exist-binoc-test".into(),
        };

        let result = access.get_artifact(&missing).unwrap();
        assert_eq!(result, None);
    }

    /// An artifact published by one instance is readable through a second
    /// instance built on the same data root.
    #[test]
    fn cross_instance_artifact_visibility() {
        let host = LocalDataAccess::new();
        let format = ArtifactFormat::new("binoc", "tabular", 1);
        let descriptor = host
            .publish_artifact(&format, ArtifactSubject::Right, "binoc.csv", b"shared-value")
            .unwrap();
        let shared_root = host.data_root().unwrap();

        let second = LocalDataAccess::with_data_root(shared_root);

        let bytes = second.get_artifact(&descriptor).unwrap();
        assert_eq!(bytes, Some(b"shared-value".to_vec()));
    }

    /// An artifact published through a `for_plugin` instance is readable by
    /// the host instance that owns the data root.
    #[test]
    fn for_plugin_shares_artifacts() {
        let host = LocalDataAccess::new();
        let shared_root = host.data_root().unwrap();
        let workspace = host.workspace().unwrap();

        let plugin = LocalDataAccess::for_plugin(shared_root, workspace);
        let format = ArtifactFormat::new("myplugin", "schema", 1);
        let descriptor = plugin
            .publish_artifact(&format, ArtifactSubject::Pair, "myplugin", b"plugin-data")
            .unwrap();

        let bytes = host.get_artifact(&descriptor).unwrap();
        assert_eq!(bytes, Some(b"plugin-data".to_vec()));
    }

    /// The data root of a fresh instance exists on disk.
    #[test]
    fn data_root_returns_valid_path() {
        let access = LocalDataAccess::new();
        let root = access.data_root().unwrap();
        assert!(root.exists());
    }

    /// Under a restricted policy, registering a file outside both snapshots
    /// fails with a `PathPolicy` error.
    #[test]
    fn restricted_rejects_register_outside_snapshots() {
        let snapshot_a = tempfile::tempdir().unwrap();
        let snapshot_b = tempfile::tempdir().unwrap();
        let elsewhere = tempfile::tempdir().unwrap();
        std::fs::write(elsewhere.path().join("x.txt"), b"x").unwrap();

        let access = LocalDataAccess::new_for_diff(snapshot_a.path(), snapshot_b.path()).unwrap();
        let outside_file = elsewhere.path().join("x.txt");

        let err = access.register_local(&outside_file, "x.txt").unwrap_err();
        assert!(matches!(err, BinocError::PathPolicy(_)));
    }

    /// Under a restricted policy, registering a file inside a snapshot
    /// succeeds.
    #[test]
    fn restricted_allows_register_under_snapshot() {
        let snapshot_a = tempfile::tempdir().unwrap();
        let snapshot_b = tempfile::tempdir().unwrap();
        let inside_file = snapshot_a.path().join("f.txt");
        std::fs::write(&inside_file, b"ok").unwrap();

        let access = LocalDataAccess::new_for_diff(snapshot_a.path(), snapshot_b.path()).unwrap();
        access.register_local(&inside_file, "f.txt").unwrap();
    }
}