1use std::path::Path;
37use std::sync::Arc;
38use tokio::sync::{mpsc, Mutex, RwLock};
39
40use crate::core::builder::EventBuilder;
41use crate::core::config::PhyTraceConfig;
42use crate::core::license::{LicenseError, LicenseStatus, LicenseValidator};
43use crate::core::provenance::ProvenanceSigner;
44use crate::core::validation::{ValidationLevel, Validator};
45use crate::error::{PhyTraceError, PhyTraceResult};
46use crate::models::event::UdmEvent;
47use crate::reliability::batch::BatchProcessor;
48use crate::reliability::buffer::FileBuffer;
49use crate::reliability::retry::RetryHandler;
50#[cfg(feature = "http")]
51use crate::transport::http::HttpTransport;
52use crate::transport::mock::MockTransport;
53use crate::transport::traits::{Transport, TransportStats};
54
55pub struct PhyTraceAgent {
64 config: PhyTraceConfig,
65 transport: Arc<RwLock<Box<dyn Transport>>>,
66 buffer: Option<Arc<FileBuffer>>,
67 batch_processor: Arc<BatchProcessor>,
68 retry_handler: RetryHandler,
69 signer: Option<ProvenanceSigner>,
70 validator: Validator,
71 license_validator: Option<Arc<LicenseValidator>>,
72 event_tx: mpsc::Sender<UdmEvent>,
73 running: Arc<std::sync::atomic::AtomicBool>,
74 worker_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
75 license_heartbeat_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
76}
77
78impl PhyTraceAgent {
79 pub async fn from_config(config: PhyTraceConfig) -> PhyTraceResult<Self> {
81 config.validate().map_err(|e| {
82 PhyTraceError::Config(crate::error::ConfigError::Validation(e.to_string()))
83 })?;
84
85 let transport: Box<dyn Transport> = if config.transport.transport_type == "mock" {
87 Box::new(MockTransport::new())
88 } else {
89 #[cfg(feature = "http")]
90 {
91 Box::new(HttpTransport::from_config(&config)?)
92 }
93 #[cfg(not(feature = "http"))]
94 {
95 return Err(PhyTraceError::Config(
96 "HTTP transport not available. Enable the 'http' feature.".to_string(),
97 ));
98 }
99 };
100
101 Self::with_transport(config, transport).await
102 }
103
104 pub async fn with_transport(
106 config: PhyTraceConfig,
107 transport: Box<dyn Transport>,
108 ) -> PhyTraceResult<Self> {
109 let license_validator = if config.transport.transport_type != "mock" {
111 let validator = LicenseValidator::new(
112 config.license.token.clone(),
113 Some(config.transport.endpoint.clone()),
114 config.license.grace_period_hours,
115 );
116
117 let status = validator.validate();
119 if !status.allows_operation() {
120 return Err(PhyTraceError::License(LicenseError::ValidationFailed(
121 format!(
122 "License validation failed: {}. \
123 Set PHYWARE_DEV_MODE=1 for local development, \
124 or provide a valid license token.",
125 status.as_str()
126 ),
127 )));
128 }
129
130 if status == LicenseStatus::GracePeriod {
131 tracing::warn!(
132 "License is in grace period. Events will be quarantined until license is renewed."
133 );
134 }
135
136 Some(Arc::new(validator))
137 } else {
138 None
139 };
140
141 let buffer = if config.buffer.enabled {
143 Some(Arc::new(FileBuffer::new(&config.buffer)?))
144 } else {
145 None
146 };
147
148 let batch_processor = Arc::new(BatchProcessor::from_config(&config.buffer));
150
151 let retry_handler = RetryHandler::new(&config.retry);
153
154 let signer = if config.provenance.enabled {
156 Some(ProvenanceSigner::from_config(&config.provenance)?)
157 } else {
158 None
159 };
160
161 let (event_tx, event_rx) = mpsc::channel(1000);
163
164 let agent = Self {
165 config,
166 transport: Arc::new(RwLock::new(transport)),
167 buffer,
168 batch_processor,
169 retry_handler,
170 signer,
171 validator: Validator::new(ValidationLevel::Basic),
172 license_validator,
173 event_tx,
174 running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
175 worker_handle: Mutex::new(None),
176 license_heartbeat_handle: Mutex::new(None),
177 };
178
179 agent.start_worker(event_rx).await;
181
182 Ok(agent)
183 }
184
185 pub async fn from_file<P: AsRef<Path>>(path: P) -> PhyTraceResult<Self> {
187 let config = PhyTraceConfig::from_file(path)?;
188 Self::from_config(config).await
189 }
190
191 pub async fn start(&self) -> PhyTraceResult<()> {
193 use std::sync::atomic::Ordering;
194
195 self.transport.write().await.connect().await?;
197 self.running.store(true, Ordering::Relaxed);
198
199 if let Some(ref validator) = self.license_validator {
201 if !validator.is_dev_mode() {
202 let validator = validator.clone();
203 let running = self.running.clone();
204
205 let handle = tokio::spawn(async move {
206 loop {
207 tokio::time::sleep(std::time::Duration::from_secs(3600)).await; if !running.load(Ordering::Relaxed) {
210 break;
211 }
212
213 let status = validator.validate_online().await;
214 match status {
215 LicenseStatus::Expired => {
216 tracing::error!(
217 "License has expired. Please renew your license to continue."
218 );
219 }
220 LicenseStatus::GracePeriod => {
221 tracing::warn!(
222 "License is in grace period. Events are being quarantined."
223 );
224 }
225 _ => {}
226 }
227 }
228 });
229
230 *self.license_heartbeat_handle.lock().await = Some(handle);
231 }
232 }
233
234 Ok(())
235 }
236
237 pub async fn shutdown(&self) -> PhyTraceResult<()> {
239 use std::sync::atomic::Ordering;
240
241 self.running.store(false, Ordering::Relaxed);
242
243 if let Some(handle) = self.license_heartbeat_handle.lock().await.take() {
245 handle.abort();
246 }
247
248 self.flush().await?;
250
251 self.transport.write().await.disconnect().await?;
253
254 if let Some(handle) = self.worker_handle.lock().await.take() {
256 handle.abort();
257 }
258
259 Ok(())
260 }
261
262 pub fn emit(&self) -> AgentEventBuilder<'_> {
264 AgentEventBuilder {
265 builder: EventBuilder::new(&self.config),
266 agent: self,
267 }
268 }
269
270 pub async fn send(&self, mut event: UdmEvent) -> PhyTraceResult<()> {
272 self.validator.validate(&event)?;
274
275 if let Some(ref signer) = self.signer {
277 signer.sign(&mut event)?;
278 }
279
280 if let Some(ref validator) = self.license_validator {
282 let metadata = validator.get_license_metadata().await;
283 event.inject_license_metadata(&metadata);
284 }
285
286 Box::pin(self.event_tx.send(event))
288 .await
289 .map_err(|e| PhyTraceError::Transport(format!("Event channel closed: {e}")))?;
290
291 Ok(())
292 }
293
294 pub async fn flush(&self) -> PhyTraceResult<()> {
296 let batch = self.batch_processor.flush().await;
297 if !batch.is_empty() {
298 let transport = self.transport.read().await;
299 self.retry_handler
300 .execute(|| async {
301 transport.send_batch(&batch).await?;
302 Ok(())
303 })
304 .await?;
305 }
306 Ok(())
307 }
308
309 pub async fn stats(&self) -> TransportStats {
311 self.transport.read().await.stats()
312 }
313
314 pub fn is_running(&self) -> bool {
316 self.running.load(std::sync::atomic::Ordering::Relaxed)
317 }
318
319 pub fn config(&self) -> &PhyTraceConfig {
321 &self.config
322 }
323
324 pub fn set_validation_level(&mut self, level: ValidationLevel) {
326 self.validator = Validator::new(level);
327 }
328
329 async fn start_worker(&self, mut event_rx: mpsc::Receiver<UdmEvent>) {
331 let transport = self.transport.clone();
332 let batch_processor = self.batch_processor.clone();
333 let retry_handler = self.retry_handler.clone();
334 let buffer = self.buffer.clone();
335 let running = self.running.clone();
336 let config = self.config.clone();
337
338 let handle = tokio::spawn(async move {
339 use std::sync::atomic::Ordering;
340 let flush_interval = std::time::Duration::from_secs(config.buffer.flush_interval_secs);
341 let mut flush_timer = tokio::time::interval(flush_interval);
342 flush_timer.tick().await;
344
345 loop {
346 tokio::select! {
347 Some(event) = event_rx.recv() => {
349 if let Some(batch) = Box::pin(batch_processor.add(event)).await {
350 Self::send_batch_with_retry(
351 &transport,
352 &retry_handler,
353 &buffer,
354 batch,
355 ).await;
356 }
357 }
358
359 _ = flush_timer.tick() => {
361 if !running.load(Ordering::Relaxed) {
362 break;
363 }
364
365 if batch_processor.should_flush().await {
366 let batch = batch_processor.flush().await;
367 if !batch.is_empty() {
368 Self::send_batch_with_retry(
369 &transport,
370 &retry_handler,
371 &buffer,
372 batch,
373 ).await;
374 }
375 }
376
377 if let Some(ref buf) = buffer {
379 Box::pin(Self::replay_buffered_events(&transport, &retry_handler, buf)).await;
380 }
381 }
382 }
383 }
384 });
385
386 *self.worker_handle.lock().await = Some(handle);
387 }
388
389 async fn send_batch_with_retry(
390 transport: &Arc<RwLock<Box<dyn Transport>>>,
391 retry_handler: &RetryHandler,
392 buffer: &Option<Arc<FileBuffer>>,
393 batch: Vec<UdmEvent>,
394 ) {
395 let transport_ref = transport.read().await;
396
397 let result = retry_handler
398 .execute(|| async {
399 transport_ref.send_batch(&batch).await?;
400 Ok(())
401 })
402 .await;
403
404 if result.is_err() {
406 if let Some(ref buf) = buffer {
407 drop(buf.buffer_events(&batch));
408 }
409 }
410 }
411
412 async fn replay_buffered_events(
413 transport: &Arc<RwLock<Box<dyn Transport>>>,
414 retry_handler: &RetryHandler,
415 buffer: &Arc<FileBuffer>,
416 ) {
417 if !transport.read().await.is_connected().await {
419 return;
420 }
421
422 if let Ok(buffered) = buffer.read_events() {
423 let transport_ref = transport.read().await;
424
425 for buffered_event in buffered {
426 let result = retry_handler
427 .execute(|| async {
428 transport_ref.send(&buffered_event.event).await?;
429 Ok(())
430 })
431 .await;
432
433 if result.is_ok() {
434 drop(buffer.remove_file(&buffered_event.filepath));
435 } else {
436 break;
438 }
439 }
440 }
441 }
442}
443
444pub struct AgentEventBuilder<'a> {
446 builder: EventBuilder,
447 agent: &'a PhyTraceAgent,
448}
449
450impl<'a> AgentEventBuilder<'a> {
451 pub fn source_id(mut self, source_id: impl Into<String>) -> Self {
453 self.builder = self.builder.source_id(source_id);
454 self
455 }
456
457 pub fn source_type(mut self, source_type: crate::models::enums::SourceType) -> Self {
459 self.builder = self.builder.source_type(source_type);
460 self
461 }
462
463 pub fn position_2d(mut self, x: f64, y: f64) -> Self {
465 self.builder = self.builder.position_2d(x, y);
466 self
467 }
468
469 pub fn gps(mut self, latitude: f64, longitude: f64) -> Self {
471 self.builder = self.builder.gps(latitude, longitude);
472 self
473 }
474
475 pub fn position_3d(mut self, x: f64, y: f64, z: f64) -> Self {
477 self.builder = self.builder.position_3d(x, y, z);
478 self
479 }
480
481 pub fn heading(mut self, heading_rad: f64) -> Self {
483 self.builder = self.builder.heading(heading_rad);
484 self
485 }
486
487 pub fn speed(mut self, speed_mps: f64) -> Self {
489 self.builder = self.builder.speed(speed_mps);
490 self
491 }
492
493 pub fn battery_soc(mut self, soc_percent: f64) -> Self {
495 self.builder = self.builder.battery_soc(soc_percent);
496 self
497 }
498
499 pub fn operational_mode(mut self, mode: crate::models::enums::OperationalMode) -> Self {
501 self.builder = self.builder.operational_mode(mode);
502 self
503 }
504
505 pub fn safety_state(mut self, state: crate::models::enums::SafetyState) -> Self {
507 self.builder = self.builder.safety_state(state);
508 self
509 }
510
511 pub fn event_type(mut self, event_type: crate::models::enums::EventType) -> Self {
513 self.builder = self.builder.event_type(event_type);
514 self
515 }
516
517 pub fn tag(mut self, tag: impl Into<String>) -> Self {
519 self.builder = self.builder.tag(tag);
520 self
521 }
522
523 pub fn extensions(mut self, extensions: serde_json::Value) -> Self {
525 self.builder = self.builder.extensions(extensions);
526 self
527 }
528
529 pub fn with_builder<F>(mut self, f: F) -> Self
531 where
532 F: FnOnce(EventBuilder) -> EventBuilder,
533 {
534 self.builder = f(self.builder);
535 self
536 }
537
538 pub async fn send(self) -> PhyTraceResult<()> {
540 let event = self.builder.build()?;
541 Box::pin(self.agent.send(event)).await
542 }
543
544 pub fn build(self) -> PhyTraceResult<UdmEvent> {
546 self.builder.build()
547 }
548}
549
550#[cfg(test)]
551mod tests {
552 use super::*;
553 use crate::core::validation::ValidationLevel;
554 use crate::models::enums::{EventType, OperationalMode, SafetyState, SourceType};
555 use crate::transport::mock::MockTransport;
556 use tempfile::tempdir;
557
558 fn test_config() -> PhyTraceConfig {
559 let dir = tempdir().unwrap();
560 let mut config = PhyTraceConfig::new("test-robot").with_source_type(SourceType::Amr);
561 config.transport.transport_type = "mock".to_string();
562 config.transport.endpoint = "http://localhost:8080".to_string();
563 config.buffer.path = dir.path().to_string_lossy().to_string();
564 config.buffer.enabled = false; config
566 }
567
568 fn test_config_with_buffer() -> (PhyTraceConfig, tempfile::TempDir) {
569 let dir = tempdir().unwrap();
570 let mut config = PhyTraceConfig::new("test-robot").with_source_type(SourceType::Amr);
571 config.transport.transport_type = "mock".to_string();
572 config.transport.endpoint = "http://localhost:8080".to_string();
573 config.buffer.path = dir.path().to_string_lossy().to_string();
574 config.buffer.enabled = true;
575 config.buffer.flush_interval_secs = 1;
576 config.buffer.batch_size = 5;
577 (config, dir)
578 }
579
580 #[tokio::test]
581 async fn test_agent_creation() {
582 let config = test_config();
583 let agent = PhyTraceAgent::from_config(config).await.unwrap();
584
585 assert!(!agent.is_running());
586 }
587
588 #[tokio::test]
589 async fn test_agent_with_mock_transport() {
590 let config = test_config();
591 let mock = MockTransport::new();
592
593 let agent = PhyTraceAgent::with_transport(config, Box::new(mock))
594 .await
595 .unwrap();
596
597 agent.start().await.unwrap();
598 assert!(agent.is_running());
599
600 agent.shutdown().await.unwrap();
601 }
602
603 #[tokio::test]
604 async fn test_agent_emit() {
605 let config = test_config();
606
607 let agent = PhyTraceAgent::with_transport(config, Box::new(MockTransport::new()))
608 .await
609 .unwrap();
610
611 agent.start().await.unwrap();
612
613 Box::pin(
615 agent
616 .emit()
617 .position_2d(10.0, 20.0)
618 .battery_soc(85.0)
619 .send(),
620 )
621 .await
622 .unwrap();
623
624 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
626
627 agent.shutdown().await.unwrap();
628 }
629
630 #[tokio::test]
631 async fn test_agent_fluent_builder() {
632 let config = test_config();
633 let agent = PhyTraceAgent::from_config(config).await.unwrap();
634
635 let event = agent
636 .emit()
637 .position_2d(42.0, 24.0)
638 .heading(1.57)
639 .speed(2.5)
640 .battery_soc(90.0)
641 .tag("test")
642 .build()
643 .unwrap();
644
645 assert!(event.location.is_some());
646 assert!(event.motion.is_some());
647 assert!(event.power.is_some());
648 }
649
650 #[tokio::test]
651 async fn test_agent_from_yaml_file() {
652 let dir = tempdir().unwrap();
653 let config_path = dir.path().join("config.yaml");
654
655 let yaml_content = format!(
656 r#"
657source:
658 source_id: "yaml-robot"
659 source_type: amr
660transport:
661 transport_type: mock
662 endpoint: "http://localhost:8080"
663buffer:
664 enabled: false
665 path: "{}"
666"#,
667 dir.path().to_string_lossy().replace('\\', "/")
668 );
669
670 std::fs::write(&config_path, yaml_content).unwrap();
671
672 let agent = PhyTraceAgent::from_file(&config_path).await.unwrap();
673 assert_eq!(agent.config().source.source_id, "yaml-robot");
674 }
675
676 #[tokio::test]
677 async fn test_agent_config_validation_fails_empty_source_id() {
678 let dir = tempdir().unwrap();
679 let mut config = PhyTraceConfig::new("");
680 config.transport.transport_type = "mock".to_string();
681 config.buffer.path = dir.path().to_string_lossy().to_string();
682 config.buffer.enabled = false;
683
684 let result = PhyTraceAgent::from_config(config).await;
685 assert!(result.is_err());
686 }
687
688 #[tokio::test]
689 async fn test_agent_stats() {
690 let config = test_config();
691 let agent = PhyTraceAgent::from_config(config).await.unwrap();
692
693 let stats = agent.stats().await;
694 assert_eq!(stats.events_sent, 0);
695 assert_eq!(stats.events_succeeded, 0);
696 }
697
698 #[tokio::test]
699 async fn test_agent_set_validation_level() {
700 let config = test_config();
701 let mut agent = PhyTraceAgent::from_config(config).await.unwrap();
702
703 agent.set_validation_level(ValidationLevel::Full);
704 let event = agent.emit().position_2d(1.0, 2.0).build().unwrap();
706 assert!(event.location.is_some());
707
708 agent.set_validation_level(ValidationLevel::None);
709 let event = agent.emit().position_2d(3.0, 4.0).build().unwrap();
710 assert!(event.location.is_some());
711 }
712
713 #[tokio::test]
714 async fn test_agent_flush() {
715 let config = test_config();
716 let agent = PhyTraceAgent::from_config(config).await.unwrap();
717 agent.start().await.unwrap();
718
719 Box::pin(agent.emit().position_2d(1.0, 2.0).send())
721 .await
722 .unwrap();
723 Box::pin(agent.emit().position_2d(3.0, 4.0).send())
724 .await
725 .unwrap();
726
727 agent.flush().await.unwrap();
729
730 agent.shutdown().await.unwrap();
731 }
732
733 #[tokio::test]
734 async fn test_agent_config_access() {
735 let config = test_config();
736 let agent = PhyTraceAgent::from_config(config).await.unwrap();
737
738 let cfg = agent.config();
739 assert_eq!(cfg.source.source_id, "test-robot");
740 assert_eq!(cfg.source.source_type, SourceType::Amr);
741 assert_eq!(cfg.transport.transport_type, "mock");
742 }
743
744 #[tokio::test]
745 async fn test_agent_builder_position_3d() {
746 let config = test_config();
747 let agent = PhyTraceAgent::from_config(config).await.unwrap();
748
749 let event = agent.emit().position_3d(1.0, 2.0, 3.0).build().unwrap();
750
751 assert!(event.location.is_some());
752 let loc = event.location.unwrap();
753 assert!(loc.local.is_some());
754 let local = loc.local.unwrap();
755 assert_eq!(local.x_m, Some(1.0));
756 assert_eq!(local.y_m, Some(2.0));
757 assert_eq!(local.z_m, Some(3.0));
758 }
759
760 #[tokio::test]
761 async fn test_agent_builder_operational_mode() {
762 let config = test_config();
763 let agent = PhyTraceAgent::from_config(config).await.unwrap();
764
765 let event = agent
766 .emit()
767 .operational_mode(OperationalMode::Manual)
768 .build()
769 .unwrap();
770
771 assert!(event.operational.is_some());
772 assert_eq!(
773 event.operational.unwrap().mode,
774 Some(OperationalMode::Manual)
775 );
776 }
777
778 #[tokio::test]
779 async fn test_agent_builder_safety_state() {
780 let config = test_config();
781 let agent = PhyTraceAgent::from_config(config).await.unwrap();
782
783 let event = agent
784 .emit()
785 .safety_state(SafetyState::Warning)
786 .build()
787 .unwrap();
788
789 assert!(event.safety.is_some());
790 }
791
792 #[tokio::test]
793 async fn test_agent_builder_event_type() {
794 let config = test_config();
795 let agent = PhyTraceAgent::from_config(config).await.unwrap();
796
797 let event = agent
798 .emit()
799 .event_type(EventType::SafetyViolation)
800 .build()
801 .unwrap();
802
803 assert_eq!(event.event_type, EventType::SafetyViolation);
804 }
805
806 #[tokio::test]
807 async fn test_agent_builder_extensions() {
808 let config = test_config();
809 let agent = PhyTraceAgent::from_config(config).await.unwrap();
810
811 let extensions = serde_json::json!({
812 "custom_field": "custom_value",
813 "numeric_field": 42
814 });
815
816 let event = agent.emit().extensions(extensions.clone()).build().unwrap();
817
818 assert!(event.extensions.is_some());
819 let ext = event.extensions.unwrap();
820 assert_eq!(ext["custom_field"], "custom_value");
821 assert_eq!(ext["numeric_field"], 42);
822 }
823
824 #[tokio::test]
825 async fn test_agent_builder_with_builder() {
826 let config = test_config();
827 let agent = PhyTraceAgent::from_config(config).await.unwrap();
828
829 let event = agent
830 .emit()
831 .with_builder(|b| b.position_2d(100.0, 200.0).speed(5.0))
832 .build()
833 .unwrap();
834
835 assert!(event.location.is_some());
836 assert!(event.motion.is_some());
837 }
838
839 #[tokio::test]
840 async fn test_agent_multiple_tags() {
841 let config = test_config();
842 let agent = PhyTraceAgent::from_config(config).await.unwrap();
843
844 let event = agent
845 .emit()
846 .tag("tag1")
847 .tag("tag2")
848 .tag("tag3")
849 .build()
850 .unwrap();
851
852 assert!(event.identity.is_some());
854 let identity = event.identity.unwrap();
855 assert!(identity.tags.is_some());
856 let tags = identity.tags.unwrap();
857 assert!(tags.contains(&"tag1".to_string()));
858 assert!(tags.contains(&"tag2".to_string()));
859 assert!(tags.contains(&"tag3".to_string()));
860 }
861
862 #[tokio::test]
863 async fn test_agent_with_buffer_enabled() {
864 let (config, _dir) = test_config_with_buffer();
865 let agent = PhyTraceAgent::from_config(config).await.unwrap();
866
867 agent.start().await.unwrap();
868
869 for i in 0..3 {
871 Box::pin(agent.emit().position_2d(i as f64, i as f64 * 2.0).send())
872 .await
873 .unwrap();
874 }
875
876 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
878
879 agent.shutdown().await.unwrap();
880 }
881
882 #[tokio::test]
883 async fn test_agent_complete_event_builder() {
884 let config = test_config();
885 let agent = PhyTraceAgent::from_config(config).await.unwrap();
886
887 let event = agent
888 .emit()
889 .position_3d(1.0, 2.0, 3.0)
890 .heading(std::f64::consts::PI / 2.0)
891 .speed(1.5)
892 .battery_soc(75.0)
893 .operational_mode(OperationalMode::Autonomous)
894 .safety_state(SafetyState::Normal)
895 .event_type(EventType::TelemetryPeriodic)
896 .tag("integration-test")
897 .extensions(serde_json::json!({"test": true}))
898 .build()
899 .unwrap();
900
901 assert!(event.location.is_some());
902 assert!(event.motion.is_some());
903 assert!(event.power.is_some());
904 assert!(event.operational.is_some());
905 assert!(event.safety.is_some());
906 assert_eq!(event.event_type, EventType::TelemetryPeriodic);
907 assert!(event
908 .identity
909 .as_ref()
910 .unwrap()
911 .tags
912 .as_ref()
913 .unwrap()
914 .contains(&"integration-test".to_string()));
915 assert!(event.extensions.is_some());
916 }
917
918 #[tokio::test]
919 async fn test_agent_shutdown_not_running() {
920 let config = test_config();
921 let agent = PhyTraceAgent::from_config(config).await.unwrap();
922
923 agent.shutdown().await.unwrap();
925 assert!(!agent.is_running());
926 }
927
928 #[tokio::test]
929 async fn test_agent_start_stop_cycle() {
930 let config = test_config();
931 let agent = PhyTraceAgent::from_config(config).await.unwrap();
932
933 agent.start().await.unwrap();
935 assert!(agent.is_running());
936
937 Box::pin(agent.emit().position_2d(1.0, 2.0).send())
939 .await
940 .unwrap();
941
942 agent.shutdown().await.unwrap();
944 assert!(!agent.is_running());
945 }
946
947 #[tokio::test]
948 async fn test_agent_emit_without_start() {
949 let config = test_config();
950 let agent = PhyTraceAgent::from_config(config).await.unwrap();
951
952 Box::pin(agent.emit().position_2d(1.0, 2.0).send())
954 .await
955 .unwrap();
956 }
957
958 #[tokio::test]
959 async fn test_agent_stats_after_emit() {
960 let config = test_config();
961 let agent = PhyTraceAgent::from_config(config).await.unwrap();
962
963 agent.start().await.unwrap();
964
965 Box::pin(agent.emit().position_2d(1.0, 2.0).send())
967 .await
968 .unwrap();
969
970 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
972
973 let stats = agent.stats().await;
974 assert_eq!(
976 stats.events_sent,
977 stats.events_succeeded + stats.events_failed
978 );
979
980 agent.shutdown().await.unwrap();
981 }
982
983 #[tokio::test]
984 async fn test_agent_builder_heading_and_speed() {
985 let config = test_config();
986 let agent = PhyTraceAgent::from_config(config).await.unwrap();
987
988 let event = agent.emit().heading(1.57).speed(3.0).build().unwrap();
989
990 assert!(event.motion.is_some());
991 let motion = event.motion.unwrap();
992 assert_eq!(motion.speed_mps, Some(3.0));
993 assert!(event.location.is_some());
995 }
996}