// phytrace_sdk/reliability/batch.rs
use std::sync::Arc;
4use std::time::{Duration, Instant};
5use tokio::sync::Mutex;
6
7use crate::core::config::BufferConfig;
8use crate::error::PhyTraceResult;
9use crate::models::event::UdmEvent;
10use crate::transport::traits::{BatchResult, Transport};
11
12#[derive(Debug)]
14pub struct BatchProcessor {
15 events: Arc<Mutex<Vec<UdmEvent>>>,
16 max_size: usize,
17 max_age: Duration,
18 created_at: Arc<Mutex<Instant>>,
19}
20
21impl BatchProcessor {
22 pub fn new(max_size: usize, max_age: Duration) -> Self {
24 Self {
25 events: Arc::new(Mutex::new(Vec::new())),
26 max_size,
27 max_age,
28 created_at: Arc::new(Mutex::new(Instant::now())),
29 }
30 }
31
32 pub fn from_config(config: &BufferConfig) -> Self {
34 Self::new(
35 config.batch_size,
36 Duration::from_secs(config.flush_interval_secs),
37 )
38 }
39
40 pub async fn add(&self, event: UdmEvent) -> Option<Vec<UdmEvent>> {
44 let mut events = self.events.lock().await;
45 events.push(event);
46
47 if self.should_flush_inner(&events).await {
48 let batch = std::mem::take(&mut *events);
49 *self.created_at.lock().await = Instant::now();
50 Some(batch)
51 } else {
52 None
53 }
54 }
55
56 pub async fn add_many(&self, new_events: Vec<UdmEvent>) -> Vec<Vec<UdmEvent>> {
60 let mut events = self.events.lock().await;
61 let mut batches = Vec::new();
62
63 for event in new_events {
64 events.push(event);
65
66 if events.len() >= self.max_size {
67 let batch = std::mem::take(&mut *events);
68 *self.created_at.lock().await = Instant::now();
69 batches.push(batch);
70 }
71 }
72
73 batches
74 }
75
76 pub async fn should_flush(&self) -> bool {
78 let events = self.events.lock().await;
79 self.should_flush_inner(&events).await
80 }
81
82 pub async fn flush(&self) -> Vec<UdmEvent> {
84 let mut events = self.events.lock().await;
85 let batch = std::mem::take(&mut *events);
86 *self.created_at.lock().await = Instant::now();
87 batch
88 }
89
90 pub async fn len(&self) -> usize {
92 self.events.lock().await.len()
93 }
94
95 pub async fn is_empty(&self) -> bool {
97 self.events.lock().await.is_empty()
98 }
99
100 pub async fn age(&self) -> Duration {
102 self.created_at.lock().await.elapsed()
103 }
104
105 pub async fn send_batch<T: Transport>(
107 &self,
108 transport: &T,
109 ) -> PhyTraceResult<Option<BatchResult>> {
110 let batch = self.flush().await;
111 if batch.is_empty() {
112 return Ok(None);
113 }
114
115 let result = transport.send_batch(&batch).await?;
116 Ok(Some(result))
117 }
118
119 async fn should_flush_inner(&self, events: &[UdmEvent]) -> bool {
121 if events.len() >= self.max_size {
123 return true;
124 }
125
126 let age = self.created_at.lock().await.elapsed();
128 if !events.is_empty() && age >= self.max_age {
129 return true;
130 }
131
132 false
133 }
134}
135
136pub struct BatchFlusher<T: Transport> {
138 processor: Arc<BatchProcessor>,
139 transport: Arc<T>,
140 interval: Duration,
141}
142
143impl<T: Transport + 'static> BatchFlusher<T> {
144 pub fn new(processor: Arc<BatchProcessor>, transport: Arc<T>, interval: Duration) -> Self {
146 Self {
147 processor,
148 transport,
149 interval,
150 }
151 }
152
153 pub fn start(self) -> FlushHandle {
157 let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
158
159 let processor = self.processor;
160 let transport = self.transport;
161 let interval = self.interval;
162
163 tokio::spawn(async move {
164 let mut interval_timer = tokio::time::interval(interval);
165
166 loop {
167 tokio::select! {
168 _ = interval_timer.tick() => {
169 if processor.should_flush().await {
170 drop(processor.send_batch(transport.as_ref()).await);
171 }
172 }
173 _ = &mut rx => {
174 drop(processor.send_batch(transport.as_ref()).await);
176 break;
177 }
178 }
179 }
180 });
181
182 FlushHandle { stop_tx: Some(tx) }
183 }
184}
185
186pub struct FlushHandle {
188 stop_tx: Option<tokio::sync::oneshot::Sender<()>>,
189}
190
191impl FlushHandle {
192 pub fn stop(mut self) {
194 if let Some(tx) = self.stop_tx.take() {
195 #[expect(
198 clippy::let_underscore_must_use,
199 reason = "send failure means the flusher task already exited, which is the desired post-condition"
200 )]
201 let _ = tx.send(());
202 }
203 }
204}
205
206impl Drop for FlushHandle {
207 fn drop(&mut self) {
208 if let Some(tx) = self.stop_tx.take() {
209 #[expect(
210 clippy::let_underscore_must_use,
211 reason = "send failure means the flusher task already exited, which is the desired post-condition"
212 )]
213 let _ = tx.send(());
214 }
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::config::BufferConfig;
    use crate::models::domains::IdentityDomain;
    use crate::models::enums::SourceType;
    use crate::transport::mock::MockTransport;

    /// Builds an event of source type `Amr` whose identity carries `id` as
    /// its source id.
    fn test_event(id: &str) -> UdmEvent {
        UdmEvent::new(SourceType::Amr).with_identity(IdentityDomain {
            source_id: Some(id.to_string()),
            ..Default::default()
        })
    }

    /// One event in a batch of five is buffered, not returned.
    #[tokio::test]
    async fn test_batch_add_below_threshold() {
        let processor = BatchProcessor::new(5, Duration::from_secs(60));

        let result = Box::pin(processor.add(test_event("1"))).await;
        assert!(result.is_none());
        assert_eq!(processor.len().await, 1);
    }

    /// The add that reaches `max_size` returns the full batch and empties
    /// the buffer.
    #[tokio::test]
    async fn test_batch_add_reaches_threshold() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        Box::pin(processor.add(test_event("1"))).await;
        Box::pin(processor.add(test_event("2"))).await;
        let result = Box::pin(processor.add(test_event("3"))).await;

        assert!(result.is_some());
        let batch = result.unwrap();
        assert_eq!(batch.len(), 3);
        assert!(processor.is_empty().await);
    }

    /// Five events with a batch size of three yield one batch and leave two
    /// events buffered.
    #[tokio::test]
    async fn test_batch_add_many() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        let events = vec![
            test_event("1"),
            test_event("2"),
            test_event("3"),
            test_event("4"),
            test_event("5"),
        ];

        let batches = processor.add_many(events).await;

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 3);
        assert_eq!(processor.len().await, 2);
    }

    /// An explicit flush returns everything buffered, regardless of thresholds.
    #[tokio::test]
    async fn test_batch_flush() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));

        Box::pin(processor.add(test_event("1"))).await;
        Box::pin(processor.add(test_event("2"))).await;

        let batch = processor.flush().await;

        assert_eq!(batch.len(), 2);
        assert!(processor.is_empty().await);
    }

    /// A non-empty buffer older than `max_age` is reported as due.
    #[tokio::test]
    async fn test_batch_age_trigger() {
        let processor = BatchProcessor::new(100, Duration::from_millis(10));

        Box::pin(processor.add(test_event("1"))).await;

        tokio::time::sleep(Duration::from_millis(20)).await;

        assert!(processor.should_flush().await);
    }

    /// An empty buffer is never due on age alone.
    #[tokio::test]
    async fn test_empty_batch_no_flush() {
        let processor = BatchProcessor::new(5, Duration::from_millis(1));

        tokio::time::sleep(Duration::from_millis(5)).await;

        assert!(!processor.should_flush().await);
    }

    /// `from_config` takes the size threshold from `batch_size`: nine adds
    /// buffer, the tenth returns all ten events.
    #[tokio::test]
    async fn test_from_config() {
        let config = BufferConfig {
            batch_size: 10,
            flush_interval_secs: 30,
            ..Default::default()
        };

        let processor = BatchProcessor::from_config(&config);

        for i in 0..9 {
            let result = Box::pin(processor.add(test_event(&i.to_string()))).await;
            assert!(result.is_none());
        }
        let result = Box::pin(processor.add(test_event("9"))).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().len(), 10);
    }

    /// `age` starts near zero and grows with wall-clock time.
    #[tokio::test]
    async fn test_batch_age() {
        let processor = BatchProcessor::new(100, Duration::from_secs(60));

        let age = processor.age().await;
        assert!(age < Duration::from_secs(1));

        tokio::time::sleep(Duration::from_millis(50)).await;

        let age2 = processor.age().await;
        assert!(age2 >= Duration::from_millis(50));
    }

    /// `is_empty` tracks add and flush.
    #[tokio::test]
    async fn test_batch_is_empty() {
        let processor = BatchProcessor::new(5, Duration::from_secs(60));

        assert!(processor.is_empty().await);

        Box::pin(processor.add(test_event("1"))).await;
        assert!(!processor.is_empty().await);

        processor.flush().await;
        assert!(processor.is_empty().await);
    }

    /// `len` counts buffered events.
    #[tokio::test]
    async fn test_batch_len() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));

        assert_eq!(processor.len().await, 0);

        Box::pin(processor.add(test_event("1"))).await;
        assert_eq!(processor.len().await, 1);

        Box::pin(processor.add(test_event("2"))).await;
        Box::pin(processor.add(test_event("3"))).await;
        assert_eq!(processor.len().await, 3);
    }

    /// Seven events with a batch size of three yield two batches plus one
    /// buffered event.
    #[tokio::test]
    async fn test_add_many_multiple_batches() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        let events: Vec<UdmEvent> = (0..7).map(|i| test_event(&i.to_string())).collect();

        let batches = processor.add_many(events).await;

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].len(), 3);
        assert_eq!(batches[1].len(), 3);
        assert_eq!(processor.len().await, 1);
    }

    /// An exact multiple of the batch size leaves nothing buffered.
    #[tokio::test]
    async fn test_add_many_exact_multiple() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        let events: Vec<UdmEvent> = (0..6).map(|i| test_event(&i.to_string())).collect();

        let batches = processor.add_many(events).await;

        assert_eq!(batches.len(), 2);
        assert!(processor.is_empty().await);
    }

    /// Fewer events than the batch size are only buffered.
    #[tokio::test]
    async fn test_add_many_less_than_batch() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));

        let events: Vec<UdmEvent> = (0..5).map(|i| test_event(&i.to_string())).collect();

        let batches = processor.add_many(events).await;

        assert!(batches.is_empty());
        assert_eq!(processor.len().await, 5);
    }

    /// `send_batch` drains the buffer and reports how many events were sent.
    #[tokio::test]
    async fn test_send_batch_with_events() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));
        let transport = MockTransport::new();

        Box::pin(processor.add(test_event("1"))).await;
        Box::pin(processor.add(test_event("2"))).await;

        let result = processor.send_batch(&transport).await.unwrap();

        assert!(result.is_some());
        let batch_result = result.unwrap();
        assert_eq!(batch_result.total, 2);
        assert!(processor.is_empty().await);
    }

    /// `send_batch` on an empty buffer returns `None`.
    #[tokio::test]
    async fn test_send_batch_empty() {
        let processor = BatchProcessor::new(10, Duration::from_secs(60));
        let transport = MockTransport::new();

        let result = processor.send_batch(&transport).await.unwrap();

        assert!(result.is_none());
    }

    /// A flush restarts the age clock, so an event added right afterwards is
    /// not yet due.
    #[tokio::test]
    async fn test_flush_resets_timer() {
        let processor = BatchProcessor::new(100, Duration::from_millis(50));

        Box::pin(processor.add(test_event("1"))).await;

        tokio::time::sleep(Duration::from_millis(60)).await;
        assert!(processor.should_flush().await);

        processor.flush().await;

        Box::pin(processor.add(test_event("2"))).await;

        assert!(!processor.should_flush().await);
    }

    /// With a 10 ms age limit and a 20 ms tick, the running flusher has sent
    /// the buffered event within 50 ms.
    #[tokio::test]
    async fn test_batch_flusher_start_and_stop() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_millis(10)));
        let transport = Arc::new(MockTransport::new());

        Box::pin(processor.add(test_event("1"))).await;

        let flusher = BatchFlusher::new(
            processor.clone(),
            transport.clone(),
            Duration::from_millis(20),
        );

        let handle = flusher.start();

        tokio::time::sleep(Duration::from_millis(50)).await;

        handle.stop();

        assert!(transport.event_count() >= 1);
    }

    /// Neither threshold is reachable here (60 s age, 60 s tick), so the
    /// event can only arrive through the final flush triggered by `stop`.
    #[tokio::test]
    async fn test_batch_flusher_final_flush_on_stop() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));
        let transport = Arc::new(MockTransport::new());

        Box::pin(processor.add(test_event("1"))).await;

        let flusher = BatchFlusher::new(
            processor.clone(),
            transport.clone(),
            Duration::from_secs(60),
        );

        let handle = flusher.start();

        tokio::time::sleep(Duration::from_millis(10)).await;
        handle.stop();

        // Give the spawned task time to observe the signal and send.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(transport.event_count(), 1);
    }

    /// Dropping the handle triggers the same final flush as `stop`.
    #[tokio::test]
    async fn test_flush_handle_drop() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));
        let transport = Arc::new(MockTransport::new());

        Box::pin(processor.add(test_event("1"))).await;

        let flusher = BatchFlusher::new(
            processor.clone(),
            transport.clone(),
            Duration::from_secs(60),
        );

        {
            let _handle = flusher.start();
        }

        // Give the spawned task time to observe the drop and send.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(transport.event_count(), 1);
    }

    /// Below the size threshold nothing is due; the add that reaches it
    /// hands back the whole batch, after which nothing is due again.
    #[tokio::test]
    async fn test_should_flush_size_threshold() {
        let processor = BatchProcessor::new(3, Duration::from_secs(60));

        Box::pin(processor.add(test_event("1"))).await;
        assert!(!processor.should_flush().await);

        Box::pin(processor.add(test_event("2"))).await;
        assert!(!processor.should_flush().await);

        assert_eq!(processor.len().await, 2);

        let third = Box::pin(processor.add(test_event("3"))).await;
        assert_eq!(third.map(|batch| batch.len()), Some(3));
        assert!(processor.is_empty().await);
        assert!(!processor.should_flush().await);
    }

    /// Ten tasks adding concurrently all land in the buffer.
    #[tokio::test]
    async fn test_batch_processor_concurrent_access() {
        let processor = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));

        let mut handles = vec![];

        for i in 0..10 {
            let proc = processor.clone();
            let handle = tokio::spawn(async move {
                Box::pin(proc.add(test_event(&i.to_string()))).await;
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }

        assert_eq!(processor.len().await, 10);
    }
}