Skip to content

Reliability

Edge Buffer

EdgeBuffer

EdgeBuffer(storage_path: str | Path, max_size_mb: int = 1000, max_age_hours: int = 168, encryption_key: bytes | None = None)

Persistent local buffer for offline operation.

Stores events to disk when the transport is unavailable, and flushes them when connection is restored.

Features:

- FIFO queue semantics
- Configurable size and age limits
- Optional encryption
- Atomic writes

Initialize the edge buffer.

PARAMETER DESCRIPTION
storage_path

Directory path for buffer storage

TYPE: str | Path

max_size_mb

Maximum buffer size in MB

TYPE: int DEFAULT: 1000

max_age_hours

Maximum event age in hours

TYPE: int DEFAULT: 168

encryption_key

Optional encryption key (not implemented)

TYPE: bytes | None DEFAULT: None

Source code in phytrace/reliability/buffer.py
def __init__(
    self,
    storage_path: str | Path,
    max_size_mb: int = 1000,
    max_age_hours: int = 168,  # 7 days
    encryption_key: bytes | None = None,
):
    """
    Initialize the edge buffer.

    Args:
        storage_path: Directory path for buffer storage
        max_size_mb: Maximum buffer size in MB
        max_age_hours: Maximum event age in hours
        encryption_key: Optional encryption key (not implemented)
    """
    self.storage_path = Path(storage_path)
    self.max_size_bytes = max_size_mb * 1024 * 1024
    self.max_age_sec = max_age_hours * 3600
    self.encryption_key = encryption_key

    # Create storage directory
    self.storage_path.mkdir(parents=True, exist_ok=True)

    # File counter for ordering
    self._counter = 0
    self._lock = asyncio.Lock()

Functions

enqueue async

enqueue(event: UDMEvent) -> bool

Add an event to the buffer.

PARAMETER DESCRIPTION
event

The UDM event to buffer

TYPE: UDMEvent

RETURNS DESCRIPTION
bool

True if successfully buffered, False if buffer is full

Source code in phytrace/reliability/buffer.py
async def enqueue(self, event: UDMEvent) -> bool:
    """
    Add an event to the buffer.

    Each event is persisted as a single JSON file, written via a temp
    file that is then atomically moved into place so a crash mid-write
    never leaves a half-written ``*.json`` for dequeue() to read.

    Args:
        event: The UDM event to buffer

    Returns:
        True if successfully buffered, False if buffer is full or the
        write failed
    """
    async with self._lock:
        # Check size limit
        stats = self._get_stats_sync()
        if stats.size_bytes >= self.max_size_bytes:
            # Try to prune old events first
            self._prune_old_events()
            stats = self._get_stats_sync()
            if stats.size_bytes >= self.max_size_bytes:
                return False

        # Generate unique filename with timestamp for ordering
        self._counter += 1
        timestamp = int(time.time() * 1000000)
        filename = f"{timestamp}_{self._counter}_{event.event_id}.json"
        filepath = self.storage_path / filename

        # Serialize and write atomically.
        # NOTE(review): datetime.utcnow() is deprecated (naive UTC);
        # switching to datetime.now(timezone.utc) changes the stored
        # string format -- confirm _prune_old_events tolerates
        # timezone-aware timestamps before changing this.
        event_data = {
            "buffered_at": datetime.utcnow().isoformat(),
            "event": event.to_dict(),
        }

        # Write to temp file then move into place. with_name() (rather
        # than with_suffix()) keeps the full filename intact even when
        # event_id contains a "."; Path.replace() is atomic on POSIX
        # and, unlike rename(), also overwrites on Windows.
        temp_path = filepath.with_name(filename + ".tmp")
        try:
            with open(temp_path, "w") as f:
                json.dump(event_data, f)
            temp_path.replace(filepath)
            return True
        except OSError:
            # Best-effort cleanup of the partial temp file; missing_ok
            # avoids the exists()/unlink() race of the previous code.
            temp_path.unlink(missing_ok=True)
            return False

