PhyTrace Agent - orchestrates telemetry collection and transmission.
The agent provides:
- License validation before operation
- Event validation before sending
- Provenance tagging for trust
- Batching for efficiency
- Local buffering for offline operation
- Retry logic with circuit breaker
- Multiple transport options
Example
From configuration file
agent = PhyTraceAgent.from_config("phytrace-agent.yaml")
await agent.start()
event = UDMEventBuilder(...).build()
await agent.emit(event)
await agent.stop()
With mock transport for testing
agent = PhyTraceAgent(transport=MockTransport())
await agent.emit(event)
Initialize the PhyTrace Agent.
| PARAMETER | DESCRIPTION |
config | Agent configuration (uses defaults if not provided) TYPE: AgentConfig | None DEFAULT: None |
transport | Transport implementation (created from config if not provided) TYPE: Transport | None DEFAULT: None |
Source code in phytrace/agent.py
def __init__(
    self,
    config: AgentConfig | None = None,
    transport: Transport | None = None,
):
    """
    Initialize the PhyTrace Agent.

    Only stores the configuration and sets up empty state; the pipeline
    components (validation, provenance, batching, buffering, retry,
    circuit breaker, license validator) stay ``None`` until ``start()``
    creates them.

    Args:
        config: Agent configuration (uses defaults if not provided)
        transport: Transport implementation (created from config if not provided)
    """
    # Imported locally so this method does not depend on a module-level
    # ``import logging``; replaces the former ``__import__("logging")`` call.
    import logging

    self.config = config or AgentConfig()
    self._transport = transport

    # Components (initialized in start())
    self._validation_engine: ValidationEngine | None = None
    self._provenance_tagger: ProvenanceTagger | None = None
    self._batch_processor: BatchProcessor | None = None
    self._edge_buffer: EdgeBuffer | None = None
    self._retry_policy: RetryPolicy | None = None
    self._circuit_breaker: CircuitBreaker | None = None

    # License validation
    self._license_validator: LicenseValidator | None = None
    self._license_heartbeat_task: asyncio.Task | None = None

    # State
    self._running = False
    self._stats = AgentStats()
    self._flush_task: asyncio.Task | None = None
    # In-flight batch sends; stop() awaits these before disconnecting.
    self._batch_send_tasks: set[asyncio.Task] = set()
    self._logger = logging.getLogger(__name__)
