phytrace_sdk/emitters/
periodic.rs1use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::{mpsc, Mutex, RwLock};
9use tokio::task::JoinHandle;
10
11use crate::core::builder::EventBuilder;
12use crate::core::config::{EmitterConfig, PhyTraceConfig};
13use crate::error::PhyTraceResult;
14use crate::models::event::UdmEvent;
15
16pub type StateCallback = Box<dyn Fn(&mut EventBuilder) + Send + Sync>;
18
19pub struct PeriodicEmitter {
21 config: EmitterConfig,
22 full_config: PhyTraceConfig,
23 callback: Arc<RwLock<Option<StateCallback>>>,
24 event_tx: mpsc::Sender<UdmEvent>,
25 handle: Mutex<Option<JoinHandle<()>>>,
26 running: Arc<std::sync::atomic::AtomicBool>,
27}
28
29impl PeriodicEmitter {
30 pub fn new(config: &PhyTraceConfig, event_tx: mpsc::Sender<UdmEvent>) -> Self {
32 Self {
33 config: config.emitter.clone(),
34 full_config: config.clone(),
35 callback: Arc::new(RwLock::new(None)),
36 event_tx,
37 handle: Mutex::new(None),
38 running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
39 }
40 }
41
42 pub async fn set_callback(&self, callback: StateCallback) {
47 *self.callback.write().await = Some(callback);
48 }
49
50 pub async fn start(&self) -> PhyTraceResult<()> {
52 use std::sync::atomic::Ordering;
53
54 if self.running.load(Ordering::Relaxed) {
55 return Ok(()); }
57
58 self.running.store(true, Ordering::Relaxed);
59
60 let interval = Duration::from_millis(self.config.interval_ms);
61 let callback = self.callback.clone();
62 let config = self.full_config.clone();
63 let tx = self.event_tx.clone();
64 let running = self.running.clone();
65
66 let handle = tokio::spawn(async move {
67 let mut interval_timer = tokio::time::interval(interval);
68
69 while running.load(Ordering::Relaxed) {
70 interval_timer.tick().await;
71
72 if !running.load(Ordering::Relaxed) {
73 break;
74 }
75
76 let mut builder = EventBuilder::new(&config)
78 .event_type(crate::models::enums::EventType::TelemetryPeriodic);
79
80 if let Some(ref cb) = *callback.read().await {
82 cb(&mut builder);
83 }
84
85 match builder.build() {
87 Ok(event) => {
88 if tx.send(event).await.is_err() {
89 break;
91 }
92 }
93 Err(e) => {
94 eprintln!("Failed to build event: {}", e);
96 }
97 }
98 }
99 });
100
101 *self.handle.lock().await = Some(handle);
102
103 Ok(())
104 }
105
106 pub async fn stop(&self) {
108 use std::sync::atomic::Ordering;
109
110 self.running.store(false, Ordering::Relaxed);
111
112 if let Some(handle) = self.handle.lock().await.take() {
113 handle.abort();
114 }
115 }
116
117 pub fn is_running(&self) -> bool {
119 self.running.load(std::sync::atomic::Ordering::Relaxed)
120 }
121
122 pub fn interval(&self) -> Duration {
124 Duration::from_millis(self.config.interval_ms)
125 }
126
127 pub fn set_interval(&mut self, interval: Duration) {
129 self.config.interval_ms = interval.as_millis() as u64;
130 }
131}
132
133impl Drop for PeriodicEmitter {
134 fn drop(&mut self) {
135 self.running
136 .store(false, std::sync::atomic::Ordering::Relaxed);
137 }
138}
139
140pub struct EmitterBuilder {
142 config: PhyTraceConfig,
143 event_tx: Option<mpsc::Sender<UdmEvent>>,
144 callback: Option<StateCallback>,
145}
146
147impl EmitterBuilder {
148 pub fn new(config: &PhyTraceConfig) -> Self {
150 Self {
151 config: config.clone(),
152 event_tx: None,
153 callback: None,
154 }
155 }
156
157 pub fn with_channel(mut self, tx: mpsc::Sender<UdmEvent>) -> Self {
159 self.event_tx = Some(tx);
160 self
161 }
162
163 pub fn with_callback<F>(mut self, callback: F) -> Self
165 where
166 F: Fn(&mut EventBuilder) + Send + Sync + 'static,
167 {
168 self.callback = Some(Box::new(callback));
169 self
170 }
171
172 pub fn with_interval(mut self, interval: Duration) -> Self {
174 self.config.emitter.interval_ms = interval.as_millis() as u64;
175 self
176 }
177
178 pub fn build(self) -> (PeriodicEmitter, mpsc::Receiver<UdmEvent>) {
182 let (tx, rx) = match self.event_tx {
183 Some(tx) => (tx, mpsc::channel(100).1), None => mpsc::channel(100),
185 };
186
187 let emitter = PeriodicEmitter::new(&self.config, tx);
188
189 (emitter, rx)
190 }
191
192 pub fn build_with_sender(self, tx: mpsc::Sender<UdmEvent>) -> PeriodicEmitter {
194 PeriodicEmitter::new(&self.config, tx)
195 }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::enums::SourceType;

    /// Configuration for an AMR source named `test-robot` emitting every 50 ms.
    fn test_config() -> PhyTraceConfig {
        let mut cfg = PhyTraceConfig::new("test-robot");
        cfg.source.source_type = SourceType::Amr;
        cfg.emitter.interval_ms = 50;
        cfg
    }

    /// Emitter built from `test_config`, wired to a fresh 10-slot channel.
    fn emitter_with_channel() -> (PeriodicEmitter, mpsc::Receiver<UdmEvent>) {
        let (sender, receiver) = mpsc::channel(10);
        (PeriodicEmitter::new(&test_config(), sender), receiver)
    }

    /// Waits up to 200 ms for the next event, panicking with `timeout_msg` on
    /// timeout and with "Channel closed" if the channel has been closed.
    async fn next_event(rx: &mut mpsc::Receiver<UdmEvent>, timeout_msg: &str) -> UdmEvent {
        let received = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await;
        received.expect(timeout_msg).expect("Channel closed")
    }

    #[tokio::test]
    async fn test_emitter_creation() {
        let (emitter, _rx) = emitter_with_channel();

        assert!(!emitter.is_running());
        assert_eq!(emitter.interval(), Duration::from_millis(50));
    }

    #[tokio::test]
    async fn test_emitter_start_stop() {
        let (emitter, _rx) = emitter_with_channel();

        emitter.start().await.unwrap();
        assert!(emitter.is_running());

        emitter.stop().await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert!(!emitter.is_running());
    }

    #[tokio::test]
    async fn test_emitter_produces_events() {
        let (emitter, mut rx) = emitter_with_channel();

        emitter.start().await.unwrap();
        let event = next_event(&mut rx, "Timeout waiting for event").await;
        emitter.stop().await;

        let identity = event.identity.unwrap();
        assert_eq!(identity.source_id, Some("test-robot".to_string()));
    }

    #[tokio::test]
    async fn test_emitter_with_callback() {
        let (emitter, mut rx) = emitter_with_channel();

        let set_position: StateCallback = Box::new(|builder| {
            *builder = builder.clone().position_2d(42.0, 24.0);
        });
        emitter.set_callback(set_position).await;

        emitter.start().await.unwrap();
        let event = next_event(&mut rx, "Timeout").await;
        emitter.stop().await;

        let local = event.location.unwrap().local.unwrap();
        assert_eq!(local.x_m, Some(42.0));
        assert_eq!(local.y_m, Some(24.0));
    }

    #[tokio::test]
    async fn test_emitter_builder() {
        let builder = EmitterBuilder::new(&test_config()).with_interval(Duration::from_millis(100));
        let (emitter, _rx) = builder.build();

        assert_eq!(emitter.interval(), Duration::from_millis(100));
    }
}