phytrace_sdk/reliability/retry.rs

//! Retry logic with exponential backoff and jitter.
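//!
//! A minimal usage sketch (illustrative only: the module paths and the
//! hypothetical `send_batch` operation are assumptions, so the example is
//! marked `ignore`):
//!
//! ```ignore
//! use crate::core::config::RetryConfig;
//! use crate::reliability::retry::RetryHandler;
//!
//! async fn send_with_retries() -> crate::error::PhyTraceResult<()> {
//!     let handler = RetryHandler::new(&RetryConfig::default());
//!     // `send_batch` stands in for any fallible async operation.
//!     handler.execute(|| async { send_batch().await }).await
//! }
//! ```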

use rand::Rng;
use std::time::Duration;

use crate::core::config::RetryConfig;
use crate::error::{PhyTraceError, PhyTraceResult};
use crate::transport::traits::SendResult;

/// Retry handler with exponential backoff.
#[derive(Debug, Clone)]
pub struct RetryHandler {
    config: RetryConfig,
}

impl RetryHandler {
    /// Create a new retry handler from configuration.
    pub fn new(config: &RetryConfig) -> Self {
        Self {
            config: config.clone(),
        }
    }

    /// Create with default configuration.
    pub fn default_handler() -> Self {
        Self::new(&RetryConfig::default())
    }

    /// Execute an async operation with retries.
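    ///
    /// The operation is re-invoked only when it fails with a retryable error
    /// (currently `Transport` or `Timeout`); any other error is returned
    /// immediately. A sketch of intended use (not compiled; `fetch_token` is a
    /// hypothetical fallible async operation):
    ///
    /// ```ignore
    /// let handler = RetryHandler::default_handler();
    /// let token = handler.execute(|| async { fetch_token().await }).await?;
    /// ```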
    pub async fn execute<F, Fut, T>(&self, mut operation: F) -> PhyTraceResult<T>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = PhyTraceResult<T>>,
    {
        let mut attempt = 0;
        let mut last_error = None;

        while attempt <= self.config.max_retries {
            match operation().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    if !self.is_retryable(&e) {
                        return Err(e);
                    }

                    last_error = Some(e);
                    attempt += 1;

                    if attempt <= self.config.max_retries {
                        let delay = self.calculate_delay(attempt);
                        tokio::time::sleep(delay).await;
                    }
                }
            }
        }

        Err(last_error
            .unwrap_or_else(|| PhyTraceError::Transport("Max retries exceeded".to_string())))
    }

    /// Execute with retries, returning the result together with retry statistics.
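    ///
    /// Works like `execute`, but also reports how many attempts were made and
    /// how much backoff delay accumulated. Sketch (not compiled; `send_batch`
    /// is a hypothetical fallible async operation):
    ///
    /// ```ignore
    /// let handler = RetryHandler::default_handler();
    /// let (result, stats) = handler
    ///     .execute_with_stats(|| async { send_batch().await })
    ///     .await;
    /// if stats.had_retries() {
    ///     eprintln!("finished after {} attempts ({} ms of backoff)", stats.attempts, stats.total_delay_ms);
    /// }
    /// ```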
    pub async fn execute_with_stats<F, Fut, T>(
        &self,
        mut operation: F,
    ) -> (PhyTraceResult<T>, RetryStats)
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = PhyTraceResult<T>>,
    {
        let mut stats = RetryStats::default();
        let mut last_error = None;

        while stats.attempts <= self.config.max_retries {
            stats.attempts += 1;

            match operation().await {
                Ok(result) => {
                    stats.succeeded = true;
                    return (Ok(result), stats);
                }
                Err(e) => {
                    if !self.is_retryable(&e) {
                        return (Err(e), stats);
                    }

                    last_error = Some(e);
                    stats.retries += 1;

                    if stats.attempts <= self.config.max_retries {
                        let delay = self.calculate_delay(stats.attempts);
                        stats.total_delay_ms += delay.as_millis() as u64;
                        tokio::time::sleep(delay).await;
                    }
                }
            }
        }

        (
            Err(last_error
                .unwrap_or_else(|| PhyTraceError::Transport("Max retries exceeded".to_string()))),
            stats,
        )
    }

    /// Check if a send result indicates a retryable failure.
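    ///
    /// Sketch (not compiled; mirrors the `SendResult` constructors used in this
    /// module's tests):
    ///
    /// ```ignore
    /// let result = SendResult::failure(Some(500), "Server error");
    /// if handler.should_retry(&result) {
    ///     // queue the payload for another attempt
    /// }
    /// ```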
    pub fn should_retry(&self, result: &SendResult) -> bool {
        !result.success && result.is_retryable()
    }

    /// Calculate delay for a given attempt number.
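    ///
    /// The delay grows exponentially and is capped:
    /// `delay = min(initial_backoff_ms * backoff_multiplier^(attempt - 1), max_backoff_ms)`,
    /// optionally scaled by a random jitter factor in `[1.0, 1.3]`. For example,
    /// with `initial_backoff_ms = 100`, `backoff_multiplier = 2.0` and jitter
    /// disabled, attempts 1 through 4 yield 100, 200, 400 and 800 ms.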
    pub fn calculate_delay(&self, attempt: u32) -> Duration {
        let base_delay = self.config.initial_backoff_ms as f64;
        let multiplier = self.config.backoff_multiplier;

        // Exponential backoff
        let delay = base_delay * multiplier.powi(attempt.saturating_sub(1) as i32);

        // Cap at max backoff
        let delay = delay.min(self.config.max_backoff_ms as f64);

        // Add jitter if enabled
        let delay = if self.config.jitter {
            let jitter = rand::thread_rng().gen_range(0.0..=0.3);
            delay * (1.0 + jitter)
        } else {
            delay
        };

        Duration::from_millis(delay as u64)
    }

    /// Check if an error is retryable.
    fn is_retryable(&self, error: &PhyTraceError) -> bool {
        matches!(
            error,
            PhyTraceError::Transport(_) | PhyTraceError::Timeout(_)
        )
    }

    /// Get the maximum number of retries.
    pub fn max_retries(&self) -> u32 {
        self.config.max_retries
    }

    /// Get the initial backoff duration.
    pub fn initial_backoff(&self) -> Duration {
        Duration::from_millis(self.config.initial_backoff_ms)
    }
}

/// Statistics from a retry operation.
#[derive(Debug, Clone, Default)]
pub struct RetryStats {
    /// Number of attempts made.
    pub attempts: u32,
    /// Number of attempts that failed with a retryable error (zero when the first attempt succeeds).
    pub retries: u32,
    /// Total delay time in milliseconds.
    pub total_delay_ms: u64,
    /// Whether the operation eventually succeeded.
    pub succeeded: bool,
}

impl RetryStats {
    /// Check if any retries were needed.
    pub fn had_retries(&self) -> bool {
        self.retries > 0
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    #[test]
    fn test_calculate_delay() {
        let config = RetryConfig {
            initial_backoff_ms: 100,
            max_backoff_ms: 10000,
            backoff_multiplier: 2.0,
            jitter: false,
            max_retries: 5,
        };
        let handler = RetryHandler::new(&config);

        // Without jitter, delays should be predictable
        assert_eq!(handler.calculate_delay(1), Duration::from_millis(100));
        assert_eq!(handler.calculate_delay(2), Duration::from_millis(200));
        assert_eq!(handler.calculate_delay(3), Duration::from_millis(400));
        assert_eq!(handler.calculate_delay(4), Duration::from_millis(800));
    }

    #[test]
    fn test_delay_capped() {
        let config = RetryConfig {
            initial_backoff_ms: 1000,
            max_backoff_ms: 5000,
            backoff_multiplier: 2.0,
            jitter: false,
            max_retries: 10,
        };
        let handler = RetryHandler::new(&config);

        // Should be capped at max
        assert_eq!(handler.calculate_delay(5), Duration::from_millis(5000));
        assert_eq!(handler.calculate_delay(6), Duration::from_millis(5000));
    }

    #[tokio::test]
    async fn test_execute_success_first_try() {
        let handler = RetryHandler::default_handler();

        let result = handler
            .execute(|| async { Ok::<_, PhyTraceError>(42) })
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_execute_success_after_retry() {
        let handler = RetryHandler::new(&RetryConfig {
            max_retries: 3,
            initial_backoff_ms: 1, // Minimal delay for testing
            ..Default::default()
        });

        let counter = AtomicU32::new(0);

        let result = handler
            .execute(|| async {
                let count = counter.fetch_add(1, Ordering::Relaxed);
                if count < 2 {
                    Err(PhyTraceError::Transport("temporary failure".to_string()))
                } else {
                    Ok(42)
                }
            })
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(counter.load(Ordering::Relaxed), 3);
    }

    #[tokio::test]
    async fn test_execute_all_retries_fail() {
        let handler = RetryHandler::new(&RetryConfig {
            max_retries: 2,
            initial_backoff_ms: 1,
            ..Default::default()
        });

        let counter = AtomicU32::new(0);

        let result = handler
            .execute(|| async {
                counter.fetch_add(1, Ordering::Relaxed);
                Err::<i32, _>(PhyTraceError::Transport("persistent failure".to_string()))
            })
            .await;

        assert!(result.is_err());
        assert_eq!(counter.load(Ordering::Relaxed), 3); // Initial + 2 retries
    }

    #[tokio::test]
    async fn test_execute_with_stats() {
        let handler = RetryHandler::new(&RetryConfig {
            max_retries: 3,
            initial_backoff_ms: 1,
            ..Default::default()
        });

        let counter = AtomicU32::new(0);

        let (result, stats) = handler
            .execute_with_stats(|| async {
                let count = counter.fetch_add(1, Ordering::Relaxed);
                if count < 2 {
                    Err(PhyTraceError::Transport("temporary".to_string()))
                } else {
                    Ok(42)
                }
            })
            .await;

        assert!(result.is_ok());
        assert_eq!(stats.attempts, 3);
        assert_eq!(stats.retries, 2);
        assert!(stats.succeeded);
    }

    #[test]
    fn test_should_retry() {
        let handler = RetryHandler::default_handler();

        let retryable = SendResult::failure(Some(500), "Server error");
        assert!(handler.should_retry(&retryable));

        let not_retryable = SendResult::failure(Some(400), "Bad request");
        assert!(!handler.should_retry(&not_retryable));

        let success = SendResult::success(50);
        assert!(!handler.should_retry(&success));
    }
}