|
Attributes
is_running property
Check if agent is running.
transport property
Get the transport (useful for testing with MockTransport).
Functions
from_config classmethod
Create an agent from a YAML configuration file.
| PARAMETER | DESCRIPTION |
config_path | Path to configuration file TYPE: str | Path |
| RETURNS | DESCRIPTION |
PhyTraceAgent | Configured PhyTraceAgent instance |
Source code in phytrace/agent.py
@classmethod
def from_config(cls, config_path: str | Path) -> "PhyTraceAgent":
    """
    Create an agent from a YAML configuration file.

    Recognised top-level sections are ``agent``, ``source``, ``transport``
    (``primary``, ``batching``), ``auth``, ``license``, ``reliability``
    (``buffer``, ``retry``, ``circuit_breaker``) and ``provenance``.
    A section that is absent leaves the corresponding ``AgentConfig``
    values untouched. An empty file, or a section that is present but has
    no value (YAML parses it as ``None``), is handled like an empty
    mapping instead of raising.

    Args:
        config_path: Path to configuration file

    Returns:
        Configured PhyTraceAgent instance

    Raises:
        ValueError: If the top level of the file is not a mapping.
    """

    def _section(parent, key):
        # "key:" with nothing after it parses as None; treat it as {}.
        return parent[key] or {}

    config_path = Path(config_path)
    with open(config_path, encoding="utf-8") as f:
        # safe_load returns None for an empty document.
        data = yaml.safe_load(f) or {}
    if not isinstance(data, dict):
        raise ValueError(
            f"Configuration file {config_path} must contain a YAML mapping "
            f"at the top level, got {type(data).__name__}"
        )

    # Parse configuration
    config = AgentConfig()

    # Agent section
    if "agent" in data:
        agent = _section(data, "agent")
        config.agent_id = agent.get("id", config.agent_id)
        config.agent_version = agent.get("version", config.agent_version)

    # Source section
    if "source" in data:
        source = _section(data, "source")
        config.source_id = source.get("id", config.source_id)
        config.source_type = SourceType(source.get("type", "amr"))
        # NOTE(review): these four are reset to None when the key is
        # missing, rather than keeping the AgentConfig value - confirm
        # that is intended.
        config.platform = source.get("platform")
        config.fleet_id = source.get("fleet_id")
        config.site_id = source.get("site_id")
        config.organization_id = source.get("organization_id")

    # Transport section
    if "transport" in data:
        transport_data = _section(data, "transport")
        if "primary" in transport_data:
            primary = _section(transport_data, "primary")
            config.transport_type = primary.get("type", "http")
            config.endpoint = primary.get("endpoint", config.endpoint)
            config.timeout_sec = primary.get("timeout_sec", config.timeout_sec)
        if "batching" in transport_data:
            batching = _section(transport_data, "batching")
            config.batching_enabled = batching.get("enabled", True)
            config.batch_max_size = batching.get("max_size", config.batch_max_size)
            config.batch_max_age_ms = batching.get(
                "max_age_ms", config.batch_max_age_ms
            )

    # Auth section
    if "auth" in data:
        auth = _section(data, "auth")
        config.api_key = auth.get("api_key")

    # License section
    if "license" in data:
        license_data = _section(data, "license")
        config.license_token = license_data.get("token")
        config.license_grace_period_hours = license_data.get(
            "grace_period_hours", DEFAULT_GRACE_PERIOD_HOURS
        )

    # Reliability section
    if "reliability" in data:
        reliability = _section(data, "reliability")
        if "buffer" in reliability:
            buffer = _section(reliability, "buffer")
            config.buffer_enabled = buffer.get("enabled", True)
            config.buffer_path = buffer.get("path", config.buffer_path)
            config.buffer_max_size_mb = buffer.get(
                "max_size_mb", config.buffer_max_size_mb
            )
        if "retry" in reliability:
            retry = _section(reliability, "retry")
            config.retry_max_retries = retry.get(
                "max_retries", config.retry_max_retries
            )
            config.retry_initial_delay_ms = retry.get(
                "initial_delay_ms", config.retry_initial_delay_ms
            )
        if "circuit_breaker" in reliability:
            cb = _section(reliability, "circuit_breaker")
            config.circuit_failure_threshold = cb.get(
                "failure_threshold", config.circuit_failure_threshold
            )
            config.circuit_recovery_timeout_sec = cb.get(
                "recovery_timeout_sec", config.circuit_recovery_timeout_sec
            )

    # Provenance section
    if "provenance" in data:
        prov = _section(data, "provenance")
        config.provenance_enabled = prov.get("enabled", True)

    return cls(config=config)
