""" Routes API pour les sessions terminal SSH web. Provides endpoints for: - Creating terminal sessions - Listing active sessions - Closing sessions - Serving terminal popout page """ import asyncio import json import hashlib import httpx import logging import socket import html from urllib.parse import urlencode, urlparse, parse_qs from datetime import datetime, timedelta, timezone from typing import Optional from pydantic import BaseModel from app.services.shell_history_service import shell_history_service, ShellHistoryError from fastapi import APIRouter, Depends, HTTPException, Request, WebSocket, WebSocketDisconnect, status from fastapi.responses import HTMLResponse, StreamingResponse from fastapi.templating import Jinja2Templates from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.core.dependencies import get_db, get_current_user, get_current_user_optional, require_debug_mode _templates = Jinja2Templates(directory=str(settings.base_dir / "templates")) from app.crud.host import HostRepository from app.crud.bootstrap_status import BootstrapStatusRepository from app.crud.terminal_session import TerminalSessionRepository from app.crud.terminal_command_log import TerminalCommandLogRepository from app.schemas.terminal import ( TerminalSessionRequest, TerminalSessionResponse, TerminalSessionHost, TerminalSessionStatus, TerminalSessionList, CommandHistoryItem, CommandHistoryResponse, UniqueCommandItem, UniqueCommandsResponse, ActiveSessionInfo, SessionLimitError, HeartbeatResponse, SessionMetrics, ) from app.services.terminal_service import ( terminal_service, TERMINAL_SESSION_TTL_MINUTES, TERMINAL_SESSION_TTL_SECONDS, TERMINAL_MAX_SESSIONS_PER_USER, TERMINAL_SESSION_IDLE_TIMEOUT_SECONDS, TERMINAL_HEARTBEAT_INTERVAL_SECONDS, TERMINAL_TTYD_INTERFACE, TtydNotAvailableError, ) from app.models.terminal_session import ( SESSION_STATUS_ACTIVE, SESSION_STATUS_CLOSED, CLOSE_REASON_USER, CLOSE_REASON_CLIENT_LOST, ) logger = 
logging.getLogger(__name__) router = APIRouter() _PORT_REACHABILITY_TIMEOUT_SECONDS = 1.0 _MAX_TERMINAL_COMMAND_LENGTH = 10000 def _as_utc_aware(dt: Optional[datetime]) -> Optional[datetime]: if dt is None: return None if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) def _compute_remaining_seconds(expires_at: Optional[datetime], now: Optional[datetime] = None) -> int: now = now or datetime.now(timezone.utc) expires_at = _as_utc_aware(expires_at) if not expires_at: return 0 delta = (expires_at - now).total_seconds() return max(0, int(delta)) async def _is_port_reachable(request: Request, port: int) -> bool: candidates = ["127.0.0.1", "localhost"] try: if request.url.hostname: candidates.append(request.url.hostname) except Exception: pass ttyd_iface = TERMINAL_TTYD_INTERFACE if ttyd_iface and ttyd_iface not in {"0.0.0.0", "::"}: candidates.append(ttyd_iface) for host_candidate in dict.fromkeys(candidates): try: with socket.create_connection((host_candidate, int(port)), timeout=_PORT_REACHABILITY_TIMEOUT_SECONDS): return True except Exception: continue return False def _get_session_token_from_headers(auth_header: Optional[str]) -> Optional[str]: if not auth_header: return None if not auth_header.startswith("Bearer "): return None parts = auth_header.split(" ", 1) return parts[1].strip() if len(parts) == 2 else None def _build_session_limit_error(active_sessions, same_host_session=None) -> dict: now = datetime.now(timezone.utc) infos = [] for s in (active_sessions or []): created_at = _as_utc_aware(getattr(s, "created_at", None)) last_seen_at = _as_utc_aware(getattr(s, "last_seen_at", None)) age_seconds = int(max(0, (now - created_at).total_seconds())) if created_at else 0 last_seen_seconds = int(max(0, (now - last_seen_at).total_seconds())) if last_seen_at else 0 infos.append( ActiveSessionInfo( session_id=str(getattr(s, "id", "")), host_id=str(getattr(s, "host_id", "")), host_name=str(getattr(s, "host_name", "")), 
mode=str(getattr(s, "mode", "")), age_seconds=age_seconds, last_seen_seconds=last_seen_seconds, ) ) infos.sort(key=lambda x: x.last_seen_seconds, reverse=True) suggested_actions = [] can_reuse = False reusable_session_id = None if same_host_session is not None: can_reuse = True reusable_session_id = str(getattr(same_host_session, "id", "")) if reusable_session_id: suggested_actions.append("reuse_existing") suggested_actions.append(f"reuse_session:{reusable_session_id}") suggested_actions.append("close_oldest") if infos: suggested_actions.append(f"close_session:{infos[-1].session_id}") err = SessionLimitError( message="Maximum active terminal sessions reached", max_active=TERMINAL_MAX_SESSIONS_PER_USER, current_count=len(active_sessions or []), active_sessions=infos, suggested_actions=suggested_actions, can_reuse=can_reuse, reusable_session_id=reusable_session_id, ) return err.model_dump() def _get_session_token_from_request(request: Request, session_id: str, token: Optional[str] = None) -> Optional[str]: if token: return token cookie_key = f"terminal_token_{session_id}" cookie_token = request.cookies.get(cookie_key) or request.cookies.get("terminal_token") if cookie_token: return cookie_token hdr_token = _get_session_token_from_headers(request.headers.get("authorization")) if hdr_token: return hdr_token referer = request.headers.get("referer") if referer: try: parsed = urlparse(referer) qs = parse_qs(parsed.query) if "token" in qs: return qs["token"][0] except Exception: pass return None async def _verify_history_access( host_id: str, request: Request, db_session: AsyncSession, current_user: Optional[dict] = None ) -> bool: """Verify access to history for a specific host (JWT or Session Token).""" if current_user: return True token = _get_session_token_from_request(request, host_id, token=request.query_params.get("token")) if not token: return False session_repo = TerminalSessionRepository(db_session) # Try by ID first sessions = await 
async def _verify_history_access(
    host_id: str,
    request: Request,
    db_session: AsyncSession,
    current_user: Optional[dict] = None,
) -> bool:
    """Decide whether the caller may read the command history of a host.

    An authenticated user is always allowed. Otherwise the request must carry
    a session token that matches one of the active terminal sessions of the
    host; *host_id* is looked up first as an id and then as a host name.
    """
    if current_user:
        return True

    query_token = request.query_params.get("token")
    token = _get_session_token_from_request(request, host_id, token=query_token)
    if not token:
        return False

    session_repo = TerminalSessionRepository(db_session)

    # First attempt: treat host_id as the host's identifier.
    candidates = await session_repo.list_active_for_host(host_id)

    # Fallback: host_id may actually be a host name.
    if not candidates:
        host = await HostRepository(db_session).get_by_name(host_id)
        if host:
            candidates = await session_repo.list_active_for_host(host.id)

    return any(terminal_service.verify_token(token, s.token_hash) for s in candidates)


