302 lines
12 KiB
Python
302 lines
12 KiB
Python
import json as _json
|
|
from .config import _state, _access_log, DbType, LOGS_PER_PAGE, STEPS_ROWS
|
|
from .renderer import _base, _card, _page_css, _page_js, _tmpl, _css
|
|
from .queries.db import e as _e, query as _query
|
|
from .queries import processes as q_processes
|
|
from .queries import pec as q_pec
|
|
from .queries import logs as q_logs
|
|
from .queries import report as q_report
|
|
from .queries import documenti as q_documenti
|
|
from .queries import iscrizioni as q_iscrizioni
|
|
from .queries import api_iscrizioni as q_api_iscrizioni
|
|
from .queries import email as q_email
|
|
from .queries import sharepoint as q_sharepoint
|
|
|
|
|
|
def _load_stats(db_path: str, db_type) -> dict:
    """Collect the summary counters shown on the dashboard.

    Merges three stat dicts into one flat dict: process-run stats, the
    "middle" stats that depend on the database type (iscrizioni + email for
    ``DbType.INTRAZ``, PEC otherwise) and log stats.  On key collisions the
    later source wins (processes < middle < logs).

    Args:
        db_path: Database path forwarded to every query helper.
        db_type: Value of ``_state['db_type']`` chosen by the caller.

    Returns:
        The merged stats dict.
    """
    if db_type == DbType.INTRAZ:
        middle_stats = {
            **q_iscrizioni.get_stats(db_path),
            **q_email.get_stats(db_path),
        }
    else:
        middle_stats = q_pec.get_stats(db_path)
    return {
        **q_processes.get_stats(db_path),
        **middle_stats,
        **q_logs.get_stats(db_path),
    }


def _load_data(db_path: str) -> dict:
    """Load process rows, PEC rows, log rows and the merged stats.

    Args:
        db_path: Database path forwarded to every query helper.

    Returns:
        A dict with the keys ``'processes'``, ``'pec'``, ``'logs'`` and
        ``'stats'``.  ``'pec'`` is an empty list when the current database
        type is ``DbType.INTRAZ``.
    """
    db_type = _state.get('db_type', 'unknown')
    stats = _load_stats(db_path, db_type)
    return {
        'processes': q_processes.get_processes(db_path),
        'pec': [] if db_type == DbType.INTRAZ else q_pec.get_pec(db_path),
        'logs': q_logs.get_logs(db_path),
        'stats': stats,
    }


