173 lines
5.3 KiB
Python
173 lines
5.3 KiB
Python
# backend/auth/user_store.py
|
|
# CRUD operations on data/users.json with atomic writes.
|
|
# File is read on each auth request (no stale cache), written atomically
|
|
# via tmp+rename to prevent corruption on crash.
|
|
|
|
import json
|
|
import uuid
|
|
import logging
|
|
import shutil
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Optional, List
|
|
|
|
from .password import hash_password
|
|
|
|
logger = logging.getLogger("obsigate.auth.users")
|
|
|
|
USERS_FILE = Path("data/users.json")
|
|
|
|
|
|
def _read() -> dict:
|
|
"""Read users.json. Returns empty structure if file doesn't exist."""
|
|
if not USERS_FILE.exists():
|
|
return {"version": 1, "users": {}}
|
|
try:
|
|
return json.loads(USERS_FILE.read_text(encoding="utf-8"))
|
|
except (json.JSONDecodeError, OSError) as e:
|
|
logger.error(f"Failed to read users.json: {e}")
|
|
return {"version": 1, "users": {}}
|
|
|
|
|
|
def _write(data: dict):
|
|
"""Atomic write: write to .tmp then rename to prevent corruption."""
|
|
USERS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
tmp = USERS_FILE.with_suffix(".tmp")
|
|
tmp.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8")
|
|
shutil.move(str(tmp), str(USERS_FILE))
|
|
try:
|
|
USERS_FILE.chmod(0o600)
|
|
except OSError:
|
|
pass # Windows doesn't support Unix permissions
|
|
|
|
|
|
def has_users() -> bool:
|
|
"""Check if any users exist."""
|
|
return bool(_read()["users"])
|
|
|
|
|
|
def get_user(username: str) -> Optional[dict]:
|
|
"""Get a user by username. Returns None if not found."""
|
|
return _read()["users"].get(username)
|
|
|
|
|
|
def get_all_users() -> List[dict]:
|
|
"""Get all users WITHOUT password_hash (safe for API responses)."""
|
|
users = _read()["users"]
|
|
return [
|
|
{k: v for k, v in u.items() if k != "password_hash"}
|
|
for u in users.values()
|
|
]
|
|
|
|
|
|
def create_user(
|
|
username: str,
|
|
password: str,
|
|
role: str = "user",
|
|
vaults: Optional[List[str]] = None,
|
|
display_name: Optional[str] = None,
|
|
) -> dict:
|
|
"""Create a new user. Raises ValueError if username already taken."""
|
|
data = _read()
|
|
if username in data["users"]:
|
|
raise ValueError(f"User '{username}' already exists")
|
|
|
|
user = {
|
|
"id": str(uuid.uuid4()),
|
|
"username": username,
|
|
"display_name": display_name or username,
|
|
"password_hash": hash_password(password),
|
|
"role": role,
|
|
"vaults": vaults or [],
|
|
"active": True,
|
|
"created_at": datetime.now(timezone.utc).isoformat(),
|
|
"last_login": None,
|
|
"failed_attempts": 0,
|
|
"locked_until": None,
|
|
}
|
|
data["users"][username] = user
|
|
_write(data)
|
|
logger.info(f"Created user '{username}' (role={role})")
|
|
return {k: v for k, v in user.items() if k != "password_hash"}
|
|
|
|
|
|
def update_user(username: str, updates: dict) -> dict:
|
|
"""Update user fields. Raises ValueError if user not found.
|
|
|
|
Forbidden fields (id, username, created_at) are silently ignored.
|
|
If 'password' is in updates, it's hashed and stored as password_hash.
|
|
"""
|
|
data = _read()
|
|
if username not in data["users"]:
|
|
raise ValueError(f"User '{username}' not found")
|
|
|
|
forbidden = {"id", "username", "created_at"}
|
|
safe_updates = {k: v for k, v in updates.items() if k not in forbidden}
|
|
|
|
if "password" in safe_updates:
|
|
safe_updates["password_hash"] = hash_password(safe_updates.pop("password"))
|
|
|
|
data["users"][username].update(safe_updates)
|
|
_write(data)
|
|
return {k: v for k, v in data["users"][username].items() if k != "password_hash"}
|
|
|
|
|
|
def delete_user(username: str):
|
|
"""Delete a user. Raises ValueError if not found."""
|
|
data = _read()
|
|
if username not in data["users"]:
|
|
raise ValueError(f"User '{username}' not found")
|
|
del data["users"][username]
|
|
_write(data)
|
|
logger.info(f"Deleted user '{username}'")
|
|
|
|
|
|
def record_login_success(username: str):
|
|
"""Record a successful login: update last_login, reset failed attempts."""
|
|
update_user(username, {
|
|
"last_login": datetime.now(timezone.utc).isoformat(),
|
|
"failed_attempts": 0,
|
|
"locked_until": None,
|
|
})
|
|
|
|
|
|
def record_login_failure(username: str) -> int:
|
|
"""Record a failed login. Returns the new attempt count.
|
|
|
|
After 5 failures, locks the account for 15 minutes.
|
|
"""
|
|
data = _read()
|
|
user = data["users"].get(username)
|
|
if not user:
|
|
return 0
|
|
|
|
attempts = user.get("failed_attempts", 0) + 1
|
|
updates = {"failed_attempts": attempts}
|
|
|
|
# Lock after 5 failed attempts (15 minutes)
|
|
if attempts >= 5:
|
|
locked_until = (
|
|
datetime.now(timezone.utc) + timedelta(minutes=15)
|
|
).isoformat()
|
|
updates["locked_until"] = locked_until
|
|
logger.warning(f"Account '{username}' locked after {attempts} failed attempts")
|
|
|
|
update_user(username, updates)
|
|
return attempts
|
|
|
|
|
|
def is_locked(username: str) -> bool:
|
|
"""Check if an account is temporarily locked.
|
|
|
|
Auto-unlocks if the lock period has expired.
|
|
"""
|
|
user = get_user(username)
|
|
if not user or not user.get("locked_until"):
|
|
return False
|
|
locked_until = datetime.fromisoformat(user["locked_until"])
|
|
if datetime.now(timezone.utc) > locked_until:
|
|
# Lock expired — unlock
|
|
update_user(username, {"locked_until": None, "failed_attempts": 0})
|
|
return False
|
|
return True
|