410 lines
16 KiB
Python
410 lines
16 KiB
Python
import calendar as _cal_mod
|
|
import json as _json
|
|
from collections import defaultdict as _defaultdict
|
|
from datetime import date as _date
|
|
from .config import _state, _access_log, DbType, LOGS_PER_PAGE, STEPS_ROWS
|
|
from .renderer import _base, _card, _page_css, _page_js, _tmpl, _css
|
|
from .queries.db import e as _e
|
|
from .queries import processes as q_processes
|
|
from .queries import pec as q_pec
|
|
from .queries import logs as q_logs
|
|
from .queries import report as q_report
|
|
from .queries import documenti as q_documenti
|
|
from .queries import iscrizioni as q_iscrizioni
|
|
from .queries import api_iscrizioni as q_api_iscrizioni
|
|
from .queries import email as q_email
|
|
from .queries import sharepoint as q_sharepoint
|
|
|
|
# Italian month names, indexed directly by month number (1-12).
# Index 0 is an empty placeholder so no "-1" offset is needed on lookup.
_IT_MONTHS = [
    '',
    'Gennaio',
    'Febbraio',
    'Marzo',
    'Aprile',
    'Maggio',
    'Giugno',
    'Luglio',
    'Agosto',
    'Settembre',
    'Ottobre',
    'Novembre',
    'Dicembre',
]
|
|
|
|
|
|
def _load_stats(db_path: str, db_type) -> dict:
    """Merge the per-area counters into one flat stats dict.

    Only the ``get_stats`` helpers are called; no row listings are fetched,
    which keeps callers that need counters only (the dashboard) cheap.

    Args:
        db_path: Database path forwarded to every ``get_stats`` helper.
        db_type: Value read from ``_state['db_type']``. ``DbType.INTRAZ``
            selects the iscrizioni + email counters, anything else the PEC
            counters.

    Returns:
        Process, middle-section and log counters merged in that order, so a
        later source wins if two helpers return the same key.
    """
    if db_type == DbType.INTRAZ:
        middle_stats = {**q_iscrizioni.get_stats(db_path), **q_email.get_stats(db_path)}
    else:
        middle_stats = q_pec.get_stats(db_path)
    return {
        **q_processes.get_stats(db_path),
        **middle_stats,
        **q_logs.get_stats(db_path),
    }


def _load_data(db_path: str) -> dict:
    """Load process, PEC and log rows together with the merged counters.

    Args:
        db_path: Database path forwarded to the query helpers.

    Returns:
        A dict with the keys ``processes``, ``pec``, ``logs`` and ``stats``.
        ``pec`` is an empty list when the database type is ``DbType.INTRAZ``.
    """
    db_type = _state.get('db_type', 'unknown')
    stats = _load_stats(db_path, db_type)
    return {
        'processes': q_processes.get_processes(db_path),
        'pec': [] if db_type == DbType.INTRAZ else q_pec.get_pec(db_path),
        'logs': q_logs.get_logs(db_path),
        'stats': stats,
    }


def _dashboard_section(href: str, label: str, cards: str) -> str:
    """Wrap a run of cards in a titled, linked dashboard ``<section>``."""
    return (
        '<section>'
        f'<h2><a href="{href}" class="sec-link">{label}</a></h2>'
        f'<div class="grid">{cards}</div>'
        '</section>'
    )


