phytrace_sdk/transport/
mock.rs1#![expect(
7 clippy::unwrap_used,
8 reason = "mock transport for testing uses unwrap for clarity"
9)]
10
11use async_trait::async_trait;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::{Arc, Mutex};
14use std::time::Duration;
15
16use super::traits::{BatchResult, SendResult, Transport, TransportStats};
17use crate::error::PhyTraceResult;
18use crate::models::event::UdmEvent;
19
/// Tunable behaviour for [`MockTransport`].
#[derive(Clone, Debug)]
pub struct MockConfig {
    /// Connection state a transport built from this configuration starts in.
    pub connected: bool,

    /// Artificial delay, in milliseconds, applied to each send; `0` disables it.
    pub latency_ms: u64,

    /// Probability that a send is reported as failed. Values `<= 0.0` never
    /// fail and values `>= 1.0` always fail.
    pub failure_rate: f64,

    /// Status code attached to a simulated single-event failure.
    pub failure_status: u16,

    /// Message attached to a simulated failure.
    pub failure_message: String,
}
38
39impl Default for MockConfig {
40 fn default() -> Self {
41 Self {
42 connected: true,
43 latency_ms: 10,
44 failure_rate: 0.0,
45 failure_status: 500,
46 failure_message: "Mock failure".to_string(),
47 }
48 }
49}
50
51#[derive(Debug)]
53pub struct MockTransport {
54 config: Arc<Mutex<MockConfig>>,
55 connected: AtomicBool,
56 events: Arc<Mutex<Vec<UdmEvent>>>,
57 stats: Arc<MockStats>,
58}
59
/// Atomic counters the mock transport accumulates while handling sends.
#[derive(Default, Debug)]
struct MockStats {
    /// Events handed to the transport, whether or not they succeeded.
    events_sent: AtomicU64,
    /// Events reported as successfully sent.
    events_succeeded: AtomicU64,
    /// Events reported as failed.
    events_failed: AtomicU64,
    /// Serialized JSON length of successfully sent payloads, in bytes.
    bytes_sent: AtomicU64,
    /// Sum of the simulated latency, in milliseconds, of successful calls.
    total_latency_ms: AtomicU64,
}
68
69impl MockTransport {
70 pub fn new() -> Self {
72 Self::with_config(MockConfig::default())
73 }
74
75 pub fn with_config(config: MockConfig) -> Self {
77 let connected = config.connected;
78 Self {
79 config: Arc::new(Mutex::new(config)),
80 connected: AtomicBool::new(connected),
81 events: Arc::new(Mutex::new(Vec::new())),
82 stats: Arc::new(MockStats::default()),
83 }
84 }
85
86 pub fn sent_events(&self) -> Vec<UdmEvent> {
88 self.events.lock().unwrap().clone()
89 }
90
91 pub fn last_event(&self) -> Option<UdmEvent> {
93 self.events.lock().unwrap().last().cloned()
94 }
95
96 pub fn event_count(&self) -> usize {
98 self.events.lock().unwrap().len()
99 }
100
101 pub fn clear_events(&self) {
103 self.events.lock().unwrap().clear();
104 }
105
106 pub fn set_config(&self, config: MockConfig) {
108 *self.config.lock().unwrap() = config;
109 }
110
111 pub fn set_failure_rate(&self, rate: f64) {
113 self.config.lock().unwrap().failure_rate = rate;
114 }
115
116 pub fn set_latency(&self, latency_ms: u64) {
118 self.config.lock().unwrap().latency_ms = latency_ms;
119 }
120
121 fn should_fail(&self) -> bool {
123 let config = self.config.lock().unwrap();
124 if config.failure_rate <= 0.0 {
125 return false;
126 }
127 if config.failure_rate >= 1.0 {
128 return true;
129 }
130 rand::random::<f64>() < config.failure_rate
131 }
132
133 fn get_latency(&self) -> u64 {
135 self.config.lock().unwrap().latency_ms
136 }
137
138 fn failure_info(&self) -> (u16, String) {
140 let config = self.config.lock().unwrap();
141 (config.failure_status, config.failure_message.clone())
142 }
143}
144
145impl Default for MockTransport {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151#[async_trait]
152impl Transport for MockTransport {
153 async fn send(&self, event: &UdmEvent) -> PhyTraceResult<SendResult> {
154 self.stats.events_sent.fetch_add(1, Ordering::Relaxed);
155
156 let latency = self.get_latency();
158 if latency > 0 {
159 tokio::time::sleep(Duration::from_millis(latency)).await;
160 }
161
162 self.events.lock().unwrap().push(event.clone());
164
165 if self.should_fail() {
167 self.stats.events_failed.fetch_add(1, Ordering::Relaxed);
168 let (status, message) = self.failure_info();
169 Ok(SendResult::failure(Some(status), message))
170 } else {
171 self.stats.events_succeeded.fetch_add(1, Ordering::Relaxed);
172 self.stats
173 .total_latency_ms
174 .fetch_add(latency, Ordering::Relaxed);
175
176 let bytes = serde_json::to_string(event)
177 .map(|s| s.len() as u64)
178 .unwrap_or(0);
179 self.stats.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
180
181 Ok(SendResult::success(latency))
182 }
183 }
184
185 async fn send_batch(&self, events: &[UdmEvent]) -> PhyTraceResult<BatchResult> {
186 if events.is_empty() {
187 return Ok(BatchResult::all_success(0, 0));
188 }
189
190 let total = events.len();
191 self.stats
192 .events_sent
193 .fetch_add(total as u64, Ordering::Relaxed);
194
195 let latency = self.get_latency();
197 if latency > 0 {
198 tokio::time::sleep(Duration::from_millis(latency)).await;
199 }
200
201 {
203 let mut stored = self.events.lock().unwrap();
204 stored.extend(events.iter().cloned());
205 }
206
207 if self.should_fail() {
209 self.stats
210 .events_failed
211 .fetch_add(total as u64, Ordering::Relaxed);
212 let (_, message) = self.failure_info();
213 Ok(BatchResult::all_failed(total, &message))
214 } else {
215 self.stats
216 .events_succeeded
217 .fetch_add(total as u64, Ordering::Relaxed);
218 self.stats
219 .total_latency_ms
220 .fetch_add(latency, Ordering::Relaxed);
221
222 let bytes = serde_json::to_string(events)
223 .map(|s| s.len() as u64)
224 .unwrap_or(0);
225 self.stats.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
226
227 Ok(BatchResult::all_success(total, latency))
228 }
229 }
230
231 async fn is_connected(&self) -> bool {
232 self.connected.load(Ordering::Relaxed)
233 }
234
235 async fn connect(&mut self) -> PhyTraceResult<()> {
236 self.connected.store(true, Ordering::Relaxed);
237 Ok(())
238 }
239
240 async fn disconnect(&mut self) -> PhyTraceResult<()> {
241 self.connected.store(false, Ordering::Relaxed);
242 Ok(())
243 }
244
245 fn stats(&self) -> TransportStats {
246 TransportStats {
247 events_sent: self.stats.events_sent.load(Ordering::Relaxed),
248 events_succeeded: self.stats.events_succeeded.load(Ordering::Relaxed),
249 events_failed: self.stats.events_failed.load(Ordering::Relaxed),
250 bytes_sent: self.stats.bytes_sent.load(Ordering::Relaxed),
251 total_latency_ms: self.stats.total_latency_ms.load(Ordering::Relaxed),
252 ..Default::default()
253 }
254 }
255
256 fn name(&self) -> &str {
257 "mock"
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::domains::IdentityDomain;
    use crate::models::enums::SourceType;

    /// Builds an AMR event whose identity carries the source id `test-robot`.
    fn test_event() -> UdmEvent {
        let identity = IdentityDomain {
            source_id: Some("test-robot".to_string()),
            ..Default::default()
        };
        UdmEvent::new(SourceType::Amr).with_identity(identity)
    }

    /// A single send with the default config succeeds and is recorded.
    #[tokio::test]
    async fn test_mock_send() {
        let mock = MockTransport::new();

        let outcome = mock.send(&test_event()).await.unwrap();

        assert!(outcome.success);
        assert_eq!(mock.event_count(), 1);
    }

    /// A batch of three events succeeds as a whole and records all three.
    #[tokio::test]
    async fn test_mock_batch() {
        let mock = MockTransport::new();
        let batch = vec![test_event(), test_event(), test_event()];

        let outcome = mock.send_batch(&batch).await.unwrap();

        assert!(outcome.all_succeeded());
        assert_eq!(mock.event_count(), 3);
    }

    /// A failure rate of 1.0 makes a send fail with the configured status.
    #[tokio::test]
    async fn test_mock_failure() {
        let config = MockConfig {
            failure_rate: 1.0,
            failure_status: 503,
            failure_message: "Service unavailable".to_string(),
            ..Default::default()
        };
        let mock = MockTransport::with_config(config);

        let outcome = mock.send(&test_event()).await.unwrap();

        assert!(!outcome.success);
        assert_eq!(outcome.status_code, Some(503));
    }

    /// Two successful sends show up in both the sent and succeeded counters.
    #[tokio::test]
    async fn test_mock_stats() {
        let mock = MockTransport::new();

        mock.send(&test_event()).await.unwrap();
        mock.send(&test_event()).await.unwrap();

        let snapshot = mock.stats();
        assert_eq!(snapshot.events_sent, 2);
        assert_eq!(snapshot.events_succeeded, 2);
    }

    /// A recorded event can be read back with its identity intact.
    #[tokio::test]
    async fn test_mock_get_events() {
        let mock = MockTransport::new();

        mock.send(&test_event()).await.unwrap();

        let recorded = mock.sent_events();
        assert_eq!(recorded.len(), 1);

        let identity = recorded[0].identity.as_ref().unwrap();
        assert_eq!(identity.source_id.as_deref(), Some("test-robot"));
    }
}