Skip to content

Emitters

PeriodicEmitter

PeriodicEmitter

PeriodicEmitter(collector: TelemetryCollector, interval_ms: int = 100, event_type: EventType = TELEMETRY_PERIODIC, on_event: Callable[[UDMEvent], None] | None = None)

Captures and emits telemetry events at regular intervals.

Uses a TelemetryCollector to gather data and emits UDM events at the configured interval.

Example

collector = MyRobotCollector(source_id="robot-001")
emitter = PeriodicEmitter(
    collector=collector,
    interval_ms=100,  # 10 Hz
    on_event=lambda e: agent.emit(e)
)
await emitter.start()

Initialize the periodic emitter.

PARAMETER DESCRIPTION
collector

Telemetry collector to use

TYPE: TelemetryCollector

interval_ms

Emission interval in milliseconds

TYPE: int DEFAULT: 100

event_type

Event type for emitted events

TYPE: EventType DEFAULT: TELEMETRY_PERIODIC

on_event

Callback for each emitted event

TYPE: Callable[[UDMEvent], None] | None DEFAULT: None

Source code in phytrace/emitters/periodic.py
def __init__(
    self,
    collector: TelemetryCollector,
    interval_ms: int = 100,
    event_type: EventType = EventType.TELEMETRY_PERIODIC,
    on_event: Callable[[UDMEvent], None] | None = None,
):
    """
    Set up a periodic emitter in the stopped state.

    Construction only records the configuration; no background task is
    created here (``_task`` starts as ``None`` and ``_running`` as ``False``).

    Args:
        collector: Telemetry collector to use
        interval_ms: Emission interval in milliseconds (stored as given,
            no validation is performed here)
        event_type: Event type for emitted events
        on_event: Callback for each emitted event
    """
    # Caller-supplied configuration, kept verbatim on public attributes.
    self.collector, self.interval_ms = collector, interval_ms
    self.event_type, self.on_event = event_type, on_event

    # Private runtime state.
    self._running = False  # flipped by start() / stop()
    self._task: asyncio.Task | None = None  # handle of the emission task
    self._sequence = 0  # sequence counter, starts at zero
    # Queue created without a maxsize argument; consumed via get_event().
    self._event_queue: asyncio.Queue[UDMEvent] = asyncio.Queue()

Attributes

is_running property

is_running: bool

Check if emitter is running.

current_sequence property

current_sequence: int

Get current sequence number.

Functions

start async

start() -> None

Start periodic emission.

Source code in phytrace/emitters/periodic.py
async def start(self) -> None:
    """
    Start periodic emission.

    Marks the emitter as running and schedules ``_emission_loop()`` as an
    asyncio task. Calling this while already running is a no-op, so no
    second task is created.
    """
    if not self._running:
        # Set the flag before scheduling the loop task.
        self._running = True
        self._task = asyncio.create_task(self._emission_loop())

stop async

stop() -> None

Stop periodic emission.

Source code in phytrace/emitters/periodic.py
async def stop(self) -> None:
    """
    Stop periodic emission.

    Clears the running flag, cancels the emission task (if any) and waits
    for it to finish. The ``CancelledError`` raised by awaiting the
    cancelled task is swallowed; any other exception the task ended with
    still propagates to the caller.

    The task handle is always released, even when the awaited task raises,
    so a failed task does not leave a stale ``_task`` reference behind.
    """
    self._running = False

    task = self._task
    if task is None:
        return

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected outcome of cancelling the task above.
        pass
    finally:
        # Only clear the handle we actually stopped; if start() replaced it
        # while we were awaiting, the new task must stay referenced.
        if self._task is task:
            self._task = None

set_interval

set_interval(interval_ms: int) -> None

Update the emission interval.

Source code in phytrace/emitters/periodic.py
def set_interval(self, interval_ms: int) -> None:
    """
    Update the emission interval.

    The value is stored on ``interval_ms`` as given; no validation is
    performed here.

    Args:
        interval_ms: New emission interval in milliseconds
    """
    self.interval_ms = interval_ms

get_event async

get_event(timeout: float | None = None) -> UDMEvent | None

Get the next emitted event from the queue.

PARAMETER DESCRIPTION
timeout

