Skip to main content

phytrace_sdk/
agent.rs

1//! PhyTrace Agent - The main SDK entry point.
2//!
3//! The agent orchestrates event building, emission, buffering, and transmission.
4//!
5//! # Example
6//!
7//! ```rust,no_run
8//! use phytrace_sdk::{PhyTraceAgent, PhyTraceConfig, SourceType};
9//!
10//! #[tokio::main]
11//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
12//!     let config = PhyTraceConfig::new("robot-001")
13//!         .with_endpoint("https://api.phyware.io")
14//!         .with_api_key("your-api-key")
15//!         .with_source_type(SourceType::Amr);
16//!
17//!     let agent = PhyTraceAgent::from_config(config).await?;
18//!     
19//!     // Start the agent
20//!     agent.start().await?;
21//!
22//!     // Emit events
23//!     agent.emit()
24//!         .position_2d(10.0, 20.0)
25//!         .battery_soc(85.0)
26//!         .send()
27//!         .await?;
28//!
29//!     // Shutdown gracefully
30//!     agent.shutdown().await?;
31//!     
32//!     Ok(())
33//! }
34//! ```
35
36use std::path::Path;
37use std::sync::Arc;
38use tokio::sync::{mpsc, Mutex, RwLock};
39
40use crate::core::builder::EventBuilder;
41use crate::core::config::PhyTraceConfig;
42use crate::core::license::{LicenseError, LicenseStatus, LicenseValidator};
43use crate::core::provenance::ProvenanceSigner;
44use crate::core::validation::{ValidationLevel, Validator};
45use crate::error::{PhyTraceError, PhyTraceResult};
46use crate::models::event::UdmEvent;
47use crate::reliability::batch::BatchProcessor;
48use crate::reliability::buffer::FileBuffer;
49use crate::reliability::retry::RetryHandler;
50#[cfg(feature = "http")]
51use crate::transport::http::HttpTransport;
52use crate::transport::mock::MockTransport;
53use crate::transport::traits::{Transport, TransportStats};
54
/// The main PhyTrace SDK agent.
///
/// Manages the complete event lifecycle:
/// - License validation
/// - Event building and validation
/// - Provenance signing (optional)
/// - Batching and buffering
/// - Transport with retry logic
pub struct PhyTraceAgent {
    /// Configuration the agent was built from; exposed read-only via `config()`.
    config: PhyTraceConfig,
    /// Transport used for delivery, shared with the background worker behind an async `RwLock`.
    transport: Arc<RwLock<Box<dyn Transport>>>,
    /// File buffer, present only when `config.buffer.enabled` is set. The worker stores
    /// batches here when delivery still fails after retries and replays them later.
    buffer: Option<Arc<FileBuffer>>,
    /// Accumulates events into batches; shared with the background worker.
    batch_processor: Arc<BatchProcessor>,
    /// Retry policy wrapped around every transport send.
    retry_handler: RetryHandler,
    /// Provenance signer, present only when `config.provenance.enabled` is set.
    signer: Option<ProvenanceSigner>,
    /// Validator applied to every event passed to `send`.
    validator: Validator,
    /// License validator; `None` when the configured transport type is `"mock"`.
    license_validator: Option<Arc<LicenseValidator>>,
    /// Sending half of the channel that feeds events to the background worker.
    event_tx: mpsc::Sender<UdmEvent>,
    /// Set to `true` by `start` and back to `false` by `shutdown`.
    running: Arc<std::sync::atomic::AtomicBool>,
    /// Handle of the background worker task, taken and aborted by `shutdown`.
    worker_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Handle of the license heartbeat task spawned by `start`, if any.
    license_heartbeat_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
77
78impl PhyTraceAgent {
79    /// Create an agent from configuration.
80    pub async fn from_config(config: PhyTraceConfig) -> PhyTraceResult<Self> {
81        config.validate().map_err(|e| {
82            PhyTraceError::Config(crate::error::ConfigError::Validation(e.to_string()))
83        })?;
84
85        // Create transport based on config
86        let transport: Box<dyn Transport> = if config.transport.transport_type == "mock" {
87            Box::new(MockTransport::new())
88        } else {
89            #[cfg(feature = "http")]
90            {
91                Box::new(HttpTransport::from_config(&config)?)
92            }
93            #[cfg(not(feature = "http"))]
94            {
95                return Err(PhyTraceError::Config(
96                    "HTTP transport not available. Enable the 'http' feature.".to_string(),
97                ));
98            }
99        };
100
101        Self::with_transport(config, transport).await
102    }
103
104    /// Create an agent with a custom transport.
105    pub async fn with_transport(
106        config: PhyTraceConfig,
107        transport: Box<dyn Transport>,
108    ) -> PhyTraceResult<Self> {
109        // Create license validator (skip for mock transport)
110        let license_validator = if config.transport.transport_type != "mock" {
111            let validator = LicenseValidator::new(
112                config.license.token.clone(),
113                Some(config.transport.endpoint.clone()),
114                config.license.grace_period_hours,
115            );
116
117            // Validate license on initialization
118            let status = validator.validate();
119            if !status.allows_operation() {
120                return Err(PhyTraceError::License(LicenseError::ValidationFailed(
121                    format!(
122                        "License validation failed: {}. \
123                        Set PHYWARE_DEV_MODE=1 for local development, \
124                        or provide a valid license token.",
125                        status.as_str()
126                    ),
127                )));
128            }
129
130            if status == LicenseStatus::GracePeriod {
131                tracing::warn!(
132                    "License is in grace period. Events will be quarantined until license is renewed."
133                );
134            }
135
136            Some(Arc::new(validator))
137        } else {
138            None
139        };
140
141        // Create buffer if enabled
142        let buffer = if config.buffer.enabled {
143            Some(Arc::new(FileBuffer::new(&config.buffer)?))
144        } else {
145            None
146        };
147
148        // Create batch processor
149        let batch_processor = Arc::new(BatchProcessor::from_config(&config.buffer));
150
151        // Create retry handler
152        let retry_handler = RetryHandler::new(&config.retry);
153
154        // Create signer if enabled
155        let signer = if config.provenance.enabled {
156            Some(ProvenanceSigner::from_config(&config.provenance)?)
157        } else {
158            None
159        };
160
161        // Create event channel
162        let (event_tx, event_rx) = mpsc::channel(1000);
163
164        let agent = Self {
165            config,
166            transport: Arc::new(RwLock::new(transport)),
167            buffer,
168            batch_processor,
169            retry_handler,
170            signer,
171            validator: Validator::new(ValidationLevel::Basic),
172            license_validator,
173            event_tx,
174            running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
175            worker_handle: Mutex::new(None),
176            license_heartbeat_handle: Mutex::new(None),
177        };
178
179        // Start background worker
180        agent.start_worker(event_rx).await;
181
182        Ok(agent)
183    }
184
185    /// Load configuration from a YAML file and create agent.
186    pub async fn from_file<P: AsRef<Path>>(path: P) -> PhyTraceResult<Self> {
187        let config = PhyTraceConfig::from_file(path)?;
188        Self::from_config(config).await
189    }
190
191    /// Start the agent.
192    pub async fn start(&self) -> PhyTraceResult<()> {
193        use std::sync::atomic::Ordering;
194
195        // Connect transport
196        self.transport.write().await.connect().await?;
197        self.running.store(true, Ordering::Relaxed);
198
199        // Start license heartbeat (hourly validation)
200        if let Some(ref validator) = self.license_validator {
201            if !validator.is_dev_mode() {
202                let validator = validator.clone();
203                let running = self.running.clone();
204
205                let handle = tokio::spawn(async move {
206                    loop {
207                        tokio::time::sleep(std::time::Duration::from_secs(3600)).await; // 1 hour
208
209                        if !running.load(Ordering::Relaxed) {
210                            break;
211                        }
212
213                        let status = validator.validate_online().await;
214                        match status {
215                            LicenseStatus::Expired => {
216                                tracing::error!(
217                                    "License has expired. Please renew your license to continue."
218                                );
219                            }
220                            LicenseStatus::GracePeriod => {
221                                tracing::warn!(
222                                    "License is in grace period. Events are being quarantined."
223                                );
224                            }
225                            _ => {}
226                        }
227                    }
228                });
229
230                *self.license_heartbeat_handle.lock().await = Some(handle);
231            }
232        }
233
234        Ok(())
235    }
236
237    /// Stop the agent gracefully.
238    pub async fn shutdown(&self) -> PhyTraceResult<()> {
239        use std::sync::atomic::Ordering;
240
241        self.running.store(false, Ordering::Relaxed);
242
243        // Stop license heartbeat
244        if let Some(handle) = self.license_heartbeat_handle.lock().await.take() {
245            handle.abort();
246        }
247
248        // Flush remaining batched events
249        self.flush().await?;
250
251        // Disconnect transport
252        self.transport.write().await.disconnect().await?;
253
254        // Stop worker
255        if let Some(handle) = self.worker_handle.lock().await.take() {
256            handle.abort();
257        }
258
259        Ok(())
260    }
261
262    /// Create an event builder for fluent emission.
263    pub fn emit(&self) -> AgentEventBuilder<'_> {
264        AgentEventBuilder {
265            builder: EventBuilder::new(&self.config),
266            agent: self,
267        }
268    }
269
270    /// Send a pre-built event.
271    pub async fn send(&self, mut event: UdmEvent) -> PhyTraceResult<()> {
272        // Validate
273        self.validator.validate(&event)?;
274
275        // Sign if configured
276        if let Some(ref signer) = self.signer {
277            signer.sign(&mut event)?;
278        }
279
280        // Inject license metadata
281        if let Some(ref validator) = self.license_validator {
282            let metadata = validator.get_license_metadata().await;
283            event.inject_license_metadata(&metadata);
284        }
285
286        // Send to worker
287        Box::pin(self.event_tx.send(event))
288            .await
289            .map_err(|e| PhyTraceError::Transport(format!("Event channel closed: {e}")))?;
290
291        Ok(())
292    }
293
294    /// Flush buffered events immediately.
295    pub async fn flush(&self) -> PhyTraceResult<()> {
296        let batch = self.batch_processor.flush().await;
297        if !batch.is_empty() {
298            let transport = self.transport.read().await;
299            self.retry_handler
300                .execute(|| async {
301                    transport.send_batch(&batch).await?;
302                    Ok(())
303                })
304                .await?;
305        }
306        Ok(())
307    }
308
309    /// Get transport statistics.
310    pub async fn stats(&self) -> TransportStats {
311        self.transport.read().await.stats()
312    }
313
314    /// Check if the agent is running.
315    pub fn is_running(&self) -> bool {
316        self.running.load(std::sync::atomic::Ordering::Relaxed)
317    }
318
    /// Get a reference to the configuration.
    ///
    /// This is the configuration the agent was constructed with.
    pub fn config(&self) -> &PhyTraceConfig {
        &self.config
    }
323
    /// Set the validation level.
    ///
    /// Replaces the agent's validator with a new one built for `level`; events
    /// passed to `send` afterwards are checked by the new validator.
    pub fn set_validation_level(&mut self, level: ValidationLevel) {
        self.validator = Validator::new(level);
    }
328
329    // Start background worker for processing events
330    async fn start_worker(&self, mut event_rx: mpsc::Receiver<UdmEvent>) {
331        let transport = self.transport.clone();
332        let batch_processor = self.batch_processor.clone();
333        let retry_handler = self.retry_handler.clone();
334        let buffer = self.buffer.clone();
335        let running = self.running.clone();
336        let config = self.config.clone();
337
338        let handle = tokio::spawn(async move {
339            use std::sync::atomic::Ordering;
340            let flush_interval = std::time::Duration::from_secs(config.buffer.flush_interval_secs);
341            let mut flush_timer = tokio::time::interval(flush_interval);
342            // Skip the first tick which fires immediately at time 0
343            flush_timer.tick().await;
344
345            loop {
346                tokio::select! {
347                    // Handle incoming events
348                    Some(event) = event_rx.recv() => {
349                        if let Some(batch) = Box::pin(batch_processor.add(event)).await {
350                            Self::send_batch_with_retry(
351                                &transport,
352                                &retry_handler,
353                                &buffer,
354                                batch,
355                            ).await;
356                        }
357                    }
358
359                    // Periodic flush
360                    _ = flush_timer.tick() => {
361                        if !running.load(Ordering::Relaxed) {
362                            break;
363                        }
364
365                        if batch_processor.should_flush().await {
366                            let batch = batch_processor.flush().await;
367                            if !batch.is_empty() {
368                                Self::send_batch_with_retry(
369                                    &transport,
370                                    &retry_handler,
371                                    &buffer,
372                                    batch,
373                                ).await;
374                            }
375                        }
376
377                        // Try to replay buffered events
378                        if let Some(ref buf) = buffer {
379                            Box::pin(Self::replay_buffered_events(&transport, &retry_handler, buf)).await;
380                        }
381                    }
382                }
383            }
384        });
385
386        *self.worker_handle.lock().await = Some(handle);
387    }
388
389    async fn send_batch_with_retry(
390        transport: &Arc<RwLock<Box<dyn Transport>>>,
391        retry_handler: &RetryHandler,
392        buffer: &Option<Arc<FileBuffer>>,
393        batch: Vec<UdmEvent>,
394    ) {
395        let transport_ref = transport.read().await;
396
397        let result = retry_handler
398            .execute(|| async {
399                transport_ref.send_batch(&batch).await?;
400                Ok(())
401            })
402            .await;
403
404        // If failed after retries, buffer events
405        if result.is_err() {
406            if let Some(ref buf) = buffer {
407                drop(buf.buffer_events(&batch));
408            }
409        }
410    }
411
412    async fn replay_buffered_events(
413        transport: &Arc<RwLock<Box<dyn Transport>>>,
414        retry_handler: &RetryHandler,
415        buffer: &Arc<FileBuffer>,
416    ) {
417        // Only try to replay if connected
418        if !transport.read().await.is_connected().await {
419            return;
420        }
421
422        if let Ok(buffered) = buffer.read_events() {
423            let transport_ref = transport.read().await;
424
425            for buffered_event in buffered {
426                let result = retry_handler
427                    .execute(|| async {
428                        transport_ref.send(&buffered_event.event).await?;
429                        Ok(())
430                    })
431                    .await;
432
433                if result.is_ok() {
434                    drop(buffer.remove_file(&buffered_event.filepath));
435                } else {
436                    // Stop trying if we can't connect
437                    break;
438                }
439            }
440        }
441    }
442}
443
/// Fluent event builder that integrates with the agent.
///
/// Created by `PhyTraceAgent::emit`. The setter methods forward to the wrapped
/// `EventBuilder`; `send` builds the event and passes it to the agent.
pub struct AgentEventBuilder<'a> {
    /// Underlying event builder that accumulates the event's fields.
    builder: EventBuilder,
    /// Agent that receives the finished event when `send` is called.
    agent: &'a PhyTraceAgent,
}
449
450impl<'a> AgentEventBuilder<'a> {
451    /// Set the source ID (overrides config default).
452    pub fn source_id(mut self, source_id: impl Into<String>) -> Self {
453        self.builder = self.builder.source_id(source_id);
454        self
455    }
456
457    /// Set the source type (overrides config default).
458    pub fn source_type(mut self, source_type: crate::models::enums::SourceType) -> Self {
459        self.builder = self.builder.source_type(source_type);
460        self
461    }
462
463    /// Set 2D position.
464    pub fn position_2d(mut self, x: f64, y: f64) -> Self {
465        self.builder = self.builder.position_2d(x, y);
466        self
467    }
468
469    /// Set GPS coordinates (latitude, longitude).
470    pub fn gps(mut self, latitude: f64, longitude: f64) -> Self {
471        self.builder = self.builder.gps(latitude, longitude);
472        self
473    }
474
475    /// Set 3D position.
476    pub fn position_3d(mut self, x: f64, y: f64, z: f64) -> Self {
477        self.builder = self.builder.position_3d(x, y, z);
478        self
479    }
480
481    /// Set heading.
482    pub fn heading(mut self, heading_rad: f64) -> Self {
483        self.builder = self.builder.heading(heading_rad);
484        self
485    }
486
487    /// Set speed.
488    pub fn speed(mut self, speed_mps: f64) -> Self {
489        self.builder = self.builder.speed(speed_mps);
490        self
491    }
492
493    /// Set battery state of charge.
494    pub fn battery_soc(mut self, soc_percent: f64) -> Self {
495        self.builder = self.builder.battery_soc(soc_percent);
496        self
497    }
498
499    /// Set operational mode.
500    pub fn operational_mode(mut self, mode: crate::models::enums::OperationalMode) -> Self {
501        self.builder = self.builder.operational_mode(mode);
502        self
503    }
504
505    /// Set safety state.
506    pub fn safety_state(mut self, state: crate::models::enums::SafetyState) -> Self {
507        self.builder = self.builder.safety_state(state);
508        self
509    }
510
511    /// Set event type.
512    pub fn event_type(mut self, event_type: crate::models::enums::EventType) -> Self {
513        self.builder = self.builder.event_type(event_type);
514        self
515    }
516
517    /// Add a tag.
518    pub fn tag(mut self, tag: impl Into<String>) -> Self {
519        self.builder = self.builder.tag(tag);
520        self
521    }
522
523    /// Set extensions.
524    pub fn extensions(mut self, extensions: serde_json::Value) -> Self {
525        self.builder = self.builder.extensions(extensions);
526        self
527    }
528
529    /// Access the underlying builder for advanced configuration.
530    pub fn with_builder<F>(mut self, f: F) -> Self
531    where
532        F: FnOnce(EventBuilder) -> EventBuilder,
533    {
534        self.builder = f(self.builder);
535        self
536    }
537
538    /// Build and send the event.
539    pub async fn send(self) -> PhyTraceResult<()> {
540        let event = self.builder.build()?;
541        Box::pin(self.agent.send(event)).await
542    }
543
544    /// Build without sending (for inspection or manual sending).
545    pub fn build(self) -> PhyTraceResult<UdmEvent> {
546        self.builder.build()
547    }
548}
549
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::validation::ValidationLevel;
    use crate::models::enums::{EventType, OperationalMode, SafetyState, SourceType};
    use crate::transport::mock::MockTransport;
    use tempfile::tempdir;

    /// Settings shared by every test configuration: mock transport, local
    /// endpoint, and a buffer path inside `buffer_dir`.
    fn base_config(buffer_dir: &Path) -> PhyTraceConfig {
        let mut cfg = PhyTraceConfig::new("test-robot").with_source_type(SourceType::Amr);
        cfg.transport.transport_type = "mock".to_string();
        cfg.transport.endpoint = "http://localhost:8080".to_string();
        cfg.buffer.path = buffer_dir.to_string_lossy().to_string();
        cfg
    }

    /// Mock-transport configuration with file buffering switched off.
    fn test_config() -> PhyTraceConfig {
        let dir = tempdir().unwrap();
        let mut cfg = base_config(dir.path());
        cfg.buffer.enabled = false; // Disable for simpler tests
        cfg
    }

    /// Mock-transport configuration with file buffering switched on. The
    /// temporary directory is returned so that it outlives the test body.
    fn test_config_with_buffer() -> (PhyTraceConfig, tempfile::TempDir) {
        let dir = tempdir().unwrap();
        let mut cfg = base_config(dir.path());
        cfg.buffer.enabled = true;
        cfg.buffer.flush_interval_secs = 1;
        cfg.buffer.batch_size = 5;
        (cfg, dir)
    }

    /// Agent built from `test_config()` via `from_config`.
    async fn mock_agent() -> PhyTraceAgent {
        PhyTraceAgent::from_config(test_config()).await.unwrap()
    }

    #[tokio::test]
    async fn test_agent_creation() {
        let agent = mock_agent().await;

        assert!(!agent.is_running());
    }

    #[tokio::test]
    async fn test_agent_with_mock_transport() {
        let transport = Box::new(MockTransport::new());
        let agent = PhyTraceAgent::with_transport(test_config(), transport)
            .await
            .unwrap();

        agent.start().await.unwrap();
        assert!(agent.is_running());

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_emit() {
        let agent = PhyTraceAgent::with_transport(test_config(), Box::new(MockTransport::new()))
            .await
            .unwrap();
        agent.start().await.unwrap();

        // Emit an event
        let pending = agent
            .emit()
            .position_2d(10.0, 20.0)
            .battery_soc(85.0)
            .send();
        Box::pin(pending).await.unwrap();

        // Give worker time to process
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_fluent_builder() {
        let agent = mock_agent().await;

        let event = agent
            .emit()
            .position_2d(42.0, 24.0)
            .heading(1.57)
            .speed(2.5)
            .battery_soc(90.0)
            .tag("test")
            .build()
            .unwrap();

        assert!(event.location.is_some());
        assert!(event.motion.is_some());
        assert!(event.power.is_some());
    }

    #[tokio::test]
    async fn test_agent_from_yaml_file() {
        let dir = tempdir().unwrap();
        let config_path = dir.path().join("config.yaml");
        let buffer_path = dir.path().to_string_lossy().replace('\\', "/");

        let yaml_content = format!(
            r#"
source:
  source_id: "yaml-robot"
  source_type: amr
transport:
  transport_type: mock
  endpoint: "http://localhost:8080"
buffer:
  enabled: false
  path: "{}"
"#,
            buffer_path
        );
        std::fs::write(&config_path, yaml_content).unwrap();

        let agent = PhyTraceAgent::from_file(&config_path).await.unwrap();
        assert_eq!(agent.config().source.source_id, "yaml-robot");
    }

    #[tokio::test]
    async fn test_agent_config_validation_fails_empty_source_id() {
        let dir = tempdir().unwrap();
        let mut config = PhyTraceConfig::new("");
        config.transport.transport_type = "mock".to_string();
        config.buffer.path = dir.path().to_string_lossy().to_string();
        config.buffer.enabled = false;

        assert!(PhyTraceAgent::from_config(config).await.is_err());
    }

    #[tokio::test]
    async fn test_agent_stats() {
        let agent = mock_agent().await;

        let stats = agent.stats().await;
        assert_eq!(stats.events_sent, 0);
        assert_eq!(stats.events_succeeded, 0);
    }

    #[tokio::test]
    async fn test_agent_set_validation_level() {
        let mut agent = mock_agent().await;

        // Validation is internal, but we can verify the agent still works
        agent.set_validation_level(ValidationLevel::Full);
        let strict = agent.emit().position_2d(1.0, 2.0).build().unwrap();
        assert!(strict.location.is_some());

        agent.set_validation_level(ValidationLevel::None);
        let relaxed = agent.emit().position_2d(3.0, 4.0).build().unwrap();
        assert!(relaxed.location.is_some());
    }

    #[tokio::test]
    async fn test_agent_flush() {
        let agent = mock_agent().await;
        agent.start().await.unwrap();

        // Emit some events
        for (x, y) in [(1.0, 2.0), (3.0, 4.0)] {
            Box::pin(agent.emit().position_2d(x, y).send())
                .await
                .unwrap();
        }

        // Flush should complete without error
        agent.flush().await.unwrap();

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_config_access() {
        let agent = mock_agent().await;

        let cfg = agent.config();
        assert_eq!(cfg.source.source_id, "test-robot");
        assert_eq!(cfg.source.source_type, SourceType::Amr);
        assert_eq!(cfg.transport.transport_type, "mock");
    }

    #[tokio::test]
    async fn test_agent_builder_position_3d() {
        let agent = mock_agent().await;

        let event = agent.emit().position_3d(1.0, 2.0, 3.0).build().unwrap();

        assert!(event.location.is_some());
        let location = event.location.unwrap();
        assert!(location.local.is_some());
        let local = location.local.unwrap();
        assert_eq!(local.x_m, Some(1.0));
        assert_eq!(local.y_m, Some(2.0));
        assert_eq!(local.z_m, Some(3.0));
    }

    #[tokio::test]
    async fn test_agent_builder_operational_mode() {
        let agent = mock_agent().await;

        let event = agent
            .emit()
            .operational_mode(OperationalMode::Manual)
            .build()
            .unwrap();

        assert!(event.operational.is_some());
        let operational = event.operational.unwrap();
        assert_eq!(operational.mode, Some(OperationalMode::Manual));
    }

    #[tokio::test]
    async fn test_agent_builder_safety_state() {
        let agent = mock_agent().await;

        let event = agent
            .emit()
            .safety_state(SafetyState::Warning)
            .build()
            .unwrap();

        assert!(event.safety.is_some());
    }

    #[tokio::test]
    async fn test_agent_builder_event_type() {
        let agent = mock_agent().await;

        let event = agent
            .emit()
            .event_type(EventType::SafetyViolation)
            .build()
            .unwrap();

        assert_eq!(event.event_type, EventType::SafetyViolation);
    }

    #[tokio::test]
    async fn test_agent_builder_extensions() {
        let agent = mock_agent().await;

        let extensions = serde_json::json!({
            "custom_field": "custom_value",
            "numeric_field": 42
        });

        let event = agent.emit().extensions(extensions.clone()).build().unwrap();

        assert!(event.extensions.is_some());
        let stored = event.extensions.unwrap();
        assert_eq!(stored["custom_field"], "custom_value");
        assert_eq!(stored["numeric_field"], 42);
    }

    #[tokio::test]
    async fn test_agent_builder_with_builder() {
        let agent = mock_agent().await;

        let event = agent
            .emit()
            .with_builder(|b| b.position_2d(100.0, 200.0).speed(5.0))
            .build()
            .unwrap();

        assert!(event.location.is_some());
        assert!(event.motion.is_some());
    }

    #[tokio::test]
    async fn test_agent_multiple_tags() {
        let agent = mock_agent().await;

        let event = agent
            .emit()
            .tag("tag1")
            .tag("tag2")
            .tag("tag3")
            .build()
            .unwrap();

        // Tags are stored in identity domain
        assert!(event.identity.is_some());
        let identity = event.identity.unwrap();
        assert!(identity.tags.is_some());
        let tags = identity.tags.unwrap();
        for expected in ["tag1", "tag2", "tag3"] {
            assert!(tags.contains(&expected.to_string()));
        }
    }

    #[tokio::test]
    async fn test_agent_with_buffer_enabled() {
        let (config, _dir) = test_config_with_buffer();
        let agent = PhyTraceAgent::from_config(config).await.unwrap();

        agent.start().await.unwrap();

        // Emit events
        for step in 0..3 {
            let x = f64::from(step);
            Box::pin(agent.emit().position_2d(x, x * 2.0).send())
                .await
                .unwrap();
        }

        // Allow processing
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_complete_event_builder() {
        let agent = mock_agent().await;

        let event = agent
            .emit()
            .position_3d(1.0, 2.0, 3.0)
            .heading(std::f64::consts::PI / 2.0)
            .speed(1.5)
            .battery_soc(75.0)
            .operational_mode(OperationalMode::Autonomous)
            .safety_state(SafetyState::Normal)
            .event_type(EventType::TelemetryPeriodic)
            .tag("integration-test")
            .extensions(serde_json::json!({"test": true}))
            .build()
            .unwrap();

        assert!(event.location.is_some());
        assert!(event.motion.is_some());
        assert!(event.power.is_some());
        assert!(event.operational.is_some());
        assert!(event.safety.is_some());
        assert_eq!(event.event_type, EventType::TelemetryPeriodic);

        let tags = event.identity.as_ref().unwrap().tags.as_ref().unwrap();
        assert!(tags.contains(&"integration-test".to_string()));
        assert!(event.extensions.is_some());
    }

    #[tokio::test]
    async fn test_agent_shutdown_not_running() {
        let agent = mock_agent().await;

        // Shutdown without starting should still work
        agent.shutdown().await.unwrap();
        assert!(!agent.is_running());
    }

    #[tokio::test]
    async fn test_agent_start_stop_cycle() {
        let agent = mock_agent().await;

        // Start
        agent.start().await.unwrap();
        assert!(agent.is_running());

        // Emit
        Box::pin(agent.emit().position_2d(1.0, 2.0).send())
            .await
            .unwrap();

        // Stop
        agent.shutdown().await.unwrap();
        assert!(!agent.is_running());
    }

    #[tokio::test]
    async fn test_agent_emit_without_start() {
        let agent = mock_agent().await;

        // Emit should work even without start (events go to worker)
        Box::pin(agent.emit().position_2d(1.0, 2.0).send())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_agent_stats_after_emit() {
        let agent = mock_agent().await;
        agent.start().await.unwrap();

        // Emit an event
        Box::pin(agent.emit().position_2d(1.0, 2.0).send())
            .await
            .unwrap();

        // Give worker time to process
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Stats reflect transport activity
        let stats = agent.stats().await;
        let accounted_for = stats.events_succeeded + stats.events_failed;
        assert_eq!(stats.events_sent, accounted_for);

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_builder_heading_and_speed() {
        let agent = mock_agent().await;

        let event = agent.emit().heading(1.57).speed(3.0).build().unwrap();

        assert!(event.motion.is_some());
        let motion = event.motion.unwrap();
        assert_eq!(motion.speed_mps, Some(3.0));
        // Heading is stored in location domain
        assert!(event.location.is_some());
    }
}