dequeue async

dequeue(max_events: int = 100) -> list[UDMEvent]

Get events from the buffer (oldest first).

PARAMETER DESCRIPTION
max_events

Maximum number of events to retrieve

TYPE: int DEFAULT: 100

RETURNS DESCRIPTION
list[UDMEvent]

List of UDM events (does not remove them from buffer)

Source code in phytrace/reliability/buffer.py
async def dequeue(self, max_events: int = 100) -> list[UDMEvent]:
    """
    Read up to ``max_events`` events from the buffer, oldest first.

    Events are left on disk; call remove() once they have been
    delivered successfully.

    Args:
        max_events: Maximum number of events to retrieve

    Returns:
        List of UDM events (does not remove them from buffer)
    """
    async with self._lock:
        # Filenames begin with a microsecond timestamp, so a plain
        # lexicographic sort yields oldest-first order.
        candidates = sorted(self.storage_path.glob("*.json"))[:max_events]

        loaded: list[UDMEvent] = []
        for path in candidates:
            try:
                with open(path, "r") as fh:
                    payload = json.load(fh)
                loaded.append(UDMEvent.from_dict(payload["event"]))
            except (json.JSONDecodeError, KeyError, OSError):
                # Unreadable or malformed file: skip it.
                continue

        return loaded

remove async

remove(event_ids: list[str]) -> int

Remove specific events from the buffer.

PARAMETER DESCRIPTION
event_ids

List of event IDs to remove

TYPE: list[str]

RETURNS DESCRIPTION
int

Number of events removed

Source code in phytrace/reliability/buffer.py
async def remove(self, event_ids: list[str]) -> int:
    """
    Remove specific events from the buffer.

    Args:
        event_ids: List of event IDs to remove

    Returns:
        Number of events removed
    """
    async with self._lock:
        targets = set(event_ids)
        deleted = 0

        for path in self.storage_path.glob("*.json"):
            # Filenames look like "<timestamp>_<counter>_<event_id>";
            # the event_id itself may contain underscores, hence the
            # bounded split.
            pieces = path.stem.split("_", 2)
            if len(pieces) < 3 or pieces[2] not in targets:
                continue
            try:
                path.unlink()
            except OSError:
                continue
            deleted += 1

        return deleted

flush async

flush(transport: Transport, batch_size: int = 100) -> BatchSendResult

Send all buffered events when connection is available.

PARAMETER DESCRIPTION
transport

The transport to use for sending

TYPE: Transport

batch_size

Number of events to send per batch

TYPE: int DEFAULT: 100

RETURNS DESCRIPTION
BatchSendResult

Combined BatchSendResult for all events

Source code in phytrace/reliability/buffer.py
async def flush(
    self, transport: Transport, batch_size: int = 100
) -> BatchSendResult:
    """
    Send all buffered events when connection is available.

    Batches are dequeued, sent, and successfully delivered events are
    deleted from disk. Flushing stops early when an entire batch
    fails, so a dead transport is not hammered in a tight loop.

    Args:
        transport: The transport to use for sending
        batch_size: Number of events to send per batch

    Returns:
        Combined BatchSendResult for all events
    """
    totals = {"all": 0, "ok": 0, "failed": 0}
    combined_results = []

    while True:
        batch = await self.dequeue(batch_size)
        if not batch:
            break

        outcome = await transport.send_batch(batch)

        totals["all"] += outcome.total_events
        totals["ok"] += outcome.successful_events
        totals["failed"] += outcome.failed_events
        combined_results.extend(outcome.results)

        # Drop delivered events so they are not re-sent next pass.
        delivered = [r.event_id for r in outcome.results if r.success]
        await self.remove(delivered)

        # A fully failed batch means the transport is down: stop here.
        if outcome.failed_events == outcome.total_events:
            break

    return BatchSendResult(
        success=totals["failed"] == 0,
        total_events=totals["all"],
        successful_events=totals["ok"],
        failed_events=totals["failed"],
        results=combined_results,
    )