def _page_dashboard(db_path: str) -> str:
    """Render the dashboard page with run, middle-section and log cards.

    Only the stats are needed here, so they are fetched through
    ``_load_stats`` instead of ``_load_data``; this avoids querying the
    process, PEC and log rows just to throw them away.

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    db_type = _state.get('db_type', 'unknown')
    s = _load_stats(db_path, db_type)

    cards_runs = (
        _card("Run totali", s['total_runs'])
        + _card("Completati", s['completed_runs'], '#16a34a')
        + _card("Incompleti", s['total_runs'] - s['completed_runs'], '#dc2626')
    )
    cards_logs = (
        _card("Errori", s['log_errors'], '#dc2626')
        + _card("Warning", s['log_warnings'], '#d97706')
    )

    if db_type == DbType.INTRAZ:
        # INTRAZ databases show two sections: RPA steps and e-mail.
        cards_middle = (
            _card("Corsi totali", s.get('isc_total', 0))
            + _card("Completati", s.get('isc_ok', 0), '#16a34a')
            + _card("Con errore", s.get('isc_errore', 0), '#dc2626')
            + _card("In corso", s.get('isc_pending', 0), '#d97706')
        )
        cards_email = (
            _card("Email totali", s.get('email_total', 0))
            + _card("Inviate", s.get('email_inviata', 0), '#16a34a')
            + _card("Pending", s.get('email_pending', 0), '#d97706')
            + _card("Errori", s.get('email_errore', 0), '#dc2626')
        )
        section_middle = (
            '<section>'
            '<h2><a href="/steps" class="sec-link">📋 RPA Steps</a></h2>'
            f'<div class="grid">{cards_middle}</div>'
            '</section>'
            '<section>'
            '<h2><a href="/email" class="sec-link">✉ Email</a></h2>'
            f'<div class="grid">{cards_email}</div>'
            '</section>'
        )
    else:
        # Every other database type shows a single PEC section.
        cards_middle = (
            _card("PEC totali", s.get('total_pec', 0))
            + _card("Inviate", s.get('pec_inviato', 0), '#16a34a')
            + _card("Pending", s.get('pec_pending', 0), '#d97706')
            + _card("Errori", s.get('pec_errore', 0), '#dc2626')
        )
        section_middle = (
            '<section>'
            '<h2><a href="/pec" class="sec-link">✉ Comunicazioni PEC</a></h2>'
            f'<div class="grid">{cards_middle}</div>'
            '</section>'
        )

    content = _tmpl('dashboard.html').substitute(
        cards_runs=cards_runs,
        section_middle=section_middle,
        cards_logs=cards_logs,
    )
    return _base(
        'Dashboard', content, 'nav_dashboard', db_path,
        _page_css('dashboard'), _page_js('dashboard'),
    )
|
|
|
|
|
|
def _page_runs(db_path: str) -> str:
    """Render the "Processi" page: a table of all process runs.

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    template = _tmpl('runs.html')
    process_rows = q_processes.get_processes(db_path)
    table_html = q_processes.render_table(process_rows)
    body = template.substitute(tbl_processes=table_html)
    return _base(
        'Processi', body, 'nav_runs', db_path,
        _page_css('runs'), _page_js('runs'),
    )
