Reliability¶
Edge Buffer¶
EdgeBuffer ¶
EdgeBuffer(storage_path: str | Path, max_size_mb: int = 1000, max_age_hours: int = 168, encryption_key: bytes | None = None)
Persistent local buffer for offline operation.
Stores events to disk when the transport is unavailable, and flushes them when connection is restored.
Features:
- FIFO queue semantics
- Configurable size and age limits
- Optional encryption (the encryption_key parameter is accepted but not yet implemented)
- Atomic writes
Initialize the edge buffer.
| PARAMETER | DESCRIPTION |
|---|---|
| storage_path | Directory path for buffer storage. TYPE: str \| Path |
| max_size_mb | Maximum buffer size in MB. TYPE: int |
| max_age_hours | Maximum event age in hours. TYPE: int |
| encryption_key | Optional encryption key (not implemented). TYPE: bytes \| None |
Source code in phytrace/reliability/buffer.py
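The source lists atomic writes as a feature but does not say how they are implemented. A common technique is to write to a temporary file in the same directory and then rename it over the target. The sketch below illustrates that idea only; `atomic_write_json` and the JSON-on-disk layout are hypothetical and are not taken from phytrace.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, payload: dict) -> None:
    """Write payload to path so readers never observe a partial file.

    Hypothetical helper for illustration; not part of phytrace.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the target directory so the rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
            handle.flush()
            os.fsync(handle.fileno())  # push bytes to disk before the rename
        os.replace(tmp_name, path)  # replaces the target in a single step
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

If the process dies mid-write, the target file is either the old version or absent, never half-written.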
Functions¶
enqueue async ¶
enqueue(event: UDMEvent) -> bool
Add an event to the buffer.
| PARAMETER | DESCRIPTION |
|---|---|
| event | The UDM event to buffer. TYPE: UDMEvent |
| RETURNS | DESCRIPTION |
|---|---|
| bool | True if successfully buffered, False if the buffer is full |
Source code in phytrace/reliability/buffer.py
dequeue async ¶
dequeue(max_events: int = 100) -> list[UDMEvent]
Get events from the buffer (oldest first).
| PARAMETER | DESCRIPTION |
|---|---|
| max_events | Maximum number of events to retrieve. TYPE: int |
| RETURNS | DESCRIPTION |
|---|---|
| list[UDMEvent] | Up to max_events UDM events, oldest first. The events are not removed from the buffer. |
Source code in phytrace/reliability/buffer.py
remove async ¶
Remove specific events from the buffer.
| PARAMETER | DESCRIPTION |
|---|---|
| event_ids | List of event IDs to remove |
| RETURNS | DESCRIPTION |
|---|---|
| int | Number of events removed |
Source code in phytrace/reliability/buffer.py
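Because dequeue returns events without removing them, a caller can send first and delete only after the send succeeds. The sketch below shows that peek-then-remove pattern with an in-memory stand-in. `ToyBuffer`, `drain`, and the dict-shaped events with an `id` key are hypothetical; only the method names and return conventions mirror the documented API.

```python
import asyncio


class ToyBuffer:
    """In-memory stand-in for illustration; NOT the phytrace EdgeBuffer."""

    def __init__(self, max_events: int) -> None:
        self._events: dict[str, dict] = {}  # dicts keep insertion order (FIFO)
        self._max_events = max_events

    async def enqueue(self, event: dict) -> bool:
        if len(self._events) >= self._max_events:
            return False  # buffer full; the caller decides what to do
        self._events[event["id"]] = event
        return True

    async def dequeue(self, max_events: int = 100) -> list[dict]:
        # Peek at the oldest events without removing them.
        return list(self._events.values())[:max_events]

    async def remove(self, event_ids: list[str]) -> int:
        removed = 0
        for event_id in event_ids:
            if self._events.pop(event_id, None) is not None:
                removed += 1
        return removed


async def drain(buffer: ToyBuffer, send) -> int:
    """Send buffered events in order; delete only what was sent successfully."""
    sent = 0
    while batch := await buffer.dequeue(max_events=2):
        if not await send(batch):
            break  # leave the events in place for a later attempt
        sent += await buffer.remove([event["id"] for event in batch])
    return sent
```

With this split, a failed send leaves the buffer untouched, so no event is lost between reading and delivery.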
flush async ¶
flush(transport: Transport, batch_size: int = 100) -> BatchSendResult
Send all buffered events when connection is available.
| PARAMETER | DESCRIPTION |
|---|---|
| transport | The transport to use for sending. TYPE: Transport |
| batch_size | Number of events to send per batch. TYPE: int |
| RETURNS | DESCRIPTION |
|---|---|
| BatchSendResult | Combined BatchSendResult for all events |
Source code in phytrace/reliability/buffer.py
get_stats async ¶
get_stats() -> BufferStats
clear async ¶
Clear all buffered events.
Source code in phytrace/reliability/buffer.py
BufferStats¶
BufferStats dataclass ¶
Retry & Circuit Breaker¶
RetryPolicy¶
RetryPolicy dataclass ¶
RetryPolicy(max_retries: int = 5, initial_delay_ms: int = 100, max_delay_ms: int = 30000, exponential_base: float = 2.0, jitter: bool = True, retryable_errors: set[ErrorType] = (lambda: {NETWORK_ERROR, TIMEOUT, SERVER_ERROR, RATE_LIMITED})())
Configurable retry logic with exponential backoff.
Implements exponential backoff with optional jitter to prevent thundering herd problems.
Functions¶
get_delay ¶
Calculate delay for given attempt number.
| PARAMETER | DESCRIPTION |
|---|---|
| attempt | Attempt number (0-indexed) |
| RETURNS | DESCRIPTION |
|---|---|
| float | Delay in seconds |
Source code in phytrace/reliability/retry.py
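The exact formula behind get_delay is not shown in this reference. A minimal sketch of capped exponential backoff, using the RetryPolicy field names and defaults as parameters, might look like the following. The function name `backoff_delay`, the injectable `rng`, and the choice of "full jitter" (scaling by a uniform random factor) are assumptions, not the library's confirmed behavior.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    initial_delay_ms: int = 100,
    max_delay_ms: int = 30000,
    exponential_base: float = 2.0,
    jitter: bool = True,
    rng: Callable[[], float] = random.random,
) -> float:
    """Return the delay in seconds before retry number `attempt` (0-indexed).

    Hypothetical helper; the jitter strategy is an assumption.
    """
    raw_ms = initial_delay_ms * exponential_base ** attempt
    capped_ms = min(raw_ms, max_delay_ms)  # never wait longer than the cap
    if jitter:
        capped_ms *= rng()  # spread clients uniformly over [0, capped_ms)
    return capped_ms / 1000.0
```

Without jitter and with the default values, the delays grow as 0.1 s, 0.2 s, 0.4 s, and so on until they reach the 30 s cap.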
should_retry ¶
Determine if a retry should be attempted.
| PARAMETER | DESCRIPTION |
|---|---|
| error_type | Type of error that occurred |
| attempt | Current attempt number (0-indexed) |
| RETURNS | DESCRIPTION |
|---|---|
| bool | True if a retry should be attempted, False otherwise |
Source code in phytrace/reliability/retry.py
execute_with_retry async ¶
execute_with_retry(func: Callable[[], T], on_retry: Callable[[int, Exception], None] | None = None) -> T
Execute a function with retry logic.
| PARAMETER | DESCRIPTION |
|---|---|
| func | Async function to execute. TYPE: Callable[[], T] |
| on_retry | Optional callback invoked on each retry. TYPE: Callable[[int, Exception], None] \| None |
| RETURNS | DESCRIPTION |
|---|---|
| T | Result of the function |
| RAISES | DESCRIPTION |
|---|---|
| Exception | If all retries are exhausted |
Source code in phytrace/reliability/retry.py
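As a rough illustration of the retry loop described above, the sketch below retries an async callable, invokes a callback before each retry, and re-raises the last error once the retry budget is spent. `retry_async` and `delay_for` are hypothetical names. Unlike the documented policy, this simplified version retries on any exception rather than consulting a set of retryable error types.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def retry_async(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    delay_for: Callable[[int], float] = lambda attempt: 0.0,
    on_retry: Optional[Callable[[int, Exception], None]] = None,
) -> T:
    """Call func until it succeeds or max_retries retries have been used.

    Hypothetical helper; not the phytrace implementation.
    """
    attempt = 0
    while True:
        try:
            return await func()
        except Exception as exc:
            if attempt >= max_retries:
                raise  # retry budget exhausted: surface the last error
            if on_retry is not None:
                on_retry(attempt, exc)
            await asyncio.sleep(delay_for(attempt))
            attempt += 1
```

With `max_retries=5` the callable runs at most six times: one initial call plus five retries.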
CircuitBreaker¶
CircuitBreaker ¶
CircuitBreaker(failure_threshold: int = 5, recovery_timeout_sec: float = 60.0, half_open_requests: int = 3)
Circuit breaker to prevent cascade failures.
The circuit breaker has three states:
- CLOSED: Normal operation, requests pass through
- OPEN: Failing, all requests are rejected immediately
- HALF_OPEN: Testing, limited requests allowed to check recovery
Transitions:
- CLOSED -> OPEN: When failure threshold is reached
- OPEN -> HALF_OPEN: After recovery timeout
- HALF_OPEN -> CLOSED: If test requests succeed
- HALF_OPEN -> OPEN: If test requests fail
Initialize the circuit breaker.
| PARAMETER | DESCRIPTION |
|---|---|
| failure_threshold | Number of failures before opening the circuit. TYPE: int |
| recovery_timeout_sec | Time to wait, in seconds, before trying recovery. TYPE: float |
| half_open_requests | Number of test requests in half-open state. TYPE: int |
Source code in phytrace/reliability/retry.py
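The state transitions listed above can be sketched as a small synchronous state machine. `ToyBreaker`, `State`, and the injectable `clock` are hypothetical and exist only for illustration. The sketch assumes that half_open_requests consecutive successes close the circuit and that a single failure in HALF_OPEN reopens it; the real class is async and may count differently.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class ToyBreaker:
    """Simplified stand-in; NOT the phytrace CircuitBreaker."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout_sec: float = 60.0,
                 half_open_requests: int = 3,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout_sec = recovery_timeout_sec
        self.half_open_requests = half_open_requests
        self._clock = clock
        self.state = State.CLOSED
        self._failures = 0
        self._opened_at = 0.0
        self._trial_successes = 0

    def allow_request(self) -> bool:
        if self.state is State.OPEN:
            if self._clock() - self._opened_at < self.recovery_timeout_sec:
                return False  # still cooling down: reject immediately
            self.state = State.HALF_OPEN  # OPEN -> HALF_OPEN after the timeout
            self._trial_successes = 0
        return True

    def record_success(self) -> None:
        if self.state is State.HALF_OPEN:
            self._trial_successes += 1
            if self._trial_successes >= self.half_open_requests:
                self.state = State.CLOSED  # HALF_OPEN -> CLOSED
                self._failures = 0
        else:
            self._failures = 0

    def record_failure(self) -> None:
        if self.state is State.HALF_OPEN:
            self._trip()  # HALF_OPEN -> OPEN on a failed test request
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()  # CLOSED -> OPEN at the threshold

    def _trip(self) -> None:
        self.state = State.OPEN
        self._opened_at = self._clock()
        self._failures = 0
```

Passing the clock in as a parameter keeps the timeout logic testable without real waiting. This sketch does not cap the number of concurrent requests admitted while half-open.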
Attributes¶
Functions¶
allow_request async ¶
Check if a request should be allowed.
| RETURNS | DESCRIPTION |
|---|---|
| bool | True if the request should proceed, False if it is rejected |
Source code in phytrace/reliability/retry.py
record_success async ¶
Record a successful request.
Source code in phytrace/reliability/retry.py
record_failure async ¶
Record a failed request.
Source code in phytrace/reliability/retry.py
reset async ¶
Reset the circuit breaker to closed state.
Source code in phytrace/reliability/retry.py
execute async ¶
Execute a function with circuit breaker protection.
| PARAMETER | DESCRIPTION |
|---|---|
| func | Async function to execute |
| RETURNS | DESCRIPTION |
|---|---|
| T | Result of the function |
| RAISES | DESCRIPTION |
|---|---|
| CircuitOpenError | If the circuit is open |
| Exception | If the function raises an error |
Source code in phytrace/reliability/retry.py
Batching¶
BatchProcessor¶
BatchProcessor ¶
BatchProcessor(config: BatchConfig | None = None, on_batch_ready: Callable[[Batch], None] | None = None)
Batches and compresses events for efficient transmission.
Collects events into batches based on:
- Maximum batch size
- Maximum batch age
When either limit is reached, the batch is flushed using the provided callback.
Initialize the batch processor.
| PARAMETER | DESCRIPTION |
|---|---|
| config | Batch configuration. TYPE: BatchConfig \| None |
| on_batch_ready | Callback when a batch is ready for sending. TYPE: Callable[[Batch], None] \| None |
Source code in phytrace/reliability/batch.py
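The size-or-age flush rule described above can be illustrated with a small synchronous stand-in. `ToyBatcher` and its injectable `clock` are hypothetical. The sketch checks the batch age only when an event is added; the real processor presumably also flushes on a timer between start and stop, which is an assumption this example does not model.

```python
import time
from typing import Callable, Optional


class ToyBatcher:
    """Simplified stand-in; NOT the phytrace BatchProcessor."""

    def __init__(self, max_batch_size: int = 100, max_batch_age_ms: int = 1000,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_batch_size = max_batch_size
        self.max_batch_age_ms = max_batch_age_ms
        self._clock = clock
        self._events: list = []
        self._started_at: Optional[float] = None

    def add(self, event) -> Optional[list]:
        """Add an event; return the flushed batch if a limit was reached."""
        if not self._events:
            self._started_at = self._clock()  # age counts from the first event
        self._events.append(event)
        age_ms = (self._clock() - self._started_at) * 1000
        if len(self._events) >= self.max_batch_size or age_ms >= self.max_batch_age_ms:
            return self.flush()
        return None

    def flush(self) -> Optional[list]:
        """Return the current batch and start a new one, or None if empty."""
        if not self._events:
            return None
        batch, self._events = self._events, []
        self._started_at = None
        return batch
```

As in the documented add method, the return value tells the caller whether this particular event triggered a flush.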
Attributes¶
Functions¶
start async ¶
stop async ¶
Stop the batch processor and flush remaining events.
Source code in phytrace/reliability/batch.py
add async ¶
add(event: UDMEvent) -> Batch | None
Add an event to the current batch.
| PARAMETER | DESCRIPTION |
|---|---|
| event | The event to add. TYPE: UDMEvent |
| RETURNS | DESCRIPTION |
|---|---|
| Batch \| None | A Batch if one was flushed, None otherwise |
Source code in phytrace/reliability/batch.py
flush async ¶
Flush the current batch.
| RETURNS | DESCRIPTION |
|---|---|
| Batch \| None | The flushed batch, or None if empty |
BatchConfig¶
BatchConfig dataclass ¶
BatchConfig(max_batch_size: int = 100, max_batch_age_ms: int = 1000, compression: CompressionType = GZIP, compression_level: int = 6)
Configuration for batch processor.
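To give a feel for what the compression and compression_level settings control, the sketch below gzips a batch with Python's standard library at level 6, matching the default shown above. The helper names and the compact-JSON wire format are assumptions; this reference does not state how phytrace serializes a Batch.

```python
import gzip
import json


def compress_batch(events: list, compression_level: int = 6) -> bytes:
    """Serialize events as compact JSON and gzip the result.

    Hypothetical helper; the JSON encoding is an assumption.
    """
    raw = json.dumps(events, separators=(",", ":")).encode("utf-8")
    return gzip.compress(raw, compresslevel=compression_level)


def decompress_batch(blob: bytes) -> list:
    """Reverse compress_batch."""
    return json.loads(gzip.decompress(blob).decode("utf-8"))
```

Higher levels trade CPU time for smaller payloads, which tends to matter on constrained edge devices; telemetry events with repeated keys usually compress well.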