// binoc_sdk/data_access.rs

1use std::path::{Path, PathBuf};
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Mutex;
4
5use crate::types::{ArtifactDescriptor, ArtifactFormat, ArtifactSubject};
6use crate::{BinocError, BinocResult, DataAccess, ItemRef};
7
8/// In-process DataAccess backed by the local filesystem, temp directories,
9/// and a filesystem-backed artifact store under `data_root/.artifacts/`.
10///
11/// Construction modes:
12///
13/// - [`Self::new`] — unrestricted paths (tests, ad-hoc tooling).
14/// - [`Self::new_for_diff`] — paths must stay under the two snapshot trees or
15///   session workspace (used by the controller).
16/// - [`Self::for_plugin`] — shares the host's `data_root` for artifact access,
17///   plus a pre-allocated workspace for expansion (C ABI plugins).
18/// - [`Self::with_data_root`] — shares an existing `data_root` for artifact
19///   reads only (no expansion workspace).
20pub struct LocalDataAccess {
21    _session_dir: Option<tempfile::TempDir>,
22    data_root: PathBuf,
23    external_root: Option<PathBuf>,
24    workspace_counter: AtomicU32,
25    workspaces: Mutex<Vec<tempfile::TempDir>>,
26    provide_dir: Mutex<Option<tempfile::TempDir>>,
27    path_policy: PathPolicy,
28}
29
/// Filesystem confinement mode of a `LocalDataAccess`.
enum PathPolicy {
    /// Any path is accepted.
    Unrestricted,
    /// Only paths under one of the listed roots are accepted.
    Restricted {
        /// Canonicalized root of the first snapshot tree.
        snapshot_a: PathBuf,
        /// Canonicalized root of the second snapshot tree.
        snapshot_b: PathBuf,
        /// Additional canonicalized roots, appended at runtime as the instance
        /// creates its own directories.
        extra_allowed: Mutex<Vec<PathBuf>>,
    },
}
38
/// Location of the artifact store for a given `data_root`: `<data_root>/.artifacts`.
fn artifacts_dir(data_root: &Path) -> PathBuf {
    let mut store = data_root.to_path_buf();
    store.push(".artifacts");
    store
}
42
/// Encode `s` so it can be used as a single path component.
///
/// ASCII alphanumerics and `-`, `_`, `.` are kept; every other byte (including
/// each byte of a multi-byte UTF-8 character) becomes `%xx` in lowercase hex.
/// Because `%` itself is escaped, distinct inputs always yield distinct outputs.
///
/// The inputs `"."` and `".."` are special-cased: although they consist only of
/// whitelisted bytes, as path components they mean "this directory" / "parent
/// directory", so joining them onto a base path would not create a child entry
/// (and `".."` would step out of the base). Their dots are escaped as `%2e`.
/// Any other name containing dots (e.g. `".hidden"`, `"a.b"`, `"..."`) is left
/// untouched. An empty input yields an empty string.
fn safe_name(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    let is_dot_component = s == "." || s == "..";
    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        let keep = b.is_ascii_alphanumeric()
            || b == b'-'
            || b == b'_'
            || (b == b'.' && !is_dot_component);
        if keep {
            out.push(char::from(b));
        } else {
            out.push('%');
            out.push(char::from(HEX[usize::from(b >> 4)]));
            out.push(char::from(HEX[usize::from(b & 0x0f)]));
        }
    }
    out
}
54
55fn subject_dir_name(subject: ArtifactSubject) -> &'static str {
56    match subject {
57        ArtifactSubject::Left => "left",
58        ArtifactSubject::Right => "right",
59        ArtifactSubject::Pair => "pair",
60    }
61}
62
/// Whether `path` equals `root` or lies beneath it.
///
/// The comparison is made on whole path components, so `/a/bc` is not
/// considered to be within `/a/b`.
fn path_is_within(path: &Path, root: &Path) -> bool {
    path.strip_prefix(root).is_ok()
}
67
68fn item_ref_from_physical(physical: &Path, logical: &str) -> ItemRef {
69    ItemRef {
70        logical_path: logical.to_string(),
71        is_dir: physical.is_dir(),
72        content_hash: None,
73        size: None,
74        media_type: None,
75        handle: physical.to_string_lossy().to_string(),
76    }
77}
78
79impl LocalDataAccess {
80    pub fn new() -> Self {
81        let session = tempfile::tempdir().expect("failed to create session temp dir");
82        let data_root = session.path().to_path_buf();
83        Self {
84            _session_dir: Some(session),
85            data_root,
86            external_root: None,
87            workspace_counter: AtomicU32::new(0),
88            workspaces: Mutex::new(Vec::new()),
89            provide_dir: Mutex::new(None),
90            path_policy: PathPolicy::Unrestricted,
91        }
92    }
93
94    /// Session-backed access with path confinement: filesystem reads and
95    /// `register_local` targets must lie under the snapshot roots, the session
96    /// `data_root`, or a workspace / provide directory created by this instance.
97    pub fn new_for_diff(snapshot_a: &Path, snapshot_b: &Path) -> BinocResult<Self> {
98        let session = tempfile::tempdir().map_err(BinocError::Io)?;
99        let data_root = session.path().to_path_buf();
100        let snap_a = std::fs::canonicalize(snapshot_a).map_err(BinocError::Io)?;
101        let snap_b = std::fs::canonicalize(snapshot_b).map_err(BinocError::Io)?;
102        let data_root_canon = std::fs::canonicalize(&data_root).map_err(BinocError::Io)?;
103        Ok(Self {
104            _session_dir: Some(session),
105            data_root,
106            external_root: None,
107            workspace_counter: AtomicU32::new(0),
108            workspaces: Mutex::new(Vec::new()),
109            provide_dir: Mutex::new(None),
110            path_policy: PathPolicy::Restricted {
111                snapshot_a: snap_a,
112                snapshot_b: snap_b,
113                extra_allowed: Mutex::new(vec![data_root_canon]),
114            },
115        })
116    }
117
118    /// Create a LocalDataAccess for a plugin running across the C ABI.
119    /// Shares the host's `data_root` for cache access and uses `workspace`
120    /// for expansion (provide, workspace calls).
121    pub fn for_plugin(data_root: PathBuf, workspace: PathBuf) -> Self {
122        Self {
123            _session_dir: None,
124            data_root,
125            external_root: Some(workspace),
126            workspace_counter: AtomicU32::new(0),
127            workspaces: Mutex::new(Vec::new()),
128            provide_dir: Mutex::new(None),
129            path_policy: PathPolicy::Unrestricted,
130        }
131    }
132
133    /// Create a LocalDataAccess that can only read from an existing data_root
134    /// cache. No workspace for expansion. Used during extract-only access.
135    pub fn with_data_root(data_root: PathBuf) -> Self {
136        Self {
137            _session_dir: None,
138            data_root,
139            external_root: None,
140            workspace_counter: AtomicU32::new(0),
141            workspaces: Mutex::new(Vec::new()),
142            provide_dir: Mutex::new(None),
143            path_policy: PathPolicy::Unrestricted,
144        }
145    }
146
147    fn record_allowed_if_restricted(&self, path: &Path) -> BinocResult<()> {
148        if let PathPolicy::Restricted { extra_allowed, .. } = &self.path_policy {
149            let c = std::fs::canonicalize(path).map_err(BinocError::Io)?;
150            extra_allowed.lock().unwrap().push(c);
151        }
152        Ok(())
153    }
154
155    fn enforce_path_policy_resolved(&self, resolved: &Path) -> BinocResult<()> {
156        match &self.path_policy {
157            PathPolicy::Unrestricted => Ok(()),
158            PathPolicy::Restricted {
159                snapshot_a,
160                snapshot_b,
161                extra_allowed,
162            } => {
163                if path_is_within(resolved, snapshot_a) || path_is_within(resolved, snapshot_b) {
164                    return Ok(());
165                }
166                let roots = extra_allowed.lock().unwrap();
167                for root in roots.iter() {
168                    if path_is_within(resolved, root) {
169                        return Ok(());
170                    }
171                }
172                Err(BinocError::PathPolicy(format!(
173                    "path must stay under snapshot directories or session workspace: {}",
174                    resolved.display()
175                )))
176            }
177        }
178    }
179
180    /// Enforce policy for a path that must already exist on disk (e.g. `register_local`).
181    fn enforce_path_policy(&self, physical: &Path) -> BinocResult<()> {
182        let resolved = std::fs::canonicalize(physical).map_err(BinocError::Io)?;
183        self.enforce_path_policy_resolved(&resolved)
184    }
185
186    /// Enforce policy before reading; allows missing leaf paths under an allowed directory.
187    fn enforce_policy_for_read_path(&self, path: &Path) -> BinocResult<()> {
188        match &self.path_policy {
189            PathPolicy::Unrestricted => Ok(()),
190            PathPolicy::Restricted { .. } => {
191                if let Ok(c) = std::fs::canonicalize(path) {
192                    return self.enforce_path_policy_resolved(&c);
193                }
194                let mut probe: Option<&Path> = Some(path);
195                while let Some(p) = probe {
196                    if p.as_os_str().is_empty() {
197                        break;
198                    }
199                    if p.exists() {
200                        let base = std::fs::canonicalize(p).map_err(BinocError::Io)?;
201                        self.enforce_path_policy_resolved(&base)?;
202                        return Ok(());
203                    }
204                    probe = p.parent();
205                }
206                Err(BinocError::PathPolicy(format!(
207                    "cannot resolve path under session: {}",
208                    path.display()
209                )))
210            }
211        }
212    }
213
214    fn ensure_provide_dir(&self) -> BinocResult<PathBuf> {
215        if let Some(root) = &self.external_root {
216            let d = root.join("_provide");
217            std::fs::create_dir_all(&d).map_err(BinocError::Io)?;
218            self.record_allowed_if_restricted(&d)?;
219            return Ok(d);
220        }
221        let mut guard = self.provide_dir.lock().unwrap();
222        if guard.is_none() {
223            let dir = tempfile::tempdir().map_err(BinocError::Io)?;
224            self.record_allowed_if_restricted(dir.path())?;
225            *guard = Some(dir);
226        }
227        Ok(guard.as_ref().unwrap().path().to_path_buf())
228    }
229}
230
231impl Default for LocalDataAccess {
232    fn default() -> Self {
233        Self::new()
234    }
235}
236
237impl DataAccess for LocalDataAccess {
238    fn read_bytes(&self, item: &ItemRef) -> BinocResult<Vec<u8>> {
239        let p = Path::new(&item.handle);
240        self.enforce_policy_for_read_path(p)?;
241        std::fs::read(p).map_err(BinocError::Io)
242    }
243
244    fn open_read(&self, item: &ItemRef) -> BinocResult<Box<dyn std::io::Read + Send>> {
245        let p = Path::new(&item.handle);
246        self.enforce_policy_for_read_path(p)?;
247        let file = std::fs::File::open(p).map_err(BinocError::Io)?;
248        Ok(Box::new(file))
249    }
250
251    fn local_path(&self, item: &ItemRef) -> BinocResult<PathBuf> {
252        let p = PathBuf::from(&item.handle);
253        self.enforce_policy_for_read_path(&p)?;
254        Ok(p)
255    }
256
257    fn provide(&self, logical_path: &str, content: &[u8]) -> BinocResult<ItemRef> {
258        let dir = self.ensure_provide_dir()?;
259        let safe_name = logical_path.replace(['/', '\\'], "_");
260        let file_path = dir.join(&safe_name);
261        std::fs::write(&file_path, content).map_err(BinocError::Io)?;
262        self.enforce_path_policy(&file_path)?;
263        Ok(item_ref_from_physical(&file_path, logical_path))
264    }
265
266    fn workspace(&self) -> BinocResult<PathBuf> {
267        if let Some(root) = &self.external_root {
268            let n = self.workspace_counter.fetch_add(1, Ordering::Relaxed);
269            let subdir = root.join(format!("ws-{n}"));
270            std::fs::create_dir_all(&subdir).map_err(BinocError::Io)?;
271            self.record_allowed_if_restricted(&subdir)?;
272            return Ok(subdir);
273        }
274        let dir = tempfile::tempdir().map_err(BinocError::Io)?;
275        let path = dir.path().to_path_buf();
276        self.record_allowed_if_restricted(&path)?;
277        self.workspaces.lock().unwrap().push(dir);
278        Ok(path)
279    }
280
281    fn register_local(&self, physical: &Path, logical: &str) -> BinocResult<ItemRef> {
282        self.enforce_path_policy(physical)?;
283        Ok(item_ref_from_physical(physical, logical))
284    }
285
286    fn publish_artifact(
287        &self,
288        format: &ArtifactFormat,
289        subject: ArtifactSubject,
290        producer: &str,
291        data: &[u8],
292    ) -> BinocResult<ArtifactDescriptor> {
293        let id: u64 = rand::random();
294        let dir = artifacts_dir(&self.data_root)
295            .join(safe_name(&format.package))
296            .join(safe_name(&format.name))
297            .join(format!("v{}", format.version))
298            .join(subject_dir_name(subject));
299        std::fs::create_dir_all(&dir).map_err(BinocError::Io)?;
300        let filename = format!("{}-{id:016x}", safe_name(producer));
301        let handle = dir.join(filename).to_string_lossy().to_string();
302        std::fs::write(&handle, data).map_err(BinocError::Io)?;
303        Ok(ArtifactDescriptor {
304            format: format.clone(),
305            subject,
306            producer: producer.to_string(),
307            handle,
308        })
309    }
310
311    fn get_artifact(&self, descriptor: &ArtifactDescriptor) -> BinocResult<Option<Vec<u8>>> {
312        let path = PathBuf::from(&descriptor.handle);
313        self.enforce_policy_for_read_path(&path)?;
314        match std::fs::read(&path) {
315            Ok(data) => Ok(Some(data)),
316            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
317            Err(e) => Err(BinocError::Io(e)),
318        }
319    }
320
321    fn data_root(&self) -> BinocResult<PathBuf> {
322        Ok(self.data_root.clone())
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes published as an artifact come back unchanged, and the descriptor
    /// echoes the format, subject and producer it was published with.
    #[test]
    fn publish_and_get_artifact_round_trip() {
        let da = LocalDataAccess::new();
        let fmt = ArtifactFormat::new("binoc", "tabular", 1);
        let desc = da
            .publish_artifact(&fmt, ArtifactSubject::Left, "binoc.csv", b"hello world")
            .unwrap();
        assert_eq!(desc.format, fmt);
        assert_eq!(desc.subject, ArtifactSubject::Left);
        assert_eq!(desc.producer, "binoc.csv");
        let loaded = da.get_artifact(&desc).unwrap();
        assert_eq!(loaded, Some(b"hello world".to_vec()));
    }

    /// A descriptor whose handle names no file yields `None`, not an error.
    #[test]
    fn get_artifact_missing_returns_none() {
        let da = LocalDataAccess::new();
        // A path inside a fresh temp dir is guaranteed not to exist and works
        // on every platform, unlike a hard-coded `/tmp/...` location.
        let scratch = tempfile::tempdir().unwrap();
        let missing = scratch.path().join("does-not-exist-binoc-test");
        let desc = ArtifactDescriptor {
            format: ArtifactFormat::new("nonexistent", "thing", 1),
            subject: ArtifactSubject::Pair,
            producer: "test".into(),
            handle: missing.to_string_lossy().into_owned(),
        };
        assert_eq!(da.get_artifact(&desc).unwrap(), None);
    }

    /// An artifact published by one instance is readable through another
    /// instance created with `with_data_root` on the same root.
    #[test]
    fn cross_instance_artifact_visibility() {
        let da = LocalDataAccess::new();
        let fmt = ArtifactFormat::new("binoc", "tabular", 1);
        let desc = da
            .publish_artifact(&fmt, ArtifactSubject::Right, "binoc.csv", b"shared-value")
            .unwrap();
        let data_root = da.data_root().unwrap();

        let plugin_da = LocalDataAccess::with_data_root(data_root);
        let loaded = plugin_da.get_artifact(&desc).unwrap();
        assert_eq!(loaded, Some(b"shared-value".to_vec()));
    }

    /// An artifact published through a `for_plugin` instance is readable by
    /// the host instance that owns the data root.
    #[test]
    fn for_plugin_shares_artifacts() {
        let da = LocalDataAccess::new();
        let data_root = da.data_root().unwrap();
        let ws = da.workspace().unwrap();

        let plugin_da = LocalDataAccess::for_plugin(data_root, ws);
        let fmt = ArtifactFormat::new("myplugin", "schema", 1);
        let desc = plugin_da
            .publish_artifact(&fmt, ArtifactSubject::Pair, "myplugin", b"plugin-data")
            .unwrap();

        let loaded = da.get_artifact(&desc).unwrap();
        assert_eq!(loaded, Some(b"plugin-data".to_vec()));
    }

    /// The data root of a fresh instance exists on disk.
    #[test]
    fn data_root_returns_valid_path() {
        let da = LocalDataAccess::new();
        let root = da.data_root().unwrap();
        assert!(root.exists());
    }

    /// Under a restricted policy, registering a file outside both snapshot
    /// trees is rejected with a path-policy error.
    #[test]
    fn restricted_rejects_register_outside_snapshots() {
        let tmp_a = tempfile::tempdir().unwrap();
        let tmp_b = tempfile::tempdir().unwrap();
        let outside = tempfile::tempdir().unwrap();
        std::fs::write(outside.path().join("x.txt"), b"x").unwrap();

        let da = LocalDataAccess::new_for_diff(tmp_a.path(), tmp_b.path()).unwrap();
        let p = outside.path().join("x.txt");
        let err = da.register_local(&p, "x.txt").unwrap_err();
        assert!(matches!(err, BinocError::PathPolicy(_)));
    }

    /// Under a restricted policy, registering a file inside a snapshot tree
    /// succeeds.
    #[test]
    fn restricted_allows_register_under_snapshot() {
        let tmp_a = tempfile::tempdir().unwrap();
        let tmp_b = tempfile::tempdir().unwrap();
        let f = tmp_a.path().join("f.txt");
        std::fs::write(&f, b"ok").unwrap();

        let da = LocalDataAccess::new_for_diff(tmp_a.path(), tmp_b.path()).unwrap();
        da.register_local(&f, "f.txt").unwrap();
    }
}