def _get_session_token_from_websocket(websocket: WebSocket, session_id: str) -> Optional[str]:
    """Return the session token of a WebSocket handshake.

    The ``token`` query parameter takes precedence, then the
    ``terminal_token_<session_id>`` cookie, then the ``terminal_token`` cookie.
    """
    query_token = websocket.query_params.get("token")
    if query_token:
        return query_token
    return websocket.cookies.get(f"terminal_token_{session_id}") or websocket.cookies.get("terminal_token")


@router.get("/status")
async def get_terminal_status(current_user: dict = Depends(get_current_user)):
    """Report whether ttyd is available and whether debug mode is enabled."""
    return {
        "available": bool(terminal_service.check_ttyd_available()),
        "debug_mode": bool(settings.debug_mode),
    }


@router.get("/sessions/{session_id}/probe")
async def probe_terminal_session(
    session_id: str,
    token: str,
    request: Request,
    debug_enabled: bool = Depends(require_debug_mode),
    db_session: AsyncSession = Depends(get_db),
):
    """Return liveness diagnostics for a terminal session (debug-mode route).

    Raises:
        HTTPException: 404 when the session does not exist, 403 when *token*
            does not match the session.
    """
    session = await TerminalSessionRepository(db_session).get(session_id)
    if not session:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
    if not terminal_service.verify_token(token, session.token_hash):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token")

    # A failing liveness check is reported as "not alive" rather than an error.
    try:
        process_alive = await terminal_service.is_session_process_alive(session_id)
    except Exception:
        process_alive = False

    port_reachable = await _is_port_reachable(request, session.ttyd_port)

    return {
        "session_id": session.id,
        "ttyd_port": session.ttyd_port,
        "process_alive": process_alive,
        "port_reachable": port_reachable,
        "ttyd_interface": TERMINAL_TTYD_INTERFACE,
        "dashboard_scheme": request.url.scheme,
        "dashboard_host": request.url.hostname,
    }
# Client request headers that are not replayed to the upstream ttyd server.
_PROXY_SKIPPED_REQUEST_HEADERS = frozenset({"host", "connection"})

# Upstream response headers that are not relayed to the browser. The body is
# re-emitted from the already-read ``ttyd_response.content``, so the upstream
# framing headers are not copied.
# NOTE(review): the CSP / X-Frame-Options removal presumably allows the
# terminal page to be embedded by the dashboard - confirm.
_PROXY_DROPPED_RESPONSE_HEADERS = frozenset(
    {
        "content-encoding",
        "transfer-encoding",
        "content-security-policy",
        "x-frame-options",
        "content-length",
    }
)

# Methods whose request body is forwarded upstream.
_PROXY_METHODS_WITH_BODY = frozenset({"POST", "PUT", "PATCH"})


async def _authorize_proxy_request(
    request: Request,
    session_id: str,
    token: Optional[str],
    db_session: AsyncSession,
):
    """Load the session and validate the caller's token for a proxy request.

    Returns:
        ``(session, effective_token)``.

    Raises:
        HTTPException: 404 when the session does not exist, 403 when the
            token is missing or does not match the session.
    """
    session = await TerminalSessionRepository(db_session).get(session_id)
    if not session:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")

    effective_token = _get_session_token_from_request(request, session_id, token=token)
    if not effective_token:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Missing session token")
    if not terminal_service.verify_token(effective_token, session.token_hash):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token")
    return session, effective_token


