phytrace_sdk/reliability/
batch.rs1use std::sync::Arc;
4use std::time::{Duration, Instant};
5use tokio::sync::Mutex;
6
7use crate::core::config::BufferConfig;
8use crate::error::PhyTraceResult;
9use crate::models::event::UdmEvent;
10use crate::transport::traits::{BatchResult, Transport};
11
12#[derive(Debug)]
14pub struct BatchProcessor {
15 events: Arc<Mutex<Vec<UdmEvent>>>,
16 max_size: usize,
17 max_age: Duration,
18 created_at: Arc<Mutex<Instant>>,
19}
20
21impl BatchProcessor {
22 pub fn new(max_size: usize, max_age: Duration) -> Self {
24 Self {
25 events: Arc::new(Mutex::new(Vec::new())),
26 max_size,
27 max_age,
28 created_at: Arc::new(Mutex::new(Instant::now())),
29 }
30 }
31
32 pub fn from_config(config: &BufferConfig) -> Self {
34 Self::new(
35 config.batch_size,
36 Duration::from_secs(config.flush_interval_secs),
37 )
38 }
39
40 pub async fn add(&self, event: UdmEvent) -> Option<Vec<UdmEvent>> {
44 let mut events = self.events.lock().await;
45 events.push(event);
46
47 if self.should_flush_inner(&events).await {
48 let batch = std::mem::take(&mut *events);
49 *self.created_at.lock().await = Instant::now();
50 Some(batch)
51 } else {
52 None
53 }
54 }
55
56 pub async fn add_many(&self, new_events: Vec<UdmEvent>) -> Vec<Vec<UdmEvent>> {
60 let mut events = self.events.lock().await;
61 let mut batches = Vec::new();
62
63 for event in new_events {
64 events.push(event);
65
66 if events.len() >= self.max_size {
67 let batch = std::mem::take(&mut *events);
68 *self.created_at.lock().await = Instant::now();
69 batches.push(batch);
70 }
71 }
72
73 batches
74 }
75
76 pub async fn should_flush(&self) -> bool {
78 let events = self.events.lock().await;
79 self.should_flush_inner(&events).await
80 }
81
82 pub async fn flush(&self) -> Vec<UdmEvent> {
84 let mut events = self.events.lock().await;
85 let batch = std::mem::take(&mut *events);
86 *self.created_at.lock().await = Instant::now();
87 batch
88 }
89
90 pub async fn len(&self) -> usize {
92 self.events.lock().await.len()
93 }
94
95 pub async fn is_empty(&self) -> bool {
97 self.events.lock().await.is_empty()
98 }
99
100 pub async fn age(&self) -> Duration {
102 self.created_at.lock().await.elapsed()
103 }
104
105 pub async fn send_batch<T: Transport>(
107 &self,
108 transport: &T,
109 ) -> PhyTraceResult<Option<BatchResult>> {
110 let batch = self.flush().await;
111 if batch.is_empty() {
112 return Ok(None);
113 }
114
115 let result = transport.send_batch(&batch).await?;
116 Ok(Some(result))
117 }
118
119 async fn should_flush_inner(&self, events: &[UdmEvent]) -> bool {
121 if events.len() >= self.max_size {
123 return true;
124 }
125
126 let age = self.created_at.lock().await.elapsed();
128 if !events.is_empty() && age >= self.max_age {
129 return true;
130 }
131
132 false
133 }
134}
135
136pub struct BatchFlusher<T: Transport> {
138 processor: Arc<BatchProcessor>,
139 transport: Arc<T>,
140 interval: Duration,
141}
142
143impl<T: Transport + 'static> BatchFlusher<T> {
144 pub fn new(processor: Arc<BatchProcessor>, transport: Arc<T>, interval: Duration) -> Self {
146 Self {
147 processor,
148 transport,
149 interval,
150 }
151 }
152
153 pub fn start(self) -> FlushHandle {
157 let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
158
159 let processor = self.processor;
160 let transport = self.transport;
161 let interval = self.interval;
162
163 tokio::spawn(async move {
164 let mut interval_timer = tokio::time::interval(interval);
165
166 loop {
167 tokio::select! {
168 _ = interval_timer.tick() => {
169 if processor.should_flush().await {
170 let _ = processor.send_batch(transport.as_ref()).await;
171 }
172 }
173 _ = &mut rx => {
174 let _ = processor.send_batch(transport.as_ref()).await;
176 break;
177 }
178 }
179 }
180 });
181
182 FlushHandle { stop_tx: Some(tx) }
183 }
184}
185
186pub struct FlushHandle {
188 stop_tx: Option<tokio::sync::oneshot::Sender<()>>,
189}
190
191impl FlushHandle {
192 pub fn stop(mut self) {
194 if let Some(tx) = self.stop_tx.take() {
195 let _ = tx.send(());
196 }
197 }
198}
199
200impl Drop for FlushHandle {
201 fn drop(&mut self) {
202 if let Some(tx) = self.stop_tx.take() {
203 let _ = tx.send(());
204 }
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::config::BufferConfig;
    use crate::models::domains::IdentityDomain;
    use crate::models::enums::SourceType;
    use crate::transport::mock::MockTransport;

    /// Builds an event with `SourceType::Amr` whose identity carries `id` as
    /// its source id.
    fn test_event(id: &str) -> UdmEvent {
        let identity = IdentityDomain {
            source_id: Some(id.to_string()),
            ..Default::default()
        };
        UdmEvent::new(SourceType::Amr).with_identity(identity)
    }

    /// One event in a five-slot batch stays buffered.
    #[tokio::test]
    async fn test_batch_add_below_threshold() {
        let batcher = BatchProcessor::new(5, Duration::from_secs(60));

        let flushed = batcher.add(test_event("1")).await;

        assert!(flushed.is_none());
        assert_eq!(batcher.len().await, 1);
    }

    /// The add that fills the batch returns it and empties the buffer.
    #[tokio::test]
    async fn test_batch_add_reaches_threshold() {
        let batcher = BatchProcessor::new(3, Duration::from_secs(60));

        batcher.add(test_event("1")).await;
        batcher.add(test_event("2")).await;
        let flushed = batcher.add(test_event("3")).await;

        assert!(flushed.is_some());
        let batch = flushed.unwrap();
        assert_eq!(batch.len(), 3);
        assert!(batcher.is_empty().await);
    }

    /// Five events with a size of three: one full batch out, two left over.
    #[tokio::test]
    async fn test_batch_add_many() {
        let batcher = BatchProcessor::new(3, Duration::from_secs(60));
        let incoming: Vec<UdmEvent> = (1..=5).map(|i| test_event(&i.to_string())).collect();

        let batches = batcher.add_many(incoming).await;

        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 3);
        assert_eq!(batcher.len().await, 2);
    }

    /// An explicit flush returns everything buffered.
    #[tokio::test]
    async fn test_batch_flush() {
        let batcher = BatchProcessor::new(10, Duration::from_secs(60));
        batcher.add(test_event("1")).await;
        batcher.add(test_event("2")).await;

        let drained = batcher.flush().await;

        assert_eq!(drained.len(), 2);
        assert!(batcher.is_empty().await);
    }

    /// A non-empty buffer becomes due once it is older than `max_age`.
    #[tokio::test]
    async fn test_batch_age_trigger() {
        let batcher = BatchProcessor::new(100, Duration::from_millis(10));
        batcher.add(test_event("1")).await;

        tokio::time::sleep(Duration::from_millis(20)).await;

        assert!(batcher.should_flush().await);
    }

    /// Age alone never makes an empty buffer due.
    #[tokio::test]
    async fn test_empty_batch_no_flush() {
        let batcher = BatchProcessor::new(5, Duration::from_millis(1));

        tokio::time::sleep(Duration::from_millis(5)).await;

        assert!(!batcher.should_flush().await);
    }

    /// `from_config` picks up the batch size from the config.
    #[tokio::test]
    async fn test_from_config() {
        let config = BufferConfig {
            batch_size: 10,
            flush_interval_secs: 30,
            ..Default::default()
        };
        let batcher = BatchProcessor::from_config(&config);

        // The first nine adds stay below the threshold.
        for i in 0..9 {
            let flushed = batcher.add(test_event(&i.to_string())).await;
            assert!(flushed.is_none());
        }

        // The tenth add completes the batch.
        let flushed = batcher.add(test_event("10")).await;
        assert!(flushed.is_some());
        assert_eq!(flushed.unwrap().len(), 10);
    }

    /// `age` starts near zero and grows with wall-clock time.
    #[tokio::test]
    async fn test_batch_age() {
        let batcher = BatchProcessor::new(100, Duration::from_secs(60));

        let initial = batcher.age().await;
        assert!(initial < Duration::from_secs(1));

        tokio::time::sleep(Duration::from_millis(50)).await;

        let later = batcher.age().await;
        assert!(later >= Duration::from_millis(50));
    }

    /// `is_empty` tracks adds and flushes.
    #[tokio::test]
    async fn test_batch_is_empty() {
        let batcher = BatchProcessor::new(5, Duration::from_secs(60));
        assert!(batcher.is_empty().await);

        batcher.add(test_event("1")).await;
        assert!(!batcher.is_empty().await);

        batcher.flush().await;
        assert!(batcher.is_empty().await);
    }

    /// `len` counts buffered events.
    #[tokio::test]
    async fn test_batch_len() {
        let batcher = BatchProcessor::new(10, Duration::from_secs(60));
        assert_eq!(batcher.len().await, 0);

        batcher.add(test_event("1")).await;
        assert_eq!(batcher.len().await, 1);

        batcher.add(test_event("2")).await;
        batcher.add(test_event("3")).await;
        assert_eq!(batcher.len().await, 3);
    }

    /// Seven events with a size of three: two full batches, one left over.
    #[tokio::test]
    async fn test_add_many_multiple_batches() {
        let batcher = BatchProcessor::new(3, Duration::from_secs(60));
        let incoming: Vec<UdmEvent> = (0..7).map(|i| test_event(&i.to_string())).collect();

        let batches = batcher.add_many(incoming).await;

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].len(), 3);
        assert_eq!(batches[1].len(), 3);
        assert_eq!(batcher.len().await, 1);
    }

    /// Six events with a size of three: two batches and nothing left over.
    #[tokio::test]
    async fn test_add_many_exact_multiple() {
        let batcher = BatchProcessor::new(3, Duration::from_secs(60));
        let incoming: Vec<UdmEvent> = (0..6).map(|i| test_event(&i.to_string())).collect();

        let batches = batcher.add_many(incoming).await;

        assert_eq!(batches.len(), 2);
        assert!(batcher.is_empty().await);
    }

    /// Fewer events than the batch size: nothing is cut, all stay buffered.
    #[tokio::test]
    async fn test_add_many_less_than_batch() {
        let batcher = BatchProcessor::new(10, Duration::from_secs(60));
        let incoming: Vec<UdmEvent> = (0..5).map(|i| test_event(&i.to_string())).collect();

        let batches = batcher.add_many(incoming).await;

        assert!(batches.is_empty());
        assert_eq!(batcher.len().await, 5);
    }

    /// Sending a non-empty buffer reports the batch and empties the buffer.
    #[tokio::test]
    async fn test_send_batch_with_events() {
        let batcher = BatchProcessor::new(10, Duration::from_secs(60));
        let transport = MockTransport::new();
        batcher.add(test_event("1")).await;
        batcher.add(test_event("2")).await;

        let outcome = batcher.send_batch(&transport).await.unwrap();

        assert!(outcome.is_some());
        let batch_result = outcome.unwrap();
        assert_eq!(batch_result.total, 2);
        assert!(batcher.is_empty().await);
    }

    /// Sending an empty buffer is a no-op that yields `None`.
    #[tokio::test]
    async fn test_send_batch_empty() {
        let batcher = BatchProcessor::new(10, Duration::from_secs(60));
        let transport = MockTransport::new();

        let outcome = batcher.send_batch(&transport).await.unwrap();

        assert!(outcome.is_none());
    }

    /// Flushing restarts the age clock, so a fresh event is not due at once.
    #[tokio::test]
    async fn test_flush_resets_timer() {
        let batcher = BatchProcessor::new(100, Duration::from_millis(50));
        batcher.add(test_event("1")).await;

        tokio::time::sleep(Duration::from_millis(60)).await;
        assert!(batcher.should_flush().await);

        batcher.flush().await;

        batcher.add(test_event("2")).await;

        assert!(!batcher.should_flush().await);
    }

    /// A running flusher sends an aged batch on one of its ticks.
    #[tokio::test]
    async fn test_batch_flusher_start_and_stop() {
        let batcher = Arc::new(BatchProcessor::new(100, Duration::from_millis(10)));
        let transport = Arc::new(MockTransport::new());
        batcher.add(test_event("1")).await;

        let flusher = BatchFlusher::new(
            Arc::clone(&batcher),
            Arc::clone(&transport),
            Duration::from_millis(20),
        );
        let handle = flusher.start();

        tokio::time::sleep(Duration::from_millis(50)).await;

        handle.stop();

        assert!(transport.event_count() >= 1);
    }

    /// Stopping the flusher sends whatever is still buffered.
    #[tokio::test]
    async fn test_batch_flusher_final_flush_on_stop() {
        let batcher = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));
        let transport = Arc::new(MockTransport::new());
        batcher.add(test_event("1")).await;

        // Thresholds are long enough that only the stop path can send.
        let flusher = BatchFlusher::new(
            Arc::clone(&batcher),
            Arc::clone(&transport),
            Duration::from_secs(60),
        );
        let handle = flusher.start();

        tokio::time::sleep(Duration::from_millis(10)).await;
        handle.stop();

        // Give the background task time to run its final flush.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(transport.event_count(), 1);
    }

    /// Dropping the handle has the same effect as calling `stop`.
    #[tokio::test]
    async fn test_flush_handle_drop() {
        let batcher = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));
        let transport = Arc::new(MockTransport::new());
        batcher.add(test_event("1")).await;

        let flusher = BatchFlusher::new(
            Arc::clone(&batcher),
            Arc::clone(&transport),
            Duration::from_secs(60),
        );

        // Start the task and immediately drop its handle.
        drop(flusher.start());

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(transport.event_count(), 1);
    }

    /// Below the size threshold (and well under `max_age`) nothing is due.
    #[tokio::test]
    async fn test_should_flush_size_threshold() {
        let batcher = BatchProcessor::new(3, Duration::from_secs(60));

        batcher.add(test_event("1")).await;
        assert!(!batcher.should_flush().await);

        batcher.add(test_event("2")).await;
        assert!(!batcher.should_flush().await);

        assert_eq!(batcher.len().await, 2);
    }

    /// Ten concurrent adds all land in the buffer.
    #[tokio::test]
    async fn test_batch_processor_concurrent_access() {
        let shared = Arc::new(BatchProcessor::new(100, Duration::from_secs(60)));

        let tasks: Vec<_> = (0..10)
            .map(|i| {
                let worker = Arc::clone(&shared);
                tokio::spawn(async move {
                    worker.add(test_event(&i.to_string())).await;
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        assert_eq!(shared.len().await, 10);
    }
}