Files
TDI-Dashboard/Tool/server.py
T
Luca Banfi e9d07162d9
ci/woodpecker/push/woodpecker Pipeline was successful
ci/woodpecker/tag/woodpecker Pipeline failed
lint syntax fixes
2026-05-18 16:25:52 +02:00

357 lines
16 KiB
Python

import json
import os
import secrets
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, parse_qs, unquote_plus
from .config import _state, _access_log, DbType, _auth, _sessions, LOGS_PER_PAGE, STEPS_ROWS
from .env_manager import update_auth as _update_env_auth
from .db_finder import _find_db, _pick_db_file
from .pages import (
_page_dashboard, _page_runs, _page_pec, _page_logs,
_page_report, _page_documenti, _page_iscrizioni,
_page_api_iscrizioni, _page_sharepoint, _page_email,
_page_server_logs, _page_schema, _page_login,
)
from .queries.db import e as _e, query as _query, detect_db as _detect_db
from .queries import documenti as q_documenti
from .queries import iscrizioni as q_iscrizioni
from .queries import logs as q_logs_api
_COOKIE_NAME = 'rpa_session'
def _get_session_token(cookie_header: str) -> str | None:
"""Extract session token from Cookie header."""
if not cookie_header:
return None
for part in cookie_header.split(';'):
part = part.strip()
if part.startswith(_COOKIE_NAME + '='):
return part[len(_COOKIE_NAME) + 1:]
return None
def _is_authenticated(cookie_header: str) -> bool:
if not _auth['enabled']:
return True
token = _get_session_token(cookie_header)
return token is not None and token in _sessions
def _refresh_db_type(db_path: str) -> 'DbType':
db_type = _detect_db(db_path)
_state['db_type'] = db_type
return db_type
def make_handler(db_path: str):
class ReportHandler(BaseHTTPRequestHandler):
def log_message(self, fmt, *args):
print(f"[{datetime.now().strftime('%H:%M:%S')}] {fmt % args}")
def _send(self, code: int, body: str, content_type: str = 'text/html; charset=utf-8'):
encoded = body.encode('utf-8')
try:
self.send_response(code)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', str(len(encoded)))
self.end_headers()
self.wfile.write(encoded)
except (ConnectionAbortedError, BrokenPipeError, ConnectionResetError):
return
_access_log.append({
'ts': datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
'method': self.command,
'path': self.path,
'code': code,
'client': self.client_address[0],
})
def do_POST(self):
parsed = urlparse(self.path)
if parsed.path == '/login':
length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(length).decode('utf-8')
fields = {}
for part in body.split('&'):
if '=' in part:
k, v = part.split('=', 1)
fields[unquote_plus(k)] = unquote_plus(v)
user = fields.get('username', '')
pwd = fields.get('password', '')
if user == _auth['user'] and pwd == _auth['password']:
token = secrets.token_hex(32)
_sessions.add(token)
self.send_response(302)
self.send_header('Location', '/')
self.send_header('Set-Cookie', f'{_COOKIE_NAME}={token}; Path=/; HttpOnly; SameSite=Strict')
self.end_headers()
else:
self.send_response(302)
self.send_header('Location', '/login?error=1')
self.end_headers()
_access_log.append({
'ts': datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
'method': 'POST', 'path': '/login',
'code': 302, 'client': self.client_address[0],
})
elif parsed.path == '/settings/login':
if not _is_authenticated(self.headers.get('Cookie', '')):
self.send_response(302)
self.send_header('Location', '/login')
self.end_headers()
return
length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(length).decode('utf-8')
fields = {}
for part in body.split('&'):
if '=' in part:
k, v = part.split('=', 1)
fields[unquote_plus(k)] = unquote_plus(v)
action = fields.get('action', '')
if action == 'disable':
_update_env_auth(enabled=False)
_sessions.clear()
elif action == 'enable':
new_user = fields.get('new_user', '').strip()
new_pwd = fields.get('new_password', '').strip()
if new_user and new_pwd:
_update_env_auth(enabled=True, user=new_user, password=new_pwd)
self.send_response(302)
self.send_header('Location', '/')
self.end_headers()
_access_log.append({
'ts': datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
'method': 'POST', 'path': '/settings/login',
'code': 302, 'client': self.client_address[0],
})
else:
self._send(405, '<h1>405 Method Not Allowed</h1>')
def do_GET(self):
parsed_path = urlparse(self.path).path
# Login / logout routes (no auth required)
if parsed_path == '/login':
qs = parse_qs(urlparse(self.path).query)
error = bool(qs.get('error'))
self._send(200, _page_login(error=error))
return
if parsed_path == '/logout':
token = _get_session_token(self.headers.get('Cookie', ''))
if token and token in _sessions:
_sessions.discard(token)
self.send_response(302)
self.send_header('Location', '/login')
self.send_header('Set-Cookie', f'{_COOKIE_NAME}=; Path=/; HttpOnly; Max-Age=0')
self.end_headers()
_access_log.append({
'ts': datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
'method': 'GET', 'path': '/logout',
'code': 302, 'client': self.client_address[0],
})
return
# Auth gate
if not _is_authenticated(self.headers.get('Cookie', '')):
self.send_response(302)
self.send_header('Location', '/login')
self.end_headers()
_access_log.append({
'ts': datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
'method': 'GET', 'path': self.path,
'code': 302, 'client': self.client_address[0],
})
return
if self.path == '/change-db':
chosen = _pick_db_file()
if chosen and os.path.exists(chosen):
_state['db_path'] = chosen
_refresh_db_type(chosen)
self.send_response(302)
self.send_header('Location', '/')
self.end_headers()
_access_log.append({
'ts': datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
'method': self.command, 'path': self.path,
'code': 302, 'client': self.client_address[0],
})
return
db_path = _state['db_path']
db_type = _state.get('db_type', 'unknown')
if self.path == '/api/documenti' and db_type != DbType.INTRAZ:
try:
rows = q_documenti.get_documenti(db_path)
html = q_documenti.render_page(rows)
ts = datetime.now().strftime('%d/%m/%Y %H:%M:%S')
self._send(200, json.dumps({'html': html, 'ts': ts}), 'application/json; charset=utf-8')
except Exception as exc:
self._send(500, json.dumps({'error': str(exc)}), 'application/json; charset=utf-8')
return
if self.path.startswith('/api/docs') and db_type != DbType.INTRAZ:
qs = parse_qs(urlparse(self.path).query)
sid = qs.get('sessione_id', [None])[0]
if sid:
try:
rows = _query(
db_path,
"SELECT subfolder, filename, doc_status, sp_remote_path, sp_web_url, uploaded_at "
"FROM sessione_documenti WHERE sessione_id=? ORDER BY subfolder, filename",
(int(sid),)
)
self._send(200, json.dumps(rows), 'application/json; charset=utf-8')
except Exception as exc:
self._send(500, json.dumps({'error': str(exc)}), 'application/json; charset=utf-8')
else:
self._send(400, json.dumps({'error': 'missing sessione_id'}), 'application/json; charset=utf-8')
return
if self.path == '/api/iscrizioni' and db_type == DbType.INTRAZ:
try:
rows = q_iscrizioni.get_iscrizioni(db_path)
ts = datetime.now().strftime('%d/%m/%Y %H:%M:%S')
self._send(200, json.dumps({'rows': rows, 'ts': ts}), 'application/json; charset=utf-8')
except Exception as exc:
self._send(500, json.dumps({'error': str(exc)}), 'application/json; charset=utf-8')
return
if urlparse(self.path).path == '/api/logs':
qs_api = parse_qs(urlparse(self.path).query)
raw_r = qs_api.get('run', [None])[0]
raw_p = qs_api.get('page', [None])[0]
search = (qs_api.get('q', [None])[0] or '').strip()
run_id = int(raw_r) if raw_r and raw_r.isdigit() else None
if run_id is None:
self._send(400, json.dumps({'error': 'missing run'}), 'application/json; charset=utf-8')
return
try:
per_page = LOGS_PER_PAGE
total = q_logs_api.get_logs_by_run_count(db_path, run_id, search=search)
total_pages = max(1, (total + per_page - 1) // per_page)
page = max(1, min(int(raw_p) if raw_p and raw_p.isdigit() else 1, total_pages))
rows = q_logs_api.get_logs_by_run(
db_path, run_id, page=page, per_page=per_page, search=search
)
self._send(200, json.dumps({
'html': q_logs_api.render_table(rows, offset=(page - 1) * per_page),
'page': page,
'total_pages': total_pages,
'total': total,
}), 'application/json; charset=utf-8')
except Exception as exc:
self._send(500, json.dumps({'error': str(exc)}), 'application/json; charset=utf-8')
return
if urlparse(self.path).path == '/api/steps' and db_type == DbType.INTRAZ:
qs_api = parse_qs(urlparse(self.path).query)
raw_p = qs_api.get('page', [None])[0]
try:
per_page = STEPS_ROWS
total = q_iscrizioni.get_corsi_count(db_path)
total_pages = max(1, (total + per_page - 1) // per_page)
page = max(1, min(int(raw_p) if raw_p and raw_p.isdigit() else 1, total_pages))
offset = (page - 1) * per_page
rows = q_iscrizioni.get_corsi(db_path, limit=per_page, offset=offset)
self._send(200, json.dumps({
'html': q_iscrizioni.render_table(rows),
'page': page,
'total_pages': total_pages,
'total': total,
}), 'application/json; charset=utf-8')
except Exception as exc:
self._send(500, json.dumps({'error': str(exc)}), 'application/json; charset=utf-8')
return
parsed = urlparse(self.path)
qs = parse_qs(parsed.query)
path = parsed.path
def _logs_page():
raw = qs.get('run', [None])[0]
raw_page = qs.get('page', [None])[0]
run_id = int(raw) if raw and raw.isdigit() else None
page = int(raw_page) if raw_page and raw_page.isdigit() else 1
return _page_logs(db_path, run_id=run_id, page=page)
routes = {
'/': lambda: _page_dashboard(db_path),
'/runs': lambda: _page_runs(db_path),
'/logs': _logs_page,
'/schema': lambda: _page_schema(db_path),
'/server-logs': lambda: _page_server_logs(db_path),
}
if db_type == DbType.INTRAZ:
page_val = qs.get('page', ['1'])[0]
routes['/steps'] = lambda: _page_iscrizioni(
db_path, page=int(page_val) if page_val.isdigit() else 1
)
routes['/iscrizioni-api'] = lambda: _page_api_iscrizioni(db_path)
routes['/sharepoint'] = lambda: _page_sharepoint(db_path)
routes['/email'] = lambda: _page_email(db_path)
else:
routes['/report'] = lambda: _page_report(db_path)
routes['/documenti'] = lambda: _page_documenti(db_path)
routes['/pec'] = lambda: _page_pec(db_path)
fn = routes.get(path)
if fn:
try:
self._send(200, fn())
except Exception as exc:
self._send(500, f"<pre>Errore: {_e(str(exc))}</pre>")
elif path == '/ping':
self._send(200, 'ok', 'text/plain; charset=utf-8')
else:
self._send(404, '<h1>404 Not Found</h1>')
return ReportHandler
def run_server(db_path: str, host: str = '0.0.0.0', port: int = 8473):
_state['db_path'] = db_path
db_type = _refresh_db_type(db_path)
handler = make_handler(db_path)
server = HTTPServer((host, port), handler)
display_host = 'localhost' if host in ('0.0.0.0', '') else host
base = f"http://{display_host}:{port}"
print(f"RPA Report server avviato [db: {db_type}]")
print(f" Dashboard : {base}/")
print(f" Processi : {base}/runs")
if db_type == DbType.INTRAZ:
print(f" RPA Steps : {base}/steps")
print(f" Iscr. API : {base}/iscrizioni-api")
print(f" SharePoint : {base}/sharepoint")
print(f" Email : {base}/email")
else:
print(f" PEC : {base}/pec")
print(f" Documenti : {base}/documenti")
print(f" Report : {base}/report")
print(f" Schema : {base}/schema")
print(f" Log DB : {base}/logs")
print(f" Server log : {base}/server-logs")
print(f" DB : {db_path}")
print("Ctrl+C per fermare.\n")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nServer fermato.")
server.server_close()
def main():
from dotenv import load_dotenv
load_dotenv()
db_file = os.environ.get('RPA_DB_FILE', 'rpa_FORMAZIONE.db')
db_path = _find_db(db_file)
port = int(os.environ.get('RPA_REPORT_PORT', 8473))
run_server(db_path, port=port)