async def _forward_to_ttyd(
    request: Request,
    session_id: str,
    ttyd_port: int,
    path: str,
    effective_token: str,
) -> StreamingResponse:
    """Replay *request* against the local ttyd instance and relay the answer.

    The ``token`` query parameter is stripped before forwarding. When the
    validated token differs from the one stored in the browser cookie, the
    response sets a session-scoped, HTTP-only cookie so follow-up requests
    under the proxy path can authenticate without the query parameter.

    Raises:
        HTTPException: 502 when the upstream request or the response
            construction fails.
    """
    forwarded_params = [(k, v) for (k, v) in request.query_params.multi_items() if k != "token"]
    ttyd_url = f"http://127.0.0.1:{ttyd_port}{path}"
    if forwarded_params:
        ttyd_url += f"?{urlencode(forwarded_params, doseq=True)}"

    cookie_key = f"terminal_token_{session_id}"
    cookie_token = request.cookies.get(cookie_key) or request.cookies.get("terminal_token")

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            ttyd_response = await client.request(
                method=request.method,
                url=ttyd_url,
                headers={
                    k: v
                    for k, v in request.headers.items()
                    if k.lower() not in _PROXY_SKIPPED_REQUEST_HEADERS
                },
                content=await request.body() if request.method in _PROXY_METHODS_WITH_BODY else None,
                follow_redirects=True,
            )

        response_headers = {
            k: v
            for (k, v) in ttyd_response.headers.items()
            if k.lower() not in _PROXY_DROPPED_RESPONSE_HEADERS
        }
        response = StreamingResponse(
            iter([ttyd_response.content]),
            status_code=ttyd_response.status_code,
            headers=response_headers,
            media_type=ttyd_response.headers.get("content-type"),
        )

        if effective_token and effective_token != cookie_token:
            response.set_cookie(
                key=cookie_key,
                value=effective_token,
                httponly=True,
                samesite="lax",
                secure=(request.url.scheme == "https"),
                path=f"/api/terminal/proxy/{session_id}/",
                max_age=TERMINAL_SESSION_TTL_SECONDS,
            )
        return response
    except Exception as e:
        logger.error(f"Error proxying request to ttyd: {e}")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Failed to proxy request to ttyd: {str(e)}",
        ) from e


@router.get("/proxy/{session_id}")
async def proxy_ttyd_http(
    session_id: str,
    request: Request,
    token: Optional[str] = None,
    db_session: AsyncSession = Depends(get_db),
):
    """Proxy the root page of a session's ttyd instance.

    Raises:
        HTTPException: 404 / 403 on unknown session or bad token, 502 when
            ttyd cannot be reached.
    """
    session, effective_token = await _authorize_proxy_request(request, session_id, token, db_session)

    # Whatever follows the proxy prefix in the request path is the upstream path.
    prefix = f"/api/terminal/proxy/{session_id}"
    path = request.url.path
    if path.startswith(prefix):
        path = path[len(prefix):]
    if not path:
        path = "/"

    return await _forward_to_ttyd(request, session_id, session.ttyd_port, path, effective_token)


@router.api_route(
    "/proxy/{session_id}/{proxy_path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"],
)
async def proxy_ttyd_http_assets(
    session_id: str,
    proxy_path: str,
    request: Request,
    token: Optional[str] = None,
    db_session: AsyncSession = Depends(get_db),
):
    """Proxy any sub-path (assets, API calls) of a session's ttyd instance.

    Raises:
        HTTPException: 404 / 403 on unknown session or bad token, 502 when
            ttyd cannot be reached.
    """
    session, effective_token = await _authorize_proxy_request(request, session_id, token, db_session)
    path = "/" + (proxy_path or "")
    return await _forward_to_ttyd(request, session_id, session.ttyd_port, path, effective_token)
