""" Routes API pour les sessions terminal SSH web. Provides endpoints for: - Creating terminal sessions - Listing active sessions - Closing sessions - Serving terminal popout page """ import asyncio import json import hashlib import httpx import logging import socket import html from urllib.parse import urlencode, urlparse, parse_qs from datetime import datetime, timedelta, timezone from typing import Optional from app.services.shell_history_service import shell_history_service, ShellHistoryError from fastapi import APIRouter, Depends, HTTPException, Request, WebSocket, WebSocketDisconnect, status from fastapi.responses import HTMLResponse, StreamingResponse from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.core.dependencies import get_db, get_current_user, require_debug_mode from app.crud.host import HostRepository from app.crud.bootstrap_status import BootstrapStatusRepository from app.crud.terminal_session import TerminalSessionRepository from app.crud.terminal_command_log import TerminalCommandLogRepository from app.schemas.terminal import ( TerminalSessionRequest, TerminalSessionResponse, TerminalSessionHost, TerminalSessionStatus, TerminalSessionList, CommandHistoryItem, CommandHistoryResponse, UniqueCommandItem, UniqueCommandsResponse, ActiveSessionInfo, SessionLimitError, HeartbeatResponse, SessionMetrics, ) from app.services.terminal_service import ( terminal_service, TERMINAL_SESSION_TTL_MINUTES, TERMINAL_SESSION_TTL_SECONDS, TERMINAL_MAX_SESSIONS_PER_USER, TERMINAL_SESSION_IDLE_TIMEOUT_SECONDS, TERMINAL_HEARTBEAT_INTERVAL_SECONDS, TERMINAL_TTYD_INTERFACE, TtydNotAvailableError, ) from app.models.terminal_session import ( SESSION_STATUS_ACTIVE, SESSION_STATUS_CLOSED, CLOSE_REASON_USER, CLOSE_REASON_CLIENT_LOST, ) logger = logging.getLogger(__name__) router = APIRouter() _PORT_REACHABILITY_TIMEOUT_SECONDS = 1.0 _MAX_TERMINAL_COMMAND_LENGTH = 10000 def _as_utc_aware(dt: Optional[datetime]) -> Optional[datetime]: if dt is None: return None if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) def _compute_remaining_seconds(expires_at: Optional[datetime], now: Optional[datetime] = None) -> int: now = now or datetime.now(timezone.utc) expires_at = _as_utc_aware(expires_at) if not expires_at: return 0 delta = (expires_at - now).total_seconds() return max(0, int(delta)) async def _is_port_reachable(request: Request, port: int) -> bool: candidates = ["127.0.0.1", "localhost"] try: if request.url.hostname: candidates.append(request.url.hostname) except Exception: pass ttyd_iface = TERMINAL_TTYD_INTERFACE if ttyd_iface and ttyd_iface not in {"0.0.0.0", "::"}: candidates.append(ttyd_iface) for host_candidate in dict.fromkeys(candidates): try: with socket.create_connection((host_candidate, int(port)), timeout=_PORT_REACHABILITY_TIMEOUT_SECONDS): return True except Exception: continue return False def _get_session_token_from_headers(auth_header: Optional[str]) -> Optional[str]: if not auth_header: return None if not auth_header.startswith("Bearer "): return None parts = auth_header.split(" ", 1) return parts[1].strip() if len(parts) == 2 else None def _build_session_limit_error(active_sessions, same_host_session=None) -> dict: now = datetime.now(timezone.utc) infos = [] for s in (active_sessions or []): created_at = _as_utc_aware(getattr(s, "created_at", None)) last_seen_at = _as_utc_aware(getattr(s, "last_seen_at", None)) age_seconds = int(max(0, (now - created_at).total_seconds())) if created_at else 0 last_seen_seconds = int(max(0, (now - last_seen_at).total_seconds())) if last_seen_at else 0 infos.append( ActiveSessionInfo( session_id=str(getattr(s, "id", "")), host_id=str(getattr(s, "host_id", "")), host_name=str(getattr(s, "host_name", "")), mode=str(getattr(s, "mode", "")), age_seconds=age_seconds, last_seen_seconds=last_seen_seconds, ) ) infos.sort(key=lambda x: x.last_seen_seconds, reverse=True) suggested_actions = [] can_reuse = False reusable_session_id = None if same_host_session is not None: can_reuse = True reusable_session_id = str(getattr(same_host_session, "id", "")) if reusable_session_id: suggested_actions.append("reuse_existing") suggested_actions.append(f"reuse_session:{reusable_session_id}") suggested_actions.append("close_oldest") if infos: suggested_actions.append(f"close_session:{infos[-1].session_id}") err = SessionLimitError( message="Maximum active terminal sessions reached", max_active=TERMINAL_MAX_SESSIONS_PER_USER, current_count=len(active_sessions or []), active_sessions=infos, suggested_actions=suggested_actions, can_reuse=can_reuse, reusable_session_id=reusable_session_id, ) return err.model_dump() def _get_session_token_from_request(request: Request, session_id: str, token: Optional[str] = None) -> Optional[str]: if token: return token cookie_key = f"terminal_token_{session_id}" cookie_token = request.cookies.get(cookie_key) or request.cookies.get("terminal_token") if cookie_token: return cookie_token hdr_token = _get_session_token_from_headers(request.headers.get("authorization")) if hdr_token: return hdr_token referer = request.headers.get("referer") if referer: try: referer_qs = parse_qs(urlparse(referer).query) referer_token = referer_qs.get("token", [None])[0] if referer_token: return referer_token except Exception: pass return None def _get_session_token_from_websocket(websocket: WebSocket, session_id: str) -> Optional[str]: token = websocket.query_params.get("token") if token: return token cookie_key = f"terminal_token_{session_id}" cookie_token = websocket.cookies.get(cookie_key) or websocket.cookies.get("terminal_token") return cookie_token @router.get("/status") async def get_terminal_status(current_user: dict = Depends(get_current_user)): available = bool(terminal_service.check_ttyd_available()) return { "available": available, "debug_mode": bool(settings.debug_mode), } @router.get("/sessions/{session_id}/probe") async def probe_terminal_session( session_id: str, token: str, request: Request, debug_enabled: bool = Depends(require_debug_mode), db_session: AsyncSession = Depends(get_db), ): session_repo = TerminalSessionRepository(db_session) session = await session_repo.get(session_id) if not session: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found") if not terminal_service.verify_token(token, session.token_hash): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token") alive = False try: alive = await terminal_service.is_session_process_alive(session_id) except Exception: alive = False reachable = await _is_port_reachable(request, session.ttyd_port) return { "session_id": session.id, "ttyd_port": session.ttyd_port, "process_alive": alive, "port_reachable": reachable, "ttyd_interface": TERMINAL_TTYD_INTERFACE, "dashboard_scheme": request.url.scheme, "dashboard_host": request.url.hostname, } @router.get("/proxy/{session_id}") async def proxy_ttyd_http( session_id: str, request: Request, token: Optional[str] = None, db_session: AsyncSession = Depends(get_db), ): session_repo = TerminalSessionRepository(db_session) session = await session_repo.get(session_id) if not session: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found") effective_token = _get_session_token_from_request(request, session_id, token=token) if not effective_token: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Missing session token") if not terminal_service.verify_token(effective_token, session.token_hash): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token") prefix = f"/api/terminal/proxy/{session_id}" path = request.url.path if path.startswith(prefix): path = path[len(prefix):] if not path: path = "/" query_params = list(request.query_params.multi_items()) query_params = [(k, v) for (k, v) in query_params if k != "token"] query_string = urlencode(query_params, doseq=True) if query_params else "" ttyd_url = f"http://127.0.0.1:{session.ttyd_port}{path}" if query_string: ttyd_url += f"?{query_string}" cookie_key = f"terminal_token_{session_id}" cookie_token = request.cookies.get(cookie_key) or request.cookies.get("terminal_token") try: async with httpx.AsyncClient(timeout=30.0) as client: ttyd_response = await client.request( method=request.method, url=ttyd_url, headers={k: v for k, v in request.headers.items() if k.lower() not in {"host", "connection"}}, content=await request.body() if request.method in {"POST", "PUT", "PATCH"} else None, follow_redirects=True, ) drop_headers = { "content-encoding", "transfer-encoding", "content-security-policy", "x-frame-options", "content-length", } response_headers = { k: v for (k, v) in ttyd_response.headers.items() if k.lower() not in drop_headers } response = StreamingResponse( iter([ttyd_response.content]), status_code=ttyd_response.status_code, headers=response_headers, media_type=ttyd_response.headers.get("content-type"), ) if effective_token and effective_token != cookie_token: response.set_cookie( key=cookie_key, value=effective_token, httponly=True, samesite="lax", secure=(request.url.scheme == "https"), path=f"/api/terminal/proxy/{session_id}/", max_age=TERMINAL_SESSION_TTL_SECONDS, ) return response except Exception as e: logger.error(f"Error proxying request to ttyd: {e}") raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Failed to proxy request to ttyd: {str(e)}", ) @router.api_route( "/proxy/{session_id}/{proxy_path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"], ) async def proxy_ttyd_http_assets( session_id: str, proxy_path: str, request: Request, token: Optional[str] = None, db_session: AsyncSession = Depends(get_db), ): session_repo = TerminalSessionRepository(db_session) session = await session_repo.get(session_id) if not session: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found") effective_token = _get_session_token_from_request(request, session_id, token=token) if not effective_token: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Missing session token") if not terminal_service.verify_token(effective_token, session.token_hash): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token") path = "/" + (proxy_path or "") query_params = list(request.query_params.multi_items()) query_params = [(k, v) for (k, v) in query_params if k != "token"] query_string = urlencode(query_params, doseq=True) if query_params else "" ttyd_url = f"http://127.0.0.1:{session.ttyd_port}{path}" if query_string: ttyd_url += f"?{query_string}" cookie_key = f"terminal_token_{session_id}" cookie_token = request.cookies.get(cookie_key) or request.cookies.get("terminal_token") try: async with httpx.AsyncClient(timeout=30.0) as client: ttyd_response = await client.request( method=request.method, url=ttyd_url, headers={k: v for k, v in request.headers.items() if k.lower() not in {"host", "connection"}}, content=await request.body() if request.method in {"POST", "PUT", "PATCH"} else None, follow_redirects=True, ) drop_headers = { "content-encoding", "transfer-encoding", "content-security-policy", "x-frame-options", "content-length", } response_headers = { k: v for (k, v) in ttyd_response.headers.items() if k.lower() not in drop_headers } response = StreamingResponse( iter([ttyd_response.content]), status_code=ttyd_response.status_code, headers=response_headers, media_type=ttyd_response.headers.get("content-type"), ) if effective_token and effective_token != cookie_token: response.set_cookie( key=cookie_key, value=effective_token, httponly=True, samesite="lax", secure=(request.url.scheme == "https"), path=f"/api/terminal/proxy/{session_id}/", max_age=TERMINAL_SESSION_TTL_SECONDS, ) return response except Exception as e: logger.error(f"Error proxying request to ttyd: {e}") raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Failed to proxy request to ttyd: {str(e)}", ) @router.websocket("/proxy/{session_id}/ws") async def proxy_ttyd_websocket( websocket: WebSocket, session_id: str, db_session: AsyncSession = Depends(get_db), ): token = _get_session_token_from_websocket(websocket, session_id) if not token: await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Missing token") return session_repo = TerminalSessionRepository(db_session) session = await session_repo.get(session_id) if not session: await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not found") return if not terminal_service.verify_token(token, session.token_hash): await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Invalid token") return now = datetime.now(timezone.utc) expires_at = _as_utc_aware(getattr(session, "expires_at", None)) if expires_at and expires_at <= now: await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session expired") return subprotocol = None proto_hdr = websocket.headers.get("sec-websocket-protocol") if proto_hdr and "tty" in proto_hdr: subprotocol = "tty" await websocket.accept(subprotocol=subprotocol) query_params = list(websocket.query_params.multi_items()) query_params = [(k, v) for (k, v) in query_params if k != "token"] upstream_qs = urlencode(query_params, doseq=True) if query_params else "" upstream_url = f"ws://127.0.0.1:{session.ttyd_port}/ws" + (f"?{upstream_qs}" if upstream_qs else "") import websockets logger.info( json.dumps( { "component": "terminal", "session_id": session_id, "event": "ws_proxy_connect_upstream", "upstream_url": upstream_url, "ttyd_port": session.ttyd_port, } ) ) try: # Ttyd checks Origin by default. We must provide a valid Origin matching the host/port # or ttyd will reject the WebSocket handshake (403). upstream_headers = { "Origin": f"http://127.0.0.1:{session.ttyd_port}", } async with websockets.connect( upstream_url, subprotocols=["tty"], open_timeout=5, ping_interval=None, max_size=None, additional_headers=upstream_headers, ) as upstream: logger.info( json.dumps( { "component": "terminal", "session_id": session_id, "event": "ws_proxy_connected_upstream", "ttyd_port": session.ttyd_port, } ) ) async def _client_to_upstream(): try: while True: message = await websocket.receive() if message.get("type") == "websocket.disconnect": break data = message.get("bytes") if data is not None: await upstream.send(data) continue text_data = message.get("text") if text_data is not None: await upstream.send(text_data) except WebSocketDisconnect: pass finally: try: await upstream.close() except Exception: pass async def _upstream_to_client(): try: async for data in upstream: if isinstance(data, bytes): await websocket.send_bytes(data) else: await websocket.send_text(data) finally: try: await websocket.close() except Exception: pass await asyncio.gather(_client_to_upstream(), _upstream_to_client(), return_exceptions=True) except Exception as e: logger.exception( json.dumps( { "component": "terminal", "session_id": session_id, "event": "ws_proxy_error", "error": str(e), "ttyd_port": getattr(session, "ttyd_port", None), } ) ) try: await websocket.close(code=1011, reason="Proxy error") except Exception: pass @router.post("/{host_id}/terminal-sessions", response_model=TerminalSessionResponse) async def create_terminal_session( host_id: str, session_request: TerminalSessionRequest, http_request: Request, current_user: dict = Depends(get_current_user), db_session: AsyncSession = Depends(get_db), ): if not terminal_service.check_ttyd_available(): raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Terminal feature unavailable: ttyd is not installed", ) session_repo = TerminalSessionRepository(db_session) host_repo = HostRepository(db_session) bs_repo = BootstrapStatusRepository(db_session) host = await host_repo.get(host_id) if not host: host = await host_repo.get_by_name(host_id) if not host: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Host '{host_id}' not found" ) if host.status == "offline": raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Host '{host.name}' is offline. Cannot open terminal." ) bootstrap = await bs_repo.latest_for_host(host.id) bootstrap_ok = bootstrap and bootstrap.status == "success" if not bootstrap_ok: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Host '{host.name}' has not been bootstrapped. Run bootstrap first to configure SSH access." ) user_id = current_user.get("user_id") or current_user.get("type", "api_key") username = current_user.get("username", "api_user") existing_session = await session_repo.find_reusable_session( user_id=user_id, host_id=host.id, mode=session_request.mode, idle_timeout_seconds=TERMINAL_SESSION_IDLE_TIMEOUT_SECONDS, ) if existing_session: await session_repo.update_last_seen(existing_session.id) await db_session.commit() terminal_service.record_session_created(reused=True) logger.info(f"session_reused session={existing_session.id[:8]}... user={username} host={host.name}") expires_at = _as_utc_aware(existing_session.expires_at) remaining = _compute_remaining_seconds(expires_at) token, token_hash = terminal_service.generate_session_token() existing_session.token_hash = token_hash await db_session.commit() connect_path = f"/api/terminal/connect/{existing_session.id}?token={token}" popout_path = f"/api/terminal/popout/{existing_session.id}?token={token}" session_url = popout_path if session_request.mode == "popout" else connect_path ws_scheme = "wss" if (http_request and http_request.url.scheme == "https") else "ws" ws_host = (http_request.url.netloc if http_request else "localhost") ws_token = urlencode({"token": token}) return TerminalSessionResponse( session_id=existing_session.id, url=session_url, websocket_url=f"{ws_scheme}://{ws_host}/api/terminal/proxy/{existing_session.id}/ws?{ws_token}", expires_at=expires_at, ttl_seconds=remaining, mode=existing_session.mode, host=TerminalSessionHost( id=host.id, name=host.name, ip=host.ip_address, status=host.status or "unknown", bootstrap_ok=bootstrap_ok, ), reused=True, token=token, ) active_sessions = await session_repo.list_active_for_user(user_id) active_count = len(active_sessions) if active_count >= TERMINAL_MAX_SESSIONS_PER_USER: same_host_session = next((s for s in active_sessions if s.host_id == host.id), None) terminal_service.record_session_limit_hit() logger.warning(f"session_limit_hit user={username} count={active_count} max={TERMINAL_MAX_SESSIONS_PER_USER}") from fastapi.responses import JSONResponse return JSONResponse( status_code=status.HTTP_429_TOO_MANY_REQUESTS, content=_build_session_limit_error(active_sessions, same_host_session), ) session_id = terminal_service.generate_session_id() token, token_hash = terminal_service.generate_session_token() port = None pid = None last_spawn_error = None for _ in range(3): try: port = await terminal_service.allocate_port(session_id) except Exception as e: logger.error(f"Failed to allocate port: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="No available ports for terminal session", ) try: pid = await terminal_service.spawn_ttyd( session_id=session_id, host_ip=host.ip_address, port=port, token=token, ) if pid is None: last_spawn_error = "Failed to start terminal process" await terminal_service.release_port(port) port = None continue break except TtydNotAvailableError: await terminal_service.release_port(port) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Terminal feature unavailable: ttyd is not installed", ) except Exception as e: last_spawn_error = str(e) await terminal_service.release_port(port) port = None continue if pid is None or port is None: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to start terminal: {last_spawn_error or 'unknown error'}", ) expires_at = datetime.now(timezone.utc) + timedelta(minutes=TERMINAL_SESSION_TTL_MINUTES) session = None try: session = await session_repo.create( id=session_id, host_id=host.id, host_name=host.name, host_ip=host.ip_address, token_hash=token_hash, ttyd_port=port, ttyd_pid=pid, expires_at=expires_at, user_id=user_id, username=username, mode=session_request.mode, ) await db_session.commit() except Exception as e: logger.exception(f"Failed to create session in DB: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create terminal session" ) finally: if session is None: try: await terminal_service.terminate_session(session_id) except Exception: pass try: await terminal_service.release_port(port) except Exception: pass terminal_service.record_session_created(reused=False) logger.info(f"session_created session={session_id[:8]}... user={username} host={host.name}") connect_path = f"/api/terminal/connect/{session_id}?token={token}" popout_path = f"/api/terminal/popout/{session_id}?token={token}" session_url = popout_path if session_request.mode == "popout" else connect_path ws_scheme = "wss" if (http_request and http_request.url.scheme == "https") else "ws" ws_host = (http_request.url.netloc if http_request else "localhost") ws_token = urlencode({"token": token}) return TerminalSessionResponse( session_id=session_id, url=session_url, websocket_url=f"{ws_scheme}://{ws_host}/api/terminal/proxy/{session_id}/ws?{ws_token}", expires_at=expires_at, ttl_seconds=TERMINAL_SESSION_TTL_MINUTES * 60, mode=session_request.mode, host=TerminalSessionHost( id=host.id, name=host.name, ip=host.ip_address, status=host.status or "unknown", bootstrap_ok=bootstrap_ok, ), reused=False, token=token, ) @router.delete("/sessions/{session_id}") async def close_terminal_session( session_id: str, request: Request, current_user: dict = Depends(get_current_user), db_session: AsyncSession = Depends(get_db), ): session_repo = TerminalSessionRepository(db_session) session = await session_repo.get(session_id) if not session: return {"message": "Session not found or already closed", "session_id": session_id} if session.status in [SESSION_STATUS_CLOSED, "expired", "error"]: return {"message": "Session already closed", "session_id": session_id} token = _get_session_token_from_request(request, session_id, token=request.query_params.get("token")) if not token or not terminal_service.verify_token(token, session.token_hash): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token") await terminal_service.close_session_with_cleanup( session_id=session_id, port=session.ttyd_port, reason=CLOSE_REASON_USER ) await session_repo.close_session(session_id, reason=CLOSE_REASON_USER) await db_session.commit() logger.info(f"session_closed session={session_id[:8]}... host={session.host_name} reason=user_close") return {"message": "Session closed", "session_id": session_id} @router.post("/sessions/{session_id}/heartbeat", response_model=HeartbeatResponse) async def heartbeat_terminal_session( session_id: str, request: Request, token: Optional[str] = None, db_session: AsyncSession = Depends(get_db), ): session_repo = TerminalSessionRepository(db_session) session = await session_repo.get_active_by_id(session_id) if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found or expired" ) token = _get_session_token_from_request(request, session_id, token=request.query_params.get("token")) if not token or not terminal_service.verify_token(token, session.token_hash): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token") await session_repo.update_last_seen(session_id) await db_session.commit() now = datetime.now(timezone.utc) expires_at = _as_utc_aware(session.expires_at) remaining = _compute_remaining_seconds(expires_at, now=now) return HeartbeatResponse( session_id=session_id, status=session.status, last_seen_at=now, remaining_seconds=remaining, healthy=True, ) @router.post("/sessions/{session_id}/close-beacon") async def close_beacon_terminal_session( session_id: str, request: Request, token: Optional[str] = None, db_session: AsyncSession = Depends(get_db), ): session_repo = TerminalSessionRepository(db_session) session = await session_repo.get(session_id) if session and session.status == SESSION_STATUS_ACTIVE: await terminal_service.close_session_with_cleanup( session_id=session_id, port=session.ttyd_port, reason=CLOSE_REASON_CLIENT_LOST ) await session_repo.close_session(session_id, reason=CLOSE_REASON_CLIENT_LOST) await db_session.commit() logger.info(f"session_closed session={session_id[:8]}... host={session.host_name} reason=client_lost") from fastapi.responses import Response return Response(status_code=204) @router.post("/cleanup") async def cleanup_terminal_sessions( debug_enabled: bool = Depends(require_debug_mode), current_user: dict = Depends(get_current_user), db_session: AsyncSession = Depends(get_db), ): session_repo = TerminalSessionRepository(db_session) user_id = current_user.get("user_id") or current_user.get("type", "api_key") active_sessions = await session_repo.list_active_for_user(user_id) terminated = 0 closed = 0 for session in active_sessions: try: await terminal_service.terminate_session(session.id) await terminal_service.release_port(session.ttyd_port) terminated += 1 except Exception as e: logger.warning(f"Failed to terminate session {session.id[:8]}: {e}") try: await session_repo.close_session(session.id, reason=CLOSE_REASON_USER) closed += 1 except Exception as e: logger.warning(f"Failed to close session {session.id[:8]} in DB: {e}") await db_session.commit() logger.info(f"Terminal cleanup: {terminated} processes terminated, {closed} sessions closed") return { "message": "Cleanup completed", "processes_terminated": terminated, "sessions_closed": closed, "expired_marked": 0, } @router.post("/sessions/{session_id}/command") async def log_terminal_command( session_id: str, command_data: dict, request: Request, current_user: dict = Depends(get_current_user), db_session: AsyncSession = Depends(get_db), ): session_repo = TerminalSessionRepository(db_session) session = await session_repo.get(session_id) if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) token = _get_session_token_from_request(request, session_id, token=request.query_params.get("token")) if not token or not terminal_service.verify_token(token, session.token_hash): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token" ) command = command_data.get("command") if not command: return {"status": "ignored", "reason": "empty_command"} if len(command) > _MAX_TERMINAL_COMMAND_LENGTH: command = command[:_MAX_TERMINAL_COMMAND_LENGTH] cmd_repo = TerminalCommandLogRepository(db_session) command_hash = hashlib.sha256(command.encode()).hexdigest() await cmd_repo.create( host_id=session.host_id, host_name=session.host_name, user_id=session.user_id, username=session.username, terminal_session_id=session.id, command=command, command_hash=command_hash, source="terminal_interactive" ) await db_session.commit() return {"status": "logged", "command_len": len(command)} @router.get("/connect/{session_id}") async def get_terminal_connect_page( session_id: str, token: str, request: Request, db_session: AsyncSession = Depends(get_db), ): session_repo = TerminalSessionRepository(db_session) session = await session_repo.get_active_by_id(session_id) if not session: safe_title = "Session Expirée" return HTMLResponse( content=f""" {safe_title}

