# Core Layer
The core layer (`src/core/`) provides reusable infrastructure components with zero business logic. These components form the foundation upon which all services are built.
## Components Overview
| Component | File | Purpose |
|---|---|---|
| Pool | `pool.py` | PostgreSQL connection pooling |
| Brotr | `brotr.py` | Database interface with stored procedures |
| BaseService | `base_service.py` | Abstract service base class |
| Logger | `logger.py` | Structured logging |
## Pool

The `Pool` component manages PostgreSQL connections using asyncpg, with PgBouncer compatibility.
### Key Features
- Async connection pool management
- Configurable pool size limits (1-100 connections)
- Retry logic with exponential backoff
- Connection health checking
- Environment variable password loading (`DB_PASSWORD`; see the sketch after this list)
- Async context manager support
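To make these features concrete, the sketch below shows how they might map onto asyncpg: a bounded pool, PgBouncer-friendly settings, and a password read from `DB_PASSWORD`. This is a minimal illustration, not the actual `Pool` implementation; the host, database, user, and size values are assumptions.

```python
import os

import asyncpg


async def create_pool() -> asyncpg.Pool:
    # Password comes from the environment rather than from YAML
    password = os.environ["DB_PASSWORD"]
    return await asyncpg.create_pool(
        host="localhost",              # assumed connection details
        port=5432,
        database="brotr",
        user="brotr",
        password=password,
        min_size=1,
        max_size=10,                   # within the documented 1-100 range
        max_queries=50000,
        statement_cache_size=0,        # needed for PgBouncer transaction pooling
        server_settings={"application_name": "brotr", "timezone": "UTC"},
    )
```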
### Configuration Model
```python
class PoolConfig(BaseModel):
    database: DatabaseConfig         # host, port, database, user
    limits: PoolLimitsConfig         # min_size, max_size, max_queries
    timeouts: PoolTimeoutsConfig     # acquisition, health_check
    retry: RetryConfig               # max_attempts, delays, backoff
    server_settings: dict            # application_name, timezone
```

### Usage

```python
from core import Pool

# From YAML configuration
pool = Pool.from_yaml("yaml/core/brotr.yaml")

# Using context manager (recommended)
async with pool:
    result = await pool.fetch("SELECT * FROM relays LIMIT 10")
    count = await pool.fetchval("SELECT COUNT(*) FROM events")

# Manual lifecycle
await pool.connect()
try:
    result = await pool.fetch("SELECT 1")
finally:
    await pool.close()
```

### Connection Retry
`Pool` automatically retries failed connections with exponential backoff:
```yaml
retry:
  max_attempts: 3
  initial_delay: 1.0        # seconds
  max_delay: 10.0           # seconds
  exponential_backoff: true
```
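As an illustration of how this configuration could drive the retry loop, the sketch below doubles the delay on each failed attempt and caps it at `max_delay`. It is an assumption about the behaviour, not the actual `Pool` code.

```python
import asyncio


async def connect_with_retry(connect, *, max_attempts=3, initial_delay=1.0,
                             max_delay=10.0, exponential_backoff=True):
    """Call connect() until it succeeds or attempts are exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect()
        except OSError:
            if attempt == max_attempts:
                raise
            delay = initial_delay * (2 ** (attempt - 1)) if exponential_backoff else initial_delay
            await asyncio.sleep(min(delay, max_delay))
```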
## Brotr

Brotr is the high-level database interface that wraps `Pool` and provides stored procedure access.

### Key Features
- Composition pattern: HAS-A `Pool` (publicly accessible)
- Stored procedure wrappers for all database operations
- Batch operations with configurable size limits
- Automatic hex-to-BYTEA conversion for event IDs (see the sketch after this list)
- Timeout configuration per operation type
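The hex-to-BYTEA conversion can be illustrated with a short helper: Nostr event IDs are 64-character hex strings, and asyncpg sends `bytes` values for `BYTEA` parameters. This is an illustrative sketch, not necessarily how Brotr implements it.

```python
def event_id_to_bytea(event_id: str) -> bytes:
    """Convert a 64-character hex event ID into the 32 raw bytes stored as BYTEA."""
    if len(event_id) != 64:
        raise ValueError(f"unexpected event id length: {len(event_id)}")
    return bytes.fromhex(event_id)


# Example: stored as 32 raw bytes instead of a 64-character string
raw = event_id_to_bytea("ab" * 32)
assert len(raw) == 32
```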
### Stored Procedures
Brotr exposes these hardcoded procedure names for security:
```python
PROC_INSERT_EVENT = "insert_event"
PROC_INSERT_RELAY = "insert_relay"
PROC_INSERT_RELAY_METADATA = "insert_relay_metadata"
PROC_DELETE_ORPHAN_EVENTS = "delete_orphan_events"
PROC_DELETE_ORPHAN_NIP11 = "delete_orphan_nip11"
PROC_DELETE_ORPHAN_NIP66 = "delete_orphan_nip66"
```
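As a hypothetical illustration of why hardcoding matters: the procedure name is always taken from one of these constants, while all values travel as bound parameters, so no external input ever reaches the SQL text. The helper below is an assumed pattern, not Brotr's actual code, and the real procedures' argument shapes are not shown here.

```python
async def call_procedure(pool, proc_name: str, *args):
    # proc_name always comes from one of the PROC_* constants above,
    # never from user or relay input; arguments are bound parameters.
    placeholders = ", ".join(f"${i + 1}" for i in range(len(args)))
    return await pool.fetchval(f"SELECT {proc_name}({placeholders})", *args)
```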
### Usage

```python
from core import Brotr

brotr = Brotr.from_yaml("yaml/core/brotr.yaml")

async with brotr:
    # Insert events (batch operation)
    count = await brotr.insert_events(events_list)

    # Insert relays
    count = await brotr.insert_relays(relays_list)

    # Insert metadata with deduplication
    count = await brotr.insert_relay_metadata(metadata_list)

    # Cleanup orphaned records
    result = await brotr.cleanup_orphans()

    # Access underlying pool for custom queries
    rows = await brotr.pool.fetch("SELECT * FROM relays WHERE network = $1", "tor")
```

### Batch Operations
Large datasets are automatically split into batches:
```yaml
batch:
  max_batch_size: 1000    # Maximum items per batch
```
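A minimal sketch of the splitting logic, assuming straightforward fixed-size chunking (the actual implementation may differ):

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], max_batch_size: int = 1000) -> Iterator[Sequence[T]]:
    """Yield successive slices of at most max_batch_size items."""
    for start in range(0, len(items), max_batch_size):
        yield items[start:start + max_batch_size]


# e.g. 2500 events -> batches of 1000, 1000, and 500
```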
## BaseService

BaseService is the abstract base class for all services, providing common functionality.
### Key Features
- Generic type parameter for configuration class
- `SERVICE_NAME` and `CONFIG_CLASS` class attributes
- State persistence via `_load_state()` / `_save_state()`
- Continuous operation via `run_forever(interval)`
- Factory methods: `from_yaml()`, `from_dict()`
- Graceful shutdown via `request_shutdown()`
### Interface
```python
class BaseService(ABC, Generic[ConfigT]):
    SERVICE_NAME: str              # Unique identifier for state persistence
    CONFIG_CLASS: type[ConfigT]    # For automatic config parsing

    _brotr: Brotr                  # Database interface
    _config: ConfigT               # Pydantic configuration
    _state: dict[str, Any]         # Persisted state (JSONB in database)

    @abstractmethod
    async def run(self) -> None:
        """Single cycle logic - must be implemented by subclasses."""

    async def run_forever(self, interval: float) -> None:
        """Continuous loop with configurable interval."""

    async def health_check(self) -> bool:
        """Database connectivity check."""

    def request_shutdown(self) -> None:
        """Sync-safe shutdown trigger for signal handlers."""

    async def wait(self, timeout: float) -> bool:
        """Interruptible sleep - returns True if shutdown requested."""
```
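The interplay between `run_forever()`, `wait()`, and `request_shutdown()` can be sketched as an outline of the method body; this is an assumed shape of the loop, not the actual implementation:

```python
async def run_forever(self, interval: float) -> None:
    # Repeat single cycles until shutdown is requested, e.g. by a
    # signal handler calling request_shutdown().
    while True:
        await self.run()
        # wait() sleeps up to `interval` seconds, but wakes early and
        # returns True if request_shutdown() was called in the meantime.
        if await self.wait(interval):
            break
```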
### State Persistence

Services can persist arbitrary state to the database:
```python
# State is automatically loaded on context enter
async with service:
    # Access state
    last_sync = self._state.get("last_sync_timestamp", 0)

    # Modify state
    self._state["last_sync_timestamp"] = current_time

# State is automatically saved on context exit
```

State is stored in the `service_state` table as JSONB:

```sql
CREATE TABLE service_state (
    service_name TEXT PRIMARY KEY,
    state JSONB NOT NULL DEFAULT '{}',
    updated_at BIGINT NOT NULL
);
```
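One plausible way `_save_state()` could persist into this table is an UPSERT keyed on the service name. The query and helper below are assumptions for illustration, not necessarily what the base class issues:

```python
import json
import time


async def save_state(pool, service_name: str, state: dict) -> None:
    # Hypothetical sketch: upsert the JSONB state keyed on service_name
    await pool.execute(
        """
        INSERT INTO service_state (service_name, state, updated_at)
        VALUES ($1, $2::jsonb, $3)
        ON CONFLICT (service_name)
        DO UPDATE SET state = EXCLUDED.state, updated_at = EXCLUDED.updated_at
        """,
        service_name,
        json.dumps(state),
        int(time.time()),
    )
```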
### Creating a Service

```python
from pydantic import BaseModel, Field

from core import BaseService, Brotr, Logger

SERVICE_NAME = "myservice"


class MyServiceConfig(BaseModel):
    interval: float = Field(default=300.0, ge=60.0)
    some_setting: str = Field(default="value")


class MyService(BaseService[MyServiceConfig]):
    SERVICE_NAME = SERVICE_NAME
    CONFIG_CLASS = MyServiceConfig

    def __init__(self, brotr: Brotr, config: MyServiceConfig | None = None):
        super().__init__(brotr=brotr, config=config or MyServiceConfig())
        self._logger = Logger(SERVICE_NAME)

    async def run(self) -> None:
        """Single cycle implementation."""
        self._logger.info("cycle_started")

        # Your service logic here
        await self._do_work()

        self._logger.info("cycle_completed")

    async def _do_work(self) -> None:
        # Access config: self._config.some_setting
        # Access database: self._brotr.pool.fetch(...)
        # Access state: self._state["key"]
        pass
```
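Putting the pieces together, such a service could then be wired up roughly as follows. The config path and interval value are illustrative assumptions:

```python
import asyncio


async def main() -> None:
    brotr = Brotr.from_yaml("yaml/core/brotr.yaml")
    config = MyServiceConfig(interval=300.0)
    service = MyService(brotr=brotr, config=config)

    # Entering the service context loads persisted state (per the docs above);
    # run_forever() repeats run() on the configured interval until
    # request_shutdown() is called.
    async with service:
        await service.run_forever(config.interval)


asyncio.run(main())
```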
## Logger

Logger provides structured key=value logging for machine parsing.
```python
from core import Logger

logger = Logger("synchronizer")

# Info level with key=value pairs
logger.info("sync_completed", events=1500, duration=45.2, relay="wss://relay.example.com")
# Output: 2025-01-01 12:00:00 INFO synchronizer: sync_completed events=1500 duration=45.2 relay=wss://relay.example.com

# Error level
logger.error("connection_failed", relay="wss://relay.example.com", error="timeout")

# Debug level
logger.debug("processing_event", event_id="abc123")

# Warning level
logger.warning("slow_relay", relay="wss://relay.example.com", rtt=5000)
```

### Log Levels
Configure the log level via the CLI:
```bash
python -m services finder --log-level DEBUG
```

## Module Exports
The core package exports all public components:
```python
from core.pool import Pool, PoolConfig
from core.brotr import Brotr, BrotrConfig
from core.base_service import BaseService
from core.logger import Logger

__all__ = [
    "Pool",
    "PoolConfig",
    "Brotr",
    "BrotrConfig",
    "BaseService",
    "Logger",
]
```

## Next Steps
Section titled “Next Steps”- Explore the Service Layer
- Learn about Configuration
- Understand the Database Schema