Base de Datos¶
Capa de acceso a datos definida en database.py. Usa psycopg2 con connection pool y soporte para PgBouncer en modo transaccional.
Pool de Conexiones¶
# database.py
connection_pool = SimpleConnectionPool(
minconn=5,
maxconn=50,
dsn=DATABASE_URL,
cursor_factory=RealDictCursor
)
El pool se inicializa al arrancar la aplicacion en el evento startup de FastAPI.
Multi-Tenant: schema_name¶
Regla Critica
Todas las funciones de BD reciben schema_name como parametro keyword-only (despues de *). Esto es obligatorio para evitar SQL injection y tenant leakage.
# CORRECTO
result = execute_query("SELECT ...", schema_name=schema_name)
# INCORRECTO - causa TypeError en runtime
result = execute_query("SELECT ...", schema_name)
Como funciona¶
- El
TenantMiddlewareextraeschema_namedel headerX-Tenant-Schema - El endpoint lo obtiene via
Depends(get_tenant_schema) - Se pasa a funciones de BD como
schema_name=schema_name get_db_connection()ejecutaSET search_path TO "{schema}", public
Funciones Principales¶
get_db_connection¶
Context manager que obtiene una conexion del pool con schema configurado.
@contextmanager
def get_db_connection(schema_name: str):
"""
Obtiene conexion con SET search_path al schema.
Reutiliza conexion en llamadas anidadas (via ContextVar).
"""
Caracteristicas:
- Valida
schema_namecontra SQL injection - Reutiliza conexion activa en llamadas anidadas (ContextVar)
- Usa
SET LOCALen modo PgBouncer transaccional - Reset de
search_pathantes de devolver al pool - Retry automatico si conexion cerrada
get_db_cursor¶
Context manager para obtener cursor con auto-commit opcional y contexto de auditoria.
@contextmanager
def get_db_cursor(
*,
commit: bool = False,
schema_name: str,
user_id: Optional[str] = None,
auth_source: Optional[str] = None
):
execute_query¶
Ejecuta SELECT con retry automatico.
def execute_query(
query: str,
params: tuple = None,
fetch: bool = True,
fetch_one: bool = False,
retry_count: int = 2,
*,
schema_name: str
) -> Optional[list]:
| Parametro | Tipo | Descripcion |
|---|---|---|
query |
str |
Query SQL |
params |
tuple |
Parametros para %s |
fetch |
bool |
Si True, retorna resultados |
fetch_one |
bool |
Si True, retorna solo 1 registro |
retry_count |
int |
Intentos en caso de error de conexion |
schema_name |
str |
keyword-only - Schema del tenant |
Retorna: dict (si fetch_one) o list[dict] (si fetch) o None.
execute_update¶
Ejecuta INSERT/UPDATE/DELETE con commit automatico.
def execute_update(
query: str,
params: tuple = None,
*,
schema_name: str,
user_id: Optional[str] = None,
auth_source: Optional[str] = None
) -> bool:
execute_transaction¶
Context manager para transacciones atomicas con multiples operaciones.
@contextmanager
def execute_transaction(
*,
schema_name: str,
user_id: Optional[str] = None,
auth_source: Optional[str] = None
):
Uso:
with execute_transaction(schema_name="200_muni") as (conn, cursor):
cursor.execute("INSERT INTO ...", params1)
cursor.execute("UPDATE ...", params2)
# Auto-commit si no hay excepciones
# Auto-rollback en caso de error
execute_single_update¶
Ejecuta una operacion individual con soporte para RETURNING.
def execute_single_update(
query: str,
params: tuple = None,
returning: bool = False,
*,
schema_name: str,
user_id: Optional[str] = None,
auth_source: Optional[str] = None
):
Retorna: {"status": "success", "rows_affected": N} o con "result" si returning=True.
Validacion de Schema¶
Valida que el schema sea seguro para SQL:
- No vacio ni None
- Solo letras, numeros y guion bajo (
^[a-zA-Z0-9_]+$) - Ejemplos validos:
"200_muni","public","municipio_abc"
PgBouncer Transaction Mode¶
El backend soporta PgBouncer en modo transaccional para manejar 300-500 conexiones concurrentes:
| Modo | Comando | Comportamiento |
|---|---|---|
| PostgreSQL directo | SET search_path |
Persiste en la sesion |
| PgBouncer transaction | SET LOCAL search_path |
Se resetea al fin de transaccion |
Configuracion via variable de entorno:
Funciones de Validacion¶
def check_user_exists(user_id: str, *, schema_name: str) -> bool:
def check_document_exists(document_id: str, *, schema_name: str) -> bool:
def get_user_basic_info(user_id: str, *, schema_name: str) -> Optional[dict]:
def get_document_basic_info(document_id: str, *, schema_name: str) -> Optional[dict]:
Numeracion de Expedientes¶
Usa Advisory Lock (pg_advisory_xact_lock(999999)) para serializar acceso y evitar race conditions en la numeracion de expedientes.
Formato de numero: EE-{año}-{secuencia:06d}-{organizacion}-{departamento}
Contexto de Auditoria¶
Cuando se pasan user_id y auth_source a las funciones de BD, se inyectan como GUC (Grand Unified Configuration) de PostgreSQL:
Esto permite que los triggers de auditoria en la BD registren quien hizo cada operacion.