phytrace_sdk/emitters/
periodic.rs

1//! Periodic event emitter.
2//!
3//! Emits events at regular intervals based on a callback that provides
4//! the current state.
5
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::{mpsc, Mutex, RwLock};
9use tokio::task::JoinHandle;
10
11use crate::core::builder::EventBuilder;
12use crate::core::config::{EmitterConfig, PhyTraceConfig};
13use crate::error::PhyTraceResult;
14use crate::models::event::UdmEvent;
15
16/// Callback type for providing state updates.
17pub type StateCallback = Box<dyn Fn(&mut EventBuilder) + Send + Sync>;
18
19/// Periodic emitter that generates events at regular intervals.
20pub struct PeriodicEmitter {
21    config: EmitterConfig,
22    full_config: PhyTraceConfig,
23    callback: Arc<RwLock<Option<StateCallback>>>,
24    event_tx: mpsc::Sender<UdmEvent>,
25    handle: Mutex<Option<JoinHandle<()>>>,
26    running: Arc<std::sync::atomic::AtomicBool>,
27}
28
29impl PeriodicEmitter {
30    /// Create a new periodic emitter.
31    pub fn new(config: &PhyTraceConfig, event_tx: mpsc::Sender<UdmEvent>) -> Self {
32        Self {
33            config: config.emitter.clone(),
34            full_config: config.clone(),
35            callback: Arc::new(RwLock::new(None)),
36            event_tx,
37            handle: Mutex::new(None),
38            running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
39        }
40    }
41
42    /// Set the state callback.
43    ///
44    /// The callback is invoked before each emission to populate the event
45    /// with current state.
46    pub async fn set_callback(&self, callback: StateCallback) {
47        *self.callback.write().await = Some(callback);
48    }
49
50    /// Start the periodic emitter.
51    pub async fn start(&self) -> PhyTraceResult<()> {
52        use std::sync::atomic::Ordering;
53
54        if self.running.load(Ordering::Relaxed) {
55            return Ok(()); // Already running
56        }
57
58        self.running.store(true, Ordering::Relaxed);
59
60        let interval = Duration::from_millis(self.config.interval_ms);
61        let callback = self.callback.clone();
62        let config = self.full_config.clone();
63        let tx = self.event_tx.clone();
64        let running = self.running.clone();
65
66        let handle = tokio::spawn(async move {
67            let mut interval_timer = tokio::time::interval(interval);
68
69            while running.load(Ordering::Relaxed) {
70                interval_timer.tick().await;
71
72                if !running.load(Ordering::Relaxed) {
73                    break;
74                }
75
76                // Build the event
77                let mut builder = EventBuilder::new(&config)
78                    .event_type(crate::models::enums::EventType::TelemetryPeriodic);
79
80                // Apply callback if set
81                if let Some(ref cb) = *callback.read().await {
82                    cb(&mut builder);
83                }
84
85                // Build and send
86                match builder.build() {
87                    Ok(event) => {
88                        if tx.send(event).await.is_err() {
89                            // Channel closed, stop emitting
90                            break;
91                        }
92                    }
93                    Err(e) => {
94                        // Log error but continue
95                        eprintln!("Failed to build event: {}", e);
96                    }
97                }
98            }
99        });
100
101        *self.handle.lock().await = Some(handle);
102
103        Ok(())
104    }
105
106    /// Stop the periodic emitter.
107    pub async fn stop(&self) {
108        use std::sync::atomic::Ordering;
109
110        self.running.store(false, Ordering::Relaxed);
111
112        if let Some(handle) = self.handle.lock().await.take() {
113            handle.abort();
114        }
115    }
116
117    /// Check if the emitter is running.
118    pub fn is_running(&self) -> bool {
119        self.running.load(std::sync::atomic::Ordering::Relaxed)
120    }
121
122    /// Get the emission interval.
123    pub fn interval(&self) -> Duration {
124        Duration::from_millis(self.config.interval_ms)
125    }
126
127    /// Update the emission interval.
128    pub fn set_interval(&mut self, interval: Duration) {
129        self.config.interval_ms = interval.as_millis() as u64;
130    }
131}
132
133impl Drop for PeriodicEmitter {
134    fn drop(&mut self) {
135        self.running
136            .store(false, std::sync::atomic::Ordering::Relaxed);
137    }
138}
139
140/// Builder for creating emitters with custom configurations.
141pub struct EmitterBuilder {
142    config: PhyTraceConfig,
143    event_tx: Option<mpsc::Sender<UdmEvent>>,
144    callback: Option<StateCallback>,
145}
146
147impl EmitterBuilder {
148    /// Create a new emitter builder.
149    pub fn new(config: &PhyTraceConfig) -> Self {
150        Self {
151            config: config.clone(),
152            event_tx: None,
153            callback: None,
154        }
155    }
156
157    /// Set the event channel sender.
158    pub fn with_channel(mut self, tx: mpsc::Sender<UdmEvent>) -> Self {
159        self.event_tx = Some(tx);
160        self
161    }
162
163    /// Set the state callback.
164    pub fn with_callback<F>(mut self, callback: F) -> Self
165    where
166        F: Fn(&mut EventBuilder) + Send + Sync + 'static,
167    {
168        self.callback = Some(Box::new(callback));
169        self
170    }
171
172    /// Set the emission interval.
173    pub fn with_interval(mut self, interval: Duration) -> Self {
174        self.config.emitter.interval_ms = interval.as_millis() as u64;
175        self
176    }
177
178    /// Build the emitter.
179    ///
180    /// Returns the emitter and a receiver for events.
181    pub fn build(self) -> (PeriodicEmitter, mpsc::Receiver<UdmEvent>) {
182        let (tx, rx) = match self.event_tx {
183            Some(tx) => (tx, mpsc::channel(100).1), // Dummy receiver if tx provided
184            None => mpsc::channel(100),
185        };
186
187        let emitter = PeriodicEmitter::new(&self.config, tx);
188
189        (emitter, rx)
190    }
191
192    /// Build with an existing channel.
193    pub fn build_with_sender(self, tx: mpsc::Sender<UdmEvent>) -> PeriodicEmitter {
194        PeriodicEmitter::new(&self.config, tx)
195    }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::enums::SourceType;

    /// Configuration for an AMR source named `test-robot` that ticks every
    /// 50 ms so the tests finish quickly.
    fn test_config() -> PhyTraceConfig {
        let mut cfg = PhyTraceConfig::new("test-robot");
        cfg.source.source_type = SourceType::Amr;
        cfg.emitter.interval_ms = 50; // Fast for testing
        cfg
    }

    /// Wait up to 200 ms for the next event, panicking with the given
    /// message if the wait times out.
    async fn next_event(rx: &mut mpsc::Receiver<UdmEvent>, timeout_msg: &str) -> UdmEvent {
        let received = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await;
        received.expect(timeout_msg).expect("Channel closed")
    }

    /// A freshly constructed emitter is stopped and reports the configured
    /// interval.
    #[tokio::test]
    async fn test_emitter_creation() {
        let (sender, _receiver) = mpsc::channel(10);
        let emitter = PeriodicEmitter::new(&test_config(), sender);

        assert!(!emitter.is_running());
        assert_eq!(emitter.interval(), Duration::from_millis(50));
    }

    /// `start` and `stop` toggle the running state.
    #[tokio::test]
    async fn test_emitter_start_stop() {
        let (sender, _receiver) = mpsc::channel(10);
        let emitter = PeriodicEmitter::new(&test_config(), sender);

        emitter.start().await.unwrap();
        assert!(emitter.is_running());

        emitter.stop().await;
        // Give time for stop to propagate
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(!emitter.is_running());
    }

    /// A running emitter delivers events carrying the configured source id.
    #[tokio::test]
    async fn test_emitter_produces_events() {
        let (sender, mut receiver) = mpsc::channel(10);
        let emitter = PeriodicEmitter::new(&test_config(), sender);

        emitter.start().await.unwrap();

        // Wait for at least one event
        let event = next_event(&mut receiver, "Timeout waiting for event").await;

        emitter.stop().await;

        let identity = event.identity.unwrap();
        assert_eq!(identity.source_id, Some("test-robot".to_string()));
    }

    /// State set by the callback shows up in the emitted event.
    #[tokio::test]
    async fn test_emitter_with_callback() {
        let (sender, mut receiver) = mpsc::channel(10);
        let emitter = PeriodicEmitter::new(&test_config(), sender);

        let set_position: StateCallback = Box::new(|builder| {
            *builder = builder.clone().position_2d(42.0, 24.0);
        });
        emitter.set_callback(set_position).await;

        emitter.start().await.unwrap();

        let event = next_event(&mut receiver, "Timeout").await;

        emitter.stop().await;

        let local = event.location.unwrap().local.unwrap();
        assert_eq!(local.x_m, Some(42.0));
        assert_eq!(local.y_m, Some(24.0));
    }

    /// `with_interval` overrides the interval taken from the configuration.
    #[tokio::test]
    async fn test_emitter_builder() {
        let builder = EmitterBuilder::new(&test_config()).with_interval(Duration::from_millis(100));
        let (emitter, _receiver) = builder.build();

        assert_eq!(emitter.interval(), Duration::from_millis(100));
    }
}