@router.websocket("/proxy/{session_id}/ws")
async def proxy_ttyd_websocket(
    websocket: WebSocket,
    session_id: str,
    db_session: AsyncSession = Depends(get_db),
):
    """Bridge a browser WebSocket to the session's local ttyd WebSocket.

    The handshake is rejected (close code 1008) when the token is missing,
    the session is unknown, the token does not match, or the session has
    expired. Otherwise the connection is accepted, an upstream connection to
    ``ws://127.0.0.1:<ttyd_port>/ws`` is opened, and frames are copied in both
    directions until either side ends. Any proxy failure is logged and the
    client socket is closed with code 1011.
    """
    # --- Authentication and session validation (before accepting) ---
    token = _get_session_token_from_websocket(websocket, session_id)
    if not token:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Missing token")
        return

    session_repo = TerminalSessionRepository(db_session)
    session = await session_repo.get(session_id)
    if not session:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session not found")
        return

    if not terminal_service.verify_token(token, session.token_hash):
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Invalid token")
        return

    now = datetime.now(timezone.utc)
    expires_at = _as_utc_aware(getattr(session, "expires_at", None))
    if expires_at and expires_at <= now:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Session expired")
        return

    # Echo the "tty" subprotocol back only when the client offered it.
    subprotocol = None
    proto_hdr = websocket.headers.get("sec-websocket-protocol")
    if proto_hdr and "tty" in proto_hdr:
        subprotocol = "tty"

    await websocket.accept(subprotocol=subprotocol)

    # Forward every query parameter except the session token.
    query_params = list(websocket.query_params.multi_items())
    query_params = [(k, v) for (k, v) in query_params if k != "token"]
    upstream_qs = urlencode(query_params, doseq=True) if query_params else ""
    upstream_url = f"ws://127.0.0.1:{session.ttyd_port}/ws" + (f"?{upstream_qs}" if upstream_qs else "")

    # Imported at call time, so the module loads even if the package is absent.
    import websockets

    logger.info(
        json.dumps(
            {
                "component": "terminal",
                "session_id": session_id,
                "event": "ws_proxy_connect_upstream",
                "upstream_url": upstream_url,
                "ttyd_port": session.ttyd_port,
            }
        )
    )

    try:
        # Ttyd checks Origin by default. We must provide a valid Origin matching the host/port
        # or ttyd will reject the WebSocket handshake (403).
        upstream_headers = {
            "Origin": f"http://127.0.0.1:{session.ttyd_port}",
        }
        async with websockets.connect(
            upstream_url,
            subprotocols=["tty"],
            open_timeout=5,
            ping_interval=None,
            max_size=None,
            additional_headers=upstream_headers,
        ) as upstream:
            logger.info(
                json.dumps(
                    {
                        "component": "terminal",
                        "session_id": session_id,
                        "event": "ws_proxy_connected_upstream",
                        "ttyd_port": session.ttyd_port,
                    }
                )
            )

            async def _client_to_upstream():
                # Copy browser frames to ttyd until the browser disconnects,
                # then close the upstream side so the other pump ends too.
                try:
                    while True:
                        message = await websocket.receive()
                        if message.get("type") == "websocket.disconnect":
                            break
                        data = message.get("bytes")
                        if data is not None:
                            await upstream.send(data)
                            continue
                        text_data = message.get("text")
                        if text_data is not None:
                            await upstream.send(text_data)
                except WebSocketDisconnect:
                    pass
                finally:
                    try:
                        await upstream.close()
                    except Exception:
                        pass

            async def _upstream_to_client():
                # Copy ttyd frames to the browser, preserving binary vs text,
                # then close the browser side when the upstream stream ends.
                try:
                    async for data in upstream:
                        if isinstance(data, bytes):
                            await websocket.send_bytes(data)
                        else:
                            await websocket.send_text(data)
                finally:
                    try:
                        await websocket.close()
                    except Exception:
                        pass

            # return_exceptions=True: an error in one pump is collected
            # instead of being raised out of gather().
            await asyncio.gather(_client_to_upstream(), _upstream_to_client(), return_exceptions=True)
    except Exception as e:
        logger.exception(
            json.dumps(
                {
                    "component": "terminal",
                    "session_id": session_id,
                    "event": "ws_proxy_error",
                    "error": str(e),
                    "ttyd_port": getattr(session, "ttyd_port", None),
                }
            )
        )
        try:
            await websocket.close(code=1011, reason="Proxy error")
        except Exception:
            pass
def _build_terminal_session_response(
    *,
    http_request: Request,
    session_id: str,
    token: str,
    requested_mode: str,
    response_mode: str,
    expires_at: Optional[datetime],
    ttl_seconds: int,
    host,
    bootstrap_ok: bool,
    reused: bool,
) -> TerminalSessionResponse:
    """Assemble the API response describing a (new or reused) terminal session.

    The page URL is the popout page when *requested_mode* is ``"popout"`` and
    the connect page otherwise. The token is URL-encoded in every URL.
    """
    token_qs = urlencode({"token": token})
    connect_path = f"/api/terminal/connect/{session_id}?{token_qs}"
    popout_path = f"/api/terminal/popout/{session_id}?{token_qs}"
    session_url = popout_path if requested_mode == "popout" else connect_path

    ws_scheme = "wss" if (http_request and http_request.url.scheme == "https") else "ws"
    ws_host = http_request.url.netloc if http_request else "localhost"

    return TerminalSessionResponse(
        session_id=session_id,
        url=session_url,
        websocket_url=f"{ws_scheme}://{ws_host}/api/terminal/proxy/{session_id}/ws?{token_qs}",
        expires_at=expires_at,
        ttl_seconds=ttl_seconds,
        mode=response_mode,
        host=TerminalSessionHost(
            id=host.id,
            name=host.name,
            ip=host.ip_address,
            status=host.status or "unknown",
            bootstrap_ok=bootstrap_ok,
        ),
        reused=reused,
        token=token,
    )