get_stats async

get_stats() -> BufferStats

Get buffer statistics.

Source code in phytrace/reliability/buffer.py
async def get_stats(self) -> BufferStats:
    """Return a snapshot of buffer statistics, taken under the lock."""
    async with self._lock:
        stats = self._get_stats_sync()
    return stats

clear async

clear() -> int

Clear all buffered events.

Source code in phytrace/reliability/buffer.py
async def clear(self) -> int:
    """
    Clear all buffered events.

    Returns:
        Number of event files deleted; files that fail to unlink are
        skipped and not counted.
    """
    async with self._lock:
        deleted = 0
        for entry in self.storage_path.glob("*.json"):
            try:
                entry.unlink()
            except OSError:
                continue
            deleted += 1
        return deleted

__aiter__ async

__aiter__() -> AsyncIterator[UDMEvent]

Iterate over buffered events.

Source code in phytrace/reliability/buffer.py
async def __aiter__(self) -> AsyncIterator[UDMEvent]:
    """Yield buffered events oldest-first (up to 10000 per pass)."""
    for item in await self.dequeue(max_events=10000):
        yield item

BufferStats

BufferStats dataclass

BufferStats(event_count: int, size_bytes: int, oldest_event_age_sec: float, disk_usage_pct: float)

Statistics about the edge buffer.

Attributes

size_mb property

size_mb: float

Size in megabytes.

Retry & Circuit Breaker

RetryPolicy

RetryPolicy dataclass

RetryPolicy(max_retries: int = 5, initial_delay_ms: int = 100, max_delay_ms: int = 30000, exponential_base: float = 2.0, jitter: bool = True, retryable_errors: set[ErrorType] = (lambda: {NETWORK_ERROR, TIMEOUT, SERVER_ERROR, RATE_LIMITED})())

Configurable retry logic with exponential backoff.

Implements exponential backoff with optional jitter to prevent thundering herd problems.

Functions

get_delay

get_delay(attempt: int) -> float

Calculate delay for given attempt number.

PARAMETER DESCRIPTION
attempt

Attempt number (0-indexed)

TYPE: int

RETURNS DESCRIPTION
float

Delay in seconds

Source code in phytrace/reliability/retry.py
def get_delay(self, attempt: int) -> float:
    """
    Calculate delay for given attempt number.

    Args:
        attempt: Attempt number (0-indexed)

    Returns:
        Delay in seconds
    """
    # Exponential backoff, capped at the configured ceiling.
    backoff_ms = min(
        self.initial_delay_ms * self.exponential_base**attempt,
        self.max_delay_ms,
    )

    # Spread retries with +/-25% jitter (factor in [0.75, 1.25)) to
    # avoid thundering herds. The jitter is applied after the cap, so
    # the final delay may slightly exceed max_delay_ms.
    if self.jitter:
        backoff_ms *= 0.75 + random.random() * 0.5

    # Milliseconds -> seconds.
    return backoff_ms / 1000.0

should_retry

should_retry(error_type: ErrorType, attempt: int) -> bool

Determine if a retry should be attempted.

PARAMETER DESCRIPTION
error_type

Type of error that occurred

TYPE: ErrorType

attempt

Current attempt number (0-indexed)

TYPE: int

RETURNS DESCRIPTION
bool

True if should retry, False otherwise

Source code in phytrace/reliability/retry.py
def should_retry(self, error_type: ErrorType, attempt: int) -> bool:
    """
    Determine if a retry should be attempted.

    Args:
        error_type: Type of error that occurred
        attempt: Current attempt number (0-indexed)

    Returns:
        True if should retry, False otherwise
    """
    # Retry only while the attempt budget remains AND the error class
    # is one the policy considers transient.
    within_budget = attempt < self.max_retries
    return within_budget and error_type in self.retryable_errors

execute_with_retry async