|
start async
Start the agent and establish connections.
Source code in phytrace/agent.py
async def start(self) -> None:
    """
    Start the agent and establish connections.

    Does nothing if the agent is already running. Otherwise, in order:
    validates the license (skipped when ``transport_type`` is ``"mock"``),
    creates the transport if none was injected and connects it, builds the
    validation engine, provenance tagger, batch processor and edge buffer
    when enabled in the config, creates the retry policy and circuit
    breaker, marks the agent as running, and schedules the buffer flush
    loop plus (outside dev mode) the license heartbeat loop.

    Raises:
        LicenseValidationError: If the license status is not VALID,
            GRACE_PERIOD or DEV_MODE.
    """
    if self._running:
        return

    # Initialize and validate license (skip for mock transport in tests)
    if self.config.transport_type != "mock":
        self._license_validator = LicenseValidator(
            license_token=self.config.license_token,
            endpoint=self.config.endpoint,
            grace_period_hours=self.config.license_grace_period_hours,
        )
        # Validate license - will pass in dev mode or with valid token
        status = self._license_validator.validate()
        self._stats.license_status = status.value

        accepted_statuses = (
            LicenseStatus.VALID,
            LicenseStatus.GRACE_PERIOD,
            LicenseStatus.DEV_MODE,
        )
        if status not in accepted_statuses:
            raise LicenseValidationError(
                f"License validation failed: {status.value}. "
                "Ensure PHYWARE_DEV_MODE=1 is set for local development, "
                "or provide a valid license_token.",
                status,
            )
        if status == LicenseStatus.GRACE_PERIOD:
            self._logger.warning(
                "License is in grace period. Events will be quarantined until license is renewed."
            )

    # Initialize transport (only when the caller did not inject one)
    if self._transport is None:
        if self.config.transport_type == "mock":
            self._transport = MockTransport()
        else:
            self._transport = HTTPTransport(
                endpoint=self.config.endpoint,
                api_key=self.config.api_key,
                timeout_sec=self.config.timeout_sec,
            )

    # Connect transport
    await self._transport.connect()

    # Initialize validation engine
    if self.config.validation_enabled:
        self._validation_engine = ValidationEngine(
            strict=self.config.validation_strict
        )

    # Initialize provenance tagger
    if self.config.provenance_enabled:
        self._provenance_tagger = ProvenanceTagger(
            source_id=self.config.source_id,
            signing_key=self.config.signing_key,
            agent_version=self.config.agent_version,
        )

    # Initialize batch processor; completed batches are handed to
    # self._on_batch_ready.
    if self.config.batching_enabled:
        batch_config = BatchConfig(
            max_batch_size=self.config.batch_max_size,
            max_batch_age_ms=self.config.batch_max_age_ms,
        )
        self._batch_processor = BatchProcessor(
            config=batch_config,
            on_batch_ready=self._on_batch_ready,
        )
        await self._batch_processor.start()

    # Initialize edge buffer
    if self.config.buffer_enabled:
        self._edge_buffer = EdgeBuffer(
            storage_path=self.config.buffer_path,
            max_size_mb=self.config.buffer_max_size_mb,
            max_age_hours=self.config.buffer_max_age_hours,
        )

    # Initialize retry policy
    self._retry_policy = RetryPolicy(
        max_retries=self.config.retry_max_retries,
        initial_delay_ms=self.config.retry_initial_delay_ms,
    )

    # Initialize circuit breaker
    self._circuit_breaker = CircuitBreaker(
        failure_threshold=self.config.circuit_failure_threshold,
        recovery_timeout_sec=self.config.circuit_recovery_timeout_sec,
    )

    # Set before spawning the background loops below.
    self._running = True

    # Start buffer flush task
    self._flush_task = asyncio.create_task(self._buffer_flush_loop())

    # Start license heartbeat task (hourly validation)
    if self._license_validator and not self._license_validator.is_dev_mode:
        self._license_heartbeat_task = asyncio.create_task(
            self._license_heartbeat_loop()
        )
|
stop async
Stop the agent and flush remaining events.
Source code in phytrace/agent.py
async def stop(self) -> None:
    """
    Shut the agent down and flush whatever is still pending.

    A no-op when the agent is not running. Otherwise the running flag is
    cleared, the license heartbeat and buffer flush tasks are cancelled
    and awaited (in that order), the batch processor is stopped, any
    outstanding batch send tasks are awaited, and the transport is
    disconnected last.
    """
    if not self._running:
        return
    self._running = False

    # Heartbeat first, then the flush loop. Cancellation is the expected
    # way for these loops to end, so the CancelledError is swallowed.
    for background_task in (self._license_heartbeat_task, self._flush_task):
        if not background_task:
            continue
        background_task.cancel()
        try:
            await background_task
        except asyncio.CancelledError:
            pass

    # Stopping the processor flushes pending events via its callback.
    processor = self._batch_processor
    if processor:
        await processor.stop()

    # Wait for in-flight sends; failures are collected, not raised.
    if self._batch_send_tasks:
        in_flight = list(self._batch_send_tasks)
        await asyncio.gather(*in_flight, return_exceptions=True)
        self._batch_send_tasks.clear()

    # The transport goes down only after everything has been sent.
    transport = self._transport
    if transport:
        await transport.disconnect()