Maximum time to wait in seconds

TYPE: float | None DEFAULT: None

RETURNS DESCRIPTION
UDMEvent | None

The next UDMEvent from the queue, or None if the timeout expires first

Source code in phytrace/emitters/periodic.py
async def get_event(self, timeout: float | None = None) -> UDMEvent | None:
    """
    Get the next emitted event from the queue.

    With ``timeout=None`` this waits until an event is available. With a
    numeric timeout the wait is bounded by ``asyncio.wait_for``.

    Args:
        timeout: Maximum time to wait in seconds

    Returns:
        The next UDMEvent, or None if the timeout expired first
    """
    # Build the awaitable first; nothing runs until it is awaited below.
    next_event = self._event_queue.get()
    if timeout is not None:
        next_event = asyncio.wait_for(next_event, timeout=timeout)

    try:
        return await next_event
    except asyncio.TimeoutError:
        return None

EventEmitter

EventEmitter

EventEmitter(source_id: str, source_type: SourceType, session_id: str | None = None, on_event: Callable[[UDMEvent], None] | None = None)

Emits specific event types when triggered.

Used for emitting events in response to specific occurrences like state transitions, safety violations, task events, etc.

Example

emitter = EventEmitter(source_id="robot-001", source_type=SourceType.AMR)

# Emit state transition
event = emitter.emit_state_transition(
    from_state="idle",
    to_state="executing_task",
    reason="New task received"
)
await agent.emit(event)

Initialize the event emitter.

PARAMETER DESCRIPTION
source_id

Unique source identifier

TYPE: str

source_type

Type of source

TYPE: SourceType

session_id

Optional session ID for grouping events

TYPE: str | None DEFAULT: None

on_event

Optional callback for each emitted event

TYPE: Callable[[UDMEvent], None] | None DEFAULT: None

Source code in phytrace/emitters/event.py
def __init__(
    self,
    source_id: str,
    source_type: SourceType,
    session_id: str | None = None,
    on_event: Callable[[UDMEvent], None] | None = None,
):
    """
    Initialize the event emitter.

    All arguments are stored unchanged on same-named attributes, and the
    internal sequence counter starts at zero.

    Args:
        source_id: Unique source identifier
        source_type: Type of source
        session_id: Optional session ID for grouping events
        on_event: Optional callback for each emitted event
    """
    # Identity of the event source.
    self.source_id, self.source_type = source_id, source_type
    # Optional grouping / notification hooks.
    self.session_id, self.on_event = session_id, on_event

    # Sequence counter (exposed read-only via current_sequence).
    self._sequence = 0

Attributes

current_sequence property

current_sequence: int

Get current sequence number.

Functions

emit_state_transition

emit_state_transition(from_state: str, to_state: str, reason: str | None = None) -> UDMEvent

Emit a state transition event.

PARAMETER DESCRIPTION
from_state

Previous state

TYPE: str

to_state

New state

TYPE: str

reason

Optional reason for transition

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
UDMEvent

The emitted UDMEvent

Source code in phytrace/emitters/event.py
def emit_state_transition(
    self,
    from_state: str,
    to_state: str,
    reason: str | None = None,
) -> UDMEvent:
    """
    Emit a state transition event.

    The new state is recorded as the operational ``sub_state`` (the
    operational ``state`` itself is passed as ``None``), and the full
    transition is attached under the ``"state_transition"`` extension key.
    ``reason`` is included in the extension even when it is ``None``.

    Args:
        from_state: Previous state
        to_state: New state
        reason: Optional reason for transition

    Returns:
        The emitted UDMEvent
    """
    transition = {
        "from_state": from_state,
        "to_state": to_state,
        "reason": reason,
    }

    builder = self._create_builder(EventType.STATE_TRANSITION)
    builder.with_operational(state=None, sub_state=to_state)
    builder.with_extensions({"state_transition": transition})

    return self._emit_and_callback(builder.build())

emit_safety_violation

emit_safety_violation(violation_id: str, rule_id: str, violation_type: str, severity: ViolationSeverity, details: dict[str, Any] | None = None) -> UDMEvent

Emit a safety violation event.

PARAMETER DESCRIPTION
violation_id

Unique violation identifier