execute_with_retry(func: Callable[[], T], on_retry: Callable[[int, Exception], None] | None = None) -> T

Execute a function with retry logic.

PARAMETER DESCRIPTION
func

Async function to execute

TYPE: Callable[[], T]

on_retry

Optional callback on each retry

TYPE: Callable[[int, Exception], None] | None DEFAULT: None

RETURNS DESCRIPTION
T

Result of the function

RAISES DESCRIPTION
Exception

If all retries exhausted

Source code in phytrace/reliability/retry.py
async def execute_with_retry(
    self,
    func: Callable[[], T],
    on_retry: Callable[[int, Exception], None] | None = None,
) -> T:
    """
    Execute a function with retry logic.

    Args:
        func: Async function to execute (it is awaited on each attempt)
        on_retry: Optional callback on each retry

    Returns:
        Result of the function

    Raises:
        Exception: If all retries exhausted
    """
    final_error: Exception | None = None

    # Attempt 0 is the initial call; up to max_retries re-attempts follow.
    for attempt in range(self.max_retries + 1):
        try:
            return await func()
        except Exception as exc:
            final_error = exc

            # Non-retryable error classes (or an exhausted budget)
            # propagate immediately.
            kind = getattr(exc, "error_type", ErrorType.UNKNOWN)
            if not self.should_retry(kind, attempt):
                raise

            # Give observers a chance to log/metric the retry.
            if on_retry:
                on_retry(attempt, exc)

            # Back off before the next attempt.
            await asyncio.sleep(self.get_delay(attempt))

    # Defensive: the loop can only exit via return or raise above; keep
    # a clear failure mode if that invariant is ever broken.
    if final_error:
        raise final_error
    raise RuntimeError("Retry logic error: no exception captured")

CircuitBreaker

CircuitBreaker

CircuitBreaker(failure_threshold: int = 5, recovery_timeout_sec: float = 60.0, half_open_requests: int = 3)

Circuit breaker to prevent cascade failures.

The circuit breaker has three states:

- CLOSED: Normal operation, requests pass through
- OPEN: Failing, all requests are rejected immediately
- HALF_OPEN: Testing, limited requests allowed to check recovery

Transitions:

- CLOSED -> OPEN: When failure threshold is reached
- OPEN -> HALF_OPEN: After recovery timeout
- HALF_OPEN -> CLOSED: If test requests succeed
- HALF_OPEN -> OPEN: If test requests fail

Initialize the circuit breaker.

PARAMETER DESCRIPTION
failure_threshold

Number of failures before opening circuit

TYPE: int DEFAULT: 5

recovery_timeout_sec

Time to wait before trying recovery

TYPE: float DEFAULT: 60.0

half_open_requests

Number of test requests in half-open state

TYPE: int DEFAULT: 3

Source code in phytrace/reliability/retry.py
def __init__(
    self,
    failure_threshold: int = 5,
    recovery_timeout_sec: float = 60.0,
    half_open_requests: int = 3,
):
    """
    Initialize the circuit breaker.

    Args:
        failure_threshold: Number of failures before opening circuit
        recovery_timeout_sec: Time to wait before trying recovery
        half_open_requests: Number of test requests in half-open state
    """
    # Static configuration.
    self.failure_threshold = failure_threshold
    self.recovery_timeout_sec = recovery_timeout_sec
    self.half_open_requests = half_open_requests

    # Mutable state, guarded by _lock. A fresh breaker starts CLOSED
    # with no recorded failures.
    self._state = CircuitState.CLOSED
    # Consecutive failures observed while CLOSED.
    self._failure_count = 0
    # time.time() of the most recent failure; drives the recovery timer.
    self._last_failure_time: float | None = None
    # Probe outcomes recorded while HALF_OPEN.
    self._half_open_successes = 0
    self._half_open_failures = 0
    self._lock = asyncio.Lock()

Attributes

state property

state: CircuitState

Get current circuit state.

is_open property

is_open: bool

Check if circuit is open (rejecting requests).