def _page_dashboard(db_path: str) -> str:
    """Render the dashboard page: run, middle-section and log counters.

    The middle section depends on the database type: ``DbType.INTRAZ`` shows
    an "RPA Steps" block plus an "Email" block, every other type shows a
    single "Comunicazioni PEC" block.

    Args:
        db_path: Database path forwarded to the stats helpers and to ``_base``.

    Returns:
        The full page HTML produced by ``_base``.
    """
    # Read the type once so the counters and the layout always agree.
    db_type = _state.get('db_type', 'unknown')
    # Counters only: the dashboard never displays the row listings that
    # _load_data() would also fetch.
    s = _load_stats(db_path, db_type)

    cards_runs = (
        _card("Run totali", s['total_runs'])
        + _card("Completati", s['completed_runs'], '#16a34a')
        + _card("Incompleti", s['total_runs'] - s['completed_runs'], '#dc2626')
    )
    cards_logs = (
        _card("Errori", s['log_errors'], '#dc2626')
        + _card("Warning", s['log_warnings'], '#d97706')
    )

    if db_type == DbType.INTRAZ:
        cards_steps = (
            _card("Corsi totali", s.get('isc_total', 0))
            + _card("Completati", s.get('isc_ok', 0), '#16a34a')
            + _card("Con errore", s.get('isc_errore', 0), '#dc2626')
            + _card("In corso", s.get('isc_pending', 0), '#d97706')
        )
        cards_email = (
            _card("Email totali", s.get('email_total', 0))
            + _card("Inviate", s.get('email_inviata', 0), '#16a34a')
            + _card("Pending", s.get('email_pending', 0), '#d97706')
            + _card("Errori", s.get('email_errore', 0), '#dc2626')
        )
        section_middle = (
            _dashboard_section('/steps', '📋 RPA Steps', cards_steps)
            + _dashboard_section('/email', '✉ Email', cards_email)
        )
    else:
        cards_pec = (
            _card("PEC totali", s.get('total_pec', 0))
            + _card("Inviate", s.get('pec_inviato', 0), '#16a34a')
            + _card("Pending", s.get('pec_pending', 0), '#d97706')
            + _card("Errori", s.get('pec_errore', 0), '#dc2626')
        )
        section_middle = _dashboard_section('/pec', '✉ Comunicazioni PEC', cards_pec)

    content = _tmpl('dashboard.html').substitute(
        cards_runs=cards_runs, section_middle=section_middle, cards_logs=cards_logs
    )
    return _base('Dashboard', content, 'nav_dashboard', db_path, _page_css('dashboard'), _page_js('dashboard'))
|
|
|
|
|
|
def _page_runs(db_path: str) -> str:
    """Render the 'Processi' page.

    The process table is substituted into the ``runs.html`` template and the
    result is wrapped by ``_base`` under the ``nav_runs`` entry.
    """
    template = _tmpl('runs.html')
    process_rows = q_processes.get_processes(db_path)
    table_html = q_processes.render_table(process_rows)
    body = template.substitute(tbl_processes=table_html)
    return _base(
        'Processi', body, 'nav_runs', db_path,
        _page_css('runs'), _page_js('runs'),
    )
|
|
|
|
|
|
def _page_pec(db_path: str) -> str:
    """Render the 'PEC' page.

    The PEC table is substituted into the ``pec.html`` template and the
    result is wrapped by ``_base`` under the ``nav_pec`` entry.
    """
    template = _tmpl('pec.html')
    pec_rows = q_pec.get_pec(db_path)
    table_html = q_pec.render_table(pec_rows)
    body = template.substitute(tbl_pec=table_html)
    return _base(
        'PEC', body, 'nav_pec', db_path,
        _page_css('pec'), _page_js('pec'),
    )
