Skip to content

Transport

Base Transport Interface

Transport

Transport(config: TransportConfig)

Bases: ABC

Abstract base class for PhyCloud transports.

Implementations must provide async methods for sending single events and batches to PhyCloud endpoints.

Initialize the transport.

PARAMETER DESCRIPTION
config

Transport configuration

TYPE: TransportConfig

Source code in phytrace/transport/base.py
def __init__(self, config: TransportConfig):
    """
    Initialize the transport.

    Args:
        config: Transport configuration
    """
    self.config = config
    self._connected = False

Attributes

is_connected property

is_connected: bool

Check if transport is connected.

Functions

connect abstractmethod async

connect() -> None

Establish connection to PhyCloud.

RAISES DESCRIPTION
TransportError

If connection fails

Source code in phytrace/transport/base.py
@abstractmethod
async def connect(self) -> None:
    """
    Establish connection to PhyCloud.

    Raises:
        TransportError: If connection fails
    """
    pass

disconnect abstractmethod async

disconnect() -> None

Close connection to PhyCloud.

Source code in phytrace/transport/base.py
@abstractmethod
async def disconnect(self) -> None:
    """Close connection to PhyCloud."""
    pass

send abstractmethod async

send(event: UDMEvent) -> SendResult

Send a single event to PhyCloud.

PARAMETER DESCRIPTION
event

The UDM event to send

TYPE: UDMEvent

RETURNS DESCRIPTION
SendResult

SendResult indicating success or failure

Source code in phytrace/transport/base.py
@abstractmethod
async def send(self, event: UDMEvent) -> SendResult:
    """
    Send a single event to PhyCloud.

    Args:
        event: The UDM event to send

    Returns:
        SendResult indicating success or failure
    """
    pass

send_batch abstractmethod async

send_batch(events: list[UDMEvent]) -> BatchSendResult

Send a batch of events to PhyCloud.

PARAMETER DESCRIPTION
events

List of UDM events to send

TYPE: list[UDMEvent]

RETURNS DESCRIPTION
BatchSendResult

BatchSendResult with per-event results

Source code in phytrace/transport/base.py
@abstractmethod
async def send_batch(self, events: list[UDMEvent]) -> BatchSendResult:
    """
    Send a batch of events to PhyCloud.

    Args:
        events: List of UDM events to send

    Returns:
        BatchSendResult with per-event results
    """
    pass

health_check async

health_check() -> bool

Check if the transport connection is healthy.

RETURNS DESCRIPTION
bool

True if healthy, False otherwise

Source code in phytrace/transport/base.py
async def health_check(self) -> bool:
    """
    Check if the transport connection is healthy.

    Returns:
        True if healthy, False otherwise
    """
    return self._connected

__aenter__ async

__aenter__() -> Transport

Async context manager entry.

Source code in phytrace/transport/base.py
async def __aenter__(self) -> "Transport":
    """Async context manager entry."""
    await self.connect()
    return self

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb) -> None

Async context manager exit.