Functions

allow_request async

allow_request() -> bool

Check if a request should be allowed.

RETURNS DESCRIPTION
bool

True if request should proceed, False if rejected

Source code in phytrace/reliability/retry.py
async def allow_request(self) -> bool:
    """
    Check if a request should be allowed.

    May transition OPEN -> HALF_OPEN as a side effect once the
    recovery timeout has elapsed.

    Returns:
        True if request should proceed, False if rejected
    """
    async with self._lock:
        state = self._state

        # Closed circuit: everything passes.
        if state == CircuitState.CLOSED:
            return True

        # Open circuit: reject until the recovery timeout has elapsed,
        # then flip to half-open and let this request through as a probe.
        if state == CircuitState.OPEN:
            last = self._last_failure_time
            if last and (time.time() - last) >= self.recovery_timeout_sec:
                self._state = CircuitState.HALF_OPEN
                self._half_open_successes = 0
                self._half_open_failures = 0
                return True
            return False

        # Half-open: only a limited number of probe requests may pass.
        if state == CircuitState.HALF_OPEN:
            probes = self._half_open_successes + self._half_open_failures
            return probes < self.half_open_requests

        # Unknown state: fail safe by rejecting.
        return False

record_success async

record_success() -> None

Record a successful request.

Source code in phytrace/reliability/retry.py
async def record_success(self) -> None:
    """Record a successful request."""
    async with self._lock:
        if self._state == CircuitState.CLOSED:
            # Any success during normal operation wipes the failure streak.
            self._failure_count = 0
        elif self._state == CircuitState.HALF_OPEN:
            self._half_open_successes += 1
            # Enough successful probes: the dependency has recovered,
            # so close the circuit and reset all bookkeeping.
            if self._half_open_successes >= self.half_open_requests:
                self._state = CircuitState.CLOSED
                self._failure_count = 0
                self._half_open_successes = 0
                self._half_open_failures = 0

record_failure async

record_failure() -> None

Record a failed request.

Source code in phytrace/reliability/retry.py
async def record_failure(self) -> None:
    """Record a failed request."""
    async with self._lock:
        # Remember when this failure happened; it drives the
        # OPEN -> HALF_OPEN recovery timer.
        self._last_failure_time = time.time()

        if self._state == CircuitState.CLOSED:
            self._failure_count += 1
            # Too many consecutive failures: trip the breaker.
            if self._failure_count >= self.failure_threshold:
                self._state = CircuitState.OPEN
        elif self._state == CircuitState.HALF_OPEN:
            # A single failed probe re-opens the circuit immediately.
            self._half_open_failures += 1
            self._state = CircuitState.OPEN

reset async

reset() -> None

Reset the circuit breaker to closed state.

Source code in phytrace/reliability/retry.py
async def reset(self) -> None:
    """Reset the circuit breaker to closed state."""
    async with self._lock:
        # Drop all failure bookkeeping and resume normal operation.
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._half_open_successes = 0
        self._half_open_failures = 0
        self._last_failure_time = None

execute async

execute(func: Callable[[], T]) -> T

Execute a function with circuit breaker protection.

PARAMETER DESCRIPTION
func

Async function to execute

TYPE: Callable[[], T]

RETURNS DESCRIPTION
T

Result of the function

RAISES DESCRIPTION
CircuitOpenError

If circuit is open

Exception

If function raises an error

Source code in phytrace/reliability/retry.py
async def execute(
    self,
    func: Callable[[], T],
) -> T:
    """
    Execute a function with circuit breaker protection.

    Args:
        func: Async function to execute

    Returns:
        Result of the function

    Raises:
        CircuitOpenError: If circuit is open
        Exception: If function raises an error
    """
    # Gate the call on the breaker state first.
    allowed = await self.allow_request()
    if not allowed:
        raise CircuitOpenError("Circuit breaker is open")

    try:
        # Feed the outcome back into the breaker either way.
        outcome = await func()
        await self.record_success()
        return outcome
    except Exception:
        await self.record_failure()
        raise