|
emit async
Emit a single event.
The event is validated, tagged with provenance and license metadata, and sent through the configured transport. If batching is enabled, the event may be held until the batch is full.
| PARAMETER | DESCRIPTION |
event | The UDM event to send TYPE: UDMEvent |
| RETURNS | DESCRIPTION |
SendResult | SendResult indicating success or failure |
Source code in phytrace/agent.py
async def emit(self, event: UDMEvent) -> SendResult:
    """
    Emit a single event.

    Pipeline: validate (when a validation engine exists), tag with
    provenance (when a tagger exists), inject license metadata, then
    either queue the event on the batch processor or send it directly.

    Args:
        event: The UDM event to send

    Returns:
        SendResult indicating success or failure. With batching enabled
        an ok result is returned as soon as the event is queued; the
        actual send happens later with the batch.
    """
    validator = self._validation_engine
    if validator:
        outcome = validator.validate(event)
        if not outcome.valid:
            from phytrace.transport.base import TransportError, ErrorType

            # Only the first validation error is reported.
            first_problem = outcome.errors[0].message
            failure = TransportError(
                error_type=ErrorType.VALIDATION_ERROR,
                message=f"Validation failed: {first_problem}",
                retryable=False,
            )
            self._stats.events_failed += 1
            return SendResult.fail(event.event_id, failure)

    tagger = self._provenance_tagger
    if tagger:
        event = tagger.tag(event).event

    event = self._inject_license_metadata(event)

    # No batch processor: send straight away.
    if not self._batch_processor:
        return await self._send_single(event)

    await self._batch_processor.add(event)
    # Pending result - the real send is performed with the batch.
    return SendResult.ok(event.event_id)
|
emit_batch async
Emit a batch of events.
| PARAMETER | DESCRIPTION |
events | List of UDM events to send TYPE: list[UDMEvent] |
Source code in phytrace/agent.py
async def emit_batch(self, events: list[UDMEvent]) -> BatchSendResult:
    """
    Emit a batch of events.

    Each event is validated (when a validation engine exists), tagged with
    provenance (when a tagger exists) and given license metadata. Events
    that fail validation are not sent; they are counted in
    ``self._stats.events_failed``, matching ``emit``. The remaining events
    are sent together through ``_send_batch``.

    Args:
        events: List of UDM events to send

    Returns:
        BatchSendResult with per-event results. Validation failures come
        first in ``results``, followed by the results for the events that
        were sent, so the order may differ from the input order.
    """
    # Validate and tag all events
    validated_events: list[UDMEvent] = []
    results: list[SendResult] = []

    for event in events:
        # Validate
        if self._validation_engine:
            result = self._validation_engine.validate(event)
            if not result.valid:
                from phytrace.transport.base import TransportError, ErrorType

                error = TransportError(
                    error_type=ErrorType.VALIDATION_ERROR,
                    message=f"Validation failed: {result.errors[0].message}",
                    retryable=False,
                )
                # Keep the failure counter consistent with emit().
                self._stats.events_failed += 1
                results.append(SendResult.fail(event.event_id, error))
                continue

        # Tag with provenance
        if self._provenance_tagger:
            tagged = self._provenance_tagger.tag(event)
            event = tagged.event

        # Inject license metadata
        event = self._inject_license_metadata(event)
        validated_events.append(event)

    # Send validated events
    if validated_events:
        send_result = await self._send_batch(validated_events)
        results.extend(send_result.results)

    successful = sum(1 for r in results if r.success)
    return BatchSendResult(
        success=successful == len(events),
        total_events=len(events),
        successful_events=successful,
        failed_events=len(events) - successful,
        results=results,
    )
|