async def _spawn_ttyd_with_retries(session_id: str, host_ip: str, token: str, attempts: int = 3):
    """Allocate a port and start ttyd, retrying on spawn failure.

    Each failed attempt releases its port before the next one.

    Returns:
        ``(port, pid)`` of the running ttyd process.

    Raises:
        HTTPException: 503 when no port can be allocated or ttyd is not
            installed; 500 when every spawn attempt failed.
    """
    last_spawn_error = None
    for _ in range(attempts):
        try:
            port = await terminal_service.allocate_port(session_id)
        except Exception as e:
            logger.error(f"Failed to allocate port: {e}")
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="No available ports for terminal session",
            ) from e

        try:
            pid = await terminal_service.spawn_ttyd(
                session_id=session_id,
                host_ip=host_ip,
                port=port,
                token=token,
            )
        except TtydNotAvailableError:
            await terminal_service.release_port(port)
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Terminal feature unavailable: ttyd is not installed",
            )
        except Exception as e:
            last_spawn_error = str(e)
            await terminal_service.release_port(port)
            continue

        if pid is None:
            last_spawn_error = "Failed to start terminal process"
            await terminal_service.release_port(port)
            continue
        return port, pid

    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail=f"Failed to start terminal: {last_spawn_error or 'unknown error'}",
    )


@router.post("/{host_id}/terminal-sessions", response_model=TerminalSessionResponse)
async def create_terminal_session(
    host_id: str,
    session_request: TerminalSessionRequest,
    http_request: Request,
    current_user: dict = Depends(get_current_user),
    db_session: AsyncSession = Depends(get_db),
):
    """Create a terminal session for a host, or reuse a matching one.

    Flow: check ttyd availability, resolve the host (by id, then by name),
    require it to be online and bootstrapped, reuse an existing session of the
    same user/host/mode when one is found (issuing a fresh token), enforce the
    per-user session limit, then spawn ttyd and persist the new session.

    Returns:
        A ``TerminalSessionResponse``; or a 429 ``JSONResponse`` carrying the
        session-limit payload when the user has too many active sessions.

    Raises:
        HTTPException: 503 (ttyd unavailable / no port), 404 (unknown host),
            400 (host offline or not bootstrapped), 500 (spawn or DB failure).
    """
    if not terminal_service.check_ttyd_available():
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Terminal feature unavailable: ttyd is not installed",
        )

    session_repo = TerminalSessionRepository(db_session)
    host_repo = HostRepository(db_session)
    bs_repo = BootstrapStatusRepository(db_session)

    # host_id may be either the identifier or the name of the host.
    host = await host_repo.get(host_id)
    if not host:
        host = await host_repo.get_by_name(host_id)
    if not host:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Host '{host_id}' not found"
        )

    if host.status == "offline":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Host '{host.name}' is offline. Cannot open terminal."
        )

    bootstrap = await bs_repo.latest_for_host(host.id)
    bootstrap_ok = bool(bootstrap and bootstrap.status == "success")
    if not bootstrap_ok:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Host '{host.name}' has not been bootstrapped. Run bootstrap first to configure SSH access."
        )

    user_id = current_user.get("user_id") or current_user.get("type", "api_key")
    username = current_user.get("username", "api_user")

    # ---- Reuse path ----
    existing_session = await session_repo.find_reusable_session(
        user_id=user_id,
        host_id=host.id,
        mode=session_request.mode,
        idle_timeout_seconds=TERMINAL_SESSION_IDLE_TIMEOUT_SECONDS,
    )
    if existing_session:
        await session_repo.update_last_seen(existing_session.id)
        await db_session.commit()

        terminal_service.record_session_created(reused=True)
        logger.info(f"session_reused session={existing_session.id[:8]}... user={username} host={host.name}")

        expires_at = _as_utc_aware(existing_session.expires_at)
        remaining = _compute_remaining_seconds(expires_at)

        # A fresh token is issued for the reused session and its hash stored.
        token, token_hash = terminal_service.generate_session_token()
        existing_session.token_hash = token_hash
        await db_session.commit()

        return _build_terminal_session_response(
            http_request=http_request,
            session_id=existing_session.id,
            token=token,
            requested_mode=session_request.mode,
            response_mode=existing_session.mode,
            expires_at=expires_at,
            ttl_seconds=remaining,
            host=host,
            bootstrap_ok=bootstrap_ok,
            reused=True,
        )

    # ---- Per-user limit ----
    active_sessions = await session_repo.list_active_for_user(user_id)
    active_count = len(active_sessions)
    if active_count >= TERMINAL_MAX_SESSIONS_PER_USER:
        same_host_session = next((s for s in active_sessions if s.host_id == host.id), None)
        terminal_service.record_session_limit_hit()
        logger.warning(f"session_limit_hit user={username} count={active_count} max={TERMINAL_MAX_SESSIONS_PER_USER}")

        from fastapi.responses import JSONResponse

        return JSONResponse(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            content=_build_session_limit_error(active_sessions, same_host_session),
        )

    # ---- New session ----
    session_id = terminal_service.generate_session_id()
    token, token_hash = terminal_service.generate_session_token()
    port, pid = await _spawn_ttyd_with_retries(session_id, host.ip_address, token)

    expires_at = datetime.now(timezone.utc) + timedelta(minutes=TERMINAL_SESSION_TTL_MINUTES)

    # ``persisted`` only flips after a successful commit, so the ttyd process
    # and its port are reclaimed when create() fails AND when create()
    # succeeds but commit() fails (previously the latter leaked both).
    persisted = False
    try:
        await session_repo.create(
            id=session_id,
            host_id=host.id,
            host_name=host.name,
            host_ip=host.ip_address,
            token_hash=token_hash,
            ttyd_port=port,
            ttyd_pid=pid,
            expires_at=expires_at,
            user_id=user_id,
            username=username,
            mode=session_request.mode,
        )
        await db_session.commit()
        persisted = True
    except Exception as e:
        logger.exception(f"Failed to create session in DB: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create terminal session"
        ) from e
    finally:
        if not persisted:
            # Best-effort cleanup; the original error is the one to report.
            try:
                await terminal_service.terminate_session(session_id)
            except Exception:
                pass
            try:
                await terminal_service.release_port(port)
            except Exception:
                pass

    terminal_service.record_session_created(reused=False)
    logger.info(f"session_created session={session_id[:8]}... user={username} host={host.name}")

    return _build_terminal_session_response(
        http_request=http_request,
        session_id=session_id,
        token=token,
        requested_mode=session_request.mode,
        response_mode=session_request.mode,
        expires_at=expires_at,
        ttl_seconds=TERMINAL_SESSION_TTL_MINUTES * 60,
        host=host,
        bootstrap_ok=bootstrap_ok,
        reused=False,
    )
