Skip to content

Agent

PhyTraceAgent

PhyTraceAgent

PhyTraceAgent(config: AgentConfig | None = None, transport: Transport | None = None)

PhyTrace Agent - orchestrates telemetry collection and transmission.

The agent provides:

- License validation before operation
- Event validation before sending
- Provenance tagging for trust
- Batching for efficiency
- Local buffering for offline operation
- Retry logic with a circuit breaker
- Multiple transport options

Example

From a configuration file:

agent = PhyTraceAgent.from_config("phytrace-agent.yaml")
await agent.start()

event = UDMEventBuilder(...).build()
await agent.emit(event)

await agent.stop()

With a mock transport for testing:

agent = PhyTraceAgent(transport=MockTransport())
await agent.emit(event)

Initialize the PhyTrace Agent.

PARAMETER DESCRIPTION
config

Agent configuration (uses defaults if not provided)

TYPE: AgentConfig | None DEFAULT: None

transport

Transport implementation (created from config if not provided)

TYPE: Transport | None DEFAULT: None

Source code in phytrace/agent.py
def __init__(
    self,
    config: AgentConfig | None = None,
    transport: Transport | None = None,
):
    """
    Initialize the PhyTrace Agent.

    Only the configuration and empty runtime state are set up here. The
    pipeline components, the license validator and the background tasks
    are created later by ``start()``.

    Args:
        config: Agent configuration (uses defaults if not provided)
        transport: Transport implementation (created from config if not provided)
    """
    # Local import keeps this block self-contained without relying on a
    # module-level ``logging`` import (replaces the former __import__ hack).
    import logging

    self.config = config or AgentConfig()
    # May stay None until start(), which then builds one from the config.
    self._transport = transport

    # Components (initialized in start())
    self._validation_engine: ValidationEngine | None = None
    self._provenance_tagger: ProvenanceTagger | None = None
    self._batch_processor: BatchProcessor | None = None
    self._edge_buffer: EdgeBuffer | None = None
    self._retry_policy: RetryPolicy | None = None
    self._circuit_breaker: CircuitBreaker | None = None

    # License validation
    self._license_validator: LicenseValidator | None = None
    self._license_heartbeat_task: asyncio.Task | None = None

    # State
    self._running = False
    self._stats = AgentStats()
    self._flush_task: asyncio.Task | None = None
    # In-flight batch send tasks; stop() awaits them before disconnecting.
    self._batch_send_tasks: set[asyncio.Task] = set()
    self._logger = logging.getLogger(__name__)

Attributes

stats property

stats: AgentStats

Get agent statistics.

is_running property

is_running: bool

Check if agent is running.

transport property

transport: Transport | None

Get the transport (useful for testing with MockTransport).

Functions

from_config classmethod

from_config(config_path: str | Path) -> PhyTraceAgent

Create an agent from a YAML configuration file.

PARAMETER DESCRIPTION
config_path

Path to configuration file

TYPE: str | Path

RETURNS DESCRIPTION
PhyTraceAgent

Configured PhyTraceAgent instance

Source code in phytrace/agent.py
@classmethod
def from_config(cls, config_path: str | Path) -> "PhyTraceAgent":
    """
    Create an agent from a YAML configuration file.

    Only keys that are present override the ``AgentConfig`` defaults. A
    missing section leaves the corresponding fields untouched. An empty
    file, or a section written with no body (e.g. ``agent:`` on its own),
    is treated as an empty mapping instead of raising.

    Args:
        config_path: Path to configuration file

    Returns:
        Configured PhyTraceAgent instance

    Raises:
        OSError: If the file cannot be opened.
        TypeError: If the top level of the document is not a mapping.
    """
    config_path = Path(config_path)

    # YAML is UTF-8 by specification; don't depend on the locale encoding.
    with config_path.open(encoding="utf-8") as f:
        data = yaml.safe_load(f)

    # safe_load returns None for an empty document.
    if data is None:
        data = {}
    elif not isinstance(data, dict):
        raise TypeError(
            f"{config_path}: expected a mapping at the top level of the "
            f"configuration, got {type(data).__name__}"
        )

    # Parse configuration
    config = AgentConfig()

    # Agent section
    if "agent" in data:
        agent = data["agent"] or {}
        config.agent_id = agent.get("id", config.agent_id)
        config.agent_version = agent.get("version", config.agent_version)

    # Source section
    if "source" in data:
        source = data["source"] or {}
        source_type_str = source.get("type", "amr")
        config.source_id = source.get("id", config.source_id)
        config.source_type = SourceType(source_type_str)
        config.platform = source.get("platform")
        config.fleet_id = source.get("fleet_id")
        config.site_id = source.get("site_id")
        config.organization_id = source.get("organization_id")

    # Transport section
    if "transport" in data:
        transport_data = data["transport"] or {}
        if "primary" in transport_data:
            primary = transport_data["primary"] or {}
            config.transport_type = primary.get("type", "http")
            config.endpoint = primary.get("endpoint", config.endpoint)
            config.timeout_sec = primary.get("timeout_sec", config.timeout_sec)

        if "batching" in transport_data:
            batching = transport_data["batching"] or {}
            config.batching_enabled = batching.get("enabled", True)
            config.batch_max_size = batching.get("max_size", config.batch_max_size)
            config.batch_max_age_ms = batching.get(
                "max_age_ms", config.batch_max_age_ms
            )

    # Auth section
    if "auth" in data:
        auth = data["auth"] or {}
        config.api_key = auth.get("api_key")

    # License section
    if "license" in data:
        license_data = data["license"] or {}
        config.license_token = license_data.get("token")
        config.license_grace_period_hours = license_data.get(
            "grace_period_hours", DEFAULT_GRACE_PERIOD_HOURS
        )

    # Reliability section
    if "reliability" in data:
        reliability = data["reliability"] or {}

        if "buffer" in reliability:
            buffer = reliability["buffer"] or {}
            config.buffer_enabled = buffer.get("enabled", True)
            config.buffer_path = buffer.get("path", config.buffer_path)
            config.buffer_max_size_mb = buffer.get(
                "max_size_mb", config.buffer_max_size_mb
            )

        if "retry" in reliability:
            retry = reliability["retry"] or {}
            config.retry_max_retries = retry.get(
                "max_retries", config.retry_max_retries
            )
            config.retry_initial_delay_ms = retry.get(
                "initial_delay_ms", config.retry_initial_delay_ms
            )

        if "circuit_breaker" in reliability:
            cb = reliability["circuit_breaker"] or {}
            config.circuit_failure_threshold = cb.get(
                "failure_threshold", config.circuit_failure_threshold
            )
            config.circuit_recovery_timeout_sec = cb.get(
                "recovery_timeout_sec", config.circuit_recovery_timeout_sec
            )

    # Provenance section
    if "provenance" in data:
        prov = data["provenance"] or {}
        config.provenance_enabled = prov.get("enabled", True)

    return cls(config=config)

start async

start() -> None

Start the agent and establish connections.

Source code in phytrace/agent.py
async def start(self) -> None:
    """
    Start the agent and establish connections.

    Calling this on an already-running agent does nothing.

    Startup runs in this order:

    1. Unless ``config.transport_type`` is ``"mock"``, the license is
       validated. Any status other than VALID, GRACE_PERIOD or DEV_MODE
       aborts startup.
    2. The transport is created, if none was injected, and connected.
    3. The configured components are built.

    If component setup raises after the transport has connected, the
    batch processor (if it was started here) is stopped and the transport
    is disconnected before the error propagates. This is needed because
    ``stop()`` does nothing while the agent is not marked as running.

    Raises:
        LicenseValidationError: If license validation fails.
    """
    if self._running:
        return

    # Initialize and validate license (skip for mock transport in tests).
    # NOTE(review): an injected MockTransport combined with the default
    # "http" transport_type still goes through validation - confirm intended.
    if self.config.transport_type != "mock":
        self._license_validator = LicenseValidator(
            license_token=self.config.license_token,
            endpoint=self.config.endpoint,
            grace_period_hours=self.config.license_grace_period_hours,
        )

        # Validate license - will pass in dev mode or with valid token
        status = self._license_validator.validate()
        self._stats.license_status = status.value

        if status not in (
            LicenseStatus.VALID,
            LicenseStatus.GRACE_PERIOD,
            LicenseStatus.DEV_MODE,
        ):
            raise LicenseValidationError(
                f"License validation failed: {status.value}. "
                "Ensure PHYWARE_DEV_MODE=1 is set for local development, "
                "or provide a valid license_token.",
                status
            )

        if status == LicenseStatus.GRACE_PERIOD:
            self._logger.warning(
                "License is in grace period. Events will be quarantined until license is renewed."
            )

    # Initialize transport
    if self._transport is None:
        if self.config.transport_type == "mock":
            self._transport = MockTransport()
        else:
            self._transport = HTTPTransport(
                endpoint=self.config.endpoint,
                api_key=self.config.api_key,
                timeout_sec=self.config.timeout_sec,
            )

    # Connect transport
    await self._transport.connect()

    # Only a processor started by *this* call is torn down on failure; a
    # stale one from a previous run has already been stopped by stop().
    started_processor = None
    try:
        # Initialize validation engine
        if self.config.validation_enabled:
            self._validation_engine = ValidationEngine(
                strict=self.config.validation_strict
            )

        # Initialize provenance tagger
        if self.config.provenance_enabled:
            self._provenance_tagger = ProvenanceTagger(
                source_id=self.config.source_id,
                signing_key=self.config.signing_key,
                agent_version=self.config.agent_version,
            )

        # Initialize batch processor
        if self.config.batching_enabled:
            batch_config = BatchConfig(
                max_batch_size=self.config.batch_max_size,
                max_batch_age_ms=self.config.batch_max_age_ms,
            )
            self._batch_processor = BatchProcessor(
                config=batch_config,
                on_batch_ready=self._on_batch_ready,
            )
            await self._batch_processor.start()
            started_processor = self._batch_processor

        # Initialize edge buffer
        if self.config.buffer_enabled:
            self._edge_buffer = EdgeBuffer(
                storage_path=self.config.buffer_path,
                max_size_mb=self.config.buffer_max_size_mb,
                max_age_hours=self.config.buffer_max_age_hours,
            )

        # Initialize retry policy
        self._retry_policy = RetryPolicy(
            max_retries=self.config.retry_max_retries,
            initial_delay_ms=self.config.retry_initial_delay_ms,
        )

        # Initialize circuit breaker
        self._circuit_breaker = CircuitBreaker(
            failure_threshold=self.config.circuit_failure_threshold,
            recovery_timeout_sec=self.config.circuit_recovery_timeout_sec,
        )
    except Exception:
        # Roll back what this call acquired; cleanup errors are logged so
        # the original startup failure is the one that propagates.
        if started_processor is not None:
            try:
                await started_processor.stop()
            except Exception:
                self._logger.exception(
                    "Failed to stop batch processor after aborted start"
                )
            self._batch_processor = None
        try:
            await self._transport.disconnect()
        except Exception:
            self._logger.exception(
                "Failed to disconnect transport after aborted start"
            )
        raise

    self._running = True

    # Start buffer flush task
    self._flush_task = asyncio.create_task(self._buffer_flush_loop())

    # Start license heartbeat task (hourly validation)
    if self._license_validator and not self._license_validator.is_dev_mode:
        self._license_heartbeat_task = asyncio.create_task(self._license_heartbeat_loop())

stop async

stop() -> None

Stop the agent and flush remaining events.

Source code in phytrace/agent.py
async def stop(self) -> None:
    """
    Stop the agent and flush remaining events.

    Calling this on an agent that is not running does nothing.

    Shutdown runs in this order:

    1. The license heartbeat and buffer flush tasks are cancelled and
       awaited.
    2. The batch processor is stopped, which flushes pending events via
       its callback.
    3. Any in-flight batch send tasks are awaited.

    The transport is always disconnected at the end, even if an earlier
    step raises. A background task that had already crashed, or a batch
    send that failed, is logged instead of aborting shutdown.
    """
    if not self._running:
        return

    self._running = False

    try:
        # Stop license heartbeat task, then the flush task
        for task in (self._license_heartbeat_task, self._flush_task):
            if task is None:
                continue
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception:
                # cancel() is a no-op on a finished task; awaiting one that
                # crashed re-raises its error, which must not block shutdown.
                self._logger.exception("Background task failed before shutdown")
        self._license_heartbeat_task = None
        self._flush_task = None

        # Stop batch processor (flushes pending events via callback)
        if self._batch_processor:
            await self._batch_processor.stop()
            if self._batch_send_tasks:
                outcomes = await asyncio.gather(
                    *list(self._batch_send_tasks), return_exceptions=True
                )
                self._batch_send_tasks.clear()
                for outcome in outcomes:
                    if isinstance(outcome, BaseException) and not isinstance(
                        outcome, asyncio.CancelledError
                    ):
                        self._logger.warning(
                            "Batch send failed during shutdown: %r", outcome
                        )
    finally:
        # Disconnect transport
        if self._transport:
            await self._transport.disconnect()

emit async

emit(event: UDMEvent) -> SendResult

Emit a single event.

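The event is validated, tagged with provenance and license metadata, and sent through the configured transport. If batching is enabled, the event is queued and sent when its batch is flushed (for example, once the batch reaches its maximum size or age). In that case, the returned result only indicates that the event was queued, not that it was delivered.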

PARAMETER DESCRIPTION
event

The UDM event to send

TYPE: UDMEvent

RETURNS DESCRIPTION
SendResult

SendResult indicating success or failure

Source code in phytrace/agent.py
async def emit(self, event: UDMEvent) -> SendResult:
    """
    Emit a single event.

    Processing pipeline:

    1. Validate, when a validation engine is configured. An invalid event
       is counted in ``stats.events_failed`` and rejected with a
       non-retryable ``VALIDATION_ERROR`` result, and nothing is sent.
    2. Tag with provenance, when a provenance tagger is configured.
    3. Inject license metadata.
    4. Queue on the batch processor if one exists; otherwise send
       immediately.

    Args:
        event: The UDM event to send

    Returns:
        SendResult indicating success or failure. When the event is
        queued for batching, an ``ok`` result is returned immediately. It
        does not reflect the outcome of the later batch send.
    """
    validator = self._validation_engine
    if validator:
        verdict = validator.validate(event)
        if not verdict.valid:
            from phytrace.transport.base import TransportError, ErrorType

            rejection = TransportError(
                error_type=ErrorType.VALIDATION_ERROR,
                message=f"Validation failed: {verdict.errors[0].message}",
                retryable=False,
            )
            self._stats.events_failed += 1
            return SendResult.fail(event.event_id, rejection)

    tagger = self._provenance_tagger
    if tagger:
        event = tagger.tag(event).event

    event = self._inject_license_metadata(event)

    batcher = self._batch_processor
    if not batcher:
        return await self._send_single(event)

    await batcher.add(event)
    # Queued only: the real send happens when the batch is flushed.
    return SendResult.ok(event.event_id)

emit_batch async

emit_batch(events: list[UDMEvent]) -> BatchSendResult

Emit a batch of events.

PARAMETER DESCRIPTION
events

List of UDM events to send

TYPE: list[UDMEvent]

RETURNS DESCRIPTION
BatchSendResult

BatchSendResult with per-event results

Source code in phytrace/agent.py
async def emit_batch(self, events: list[UDMEvent]) -> BatchSendResult:
    """
    Emit a batch of events.

    Each event is validated (when a validation engine is configured).
    Rejected events get a non-retryable ``VALIDATION_ERROR`` result and
    are counted in ``stats.events_failed``, matching ``emit()``. The
    remaining events are provenance-tagged, given license metadata and
    sent together via ``_send_batch``.

    Note that ``results`` lists the validation rejections first, followed
    by the send results. It is not in the order of ``events``.

    Args:
        events: List of UDM events to send

    Returns:
        BatchSendResult with per-event results. ``success`` is True only
        when every input event succeeded; it is also True for an empty
        ``events`` list.
    """
    # Validate and tag all events
    validated_events: list[UDMEvent] = []
    results: list[SendResult] = []

    for event in events:
        # Validate
        if self._validation_engine:
            result = self._validation_engine.validate(event)
            if not result.valid:
                from phytrace.transport.base import TransportError, ErrorType

                error = TransportError(
                    error_type=ErrorType.VALIDATION_ERROR,
                    message=f"Validation failed: {result.errors[0].message}",
                    retryable=False,
                )
                # Count rejections the same way emit() does; these events
                # never reach _send_batch, so nothing else would count them.
                self._stats.events_failed += 1
                results.append(SendResult.fail(event.event_id, error))
                continue

        # Tag with provenance
        if self._provenance_tagger:
            tagged = self._provenance_tagger.tag(event)
            event = tagged.event

        # Inject license metadata
        event = self._inject_license_metadata(event)

        validated_events.append(event)

    # Send validated events
    if validated_events:
        send_result = await self._send_batch(validated_events)
        results.extend(send_result.results)

    successful = sum(1 for r in results if r.success)

    return BatchSendResult(
        success=successful == len(events),
        total_events=len(events),
        successful_events=successful,
        failed_events=len(events) - successful,
        results=results,
    )

AgentConfig

AgentConfig dataclass

AgentConfig(agent_id: str = 'phytrace-agent', agent_version: str = '0.1.0', source_id: str = '', source_type: SourceType = AMR, platform: str | None = None, fleet_id: str | None = None, site_id: str | None = None, organization_id: str | None = None, transport_type: str = 'http', endpoint: str = 'https://api.phycloud.io/v1', api_key: str | None = None, timeout_sec: float = 30.0, license_token: str | None = None, license_grace_period_hours: int = DEFAULT_GRACE_PERIOD_HOURS, batching_enabled: bool = True, batch_max_size: int = 100, batch_max_age_ms: int = 1000, buffer_enabled: bool = True, buffer_path: str = '/var/lib/phytrace/buffer', buffer_max_size_mb: int = 1000, buffer_max_age_hours: int = 168, retry_max_retries: int = 5, retry_initial_delay_ms: int = 100, circuit_failure_threshold: int = 5, circuit_recovery_timeout_sec: float = 60.0, provenance_enabled: bool = True, signing_key: bytes | None = None, validation_enabled: bool = True, validation_strict: bool = False)

Configuration for the PhyTrace Agent.