phytrace_sdk/reliability/
retry.rs1use rand::Rng;
4use std::time::Duration;
5
6use crate::core::config::RetryConfig;
7use crate::error::{PhyTraceError, PhyTraceResult};
8use crate::transport::traits::SendResult;
9
10#[derive(Debug, Clone)]
12pub struct RetryHandler {
13 config: RetryConfig,
14}
15
16impl RetryHandler {
17 pub fn new(config: &RetryConfig) -> Self {
19 Self {
20 config: config.clone(),
21 }
22 }
23
24 pub fn default_handler() -> Self {
26 Self::new(&RetryConfig::default())
27 }
28
29 pub async fn execute<F, Fut, T>(&self, mut operation: F) -> PhyTraceResult<T>
31 where
32 F: FnMut() -> Fut,
33 Fut: std::future::Future<Output = PhyTraceResult<T>>,
34 {
35 let mut attempt = 0;
36 let mut last_error = None;
37
38 while attempt <= self.config.max_retries {
39 match operation().await {
40 Ok(result) => return Ok(result),
41 Err(e) => {
42 if !self.is_retryable(&e) {
43 return Err(e);
44 }
45
46 last_error = Some(e);
47 attempt += 1;
48
49 if attempt <= self.config.max_retries {
50 let delay = self.calculate_delay(attempt);
51 tokio::time::sleep(delay).await;
52 }
53 }
54 }
55 }
56
57 Err(last_error
58 .unwrap_or_else(|| PhyTraceError::Transport("Max retries exceeded".to_string())))
59 }
60
61 pub async fn execute_with_stats<F, Fut, T>(
63 &self,
64 mut operation: F,
65 ) -> (PhyTraceResult<T>, RetryStats)
66 where
67 F: FnMut() -> Fut,
68 Fut: std::future::Future<Output = PhyTraceResult<T>>,
69 {
70 let mut stats = RetryStats::default();
71 let mut last_error = None;
72
73 while stats.attempts <= self.config.max_retries {
74 stats.attempts += 1;
75
76 match operation().await {
77 Ok(result) => {
78 stats.succeeded = true;
79 return (Ok(result), stats);
80 }
81 Err(e) => {
82 if !self.is_retryable(&e) {
83 return (Err(e), stats);
84 }
85
86 last_error = Some(e);
87 stats.retries += 1;
88
89 if stats.attempts <= self.config.max_retries {
90 let delay = self.calculate_delay(stats.attempts);
91 stats.total_delay_ms += delay.as_millis() as u64;
92 tokio::time::sleep(delay).await;
93 }
94 }
95 }
96 }
97
98 (
99 Err(last_error
100 .unwrap_or_else(|| PhyTraceError::Transport("Max retries exceeded".to_string()))),
101 stats,
102 )
103 }
104
105 pub fn should_retry(&self, result: &SendResult) -> bool {
107 !result.success && result.is_retryable()
108 }
109
110 pub fn calculate_delay(&self, attempt: u32) -> Duration {
112 let base_delay = self.config.initial_backoff_ms as f64;
113 let multiplier = self.config.backoff_multiplier;
114
115 let delay = base_delay * multiplier.powi(attempt.saturating_sub(1) as i32);
117
118 let delay = delay.min(self.config.max_backoff_ms as f64);
120
121 let delay = if self.config.jitter {
123 let jitter = rand::thread_rng().gen_range(0.0..=0.3);
124 delay * (1.0 + jitter)
125 } else {
126 delay
127 };
128
129 Duration::from_millis(delay as u64)
130 }
131
132 fn is_retryable(&self, error: &PhyTraceError) -> bool {
134 matches!(
135 error,
136 PhyTraceError::Transport(_) | PhyTraceError::Timeout(_)
137 )
138 }
139
140 pub fn max_retries(&self) -> u32 {
142 self.config.max_retries
143 }
144
145 pub fn initial_backoff(&self) -> Duration {
147 Duration::from_millis(self.config.initial_backoff_ms)
148 }
149}
150
/// Bookkeeping returned alongside the result by `RetryHandler::execute_with_stats`.
#[derive(Clone, Debug, Default)]
pub struct RetryStats {
    /// How many times the operation was invoked.
    pub attempts: u32,
    /// Retry count recorded by the handler.
    pub retries: u32,
    /// Sum of the backoff delays slept between attempts, in milliseconds.
    pub total_delay_ms: u64,
    /// Set when the operation eventually returned `Ok`.
    pub succeeded: bool,
}