@router.delete("/sessions/{session_id}")
async def close_terminal_session(
    session_id: str,
    request: Request,
    current_user: dict = Depends(get_current_user),
    db_session: AsyncSession = Depends(get_db),
):
    """Close a terminal session on behalf of an authenticated user.

    Unknown or already-finished sessions are answered with an informational
    message rather than an error. Otherwise the session token must be valid;
    the ttyd process is then cleaned up and the session marked closed.

    Raises:
        HTTPException: 403 when the session token is missing or invalid.
    """
    session_repo = TerminalSessionRepository(db_session)
    session = await session_repo.get(session_id)

    if not session:
        return {"message": "Session not found or already closed", "session_id": session_id}

    already_finished = session.status in [SESSION_STATUS_CLOSED, "expired", "error"]
    if already_finished:
        return {"message": "Session already closed", "session_id": session_id}

    supplied_token = _get_session_token_from_request(
        request,
        session_id,
        token=request.query_params.get("token"),
    )
    token_ok = bool(supplied_token) and terminal_service.verify_token(supplied_token, session.token_hash)
    if not token_ok:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token")

    # Stop the process / free the port first, then record the closure.
    await terminal_service.close_session_with_cleanup(
        session_id=session_id,
        port=session.ttyd_port,
        reason=CLOSE_REASON_USER,
    )
    await session_repo.close_session(session_id, reason=CLOSE_REASON_USER)
    await db_session.commit()

    logger.info(f"session_closed session={session_id[:8]}... host={session.host_name} reason=user_close")
    return {"message": "Session closed", "session_id": session_id}


@router.post("/sessions/{session_id}/heartbeat", response_model=HeartbeatResponse)
async def heartbeat_terminal_session(
    session_id: str,
    request: Request,
    token: Optional[str] = None,
    db_session: AsyncSession = Depends(get_db),
):
    """Refresh the last-seen timestamp of an active session.

    Returns:
        A ``HeartbeatResponse`` with the session status, the heartbeat time
        and the number of seconds left before expiry.

    Raises:
        HTTPException: 404 when the session is not active, 403 when the
            session token is missing or invalid.
    """
    session_repo = TerminalSessionRepository(db_session)
    session = await session_repo.get_active_by_id(session_id)
    if not session:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session not found or expired"
        )

    supplied_token = _get_session_token_from_request(
        request,
        session_id,
        token=request.query_params.get("token"),
    )
    token_ok = bool(supplied_token) and terminal_service.verify_token(supplied_token, session.token_hash)
    if not token_ok:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token")

    await session_repo.update_last_seen(session_id)
    await db_session.commit()

    heartbeat_time = datetime.now(timezone.utc)
    seconds_left = _compute_remaining_seconds(_as_utc_aware(session.expires_at), now=heartbeat_time)

    return HeartbeatResponse(
        session_id=session_id,
        status=session.status,
        last_seen_at=heartbeat_time,
        remaining_seconds=seconds_left,
        healthy=True,
    )
