phytrace_sdk/
agent.rs

1//! PhyTrace Agent - The main SDK entry point.
2//!
3//! The agent orchestrates event building, emission, buffering, and transmission.
4//!
5//! # Example
6//!
7//! ```rust,no_run
8//! use phytrace_sdk::{PhyTraceAgent, PhyTraceConfig, SourceType};
9//!
10//! #[tokio::main]
11//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
12//!     let config = PhyTraceConfig::new("robot-001")
13//!         .with_endpoint("https://api.phyware.io")
14//!         .with_api_key("your-api-key")
15//!         .with_source_type(SourceType::Amr);
16//!
17//!     let agent = PhyTraceAgent::from_config(config).await?;
18//!     
19//!     // Start the agent
20//!     agent.start().await?;
21//!
22//!     // Emit events
23//!     agent.emit()
24//!         .position_2d(10.0, 20.0)
25//!         .battery_soc(85.0)
26//!         .send()
27//!         .await?;
28//!
29//!     // Shutdown gracefully
30//!     agent.shutdown().await?;
31//!     
32//!     Ok(())
33//! }
34//! ```
35
36use std::path::Path;
37use std::sync::Arc;
38use tokio::sync::{mpsc, Mutex, RwLock};
39
40use crate::core::builder::EventBuilder;
41use crate::core::config::PhyTraceConfig;
42use crate::core::license::{LicenseError, LicenseStatus, LicenseValidator};
43use crate::core::provenance::ProvenanceSigner;
44use crate::core::validation::{ValidationLevel, Validator};
45use crate::error::{PhyTraceError, PhyTraceResult};
46use crate::models::event::UdmEvent;
47use crate::reliability::batch::BatchProcessor;
48use crate::reliability::buffer::FileBuffer;
49use crate::reliability::retry::RetryHandler;
50#[cfg(feature = "http")]
51use crate::transport::http::HttpTransport;
52use crate::transport::mock::MockTransport;
53use crate::transport::traits::{Transport, TransportStats};
54
/// The main PhyTrace SDK agent.
///
/// Manages the complete event lifecycle:
/// - License validation
/// - Event building and validation
/// - Provenance signing (optional)
/// - Batching and buffering
/// - Transport with retry logic
pub struct PhyTraceAgent {
    /// Configuration the agent was built from; exposed through `config()`.
    config: PhyTraceConfig,
    /// Transport shared with the background worker. Write-locked to
    /// connect/disconnect, read-locked to send.
    transport: Arc<RwLock<Box<dyn Transport>>>,
    /// On-disk buffer for batches that still fail after retries; `None` when
    /// `config.buffer.enabled` is false.
    buffer: Option<Arc<FileBuffer>>,
    /// Accumulates events into batches; shared with the background worker.
    batch_processor: Arc<BatchProcessor>,
    /// Retry policy wrapped around every transport send.
    retry_handler: RetryHandler,
    /// Provenance signer; `None` when `config.provenance.enabled` is false.
    signer: Option<ProvenanceSigner>,
    /// Event validator applied in `send()`; starts at `ValidationLevel::Basic`.
    validator: Validator,
    /// License validator; `None` when the configured transport type is `"mock"`.
    license_validator: Option<Arc<LicenseValidator>>,
    /// Sending half of the channel that feeds the background worker.
    event_tx: mpsc::Sender<UdmEvent>,
    /// Set by `start()` and cleared by `shutdown()`; also read by background tasks.
    running: Arc<std::sync::atomic::AtomicBool>,
    /// Handle of the background worker task, kept so `shutdown()` can abort it.
    worker_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
    /// Handle of the hourly license heartbeat task, if one was spawned.
    license_heartbeat_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
77
78impl PhyTraceAgent {
79    /// Create an agent from configuration.
80    pub async fn from_config(config: PhyTraceConfig) -> PhyTraceResult<Self> {
81        config.validate().map_err(|e| {
82            PhyTraceError::Config(crate::error::ConfigError::Validation(e.to_string()))
83        })?;
84
85        // Create transport based on config
86        let transport: Box<dyn Transport> = if config.transport.transport_type == "mock" {
87            Box::new(MockTransport::new())
88        } else {
89            #[cfg(feature = "http")]
90            {
91                Box::new(HttpTransport::from_config(&config)?)
92            }
93            #[cfg(not(feature = "http"))]
94            {
95                return Err(PhyTraceError::Config(
96                    "HTTP transport not available. Enable the 'http' feature.".to_string(),
97                ));
98            }
99        };
100
101        Self::with_transport(config, transport).await
102    }
103
104    /// Create an agent with a custom transport.
105    pub async fn with_transport(
106        config: PhyTraceConfig,
107        transport: Box<dyn Transport>,
108    ) -> PhyTraceResult<Self> {
109        // Create license validator (skip for mock transport)
110        let license_validator = if config.transport.transport_type != "mock" {
111            let validator = LicenseValidator::new(
112                config.license.token.clone(),
113                Some(config.transport.endpoint.clone()),
114                config.license.grace_period_hours,
115            );
116
117            // Validate license on initialization
118            let status = validator.validate();
119            if !status.allows_operation() {
120                return Err(PhyTraceError::License(LicenseError::ValidationFailed(
121                    format!(
122                        "License validation failed: {}. \
123                        Set PHYWARE_DEV_MODE=1 for local development, \
124                        or provide a valid license token.",
125                        status.as_str()
126                    ),
127                )));
128            }
129
130            if status == LicenseStatus::GracePeriod {
131                tracing::warn!(
132                    "License is in grace period. Events will be quarantined until license is renewed."
133                );
134            }
135
136            Some(Arc::new(validator))
137        } else {
138            None
139        };
140
141        // Create buffer if enabled
142        let buffer = if config.buffer.enabled {
143            Some(Arc::new(FileBuffer::new(&config.buffer)?))
144        } else {
145            None
146        };
147
148        // Create batch processor
149        let batch_processor = Arc::new(BatchProcessor::from_config(&config.buffer));
150
151        // Create retry handler
152        let retry_handler = RetryHandler::new(&config.retry);
153
154        // Create signer if enabled
155        let signer = if config.provenance.enabled {
156            Some(ProvenanceSigner::from_config(&config.provenance)?)
157        } else {
158            None
159        };
160
161        // Create event channel
162        let (event_tx, event_rx) = mpsc::channel(1000);
163
164        let agent = Self {
165            config,
166            transport: Arc::new(RwLock::new(transport)),
167            buffer,
168            batch_processor,
169            retry_handler,
170            signer,
171            validator: Validator::new(ValidationLevel::Basic),
172            license_validator,
173            event_tx,
174            running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
175            worker_handle: Mutex::new(None),
176            license_heartbeat_handle: Mutex::new(None),
177        };
178
179        // Start background worker
180        agent.start_worker(event_rx).await;
181
182        Ok(agent)
183    }
184
185    /// Load configuration from a YAML file and create agent.
186    pub async fn from_file<P: AsRef<Path>>(path: P) -> PhyTraceResult<Self> {
187        let config = PhyTraceConfig::from_file(path)?;
188        Self::from_config(config).await
189    }
190
191    /// Start the agent.
192    pub async fn start(&self) -> PhyTraceResult<()> {
193        use std::sync::atomic::Ordering;
194
195        // Connect transport
196        self.transport.write().await.connect().await?;
197        self.running.store(true, Ordering::Relaxed);
198
199        // Start license heartbeat (hourly validation)
200        if let Some(ref validator) = self.license_validator {
201            if !validator.is_dev_mode() {
202                let validator = validator.clone();
203                let running = self.running.clone();
204
205                let handle = tokio::spawn(async move {
206                    loop {
207                        tokio::time::sleep(std::time::Duration::from_secs(3600)).await; // 1 hour
208
209                        if !running.load(Ordering::Relaxed) {
210                            break;
211                        }
212
213                        let status = validator.validate_online().await;
214                        match status {
215                            LicenseStatus::Expired => {
216                                tracing::error!(
217                                    "License has expired. Please renew your license to continue."
218                                );
219                            }
220                            LicenseStatus::GracePeriod => {
221                                tracing::warn!(
222                                    "License is in grace period. Events are being quarantined."
223                                );
224                            }
225                            _ => {}
226                        }
227                    }
228                });
229
230                *self.license_heartbeat_handle.lock().await = Some(handle);
231            }
232        }
233
234        Ok(())
235    }
236
237    /// Stop the agent gracefully.
238    pub async fn shutdown(&self) -> PhyTraceResult<()> {
239        use std::sync::atomic::Ordering;
240
241        self.running.store(false, Ordering::Relaxed);
242
243        // Stop license heartbeat
244        if let Some(handle) = self.license_heartbeat_handle.lock().await.take() {
245            handle.abort();
246        }
247
248        // Flush remaining batched events
249        self.flush().await?;
250
251        // Disconnect transport
252        self.transport.write().await.disconnect().await?;
253
254        // Stop worker
255        if let Some(handle) = self.worker_handle.lock().await.take() {
256            handle.abort();
257        }
258
259        Ok(())
260    }
261
262    /// Create an event builder for fluent emission.
263    pub fn emit(&self) -> AgentEventBuilder<'_> {
264        AgentEventBuilder {
265            builder: EventBuilder::new(&self.config),
266            agent: self,
267        }
268    }
269
270    /// Send a pre-built event.
271    pub async fn send(&self, mut event: UdmEvent) -> PhyTraceResult<()> {
272        // Validate
273        self.validator.validate(&event)?;
274
275        // Sign if configured
276        if let Some(ref signer) = self.signer {
277            signer.sign(&mut event)?;
278        }
279
280        // Inject license metadata
281        if let Some(ref validator) = self.license_validator {
282            let metadata = validator.get_license_metadata().await;
283            event.inject_license_metadata(&metadata);
284        }
285
286        // Send to worker
287        self.event_tx
288            .send(event)
289            .await
290            .map_err(|_| PhyTraceError::Transport("Event channel closed".to_string()))?;
291
292        Ok(())
293    }
294
295    /// Flush buffered events immediately.
296    pub async fn flush(&self) -> PhyTraceResult<()> {
297        let batch = self.batch_processor.flush().await;
298        if !batch.is_empty() {
299            let transport = self.transport.read().await;
300            self.retry_handler
301                .execute(|| async {
302                    transport.send_batch(&batch).await?;
303                    Ok(())
304                })
305                .await?;
306        }
307        Ok(())
308    }
309
310    /// Get transport statistics.
311    pub async fn stats(&self) -> TransportStats {
312        self.transport.read().await.stats()
313    }
314
315    /// Check if the agent is running.
316    pub fn is_running(&self) -> bool {
317        self.running.load(std::sync::atomic::Ordering::Relaxed)
318    }
319
320    /// Get a reference to the configuration.
321    pub fn config(&self) -> &PhyTraceConfig {
322        &self.config
323    }
324
325    /// Set the validation level.
326    pub fn set_validation_level(&mut self, level: ValidationLevel) {
327        self.validator = Validator::new(level);
328    }
329
330    // Start background worker for processing events
331    async fn start_worker(&self, mut event_rx: mpsc::Receiver<UdmEvent>) {
332        let transport = self.transport.clone();
333        let batch_processor = self.batch_processor.clone();
334        let retry_handler = self.retry_handler.clone();
335        let buffer = self.buffer.clone();
336        let running = self.running.clone();
337        let config = self.config.clone();
338
339        let handle = tokio::spawn(async move {
340            use std::sync::atomic::Ordering;
341            let flush_interval = std::time::Duration::from_secs(config.buffer.flush_interval_secs);
342            let mut flush_timer = tokio::time::interval(flush_interval);
343            // Skip the first tick which fires immediately at time 0
344            flush_timer.tick().await;
345
346            loop {
347                tokio::select! {
348                    // Handle incoming events
349                    Some(event) = event_rx.recv() => {
350                        if let Some(batch) = batch_processor.add(event).await {
351                            Self::send_batch_with_retry(
352                                &transport,
353                                &retry_handler,
354                                &buffer,
355                                batch,
356                            ).await;
357                        }
358                    }
359
360                    // Periodic flush
361                    _ = flush_timer.tick() => {
362                        if !running.load(Ordering::Relaxed) {
363                            break;
364                        }
365
366                        if batch_processor.should_flush().await {
367                            let batch = batch_processor.flush().await;
368                            if !batch.is_empty() {
369                                Self::send_batch_with_retry(
370                                    &transport,
371                                    &retry_handler,
372                                    &buffer,
373                                    batch,
374                                ).await;
375                            }
376                        }
377
378                        // Try to replay buffered events
379                        if let Some(ref buf) = buffer {
380                            Self::replay_buffered_events(&transport, &retry_handler, buf).await;
381                        }
382                    }
383                }
384            }
385        });
386
387        *self.worker_handle.lock().await = Some(handle);
388    }
389
390    async fn send_batch_with_retry(
391        transport: &Arc<RwLock<Box<dyn Transport>>>,
392        retry_handler: &RetryHandler,
393        buffer: &Option<Arc<FileBuffer>>,
394        batch: Vec<UdmEvent>,
395    ) {
396        let transport_ref = transport.read().await;
397
398        let result = retry_handler
399            .execute(|| async {
400                transport_ref.send_batch(&batch).await?;
401                Ok(())
402            })
403            .await;
404
405        // If failed after retries, buffer events
406        if result.is_err() {
407            if let Some(ref buf) = buffer {
408                let _ = buf.buffer_events(&batch);
409            }
410        }
411    }
412
413    async fn replay_buffered_events(
414        transport: &Arc<RwLock<Box<dyn Transport>>>,
415        retry_handler: &RetryHandler,
416        buffer: &Arc<FileBuffer>,
417    ) {
418        // Only try to replay if connected
419        if !transport.read().await.is_connected().await {
420            return;
421        }
422
423        if let Ok(buffered) = buffer.read_events() {
424            let transport_ref = transport.read().await;
425
426            for buffered_event in buffered {
427                let result = retry_handler
428                    .execute(|| async {
429                        transport_ref.send(&buffered_event.event).await?;
430                        Ok(())
431                    })
432                    .await;
433
434                if result.is_ok() {
435                    let _ = buffer.remove_file(&buffered_event.filepath);
436                } else {
437                    // Stop trying if we can't connect
438                    break;
439                }
440            }
441        }
442    }
443}
444
/// Fluent event builder that integrates with the agent.
///
/// Created by `PhyTraceAgent::emit`. Setters forward to the wrapped
/// `EventBuilder`; `send()` builds the event and passes it to the agent,
/// while `build()` returns the event without sending it.
pub struct AgentEventBuilder<'a> {
    /// Underlying event builder that accumulates the event's fields.
    builder: EventBuilder,
    /// Agent the finished event is sent through.
    agent: &'a PhyTraceAgent,
}
450
451impl<'a> AgentEventBuilder<'a> {
452    /// Set the source ID (overrides config default).
453    pub fn source_id(mut self, source_id: impl Into<String>) -> Self {
454        self.builder = self.builder.source_id(source_id);
455        self
456    }
457
458    /// Set the source type (overrides config default).
459    pub fn source_type(mut self, source_type: crate::models::enums::SourceType) -> Self {
460        self.builder = self.builder.source_type(source_type);
461        self
462    }
463
464    /// Set 2D position.
465    pub fn position_2d(mut self, x: f64, y: f64) -> Self {
466        self.builder = self.builder.position_2d(x, y);
467        self
468    }
469
470    /// Set GPS coordinates (latitude, longitude).
471    pub fn gps(mut self, latitude: f64, longitude: f64) -> Self {
472        self.builder = self.builder.gps(latitude, longitude);
473        self
474    }
475
476    /// Set 3D position.
477    pub fn position_3d(mut self, x: f64, y: f64, z: f64) -> Self {
478        self.builder = self.builder.position_3d(x, y, z);
479        self
480    }
481
482    /// Set heading.
483    pub fn heading(mut self, heading_rad: f64) -> Self {
484        self.builder = self.builder.heading(heading_rad);
485        self
486    }
487
488    /// Set speed.
489    pub fn speed(mut self, speed_mps: f64) -> Self {
490        self.builder = self.builder.speed(speed_mps);
491        self
492    }
493
494    /// Set battery state of charge.
495    pub fn battery_soc(mut self, soc_percent: f64) -> Self {
496        self.builder = self.builder.battery_soc(soc_percent);
497        self
498    }
499
500    /// Set operational mode.
501    pub fn operational_mode(mut self, mode: crate::models::enums::OperationalMode) -> Self {
502        self.builder = self.builder.operational_mode(mode);
503        self
504    }
505
506    /// Set safety state.
507    pub fn safety_state(mut self, state: crate::models::enums::SafetyState) -> Self {
508        self.builder = self.builder.safety_state(state);
509        self
510    }
511
512    /// Set event type.
513    pub fn event_type(mut self, event_type: crate::models::enums::EventType) -> Self {
514        self.builder = self.builder.event_type(event_type);
515        self
516    }
517
518    /// Add a tag.
519    pub fn tag(mut self, tag: impl Into<String>) -> Self {
520        self.builder = self.builder.tag(tag);
521        self
522    }
523
524    /// Set extensions.
525    pub fn extensions(mut self, extensions: serde_json::Value) -> Self {
526        self.builder = self.builder.extensions(extensions);
527        self
528    }
529
530    /// Access the underlying builder for advanced configuration.
531    pub fn with_builder<F>(mut self, f: F) -> Self
532    where
533        F: FnOnce(EventBuilder) -> EventBuilder,
534    {
535        self.builder = f(self.builder);
536        self
537    }
538
539    /// Build and send the event.
540    pub async fn send(self) -> PhyTraceResult<()> {
541        let event = self.builder.build()?;
542        self.agent.send(event).await
543    }
544
545    /// Build without sending (for inspection or manual sending).
546    pub fn build(self) -> PhyTraceResult<UdmEvent> {
547        self.builder.build()
548    }
549}
550
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::validation::ValidationLevel;
    use crate::models::enums::{EventType, OperationalMode, SafetyState, SourceType};
    use crate::transport::mock::MockTransport;
    use tempfile::tempdir;

    /// Mock-transport configuration whose buffer path points at `buffer_dir`.
    fn mock_config(buffer_dir: &std::path::Path) -> PhyTraceConfig {
        let mut config = PhyTraceConfig::new("test-robot").with_source_type(SourceType::Amr);
        config.transport.transport_type = "mock".to_string();
        config.transport.endpoint = "http://localhost:8080".to_string();
        config.buffer.path = buffer_dir.to_string_lossy().to_string();
        config
    }

    /// Mock configuration with buffering disabled. The temporary directory is
    /// removed when this function returns, so only the path string survives.
    fn test_config() -> PhyTraceConfig {
        let scratch = tempdir().unwrap();
        let mut config = mock_config(scratch.path());
        config.buffer.enabled = false; // Disable for simpler tests
        config
    }

    /// Mock configuration with buffering enabled; the returned guard keeps the
    /// buffer directory alive for the duration of the test.
    fn test_config_with_buffer() -> (PhyTraceConfig, tempfile::TempDir) {
        let scratch = tempdir().unwrap();
        let mut config = mock_config(scratch.path());
        config.buffer.enabled = true;
        config.buffer.flush_interval_secs = 1;
        config.buffer.batch_size = 5;
        (config, scratch)
    }

    /// Agent built from `test_config()` via `from_config`.
    async fn test_agent() -> PhyTraceAgent {
        PhyTraceAgent::from_config(test_config()).await.unwrap()
    }

    #[tokio::test]
    async fn test_agent_creation() {
        let agent = test_agent().await;

        assert!(!agent.is_running());
    }

    #[tokio::test]
    async fn test_agent_with_mock_transport() {
        let transport = Box::new(MockTransport::new());
        let agent = PhyTraceAgent::with_transport(test_config(), transport)
            .await
            .unwrap();

        agent.start().await.unwrap();
        assert!(agent.is_running());

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_emit() {
        let transport = Box::new(MockTransport::new());
        let agent = PhyTraceAgent::with_transport(test_config(), transport)
            .await
            .unwrap();

        agent.start().await.unwrap();

        // Emit an event
        let emitted = agent
            .emit()
            .position_2d(10.0, 20.0)
            .battery_soc(85.0)
            .send()
            .await;
        emitted.unwrap();

        // Give worker time to process
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_fluent_builder() {
        let agent = test_agent().await;

        let event = agent
            .emit()
            .position_2d(42.0, 24.0)
            .heading(1.57)
            .speed(2.5)
            .battery_soc(90.0)
            .tag("test")
            .build()
            .unwrap();

        assert!(event.location.is_some());
        assert!(event.motion.is_some());
        assert!(event.power.is_some());
    }

    #[tokio::test]
    async fn test_agent_from_yaml_file() {
        let scratch = tempdir().unwrap();
        let config_path = scratch.path().join("config.yaml");
        let buffer_path = scratch.path().to_string_lossy().replace('\\', "/");

        let yaml_content = format!(
            r#"
source:
  source_id: "yaml-robot"
  source_type: amr
transport:
  transport_type: mock
  endpoint: "http://localhost:8080"
buffer:
  enabled: false
  path: "{}"
"#,
            buffer_path
        );

        std::fs::write(&config_path, yaml_content).unwrap();

        let agent = PhyTraceAgent::from_file(&config_path).await.unwrap();
        assert_eq!(agent.config().source.source_id, "yaml-robot");
    }

    #[tokio::test]
    async fn test_agent_config_validation_fails_empty_source_id() {
        let scratch = tempdir().unwrap();
        let mut config = PhyTraceConfig::new("");
        config.transport.transport_type = "mock".to_string();
        config.buffer.path = scratch.path().to_string_lossy().to_string();
        config.buffer.enabled = false;

        let outcome = PhyTraceAgent::from_config(config).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_agent_stats() {
        let agent = test_agent().await;

        let stats = agent.stats().await;
        assert_eq!(stats.events_sent, 0);
        assert_eq!(stats.events_succeeded, 0);
    }

    #[tokio::test]
    async fn test_agent_set_validation_level() {
        let mut agent = test_agent().await;

        // Validation is internal, but we can verify the agent still works
        agent.set_validation_level(ValidationLevel::Full);
        let strict = agent.emit().position_2d(1.0, 2.0).build().unwrap();
        assert!(strict.location.is_some());

        agent.set_validation_level(ValidationLevel::None);
        let relaxed = agent.emit().position_2d(3.0, 4.0).build().unwrap();
        assert!(relaxed.location.is_some());
    }

    #[tokio::test]
    async fn test_agent_flush() {
        let agent = test_agent().await;
        agent.start().await.unwrap();

        // Emit some events
        agent.emit().position_2d(1.0, 2.0).send().await.unwrap();
        agent.emit().position_2d(3.0, 4.0).send().await.unwrap();

        // Flush should complete without error
        agent.flush().await.unwrap();

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_config_access() {
        let agent = test_agent().await;

        let cfg = agent.config();
        assert_eq!(cfg.source.source_id, "test-robot");
        assert_eq!(cfg.source.source_type, SourceType::Amr);
        assert_eq!(cfg.transport.transport_type, "mock");
    }

    #[tokio::test]
    async fn test_agent_builder_position_3d() {
        let agent = test_agent().await;

        let event = agent.emit().position_3d(1.0, 2.0, 3.0).build().unwrap();

        assert!(event.location.is_some());
        let location = event.location.unwrap();
        assert!(location.local.is_some());
        let local = location.local.unwrap();
        assert_eq!(local.x_m, Some(1.0));
        assert_eq!(local.y_m, Some(2.0));
        assert_eq!(local.z_m, Some(3.0));
    }

    #[tokio::test]
    async fn test_agent_builder_operational_mode() {
        let agent = test_agent().await;

        let event = agent
            .emit()
            .operational_mode(OperationalMode::Manual)
            .build()
            .unwrap();

        assert!(event.operational.is_some());
        let operational = event.operational.unwrap();
        assert_eq!(operational.mode, Some(OperationalMode::Manual));
    }

    #[tokio::test]
    async fn test_agent_builder_safety_state() {
        let agent = test_agent().await;

        let event = agent
            .emit()
            .safety_state(SafetyState::Warning)
            .build()
            .unwrap();

        assert!(event.safety.is_some());
    }

    #[tokio::test]
    async fn test_agent_builder_event_type() {
        let agent = test_agent().await;

        let event = agent
            .emit()
            .event_type(EventType::SafetyViolation)
            .build()
            .unwrap();

        assert_eq!(event.event_type, EventType::SafetyViolation);
    }

    #[tokio::test]
    async fn test_agent_builder_extensions() {
        let agent = test_agent().await;

        let extensions = serde_json::json!({
            "custom_field": "custom_value",
            "numeric_field": 42
        });

        let event = agent.emit().extensions(extensions.clone()).build().unwrap();

        assert!(event.extensions.is_some());
        let stored = event.extensions.unwrap();
        assert_eq!(stored["custom_field"], "custom_value");
        assert_eq!(stored["numeric_field"], 42);
    }

    #[tokio::test]
    async fn test_agent_builder_with_builder() {
        let agent = test_agent().await;

        let event = agent
            .emit()
            .with_builder(|b| b.position_2d(100.0, 200.0).speed(5.0))
            .build()
            .unwrap();

        assert!(event.location.is_some());
        assert!(event.motion.is_some());
    }

    #[tokio::test]
    async fn test_agent_multiple_tags() {
        let agent = test_agent().await;

        let event = agent
            .emit()
            .tag("tag1")
            .tag("tag2")
            .tag("tag3")
            .build()
            .unwrap();

        // Tags are stored in identity domain
        assert!(event.identity.is_some());
        let identity = event.identity.unwrap();
        assert!(identity.tags.is_some());
        let tags = identity.tags.unwrap();
        assert!(tags.contains(&"tag1".to_string()));
        assert!(tags.contains(&"tag2".to_string()));
        assert!(tags.contains(&"tag3".to_string()));
    }

    #[tokio::test]
    async fn test_agent_with_buffer_enabled() {
        let (config, _dir) = test_config_with_buffer();
        let agent = PhyTraceAgent::from_config(config).await.unwrap();

        agent.start().await.unwrap();

        // Emit events
        for step in 0..3 {
            let x = step as f64;
            agent.emit().position_2d(x, x * 2.0).send().await.unwrap();
        }

        // Allow processing
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_complete_event_builder() {
        let agent = test_agent().await;

        let event = agent
            .emit()
            .position_3d(1.0, 2.0, 3.0)
            .heading(std::f64::consts::PI / 2.0)
            .speed(1.5)
            .battery_soc(75.0)
            .operational_mode(OperationalMode::Autonomous)
            .safety_state(SafetyState::Normal)
            .event_type(EventType::TelemetryPeriodic)
            .tag("integration-test")
            .extensions(serde_json::json!({"test": true}))
            .build()
            .unwrap();

        assert!(event.location.is_some());
        assert!(event.motion.is_some());
        assert!(event.power.is_some());
        assert!(event.operational.is_some());
        assert!(event.safety.is_some());
        assert_eq!(event.event_type, EventType::TelemetryPeriodic);

        let tags = event.identity.as_ref().unwrap().tags.as_ref().unwrap();
        assert!(tags.contains(&"integration-test".to_string()));
        assert!(event.extensions.is_some());
    }

    #[tokio::test]
    async fn test_agent_shutdown_not_running() {
        let agent = test_agent().await;

        // Shutdown without starting should still work
        agent.shutdown().await.unwrap();
        assert!(!agent.is_running());
    }

    #[tokio::test]
    async fn test_agent_start_stop_cycle() {
        let agent = test_agent().await;

        // Start
        agent.start().await.unwrap();
        assert!(agent.is_running());

        // Emit
        agent.emit().position_2d(1.0, 2.0).send().await.unwrap();

        // Stop
        agent.shutdown().await.unwrap();
        assert!(!agent.is_running());
    }

    #[tokio::test]
    async fn test_agent_emit_without_start() {
        let agent = test_agent().await;

        // Emit should work even without start (events go to worker)
        let outcome = agent.emit().position_2d(1.0, 2.0).send().await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_agent_stats_after_emit() {
        let agent = test_agent().await;

        agent.start().await.unwrap();

        // Emit an event
        agent.emit().position_2d(1.0, 2.0).send().await.unwrap();

        // Give worker time to process
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Stats reflect transport activity
        let stats = agent.stats().await;
        let accounted_for = stats.events_succeeded + stats.events_failed;
        assert_eq!(stats.events_sent, accounted_for);

        agent.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_agent_builder_heading_and_speed() {
        let agent = test_agent().await;

        let event = agent.emit().heading(1.57).speed(3.0).build().unwrap();

        assert!(event.motion.is_some());
        let motion = event.motion.unwrap();
        assert_eq!(motion.speed_mps, Some(3.0));
        // Heading is stored in location domain
        assert!(event.location.is_some());
    }
}