|
|
|
|
|
|
def _page_logs(db_path: str, run_id: int = None, page: int = 1) -> str:
    """Render the 'Log DB' page for one run, with run- and row-level paging.

    Args:
        db_path: Database path forwarded to the query helpers and to ``_base``.
        run_id: Run to display. Falls back to the first id returned by
            ``q_logs.get_last_run_ids`` when ``None`` or not in that list.
        page: 1-based page of log rows; clamped to ``[1, total_pages]``.

    Returns:
        The full page HTML produced by ``_base``. When no runs exist an
        empty-state page is returned instead.
    """
    per_page = LOGS_PER_PAGE

    runs = q_logs.get_last_run_ids(db_path)
    run_ids = [r['rpa_process_id'] for r in runs]
    # `or` instead of a .get() default: a key that is present but NULL must
    # also fall back, otherwise None / the text "None" leaks into the markup.
    run_names = {r['rpa_process_id']: r.get('process_name') or 'unknown' for r in runs}
    run_dates = {r['rpa_process_id']: str(r.get('start_run') or '')[:10] for r in runs}

    if not run_ids:
        content = _tmpl('logs.html').substitute(
            run_info='nessun log', pagination='', pg_rows='',
            tbl_logs='<p class="empty">Nessun log trovato.</p>'
        )
        return _base('Log DB', content, 'nav_logs', db_path, _page_css('logs'), _page_js('logs'))

    if run_id is None or run_id not in run_ids:
        run_id = run_ids[0]

    idx = run_ids.index(run_id)
    # "Previous" is the NEXT list index: presumably get_last_run_ids returns
    # the newest run first. TODO confirm against the query's ORDER BY.
    prev_id = run_ids[idx + 1] if idx + 1 < len(run_ids) else None
    next_id = run_ids[idx - 1] if idx > 0 else None

    total = q_logs.get_logs_by_run_count(db_path, run_id)
    total_pages = max(1, (total + per_page - 1) // per_page)
    page = max(1, min(page, total_pages))

    # ── Row-level pagination (rendered above the run bar, async via JS) ──────
    row_prev_attr = ' disabled' if page <= 1 else ''
    row_next_attr = ' disabled' if page >= total_pages else ''
    pg_rows = (
        f'<div class="log-pg-rows" id="log-pg-rows"'
        f' data-run="{run_id}" data-page="{page}"'
        f' data-total-pages="{total_pages}" data-total="{total}">'
        f'<button id="pg-row-prev" class="pg-btn"'
        f'{row_prev_attr}>←</button>'
        f'<span id="pg-row-info" class="pg-row-info">'
        f'{page}/{total_pages}'
        f' <span class="pg-row-count">({total} log)</span>'
        f'</span>'
        f'<button id="pg-row-next" class="pg-btn"'
        f'{row_next_attr}>→</button>'
        f'</div>'
    )

    # ── Run-level pagination (fixed bottom bar) ───────────────────────────────
    # Compare against None explicitly: a run id of 0 is falsy but still valid.
    if prev_id is not None:
        prev_btn = f'<a class="pg-btn" href="/logs?run={prev_id}">← Run</a>'
    else:
        prev_btn = '<span class="pg-btn disabled">← Run</span>'
    if next_id is not None:
        next_btn = f'<a class="pg-btn" href="/logs?run={next_id}">Run →</a>'
    else:
        next_btn = '<span class="pg-btn disabled">Run →</span>'

    option_parts = []
    for rid in run_ids:
        name_html = _e(run_names[rid])  # escaped once, used twice below
        selected_attr = ' selected' if rid == run_id else ''
        option_parts.append(
            f'<option value="{rid}"'
            f' data-process-name="{name_html}"'
            f' data-date="{_e(run_dates[rid])}"'
            f' style="color:#1d4ed8"'
            f'{selected_attr}>'
            f'Run #{rid} — {name_html}</option>'
        )
    options = ''.join(option_parts)

    pagination = (
        f'<div class="log-pagination">'
        f'{prev_btn}'
        f'<select id="pg-run-select" class="pg-select"'
        f' onchange="location.href=\'/logs?run=\'+this.value">{options}</select>'
        f'{next_btn}'
        f'</div>'
    )

    current_name = run_names[run_id]
    run_info = f'<span class="proc-name-badge">{_e(current_name)}</span> Run #{run_id}'

    # Passed to render_table together with the page slice.
    offset = (page - 1) * per_page
    logs = q_logs.get_logs_by_run(db_path, run_id, page=page, per_page=per_page)
    content = _tmpl('logs.html').substitute(
        run_info=run_info,
        pagination=pagination,
        pg_rows=pg_rows,
        tbl_logs=q_logs.render_table(logs, offset=offset),
    )
    return _base('Log DB', content, 'nav_logs', db_path, _page_css('logs'), _page_js('logs'))
|
|
|
|
|
|
def _page_report(db_path: str) -> str:
    """Render the 'Report' page.

    Report rows are fetched first, rendered as a table, substituted into the
    ``report.html`` template and wrapped by ``_base`` under ``nav_report``.
    """
    report_rows = q_report.get_report(db_path)
    template = _tmpl('report.html')
    body = template.substitute(tbl_report=q_report.render_table(report_rows))
    return _base(
        'Report', body, 'nav_report', db_path,
        _page_css('report'), _page_js('report'),
    )
|
|
|
|
|
|
def _page_documenti(db_path: str) -> str:
    """Render the 'Documenti' page.

    Document rows are fetched first, rendered via ``q_documenti.render_page``,
    substituted into ``documenti.html`` as ``sessions_html`` and wrapped by
    ``_base`` under ``nav_documenti``.
    """
    document_rows = q_documenti.get_documenti(db_path)
    template = _tmpl('documenti.html')
    body = template.substitute(sessions_html=q_documenti.render_page(document_rows))
    return _base(
        'Documenti', body, 'nav_documenti', db_path,
        _page_css('documenti'), _page_js('documenti'),
    )
|
|
|
|
|
|
def _page_iscrizioni(db_path: str, page: int = 1) -> str:
    """Render the paginated 'RPA Steps' page.

    Args:
        db_path: Database path forwarded to the query helpers and to ``_base``.
        page: 1-based page number; clamped to ``[1, total_pages]``.

    Returns:
        The full page HTML produced by ``_base``.
    """
    page_size = STEPS_ROWS
    total = q_iscrizioni.get_corsi_count(db_path)
    # Ceiling division, with a floor of one page so an empty table still
    # renders "1/1".
    total_pages = max(1, (total + page_size - 1) // page_size)
    page = max(1, min(page, total_pages))
    first_row = (page - 1) * page_size
    rows = q_iscrizioni.get_corsi(db_path, limit=page_size, offset=first_row)

    # Each arrow is disabled at its end of the page range.
    prev_attr = '' if page > 1 else ' disabled'
    next_attr = '' if page < total_pages else ' disabled'
    prev_btn = f'<button id="pg-steps-prev" class="pg-btn"{prev_attr}>←</button>'
    next_btn = f'<button id="pg-steps-next" class="pg-btn"{next_attr}>→</button>'

    pg_steps = ''.join([
        '<div class="steps-pagination" id="steps-pg-bar"',
        f' data-page="{page}" data-total-pages="{total_pages}" data-total="{total}">',
        prev_btn,
        '<span id="steps-pg-info" class="pg-info">',
        f'Pagina {page}/{total_pages}',
        f' <span class="pg-row-count">({total} corsi)</span>',
        '</span>',
        next_btn,
        '</div>',
    ])

    template = _tmpl('iscrizioni.html')
    body = template.substitute(
        tbl_iscrizioni=q_iscrizioni.render_table(rows),
        pg_steps=pg_steps,
    )
    return _base(
        'RPA Steps', body, 'nav_iscrizioni', db_path,
        _page_css('iscrizioni'), _page_js('iscrizioni'),
    )
|
|
|
|
|
|
def _page_api_iscrizioni(db_path: str) -> str:
    """Render the 'Iscrizioni' page.

    Rows from ``q_api_iscrizioni.get_iscrizioni`` are rendered as a table,
    substituted into ``api_iscrizioni.html`` and wrapped by ``_base`` under
    ``nav_api_iscrizioni``.
    """
    enrolment_rows = q_api_iscrizioni.get_iscrizioni(db_path)
    template = _tmpl('api_iscrizioni.html')
    table_html = q_api_iscrizioni.render_table(enrolment_rows)
    body = template.substitute(tbl_api_iscrizioni=table_html)
    return _base(
        'Iscrizioni', body, 'nav_api_iscrizioni', db_path,
        _page_css('api_iscrizioni'), _page_js('api_iscrizioni'),
    )
|
|
|
|
|
|
def _page_sharepoint(db_path: str) -> str:
    """Render the 'SharePoint' page.

    Rows from ``q_sharepoint.get_sharepoint`` are rendered as a table,
    substituted into ``sharepoint.html`` and wrapped by ``_base`` under
    ``nav_sharepoint``.
    """
    sharepoint_rows = q_sharepoint.get_sharepoint(db_path)
    template = _tmpl('sharepoint.html')
    body = template.substitute(tbl_sharepoint=q_sharepoint.render_table(sharepoint_rows))
    return _base(
        'SharePoint', body, 'nav_sharepoint', db_path,
        _page_css('sharepoint'), _page_js('sharepoint'),
    )
|
|
|
|
|
|
def _page_email(db_path: str) -> str:
    """Render the 'Email' page.

    Rows from ``q_email.get_email`` are rendered as a table, substituted into
    ``email.html`` and wrapped by ``_base`` under ``nav_email``.
    """
    email_rows = q_email.get_email(db_path)
    template = _tmpl('email.html')
    body = template.substitute(tbl_email=q_email.render_table(email_rows))
    return _base(
        'Email', body, 'nav_email', db_path,
        _page_css('email'), _page_js('email'),
    )
|
|
|
|
|
|
def _page_schema(db_path: str) -> str:
    """Render the 'Schema' page from per-process run and error statistics.

    Five JSON-encoded lists are substituted into ``schema.html``: process
    labels, run counts, total minutes, and the labels/counts of errors per
    process. The error source depends on the database type
    (``q_iscrizioni`` for ``DbType.INTRAZ``, ``q_logs`` otherwise).

    Args:
        db_path: Database path forwarded to the query helpers and to ``_base``.

    Returns:
        The full page HTML produced by ``_base``.
    """
    rows = q_processes.get_schema_stats(db_path)
    # `or` fallbacks everywhere: a key that is present but NULL would otherwise
    # be serialised as JSON null instead of a usable label or number.
    labels = [r.get('process_name') or 'unknown' for r in rows]
    counts = [r.get('run_count') or 0 for r in rows]
    # Durations are floored at one minute (presumably so very short processes
    # stay visible in the chart; confirm against schema.html).
    minutes = [max(1, (r.get('total_seconds') or 0) // 60) for r in rows]

    db_type = _state.get('db_type', 'unknown')
    if db_type == DbType.INTRAZ:
        err_rows = q_iscrizioni.get_errors_by_process(db_path)
    else:
        err_rows = q_logs.get_errors_by_process(db_path)
    err_labels = [r.get('process_name') or 'unknown' for r in err_rows]
    err_counts = [r.get('error_count') or 0 for r in err_rows]

    content = _tmpl('schema.html').substitute(
        labels_json=_json.dumps(labels),
        counts_json=_json.dumps(counts),
        minutes_json=_json.dumps(minutes),
        err_labels_json=_json.dumps(err_labels),
        err_counts_json=_json.dumps(err_counts),
    )
    return _base('Schema', content, 'nav_schema', db_path, _page_css('schema'), _page_js('schema'))
|
|
|
|
|
|
def _page_calendar(db_path: str, year: int = None, month: int = None) -> str:
    """Render the monthly calendar page with one pill per run.

    Args:
        db_path: Database path forwarded to the query helper and to ``_base``.
        year: Year to show; defaults to the current year and is clamped to
            2000-2100.
        month: Month to show; defaults to the current month and is clamped to
            1-12.

    Returns:
        The full page HTML produced by ``_base``.
    """
    today = _date.today()
    if year is None:
        year = today.year
    if month is None:
        month = today.month
    year = max(2000, min(2100, year))
    month = max(1, min(12, month))

    prev_year, prev_month = (year - 1, 12) if month == 1 else (year, month - 1)
    next_year, next_month = (year + 1, 1) if month == 12 else (year, month + 1)

    runs = q_processes.get_runs_by_month(db_path, year, month)

    # Bucket runs by day of month, read from characters 8-9 of the timestamp
    # text (assumes an ISO-like "YYYY-MM-DD..." layout; verify against the
    # query).
    runs_by_day = _defaultdict(list)
    for r in runs:
        # Coerce to text first, as _run_pill does: a non-string timestamp
        # must not break len() or slicing.
        start = str(r.get('start_run') or '')
        if len(start) < 10:
            continue
        try:
            day_of_month = int(start[8:10])
        except ValueError:
            # Best effort: a malformed timestamp just keeps the run off the grid.
            continue
        runs_by_day[day_of_month].append(r)

    # Pills shown per day before the rest collapse behind a "more" button.
    max_visible = 3

    def _run_pill(r):
        """Build the link pill for one run (ok/error style, name, start time)."""
        run_id = r.get('id', '')
        # `or` so that a NULL process name also gets the generic label.
        proc = _e(r.get('process_name') or 'processo')
        ts = str(r.get('start_run', ''))
        time_str = ts[11:16] if len(ts) >= 16 else ''
        completed = r.get('completed') in (1, True, '1')
        cls = 'cal-run-ok' if completed else 'cal-run-err'
        icon = '✓' if completed else '✗'
        return (
            f'<a href="/logs?run={run_id}" class="cal-run {cls}"'
            f' title="{proc} {time_str}">'
            f'<span class="cal-run-icon">{icon}</span>'
            f'<span class="cal-run-name">{proc}</span>'
            f'</a>'
        )

    # Only the displayed month can contain "today"; resolve that once.
    today_day = today.day if (year, month) == (today.year, today.month) else None

    it_dows = ['Lun', 'Mar', 'Mer', 'Gio', 'Ven', 'Sab', 'Dom']
    # Collect fragments and join once instead of growing a string with +=.
    cells = [f'<div class="cal-dow">{d}</div>' for d in it_dows]

    for week in _cal_mod.monthcalendar(year, month):
        for day in week:
            if day == 0:
                # monthcalendar() pads days outside the month with 0.
                cells.append('<div class="cal-day cal-day-empty"></div>')
                continue

            pills = [_run_pill(r) for r in runs_by_day.get(day, [])]

            if len(pills) <= max_visible:
                runs_html = f'<div class="cal-runs">{"".join(pills)}</div>'
            else:
                visible = ''.join(pills[:max_visible])
                extra = ''.join(pills[max_visible:])
                extra_count = len(pills) - max_visible
                more_label = f'⋯ +{extra_count}'
                runs_html = (
                    f'<div class="cal-runs">{visible}</div>'
                    f'<button class="cal-more" data-more-text="{more_label}"'
                    f' onclick="calToggleMore(this)">{more_label}</button>'
                    f'<div class="cal-runs cal-runs-extra">{extra}</div>'
                )

            today_cls = ' cal-day-today' if day == today_day else ''
            cells.append(
                f'<div class="cal-day{today_cls}">'
                f'<div class="cal-day-num">{day}</div>'
                f'{runs_html}'
                f'</div>'
            )

    nav_html = (
        f'<div class="cal-nav">'
        f'<a href="/calendar?y={prev_year}&m={prev_month}" class="cal-nav-btn">'
        f'← {_IT_MONTHS[prev_month]}</a>'
        f'<h2 class="cal-month-title">{_IT_MONTHS[month]} {year}</h2>'
        f'<a href="/calendar?y={next_year}&m={next_month}" class="cal-nav-btn">'
        f'{_IT_MONTHS[next_month]} →</a>'
        f'</div>'
    )
    content = _tmpl('calendar.html').substitute(
        cal_nav=nav_html,
        calendar_html=f'<div class="cal-grid">{"".join(cells)}</div>',
    )
    return _base(
        f'Calendario — {_IT_MONTHS[month]} {year}', content,
        'nav_calendar', db_path, _page_css('calendar'), _page_js('calendar')
    )
|
|
|
|
|
|
def _page_login(error: bool = False) -> str:
    """Render the standalone login page.

    Args:
        error: When true, an "invalid credentials" paragraph is included.

    Returns:
        ``login.html`` with the shared CSS and the (possibly empty) error
        block substituted in. The page is not wrapped by ``_base``.
    """
    if error:
        error_block = '<p class="login-error">✗ Credenziali non valide. Riprova.</p>'
    else:
        error_block = ''
    template = _tmpl('login.html')
    return template.substitute(css=_css(), error_block=error_block)
|
|
|
|
|
|
def _page_server_logs(db_path: str) -> str:
    """Render the 'Server Logs' page from the in-memory HTTP access log.

    Entries of ``_access_log`` are listed in reverse order, one table row
    each, with the status code coloured by class. The heading mentions the
    last 200 requests, but no cap is applied here; presumably ``_access_log``
    is bounded where it is defined (confirm in ``config``).

    Args:
        db_path: Forwarded to ``_base`` only; no database query is made here.

    Returns:
        The full page HTML produced by ``_base`` (page CSS only, no page JS).
    """
    def _status_colour(code):
        """Pick the status colour: below 400, below 500, or anything else."""
        if code < 400:
            return "#16a34a"
        if code < 500:
            return "#d97706"
        return "#dc2626"

    row_parts = []
    for entry in reversed(_access_log):
        row_parts.append(
            '<tr>'
            f'<td>{_e(entry["ts"])}</td><td>{_e(entry["method"])}</td><td>{_e(entry["path"])}</td>'
            f'<td style="color:{_status_colour(entry["code"])};font-weight:600">{entry["code"]}</td>'
            f'<td>{_e(entry["client"])}</td>'
            '</tr>'
        )
    rows = ''.join(row_parts)

    # Empty log: show a single placeholder row spanning all five columns.
    if not rows:
        rows = '<tr><td colspan=5 class="empty">Nessuna richiesta ancora.</td></tr>'

    table = ''.join([
        '<table><thead><tr>',
        '<th>Timestamp</th><th>Method</th><th>Path</th><th>Status</th><th>Client</th>',
        '</tr></thead><tbody>',
        rows,
        '</tbody></table>',
    ])
    content = f'<section><h2>Accessi HTTP (ultimi 200)</h2><div class="tw">{table}</div></section>'
    return _base('Server Logs', content, '', db_path, _page_css('server-logs'))
|