@router.post("/sessions/{session_id}/close-beacon")
async def close_beacon_terminal_session(
    session_id: str,
    request: Request,
    token: Optional[str] = None,
    db_session: AsyncSession = Depends(get_db),
):
    """Close a session when the client signals that it is going away.

    Unknown or non-active sessions are ignored. For an active session the
    caller must present the session token (query parameter, cookie, bearer
    header or Referer), exactly like the other session endpoints; without
    this check anyone knowing a session id could terminate the session.

    Returns:
        An empty 204 response.

    Raises:
        HTTPException: 403 when the session is active and the token is
            missing or invalid.
    """
    session_repo = TerminalSessionRepository(db_session)
    session = await session_repo.get(session_id)

    if session and session.status == SESSION_STATUS_ACTIVE:
        effective_token = _get_session_token_from_request(request, session_id, token=token)
        if not effective_token or not terminal_service.verify_token(effective_token, session.token_hash):
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid session token")

        await terminal_service.close_session_with_cleanup(
            session_id=session_id,
            port=session.ttyd_port,
            reason=CLOSE_REASON_CLIENT_LOST
        )
        await session_repo.close_session(session_id, reason=CLOSE_REASON_CLIENT_LOST)
        await db_session.commit()
        logger.info(f"session_closed session={session_id[:8]}... host={session.host_name} reason=client_lost")

    from fastapi.responses import Response

    return Response(status_code=204)


@router.post("/cleanup")
async def cleanup_terminal_sessions(
    debug_enabled: bool = Depends(require_debug_mode),
    current_user: dict = Depends(get_current_user),
    db_session: AsyncSession = Depends(get_db),
):
    """Terminate and close every active session of the current user.

    Process termination and database closure are attempted independently for
    each session; a failure of either is logged and does not stop the loop.

    Returns:
        A summary dict with the number of processes terminated and sessions
        closed (``expired_marked`` is always ``0``).
    """
    session_repo = TerminalSessionRepository(db_session)
    user_id = current_user.get("user_id") or current_user.get("type", "api_key")
    active_sessions = await session_repo.list_active_for_user(user_id)

    terminated = 0
    closed = 0
    for session in active_sessions:
        try:
            await terminal_service.terminate_session(session.id)
            await terminal_service.release_port(session.ttyd_port)
            terminated += 1
        except Exception as e:
            logger.warning(f"Failed to terminate session {session.id[:8]}: {e}")

        try:
            await session_repo.close_session(session.id, reason=CLOSE_REASON_USER)
            closed += 1
        except Exception as e:
            logger.warning(f"Failed to close session {session.id[:8]} in DB: {e}")

    await db_session.commit()
    logger.info(f"Terminal cleanup: {terminated} processes terminated, {closed} sessions closed")

    return {
        "message": "Cleanup completed",
        "processes_terminated": terminated,
        "sessions_closed": closed,
        "expired_marked": 0,
    }


# Request body of the command-logging endpoint.
class LogCommandRequest(BaseModel):
    # Raw command line as typed in the terminal.
    command: str
@router.post("/sessions/{session_id}/command")
async def log_terminal_command(
    session_id: str,
    payload: LogCommandRequest,
    request: Request,
    db_session: AsyncSession = Depends(get_db),
):
    """Record a command typed in a terminal session.

    The command is stripped, truncated to ``_MAX_TERMINAL_COMMAND_LENGTH``
    characters and evaluated by the command policy. Commands the policy
    neither logs nor blocks are ignored; otherwise the masked command and its
    hash are stored. If the policy itself fails, the command is stored as-is
    and flagged as not blocked.

    Returns:
        ``{"status": "ignored", "reason": ...}`` or
        ``{"status": "success", "blocked": <bool>}``.

    Raises:
        HTTPException: 404 when the session is not active, 403 on a bad
            session token, 500 on any unexpected failure.
    """
    try:
        session_repo = TerminalSessionRepository(db_session)
        session = await session_repo.get_active_by_id(session_id)
        if not session:
            raise HTTPException(status_code=404, detail="Session not found")

        # Verify session token
        actual_token = _get_session_token_from_request(request, session_id, token=request.query_params.get("token"))
        if not actual_token or not terminal_service.verify_token(actual_token, session.token_hash):
            raise HTTPException(status_code=403, detail="Invalid session token")

        cmd = payload.command.strip()
        if not cmd:
            return {"status": "ignored", "reason": "empty_command"}

        if len(cmd) > _MAX_TERMINAL_COMMAND_LENGTH:
            cmd = cmd[:_MAX_TERMINAL_COMMAND_LENGTH]

        try:
            from app.security.command_policy import get_command_policy

            policy = get_command_policy()
            result = policy.evaluate(cmd)

            # If not allowed and not blocked, we ignore it (UNKNOWN)
            if not result.should_log and not result.is_blocked:
                return {"status": "ignored", "reason": "not_allowed_to_log"}

            is_blocked = result.is_blocked
            reason = result.reason
            masked_cmd = result.masked_command or ("[BLOCKED]" if is_blocked else cmd)
            cmd_hash = result.command_hash or hashlib.sha256(masked_cmd.encode('utf-8')).hexdigest()
        except Exception as pe:
            # NOTE(review): on policy failure the command is stored unmasked;
            # confirm that this fail-open behaviour is intended.
            logger.warning(f"Policy evaluation failure: {pe}")
            is_blocked = False
            reason = str(pe)
            masked_cmd = cmd
            cmd_hash = hashlib.sha256(cmd.encode('utf-8')).hexdigest()

        cmd_repo = TerminalCommandLogRepository(db_session)
        await cmd_repo.create(
            host_id=session.host_id,
            host_name=session.host_name,
            user_id=session.user_id,
            username=session.username,
            terminal_session_id=session.id,
            command=masked_cmd,
            command_hash=cmd_hash,
            source="terminal_dynamic",
            is_blocked=is_blocked,
            blocked_reason=reason,
        )
        await db_session.commit()

        return {"status": "success", "blocked": is_blocked}
    except HTTPException:
        raise
    except Exception as e:
        # Keep the traceback in the server log; do not echo internal error
        # text (which may embed SQL or the command itself) to the client.
        logger.exception(f"Error logging terminal command: {e}")
        raise HTTPException(status_code=500, detail="Failed to log terminal command") from e