impl RetryStats {
    /// Reports whether any retry was recorded.
    pub fn had_retries(&self) -> bool {
        self.retries != 0
    }
}
170
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    #[test]
    fn test_calculate_delay() {
        let config = RetryConfig {
            initial_backoff_ms: 100,
            max_backoff_ms: 10000,
            backoff_multiplier: 2.0,
            jitter: false,
            max_retries: 5,
        };
        let handler = RetryHandler::new(&config);

        // Attempt numbers are 1-based; attempt 0 saturates to the same exponent as attempt 1.
        assert_eq!(handler.calculate_delay(0), Duration::from_millis(100));
        assert_eq!(handler.calculate_delay(1), Duration::from_millis(100));
        assert_eq!(handler.calculate_delay(2), Duration::from_millis(200));
        assert_eq!(handler.calculate_delay(3), Duration::from_millis(400));
        assert_eq!(handler.calculate_delay(4), Duration::from_millis(800));
    }

    #[test]
    fn test_delay_capped() {
        let config = RetryConfig {
            initial_backoff_ms: 1000,
            max_backoff_ms: 5000,
            backoff_multiplier: 2.0,
            jitter: false,
            max_retries: 10,
        };
        let handler = RetryHandler::new(&config);

        assert_eq!(handler.calculate_delay(5), Duration::from_millis(5000));
        assert_eq!(handler.calculate_delay(6), Duration::from_millis(5000));
    }

    #[test]
    fn test_delay_with_jitter_within_bounds() {
        let handler = RetryHandler::new(&RetryConfig {
            initial_backoff_ms: 100,
            max_backoff_ms: 10000,
            backoff_multiplier: 2.0,
            jitter: true,
            max_retries: 5,
        });

        // Jitter scales the un-jittered delay by a factor in [1.0, 1.3].
        for _ in 0..100 {
            let first = handler.calculate_delay(1);
            assert!(first >= Duration::from_millis(100) && first <= Duration::from_millis(130));
            let third = handler.calculate_delay(3);
            assert!(third >= Duration::from_millis(400) && third <= Duration::from_millis(520));
        }
    }

    #[tokio::test]
    async fn test_execute_success_first_try() {
        let handler = RetryHandler::default_handler();

        let result = handler
            .execute(|| async { Ok::<_, PhyTraceError>(42) })
            .await;

        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_execute_success_after_retry() {
        let handler = RetryHandler::new(&RetryConfig {
            max_retries: 3,
            initial_backoff_ms: 1,
            ..Default::default()
        });

        let counter = AtomicU32::new(0);

        let result = handler
            .execute(|| async {
                let count = counter.fetch_add(1, Ordering::Relaxed);
                if count < 2 {
                    Err(PhyTraceError::Transport("temporary failure".to_string()))
                } else {
                    Ok(42)
                }
            })
            .await;

        assert_eq!(result.unwrap(), 42);
        assert_eq!(counter.load(Ordering::Relaxed), 3);
    }

    #[tokio::test]
    async fn test_execute_all_retries_fail() {
        let handler = RetryHandler::new(&RetryConfig {
            max_retries: 2,
            initial_backoff_ms: 1,
            ..Default::default()
        });

        let counter = AtomicU32::new(0);

        let result = handler
            .execute(|| async {
                counter.fetch_add(1, Ordering::Relaxed);
                Err::<i32, _>(PhyTraceError::Transport("persistent failure".to_string()))
            })
            .await;

        assert!(result.is_err());
        // One initial attempt plus `max_retries` retries.
        assert_eq!(counter.load(Ordering::Relaxed), 3);
    }

    #[tokio::test]
    async fn test_execute_with_stats() {
        let handler = RetryHandler::new(&RetryConfig {
            max_retries: 3,
            initial_backoff_ms: 1,
            ..Default::default()
        });

        let counter = AtomicU32::new(0);

        let (result, stats) = handler
            .execute_with_stats(|| async {
                let count = counter.fetch_add(1, Ordering::Relaxed);
                if count < 2 {
                    Err(PhyTraceError::Transport("temporary".to_string()))
                } else {
                    Ok(42)
                }
            })
            .await;

        assert!(result.is_ok());
        assert_eq!(stats.attempts, 3);
        assert_eq!(stats.retries, 2);
        assert!(stats.succeeded);
    }

    #[tokio::test]
    async fn test_execute_with_stats_all_fail() {
        let handler = RetryHandler::new(&RetryConfig {
            max_retries: 2,
            initial_backoff_ms: 1,
            ..Default::default()
        });

        let counter = AtomicU32::new(0);

        let (result, stats) = handler
            .execute_with_stats(|| async {
                counter.fetch_add(1, Ordering::Relaxed);
                Err::<i32, _>(PhyTraceError::Transport("persistent".to_string()))
            })
            .await;

        assert!(result.is_err());
        assert_eq!(stats.attempts, 3);
        assert_eq!(counter.load(Ordering::Relaxed), 3);
        assert!(stats.had_retries());
        assert!(!stats.succeeded);
    }

    #[test]
    fn test_should_retry() {
        let handler = RetryHandler::default_handler();

        let retryable = SendResult::failure(Some(500), "Server error");
        assert!(handler.should_retry(&retryable));

        let not_retryable = SendResult::failure(Some(400), "Bad request");
        assert!(!handler.should_retry(&not_retryable));

        let success = SendResult::success(50);
        assert!(!handler.should_retry(&success));
    }
}