TYPE: str

rule_id

Rule that was violated

TYPE: str

violation_type

Type of violation

TYPE: str

severity

Violation severity

TYPE: ViolationSeverity

details

Additional violation details

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
UDMEvent

The emitted UDMEvent

Source code in phytrace/emitters/event.py
def emit_safety_violation(
    self,
    violation_id: str,
    rule_id: str,
    violation_type: str,
    severity: ViolationSeverity,
    details: dict[str, Any] | None = None,
) -> UDMEvent:
    """
    Emit a safety violation event.

    Builds a single ``SafetyViolation`` stamped with the current UTC time
    and attaches it as the only entry of the event's safety violations.

    Args:
        violation_id: Unique violation identifier
        rule_id: Rule that was violated
        violation_type: Type of violation
        severity: Violation severity
        details: Additional violation details

    Returns:
        The emitted UDMEvent
    """
    # Local import: only ``datetime`` itself is known to be in module scope.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
    # UTC "now" and drop tzinfo so the stored value stays a naive UTC
    # datetime, exactly what utcnow() used to produce.
    detected_at = datetime.now(timezone.utc).replace(tzinfo=None)

    builder = self._create_builder(EventType.SAFETY_VIOLATION)

    violation = SafetyViolation(
        violation_id=violation_id,
        rule_id=rule_id,
        violation_type=violation_type,
        severity=severity,
        timestamp=detected_at,
        details=details,
    )
    builder.with_safety(violations=[violation])

    event = builder.build()
    return self._emit_and_callback(event)

emit_task_event

emit_task_event(task_id: str, event_type: TaskEventType, task_type: str | None = None, details: dict[str, Any] | None = None) -> UDMEvent

Emit a task lifecycle event.

PARAMETER DESCRIPTION
task_id

Task identifier

TYPE: str

event_type

Task event type (started, completed, failed, cancelled)

TYPE: TaskEventType

task_type

Optional task type

TYPE: str | None DEFAULT: None

details

Additional task details

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
UDMEvent

The emitted UDMEvent

Source code in phytrace/emitters/event.py
def emit_task_event(
    self,
    task_id: str,
    event_type: TaskEventType,
    task_type: str | None = None,
    details: dict[str, Any] | None = None,
) -> UDMEvent:
    """
    Emit a task lifecycle event.

    The task event type is translated to the matching UDM event type and
    its ``.value`` is recorded as the operational ``task_state``. A
    non-empty ``details`` dict is attached under the ``"task_details"``
    extension key; ``None`` or an empty dict adds no extension.

    Args:
        task_id: Task identifier
        event_type: Task event type (started, completed, failed, cancelled)
        task_type: Optional task type
        details: Additional task details

    Returns:
        The emitted UDMEvent

    Raises:
        KeyError: If ``event_type`` is not one of the four mapped types
    """
    # Task lifecycle type -> UDM event type.
    udm_type_for = {
        TaskEventType.STARTED: EventType.TASK_STARTED,
        TaskEventType.COMPLETED: EventType.TASK_COMPLETED,
        TaskEventType.FAILED: EventType.TASK_FAILED,
        TaskEventType.CANCELLED: EventType.TASK_CANCELLED,
    }

    builder = self._create_builder(udm_type_for[event_type])
    builder.with_operational(
        task_id=task_id, task_type=task_type, task_state=event_type.value
    )
    if details:
        builder.with_extensions({"task_details": details})

    return self._emit_and_callback(builder.build())

emit_navigation_event

emit_navigation_event(event_type: NavigationEventType, goal_id: str | None = None, goal_x_m: float | None = None, goal_y_m: float | None = None, details: dict[str, Any] | None = None) -> UDMEvent

Emit a navigation event.

PARAMETER DESCRIPTION
event_type

Navigation event type

TYPE: NavigationEventType

goal_id

Goal identifier

TYPE: str | None DEFAULT: None

goal_x_m

Goal X position

TYPE: float | None DEFAULT: None

goal_y_m

Goal Y position

TYPE: float | None DEFAULT: None

details

Additional details

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
UDMEvent

The emitted UDMEvent