["Cette session terminal n'existe pas ou a expiré."], "show_back_link": True, }, status_code=404, ) if not terminal_service.verify_token(token, session.token_hash): return _templates.TemplateResponse( "terminal/error.html", { "request": request, "title": html.escape(session.host_name or "Terminal"), "heading": "Accès Refusé", "messages": ["Token de session invalide."], "show_back_link": False, }, status_code=403, ) alive = True try: alive = await terminal_service.is_session_process_alive(session_id) except Exception: alive = True if not alive: if await _is_port_reachable(request, session.ttyd_port): alive = True else: try: pid = await terminal_service.spawn_ttyd( session_id=session_id, host_ip=session.host_ip, port=session.ttyd_port, token=token, ) if pid: session.ttyd_pid = pid await db_session.commit() alive = True except Exception: alive = False if not alive: return _templates.TemplateResponse( "terminal/error.html", { "request": request, "title": html.escape(session.host_name or "Terminal"), "heading": "Terminal indisponible", "messages": [ "Le service terminal (ttyd) ne répond pas pour cette session.", "Essayez de reconnecter ou de recréer une session depuis le dashboard.", ], "show_back_link": True, }, status_code=503, ) now = datetime.now(timezone.utc) expires_at = _as_utc_aware(session.expires_at) remaining_seconds = _compute_remaining_seconds(expires_at, now=now) ttyd_port = session.ttyd_port embed_mode = request.query_params.get("embed") in {"1", "true", "yes"} js_session_id = json.dumps(session_id) js_token = json.dumps(token) js_host_id = json.dumps(session.host_id) js_host_name = json.dumps(session.host_name) js_host_ip = json.dumps(session.host_ip) debug_mode_enabled = bool(settings.debug_mode) debug_panel_html = ( """
""".format(session_id=html.escape(session_id), ttyd_port=ttyd_port) if debug_mode_enabled else "" ) # JavaScript blocks are built server-side so SESSION_ID/TOKEN can be JSON-encoded debug_js = ( "" if not debug_mode_enabled else " let debugVisible = false;\n\n" " function toggleDebug(force) {\n" " debugVisible = typeof force === 'boolean' ? force : !debugVisible;\n" " const el = document.getElementById('terminalDebug');\n" " if (el) el.style.display = debugVisible ? 'block' : 'none';\n" " }\n\n" " window.addEventListener('keydown', (e) => { if ((e.ctrlKey||e.metaKey)&&e.shiftKey&&(e.key==='D'||e.key==='d')) { e.preventDefault(); toggleDebug(); } });\n\n" ) history_js = """ // ---- History panel state ---- let historyPanelOpen = false, historyPanelPinned = false; let historySelectedIndex = -1, historySearchQuery = '', historyTimeFilter = 'all'; let historyPinnedOnly = false; // ---- Panel open/close ---- function toggleHistory() { if (historyPanelOpen && !historyPanelPinned) { closeHistoryPanel(); return; } const panel = document.getElementById('terminalHistoryPanel'); const btn = document.getElementById('btnHistory'); panel.style.display = 'flex'; panel.classList.add('open'); btn.classList.add('active'); historyPanelOpen = true; historySelectedIndex = -1; const si = document.getElementById('terminalHistorySearch'); if (si) { si.focus(); si.select(); } if (historyData.length === 0) loadHistory(); } function closeHistoryPanel() { if (historyPanelPinned) return; const panel = document.getElementById('terminalHistoryPanel'); const btn = document.getElementById('btnHistory'); panel.classList.remove('open'); btn.classList.remove('active'); historyPanelOpen = false; historySelectedIndex = -1; setTimeout(() => { try { panel.style.display = 'none'; } catch(e){} }, 200); document.getElementById('terminalFrame').focus(); } function toggleHistoryPin() { historyPanelPinned = !historyPanelPinned; const btn = document.getElementById('btnHistoryPin'); const panel = 
document.getElementById('terminalHistoryPanel'); btn?.classList.toggle('active', historyPanelPinned); panel?.classList.toggle('docked', historyPanelPinned); } function togglePinnedOnly() { historyPinnedOnly = !historyPinnedOnly; const btn = document.getElementById('btnPinnedOnly'); btn?.classList.toggle('active', historyPinnedOnly); historySelectedIndex = -1; loadHistory(); } // ---- Data loading ---- async function loadHistory() { const list = document.getElementById('terminalHistoryList'); list.innerHTML = '${pinned?'':''}${dc}