phytrace_sdk/transport/
http.rs

1//! HTTP transport implementation using reqwest.
2
3use async_trait::async_trait;
4use reqwest::Client;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::Arc;
7use std::time::Instant;
8
9use super::traits::{BatchResult, SendResult, Transport, TransportStats};
10use crate::core::config::PhyTraceConfig;
11use crate::error::{PhyTraceError, PhyTraceResult};
12use crate::models::event::UdmEvent;
13
14/// HTTP transport implementation.
15#[derive(Debug)]
16pub struct HttpTransport {
17    client: Client,
18    endpoint: String,
19    api_key: Option<String>,
20    connected: AtomicBool,
21    stats: Arc<HttpStats>,
22}
23
24#[derive(Debug, Default)]
25struct HttpStats {
26    events_sent: AtomicU64,
27    events_succeeded: AtomicU64,
28    events_failed: AtomicU64,
29    bytes_sent: AtomicU64,
30    total_latency_ms: AtomicU64,
31    retries: AtomicU64,
32    connections: AtomicU64,
33    connection_errors: AtomicU64,
34}
35
36impl HttpTransport {
37    /// Create a new HTTP transport from configuration.
38    pub fn from_config(config: &PhyTraceConfig) -> PhyTraceResult<Self> {
39        let client = Client::builder()
40            .connect_timeout(config.connect_timeout())
41            .timeout(config.request_timeout())
42            .gzip(true)
43            .build()
44            .map_err(|e| {
45                PhyTraceError::Transport(format!("Failed to create HTTP client: {}", e))
46            })?;
47
48        Ok(Self {
49            client,
50            endpoint: config.transport.endpoint.clone(),
51            api_key: config.transport.api_key.clone(),
52            connected: AtomicBool::new(false),
53            stats: Arc::new(HttpStats::default()),
54        })
55    }
56
57    /// Create a new HTTP transport with explicit parameters.
58    pub fn new(endpoint: impl Into<String>, api_key: Option<String>) -> PhyTraceResult<Self> {
59        let client = Client::builder().gzip(true).build().map_err(|e| {
60            PhyTraceError::Transport(format!("Failed to create HTTP client: {}", e))
61        })?;
62
63        Ok(Self {
64            client,
65            endpoint: endpoint.into(),
66            api_key,
67            connected: AtomicBool::new(false),
68            stats: Arc::new(HttpStats::default()),
69        })
70    }
71
72    /// Get the events endpoint URL.
73    fn events_url(&self) -> String {
74        format!("{}/api/v1/events", self.endpoint.trim_end_matches('/'))
75    }
76
77    /// Get the batch endpoint URL.
78    fn batch_url(&self) -> String {
79        format!(
80            "{}/api/v1/events/batch",
81            self.endpoint.trim_end_matches('/')
82        )
83    }
84
85    /// Get the health endpoint URL.
86    fn health_url(&self) -> String {
87        format!("{}/health", self.endpoint.trim_end_matches('/'))
88    }
89
90    /// Add authentication headers to request.
91    fn add_auth(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
92        if let Some(ref api_key) = self.api_key {
93            builder.header("Authorization", format!("Bearer {}", api_key))
94        } else {
95            builder
96        }
97    }
98}
99
100#[async_trait]
101impl Transport for HttpTransport {
102    async fn send(&self, event: &UdmEvent) -> PhyTraceResult<SendResult> {
103        self.stats.events_sent.fetch_add(1, Ordering::Relaxed);
104
105        let start = Instant::now();
106        let json = serde_json::to_string(event)
107            .map_err(|e| PhyTraceError::Serialization(e.to_string()))?;
108
109        let bytes_len = json.len() as u64;
110
111        let request = self.add_auth(
112            self.client
113                .post(self.events_url())
114                .header("Content-Type", "application/json")
115                .body(json),
116        );
117
118        let response = request.send().await.map_err(|e| {
119            self.stats.events_failed.fetch_add(1, Ordering::Relaxed);
120            PhyTraceError::Transport(format!("HTTP request failed: {}", e))
121        })?;
122
123        let latency_ms = start.elapsed().as_millis() as u64;
124        let status = response.status();
125
126        if status.is_success() {
127            self.stats.events_succeeded.fetch_add(1, Ordering::Relaxed);
128            self.stats
129                .bytes_sent
130                .fetch_add(bytes_len, Ordering::Relaxed);
131            self.stats
132                .total_latency_ms
133                .fetch_add(latency_ms, Ordering::Relaxed);
134
135            Ok(SendResult::success(latency_ms))
136        } else {
137            self.stats.events_failed.fetch_add(1, Ordering::Relaxed);
138
139            let message = response.text().await.unwrap_or_else(|_| status.to_string());
140            Ok(SendResult::failure(Some(status.as_u16()), message))
141        }
142    }
143
144    async fn send_batch(&self, events: &[UdmEvent]) -> PhyTraceResult<BatchResult> {
145        if events.is_empty() {
146            return Ok(BatchResult::all_success(0, 0));
147        }
148
149        let total = events.len();
150        self.stats
151            .events_sent
152            .fetch_add(total as u64, Ordering::Relaxed);
153
154        let start = Instant::now();
155        // Wrap events in the expected format: {"events": [...]}
156        let batch_request = serde_json::json!({ "events": events });
157        let json = serde_json::to_string(&batch_request)
158            .map_err(|e| PhyTraceError::Serialization(e.to_string()))?;
159
160        let bytes_len = json.len() as u64;
161
162        let request = self.add_auth(
163            self.client
164                .post(self.batch_url())
165                .header("Content-Type", "application/json")
166                .body(json),
167        );
168
169        let response = request.send().await.map_err(|e| {
170            self.stats
171                .events_failed
172                .fetch_add(total as u64, Ordering::Relaxed);
173            PhyTraceError::Transport(format!("HTTP batch request failed: {}", e))
174        })?;
175
176        let latency_ms = start.elapsed().as_millis() as u64;
177        let status = response.status();
178
179        if status.is_success() {
180            self.stats
181                .events_succeeded
182                .fetch_add(total as u64, Ordering::Relaxed);
183            self.stats
184                .bytes_sent
185                .fetch_add(bytes_len, Ordering::Relaxed);
186            self.stats
187                .total_latency_ms
188                .fetch_add(latency_ms, Ordering::Relaxed);
189
190            Ok(BatchResult::all_success(total, latency_ms))
191        } else {
192            self.stats
193                .events_failed
194                .fetch_add(total as u64, Ordering::Relaxed);
195
196            let message = response.text().await.unwrap_or_else(|_| status.to_string());
197            Ok(BatchResult::all_failed(total, &message))
198        }
199    }
200
201    async fn is_connected(&self) -> bool {
202        self.connected.load(Ordering::Relaxed)
203    }
204
205    async fn connect(&mut self) -> PhyTraceResult<()> {
206        self.stats.connections.fetch_add(1, Ordering::Relaxed);
207
208        // Try to reach the health endpoint
209        let response = self
210            .client
211            .get(self.health_url())
212            .send()
213            .await
214            .map_err(|e| {
215                self.stats.connection_errors.fetch_add(1, Ordering::Relaxed);
216                PhyTraceError::Transport(format!("Failed to connect: {}", e))
217            })?;
218
219        if response.status().is_success() {
220            self.connected.store(true, Ordering::Relaxed);
221            Ok(())
222        } else {
223            self.stats.connection_errors.fetch_add(1, Ordering::Relaxed);
224            Err(PhyTraceError::Transport(format!(
225                "Health check failed with status: {}",
226                response.status()
227            )))
228        }
229    }
230
231    async fn disconnect(&mut self) -> PhyTraceResult<()> {
232        self.connected.store(false, Ordering::Relaxed);
233        Ok(())
234    }
235
236    fn stats(&self) -> TransportStats {
237        TransportStats {
238            events_sent: self.stats.events_sent.load(Ordering::Relaxed),
239            events_succeeded: self.stats.events_succeeded.load(Ordering::Relaxed),
240            events_failed: self.stats.events_failed.load(Ordering::Relaxed),
241            bytes_sent: self.stats.bytes_sent.load(Ordering::Relaxed),
242            total_latency_ms: self.stats.total_latency_ms.load(Ordering::Relaxed),
243            retries: self.stats.retries.load(Ordering::Relaxed),
244            connections: self.stats.connections.load(Ordering::Relaxed),
245            connection_errors: self.stats.connection_errors.load(Ordering::Relaxed),
246        }
247    }
248
249    fn name(&self) -> &str {
250        "http"
251    }
252}
253
254#[cfg(test)]
255mod tests {
256    use super::*;
257    use crate::models::domains::IdentityDomain;
258    use crate::models::enums::SourceType;
259
260    #[allow(dead_code)]
261    fn test_event() -> UdmEvent {
262        UdmEvent::new(SourceType::Amr).with_identity(IdentityDomain {
263            source_id: Some("test-robot".to_string()),
264            ..Default::default()
265        })
266    }
267
268    #[test]
269    fn test_url_construction() {
270        let transport = HttpTransport::new("https://api.example.com", None).unwrap();
271        assert_eq!(
272            transport.events_url(),
273            "https://api.example.com/api/v1/events"
274        );
275        assert_eq!(
276            transport.batch_url(),
277            "https://api.example.com/api/v1/events/batch"
278        );
279        assert_eq!(transport.health_url(), "https://api.example.com/health");
280    }
281
282    #[test]
283    fn test_url_with_trailing_slash() {
284        let transport = HttpTransport::new("https://api.example.com/", None).unwrap();
285        assert_eq!(
286            transport.events_url(),
287            "https://api.example.com/api/v1/events"
288        );
289    }
290
291    #[test]
292    fn test_stats_initial() {
293        let transport = HttpTransport::new("https://api.example.com", None).unwrap();
294        let stats = transport.stats();
295        assert_eq!(stats.events_sent, 0);
296        assert_eq!(stats.events_succeeded, 0);
297    }
298}