Batching

BatchProcessor

BatchProcessor

BatchProcessor(config: BatchConfig | None = None, on_batch_ready: Callable[[Batch], None] | None = None)

Batches and compresses events for efficient transmission.

Collects events into batches based on:

- Maximum batch size
- Maximum batch age

When either limit is reached, the batch is flushed using the provided callback.

Initialize the batch processor.

PARAMETER DESCRIPTION
config

Batch configuration

TYPE: BatchConfig | None DEFAULT: None

on_batch_ready

Callback when a batch is ready for sending

TYPE: Callable[[Batch], None] | None DEFAULT: None

Source code in phytrace/reliability/batch.py
def __init__(
    self,
    config: BatchConfig | None = None,
    on_batch_ready: Callable[[Batch], None] | None = None,
):
    """
    Initialize the batch processor.

    Args:
        config: Batch configuration
        on_batch_ready: Callback when a batch is ready for sending
    """
    # Fall back to default batch limits when no config is given.
    self.config = config or BatchConfig()
    self.on_batch_ready = on_batch_ready

    # _current_batch accumulates events until a size/age limit is hit;
    # _batch_created_at is set lazily when the first event of a batch
    # arrives (see add()).
    self._current_batch: list[UDMEvent] = []
    self._batch_created_at: float | None = None
    self._lock = asyncio.Lock()
    # Background task driving age-based flushes; created by start(),
    # cancelled by stop().
    self._flush_task: asyncio.Task | None = None
    self._running = False

Attributes

pending_count property

pending_count: int

Number of events waiting to be batched.

has_pending property

has_pending: bool

Check if there are pending events.

Functions

start async

start() -> None

Start the batch processor.

Source code in phytrace/reliability/batch.py
async def start(self) -> None:
    """Start the batch processor."""
    # Set the flag before spawning so the loop observes _running=True
    # from its first iteration.
    self._running = True
    self._flush_task = asyncio.create_task(self._flush_loop())

stop async

stop() -> None

Stop the batch processor and flush remaining events.

Source code in phytrace/reliability/batch.py
async def stop(self) -> None:
    """Stop the batch processor and flush remaining events."""
    self._running = False

    # Cancel the background flush loop and wait for it to wind down.
    task = self._flush_task
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    # Hand off whatever is still pending in the current batch.
    await self.flush()

add async

add(event: UDMEvent) -> Batch | None

Add an event to the current batch.

PARAMETER DESCRIPTION
event

The event to add

TYPE: UDMEvent

RETURNS DESCRIPTION
Batch | None

A Batch if one was flushed, None otherwise

Source code in phytrace/reliability/batch.py
async def add(self, event: UDMEvent) -> Batch | None:
    """
    Add an event to the current batch.

    Args:
        event: The event to add

    Returns:
        A Batch if one was flushed, None otherwise
    """
    async with self._lock:
        # Stamp the batch's creation time on its first event so the
        # age-based flush can measure from here.
        if self._batch_created_at is None:
            self._batch_created_at = time.time()

        self._current_batch.append(event)

        # Size limit reached: flush immediately and hand back the batch.
        full = len(self._current_batch) >= self.config.max_batch_size
        return await self._flush_batch() if full else None

flush async

flush() -> Batch | None

Flush the current batch.

RETURNS DESCRIPTION
Batch | None

The flushed batch, or None if empty

Source code in phytrace/reliability/batch.py
async def flush(self) -> Batch | None:
    """
    Flush the current batch.

    Returns:
        The flushed batch, or None if empty
    """
    # Take the lock so a concurrent add() cannot interleave with the flush.
    async with self._lock:
        flushed = await self._flush_batch()
    return flushed

BatchConfig

BatchConfig dataclass

BatchConfig(max_batch_size: int = 100, max_batch_age_ms: int = 1000, compression: CompressionType = GZIP, compression_level: int = 6)

Configuration for batch processor.