phytrace_sdk/transport/
mock.rs1use async_trait::async_trait;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::{Arc, Mutex};
9use std::time::Duration;
10
11use super::traits::{BatchResult, SendResult, Transport, TransportStats};
12use crate::error::PhyTraceResult;
13use crate::models::event::UdmEvent;
14
/// Behaviour knobs for [`MockTransport`].
///
/// Every field is public so tests can build a configuration with struct
/// update syntax on top of [`MockConfig::default`].
#[derive(Debug, Clone)]
pub struct MockConfig {
    /// Connection state the transport starts in when built from this config.
    pub connected: bool,

    /// Simulated delay, in milliseconds, awaited before a send completes.
    /// A value of `0` skips the delay entirely.
    pub latency_ms: u64,

    /// Probability that a send is reported as failed. Values `<= 0.0` never
    /// fail and values `>= 1.0` always fail.
    pub failure_rate: f64,

    /// Status code attached to a failed single-event send.
    pub failure_status: u16,

    /// Message attached to simulated failures.
    pub failure_message: String,
}
33
34impl Default for MockConfig {
35 fn default() -> Self {
36 Self {
37 connected: true,
38 latency_ms: 10,
39 failure_rate: 0.0,
40 failure_status: 500,
41 failure_message: "Mock failure".to_string(),
42 }
43 }
44}
45
46#[derive(Debug)]
48pub struct MockTransport {
49 config: Arc<Mutex<MockConfig>>,
50 connected: AtomicBool,
51 events: Arc<Mutex<Vec<UdmEvent>>>,
52 stats: Arc<MockStats>,
53}
54
/// Atomic counters backing the mock transport's statistics.
///
/// All counters start at zero via the derived `Default`.
#[derive(Debug, Default)]
struct MockStats {
    /// Events handed to the transport, regardless of outcome.
    events_sent: AtomicU64,
    /// Events whose send was reported as successful.
    events_succeeded: AtomicU64,
    /// Events whose send was reported as failed.
    events_failed: AtomicU64,
    /// Serialized JSON length of successfully sent payloads, in bytes.
    bytes_sent: AtomicU64,
    /// Sum of the simulated latency of successful sends, in milliseconds.
    total_latency_ms: AtomicU64,
}
63
64impl MockTransport {
65 pub fn new() -> Self {
67 Self::with_config(MockConfig::default())
68 }
69
70 pub fn with_config(config: MockConfig) -> Self {
72 let connected = config.connected;
73 Self {
74 config: Arc::new(Mutex::new(config)),
75 connected: AtomicBool::new(connected),
76 events: Arc::new(Mutex::new(Vec::new())),
77 stats: Arc::new(MockStats::default()),
78 }
79 }
80
81 pub fn sent_events(&self) -> Vec<UdmEvent> {
83 self.events.lock().unwrap().clone()
84 }
85
86 pub fn last_event(&self) -> Option<UdmEvent> {
88 self.events.lock().unwrap().last().cloned()
89 }
90
91 pub fn event_count(&self) -> usize {
93 self.events.lock().unwrap().len()
94 }
95
96 pub fn clear_events(&self) {
98 self.events.lock().unwrap().clear();
99 }
100
101 pub fn set_config(&self, config: MockConfig) {
103 *self.config.lock().unwrap() = config;
104 }
105
106 pub fn set_failure_rate(&self, rate: f64) {
108 self.config.lock().unwrap().failure_rate = rate;
109 }
110
111 pub fn set_latency(&self, latency_ms: u64) {
113 self.config.lock().unwrap().latency_ms = latency_ms;
114 }
115
116 fn should_fail(&self) -> bool {
118 let config = self.config.lock().unwrap();
119 if config.failure_rate <= 0.0 {
120 return false;
121 }
122 if config.failure_rate >= 1.0 {
123 return true;
124 }
125 rand::random::<f64>() < config.failure_rate
126 }
127
128 fn get_latency(&self) -> u64 {
130 self.config.lock().unwrap().latency_ms
131 }
132
133 fn failure_info(&self) -> (u16, String) {
135 let config = self.config.lock().unwrap();
136 (config.failure_status, config.failure_message.clone())
137 }
138}
139
140impl Default for MockTransport {
141 fn default() -> Self {
142 Self::new()
143 }
144}
145
146#[async_trait]
147impl Transport for MockTransport {
148 async fn send(&self, event: &UdmEvent) -> PhyTraceResult<SendResult> {
149 self.stats.events_sent.fetch_add(1, Ordering::Relaxed);
150
151 let latency = self.get_latency();
153 if latency > 0 {
154 tokio::time::sleep(Duration::from_millis(latency)).await;
155 }
156
157 self.events.lock().unwrap().push(event.clone());
159
160 if self.should_fail() {
162 self.stats.events_failed.fetch_add(1, Ordering::Relaxed);
163 let (status, message) = self.failure_info();
164 Ok(SendResult::failure(Some(status), message))
165 } else {
166 self.stats.events_succeeded.fetch_add(1, Ordering::Relaxed);
167 self.stats
168 .total_latency_ms
169 .fetch_add(latency, Ordering::Relaxed);
170
171 let bytes = serde_json::to_string(event)
172 .map(|s| s.len() as u64)
173 .unwrap_or(0);
174 self.stats.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
175
176 Ok(SendResult::success(latency))
177 }
178 }
179
180 async fn send_batch(&self, events: &[UdmEvent]) -> PhyTraceResult<BatchResult> {
181 if events.is_empty() {
182 return Ok(BatchResult::all_success(0, 0));
183 }
184
185 let total = events.len();
186 self.stats
187 .events_sent
188 .fetch_add(total as u64, Ordering::Relaxed);
189
190 let latency = self.get_latency();
192 if latency > 0 {
193 tokio::time::sleep(Duration::from_millis(latency)).await;
194 }
195
196 {
198 let mut stored = self.events.lock().unwrap();
199 stored.extend(events.iter().cloned());
200 }
201
202 if self.should_fail() {
204 self.stats
205 .events_failed
206 .fetch_add(total as u64, Ordering::Relaxed);
207 let (_, message) = self.failure_info();
208 Ok(BatchResult::all_failed(total, &message))
209 } else {
210 self.stats
211 .events_succeeded
212 .fetch_add(total as u64, Ordering::Relaxed);
213 self.stats
214 .total_latency_ms
215 .fetch_add(latency, Ordering::Relaxed);
216
217 let bytes = serde_json::to_string(events)
218 .map(|s| s.len() as u64)
219 .unwrap_or(0);
220 self.stats.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
221
222 Ok(BatchResult::all_success(total, latency))
223 }
224 }
225
226 async fn is_connected(&self) -> bool {
227 self.connected.load(Ordering::Relaxed)
228 }
229
230 async fn connect(&mut self) -> PhyTraceResult<()> {
231 self.connected.store(true, Ordering::Relaxed);
232 Ok(())
233 }
234
235 async fn disconnect(&mut self) -> PhyTraceResult<()> {
236 self.connected.store(false, Ordering::Relaxed);
237 Ok(())
238 }
239
240 fn stats(&self) -> TransportStats {
241 TransportStats {
242 events_sent: self.stats.events_sent.load(Ordering::Relaxed),
243 events_succeeded: self.stats.events_succeeded.load(Ordering::Relaxed),
244 events_failed: self.stats.events_failed.load(Ordering::Relaxed),
245 bytes_sent: self.stats.bytes_sent.load(Ordering::Relaxed),
246 total_latency_ms: self.stats.total_latency_ms.load(Ordering::Relaxed),
247 ..Default::default()
248 }
249 }
250
251 fn name(&self) -> &str {
252 "mock"
253 }
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::domains::IdentityDomain;
    use crate::models::enums::SourceType;

    /// Builds an AMR event whose identity carries the source id `test-robot`.
    fn test_event() -> UdmEvent {
        let identity = IdentityDomain {
            source_id: Some("test-robot".to_string()),
            ..Default::default()
        };
        UdmEvent::new(SourceType::Amr).with_identity(identity)
    }

    /// A single send on a default transport succeeds and is recorded.
    #[tokio::test]
    async fn test_mock_send() {
        let mock = MockTransport::new();
        let sample = test_event();

        let outcome = mock.send(&sample).await.unwrap();

        assert!(outcome.success);
        assert_eq!(mock.event_count(), 1);
    }

    /// A three-event batch succeeds as a whole and records all three events.
    #[tokio::test]
    async fn test_mock_batch() {
        let mock = MockTransport::new();
        let batch: Vec<UdmEvent> = std::iter::repeat_with(test_event).take(3).collect();

        let outcome = mock.send_batch(&batch).await.unwrap();

        assert!(outcome.all_succeeded());
        assert_eq!(mock.event_count(), 3);
    }

    /// With a failure rate of 1.0 the send reports the configured status.
    #[tokio::test]
    async fn test_mock_failure() {
        let always_fail = MockConfig {
            failure_rate: 1.0,
            failure_status: 503,
            failure_message: "Service unavailable".to_string(),
            ..Default::default()
        };
        let mock = MockTransport::with_config(always_fail);

        let outcome = mock.send(&test_event()).await.unwrap();

        assert!(!outcome.success);
        assert_eq!(outcome.status_code, Some(503));
    }

    /// Two successful sends are reflected in the sent/succeeded counters.
    #[tokio::test]
    async fn test_mock_stats() {
        let mock = MockTransport::new();

        mock.send(&test_event()).await.unwrap();
        mock.send(&test_event()).await.unwrap();

        let snapshot = mock.stats();
        assert_eq!(snapshot.events_sent, 2);
        assert_eq!(snapshot.events_succeeded, 2);
    }

    /// Recorded events can be read back with their identity intact.
    #[tokio::test]
    async fn test_mock_get_events() {
        let mock = MockTransport::new();
        let sample = test_event();

        mock.send(&sample).await.unwrap();

        let recorded = mock.sent_events();
        assert_eq!(recorded.len(), 1);

        let identity = recorded[0].identity.as_ref().unwrap();
        assert_eq!(identity.source_id, Some("test-robot".to_string()));
    }
}