import json import os from datetime import datetime from http.server import BaseHTTPRequestHandler, HTTPServer from urllib.parse import urlparse, parse_qs from .config import _state, _access_log, DbType from .db_finder import _find_db, _pick_db_file from .pages import ( _page_dashboard, _page_runs, _page_pec, _page_logs, _page_report, _page_documenti, _page_iscrizioni, _page_email, _page_server_logs, ) from .queries.db import e as _e, query as _query, detect_db as _detect_db from .queries import documenti as q_documenti from .queries import iscrizioni as q_iscrizioni def _refresh_db_type(db_path: str) -> 'DbType': db_type = _detect_db(db_path) _state['db_type'] = db_type return db_type def make_handler(db_path: str): class ReportHandler(BaseHTTPRequestHandler): def log_message(self, fmt, *args): print(f"[{datetime.now().strftime('%H:%M:%S')}] {fmt % args}") def _send(self, code: int, body: str, content_type: str = 'text/html; charset=utf-8'): encoded = body.encode('utf-8') self.send_response(code) self.send_header('Content-Type', content_type) self.send_header('Content-Length', str(len(encoded))) self.end_headers() self.wfile.write(encoded) _access_log.append({ 'ts': datetime.now().strftime('%d/%m/%Y %H:%M:%S'), 'method': self.command, 'path': self.path, 'code': code, 'client': self.client_address[0], }) def do_GET(self): if self.path == '/change-db': chosen = _pick_db_file() if chosen and os.path.exists(chosen): _state['db_path'] = chosen _refresh_db_type(chosen) self.send_response(302) self.send_header('Location', '/') self.end_headers() _access_log.append({ 'ts': datetime.now().strftime('%d/%m/%Y %H:%M:%S'), 'method': self.command, 'path': self.path, 'code': 302, 'client': self.client_address[0], }) return db_path = _state['db_path'] db_type = _state.get('db_type', 'unknown') if self.path == '/api/documenti' and db_type != DbType.INTRAZ: try: rows = q_documenti.get_documenti(db_path) html = q_documenti.render_page(rows) ts = datetime.now().strftime('%d/%m/%Y %H:%M:%S') self._send(200, json.dumps({'html': html, 'ts': ts}), 'application/json; charset=utf-8') except Exception as exc: self._send(500, json.dumps({'error': str(exc)}), 'application/json; charset=utf-8') return if self.path.startswith('/api/docs') and db_type != DbType.INTRAZ: qs = parse_qs(urlparse(self.path).query) sid = qs.get('sessione_id', [None])[0] if sid: try: rows = _query(db_path, "SELECT subfolder, filename, doc_status, sp_remote_path, sp_web_url, uploaded_at " "FROM sessione_documenti WHERE sessione_id=? ORDER BY subfolder, filename", (int(sid),) ) self._send(200, json.dumps(rows), 'application/json; charset=utf-8') except Exception as exc: self._send(500, json.dumps({'error': str(exc)}), 'application/json; charset=utf-8') else: self._send(400, json.dumps({'error': 'missing sessione_id'}), 'application/json; charset=utf-8') return if self.path == '/api/iscrizioni' and db_type == DbType.INTRAZ: try: rows = q_iscrizioni.get_iscrizioni(db_path) ts = datetime.now().strftime('%d/%m/%Y %H:%M:%S') self._send(200, json.dumps({'rows': rows, 'ts': ts}), 'application/json; charset=utf-8') except Exception as exc: self._send(500, json.dumps({'error': str(exc)}), 'application/json; charset=utf-8') return parsed = urlparse(self.path) qs = parse_qs(parsed.query) path = parsed.path def _logs_page(): raw = qs.get('run', [None])[0] run_id = int(raw) if raw and raw.isdigit() else None return _page_logs(db_path, run_id=run_id) routes = { '/': lambda: _page_dashboard(db_path), '/runs': lambda: _page_runs(db_path), '/logs': _logs_page, '/server-logs': lambda: _page_server_logs(db_path), } if db_type == DbType.INTRAZ: routes['/iscrizioni'] = lambda: _page_iscrizioni(db_path) routes['/email'] = lambda: _page_email(db_path) else: routes['/report'] = lambda: _page_report(db_path) routes['/documenti'] = lambda: _page_documenti(db_path) routes['/pec'] = lambda: _page_pec(db_path) fn = routes.get(path) if fn: try: self._send(200, fn()) except Exception as exc: self._send(500, f"
Errore: {_e(str(exc))}")
elif path == '/ping':
self._send(200, 'ok', 'text/plain; charset=utf-8')
else:
self._send(404, '