Source code in phytrace/emitters/event.py
def emit_navigation_event(
    self,
    event_type: NavigationEventType,
    goal_id: str | None = None,
    goal_x_m: float | None = None,
    goal_y_m: float | None = None,
    details: dict[str, Any] | None = None,
) -> UDMEvent:
    """
    Emit a navigation event.

    The navigation event type is translated to the matching UDM event type
    and the goal fields are passed through to the navigation section as
    given. A non-empty ``details`` dict is attached under the
    ``"navigation_details"`` extension key; ``None`` or an empty dict adds
    no extension.

    Args:
        event_type: Navigation event type
        goal_id: Goal identifier
        goal_x_m: Goal X position
        goal_y_m: Goal Y position
        details: Additional details

    Returns:
        The emitted UDMEvent

    Raises:
        KeyError: If ``event_type`` is not GOAL_REACHED, PATH_BLOCKED or
            REROUTING
    """
    # Navigation event type -> UDM event type.
    udm_type_for = {
        NavigationEventType.GOAL_REACHED: EventType.NAVIGATION_GOAL_REACHED,
        NavigationEventType.PATH_BLOCKED: EventType.NAVIGATION_PATH_BLOCKED,
        NavigationEventType.REROUTING: EventType.NAVIGATION_REROUTING,
    }

    builder = self._create_builder(udm_type_for[event_type])
    builder.with_navigation(goal_id=goal_id, goal_x_m=goal_x_m, goal_y_m=goal_y_m)
    if details:
        builder.with_extensions({"navigation_details": details})

    return self._emit_and_callback(builder.build())

emit_power_event

emit_power_event(event_type: EventType, battery_soc_pct: float | None = None, is_charging: bool | None = None, charger_id: str | None = None) -> UDMEvent

Emit a power-related event.

PARAMETER DESCRIPTION
event_type

Power event type

TYPE: EventType

battery_soc_pct

Battery state of charge

TYPE: float | None DEFAULT: None

is_charging

Whether currently charging

TYPE: bool | None DEFAULT: None

charger_id

Charger identifier

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
UDMEvent

The emitted UDMEvent

Source code in phytrace/emitters/event.py
def emit_power_event(
    self,
    event_type: EventType,
    battery_soc_pct: float | None = None,
    is_charging: bool | None = None,
    charger_id: str | None = None,
) -> UDMEvent:
    """
    Emit a power-related event.

    ``event_type`` is used directly as the UDM event type (no mapping or
    check is applied here), and the three power fields are forwarded
    unchanged, including any that are ``None``.

    Args:
        event_type: Power event type
        battery_soc_pct: Battery state of charge
        is_charging: Whether currently charging
        charger_id: Charger identifier

    Returns:
        The emitted UDMEvent
    """
    builder = self._create_builder(event_type)
    builder.with_power(
        battery_soc_pct=battery_soc_pct, is_charging=is_charging, charger_id=charger_id
    )
    return self._emit_and_callback(builder.build())

emit_custom_event

emit_custom_event(event_type_name: str, data: dict[str, Any]) -> UDMEvent

Emit a custom event (custom.* namespace).

PARAMETER DESCRIPTION
event_type_name

Custom event type name (without 'custom.' prefix)

TYPE: str

data

Event data

TYPE: dict[str, Any]

RETURNS DESCRIPTION
UDMEvent

The emitted UDMEvent

Source code in phytrace/emitters/event.py
def emit_custom_event(
    self,
    event_type_name: str,
    data: dict[str, Any],
) -> UDMEvent:
    """
    Emit a custom event (custom.* namespace).

    The event itself is built with the ``TELEMETRY_ON_CHANGE`` type; the
    custom type name (prefixed with ``custom.``) and the payload travel in
    the ``"custom_event"`` extension.

    Args:
        event_type_name: Custom event type name (without 'custom.' prefix)
        data: Event data

    Returns:
        The emitted UDMEvent
    """
    custom_payload = {
        "type": f"custom.{event_type_name}",
        "data": data,
    }

    # TELEMETRY_ON_CHANGE serves as the carrier type for custom events.
    builder = self._create_builder(EventType.TELEMETRY_ON_CHANGE)
    builder.with_extensions({"custom_event": custom_payload})

    return self._emit_and_callback(builder.build())