Session Expirée ou Invalide

Cette session terminal n'existe pas ou a expiré.

Retour au Dashboard

""", status_code=404 ) if not terminal_service.verify_token(token, session.token_hash): safe_title = html.escape(session.host_name or "Terminal") return HTMLResponse( content=f""" {safe_title}

Accès Refusé

Token de session invalide.

""", status_code=403 ) alive = True try: alive = await terminal_service.is_session_process_alive(session_id) except Exception: alive = True if not alive: if await _is_port_reachable(request, session.ttyd_port): alive = True else: try: pid = await terminal_service.spawn_ttyd( session_id=session_id, host_ip=session.host_ip, port=session.ttyd_port, token=token, ) if pid: session.ttyd_pid = pid await db_session.commit() alive = True except Exception: alive = False if not alive: safe_title = html.escape(session.host_name or "Terminal") return HTMLResponse( content=f""" {safe_title}

Terminal indisponible

Le service terminal (ttyd) ne répond pas pour cette session.

Essayez de reconnecter ou de recréer une session depuis le dashboard.

Retour au Dashboard
""", status_code=503, ) now = datetime.now(timezone.utc) expires_at = _as_utc_aware(session.expires_at) remaining_seconds = _compute_remaining_seconds(expires_at, now=now) ttyd_port = session.ttyd_port embed_mode = request.query_params.get("embed") in {"1", "true", "yes"} safe_host_name_html = html.escape(session.host_name or "") safe_host_ip_html = html.escape(session.host_ip or "") safe_title = html.escape(session.host_name or "Terminal") js_session_id = json.dumps(session_id) js_token = json.dumps(token) js_host_id = json.dumps(session.host_id) js_host_name = json.dumps(session.host_name) js_host_ip = json.dumps(session.host_ip) history_script_block = """ // ===== HISTORY FUNCTIONS ===== let historyPanelOpen = false; let historySelectedIndex = -1; let historySearchQuery = ''; let historyTimeFilter = 'all'; async function toggleHistory() { const panel = document.getElementById('terminalHistoryPanel'); const btn = document.getElementById('btnHistory'); if (historyPanelOpen) { closeHistoryPanel(); } else { panel.style.display = 'flex'; panel.classList.add('open'); btn.classList.add('active'); historyPanelOpen = true; historySelectedIndex = -1; const searchInput = document.getElementById('terminalHistorySearch'); if (searchInput) { searchInput.focus(); searchInput.select(); } if (historyData.length === 0) { loadHistory(); } } } function closeHistoryPanel() { const panel = document.getElementById('terminalHistoryPanel'); const btn = document.getElementById('btnHistory'); panel.classList.remove('open'); btn.classList.remove('active'); historyPanelOpen = false; historySelectedIndex = -1; setTimeout(() => { try { panel.style.display = 'none'; } catch (e) { // ignore } }, 200); document.getElementById('terminalFrame').focus(); } async function loadHistory() { const list = document.getElementById('terminalHistoryList'); list.innerHTML = '
Chargement...
'; const allHosts = document.getElementById('terminalHistoryAllHosts')?.checked || false; const query = historySearchQuery; let endpoint; if (allHosts) { endpoint = '/api/terminal/command-history?limit=100'; } else { endpoint = `/api/terminal/${HOST_ID}/shell-history?limit=100`; } if (query) { endpoint += (endpoint.includes('?') ? '&' : '?') + 'query=' + encodeURIComponent(query); } try { const res = await fetch(endpoint, { headers: { 'Authorization': 'Bearer ' + TOKEN } }); const data = await res.json(); let commands = data.commands || []; // Client-side time filtering if (historyTimeFilter !== 'all') { const now = new Date(); let cutoff; switch (historyTimeFilter) { case 'today': cutoff = new Date(now.getFullYear(), now.getMonth(), now.getDate()); break; case 'week': cutoff = new Date(now.getTime() - 7 * 24 * 60 * 60 * 1000); break; case 'month': cutoff = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000); break; } if (cutoff) { commands = commands.filter(cmd => { const cmdDate = new Date(cmd.last_used || cmd.created_at); return cmdDate >= cutoff; }); } } historyData = commands; historySelectedIndex = -1; renderHistory(); } catch (e) { list.innerHTML = '
Erreur de chargement
'; } } function escapeHtml(text) { if (!text) return ''; return text .replace(/&/g, "&") .replace(//g, ">") .replace(/\"/g, """) .replace(/'/g, "'"); } function escapeRegExp(string) { return string.replace(/[.*+?^${}()|[\\]\\\\]/g, '\\$&'); } function formatRelativeTime(dateStr) { if (!dateStr) return ''; const date = new Date(dateStr); const now = new Date(); const diffMs = now - date; const diffSec = Math.floor(diffMs / 1000); const diffMin = Math.floor(diffSec / 60); const diffHour = Math.floor(diffMin / 60); const diffDay = Math.floor(diffHour / 24); if (diffSec < 60) return "À l'instant"; if (diffMin < 60) return `Il y a ${diffMin}min`; if (diffHour < 24) return `Il y a ${diffHour}h`; if (diffDay < 7) return `Il y a ${diffDay}j`; return date.toLocaleDateString('fr-FR', { day: 'numeric', month: 'short' }); } function renderHistory() { const list = document.getElementById('terminalHistoryList'); const query = historySearchQuery || ''; if (historyData.length === 0) { const emptyMessage = query ? `Aucun résultat pour "${escapeHtml(query)}"` : 'Aucune commande dans l historique'; list.innerHTML = `
${emptyMessage}
`; return; } const items = historyData.map((cmd, index) => { const command = cmd.command || ''; const timeAgo = formatRelativeTime(cmd.last_used || cmd.created_at); const execCount = cmd.execution_count || 1; const isSelected = index === historySelectedIndex; let displayCommand = escapeHtml(command.length > 60 ? command.substring(0, 60) + '...' : command); if (query) { const regex = new RegExp(`(${escapeRegExp(query)})`, 'gi'); displayCommand = displayCommand.replace(regex, '$1'); } return `
${displayCommand}
${timeAgo} ${execCount > 1 ? `×${execCount}` : ''}
`; }).join(''); list.innerHTML = items; if (historySelectedIndex >= 0) { const selectedEl = list.querySelector('.terminal-history-item.selected'); if (selectedEl) { selectedEl.scrollIntoView({ block: 'nearest', behavior: 'smooth' }); } } } function handleHistoryKeydown(event) { const key = event.key; const historyLength = historyData.length; switch (key) { case 'ArrowDown': event.preventDefault(); if (historyLength > 0) { historySelectedIndex = Math.min(historySelectedIndex + 1, historyLength - 1); renderHistory(); } break; case 'ArrowUp': event.preventDefault(); if (historyLength > 0) { historySelectedIndex = Math.max(historySelectedIndex - 1, 0); renderHistory(); } break; case 'Enter': event.preventDefault(); if (historySelectedIndex >= 0) { selectHistoryCommand(historySelectedIndex); } else if (historyLength > 0) { selectHistoryCommand(0); } break; case 'Escape': event.preventDefault(); closeHistoryPanel(); break; } } let searchTimeout = null; function searchHistory(query) { historySearchQuery = query; historySelectedIndex = -1; if (searchTimeout) clearTimeout(searchTimeout); searchTimeout = setTimeout(() => loadHistory(), 250); } function clearHistorySearch() { const input = document.getElementById('terminalHistorySearch'); if (input) { input.value = ''; input.focus(); } historySearchQuery = ''; historySelectedIndex = -1; loadHistory(); } function setHistoryTimeFilter(value) { historyTimeFilter = value; historySelectedIndex = -1; loadHistory(); } function toggleHistoryScope() { historySelectedIndex = -1; loadHistory(); } function showNotification(message) { const notification = document.createElement('div'); notification.style.cssText = ` position: fixed; bottom: 20px; right: 20px; background: rgba(124, 58, 237, 0.9); color: white; padding: 0.75rem 1rem; border-radius: 0.5rem; font-size: 0.875rem; z-index: 9999; `; notification.textContent = message; document.body.appendChild(notification); setTimeout(() => { notification.style.opacity = '0'; notification.style.transition = 'opacity 0.3s'; setTimeout(() => notification.remove(), 300); }, 2500); } async function logCommand(command) { try { const cmd = String(command ?? '').trim(); if (!cmd) return; await fetch('/api/terminal/sessions/' + encodeURIComponent(SESSION_ID) + '/command?token=' + encodeURIComponent(TOKEN), { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ command: cmd }) }); } catch (e) { // Best-effort } } function selectHistoryCommand(index) { const cmd = historyData[index]; if (!cmd) return; copyTextToClipboard(cmd.command || '') .then(() => { showNotification('Commande copiée - Collez avec Ctrl+Shift+V'); closeHistoryPanel(); }) .catch(() => showNotification('Commande: ' + (cmd.command || ''))); } function executeHistoryCommand(index) { if (!Array.isArray(historyData)) { console.warn('historyData n’est pas un tableau'); return; } if (typeof index !== 'number' || index < 0 || index >= historyData.length) { console.warn('Index invalide:', index); return; } const cmd = historyData[index]; if (!cmd || typeof cmd.command !== 'string' || !cmd.command.trim()) { showNotification('Commande invalide ou vide'); return; } const commandText = cmd.command.trim() + '\\n'; copyTextToClipboard(commandText) .then(() => { showNotification('Commande copiée — collez pour exécuter'); logCommand(cmd.command); closeHistoryPanel(); }) .catch((err) => { console.error('Erreur copie presse-papiers:', err); showNotification('Impossible de copier la commande'); }); } function copyHistoryCommand(index) { const cmd = historyData[index]; if (!cmd) return; copyTextToClipboard(cmd.command || '') .then(() => { showNotification('Commande copiée'); logCommand(cmd.command || ''); }) .catch(() => showNotification('Impossible de copier')); } // Keyboard shortcuts document.addEventListener('keydown', (e) => { if (e.key === 'Escape') { if (historyPanelOpen) { closeHistoryPanel(); } } if ((e.ctrlKey || e.metaKey) && e.key === 'r') { e.preventDefault(); toggleHistory(); } }); """ debug_mode_enabled = bool(settings.debug_mode) debug_panel_html = ( """
Debug Terminal
boot: page_rendered\nsession_id: {session_id}\nttyd_port: {ttyd_port}\n
""".format(session_id=html.escape(session_id), ttyd_port=ttyd_port) if debug_mode_enabled else "" ) debug_js = ( "" if not debug_mode_enabled else " let debugVisible = false;\n\n" " function toggleDebug(force) {\n" " debugVisible = typeof force === 'boolean' ? force : !debugVisible;\n" " const el = document.getElementById('terminalDebug');\n" " if (el) el.style.display = debugVisible ? 'block' : 'none';\n" " }\n\n" " async function updateDebug(reason) {\n" " const body = document.getElementById('terminalDebugBody');\n" " if (!body) return;\n\n" " try {\n" " const probeUrl = '/api/terminal/sessions/' + encodeURIComponent(SESSION_ID) + '/probe?token=' + encodeURIComponent(TOKEN);\n" " const res = await fetch(probeUrl);\n" " if (!res.ok) {\n" " const errText = await res.text();\n" " body.textContent = 'reason: ' + (reason || '') + '\\nprobe_error: ' + res.status + ' ' + (errText.substring(0, 100) || 'no response');\n" " toggleDebug(true);\n" " return;\n" " }\n" " const data = await res.json();\n" " body.textContent = [\n" " 'reason: ' + (reason || ''),\n" " 'page: ' + window.location.href,\n" " 'computed_ttyd_url: ' + (ttydUrl || '(non définie)'),\n" " 'probe.ttyd_port: ' + (data.ttyd_port ?? 'n/a'),\n" " 'probe.process_alive: ' + String(data.process_alive),\n" " 'probe.port_reachable: ' + String(data.port_reachable),\n" " 'probe.ttyd_interface: ' + (data.ttyd_interface || '(unset)'),\n" " 'probe.dashboard_host: ' + (data.dashboard_host || '(n/a)'),\n" " 'probe.dashboard_scheme: ' + (data.dashboard_scheme || '(n/a)')\n" " ].join('\\n');\n" " toggleDebug(true);\n" " } catch (e) {\n" " body.textContent = 'reason: ' + (reason || '') + '\\nerror: ' + (e && e.message ? e.message : String(e));\n" " toggleDebug(true);\n" " }\n" " }\n\n" " window.addEventListener('keydown', (e) => {\n" " if ((e.ctrlKey || e.metaKey) && e.shiftKey && (e.key === 'D' || e.key === 'd')) {\n" " e.preventDefault();\n" " updateDebug('manual');\n" " }\n" " });\n\n" ) script_block = ( "" ) html_content = f""" {safe_title}
💡 Pour une expérience sans barre d'outils : installez en PWA ou utilisez chrome --app=URL
{safe_host_name_html} {safe_host_ip_html} Connecté
{remaining_seconds // 60}:{remaining_seconds % 60:02d}
Connexion au terminal...
session={session_id[:8]}… · ttyd_port={ttyd_port} · Debug: Ctrl+Shift+D
{debug_panel_html}
{script_block} """ return HTMLResponse(content=html_content, headers={"Cache-Control": "no-store"}) @router.get("/popout/{session_id}") async def get_terminal_popout_page( session_id: str, token: str, request: Request, db_session: AsyncSession = Depends(get_db), ): """ Serve the terminal popout page (fullscreen, minimal UI). This is designed to be opened in a popup window or PWA. """ # Reuse the same connect page logic but with minimal header return await get_terminal_connect_page(session_id, token, request, db_session) # ============================================================================ # Command History Endpoints # ============================================================================ @router.get("/{host_id}/command-history", response_model=CommandHistoryResponse) async def get_host_command_history( host_id: str, query: Optional[str] = None, limit: int = 50, offset: int = 0, current_user: dict = Depends(get_current_user), db_session: AsyncSession = Depends(get_db), debug_enabled: bool = Depends(require_debug_mode), ): """ Get command history for a specific host. Args: host_id: Host ID to get history for query: Optional search query to filter commands limit: Maximum number of results (default 50) offset: Number of results to skip for pagination Returns: List of commands with timestamps and metadata """ # Verify host exists host_repo = HostRepository(db_session) host = await host_repo.get(host_id) if not host: host = await host_repo.get_by_name(host_id) if not host: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Host '{host_id}' not found" ) # Get command history cmd_repo = TerminalCommandLogRepository(db_session) logs = await cmd_repo.list_for_host( host_id=host.id, query=query, limit=min(limit, 100), # Cap at 100 offset=offset, ) total = await cmd_repo.count_for_host(host.id) commands = [ CommandHistoryItem( id=log.id, command=log.command, created_at=log.created_at, host_name=log.host_name, username=log.username, ) for log in logs ] return CommandHistoryResponse( commands=commands, total=total, host_id=host.id, query=query, ) @router.get("/{host_id}/shell-history") async def get_host_shell_history( host_id: str, query: Optional[str] = None, limit: int = 100, current_user: dict = Depends(get_current_user), db_session: AsyncSession = Depends(get_db), debug_enabled: bool = Depends(require_debug_mode), ): """ Get shell history directly from the remote host via SSH. This fetches the actual ~/.bash_history file from the host, providing real-time command history. Args: host_id: Host ID to get history for query: Optional search query to filter commands limit: Maximum number of results (default 100) Returns: List of commands from the remote shell history """ # Verify host exists host_repo = HostRepository(db_session) host = await host_repo.get(host_id) if not host: host = await host_repo.get_by_name(host_id) if not host: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Host '{host_id}' not found" ) try: # Fetch history from the remote host via SSH commands = await shell_history_service.fetch_combined_history( host_ip=host.ip_address, limit=min(limit, 200), user="automation" ) # Apply search filter if provided if query: q = query.lower() commands = [c for c in commands if q in c.get("command", "").lower()] return { "commands": commands, "total": len(commands), "host_id": host.id, "host_name": host.name, "source": "remote_shell", } except ShellHistoryError as e: logger.warning(f"Failed to fetch shell history from {host.name}: {e}") # Fallback to database history cmd_repo = TerminalCommandLogRepository(db_session) unique_cmds = await cmd_repo.get_unique_commands_for_host( host_id=host.id, query=query, limit=min(limit, 100), ) return { "commands": unique_cmds, "total": len(unique_cmds), "host_id": host.id, "host_name": host.name, "source": "database", "error": str(e), } @router.get("/{host_id}/command-history/unique", response_model=UniqueCommandsResponse) async def get_host_unique_commands( host_id: str, query: Optional[str] = None, limit: int = 50, current_user: dict = Depends(get_current_user), db_session: AsyncSession = Depends(get_db), debug_enabled: bool = Depends(require_debug_mode), ): """ Get unique commands for a host (deduplicated). Returns each unique command once with execution count and last used time. Useful for command suggestions/autocomplete. """ # Verify host exists host_repo = HostRepository(db_session) host = await host_repo.get(host_id) if not host: host = await host_repo.get_by_name(host_id) if not host: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Host '{host_id}' not found" ) # Get unique commands cmd_repo = TerminalCommandLogRepository(db_session) unique_cmds = await cmd_repo.get_unique_commands_for_host( host_id=host.id, query=query, limit=min(limit, 100), ) commands = [ UniqueCommandItem( command=cmd["command"], command_hash=cmd["command_hash"], last_used=cmd["last_used"], execution_count=cmd["execution_count"], ) for cmd in unique_cmds ] return UniqueCommandsResponse( commands=commands, total=len(commands), host_id=host.id, ) @router.get("/command-history", response_model=CommandHistoryResponse) async def get_global_command_history( query: Optional[str] = None, host_id: Optional[str] = None, limit: int = 50, offset: int = 0, current_user: dict = Depends(get_current_user), db_session: AsyncSession = Depends(get_db), debug_enabled: bool = Depends(require_debug_mode), ): """ Get command history globally (across all hosts). Args: query: Optional search query to filter commands host_id: Optional host ID to filter by limit: Maximum number of results (default 50) offset: Number of results to skip for pagination Returns: List of commands with timestamps and metadata """ user_id = current_user.get("user_id") or current_user.get("type", "api_key") cmd_repo = TerminalCommandLogRepository(db_session) logs = await cmd_repo.list_global( query=query, host_id=host_id, user_id=str(user_id) if user_id else None, limit=min(limit, 100), offset=offset, ) commands = [ CommandHistoryItem( id=log.id, command=log.command, created_at=log.created_at, host_name=log.host_name, username=log.username, ) for log in logs ] return CommandHistoryResponse( commands=commands, total=len(commands), host_id=host_id, query=query, ) @router.delete("/{host_id}/command-history") async def clear_host_command_history( host_id: str, current_user: dict = Depends(get_current_user), db_session: AsyncSession = Depends(get_db), debug_enabled: bool = Depends(require_debug_mode), ): """ Clear command history for a specific host. Requires admin role. """ # Check admin permission if current_user.get("role") != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only admins can clear command history" ) # Verify host exists host_repo = HostRepository(db_session) host = await host_repo.get(host_id) if not host: host = await host_repo.get_by_name(host_id) if not host: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Host '{host_id}' not found" ) cmd_repo = TerminalCommandLogRepository(db_session) deleted = await cmd_repo.delete_for_host(host.id) await db_session.commit() logger.info(f"Cleared {deleted} command logs for host {host.name}") return {"message": f"Cleared {deleted} command logs", "host_id": host.id} @router.post("/command-history/purge") async def purge_old_command_history( days: int = 30, current_user: dict = Depends(get_current_user), db_session: AsyncSession = Depends(get_db), debug_enabled: bool = Depends(require_debug_mode), ): """ Purge command history older than specified days. Requires admin role. """ # Check admin permission if current_user.get("role") != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only admins can purge command history" ) cmd_repo = TerminalCommandLogRepository(db_session) deleted = await cmd_repo.purge_old_logs(days=days) await db_session.commit() logger.info(f"Purged {deleted} command logs older than {days} days") return {"message": f"Purged {deleted} command logs", "retention_days": days}