phytrace_sdk/reliability/
batch.rs

//! Batch processing for efficient event transmission.

use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

use crate::core::config::BufferConfig;
use crate::error::PhyTraceResult;
use crate::models::event::UdmEvent;
use crate::transport::traits::{BatchResult, Transport};

/// Batch processor for aggregating events before transmission.
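///
/// A batch becomes ready once it holds `max_size` events or once `max_age`
/// has elapsed since the last flush; empty batches are never considered ready.
///
/// # Example
///
/// Illustrative sketch only (marked `ignore`): `event` and `transport` are
/// assumed to be a `UdmEvent` and a `Transport` implementation supplied by
/// the caller, e.g. constructed as in the tests at the bottom of this file.
///
/// ```ignore
/// let processor = BatchProcessor::new(100, Duration::from_secs(5));
///
/// // `add` hands back the drained batch once a threshold is hit.
/// if let Some(batch) = processor.add(event).await {
///     transport.send_batch(&batch).await?;
/// }
/// ```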
#[derive(Debug)]
pub struct BatchProcessor {
    events: Arc<Mutex<Vec<UdmEvent>>>,
    max_size: usize,
    max_age: Duration,
    created_at: Arc<Mutex<Instant>>,
}

impl BatchProcessor {
    /// Create a new batch processor.
    pub fn new(max_size: usize, max_age: Duration) -> Self {
        Self {
            events: Arc::new(Mutex::new(Vec::new())),
            max_size,
            max_age,
            created_at: Arc::new(Mutex::new(Instant::now())),
        }
    }

    /// Create from buffer configuration.
    pub fn from_config(config: &BufferConfig) -> Self {
        Self::new(
            config.batch_size,
            Duration::from_secs(config.flush_interval_secs),
        )
    }

    /// Add an event to the batch.
    ///
    /// Returns `Some(batch)` when the added event makes the batch ready to send
    /// (size or age threshold reached); the internal buffer is drained and the
    /// age timer is reset. Returns `None` otherwise.
    pub async fn add(&self, event: UdmEvent) -> Option<Vec<UdmEvent>> {
        let mut events = self.events.lock().await;
        events.push(event);

        if self.should_flush_inner(&events).await {
            let batch = std::mem::take(&mut *events);
            *self.created_at.lock().await = Instant::now();
            Some(batch)
        } else {
            None
        }
    }

    /// Add multiple events.
    ///
    /// Returns batches ready to be sent (may return multiple if events exceed batch size).
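    ///
    /// # Example
    ///
    /// Illustrative sketch only (marked `ignore`): `events` is assumed to be a
    /// `Vec<UdmEvent>` and `transport` a `Transport` implementation.
    ///
    /// ```ignore
    /// // With max_size = 3, seven events yield two full batches and leave one buffered.
    /// for batch in processor.add_many(events).await {
    ///     transport.send_batch(&batch).await?;
    /// }
    /// ```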
    pub async fn add_many(&self, new_events: Vec<UdmEvent>) -> Vec<Vec<UdmEvent>> {
        let mut events = self.events.lock().await;
        let mut batches = Vec::new();

        for event in new_events {
            events.push(event);

            if events.len() >= self.max_size {
                let batch = std::mem::take(&mut *events);
                *self.created_at.lock().await = Instant::now();
                batches.push(batch);
            }
        }

        batches
    }

    /// Check if the batch should be flushed.
    pub async fn should_flush(&self) -> bool {
        let events = self.events.lock().await;
        self.should_flush_inner(&events).await
    }

    /// Flush the current batch regardless of conditions.
    pub async fn flush(&self) -> Vec<UdmEvent> {
        let mut events = self.events.lock().await;
        let batch = std::mem::take(&mut *events);
        *self.created_at.lock().await = Instant::now();
        batch
    }

    /// Get current batch size.
    pub async fn len(&self) -> usize {
        self.events.lock().await.len()
    }

    /// Check if batch is empty.
    pub async fn is_empty(&self) -> bool {
        self.events.lock().await.is_empty()
    }

    /// Get age of current batch.
    pub async fn age(&self) -> Duration {
        self.created_at.lock().await.elapsed()
    }

    /// Send the batch using a transport.
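    ///
    /// The batch is drained before the transport call, so a transport error
    /// does not put the events back. Returns `Ok(None)` when there was nothing
    /// to send.
    ///
    /// # Example
    ///
    /// Illustrative sketch only (marked `ignore`): `transport` is assumed to
    /// implement `Transport`.
    ///
    /// ```ignore
    /// if let Some(result) = processor.send_batch(&transport).await? {
    ///     println!("sent batch of {} events", result.total);
    /// }
    /// ```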
    pub async fn send_batch<T: Transport>(
        &self,
        transport: &T,
    ) -> PhyTraceResult<Option<BatchResult>> {
        let batch = self.flush().await;
        if batch.is_empty() {
            return Ok(None);
        }

        let result = transport.send_batch(&batch).await?;
        Ok(Some(result))
    }

    // Private helper
    async fn should_flush_inner(&self, events: &[UdmEvent]) -> bool {
        // Check size
        if events.len() >= self.max_size {
            return true;
        }

        // Check age
        let age = self.created_at.lock().await.elapsed();
        if !events.is_empty() && age >= self.max_age {
            return true;
        }

        false
    }
}

/// Background batch flusher that periodically checks and sends batches.
pub struct BatchFlusher<T: Transport> {
    processor: Arc<BatchProcessor>,
    transport: Arc<T>,
    interval: Duration,
}

impl<T: Transport + 'static> BatchFlusher<T> {
    /// Create a new batch flusher.
    pub fn new(processor: Arc<BatchProcessor>, transport: Arc<T>, interval: Duration) -> Self {
        Self {
            processor,
            transport,
            interval,
        }
    }

    /// Start the background flush loop.
    ///
    /// Returns a handle that can be used to stop the flusher.
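    ///
    /// The spawned task flushes whenever `should_flush()` reports a ready
    /// batch and performs one final flush when the handle is stopped or
    /// dropped.
    ///
    /// # Example
    ///
    /// Illustrative sketch only (marked `ignore`): `transport` is assumed to
    /// be an `Arc`-wrapped `Transport` implementation.
    ///
    /// ```ignore
    /// let processor = Arc::new(BatchProcessor::new(100, Duration::from_secs(5)));
    /// let flusher = BatchFlusher::new(processor.clone(), transport, Duration::from_secs(1));
    /// let handle = flusher.start();
    ///
    /// // ... record events via `processor.add(...)` ...
    ///
    /// handle.stop(); // triggers a final flush
    /// ```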
    pub fn start(self) -> FlushHandle {
        let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();

        let processor = self.processor;
        let transport = self.transport;
        let interval = self.interval;

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                tokio::select! {
                    _ = interval_timer.tick() => {
                        if processor.should_flush().await {
                            let _ = processor.send_batch(transport.as_ref()).await;
                        }
                    }
                    _ = &mut rx => {
                        // Final flush before stopping
                        let _ = processor.send_batch(transport.as_ref()).await;
                        break;
                    }
                }
            }
        });

        FlushHandle { stop_tx: Some(tx) }
    }
}

/// Handle to stop a background flusher.
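///
/// The flusher is also stopped (with a final flush) when the handle is
/// dropped, so keep the handle alive for as long as the flusher should run.
///
/// # Example
///
/// Illustrative sketch only (marked `ignore`).
///
/// ```ignore
/// let handle = flusher.start();
/// // Letting `handle` go out of scope stops the flusher;
/// // prefer an explicit `stop()` at shutdown.
/// handle.stop();
/// ```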
pub struct FlushHandle {
    stop_tx: Option<tokio::sync::oneshot::Sender<()>>,
}

impl FlushHandle {
    /// Stop the flusher.
    pub fn stop(mut self) {
        if let Some(tx) = self.stop_tx.take() {
            let _ = tx.send(());
        }
    }
}

impl Drop for FlushHandle {
    fn drop(&mut self) {
        if let Some(tx) = self.stop_tx.take() {
            let _ = tx.send(());
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::config::BufferConfig;
    use crate::models::domains::IdentityDomain;
    use crate::models::enums::SourceType;
    use crate::transport::mock::MockTransport;

    fn test_event(id: &str) -> UdmEvent {
        UdmEvent::new(SourceType::Amr).with_identity(IdentityDomain {
            source_id: Some(id.to_string()),
            ..Default::default()
        })
    }

    #[tokio::test]
    async fn test_batch_add_below_threshold() {
        let processor = BatchProcessor::new(5, Duration::from_secs(60));

        let result = processor.add(test_event("1")).await;
        assert!(result.is_none());
        assert_eq!(processor.len().await, 1);
    }

    #[tokio::test]
    async fn test_batch_add_reaches_threshold() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        processor.add(test_event("1")).await;
        processor.add(test_event("2")).await;
        let result = processor.add(test_event("3")).await;

        assert!(result.is_some());
        let batch = result.unwrap();
        assert_eq!(batch.len(), 3);
        assert!(processor.is_empty().await);
    }

    #[tokio::test]
    async fn test_batch_add_many() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        let events = vec![
            test_event("1"),
            test_event("2"),
            test_event("3"),
            test_event("4"),
            test_event("5"),
        ];

        let batches = processor.add_many(events).await;

        // Should produce one full batch of 3
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 3);
        // Remaining 2 events in processor
        assert_eq!(processor.len().await, 2);
    }

    #[tokio::test]
    async fn test_batch_flush() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));

        processor.add(test_event("1")).await;
        processor.add(test_event("2")).await;

        let batch = processor.flush().await;

        assert_eq!(batch.len(), 2);
        assert!(processor.is_empty().await);
    }

    #[tokio::test]
    async fn test_batch_age_trigger() {
        let processor = BatchProcessor::new(100, Duration::from_millis(10));

        processor.add(test_event("1")).await;

        // Wait for batch to age
        tokio::time::sleep(Duration::from_millis(20)).await;

        assert!(processor.should_flush().await);
    }

    #[tokio::test]
    async fn test_empty_batch_no_flush() {
        let processor = BatchProcessor::new(5, Duration::from_millis(1));

        // Even once max_age (1 ms) has long elapsed, an empty batch shouldn't flush
        tokio::time::sleep(Duration::from_millis(5)).await;

        assert!(!processor.should_flush().await);
    }

    // Additional tests for improved coverage

    #[tokio::test]
    async fn test_from_config() {
        let config = BufferConfig {
            batch_size: 10,
            flush_interval_secs: 30,
            ..Default::default()
        };

        let processor = BatchProcessor::from_config(&config);

        // Add 10 events - should trigger batch
        for i in 0..9 {
            let result = processor.add(test_event(&i.to_string())).await;
            assert!(result.is_none());
        }
        let result = processor.add(test_event("10")).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().len(), 10);
    }

    #[tokio::test]
    async fn test_batch_age() {
        let processor = BatchProcessor::new(100, Duration::from_secs(60));

        // Age should be very small initially
        let age = processor.age().await;
        assert!(age < Duration::from_secs(1));

        // Wait a bit
        tokio::time::sleep(Duration::from_millis(50)).await;

        let age2 = processor.age().await;
        assert!(age2 >= Duration::from_millis(50));
    }

    #[tokio::test]
    async fn test_batch_is_empty() {
        let processor = BatchProcessor::new(5, Duration::from_secs(60));

        assert!(processor.is_empty().await);

        processor.add(test_event("1")).await;
        assert!(!processor.is_empty().await);

        processor.flush().await;
        assert!(processor.is_empty().await);
    }

    #[tokio::test]
    async fn test_batch_len() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));

        assert_eq!(processor.len().await, 0);

        processor.add(test_event("1")).await;
        assert_eq!(processor.len().await, 1);

        processor.add(test_event("2")).await;
        processor.add(test_event("3")).await;
        assert_eq!(processor.len().await, 3);
    }

    #[tokio::test]
    async fn test_add_many_multiple_batches() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        // Add 7 events - should produce 2 full batches
        let events: Vec<UdmEvent> = (0..7).map(|i| test_event(&i.to_string())).collect();

        let batches = processor.add_many(events).await;

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].len(), 3);
        assert_eq!(batches[1].len(), 3);
        // 1 event remaining
        assert_eq!(processor.len().await, 1);
    }

    #[tokio::test]
    async fn test_add_many_exact_multiple() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        // Add exactly 6 events - should produce 2 full batches with nothing remaining
        let events: Vec<UdmEvent> = (0..6).map(|i| test_event(&i.to_string())).collect();

        let batches = processor.add_many(events).await;

        assert_eq!(batches.len(), 2);
        assert!(processor.is_empty().await);
    }

    #[tokio::test]
    async fn test_add_many_less_than_batch() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));

        let events: Vec<UdmEvent> = (0..5).map(|i| test_event(&i.to_string())).collect();

        let batches = processor.add_many(events).await;

        // No complete batches
        assert!(batches.is_empty());
        assert_eq!(processor.len().await, 5);
    }

    #[tokio::test]
    async fn test_send_batch_with_events() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));
        let transport = MockTransport::new();

        processor.add(test_event("1")).await;
        processor.add(test_event("2")).await;

        let result = processor.send_batch(&transport).await.unwrap();

        assert!(result.is_some());
        let batch_result = result.unwrap();
        assert_eq!(batch_result.total, 2);
        assert!(processor.is_empty().await);
    }

    #[tokio::test]
    async fn test_send_batch_empty() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));
        let transport = MockTransport::new();

        let result = processor.send_batch(&transport).await.unwrap();

        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_flush_resets_timer() {
        let processor = BatchProcessor::new(100, Duration::from_millis(50));

        processor.add(test_event("1")).await;

        // Wait for batch to age
        tokio::time::sleep(Duration::from_millis(60)).await;
        assert!(processor.should_flush().await);

        // Flush resets the timer
        processor.flush().await;

        // Add new event
        processor.add(test_event("2")).await;

        // Should not flush yet (timer was reset)
        assert!(!processor.should_flush().await);
    }

    #[tokio::test]
    async fn test_batch_flusher_start_and_stop() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_millis(10)));
        let transport = Arc::new(MockTransport::new());

        processor.add(test_event("1")).await;

        let flusher = BatchFlusher::new(
            processor.clone(),
            transport.clone(),
            Duration::from_millis(20),
        );

        let handle = flusher.start();

        // Wait for at least one flush cycle
        tokio::time::sleep(Duration::from_millis(50)).await;

        handle.stop();

        // Transport should have received the event
        assert!(transport.event_count() >= 1);
    }

    #[tokio::test]
    async fn test_batch_flusher_final_flush_on_stop() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));
        let transport = Arc::new(MockTransport::new());

        processor.add(test_event("1")).await;

        let flusher = BatchFlusher::new(
            processor.clone(),
            transport.clone(),
            Duration::from_secs(60), // Long interval - won't trigger naturally
        );

        let handle = flusher.start();

        // Immediately stop - should trigger final flush
        tokio::time::sleep(Duration::from_millis(10)).await;
        handle.stop();

        // Give time for final flush
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Event should have been sent during final flush
        assert_eq!(transport.event_count(), 1);
    }

    #[tokio::test]
    async fn test_flush_handle_drop() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));
        let transport = Arc::new(MockTransport::new());

        processor.add(test_event("1")).await;

        let flusher = BatchFlusher::new(
            processor.clone(),
            transport.clone(),
            Duration::from_secs(60),
        );

        {
            let _handle = flusher.start();
            // Handle dropped here
        }

        // Give time for cleanup
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Event should have been sent during drop
        assert_eq!(transport.event_count(), 1);
    }

    #[tokio::test]
    async fn test_should_flush_size_threshold() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        processor.add(test_event("1")).await;
        assert!(!processor.should_flush().await);

        processor.add(test_event("2")).await;
        assert!(!processor.should_flush().await);

        // Note: third add will auto-flush, so we check just before
        assert_eq!(processor.len().await, 2);
    }

    #[tokio::test]
    async fn test_batch_processor_concurrent_access() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));

        let mut handles = vec![];

        for i in 0..10 {
            let proc = processor.clone();
            let handle = tokio::spawn(async move {
                proc.add(test_event(&i.to_string())).await;
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        assert_eq!(processor.len().await, 10);
    }
}