1use std::path::Path;
37use std::sync::Arc;
38use tokio::sync::{mpsc, Mutex, RwLock};
39
40use crate::core::builder::EventBuilder;
41use crate::core::config::PhyTraceConfig;
42use crate::core::license::{LicenseError, LicenseStatus, LicenseValidator};
43use crate::core::provenance::ProvenanceSigner;
44use crate::core::validation::{ValidationLevel, Validator};
45use crate::error::{PhyTraceError, PhyTraceResult};
46use crate::models::event::UdmEvent;
47use crate::reliability::batch::BatchProcessor;
48use crate::reliability::buffer::FileBuffer;
49use crate::reliability::retry::RetryHandler;
50#[cfg(feature = "http")]
51use crate::transport::http::HttpTransport;
52use crate::transport::mock::MockTransport;
53use crate::transport::traits::{Transport, TransportStats};
54
55pub struct PhyTraceAgent {
64 config: PhyTraceConfig,
65 transport: Arc<RwLock<Box<dyn Transport>>>,
66 buffer: Option<Arc<FileBuffer>>,
67 batch_processor: Arc<BatchProcessor>,
68 retry_handler: RetryHandler,
69 signer: Option<ProvenanceSigner>,
70 validator: Validator,
71 license_validator: Option<Arc<LicenseValidator>>,
72 event_tx: mpsc::Sender<UdmEvent>,
73 running: Arc<std::sync::atomic::AtomicBool>,
74 worker_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
75 license_heartbeat_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
76}
77
78impl PhyTraceAgent {
79 pub async fn from_config(config: PhyTraceConfig) -> PhyTraceResult<Self> {
81 config.validate().map_err(|e| {
82 PhyTraceError::Config(crate::error::ConfigError::Validation(e.to_string()))
83 })?;
84
85 let transport: Box<dyn Transport> = if config.transport.transport_type == "mock" {
87 Box::new(MockTransport::new())
88 } else {
89 #[cfg(feature = "http")]
90 {
91 Box::new(HttpTransport::from_config(&config)?)
92 }
93 #[cfg(not(feature = "http"))]
94 {
95 return Err(PhyTraceError::Config(
96 "HTTP transport not available. Enable the 'http' feature.".to_string(),
97 ));
98 }
99 };
100
101 Self::with_transport(config, transport).await
102 }
103
104 pub async fn with_transport(
106 config: PhyTraceConfig,
107 transport: Box<dyn Transport>,
108 ) -> PhyTraceResult<Self> {
109 let license_validator = if config.transport.transport_type != "mock" {
111 let validator = LicenseValidator::new(
112 config.license.token.clone(),
113 Some(config.transport.endpoint.clone()),
114 config.license.grace_period_hours,
115 );
116
117 let status = validator.validate();
119 if !status.allows_operation() {
120 return Err(PhyTraceError::License(LicenseError::ValidationFailed(
121 format!(
122 "License validation failed: {}. \
123 Set PHYWARE_DEV_MODE=1 for local development, \
124 or provide a valid license token.",
125 status.as_str()
126 ),
127 )));
128 }
129
130 if status == LicenseStatus::GracePeriod {
131 tracing::warn!(
132 "License is in grace period. Events will be quarantined until license is renewed."
133 );
134 }
135
136 Some(Arc::new(validator))
137 } else {
138 None
139 };
140
141 let buffer = if config.buffer.enabled {
143 Some(Arc::new(FileBuffer::new(&config.buffer)?))
144 } else {
145 None
146 };
147
148 let batch_processor = Arc::new(BatchProcessor::from_config(&config.buffer));
150
151 let retry_handler = RetryHandler::new(&config.retry);
153
154 let signer = if config.provenance.enabled {
156 Some(ProvenanceSigner::from_config(&config.provenance)?)
157 } else {
158 None
159 };
160
161 let (event_tx, event_rx) = mpsc::channel(1000);
163
164 let agent = Self {
165 config,
166 transport: Arc::new(RwLock::new(transport)),
167 buffer,
168 batch_processor,
169 retry_handler,
170 signer,
171 validator: Validator::new(ValidationLevel::Basic),
172 license_validator,
173 event_tx,
174 running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
175 worker_handle: Mutex::new(None),
176 license_heartbeat_handle: Mutex::new(None),
177 };
178
179 agent.start_worker(event_rx).await;
181
182 Ok(agent)
183 }
184
185 pub async fn from_file<P: AsRef<Path>>(path: P) -> PhyTraceResult<Self> {
187 let config = PhyTraceConfig::from_file(path)?;
188 Self::from_config(config).await
189 }
190
191 pub async fn start(&self) -> PhyTraceResult<()> {
193 use std::sync::atomic::Ordering;
194
195 self.transport.write().await.connect().await?;
197 self.running.store(true, Ordering::Relaxed);
198
199 if let Some(ref validator) = self.license_validator {
201 if !validator.is_dev_mode() {
202 let validator = validator.clone();
203 let running = self.running.clone();
204
205 let handle = tokio::spawn(async move {
206 loop {
207 tokio::time::sleep(std::time::Duration::from_secs(3600)).await; if !running.load(Ordering::Relaxed) {
210 break;
211 }
212
213 let status = validator.validate_online().await;
214 match status {
215 LicenseStatus::Expired => {
216 tracing::error!(
217 "License has expired. Please renew your license to continue."
218 );
219 }
220 LicenseStatus::GracePeriod => {
221 tracing::warn!(
222 "License is in grace period. Events are being quarantined."
223 );
224 }
225 _ => {}
226 }
227 }
228 });
229
230 *self.license_heartbeat_handle.lock().await = Some(handle);
231 }
232 }
233
234 Ok(())
235 }
236
237 pub async fn shutdown(&self) -> PhyTraceResult<()> {
239 use std::sync::atomic::Ordering;
240
241 self.running.store(false, Ordering::Relaxed);
242
243 if let Some(handle) = self.license_heartbeat_handle.lock().await.take() {
245 handle.abort();
246 }
247
248 self.flush().await?;
250
251 self.transport.write().await.disconnect().await?;
253
254 if let Some(handle) = self.worker_handle.lock().await.take() {
256 handle.abort();
257 }
258
259 Ok(())
260 }
261
262 pub fn emit(&self) -> AgentEventBuilder<'_> {
264 AgentEventBuilder {
265 builder: EventBuilder::new(&self.config),
266 agent: self,
267 }
268 }
269
270 pub async fn send(&self, mut event: UdmEvent) -> PhyTraceResult<()> {
272 self.validator.validate(&event)?;
274
275 if let Some(ref signer) = self.signer {
277 signer.sign(&mut event)?;
278 }
279
280 if let Some(ref validator) = self.license_validator {
282 let metadata = validator.get_license_metadata().await;
283 event.inject_license_metadata(&metadata);
284 }
285
286 self.event_tx
288 .send(event)
289 .await
290 .map_err(|_| PhyTraceError::Transport("Event channel closed".to_string()))?;
291
292 Ok(())
293 }
294
295 pub async fn flush(&self) -> PhyTraceResult<()> {
297 let batch = self.batch_processor.flush().await;
298 if !batch.is_empty() {
299 let transport = self.transport.read().await;
300 self.retry_handler
301 .execute(|| async {
302 transport.send_batch(&batch).await?;
303 Ok(())
304 })
305 .await?;
306 }
307 Ok(())
308 }
309
310 pub async fn stats(&self) -> TransportStats {
312 self.transport.read().await.stats()
313 }
314
315 pub fn is_running(&self) -> bool {
317 self.running.load(std::sync::atomic::Ordering::Relaxed)
318 }
319
320 pub fn config(&self) -> &PhyTraceConfig {
322 &self.config
323 }
324
325 pub fn set_validation_level(&mut self, level: ValidationLevel) {
327 self.validator = Validator::new(level);
328 }
329
330 async fn start_worker(&self, mut event_rx: mpsc::Receiver<UdmEvent>) {
332 let transport = self.transport.clone();
333 let batch_processor = self.batch_processor.clone();
334 let retry_handler = self.retry_handler.clone();
335 let buffer = self.buffer.clone();
336 let running = self.running.clone();
337 let config = self.config.clone();
338
339 let handle = tokio::spawn(async move {
340 use std::sync::atomic::Ordering;
341 let flush_interval = std::time::Duration::from_secs(config.buffer.flush_interval_secs);
342 let mut flush_timer = tokio::time::interval(flush_interval);
343 flush_timer.tick().await;
345
346 loop {
347 tokio::select! {
348 Some(event) = event_rx.recv() => {
350 if let Some(batch) = batch_processor.add(event).await {
351 Self::send_batch_with_retry(
352 &transport,
353 &retry_handler,
354 &buffer,
355 batch,
356 ).await;
357 }
358 }
359
360 _ = flush_timer.tick() => {
362 if !running.load(Ordering::Relaxed) {
363 break;
364 }
365
366 if batch_processor.should_flush().await {
367 let batch = batch_processor.flush().await;
368 if !batch.is_empty() {
369 Self::send_batch_with_retry(
370 &transport,
371 &retry_handler,
372 &buffer,
373 batch,
374 ).await;
375 }
376 }
377
378 if let Some(ref buf) = buffer {
380 Self::replay_buffered_events(&transport, &retry_handler, buf).await;
381 }
382 }
383 }
384 }
385 });
386
387 *self.worker_handle.lock().await = Some(handle);
388 }
389
390 async fn send_batch_with_retry(
391 transport: &Arc<RwLock<Box<dyn Transport>>>,
392 retry_handler: &RetryHandler,
393 buffer: &Option<Arc<FileBuffer>>,
394 batch: Vec<UdmEvent>,
395 ) {
396 let transport_ref = transport.read().await;
397
398 let result = retry_handler
399 .execute(|| async {
400 transport_ref.send_batch(&batch).await?;
401 Ok(())
402 })
403 .await;
404
405 if result.is_err() {
407 if let Some(ref buf) = buffer {
408 let _ = buf.buffer_events(&batch);
409 }
410 }
411 }
412
413 async fn replay_buffered_events(
414 transport: &Arc<RwLock<Box<dyn Transport>>>,
415 retry_handler: &RetryHandler,
416 buffer: &Arc<FileBuffer>,
417 ) {
418 if !transport.read().await.is_connected().await {
420 return;
421 }
422
423 if let Ok(buffered) = buffer.read_events() {
424 let transport_ref = transport.read().await;
425
426 for buffered_event in buffered {
427 let result = retry_handler
428 .execute(|| async {
429 transport_ref.send(&buffered_event.event).await?;
430 Ok(())
431 })
432 .await;
433
434 if result.is_ok() {
435 let _ = buffer.remove_file(&buffered_event.filepath);
436 } else {
437 break;
439 }
440 }
441 }
442 }
443}
444
445pub struct AgentEventBuilder<'a> {
447 builder: EventBuilder,
448 agent: &'a PhyTraceAgent,
449}
450
451impl<'a> AgentEventBuilder<'a> {
452 pub fn source_id(mut self, source_id: impl Into<String>) -> Self {
454 self.builder = self.builder.source_id(source_id);
455 self
456 }
457
458 pub fn source_type(mut self, source_type: crate::models::enums::SourceType) -> Self {
460 self.builder = self.builder.source_type(source_type);
461 self
462 }
463
464 pub fn position_2d(mut self, x: f64, y: f64) -> Self {
466 self.builder = self.builder.position_2d(x, y);
467 self
468 }
469
470 pub fn gps(mut self, latitude: f64, longitude: f64) -> Self {
472 self.builder = self.builder.gps(latitude, longitude);
473 self
474 }
475
476 pub fn position_3d(mut self, x: f64, y: f64, z: f64) -> Self {
478 self.builder = self.builder.position_3d(x, y, z);
479 self
480 }
481
482 pub fn heading(mut self, heading_rad: f64) -> Self {
484 self.builder = self.builder.heading(heading_rad);
485 self
486 }
487
488 pub fn speed(mut self, speed_mps: f64) -> Self {
490 self.builder = self.builder.speed(speed_mps);
491 self
492 }
493
494 pub fn battery_soc(mut self, soc_percent: f64) -> Self {
496 self.builder = self.builder.battery_soc(soc_percent);
497 self
498 }
499
500 pub fn operational_mode(mut self, mode: crate::models::enums::OperationalMode) -> Self {
502 self.builder = self.builder.operational_mode(mode);
503 self
504 }
505
506 pub fn safety_state(mut self, state: crate::models::enums::SafetyState) -> Self {
508 self.builder = self.builder.safety_state(state);
509 self
510 }
511
512 pub fn event_type(mut self, event_type: crate::models::enums::EventType) -> Self {
514 self.builder = self.builder.event_type(event_type);
515 self
516 }
517
518 pub fn tag(mut self, tag: impl Into<String>) -> Self {
520 self.builder = self.builder.tag(tag);
521 self
522 }
523
524 pub fn extensions(mut self, extensions: serde_json::Value) -> Self {
526 self.builder = self.builder.extensions(extensions);
527 self
528 }
529
530 pub fn with_builder<F>(mut self, f: F) -> Self
532 where
533 F: FnOnce(EventBuilder) -> EventBuilder,
534 {
535 self.builder = f(self.builder);
536 self
537 }
538
539 pub async fn send(self) -> PhyTraceResult<()> {
541 let event = self.builder.build()?;
542 self.agent.send(event).await
543 }
544
545 pub fn build(self) -> PhyTraceResult<UdmEvent> {
547 self.builder.build()
548 }
549}
550
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::validation::ValidationLevel;
    use crate::models::enums::{EventType, OperationalMode, SafetyState, SourceType};
    use crate::transport::mock::MockTransport;
    use tempfile::tempdir;

    /// Mock-transport configuration shared by both fixtures; the caller decides
    /// whether buffering is enabled.
    fn mock_config(buffer_dir: &std::path::Path) -> PhyTraceConfig {
        let mut cfg = PhyTraceConfig::new("test-robot").with_source_type(SourceType::Amr);
        cfg.transport.transport_type = String::from("mock");
        cfg.transport.endpoint = String::from("http://localhost:8080");
        cfg.buffer.path = buffer_dir.to_string_lossy().to_string();
        cfg
    }

    /// Configuration with the mock transport and buffering disabled.
    fn test_config() -> PhyTraceConfig {
        // The temporary directory is removed when `scratch` drops at the end of this
        // function; only its path is recorded, and buffering is off.
        let scratch = tempdir().unwrap();
        let mut cfg = mock_config(scratch.path());
        cfg.buffer.enabled = false;
        cfg
    }

    /// Configuration with buffering enabled, a 1 s flush interval and batches of 5.
    /// The returned `TempDir` must be kept alive for as long as the buffer is used.
    fn test_config_with_buffer() -> (PhyTraceConfig, tempfile::TempDir) {
        let scratch = tempdir().unwrap();
        let mut cfg = mock_config(scratch.path());
        cfg.buffer.enabled = true;
        cfg.buffer.flush_interval_secs = 1;
        cfg.buffer.batch_size = 5;
        (cfg, scratch)
    }

    /// Agent built from `test_config()`; not started.
    async fn new_agent() -> PhyTraceAgent {
        PhyTraceAgent::from_config(test_config()).await.unwrap()
    }

    #[tokio::test]
    async fn test_agent_creation() {
        let agent = new_agent().await;

        assert!(!agent.is_running());
    }

    #[tokio::test]
    async fn test_agent_with_mock_transport() {
        let transport = Box::new(MockTransport::new());
        let agent = PhyTraceAgent::with_transport(test_config(), transport)
            .await
            .unwrap();

        agent.start().await.unwrap();
        assert!(agent.is_running());

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_emit() {
        let transport = Box::new(MockTransport::new());
        let agent = PhyTraceAgent::with_transport(test_config(), transport)
            .await
            .unwrap();
        agent.start().await.unwrap();

        let pending = agent.emit().position_2d(10.0, 20.0).battery_soc(85.0);
        pending.send().await.unwrap();

        // Give the worker a moment to pick the event up.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_fluent_builder() {
        let agent = new_agent().await;

        let built = agent
            .emit()
            .position_2d(42.0, 24.0)
            .heading(1.57)
            .speed(2.5)
            .battery_soc(90.0)
            .tag("test")
            .build();
        let event = built.unwrap();

        assert!(event.location.is_some());
        assert!(event.motion.is_some());
        assert!(event.power.is_some());
    }

    #[tokio::test]
    async fn test_agent_from_yaml_file() {
        let scratch = tempdir().unwrap();
        let config_path = scratch.path().join("config.yaml");
        let buffer_path = scratch.path().to_string_lossy().replace('\\', "/");

        let yaml_content = format!(
            r#"
source:
  source_id: "yaml-robot"
  source_type: amr
transport:
  transport_type: mock
  endpoint: "http://localhost:8080"
buffer:
  enabled: false
  path: "{}"
"#,
            buffer_path
        );
        std::fs::write(&config_path, yaml_content).unwrap();

        let agent = PhyTraceAgent::from_file(&config_path).await.unwrap();
        assert_eq!(agent.config().source.source_id, "yaml-robot");
    }

    #[tokio::test]
    async fn test_agent_config_validation_fails_empty_source_id() {
        let scratch = tempdir().unwrap();
        let mut cfg = PhyTraceConfig::new("");
        cfg.transport.transport_type = String::from("mock");
        cfg.buffer.path = scratch.path().to_string_lossy().to_string();
        cfg.buffer.enabled = false;

        let outcome = PhyTraceAgent::from_config(cfg).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_agent_stats() {
        let agent = new_agent().await;

        let snapshot = agent.stats().await;
        assert_eq!(snapshot.events_sent, 0);
        assert_eq!(snapshot.events_succeeded, 0);
    }

    #[tokio::test]
    async fn test_agent_set_validation_level() {
        let mut agent = new_agent().await;

        agent.set_validation_level(ValidationLevel::Full);
        let strict = agent.emit().position_2d(1.0, 2.0).build().unwrap();
        assert!(strict.location.is_some());

        agent.set_validation_level(ValidationLevel::None);
        let relaxed = agent.emit().position_2d(3.0, 4.0).build().unwrap();
        assert!(relaxed.location.is_some());
    }

    #[tokio::test]
    async fn test_agent_flush() {
        let agent = new_agent().await;
        agent.start().await.unwrap();

        agent.emit().position_2d(1.0, 2.0).send().await.unwrap();
        agent.emit().position_2d(3.0, 4.0).send().await.unwrap();

        agent.flush().await.unwrap();

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_config_access() {
        let agent = new_agent().await;

        let seen = agent.config();
        assert_eq!(seen.source.source_id, "test-robot");
        assert_eq!(seen.source.source_type, SourceType::Amr);
        assert_eq!(seen.transport.transport_type, "mock");
    }

    #[tokio::test]
    async fn test_agent_builder_position_3d() {
        let agent = new_agent().await;

        let event = agent.emit().position_3d(1.0, 2.0, 3.0).build().unwrap();

        let local = event
            .location
            .expect("location should be set")
            .local
            .expect("local position should be set");
        assert_eq!(local.x_m, Some(1.0));
        assert_eq!(local.y_m, Some(2.0));
        assert_eq!(local.z_m, Some(3.0));
    }

    #[tokio::test]
    async fn test_agent_builder_operational_mode() {
        let agent = new_agent().await;

        let event = agent
            .emit()
            .operational_mode(OperationalMode::Manual)
            .build()
            .unwrap();

        let operational = event.operational.expect("operational should be set");
        assert_eq!(operational.mode, Some(OperationalMode::Manual));
    }

    #[tokio::test]
    async fn test_agent_builder_safety_state() {
        let agent = new_agent().await;

        let event = agent
            .emit()
            .safety_state(SafetyState::Warning)
            .build()
            .unwrap();

        assert!(event.safety.is_some());
    }

    #[tokio::test]
    async fn test_agent_builder_event_type() {
        let agent = new_agent().await;

        let event = agent
            .emit()
            .event_type(EventType::SafetyViolation)
            .build()
            .unwrap();

        assert_eq!(event.event_type, EventType::SafetyViolation);
    }

    #[tokio::test]
    async fn test_agent_builder_extensions() {
        let agent = new_agent().await;

        let payload = serde_json::json!({
            "custom_field": "custom_value",
            "numeric_field": 42
        });

        let event = agent.emit().extensions(payload.clone()).build().unwrap();

        let ext = event.extensions.expect("extensions should be set");
        assert_eq!(ext["custom_field"], "custom_value");
        assert_eq!(ext["numeric_field"], 42);
    }

    #[tokio::test]
    async fn test_agent_builder_with_builder() {
        let agent = new_agent().await;

        let event = agent
            .emit()
            .with_builder(|inner| inner.position_2d(100.0, 200.0).speed(5.0))
            .build()
            .unwrap();

        assert!(event.location.is_some());
        assert!(event.motion.is_some());
    }

    #[tokio::test]
    async fn test_agent_multiple_tags() {
        let agent = new_agent().await;

        let event = agent
            .emit()
            .tag("tag1")
            .tag("tag2")
            .tag("tag3")
            .build()
            .unwrap();

        let identity = event.identity.expect("identity should be set");
        let tags = identity.tags.expect("tags should be set");
        for wanted in ["tag1", "tag2", "tag3"] {
            assert!(tags.contains(&wanted.to_string()));
        }
    }

    #[tokio::test]
    async fn test_agent_with_buffer_enabled() {
        // `_scratch` keeps the buffer directory alive for the whole test.
        let (cfg, _scratch) = test_config_with_buffer();
        let agent = PhyTraceAgent::from_config(cfg).await.unwrap();

        agent.start().await.unwrap();

        for step in 0..3 {
            let x = step as f64;
            let y = step as f64 * 2.0;
            agent.emit().position_2d(x, y).send().await.unwrap();
        }

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_complete_event_builder() {
        let agent = new_agent().await;

        let event = agent
            .emit()
            .position_3d(1.0, 2.0, 3.0)
            .heading(std::f64::consts::PI / 2.0)
            .speed(1.5)
            .battery_soc(75.0)
            .operational_mode(OperationalMode::Autonomous)
            .safety_state(SafetyState::Normal)
            .event_type(EventType::TelemetryPeriodic)
            .tag("integration-test")
            .extensions(serde_json::json!({"test": true}))
            .build()
            .unwrap();

        assert!(event.location.is_some());
        assert!(event.motion.is_some());
        assert!(event.power.is_some());
        assert!(event.operational.is_some());
        assert!(event.safety.is_some());
        assert_eq!(event.event_type, EventType::TelemetryPeriodic);

        let tags = event
            .identity
            .as_ref()
            .unwrap()
            .tags
            .as_ref()
            .unwrap();
        assert!(tags.contains(&"integration-test".to_string()));
        assert!(event.extensions.is_some());
    }

    #[tokio::test]
    async fn test_agent_shutdown_not_running() {
        let agent = new_agent().await;

        // Shutting down an agent that was never started must succeed.
        agent.shutdown().await.unwrap();
        assert!(!agent.is_running());
    }

    #[tokio::test]
    async fn test_agent_start_stop_cycle() {
        let agent = new_agent().await;

        agent.start().await.unwrap();
        assert!(agent.is_running());

        agent.emit().position_2d(1.0, 2.0).send().await.unwrap();

        agent.shutdown().await.unwrap();
        assert!(!agent.is_running());
    }

    #[tokio::test]
    async fn test_agent_emit_without_start() {
        let agent = new_agent().await;

        // Queueing works before `start`.
        let outcome = agent.emit().position_2d(1.0, 2.0).send().await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_agent_stats_after_emit() {
        let agent = new_agent().await;
        agent.start().await.unwrap();

        agent.emit().position_2d(1.0, 2.0).send().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let snapshot = agent.stats().await;
        let accounted_for = snapshot.events_succeeded + snapshot.events_failed;
        assert_eq!(snapshot.events_sent, accounted_for);

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_builder_heading_and_speed() {
        let agent = new_agent().await;

        let event = agent.emit().heading(1.57).speed(3.0).build().unwrap();

        // Check `location` by reference first: unwrapping `motion` moves out of `event`.
        let has_location = event.location.is_some();
        let motion = event.motion.expect("motion should be set");
        assert_eq!(motion.speed_mps, Some(3.0));
        assert!(has_location);
    }
}