|
|
|
|
|
|
def _page_pec(db_path: str) -> str:
    """Render the "PEC" page: a table of the PEC rows.

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    template = _tmpl('pec.html')
    pec_rows = q_pec.get_pec(db_path)
    table_html = q_pec.render_table(pec_rows)
    body = template.substitute(tbl_pec=table_html)
    return _base(
        'PEC', body, 'nav_pec', db_path,
        _page_css('pec'), _page_js('pec'),
    )
|
|
|
|
|
|
def _page_logs(db_path: str, run_id: int = None, page: int = 1) -> str:
    """Render the "Log DB" page for one run, one page of log rows at a time.

    The page carries two independent pagers: a row-level pager for the log
    rows of the selected run, and a run-level bar (previous / select / next)
    to switch between the runs returned by ``q_logs.get_last_run_ids``.

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.
        run_id: Id of the run to show.  ``None``, or an id that is not in the
            list returned by ``q_logs.get_last_run_ids``, falls back to the
            first id of that list.
        page: 1-based page of log rows; clamped to ``[1, total_pages]``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    per_page = LOGS_PER_PAGE

    runs = q_logs.get_last_run_ids(db_path)
    run_ids = [r['rpa_process_id'] for r in runs]

    if not run_ids:
        # Nothing to page through: render the empty-state table.
        content = _tmpl('logs.html').substitute(
            run_info='nessun log', pagination='', pg_rows='',
            tbl_logs='<p class="empty">Nessun log trovato.</p>'
        )
        return _base('Log DB', content, 'nav_logs', db_path, _page_css('logs'), _page_js('logs'))

    run_names = {r['rpa_process_id']: r.get('process_name', 'unknown') for r in runs}
    run_dates = {r['rpa_process_id']: str(r.get('start_run', ''))[:10] for r in runs}

    if run_id is None or run_id not in run_ids:
        run_id = run_ids[0]

    # "Previous" is the following list entry and "next" the preceding one
    # (presumably the list is ordered newest first — verify against q_logs).
    idx = run_ids.index(run_id)
    prev_id = run_ids[idx + 1] if idx + 1 < len(run_ids) else None
    next_id = run_ids[idx - 1] if idx > 0 else None

    total = q_logs.get_logs_by_run_count(db_path, run_id)
    total_pages = max(1, (total + per_page - 1) // per_page)  # ceil division
    page = max(1, min(page, total_pages))

    # ── Row-level pagination (rendered above the run bar, async via JS) ──────
    prev_row_attr = ' disabled' if page <= 1 else ''
    next_row_attr = ' disabled' if page >= total_pages else ''
    pg_rows = (
        f'<div class="log-pg-rows" id="log-pg-rows"'
        f' data-run="{run_id}" data-page="{page}"'
        f' data-total-pages="{total_pages}" data-total="{total}">'
        f'<button id="pg-row-prev" class="pg-btn"'
        f'{prev_row_attr}>←</button>'
        f'<span id="pg-row-info" class="pg-row-info">'
        f'{page}/{total_pages}'
        f' <span class="pg-row-count">({total} log)</span>'
        f'</span>'
        f'<button id="pg-row-next" class="pg-btn"'
        f'{next_row_attr}>→</button>'
        f'</div>'
    )

    # ── Run-level pagination (fixed bottom bar) ───────────────────────────────
    # Compare against None explicitly: a run id of 0 is falsy but still a
    # valid neighbour, and must produce a working link.
    prev_btn = (f'<a class="pg-btn" href="/logs?run={prev_id}">← Run</a>'
                if prev_id is not None else '<span class="pg-btn disabled">← Run</span>')
    next_btn = (f'<a class="pg-btn" href="/logs?run={next_id}">Run →</a>'
                if next_id is not None else '<span class="pg-btn disabled">Run →</span>')
    options = ''.join(
        f'<option value="{rid}"'
        f' data-process-name="{_e(run_names.get(rid, ""))}"'
        f' data-date="{_e(run_dates.get(rid, ""))}"'
        f' style="color:#1d4ed8"'
        f'{" selected" if rid == run_id else ""}>'
        f'Run #{rid} — {_e(run_names.get(rid, ""))}</option>'
        for rid in run_ids
    )
    current_name = run_names.get(run_id, 'unknown')
    pagination = (
        f'<div class="log-pagination">'
        f'{prev_btn}'
        f'<select id="pg-run-select" class="pg-select" onchange="location.href=\'/logs?run=\'+this.value">{options}</select>'
        f'{next_btn}'
        f'</div>'
    )

    run_info = f'<span class="proc-name-badge">{_e(current_name)}</span> Run #{run_id}'

    # The offset is handed to the table renderer; the query itself is paged
    # through page/per_page.
    offset = (page - 1) * per_page
    logs = q_logs.get_logs_by_run(db_path, run_id, page=page, per_page=per_page)
    content = _tmpl('logs.html').substitute(
        run_info=run_info,
        pagination=pagination,
        pg_rows=pg_rows,
        tbl_logs=q_logs.render_table(logs, offset=offset),
    )
    return _base('Log DB', content, 'nav_logs', db_path, _page_css('logs'), _page_js('logs'))
|
|
|
|
|
|
def _page_report(db_path: str) -> str:
    """Render the "Report" page: a table of the report rows.

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    report_rows = q_report.get_report(db_path)
    template = _tmpl('report.html')
    table_html = q_report.render_table(report_rows)
    body = template.substitute(tbl_report=table_html)
    return _base(
        'Report', body, 'nav_report', db_path,
        _page_css('report'), _page_js('report'),
    )
|
|
|
|
|
|
def _page_documenti(db_path: str) -> str:
    """Render the "Documenti" page from the document rows.

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    document_rows = q_documenti.get_documenti(db_path)
    template = _tmpl('documenti.html')
    sessions_markup = q_documenti.render_page(document_rows)
    body = template.substitute(sessions_html=sessions_markup)
    return _base(
        'Documenti', body, 'nav_documenti', db_path,
        _page_css('documenti'), _page_js('documenti'),
    )
|
|
|
|
|
|
def _page_iscrizioni(db_path: str, page: int = 1) -> str:
    """Render the paginated "RPA Steps" page (one page of corsi rows).

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.
        page: 1-based page number; clamped to ``[1, total_pages]``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    page_size = STEPS_ROWS
    total = q_iscrizioni.get_corsi_count(db_path)

    # Ceil division, with at least one page even when there are no rows.
    total_pages = (total + page_size - 1) // page_size
    if total_pages < 1:
        total_pages = 1
    page = min(page, total_pages)
    if page < 1:
        page = 1

    first_row = (page - 1) * page_size
    rows = q_iscrizioni.get_corsi(db_path, limit=page_size, offset=first_row)

    # A button is disabled when there is no page in its direction.
    prev_attr = '' if page > 1 else ' disabled'
    next_attr = '' if page < total_pages else ' disabled'
    prev_btn = f'<button id="pg-steps-prev" class="pg-btn"{prev_attr}>←</button>'
    next_btn = f'<button id="pg-steps-next" class="pg-btn"{next_attr}>→</button>'

    pg_steps = ''.join([
        '<div class="steps-pagination" id="steps-pg-bar"',
        f' data-page="{page}" data-total-pages="{total_pages}" data-total="{total}">',
        prev_btn,
        '<span id="steps-pg-info" class="pg-info">',
        f'Pagina {page}/{total_pages}',
        f' <span class="pg-row-count">({total} corsi)</span>',
        '</span>',
        next_btn,
        '</div>',
    ])

    template = _tmpl('iscrizioni.html')
    body = template.substitute(
        tbl_iscrizioni=q_iscrizioni.render_table(rows),
        pg_steps=pg_steps,
    )
    return _base(
        'RPA Steps', body, 'nav_iscrizioni', db_path,
        _page_css('iscrizioni'), _page_js('iscrizioni'),
    )
|
|
|
|
|
|
def _page_api_iscrizioni(db_path: str) -> str:
    """Render the "Iscrizioni" page: a table of the API iscrizioni rows.

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    iscrizioni_rows = q_api_iscrizioni.get_iscrizioni(db_path)
    template = _tmpl('api_iscrizioni.html')
    table_html = q_api_iscrizioni.render_table(iscrizioni_rows)
    body = template.substitute(tbl_api_iscrizioni=table_html)
    return _base(
        'Iscrizioni', body, 'nav_api_iscrizioni', db_path,
        _page_css('api_iscrizioni'), _page_js('api_iscrizioni'),
    )
|
|
|
|
|
|
def _page_sharepoint(db_path: str) -> str:
    """Render the "SharePoint" page: a table of the SharePoint rows.

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    sharepoint_rows = q_sharepoint.get_sharepoint(db_path)
    template = _tmpl('sharepoint.html')
    table_html = q_sharepoint.render_table(sharepoint_rows)
    body = template.substitute(tbl_sharepoint=table_html)
    return _base(
        'SharePoint', body, 'nav_sharepoint', db_path,
        _page_css('sharepoint'), _page_js('sharepoint'),
    )
|
|
|
|
|
|
def _page_email(db_path: str) -> str:
    """Render the "Email" page: a table of the e-mail rows.

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    email_rows = q_email.get_email(db_path)
    template = _tmpl('email.html')
    table_html = q_email.render_table(email_rows)
    body = template.substitute(tbl_email=table_html)
    return _base(
        'Email', body, 'nav_email', db_path,
        _page_css('email'), _page_js('email'),
    )
|
|
|
|
|
|
def _page_schema(db_path: str) -> str:
    """Render the "Schema" page, feeding chart data to the template as JSON.

    Two data series are produced: per-process run counts and durations, and
    per-process error counts.  Error counts come from ``q_iscrizioni`` when
    the current database type is ``DbType.INTRAZ`` and from ``q_logs``
    otherwise.

    Args:
        db_path: Database path forwarded to the query helpers and ``_base``.

    Returns:
        The full HTML page produced by ``_base``.
    """
    def _to_json(value) -> str:
        # Presumably the template places these values inside an inline
        # <script> (TODO confirm against schema.html).  Emitting <, > and &
        # as \uXXXX escapes keeps a process name such as "</script>" from
        # terminating that element; any JSON/JS consumer decodes the escapes
        # back to the same characters, so the data is unchanged.
        return (
            _json.dumps(value)
            .replace('<', '\\u003c')
            .replace('>', '\\u003e')
            .replace('&', '\\u0026')
        )

    rows = q_processes.get_schema_stats(db_path)
    labels = [r.get('process_name') or 'unknown' for r in rows]
    counts = [r.get('run_count', 0) for r in rows]
    # Whole minutes, floored, but never below 1 (missing/None counts as 0 s).
    minutes = [max(1, (r.get('total_seconds') or 0) // 60) for r in rows]

    db_type = _state.get('db_type', 'unknown')
    if db_type == DbType.INTRAZ:
        err_rows = q_iscrizioni.get_errors_by_process(db_path)
    else:
        err_rows = q_logs.get_errors_by_process(db_path)
    # Same fallback as ``labels``: a present-but-None name becomes 'unknown'
    # instead of being serialized as JSON null.
    err_labels = [r.get('process_name') or 'unknown' for r in err_rows]
    err_counts = [r.get('error_count', 0) for r in err_rows]

    content = _tmpl('schema.html').substitute(
        labels_json=_to_json(labels),
        counts_json=_to_json(counts),
        minutes_json=_to_json(minutes),
        err_labels_json=_to_json(err_labels),
        err_counts_json=_to_json(err_counts),
    )
    return _base('Schema', content, 'nav_schema', db_path, _page_css('schema'), _page_js('schema'))
|
|
|
|
|
|
def _page_login(error: bool = False) -> str:
    """Render the login page, optionally with an invalid-credentials notice.

    Args:
        error: When truthy, an error paragraph is included in the page.

    Returns:
        The ``login.html`` template with ``css`` and ``error_block`` filled.
    """
    if error:
        notice = '<p class="login-error">✗ Credenziali non valide. Riprova.</p>'
    else:
        notice = ''
    template = _tmpl('login.html')
    return template.substitute(css=_css(), error_block=notice)
|
|
|
|
|
|
def _page_server_logs(db_path: str) -> str:
    """Render the "Server Logs" page listing the recorded HTTP accesses.

    Every entry of ``_access_log`` is rendered as one table row, in reverse
    order.  The status cell is green below 400, amber below 500 and red
    otherwise.  When there are no entries a single placeholder row is shown.

    Args:
        db_path: Database path forwarded to ``_base``.

    Returns:
        The full HTML page produced by ``_base`` (no page-specific JS).
    """
    row_parts = []
    for hit in reversed(_access_log):
        status = hit["code"]
        if status < 400:
            colour = "#16a34a"
        elif status < 500:
            colour = "#d97706"
        else:
            colour = "#dc2626"
        row_parts.append(
            '<tr>'
            f'<td>{_e(hit["ts"])}</td><td>{_e(hit["method"])}</td><td>{_e(hit["path"])}</td>'
            f'<td style="color:{colour};font-weight:600">{status}</td>'
            f'<td>{_e(hit["client"])}</td>'
            '</tr>'
        )
    body_rows = ''.join(row_parts)
    if not body_rows:
        body_rows = '<tr><td colspan=5 class="empty">Nessuna richiesta ancora.</td></tr>'

    header = (
        '<table><thead><tr>'
        '<th>Timestamp</th><th>Method</th><th>Path</th><th>Status</th><th>Client</th>'
        '</tr></thead><tbody>'
    )
    table = header + body_rows + '</tbody></table>'

    content = f'<section><h2>Accessi HTTP (ultimi 200)</h2><div class="tw">{table}</div></section>'
    return _base('Server Logs', content, '', db_path, _page_css('server-logs'))
|