118 lines
3.3 KiB
Python
118 lines
3.3 KiB
Python
"""queries/db.py — helper condiviso per query SQLite / PostgreSQL"""
|
|
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from ..config import DbType
|
|
|
|
|
|
def _pg_query(sql: str, params: tuple = ()) -> list:
|
|
"""Execute a query against PostgreSQL using connection params from _conn."""
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from ..config import _conn
|
|
conn = psycopg2.connect(
|
|
host=_conn['pg_host'],
|
|
port=int(_conn['pg_port'] or 5432),
|
|
dbname=_conn['pg_db'],
|
|
user=_conn['pg_user'],
|
|
password=_conn['pg_password'],
|
|
connect_timeout=10,
|
|
)
|
|
try:
|
|
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
|
# SQLite uses ? placeholders; psycopg2 uses %s
|
|
pg_sql = sql.replace('?', '%s')
|
|
cur.execute(pg_sql, params)
|
|
return [dict(r) for r in cur.fetchall()]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def query(db_path: str, sql: str, params: tuple = ()) -> list:
|
|
from ..config import _conn, ConnType
|
|
if _conn['type'] == ConnType.POSTGRES:
|
|
return _pg_query(sql, params)
|
|
conn = sqlite3.connect(db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
try:
|
|
cur = conn.cursor()
|
|
cur.execute(sql, params)
|
|
return [dict(r) for r in cur.fetchall()]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def count(db_path: str, sql: str) -> int:
|
|
rows = query(db_path, sql)
|
|
return rows[0]['n'] if rows else 0
|
|
|
|
|
|
def count_q(db_path: str, sql: str, params: tuple = ()) -> int:
|
|
rows = query(db_path, sql, params)
|
|
return rows[0]['n'] if rows else 0
|
|
|
|
|
|
# ------------------------------------------------------------------ HTML utils
|
|
|
|
def e(v) -> str:
|
|
if v is None:
|
|
return '-'
|
|
return str(v).replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"')
|
|
|
|
|
|
def dt(v) -> str:
|
|
if not v:
|
|
return '-'
|
|
try:
|
|
return datetime.fromisoformat(str(v)).strftime('%d/%m/%Y %H:%M:%S')
|
|
except Exception:
|
|
return str(v)
|
|
|
|
|
|
def d(v) -> str:
|
|
if not v:
|
|
return '-'
|
|
try:
|
|
return datetime.fromisoformat(str(v)).strftime('%d/%m/%Y')
|
|
except Exception:
|
|
return str(v)
|
|
|
|
|
|
def dur(start, finish) -> str:
|
|
if not start or not finish:
|
|
return '-'
|
|
try:
|
|
s = int((datetime.fromisoformat(str(finish)) - datetime.fromisoformat(str(start))).total_seconds())
|
|
return f"{s // 60}m {s % 60}s" if s >= 60 else f"{s}s"
|
|
except Exception:
|
|
return '-'
|
|
|
|
|
|
def badge(cls: str, text: str) -> str:
|
|
return f'<span class="badge {cls}">{text}</span>'
|
|
|
|
|
|
def detect_db(db_path: str) -> 'DbType':
|
|
"""Returns REG_LOMB, INTRAZ, or UNKNOWN based on DB schema.
|
|
|
|
reg_Lomb (db_reg_lombardia) → has table 'sessione_documenti'
|
|
Intraz (db_corsi_intraziendali) → has table 'rpa_intra_api_iscrizione'
|
|
"""
|
|
from ..config import DbType, _conn, ConnType
|
|
if _conn['type'] == ConnType.POSTGRES:
|
|
rows = _pg_query(
|
|
"SELECT table_name AS name FROM information_schema.tables "
|
|
"WHERE table_schema='public'"
|
|
)
|
|
else:
|
|
rows = query(db_path, "SELECT name FROM sqlite_master WHERE type='table'")
|
|
tables = {r['name'] for r in rows}
|
|
if 'sessione_documenti' in tables:
|
|
return DbType.REG_LOMB
|
|
if 'rpa_intra_api_iscrizione' in tables:
|
|
return DbType.INTRAZ
|
|
return DbType.UNKNOWN
|