1use async_trait::async_trait;
4use reqwest::Client;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::Arc;
7use std::time::Instant;
8
9use super::traits::{BatchResult, SendResult, Transport, TransportStats};
10use crate::core::config::PhyTraceConfig;
11use crate::error::{PhyTraceError, PhyTraceResult};
12use crate::models::event::UdmEvent;
13
14#[derive(Debug)]
16pub struct HttpTransport {
17 client: Client,
18 endpoint: String,
19 api_key: Option<String>,
20 connected: AtomicBool,
21 stats: Arc<HttpStats>,
22}
23
24#[derive(Debug, Default)]
25struct HttpStats {
26 events_sent: AtomicU64,
27 events_succeeded: AtomicU64,
28 events_failed: AtomicU64,
29 bytes_sent: AtomicU64,
30 total_latency_ms: AtomicU64,
31 retries: AtomicU64,
32 connections: AtomicU64,
33 connection_errors: AtomicU64,
34}
35
36impl HttpTransport {
37 pub fn from_config(config: &PhyTraceConfig) -> PhyTraceResult<Self> {
39 let client = Client::builder()
40 .connect_timeout(config.connect_timeout())
41 .timeout(config.request_timeout())
42 .gzip(true)
43 .build()
44 .map_err(|e| {
45 PhyTraceError::Transport(format!("Failed to create HTTP client: {}", e))
46 })?;
47
48 Ok(Self {
49 client,
50 endpoint: config.transport.endpoint.clone(),
51 api_key: config.transport.api_key.clone(),
52 connected: AtomicBool::new(false),
53 stats: Arc::new(HttpStats::default()),
54 })
55 }
56
57 pub fn new(endpoint: impl Into<String>, api_key: Option<String>) -> PhyTraceResult<Self> {
59 let client = Client::builder().gzip(true).build().map_err(|e| {
60 PhyTraceError::Transport(format!("Failed to create HTTP client: {}", e))
61 })?;
62
63 Ok(Self {
64 client,
65 endpoint: endpoint.into(),
66 api_key,
67 connected: AtomicBool::new(false),
68 stats: Arc::new(HttpStats::default()),
69 })
70 }
71
72 fn events_url(&self) -> String {
74 format!("{}/api/v1/events", self.endpoint.trim_end_matches('/'))
75 }
76
77 fn batch_url(&self) -> String {
79 format!(
80 "{}/api/v1/events/batch",
81 self.endpoint.trim_end_matches('/')
82 )
83 }
84
85 fn health_url(&self) -> String {
87 format!("{}/health", self.endpoint.trim_end_matches('/'))
88 }
89
90 fn add_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
92 if let Some(ref api_key) = self.api_key {
93 builder.header("Authorization", format!("Bearer {}", api_key))
94 } else {
95 builder
96 }
97 }
98}
99
100#[async_trait]
101impl Transport for HttpTransport {
102 async fn send(&self, event: &UdmEvent) -> PhyTraceResult<SendResult> {
103 self.stats.events_sent.fetch_add(1, Ordering::Relaxed);
104
105 let start = Instant::now();
106 let json = serde_json::to_string(event)
107 .map_err(|e| PhyTraceError::Serialization(e.to_string()))?;
108
109 let bytes_len = json.len() as u64;
110
111 let request = self.add_auth(
112 self.client
113 .post(self.events_url())
114 .header("Content-Type", "application/json")
115 .body(json),
116 );
117
118 let response = request.send().await.map_err(|e| {
119 self.stats.events_failed.fetch_add(1, Ordering::Relaxed);
120 PhyTraceError::Transport(format!("HTTP request failed: {}", e))
121 })?;
122
123 let latency_ms = start.elapsed().as_millis() as u64;
124 let status = response.status();
125
126 if status.is_success() {
127 self.stats.events_succeeded.fetch_add(1, Ordering::Relaxed);
128 self.stats
129 .bytes_sent
130 .fetch_add(bytes_len, Ordering::Relaxed);
131 self.stats
132 .total_latency_ms
133 .fetch_add(latency_ms, Ordering::Relaxed);
134
135 Ok(SendResult::success(latency_ms))
136 } else {
137 self.stats.events_failed.fetch_add(1, Ordering::Relaxed);
138
139 let message = response.text().await.unwrap_or_else(|_| status.to_string());
140 Ok(SendResult::failure(Some(status.as_u16()), message))
141 }
142 }
143
144 async fn send_batch(&self, events: &[UdmEvent]) -> PhyTraceResult<BatchResult> {
145 if events.is_empty() {
146 return Ok(BatchResult::all_success(0, 0));
147 }
148
149 let total = events.len();
150 self.stats
151 .events_sent
152 .fetch_add(total as u64, Ordering::Relaxed);
153
154 let start = Instant::now();
155 let batch_request = serde_json::json!({ "events": events });
157 let json = serde_json::to_string(&batch_request)
158 .map_err(|e| PhyTraceError::Serialization(e.to_string()))?;
159
160 let bytes_len = json.len() as u64;
161
162 let request = self.add_auth(
163 self.client
164 .post(self.batch_url())
165 .header("Content-Type", "application/json")
166 .body(json),
167 );
168
169 let response = request.send().await.map_err(|e| {
170 self.stats
171 .events_failed
172 .fetch_add(total as u64, Ordering::Relaxed);
173 PhyTraceError::Transport(format!("HTTP batch request failed: {}", e))
174 })?;
175
176 let latency_ms = start.elapsed().as_millis() as u64;
177 let status = response.status();
178
179 if status.is_success() {
180 self.stats
181 .events_succeeded
182 .fetch_add(total as u64, Ordering::Relaxed);
183 self.stats
184 .bytes_sent
185 .fetch_add(bytes_len, Ordering::Relaxed);
186 self.stats
187 .total_latency_ms
188 .fetch_add(latency_ms, Ordering::Relaxed);
189
190 Ok(BatchResult::all_success(total, latency_ms))
191 } else {
192 self.stats
193 .events_failed
194 .fetch_add(total as u64, Ordering::Relaxed);
195
196 let message = response.text().await.unwrap_or_else(|_| status.to_string());
197 Ok(BatchResult::all_failed(total, &message))
198 }
199 }
200
201 async fn is_connected(&self) -> bool {
202 self.connected.load(Ordering::Relaxed)
203 }
204
205 async fn connect(&mut self) -> PhyTraceResult<()> {
206 self.stats.connections.fetch_add(1, Ordering::Relaxed);
207
208 let response = self
210 .client
211 .get(self.health_url())
212 .send()
213 .await
214 .map_err(|e| {
215 self.stats.connection_errors.fetch_add(1, Ordering::Relaxed);
216 PhyTraceError::Transport(format!("Failed to connect: {}", e))
217 })?;
218
219 if response.status().is_success() {
220 self.connected.store(true, Ordering::Relaxed);
221 Ok(())
222 } else {
223 self.stats.connection_errors.fetch_add(1, Ordering::Relaxed);
224 Err(PhyTraceError::Transport(format!(
225 "Health check failed with status: {}",
226 response.status()
227 )))
228 }
229 }
230
231 async fn disconnect(&mut self) -> PhyTraceResult<()> {
232 self.connected.store(false, Ordering::Relaxed);
233 Ok(())
234 }
235
236 fn stats(&self) -> TransportStats {
237 TransportStats {
238 events_sent: self.stats.events_sent.load(Ordering::Relaxed),
239 events_succeeded: self.stats.events_succeeded.load(Ordering::Relaxed),
240 events_failed: self.stats.events_failed.load(Ordering::Relaxed),
241 bytes_sent: self.stats.bytes_sent.load(Ordering::Relaxed),
242 total_latency_ms: self.stats.total_latency_ms.load(Ordering::Relaxed),
243 retries: self.stats.retries.load(Ordering::Relaxed),
244 connections: self.stats.connections.load(Ordering::Relaxed),
245 connection_errors: self.stats.connection_errors.load(Ordering::Relaxed),
246 }
247 }
248
249 fn name(&self) -> &str {
250 "http"
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257 use crate::models::domains::IdentityDomain;
258 use crate::models::enums::SourceType;
259
260 #[allow(dead_code)]
261 fn test_event() -> UdmEvent {
262 UdmEvent::new(SourceType::Amr).with_identity(IdentityDomain {
263 source_id: Some("test-robot".to_string()),
264 ..Default::default()
265 })
266 }
267
268 #[test]
269 fn test_url_construction() {
270 let transport = HttpTransport::new("https://api.example.com", None).unwrap();
271 assert_eq!(
272 transport.events_url(),
273 "https://api.example.com/api/v1/events"
274 );
275 assert_eq!(
276 transport.batch_url(),
277 "https://api.example.com/api/v1/events/batch"
278 );
279 assert_eq!(transport.health_url(), "https://api.example.com/health");
280 }
281
282 #[test]
283 fn test_url_with_trailing_slash() {
284 let transport = HttpTransport::new("https://api.example.com/", None).unwrap();
285 assert_eq!(
286 transport.events_url(),
287 "https://api.example.com/api/v1/events"
288 );
289 }
290
291 #[test]
292 fn test_stats_initial() {
293 let transport = HttpTransport::new("https://api.example.com", None).unwrap();
294 let stats = transport.stats();
295 assert_eq!(stats.events_sent, 0);
296 assert_eq!(stats.events_succeeded, 0);
297 }
298}