Source code in phytrace/transport/base.py
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Async context manager exit."""
    await self.disconnect()

SendResult

SendResult dataclass

SendResult(success: bool, event_id: str, timestamp: datetime = utcnow(), error: TransportError | None = None, response_time_ms: float | None = None, server_event_id: str | None = None)

Result of sending a single event.

Functions

ok classmethod

ok(event_id: str, response_time_ms: float | None = None, server_event_id: str | None = None) -> SendResult

Create a successful result.

Source code in phytrace/transport/base.py
@classmethod
def ok(
    cls,
    event_id: str,
    response_time_ms: float | None = None,
    server_event_id: str | None = None,
) -> "SendResult":
    """Create a successful result."""
    return cls(
        success=True,
        event_id=event_id,
        response_time_ms=response_time_ms,
        server_event_id=server_event_id,
    )

fail classmethod

fail(event_id: str, error: TransportError) -> SendResult

Create a failed result.

Source code in phytrace/transport/base.py
@classmethod
def fail(cls, event_id: str, error: TransportError) -> "SendResult":
    """Create a failed result."""
    return cls(
        success=False,
        event_id=event_id,
        error=error,
    )

BatchSendResult

BatchSendResult dataclass

BatchSendResult(success: bool, total_events: int, successful_events: int, failed_events: int, results: list[SendResult] = list(), batch_id: str | None = None, response_time_ms: float | None = None)

Result of sending a batch of events.

Attributes

failure_rate property

failure_rate: float

Calculate the failure rate.

HTTP Transport

HTTPTransport

HTTPTransport(endpoint: str, api_key: str | None = None, timeout_sec: float = 30.0, max_retries: int = 3)

Bases: Transport

HTTP/REST transport for PhyCloud ingestion.

Sends events to PhyCloud using the REST API: - POST /api/v1/events - Single event - POST /api/v1/events/batch - Multiple events

Initialize HTTP transport.

PARAMETER DESCRIPTION
endpoint

PhyCloud API endpoint (e.g., https://api.phycloud.io/v1)

TYPE: str

api_key

API key for authentication

TYPE: str | None DEFAULT: None

timeout_sec

Request timeout in seconds

TYPE: float DEFAULT: 30.0

max_retries

Maximum retry attempts

TYPE: int DEFAULT: 3

Source code in phytrace/transport/http.py
def __init__(
    self,
    endpoint: str,
    api_key: str | None = None,
    timeout_sec: float = 30.0,
    max_retries: int = 3,
):
    """
    Initialize HTTP transport.

    Args:
        endpoint: PhyCloud API endpoint (e.g., https://api.phycloud.io/v1)
        api_key: API key for authentication
        timeout_sec: Request timeout in seconds
        max_retries: Maximum retry attempts
    """
    config = TransportConfig(
        endpoint=endpoint,
        api_key=api_key,
        timeout_sec=timeout_sec,
        max_retries=max_retries,
    )
    super().__init__(config)

    self._session: aiohttp.ClientSession | None = None

Functions

connect async

connect() -> None

Establish HTTP session.

Source code in phytrace/transport/http.py
async def connect(self) -> None:
    """Establish HTTP session."""
    if self._session is None or self._session.closed:
        # Build headers
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "PhyTrace-SDK/0.1.0",
        }
        if self.config.api_key:
            headers["Authorization"] = f"Bearer {self.config.api_key}"

        # Create timeout
        timeout = aiohttp.ClientTimeout(total=self.config.timeout_sec)

        # Create session
        self._session = aiohttp.ClientSession(
            headers=headers,
            timeout=timeout,
        )

    self._connected = True

disconnect async

disconnect() -> None

Close HTTP session.

Source code in phytrace/transport/http.py
async def disconnect(self) -> None:
    """Close HTTP session."""
    if self._session and not self._session.closed:
        await self._session.close()
    self._connected = False

send async

send(event: UDMEvent) -> SendResult

Send a single event to PhyCloud.

PARAMETER DESCRIPTION
event

The UDM event to send

TYPE: UDMEvent

RETURNS DESCRIPTION
SendResult

SendResult indicating success or failure

Source code in phytrace/transport/http.py
async def send(self, event: UDMEvent) -> SendResult:
    """
    Send a single event to PhyCloud.

    Args:
        event: The UDM event to send

    Returns:
        SendResult indicating success or failure
    """
    if not self._session:
        await self.connect()

    start_time = time.time()
    url = f"{self.config.endpoint.rstrip('/')}/events"

    try:
        async with self._session.post(url, json=event.to_dict()) as response:
            response_time_ms = (time.time() - start_time) * 1000

            if response.status in (200, 201):
                data = await response.json()
                return SendResult.ok(
                    event_id=event.event_id,
                    response_time_ms=response_time_ms,
                    server_event_id=data.get("event_id"),
                )
            elif response.status == 401:
                error = TransportError(
                    error_type=ErrorType.AUTH_ERROR,
                    message="Authentication failed",
                    status_code=response.status,
                    retryable=False,
                )
                return SendResult.fail(event.event_id, error)
            elif response.status == 422:
                error = TransportError(
                    error_type=ErrorType.VALIDATION_ERROR,
                    message="Event validation failed",
                    status_code=response.status,
                    details=await response.json(),
                    retryable=False,
                )
                return SendResult.fail(event.event_id, error)
            elif response.status == 429:
                error = TransportError(
                    error_type=ErrorType.RATE_LIMITED,
                    message="Rate limited",
                    status_code=response.status,
                    retryable=True,
                )
                return SendResult.fail(event.event_id, error)
            elif response.status >= 500:
                error = TransportError(
                    error_type=ErrorType.SERVER_ERROR,
                    message=f"Server error: {response.status}",
                    status_code=response.status,
                    retryable=True,
                )
                return SendResult.fail(event.event_id, error)
            else:
                error = TransportError(
                    error_type=ErrorType.UNKNOWN,
                    message=f"Unexpected status: {response.status}",
                    status_code=response.status,
                    retryable=False,
                )
                return SendResult.fail(event.event_id, error)

    except aiohttp.ClientConnectionError as e:
        error = TransportError(
            error_type=ErrorType.NETWORK_ERROR,
            message=str(e),
            retryable=True,
        )
        return SendResult.fail(event.event_id, error)
    except TimeoutError:
        error = TransportError(
            error_type=ErrorType.TIMEOUT,
            message="Request timed out",
            retryable=True,
        )
        return SendResult.fail(event.event_id, error)

send_batch async

send_batch(events: list[UDMEvent]) -> BatchSendResult

Send a batch of events to PhyCloud.

PARAMETER DESCRIPTION
events

List of UDM events to send

TYPE: list[UDMEvent]

RETURNS DESCRIPTION
BatchSendResult

BatchSendResult with per-event results

Source code in phytrace/transport/http.py
async def send_batch(self, events: list[UDMEvent]) -> BatchSendResult:
    """
    Send a batch of events to PhyCloud.

    Args:
        events: List of UDM events to send

    Returns:
        BatchSendResult with per-event results
    """
    if not events:
        return BatchSendResult(
            success=True,
            total_events=0,
            successful_events=0,
            failed_events=0,
        )

    if not self._session:
        await self.connect()

    start_time = time.time()
    url = f"{self.config.endpoint.rstrip('/')}/events/batch"

    import logging

    logger = logging.getLogger(__name__)
    logger.debug(f"Sending batch of {len(events)} events to URL: {url}")

    # Prepare batch payload
    payload = {"events": [event.to_dict() for event in events]}

    results: list[SendResult] = []

    try:
        logger.debug(f"Making POST request to {url} with {len(events)} events")
        async with self._session.post(url, json=payload) as response:
            logger.debug(f"Batch response: status={response.status}")
            response_time_ms = (time.time() - start_time) * 1000

            if response.status in (200, 201):
                data = await response.json()

                # Parse per-event results from response
                event_results = data.get("results", [])
                for i, event in enumerate(events):
                    if i < len(event_results):
                        result_data = event_results[i]
                        if result_data.get("success", True):
                            results.append(
                                SendResult.ok(
                                    event_id=event.event_id,
                                    server_event_id=result_data.get("event_id"),
                                )
                            )
                        else:
                            results.append(
                                SendResult.fail(
                                    event.event_id,
                                    TransportError(
                                        error_type=ErrorType.VALIDATION_ERROR,
                                        message=result_data.get(
                                            "error", "Unknown error"
                                        ),
                                        retryable=False,
                                    ),
                                )
                            )
                    else:
                        results.append(SendResult.ok(event_id=event.event_id))

                successful = sum(1 for r in results if r.success)
                return BatchSendResult(
                    success=successful == len(events),
                    total_events=len(events),
                    successful_events=successful,
                    failed_events=len(events) - successful,
                    results=results,
                    batch_id=data.get("batch_id"),
                    response_time_ms=response_time_ms,
                )
            else:
                # All events failed
                error_type = self._map_status_to_error(response.status)
                for event in events:
                    results.append(
                        SendResult.fail(
                            event.event_id,
                            TransportError(
                                error_type=error_type,
                                message=f"Batch request failed: {response.status}",
                                status_code=response.status,
                                retryable=response.status >= 500,
                            ),
                        )
                    )

                return BatchSendResult(
                    success=False,
                    total_events=len(events),
                    successful_events=0,
                    failed_events=len(events),
                    results=results,
                    response_time_ms=response_time_ms,
                )

    except aiohttp.ClientConnectionError as e:
        logger.error(f"Connection error sending batch: {e}")
        for event in events:
            results.append(
                SendResult.fail(
                    event.event_id,
                    TransportError(
                        error_type=ErrorType.NETWORK_ERROR,
                        message=str(e),
                        retryable=True,
                    ),
                )
            )

        return BatchSendResult(
            success=False,
            total_events=len(events),
            successful_events=0,
            failed_events=len(events),
            results=results,
        )
    except TimeoutError:
        logger.error("Timeout error sending batch")
        for event in events:
            results.append(
                SendResult.fail(
                    event.event_id,
                    TransportError(
                        error_type=ErrorType.TIMEOUT,
                        message="Request timed out",
                        retryable=True,
                    ),
                )
            )

        return BatchSendResult(
            success=False,
            total_events=len(events),
            successful_events=0,
            failed_events=len(events),
            results=results,
        )
    except Exception as e:
        logger.error(f"Unexpected error sending batch: {type(e).__name__}: {e}")
        for event in events:
            results.append(
                SendResult.fail(
                    event.event_id,
                    TransportError(
                        error_type=ErrorType.UNKNOWN,
                        message=str(e),
                        retryable=True,
                    ),
                )
            )

        return BatchSendResult(
            success=False,
            total_events=len(events),
            successful_events=0,
            failed_events=len(events),
            results=results,
        )

Mock Transport

MockTransport

MockTransport(behavior: MockBehavior | None = None)

Bases: Transport

Mock transport for testing.

Records all sent events and allows configuring behavior for testing different scenarios.

Example

transport = MockTransport() await transport.send(event)

assert len(transport.sent_events) == 1 assert transport.sent_events[0].source_id == "robot-001"

Test error handling

transport.behavior.fail_all = True result = await transport.send(event) assert not result.success

Initialize mock transport.

PARAMETER DESCRIPTION
behavior

Optional MockBehavior configuration

TYPE: MockBehavior | None DEFAULT: None

Source code in phytrace/transport/mock.py
def __init__(self, behavior: MockBehavior | None = None):
    """
    Initialize mock transport.

    Args:
        behavior: Optional MockBehavior configuration
    """
    config = TransportConfig(endpoint="mock://localhost")
    super().__init__(config)

    self.behavior = behavior or MockBehavior()
    self.sent_events: list[UDMEvent] = []
    self.batches: list[list[UDMEvent]] = []
    self._request_count = 0

Attributes

event_count property

event_count: int

Get total number of events sent.

request_count property

request_count: int

Get total number of send requests (not events).

Functions

connect async

connect() -> None

Mock connect (always succeeds).

Source code in phytrace/transport/mock.py
async def connect(self) -> None:
    """Mock connect (always succeeds)."""
    self._connected = True

disconnect async

disconnect() -> None

Mock disconnect.

Source code in phytrace/transport/mock.py
async def disconnect(self) -> None:
    """Mock disconnect."""
    self._connected = False

send async

send(event: UDMEvent) -> SendResult

Mock send - records the event and returns configured result.

PARAMETER DESCRIPTION
event

The UDM event to "send"

TYPE: UDMEvent

RETURNS DESCRIPTION
SendResult

SendResult based on configured behavior

Source code in phytrace/transport/mock.py
async def send(self, event: UDMEvent) -> SendResult:
    """
    Mock send - records the event and returns configured result.

    Args:
        event: The UDM event to "send"

    Returns:
        SendResult based on configured behavior
    """
    self._request_count += 1

    # Check custom callback first
    if self.behavior.on_send:
        custom_result = self.behavior.on_send(event)
        if custom_result is not None:
            if custom_result.success:
                self.sent_events.append(event)
            return custom_result

    # Check rate limit
    if (
        self.behavior.rate_limit_after
        and self._request_count > self.behavior.rate_limit_after
    ):
        return SendResult.fail(
            event.event_id,
            TransportError(
                error_type=ErrorType.RATE_LIMITED,
                message="Rate limit exceeded",
                status_code=429,
                retryable=True,
            ),
        )

    # Check fail_all
    if self.behavior.fail_all:
        return SendResult.fail(
            event.event_id,
            TransportError(
                error_type=self.behavior.error_type,
                message=self.behavior.error_message,
                retryable=self.behavior.error_type
                in [ErrorType.SERVER_ERROR, ErrorType.NETWORK_ERROR],
            ),
        )

    # Check success rate
    import random

    if random.random() > self.behavior.success_rate:
        return SendResult.fail(
            event.event_id,
            TransportError(
                error_type=self.behavior.error_type,
                message=self.behavior.error_message,
                retryable=True,
            ),
        )

    # Success - record the event
    self.sent_events.append(event)

    return SendResult.ok(
        event_id=event.event_id,
        response_time_ms=self.behavior.response_time_ms,
        server_event_id=f"srv-{uuid.uuid4().hex[:8]}",
    )

send_batch async

send_batch(events: list[UDMEvent]) -> BatchSendResult

Mock batch send - records all events.

PARAMETER DESCRIPTION
events

List of UDM events to "send"

TYPE: list[UDMEvent]

RETURNS DESCRIPTION
BatchSendResult

BatchSendResult based on configured behavior

Source code in phytrace/transport/mock.py
async def send_batch(self, events: list[UDMEvent]) -> BatchSendResult:
    """
    Mock batch send - records all events.

    Args:
        events: List of UDM events to "send"

    Returns:
        BatchSendResult based on configured behavior
    """
    if not events:
        return BatchSendResult(
            success=True,
            total_events=0,
            successful_events=0,
            failed_events=0,
        )

    self._request_count += 1
    self.batches.append(events.copy())

    # Check fail_all
    if self.behavior.fail_all:
        results = [
            SendResult.fail(
                event.event_id,
                TransportError(
                    error_type=self.behavior.error_type,
                    message=self.behavior.error_message,
                    retryable=True,
                ),
            )
            for event in events
        ]
        return BatchSendResult(
            success=False,
            total_events=len(events),
            successful_events=0,
            failed_events=len(events),
            results=results,
            response_time_ms=self.behavior.response_time_ms,
        )

    # Process each event
    results: list[SendResult] = []
    successful = 0

    import random

    for event in events:
        if random.random() <= self.behavior.success_rate:
            self.sent_events.append(event)
            results.append(
                SendResult.ok(
                    event_id=event.event_id,
                    server_event_id=f"srv-{uuid.uuid4().hex[:8]}",
                )
            )
            successful += 1
        else:
            results.append(
                SendResult.fail(
                    event.event_id,
                    TransportError(
                        error_type=self.behavior.error_type,
                        message=self.behavior.error_message,
                        retryable=True,
                    ),
                )
            )

    return BatchSendResult(
        success=successful == len(events),
        total_events=len(events),
        successful_events=successful,
        failed_events=len(events) - successful,
        results=results,
        batch_id=f"batch-{uuid.uuid4().hex[:8]}",
        response_time_ms=self.behavior.response_time_ms,
    )

clear

clear() -> None

Clear all recorded events and reset counters.

Source code in phytrace/transport/mock.py
def clear(self) -> None:
    """Clear all recorded events and reset counters."""
    self.sent_events.clear()
    self.batches.clear()
    self._request_count = 0

get_events_by_source

get_events_by_source(source_id: str) -> list[UDMEvent]

Get all events from a specific source.

Source code in phytrace/transport/mock.py
def get_events_by_source(self, source_id: str) -> list[UDMEvent]:
    """Get all events from a specific source."""
    return [e for e in self.sent_events if e.source_id == source_id]

get_events_by_type

get_events_by_type(event_type: str) -> list[UDMEvent]

Get all events of a specific type.

Source code in phytrace/transport/mock.py
def get_events_by_type(self, event_type: str) -> list[UDMEvent]:
    """Get all events of a specific type."""
    return [
        e
        for e in self.sent_events
        if str(e.event_type) == event_type or e.event_type == event_type
    ]