use std::fs::{self, File};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;

use crate::core::config::BufferConfig;
use crate::error::{BufferError, PhyTraceResult};
use crate::models::event::UdmEvent;

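/// File-backed event buffer. Events are persisted as JSON (optionally
/// gzip-compressed) files under a configurable directory and can later be
/// read back, sorted by capture time, via [`read_events`](Self::read_events).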
#[derive(Debug)]
pub struct FileBuffer {
    config: BufferConfig,
    buffer_dir: PathBuf,
}

impl FileBuffer {
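    /// Creates a buffer rooted at `config.path`, creating the directory
    /// (including parents) if it does not exist yet.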
    pub fn new(config: &BufferConfig) -> PhyTraceResult<Self> {
        let buffer_dir = PathBuf::from(&config.path);

        if !buffer_dir.exists() {
            fs::create_dir_all(&buffer_dir).map_err(|e| {
                BufferError::Io(format!("Failed to create buffer directory: {}", e))
            })?;
        }

        Ok(Self {
            config: config.clone(),
            buffer_dir,
        })
    }

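    /// Convenience constructor: buffers to `path` with default settings for
    /// everything else.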
    pub fn at_path(path: impl Into<PathBuf>) -> PhyTraceResult<Self> {
        let path = path.into();
        let config = BufferConfig {
            path: path.to_string_lossy().to_string(),
            ..Default::default()
        };
        Self::new(&config)
    }

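    /// Persists a single event to its own timestamp-named file. Fails with
    /// `BufferError::Full` when the buffer is already at or over
    /// `config.max_size_bytes`.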
    pub fn buffer_event(&self, event: &UdmEvent) -> PhyTraceResult<()> {
        self.check_capacity()?;

        let filename = self.generate_filename();
        let filepath = self.buffer_dir.join(&filename);

        let json =
            serde_json::to_string(event).map_err(|e| BufferError::Serialization(e.to_string()))?;

        if self.config.compress {
            self.write_compressed(&filepath, &json)?;
        } else {
            fs::write(&filepath, &json)
                .map_err(|e| BufferError::Io(format!("Failed to write buffer file: {}", e)))?;
        }

        Ok(())
    }

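    /// Persists a batch of events as a single JSON Lines file (one event per
    /// line), gzip-compressed when `config.compress` is enabled.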
    pub fn buffer_events(&self, events: &[UdmEvent]) -> PhyTraceResult<()> {
        self.check_capacity()?;

        let filename = format!("batch_{}.jsonl", self.timestamp_nanos());
        let filepath = self.buffer_dir.join(&filename);

        if self.config.compress {
            let filepath = filepath.with_extension("jsonl.gz");
            let file = File::create(&filepath)
                .map_err(|e| BufferError::Io(format!("Failed to create buffer file: {}", e)))?;
            let encoder = GzEncoder::new(file, Compression::fast());
            let mut writer = BufWriter::new(encoder);

            for event in events {
                let json = serde_json::to_string(event)
                    .map_err(|e| BufferError::Serialization(e.to_string()))?;
                writeln!(writer, "{}", json)
                    .map_err(|e| BufferError::Io(format!("Failed to write to buffer: {}", e)))?;
            }
            writer
                .flush()
                .map_err(|e| BufferError::Io(format!("Failed to flush buffer: {}", e)))?;
        } else {
            let file = File::create(&filepath)
                .map_err(|e| BufferError::Io(format!("Failed to create buffer file: {}", e)))?;
            let mut writer = BufWriter::new(file);

            for event in events {
                let json = serde_json::to_string(event)
                    .map_err(|e| BufferError::Serialization(e.to_string()))?;
                writeln!(writer, "{}", json)
                    .map_err(|e| BufferError::Io(format!("Failed to write to buffer: {}", e)))?;
            }
            writer
                .flush()
                .map_err(|e| BufferError::Io(format!("Failed to flush buffer: {}", e)))?;
        }

        Ok(())
    }

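    /// Reads all buffered events back from disk. Files older than
    /// `config.max_age_secs` are deleted instead of being read, and the
    /// surviving events are returned sorted by `captured_at`, each paired with
    /// the path of the file it came from.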
    pub fn read_events(&self) -> PhyTraceResult<Vec<BufferedEvent>> {
        let mut events = Vec::new();

        for entry in fs::read_dir(&self.buffer_dir)
            .map_err(|e| BufferError::Io(format!("Failed to read buffer directory: {}", e)))?
        {
            let entry = entry
                .map_err(|e| BufferError::Io(format!("Failed to read directory entry: {}", e)))?;
            let path = entry.path();

            if !path.is_file() {
                continue;
            }

            let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

            if !filename.ends_with(".json")
                && !filename.ends_with(".json.gz")
                && !filename.ends_with(".jsonl")
                && !filename.ends_with(".jsonl.gz")
            {
                continue;
            }

            if self.is_expired(&path) {
                let _ = fs::remove_file(&path);
                continue;
            }

            let file_events = self.read_file(&path)?;
            for event in file_events {
                events.push(BufferedEvent {
                    event,
                    filepath: path.clone(),
                });
            }
        }

        events.sort_by(|a, b| a.event.captured_at.cmp(&b.event.captured_at));

        Ok(events)
    }

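    /// Deletes a single buffer file, e.g. after the events read from it have
    /// been handed off successfully.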
    pub fn remove_file(&self, filepath: &Path) -> PhyTraceResult<()> {
        fs::remove_file(filepath)
            .map_err(|e| BufferError::Io(format!("Failed to remove buffer file: {}", e)))?;
        Ok(())
    }

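    /// Number of buffer files currently on disk; anything without one of the
    /// recognised buffer extensions is ignored.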
    pub fn file_count(&self) -> usize {
        fs::read_dir(&self.buffer_dir)
            .map(|entries| {
                entries
                    .filter_map(|e| e.ok())
                    .filter(|e| e.path().is_file())
                    .filter(|e| {
                        let name = e.file_name().to_string_lossy().to_string();
                        name.ends_with(".json")
                            || name.ends_with(".json.gz")
                            || name.ends_with(".jsonl")
                            || name.ends_with(".jsonl.gz")
                    })
                    .count()
            })
            .unwrap_or(0)
    }

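    /// Total size, in bytes, of all files in the buffer directory.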
    pub fn total_size(&self) -> u64 {
        fs::read_dir(&self.buffer_dir)
            .map(|entries| {
                entries
                    .filter_map(|e| e.ok())
                    .filter_map(|e| e.metadata().ok())
                    .filter(|m| m.is_file())
                    .map(|m| m.len())
                    .sum()
            })
            .unwrap_or(0)
    }

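    /// Deletes every file in the buffer directory; subdirectories are left in
    /// place.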
    pub fn clear(&self) -> PhyTraceResult<()> {
        for entry in fs::read_dir(&self.buffer_dir)
            .map_err(|e| BufferError::Io(format!("Failed to read buffer directory: {}", e)))?
        {
            let entry = entry
                .map_err(|e| BufferError::Io(format!("Failed to read directory entry: {}", e)))?;
            let path = entry.path();

            if path.is_file() {
                let _ = fs::remove_file(&path);
            }
        }
        Ok(())
    }

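    /// Deletes files older than `config.max_age_secs` and returns how many
    /// were removed.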
    pub fn cleanup_expired(&self) -> PhyTraceResult<usize> {
        let mut removed = 0;

        for entry in fs::read_dir(&self.buffer_dir)
            .map_err(|e| BufferError::Io(format!("Failed to read buffer directory: {}", e)))?
        {
            let entry = entry
                .map_err(|e| BufferError::Io(format!("Failed to read directory entry: {}", e)))?;
            let path = entry.path();

            if path.is_file() && self.is_expired(&path) {
                let _ = fs::remove_file(&path);
                removed += 1;
            }
        }

        Ok(removed)
    }

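    /// Builds a per-event filename from the current nanosecond timestamp; the
    /// extension reflects whether compression is enabled.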
    fn generate_filename(&self) -> String {
        let ts = self.timestamp_nanos();
        if self.config.compress {
            format!("event_{}.json.gz", ts)
        } else {
            format!("event_{}.json", ts)
        }
    }

    fn timestamp_nanos(&self) -> u128 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_nanos()
    }

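    /// Rejects a write with `BufferError::Full` once the buffer directory has
    /// reached `config.max_size_bytes`. The check runs before the write, so
    /// the write that crosses the limit is still accepted.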
    fn check_capacity(&self) -> PhyTraceResult<()> {
        let current_size = self.total_size();
        if current_size >= self.config.max_size_bytes {
            return Err(BufferError::Full(format!(
                "Buffer full: {} bytes (max {})",
                current_size, self.config.max_size_bytes
            ))
            .into());
        }
        Ok(())
    }

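    /// A file counts as expired once its modification time is more than
    /// `config.max_age_secs` in the past; files whose metadata cannot be read
    /// are treated as not expired.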
    fn is_expired(&self, path: &Path) -> bool {
        let max_age = Duration::from_secs(self.config.max_age_secs);

        path.metadata()
            .and_then(|m| m.modified())
            .map(|modified| {
                SystemTime::now()
                    .duration_since(modified)
                    .map(|age| age > max_age)
                    .unwrap_or(false)
            })
            .unwrap_or(false)
    }

    fn write_compressed(&self, filepath: &Path, content: &str) -> PhyTraceResult<()> {
        let file = File::create(filepath)
            .map_err(|e| BufferError::Io(format!("Failed to create buffer file: {}", e)))?;
        let mut encoder = GzEncoder::new(file, Compression::fast());
        encoder
            .write_all(content.as_bytes())
            .map_err(|e| BufferError::Io(format!("Failed to write compressed data: {}", e)))?;
        encoder
            .finish()
            .map_err(|e| BufferError::Io(format!("Failed to finish compression: {}", e)))?;
        Ok(())
    }

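    /// Dispatches on the filename: batch files (JSON Lines, optionally
    /// gzipped) hold one event per line, while single-event files hold one
    /// JSON document.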
    fn read_file(&self, path: &Path) -> PhyTraceResult<Vec<UdmEvent>> {
        let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

        let is_compressed = filename.ends_with(".gz");
        let is_batch = filename.contains("batch")
            || filename.ends_with(".jsonl")
            || filename.ends_with(".jsonl.gz");

        if is_batch {
            self.read_batch_file(path, is_compressed)
        } else {
            self.read_single_file(path, is_compressed).map(|e| vec![e])
        }
    }

    fn read_single_file(&self, path: &Path, compressed: bool) -> PhyTraceResult<UdmEvent> {
        let content = if compressed {
            let file = File::open(path)
                .map_err(|e| BufferError::Io(format!("Failed to open buffer file: {}", e)))?;
            let mut decoder = GzDecoder::new(file);
            let mut content = String::new();
            std::io::Read::read_to_string(&mut decoder, &mut content)
                .map_err(|e| BufferError::Io(format!("Failed to read compressed file: {}", e)))?;
            content
        } else {
            fs::read_to_string(path)
                .map_err(|e| BufferError::Io(format!("Failed to read buffer file: {}", e)))?
        };

        serde_json::from_str(&content).map_err(|e| BufferError::Serialization(e.to_string()).into())
    }

    fn read_batch_file(&self, path: &Path, compressed: bool) -> PhyTraceResult<Vec<UdmEvent>> {
        let file = File::open(path)
            .map_err(|e| BufferError::Io(format!("Failed to open buffer file: {}", e)))?;

        let reader: Box<dyn BufRead> = if compressed {
            Box::new(BufReader::new(GzDecoder::new(file)))
        } else {
            Box::new(BufReader::new(file))
        };

        let mut events = Vec::new();
        for line in reader.lines() {
            let line = line.map_err(|e| BufferError::Io(format!("Failed to read line: {}", e)))?;
            if line.trim().is_empty() {
                continue;
            }
            let event: UdmEvent = serde_json::from_str(&line)
                .map_err(|e| BufferError::Serialization(e.to_string()))?;
            events.push(event);
        }

        Ok(events)
    }
}

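/// An event read back from the buffer together with the file it was loaded
/// from, so the caller can remove that file once the event has been handled.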
#[derive(Debug)]
pub struct BufferedEvent {
    pub event: UdmEvent,
    pub filepath: PathBuf,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::domains::IdentityDomain;
    use crate::models::enums::SourceType;
    use tempfile::tempdir;

    fn test_event(id: &str) -> UdmEvent {
        UdmEvent::new(SourceType::Amr).with_identity(IdentityDomain {
            source_id: Some(id.to_string()),
            ..Default::default()
        })
    }

    fn test_buffer() -> (FileBuffer, tempfile::TempDir) {
        let dir = tempdir().unwrap();
        let config = BufferConfig {
            path: dir.path().to_string_lossy().to_string(),
            compress: false,
            ..Default::default()
        };
        let buffer = FileBuffer::new(&config).unwrap();
        (buffer, dir)
    }

    fn test_buffer_compressed() -> (FileBuffer, tempfile::TempDir) {
        let dir = tempdir().unwrap();
        let config = BufferConfig {
            path: dir.path().to_string_lossy().to_string(),
            compress: true,
            ..Default::default()
        };
        let buffer = FileBuffer::new(&config).unwrap();
        (buffer, dir)
    }

    #[test]
    fn test_buffer_single_event() {
        let (buffer, _dir) = test_buffer();
        let event = test_event("robot-001");

        buffer.buffer_event(&event).unwrap();

        assert_eq!(buffer.file_count(), 1);
    }

    #[test]
    fn test_buffer_and_read() {
        let (buffer, _dir) = test_buffer();
        let event = test_event("robot-001");

        buffer.buffer_event(&event).unwrap();
        let events = buffer.read_events().unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].event.identity.as_ref().unwrap().source_id,
            Some("robot-001".to_string())
        );
    }

    #[test]
    fn test_buffer_batch() {
        let (buffer, _dir) = test_buffer();
        let events = vec![
            test_event("robot-001"),
            test_event("robot-002"),
            test_event("robot-003"),
        ];

        buffer.buffer_events(&events).unwrap();

        let read = buffer.read_events().unwrap();
        assert_eq!(read.len(), 3);
    }

    #[test]
    fn test_buffer_clear() {
        let (buffer, _dir) = test_buffer();
        buffer.buffer_event(&test_event("robot-001")).unwrap();
        buffer.buffer_event(&test_event("robot-002")).unwrap();

        assert_eq!(buffer.file_count(), 2);

        buffer.clear().unwrap();
        assert_eq!(buffer.file_count(), 0);
    }

    #[test]
    fn test_compressed_buffer() {
        let dir = tempdir().unwrap();
        let config = BufferConfig {
            path: dir.path().to_string_lossy().to_string(),
            compress: true,
            ..Default::default()
        };
        let buffer = FileBuffer::new(&config).unwrap();
        let event = test_event("robot-001");

        buffer.buffer_event(&event).unwrap();
        let events = buffer.read_events().unwrap();

        assert_eq!(events.len(), 1);
    }

    #[test]
    fn test_at_path() {
        let dir = tempdir().unwrap();
        let buffer = FileBuffer::at_path(dir.path()).unwrap();

        buffer.buffer_event(&test_event("robot-001")).unwrap();
        assert_eq!(buffer.file_count(), 1);
    }

    #[test]
    fn test_remove_file() {
        let (buffer, _dir) = test_buffer();
        buffer.buffer_event(&test_event("robot-001")).unwrap();

        let events = buffer.read_events().unwrap();
        assert_eq!(events.len(), 1);

        let filepath = events[0].filepath.clone();
        buffer.remove_file(&filepath).unwrap();

        assert_eq!(buffer.file_count(), 0);
    }

    #[test]
    fn test_total_size() {
        let (buffer, _dir) = test_buffer();

        assert_eq!(buffer.total_size(), 0);

        buffer.buffer_event(&test_event("robot-001")).unwrap();
        let size1 = buffer.total_size();
        assert!(size1 > 0);

        buffer.buffer_event(&test_event("robot-002")).unwrap();
        let size2 = buffer.total_size();
        assert!(size2 > size1);
    }

    #[test]
    fn test_file_count_multiple() {
        let (buffer, _dir) = test_buffer();

        assert_eq!(buffer.file_count(), 0);

        for i in 0..5 {
            buffer
                .buffer_event(&test_event(&format!("robot-{:03}", i)))
                .unwrap();
        }

        assert_eq!(buffer.file_count(), 5);
    }

    #[test]
    fn test_compressed_batch() {
        let (buffer, _dir) = test_buffer_compressed();
        let events = vec![
            test_event("robot-001"),
            test_event("robot-002"),
            test_event("robot-003"),
        ];

        buffer.buffer_events(&events).unwrap();

        let read = buffer.read_events().unwrap();
        assert_eq!(read.len(), 3);
    }

    #[test]
    fn test_uncompressed_batch() {
        let (buffer, _dir) = test_buffer();
        let events = vec![
            test_event("robot-001"),
            test_event("robot-002"),
            test_event("robot-003"),
        ];

        buffer.buffer_events(&events).unwrap();

        let read = buffer.read_events().unwrap();
        assert_eq!(read.len(), 3);

        let ids: Vec<_> = read
            .iter()
            .map(|e| {
                e.event
                    .identity
                    .as_ref()
                    .unwrap()
                    .source_id
                    .as_ref()
                    .unwrap()
                    .clone()
            })
            .collect();
        assert!(ids.contains(&"robot-001".to_string()));
        assert!(ids.contains(&"robot-002".to_string()));
        assert!(ids.contains(&"robot-003".to_string()));
    }

    #[test]
    fn test_cleanup_expired() {
        let dir = tempdir().unwrap();
        let config = BufferConfig {
            path: dir.path().to_string_lossy().to_string(),
            compress: false,
            max_age_secs: 0,
            ..Default::default()
        };
        let buffer = FileBuffer::new(&config).unwrap();

        buffer.buffer_event(&test_event("robot-001")).unwrap();
        buffer.buffer_event(&test_event("robot-002")).unwrap();

        assert_eq!(buffer.file_count(), 2);

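        // With max_age_secs = 0, anything written before the sleep below is
        // considered expired.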
        std::thread::sleep(std::time::Duration::from_millis(10));

        let removed = buffer.cleanup_expired().unwrap();
        assert_eq!(removed, 2);
        assert_eq!(buffer.file_count(), 0);
    }

    #[test]
    fn test_read_events_removes_expired() {
        let dir = tempdir().unwrap();
        let config = BufferConfig {
            path: dir.path().to_string_lossy().to_string(),
            compress: false,
            max_age_secs: 0,
            ..Default::default()
        };
        let buffer = FileBuffer::new(&config).unwrap();

        buffer.buffer_event(&test_event("robot-001")).unwrap();

        std::thread::sleep(std::time::Duration::from_millis(10));

        let events = buffer.read_events().unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_buffer_creates_directory() {
        let dir = tempdir().unwrap();
        let nested_path = dir.path().join("nested").join("buffer").join("dir");

        let config = BufferConfig {
            path: nested_path.to_string_lossy().to_string(),
            compress: false,
            ..Default::default()
        };
        let buffer = FileBuffer::new(&config).unwrap();

        assert!(nested_path.exists());
        buffer.buffer_event(&test_event("robot-001")).unwrap();
        assert_eq!(buffer.file_count(), 1);
    }

    #[test]
    fn test_buffer_full_error() {
        let dir = tempdir().unwrap();
        let config = BufferConfig {
            path: dir.path().to_string_lossy().to_string(),
            compress: false,
            max_size_bytes: 1,
            ..Default::default()
        };
        let buffer = FileBuffer::new(&config).unwrap();

        buffer.buffer_event(&test_event("robot-001")).unwrap();

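        // Capacity is checked before writing, so the first event is accepted
        // even though it pushes the buffer past the 1-byte limit; the second
        // write must then fail.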
        let result = buffer.buffer_event(&test_event("robot-002"));
        assert!(result.is_err());
    }

    #[test]
    fn test_read_ignores_non_buffer_files() {
        let (buffer, dir) = test_buffer();

        std::fs::write(dir.path().join("readme.txt"), "not a buffer file").unwrap();
        std::fs::write(dir.path().join("data.csv"), "a,b,c").unwrap();

        buffer.buffer_event(&test_event("robot-001")).unwrap();

        let events = buffer.read_events().unwrap();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn test_read_ignores_directories() {
        let (buffer, dir) = test_buffer();

        std::fs::create_dir(dir.path().join("subdir")).unwrap();

        buffer.buffer_event(&test_event("robot-001")).unwrap();

        let events = buffer.read_events().unwrap();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn test_events_sorted_by_timestamp() {
        let (buffer, _dir) = test_buffer();

        for i in 0..3 {
            buffer
                .buffer_event(&test_event(&format!("robot-{:03}", i)))
                .unwrap();
            std::thread::sleep(std::time::Duration::from_millis(5));
        }

        let events = buffer.read_events().unwrap();
        assert_eq!(events.len(), 3);

        for i in 0..events.len() - 1 {
            assert!(events[i].event.captured_at <= events[i + 1].event.captured_at);
        }
    }

    #[test]
    fn test_clear_with_subdirectories() {
        let (buffer, dir) = test_buffer();

        std::fs::create_dir(dir.path().join("subdir")).unwrap();

        buffer.buffer_event(&test_event("robot-001")).unwrap();
        buffer.buffer_event(&test_event("robot-002")).unwrap();

        buffer.clear().unwrap();

        assert!(dir.path().join("subdir").exists());
        assert_eq!(buffer.file_count(), 0);
    }

    #[test]
    fn test_compressed_single_event_read_write() {
        let (buffer, _dir) = test_buffer_compressed();

        let event = test_event("compressed-robot");
        buffer.buffer_event(&event).unwrap();

        let events = buffer.read_events().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].event.identity.as_ref().unwrap().source_id,
            Some("compressed-robot".to_string())
        );
    }

    #[test]
    fn test_multiple_batches() {
        let (buffer, _dir) = test_buffer();

        buffer
            .buffer_events(&[test_event("batch1-1"), test_event("batch1-2")])
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(5));
        buffer
            .buffer_events(&[test_event("batch2-1"), test_event("batch2-2")])
            .unwrap();

        let events = buffer.read_events().unwrap();
        assert_eq!(events.len(), 4);
    }

    #[test]
    fn test_buffered_event_contains_filepath() {
        let (buffer, _dir) = test_buffer();

        buffer.buffer_event(&test_event("robot-001")).unwrap();

        let events = buffer.read_events().unwrap();
        assert_eq!(events.len(), 1);
        assert!(events[0].filepath.exists());
        assert!(events[0].filepath.is_file());
    }

    #[test]
    fn test_empty_buffer_read() {
        let (buffer, _dir) = test_buffer();

        let events = buffer.read_events().unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_file_count_empty() {
        let (buffer, _dir) = test_buffer();
        assert_eq!(buffer.file_count(), 0);
    }

    #[test]
    fn test_total_size_empty() {
        let (buffer, _dir) = test_buffer();
        assert_eq!(buffer.total_size(), 0);
    }

    #[test]
    fn test_cleanup_expired_none_expired() {
        let (buffer, _dir) = test_buffer();

        buffer.buffer_event(&test_event("robot-001")).unwrap();

        let removed = buffer.cleanup_expired().unwrap();
        assert_eq!(removed, 0);
        assert_eq!(buffer.file_count(), 1);
    }

    #[test]
    fn test_clear_empty_buffer() {
        let (buffer, _dir) = test_buffer();

        buffer.clear().unwrap();
        assert_eq!(buffer.file_count(), 0);
    }

    #[test]
    fn test_buffer_events_empty_slice() {
        let (buffer, _dir) = test_buffer();

        buffer.buffer_events(&[]).unwrap();
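        // Even an empty batch produces a (single, empty) buffer file.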
        assert_eq!(buffer.file_count(), 1);
    }

    #[test]
    fn test_compressed_batch_read_write() {
        let (buffer, _dir) = test_buffer_compressed();

        let events: Vec<UdmEvent> = (0..5)
            .map(|i| test_event(&format!("robot-{:03}", i)))
            .collect();

        buffer.buffer_events(&events).unwrap();

        let read = buffer.read_events().unwrap();
        assert_eq!(read.len(), 5);
    }
}