Skip to main content

phytrace_sdk/reliability/
batch.rs

1//! Batch processing for efficient event transmission.
2
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5use tokio::sync::Mutex;
6
7use crate::core::config::BufferConfig;
8use crate::error::PhyTraceResult;
9use crate::models::event::UdmEvent;
10use crate::transport::traits::{BatchResult, Transport};
11
12/// Batch processor for aggregating events before transmission.
13#[derive(Debug)]
14pub struct BatchProcessor {
15    events: Arc<Mutex<Vec<UdmEvent>>>,
16    max_size: usize,
17    max_age: Duration,
18    created_at: Arc<Mutex<Instant>>,
19}
20
21impl BatchProcessor {
22    /// Create a new batch processor.
23    pub fn new(max_size: usize, max_age: Duration) -> Self {
24        Self {
25            events: Arc::new(Mutex::new(Vec::new())),
26            max_size,
27            max_age,
28            created_at: Arc::new(Mutex::new(Instant::now())),
29        }
30    }
31
32    /// Create from buffer configuration.
33    pub fn from_config(config: &BufferConfig) -> Self {
34        Self::new(
35            config.batch_size,
36            Duration::from_secs(config.flush_interval_secs),
37        )
38    }
39
40    /// Add an event to the batch.
41    ///
42    /// Returns `Some(events)` if the batch is ready to be sent.
43    pub async fn add(&self, event: UdmEvent) -> Option<Vec<UdmEvent>> {
44        let mut events = self.events.lock().await;
45        events.push(event);
46
47        if self.should_flush_inner(&events).await {
48            let batch = std::mem::take(&mut *events);
49            *self.created_at.lock().await = Instant::now();
50            Some(batch)
51        } else {
52            None
53        }
54    }
55
56    /// Add multiple events.
57    ///
58    /// Returns batches ready to be sent (may return multiple if events exceed batch size).
59    pub async fn add_many(&self, new_events: Vec<UdmEvent>) -> Vec<Vec<UdmEvent>> {
60        let mut events = self.events.lock().await;
61        let mut batches = Vec::new();
62
63        for event in new_events {
64            events.push(event);
65
66            if events.len() >= self.max_size {
67                let batch = std::mem::take(&mut *events);
68                *self.created_at.lock().await = Instant::now();
69                batches.push(batch);
70            }
71        }
72
73        batches
74    }
75
76    /// Check if the batch should be flushed.
77    pub async fn should_flush(&self) -> bool {
78        let events = self.events.lock().await;
79        self.should_flush_inner(&events).await
80    }
81
82    /// Flush the current batch regardless of conditions.
83    pub async fn flush(&self) -> Vec<UdmEvent> {
84        let mut events = self.events.lock().await;
85        let batch = std::mem::take(&mut *events);
86        *self.created_at.lock().await = Instant::now();
87        batch
88    }
89
90    /// Get current batch size.
91    pub async fn len(&self) -> usize {
92        self.events.lock().await.len()
93    }
94
95    /// Check if batch is empty.
96    pub async fn is_empty(&self) -> bool {
97        self.events.lock().await.is_empty()
98    }
99
100    /// Get age of current batch.
101    pub async fn age(&self) -> Duration {
102        self.created_at.lock().await.elapsed()
103    }
104
105    /// Send the batch using a transport.
106    pub async fn send_batch<T: Transport>(
107        &self,
108        transport: &T,
109    ) -> PhyTraceResult<Option<BatchResult>> {
110        let batch = self.flush().await;
111        if batch.is_empty() {
112            return Ok(None);
113        }
114
115        let result = transport.send_batch(&batch).await?;
116        Ok(Some(result))
117    }
118
119    // Private helper
120    async fn should_flush_inner(&self, events: &[UdmEvent]) -> bool {
121        // Check size
122        if events.len() >= self.max_size {
123            return true;
124        }
125
126        // Check age
127        let age = self.created_at.lock().await.elapsed();
128        if !events.is_empty() && age >= self.max_age {
129            return true;
130        }
131
132        false
133    }
134}
135
136/// Background batch flusher that periodically checks and sends batches.
137pub struct BatchFlusher<T: Transport> {
138    processor: Arc<BatchProcessor>,
139    transport: Arc<T>,
140    interval: Duration,
141}
142
143impl<T: Transport + 'static> BatchFlusher<T> {
144    /// Create a new batch flusher.
145    pub fn new(processor: Arc<BatchProcessor>, transport: Arc<T>, interval: Duration) -> Self {
146        Self {
147            processor,
148            transport,
149            interval,
150        }
151    }
152
153    /// Start the background flush loop.
154    ///
155    /// Returns a handle that can be used to stop the flusher.
156    pub fn start(self) -> FlushHandle {
157        let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
158
159        let processor = self.processor;
160        let transport = self.transport;
161        let interval = self.interval;
162
163        tokio::spawn(async move {
164            let mut interval_timer = tokio::time::interval(interval);
165
166            loop {
167                tokio::select! {
168                    _ = interval_timer.tick() => {
169                        if processor.should_flush().await {
170                            drop(processor.send_batch(transport.as_ref()).await);
171                        }
172                    }
173                    _ = &mut rx => {
174                        // Final flush before stopping
175                        drop(processor.send_batch(transport.as_ref()).await);
176                        break;
177                    }
178                }
179            }
180        });
181
182        FlushHandle { stop_tx: Some(tx) }
183    }
184}
185
186/// Handle to stop a background flusher.
187pub struct FlushHandle {
188    stop_tx: Option<tokio::sync::oneshot::Sender<()>>,
189}
190
191impl FlushHandle {
192    /// Stop the flusher.
193    pub fn stop(mut self) {
194        if let Some(tx) = self.stop_tx.take() {
195            // Receiver may already have been dropped if the flusher task exited;
196            // that's harmless since we just want to signal stop.
197            #[expect(
198                clippy::let_underscore_must_use,
199                reason = "send failure means the flusher task already exited, which is the desired post-condition"
200            )]
201            let _ = tx.send(());
202        }
203    }
204}
205
206impl Drop for FlushHandle {
207    fn drop(&mut self) {
208        if let Some(tx) = self.stop_tx.take() {
209            #[expect(
210                clippy::let_underscore_must_use,
211                reason = "send failure means the flusher task already exited, which is the desired post-condition"
212            )]
213            let _ = tx.send(());
214        }
215    }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::config::BufferConfig;
    use crate::models::domains::IdentityDomain;
    use crate::models::enums::SourceType;
    use crate::transport::mock::MockTransport;

    /// Builds a minimal AMR event whose identity carries `id` as its source id.
    fn test_event(id: &str) -> UdmEvent {
        UdmEvent::new(SourceType::Amr).with_identity(IdentityDomain {
            source_id: Some(id.to_string()),
            ..Default::default()
        })
    }

    #[tokio::test]
    async fn test_batch_add_below_threshold() {
        let processor = BatchProcessor::new(5, Duration::from_secs(60));

        let result = Box::pin(processor.add(test_event("1"))).await;
        assert!(result.is_none());
        assert_eq!(processor.len().await, 1);
    }

    #[tokio::test]
    async fn test_batch_add_reaches_threshold() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        Box::pin(processor.add(test_event("1"))).await;
        Box::pin(processor.add(test_event("2"))).await;
        let result = Box::pin(processor.add(test_event("3"))).await;

        assert!(result.is_some());
        let batch = result.unwrap();
        assert_eq!(batch.len(), 3);
        assert!(processor.is_empty().await);
    }

    #[tokio::test]
    async fn test_batch_add_many() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        let events = vec![
            test_event("1"),
            test_event("2"),
            test_event("3"),
            test_event("4"),
            test_event("5"),
        ];

        let batches = processor.add_many(events).await;

        // Should produce one full batch of 3
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 3);
        // Remaining 2 events in processor
        assert_eq!(processor.len().await, 2);
    }

    #[tokio::test]
    async fn test_batch_flush() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));

        Box::pin(processor.add(test_event("1"))).await;
        Box::pin(processor.add(test_event("2"))).await;

        let batch = processor.flush().await;

        assert_eq!(batch.len(), 2);
        assert!(processor.is_empty().await);
    }

    #[tokio::test]
    async fn test_batch_age_trigger() {
        let processor = BatchProcessor::new(100, Duration::from_millis(10));

        Box::pin(processor.add(test_event("1"))).await;

        // Wait for batch to age
        tokio::time::sleep(Duration::from_millis(20)).await;

        assert!(processor.should_flush().await);
    }

    #[tokio::test]
    async fn test_empty_batch_no_flush() {
        let processor = BatchProcessor::new(5, Duration::from_millis(1));

        // Even after max_age has elapsed, an empty batch shouldn't flush
        tokio::time::sleep(Duration::from_millis(5)).await;

        assert!(!processor.should_flush().await);
    }

    // Additional tests for improved coverage

    #[tokio::test]
    async fn test_from_config() {
        let config = BufferConfig {
            batch_size: 10,
            flush_interval_secs: 30,
            ..Default::default()
        };

        let processor = BatchProcessor::from_config(&config);

        // The first nine events stay buffered; the tenth reaches batch_size
        for i in 1..10 {
            let result = Box::pin(processor.add(test_event(&i.to_string()))).await;
            assert!(result.is_none());
        }
        let result = Box::pin(processor.add(test_event("10"))).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().len(), 10);
    }

    #[tokio::test]
    async fn test_batch_age() {
        let processor = BatchProcessor::new(100, Duration::from_secs(60));

        // Age should be very small initially
        let age = processor.age().await;
        assert!(age < Duration::from_secs(1));

        // Wait a bit
        tokio::time::sleep(Duration::from_millis(50)).await;

        let age2 = processor.age().await;
        assert!(age2 >= Duration::from_millis(50));
    }

    #[tokio::test]
    async fn test_batch_is_empty() {
        let processor = BatchProcessor::new(5, Duration::from_secs(60));

        assert!(processor.is_empty().await);

        Box::pin(processor.add(test_event("1"))).await;
        assert!(!processor.is_empty().await);

        processor.flush().await;
        assert!(processor.is_empty().await);
    }

    #[tokio::test]
    async fn test_batch_len() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));

        assert_eq!(processor.len().await, 0);

        Box::pin(processor.add(test_event("1"))).await;
        assert_eq!(processor.len().await, 1);

        Box::pin(processor.add(test_event("2"))).await;
        Box::pin(processor.add(test_event("3"))).await;
        assert_eq!(processor.len().await, 3);
    }

    #[tokio::test]
    async fn test_add_many_multiple_batches() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        // Add 7 events - should produce 2 full batches
        let events: Vec<UdmEvent> = (0..7).map(|i| test_event(&i.to_string())).collect();

        let batches = processor.add_many(events).await;

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].len(), 3);
        assert_eq!(batches[1].len(), 3);
        // 1 event remaining
        assert_eq!(processor.len().await, 1);
    }

    #[tokio::test]
    async fn test_add_many_exact_multiple() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        // Add exactly 6 events - should produce 2 full batches with nothing remaining
        let events: Vec<UdmEvent> = (0..6).map(|i| test_event(&i.to_string())).collect();

        let batches = processor.add_many(events).await;

        assert_eq!(batches.len(), 2);
        assert!(processor.is_empty().await);
    }

    #[tokio::test]
    async fn test_add_many_less_than_batch() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));

        let events: Vec<UdmEvent> = (0..5).map(|i| test_event(&i.to_string())).collect();

        let batches = processor.add_many(events).await;

        // No complete batches
        assert!(batches.is_empty());
        assert_eq!(processor.len().await, 5);
    }

    #[tokio::test]
    async fn test_send_batch_with_events() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));
        let transport = MockTransport::new();

        Box::pin(processor.add(test_event("1"))).await;
        Box::pin(processor.add(test_event("2"))).await;

        let result = processor.send_batch(&transport).await.unwrap();

        assert!(result.is_some());
        let batch_result = result.unwrap();
        assert_eq!(batch_result.total, 2);
        assert!(processor.is_empty().await);
    }

    #[tokio::test]
    async fn test_send_batch_empty() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));
        let transport = MockTransport::new();

        let result = processor.send_batch(&transport).await.unwrap();

        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_flush_resets_timer() {
        let processor = BatchProcessor::new(100, Duration::from_millis(50));

        Box::pin(processor.add(test_event("1"))).await;

        // Wait for batch to age
        tokio::time::sleep(Duration::from_millis(60)).await;
        assert!(processor.should_flush().await);

        // Flush resets the timer
        processor.flush().await;

        // Add new event
        Box::pin(processor.add(test_event("2"))).await;

        // Should not flush yet (timer was reset)
        assert!(!processor.should_flush().await);
    }

    #[tokio::test]
    async fn test_batch_flusher_start_and_stop() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_millis(10)));
        let transport = Arc::new(MockTransport::new());

        Box::pin(processor.add(test_event("1"))).await;

        let flusher = BatchFlusher::new(
            processor.clone(),
            transport.clone(),
            Duration::from_millis(20),
        );

        let handle = flusher.start();

        // Wait for at least one flush cycle
        tokio::time::sleep(Duration::from_millis(50)).await;

        handle.stop();

        // Give the task a moment to finish any in-flight send before asserting,
        // so a delayed tick or the final flush is not raced by the assertion.
        tokio::time::sleep(Duration::from_millis(20)).await;

        // Transport should have received the event
        assert!(transport.event_count() >= 1);
    }

    #[tokio::test]
    async fn test_batch_flusher_final_flush_on_stop() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));
        let transport = Arc::new(MockTransport::new());

        Box::pin(processor.add(test_event("1"))).await;

        let flusher = BatchFlusher::new(
            processor.clone(),
            transport.clone(),
            Duration::from_secs(60), // Long interval - won't trigger naturally
        );

        let handle = flusher.start();

        // Immediately stop - should trigger final flush
        tokio::time::sleep(Duration::from_millis(10)).await;
        handle.stop();

        // Give time for final flush
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Event should have been sent during final flush
        assert_eq!(transport.event_count(), 1);
    }

    #[tokio::test]
    async fn test_flush_handle_drop() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));
        let transport = Arc::new(MockTransport::new());

        Box::pin(processor.add(test_event("1"))).await;

        let flusher = BatchFlusher::new(
            processor.clone(),
            transport.clone(),
            Duration::from_secs(60),
        );

        {
            let _handle = flusher.start();
            // Handle dropped here
        }

        // Give time for cleanup
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Event should have been sent during drop
        assert_eq!(transport.event_count(), 1);
    }

    #[tokio::test]
    async fn test_should_flush_size_threshold() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        Box::pin(processor.add(test_event("1"))).await;
        assert!(!processor.should_flush().await);

        Box::pin(processor.add(test_event("2"))).await;
        assert!(!processor.should_flush().await);
        assert_eq!(processor.len().await, 2);

        // Reaching the size threshold makes `add` drain the batch itself, so
        // `should_flush` never observes a full batch; verify the drain instead.
        let batch = Box::pin(processor.add(test_event("3"))).await;
        assert_eq!(batch.map(|events| events.len()), Some(3));
        assert!(!processor.should_flush().await);
    }

    #[tokio::test]
    async fn test_batch_processor_concurrent_access() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));

        let mut handles = vec![];

        for i in 0..10 {
            let proc = processor.clone();
            let handle = tokio::spawn(async move {
                Box::pin(proc.add(test_event(&i.to_string()))).await;
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        assert_eq!(processor.len().await, 10);
    }
}