phytrace_sdk/reliability/
buffer.rs

1//! File-based event buffer for offline reliability.
2//!
3//! Events are stored as JSON files in a buffer directory when the transport
4//! is unavailable, then replayed when connectivity is restored.
5
6use std::fs::{self, File};
7use std::io::{BufRead, BufReader, BufWriter, Write};
8use std::path::{Path, PathBuf};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use flate2::read::GzDecoder;
12use flate2::write::GzEncoder;
13use flate2::Compression;
14
15use crate::core::config::BufferConfig;
16use crate::error::{BufferError, PhyTraceResult};
17use crate::models::event::UdmEvent;
18
19/// File-based event buffer.
20#[derive(Debug)]
21pub struct FileBuffer {
22    config: BufferConfig,
23    buffer_dir: PathBuf,
24}
25
26impl FileBuffer {
27    /// Create a new file buffer with configuration.
28    pub fn new(config: &BufferConfig) -> PhyTraceResult<Self> {
29        let buffer_dir = PathBuf::from(&config.path);
30
31        // Create buffer directory if it doesn't exist
32        if !buffer_dir.exists() {
33            fs::create_dir_all(&buffer_dir).map_err(|e| {
34                BufferError::Io(format!("Failed to create buffer directory: {}", e))
35            })?;
36        }
37
38        Ok(Self {
39            config: config.clone(),
40            buffer_dir,
41        })
42    }
43
44    /// Create a buffer with default configuration at the specified path.
45    pub fn at_path(path: impl Into<PathBuf>) -> PhyTraceResult<Self> {
46        let path = path.into();
47        let config = BufferConfig {
48            path: path.to_string_lossy().to_string(),
49            ..Default::default()
50        };
51        Self::new(&config)
52    }
53
54    /// Buffer a single event.
55    pub fn buffer_event(&self, event: &UdmEvent) -> PhyTraceResult<()> {
56        self.check_capacity()?;
57
58        let filename = self.generate_filename();
59        let filepath = self.buffer_dir.join(&filename);
60
61        let json =
62            serde_json::to_string(event).map_err(|e| BufferError::Serialization(e.to_string()))?;
63
64        if self.config.compress {
65            self.write_compressed(&filepath, &json)?;
66        } else {
67            fs::write(&filepath, &json)
68                .map_err(|e| BufferError::Io(format!("Failed to write buffer file: {}", e)))?;
69        }
70
71        Ok(())
72    }
73
74    /// Buffer multiple events.
75    pub fn buffer_events(&self, events: &[UdmEvent]) -> PhyTraceResult<()> {
76        self.check_capacity()?;
77
78        let filename = format!("batch_{}.jsonl", self.timestamp_nanos());
79        let filepath = self.buffer_dir.join(&filename);
80
81        if self.config.compress {
82            let filepath = filepath.with_extension("jsonl.gz");
83            let file = File::create(&filepath)
84                .map_err(|e| BufferError::Io(format!("Failed to create buffer file: {}", e)))?;
85            let encoder = GzEncoder::new(file, Compression::fast());
86            let mut writer = BufWriter::new(encoder);
87
88            for event in events {
89                let json = serde_json::to_string(event)
90                    .map_err(|e| BufferError::Serialization(e.to_string()))?;
91                writeln!(writer, "{}", json)
92                    .map_err(|e| BufferError::Io(format!("Failed to write to buffer: {}", e)))?;
93            }
94            writer
95                .flush()
96                .map_err(|e| BufferError::Io(format!("Failed to flush buffer: {}", e)))?;
97        } else {
98            let file = File::create(&filepath)
99                .map_err(|e| BufferError::Io(format!("Failed to create buffer file: {}", e)))?;
100            let mut writer = BufWriter::new(file);
101
102            for event in events {
103                let json = serde_json::to_string(event)
104                    .map_err(|e| BufferError::Serialization(e.to_string()))?;
105                writeln!(writer, "{}", json)
106                    .map_err(|e| BufferError::Io(format!("Failed to write to buffer: {}", e)))?;
107            }
108            writer
109                .flush()
110                .map_err(|e| BufferError::Io(format!("Failed to flush buffer: {}", e)))?;
111        }
112
113        Ok(())
114    }
115
116    /// Read all buffered events, oldest first.
117    pub fn read_events(&self) -> PhyTraceResult<Vec<BufferedEvent>> {
118        let mut events = Vec::new();
119
120        for entry in fs::read_dir(&self.buffer_dir)
121            .map_err(|e| BufferError::Io(format!("Failed to read buffer directory: {}", e)))?
122        {
123            let entry = entry
124                .map_err(|e| BufferError::Io(format!("Failed to read directory entry: {}", e)))?;
125            let path = entry.path();
126
127            if !path.is_file() {
128                continue;
129            }
130
131            let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
132
133            // Skip non-buffer files
134            if !filename.ends_with(".json")
135                && !filename.ends_with(".json.gz")
136                && !filename.ends_with(".jsonl")
137                && !filename.ends_with(".jsonl.gz")
138            {
139                continue;
140            }
141
142            // Check age
143            if self.is_expired(&path) {
144                // Delete expired files
145                let _ = fs::remove_file(&path);
146                continue;
147            }
148
149            // Read events from file
150            let file_events = self.read_file(&path)?;
151            for event in file_events {
152                events.push(BufferedEvent {
153                    event,
154                    filepath: path.clone(),
155                });
156            }
157        }
158
159        // Sort by event timestamp (oldest first)
160        events.sort_by(|a, b| a.event.captured_at.cmp(&b.event.captured_at));
161
162        Ok(events)
163    }
164
165    /// Remove a specific buffered file.
166    pub fn remove_file(&self, filepath: &Path) -> PhyTraceResult<()> {
167        fs::remove_file(filepath)
168            .map_err(|e| BufferError::Io(format!("Failed to remove buffer file: {}", e)))?;
169        Ok(())
170    }
171
172    /// Get the count of buffered files.
173    pub fn file_count(&self) -> usize {
174        fs::read_dir(&self.buffer_dir)
175            .map(|entries| {
176                entries
177                    .filter_map(|e| e.ok())
178                    .filter(|e| e.path().is_file())
179                    .filter(|e| {
180                        let name = e.file_name().to_string_lossy().to_string();
181                        name.ends_with(".json")
182                            || name.ends_with(".json.gz")
183                            || name.ends_with(".jsonl")
184                            || name.ends_with(".jsonl.gz")
185                    })
186                    .count()
187            })
188            .unwrap_or(0)
189    }
190
191    /// Get total size of buffered files in bytes.
192    pub fn total_size(&self) -> u64 {
193        fs::read_dir(&self.buffer_dir)
194            .map(|entries| {
195                entries
196                    .filter_map(|e| e.ok())
197                    .filter_map(|e| e.metadata().ok())
198                    .filter(|m| m.is_file())
199                    .map(|m| m.len())
200                    .sum()
201            })
202            .unwrap_or(0)
203    }
204
205    /// Clear all buffered events.
206    pub fn clear(&self) -> PhyTraceResult<()> {
207        for entry in fs::read_dir(&self.buffer_dir)
208            .map_err(|e| BufferError::Io(format!("Failed to read buffer directory: {}", e)))?
209        {
210            let entry = entry
211                .map_err(|e| BufferError::Io(format!("Failed to read directory entry: {}", e)))?;
212            let path = entry.path();
213
214            if path.is_file() {
215                let _ = fs::remove_file(&path);
216            }
217        }
218        Ok(())
219    }
220
221    /// Cleanup expired files.
222    pub fn cleanup_expired(&self) -> PhyTraceResult<usize> {
223        let mut removed = 0;
224
225        for entry in fs::read_dir(&self.buffer_dir)
226            .map_err(|e| BufferError::Io(format!("Failed to read buffer directory: {}", e)))?
227        {
228            let entry = entry
229                .map_err(|e| BufferError::Io(format!("Failed to read directory entry: {}", e)))?;
230            let path = entry.path();
231
232            if path.is_file() && self.is_expired(&path) {
233                let _ = fs::remove_file(&path);
234                removed += 1;
235            }
236        }
237
238        Ok(removed)
239    }
240
241    // =========================================================================
242    // Private helpers
243    // =========================================================================
244
245    fn generate_filename(&self) -> String {
246        let ts = self.timestamp_nanos();
247        if self.config.compress {
248            format!("event_{}.json.gz", ts)
249        } else {
250            format!("event_{}.json", ts)
251        }
252    }
253
254    fn timestamp_nanos(&self) -> u128 {
255        SystemTime::now()
256            .duration_since(UNIX_EPOCH)
257            .unwrap_or(Duration::ZERO)
258            .as_nanos()
259    }
260
261    fn check_capacity(&self) -> PhyTraceResult<()> {
262        let current_size = self.total_size();
263        if current_size >= self.config.max_size_bytes {
264            return Err(BufferError::Full(format!(
265                "Buffer full: {} bytes (max {})",
266                current_size, self.config.max_size_bytes
267            ))
268            .into());
269        }
270        Ok(())
271    }
272
273    fn is_expired(&self, path: &Path) -> bool {
274        let max_age = Duration::from_secs(self.config.max_age_secs);
275
276        path.metadata()
277            .and_then(|m| m.modified())
278            .map(|modified| {
279                SystemTime::now()
280                    .duration_since(modified)
281                    .map(|age| age > max_age)
282                    .unwrap_or(false)
283            })
284            .unwrap_or(false)
285    }
286
287    fn write_compressed(&self, filepath: &Path, content: &str) -> PhyTraceResult<()> {
288        let file = File::create(filepath)
289            .map_err(|e| BufferError::Io(format!("Failed to create buffer file: {}", e)))?;
290        let mut encoder = GzEncoder::new(file, Compression::fast());
291        encoder
292            .write_all(content.as_bytes())
293            .map_err(|e| BufferError::Io(format!("Failed to write compressed data: {}", e)))?;
294        encoder
295            .finish()
296            .map_err(|e| BufferError::Io(format!("Failed to finish compression: {}", e)))?;
297        Ok(())
298    }
299
300    fn read_file(&self, path: &Path) -> PhyTraceResult<Vec<UdmEvent>> {
301        let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
302
303        let is_compressed = filename.ends_with(".gz");
304        let is_batch = filename.contains("batch")
305            || filename.ends_with(".jsonl")
306            || filename.ends_with(".jsonl.gz");
307
308        if is_batch {
309            self.read_batch_file(path, is_compressed)
310        } else {
311            self.read_single_file(path, is_compressed).map(|e| vec![e])
312        }
313    }
314
315    fn read_single_file(&self, path: &Path, compressed: bool) -> PhyTraceResult<UdmEvent> {
316        let content = if compressed {
317            let file = File::open(path)
318                .map_err(|e| BufferError::Io(format!("Failed to open buffer file: {}", e)))?;
319            let mut decoder = GzDecoder::new(file);
320            let mut content = String::new();
321            std::io::Read::read_to_string(&mut decoder, &mut content)
322                .map_err(|e| BufferError::Io(format!("Failed to read compressed file: {}", e)))?;
323            content
324        } else {
325            fs::read_to_string(path)
326                .map_err(|e| BufferError::Io(format!("Failed to read buffer file: {}", e)))?
327        };
328
329        serde_json::from_str(&content).map_err(|e| BufferError::Serialization(e.to_string()).into())
330    }
331
332    fn read_batch_file(&self, path: &Path, compressed: bool) -> PhyTraceResult<Vec<UdmEvent>> {
333        let file = File::open(path)
334            .map_err(|e| BufferError::Io(format!("Failed to open buffer file: {}", e)))?;
335
336        let reader: Box<dyn BufRead> = if compressed {
337            Box::new(BufReader::new(GzDecoder::new(file)))
338        } else {
339            Box::new(BufReader::new(file))
340        };
341
342        let mut events = Vec::new();
343        for line in reader.lines() {
344            let line = line.map_err(|e| BufferError::Io(format!("Failed to read line: {}", e)))?;
345            if line.trim().is_empty() {
346                continue;
347            }
348            let event: UdmEvent = serde_json::from_str(&line)
349                .map_err(|e| BufferError::Serialization(e.to_string()))?;
350            events.push(event);
351        }
352
353        Ok(events)
354    }
355}
356
357/// A buffered event with its file path.
358#[derive(Debug)]
359pub struct BufferedEvent {
360    /// The event.
361    pub event: UdmEvent,
362    /// Path to the buffer file.
363    pub filepath: PathBuf,
364}
365
366#[cfg(test)]
367mod tests {
368    use super::*;
369    use crate::models::domains::IdentityDomain;
370    use crate::models::enums::SourceType;
371    use tempfile::tempdir;
372
373    fn test_event(id: &str) -> UdmEvent {
374        UdmEvent::new(SourceType::Amr).with_identity(IdentityDomain {
375            source_id: Some(id.to_string()),
376            ..Default::default()
377        })
378    }
379
380    fn test_buffer() -> (FileBuffer, tempfile::TempDir) {
381        let dir = tempdir().unwrap();
382        let config = BufferConfig {
383            path: dir.path().to_string_lossy().to_string(),
384            compress: false,
385            ..Default::default()
386        };
387        let buffer = FileBuffer::new(&config).unwrap();
388        (buffer, dir)
389    }
390
391    fn test_buffer_compressed() -> (FileBuffer, tempfile::TempDir) {
392        let dir = tempdir().unwrap();
393        let config = BufferConfig {
394            path: dir.path().to_string_lossy().to_string(),
395            compress: true,
396            ..Default::default()
397        };
398        let buffer = FileBuffer::new(&config).unwrap();
399        (buffer, dir)
400    }
401
402    #[test]
403    fn test_buffer_single_event() {
404        let (buffer, _dir) = test_buffer();
405        let event = test_event("robot-001");
406
407        buffer.buffer_event(&event).unwrap();
408
409        assert_eq!(buffer.file_count(), 1);
410    }
411
412    #[test]
413    fn test_buffer_and_read() {
414        let (buffer, _dir) = test_buffer();
415        let event = test_event("robot-001");
416
417        buffer.buffer_event(&event).unwrap();
418        let events = buffer.read_events().unwrap();
419
420        assert_eq!(events.len(), 1);
421        assert_eq!(
422            events[0].event.identity.as_ref().unwrap().source_id,
423            Some("robot-001".to_string())
424        );
425    }
426
427    #[test]
428    fn test_buffer_batch() {
429        let (buffer, _dir) = test_buffer();
430        let events = vec![
431            test_event("robot-001"),
432            test_event("robot-002"),
433            test_event("robot-003"),
434        ];
435
436        buffer.buffer_events(&events).unwrap();
437
438        let read = buffer.read_events().unwrap();
439        assert_eq!(read.len(), 3);
440    }
441
442    #[test]
443    fn test_buffer_clear() {
444        let (buffer, _dir) = test_buffer();
445        buffer.buffer_event(&test_event("robot-001")).unwrap();
446        buffer.buffer_event(&test_event("robot-002")).unwrap();
447
448        assert_eq!(buffer.file_count(), 2);
449
450        buffer.clear().unwrap();
451        assert_eq!(buffer.file_count(), 0);
452    }
453
454    #[test]
455    fn test_compressed_buffer() {
456        let dir = tempdir().unwrap();
457        let config = BufferConfig {
458            path: dir.path().to_string_lossy().to_string(),
459            compress: true,
460            ..Default::default()
461        };
462        let buffer = FileBuffer::new(&config).unwrap();
463        let event = test_event("robot-001");
464
465        buffer.buffer_event(&event).unwrap();
466        let events = buffer.read_events().unwrap();
467
468        assert_eq!(events.len(), 1);
469    }
470
471    // Additional tests for improved coverage
472
473    #[test]
474    fn test_at_path() {
475        let dir = tempdir().unwrap();
476        let buffer = FileBuffer::at_path(dir.path()).unwrap();
477
478        buffer.buffer_event(&test_event("robot-001")).unwrap();
479        assert_eq!(buffer.file_count(), 1);
480    }
481
482    #[test]
483    fn test_remove_file() {
484        let (buffer, _dir) = test_buffer();
485        buffer.buffer_event(&test_event("robot-001")).unwrap();
486
487        let events = buffer.read_events().unwrap();
488        assert_eq!(events.len(), 1);
489
490        let filepath = events[0].filepath.clone();
491        buffer.remove_file(&filepath).unwrap();
492
493        assert_eq!(buffer.file_count(), 0);
494    }
495
496    #[test]
497    fn test_total_size() {
498        let (buffer, _dir) = test_buffer();
499
500        assert_eq!(buffer.total_size(), 0);
501
502        buffer.buffer_event(&test_event("robot-001")).unwrap();
503        let size1 = buffer.total_size();
504        assert!(size1 > 0);
505
506        buffer.buffer_event(&test_event("robot-002")).unwrap();
507        let size2 = buffer.total_size();
508        assert!(size2 > size1);
509    }
510
511    #[test]
512    fn test_file_count_multiple() {
513        let (buffer, _dir) = test_buffer();
514
515        assert_eq!(buffer.file_count(), 0);
516
517        for i in 0..5 {
518            buffer
519                .buffer_event(&test_event(&format!("robot-{:03}", i)))
520                .unwrap();
521        }
522
523        assert_eq!(buffer.file_count(), 5);
524    }
525
526    #[test]
527    fn test_compressed_batch() {
528        let (buffer, _dir) = test_buffer_compressed();
529        let events = vec![
530            test_event("robot-001"),
531            test_event("robot-002"),
532            test_event("robot-003"),
533        ];
534
535        buffer.buffer_events(&events).unwrap();
536
537        let read = buffer.read_events().unwrap();
538        assert_eq!(read.len(), 3);
539    }
540
541    #[test]
542    fn test_uncompressed_batch() {
543        let (buffer, _dir) = test_buffer();
544        let events = vec![
545            test_event("robot-001"),
546            test_event("robot-002"),
547            test_event("robot-003"),
548        ];
549
550        buffer.buffer_events(&events).unwrap();
551
552        let read = buffer.read_events().unwrap();
553        assert_eq!(read.len(), 3);
554
555        // Verify the identity of each event
556        let ids: Vec<_> = read
557            .iter()
558            .map(|e| {
559                e.event
560                    .identity
561                    .as_ref()
562                    .unwrap()
563                    .source_id
564                    .as_ref()
565                    .unwrap()
566                    .clone()
567            })
568            .collect();
569        assert!(ids.contains(&"robot-001".to_string()));
570        assert!(ids.contains(&"robot-002".to_string()));
571        assert!(ids.contains(&"robot-003".to_string()));
572    }
573
574    #[test]
575    fn test_cleanup_expired() {
576        let dir = tempdir().unwrap();
577        let config = BufferConfig {
578            path: dir.path().to_string_lossy().to_string(),
579            compress: false,
580            max_age_secs: 0, // Immediately expire
581            ..Default::default()
582        };
583        let buffer = FileBuffer::new(&config).unwrap();
584
585        buffer.buffer_event(&test_event("robot-001")).unwrap();
586        buffer.buffer_event(&test_event("robot-002")).unwrap();
587
588        assert_eq!(buffer.file_count(), 2);
589
590        // Small delay to ensure files are considered expired
591        std::thread::sleep(std::time::Duration::from_millis(10));
592
593        let removed = buffer.cleanup_expired().unwrap();
594        assert_eq!(removed, 2);
595        assert_eq!(buffer.file_count(), 0);
596    }
597
598    #[test]
599    fn test_read_events_removes_expired() {
600        let dir = tempdir().unwrap();
601        let config = BufferConfig {
602            path: dir.path().to_string_lossy().to_string(),
603            compress: false,
604            max_age_secs: 0, // Immediately expire
605            ..Default::default()
606        };
607        let buffer = FileBuffer::new(&config).unwrap();
608
609        buffer.buffer_event(&test_event("robot-001")).unwrap();
610
611        // Small delay to ensure file is considered expired
612        std::thread::sleep(std::time::Duration::from_millis(10));
613
614        // read_events should skip and remove expired files
615        let events = buffer.read_events().unwrap();
616        assert!(events.is_empty());
617    }
618
619    #[test]
620    fn test_buffer_creates_directory() {
621        let dir = tempdir().unwrap();
622        let nested_path = dir.path().join("nested").join("buffer").join("dir");
623
624        let config = BufferConfig {
625            path: nested_path.to_string_lossy().to_string(),
626            compress: false,
627            ..Default::default()
628        };
629        let buffer = FileBuffer::new(&config).unwrap();
630
631        assert!(nested_path.exists());
632        buffer.buffer_event(&test_event("robot-001")).unwrap();
633        assert_eq!(buffer.file_count(), 1);
634    }
635
636    #[test]
637    fn test_buffer_full_error() {
638        let dir = tempdir().unwrap();
639        let config = BufferConfig {
640            path: dir.path().to_string_lossy().to_string(),
641            compress: false,
642            max_size_bytes: 1, // Very small to trigger full
643            ..Default::default()
644        };
645        let buffer = FileBuffer::new(&config).unwrap();
646
647        // First write succeeds but fills the buffer
648        buffer.buffer_event(&test_event("robot-001")).unwrap();
649
650        // Second write should fail due to buffer full
651        let result = buffer.buffer_event(&test_event("robot-002"));
652        assert!(result.is_err());
653    }
654
655    #[test]
656    fn test_read_ignores_non_buffer_files() {
657        let (buffer, dir) = test_buffer();
658
659        // Create a non-buffer file
660        std::fs::write(dir.path().join("readme.txt"), "not a buffer file").unwrap();
661        std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();
662
663        buffer.buffer_event(&test_event("robot-001")).unwrap();
664
665        let events = buffer.read_events().unwrap();
666        assert_eq!(events.len(), 1);
667    }
668
669    #[test]
670    fn test_read_ignores_directories() {
671        let (buffer, dir) = test_buffer();
672
673        // Create a subdirectory
674        std::fs::create_dir(dir.path().join("subdir")).unwrap();
675
676        buffer.buffer_event(&test_event("robot-001")).unwrap();
677
678        let events = buffer.read_events().unwrap();
679        assert_eq!(events.len(), 1);
680    }
681
682    #[test]
683    fn test_events_sorted_by_timestamp() {
684        let (buffer, _dir) = test_buffer();
685
686        // Buffer events with slight delays to ensure different timestamps
687        for i in 0..3 {
688            buffer
689                .buffer_event(&test_event(&format!("robot-{:03}", i)))
690                .unwrap();
691            std::thread::sleep(std::time::Duration::from_millis(5));
692        }
693
694        let events = buffer.read_events().unwrap();
695        assert_eq!(events.len(), 3);
696
697        // Verify sorted by timestamp (oldest first)
698        for i in 0..events.len() - 1 {
699            assert!(events[i].event.captured_at <= events[i + 1].event.captured_at);
700        }
701    }
702
703    #[test]
704    fn test_clear_with_subdirectories() {
705        let (buffer, dir) = test_buffer();
706
707        // Create a subdirectory (should be ignored by clear)
708        std::fs::create_dir(dir.path().join("subdir")).unwrap();
709
710        buffer.buffer_event(&test_event("robot-001")).unwrap();
711        buffer.buffer_event(&test_event("robot-002")).unwrap();
712
713        buffer.clear().unwrap();
714
715        // Subdirectory should still exist
716        assert!(dir.path().join("subdir").exists());
717        assert_eq!(buffer.file_count(), 0);
718    }
719
720    #[test]
721    fn test_compressed_single_event_read_write() {
722        let (buffer, _dir) = test_buffer_compressed();
723
724        let event = test_event("compressed-robot");
725        buffer.buffer_event(&event).unwrap();
726
727        let events = buffer.read_events().unwrap();
728        assert_eq!(events.len(), 1);
729        assert_eq!(
730            events[0].event.identity.as_ref().unwrap().source_id,
731            Some("compressed-robot".to_string())
732        );
733    }
734
735    #[test]
736    fn test_multiple_batches() {
737        let (buffer, _dir) = test_buffer();
738
739        // Buffer two separate batches
740        buffer
741            .buffer_events(&[test_event("batch1-1"), test_event("batch1-2")])
742            .unwrap();
743        std::thread::sleep(std::time::Duration::from_millis(5));
744        buffer
745            .buffer_events(&[test_event("batch2-1"), test_event("batch2-2")])
746            .unwrap();
747
748        let events = buffer.read_events().unwrap();
749        assert_eq!(events.len(), 4);
750    }
751
752    #[test]
753    fn test_buffered_event_contains_filepath() {
754        let (buffer, _dir) = test_buffer();
755
756        buffer.buffer_event(&test_event("robot-001")).unwrap();
757
758        let events = buffer.read_events().unwrap();
759        assert_eq!(events.len(), 1);
760        assert!(events[0].filepath.exists());
761        assert!(events[0].filepath.is_file());
762    }
763
764    #[test]
765    fn test_empty_buffer_read() {
766        let (buffer, _dir) = test_buffer();
767
768        let events = buffer.read_events().unwrap();
769        assert!(events.is_empty());
770    }
771
772    #[test]
773    fn test_file_count_empty() {
774        let (buffer, _dir) = test_buffer();
775        assert_eq!(buffer.file_count(), 0);
776    }
777
778    #[test]
779    fn test_total_size_empty() {
780        let (buffer, _dir) = test_buffer();
781        assert_eq!(buffer.total_size(), 0);
782    }
783
784    #[test]
785    fn test_cleanup_expired_none_expired() {
786        let (buffer, _dir) = test_buffer();
787
788        buffer.buffer_event(&test_event("robot-001")).unwrap();
789
790        // Default max_age is large, so nothing should be expired
791        let removed = buffer.cleanup_expired().unwrap();
792        assert_eq!(removed, 0);
793        assert_eq!(buffer.file_count(), 1);
794    }
795
796    #[test]
797    fn test_clear_empty_buffer() {
798        let (buffer, _dir) = test_buffer();
799
800        // Should not error on empty buffer
801        buffer.clear().unwrap();
802        assert_eq!(buffer.file_count(), 0);
803    }
804
805    #[test]
806    fn test_buffer_events_empty_slice() {
807        let (buffer, _dir) = test_buffer();
808
809        // Should handle empty slice gracefully
810        buffer.buffer_events(&[]).unwrap();
811        assert_eq!(buffer.file_count(), 1); // File created but empty
812    }
813
814    #[test]
815    fn test_compressed_batch_read_write() {
816        let (buffer, _dir) = test_buffer_compressed();
817
818        let events: Vec<UdmEvent> = (0..5)
819            .map(|i| test_event(&format!("robot-{:03}", i)))
820            .collect();
821
822        buffer.buffer_events(&events).unwrap();
823
824        let read = buffer.read_events().unwrap();
825        assert_eq!(read.len(), 5);
826    }
827}