Source code for maintenance_script

#!/usr/bin/env python3
"""
Maintenance script for managing users, resetting passwords and generating password hashes.

Run './maintenance_script.py' to see the help message.
"""

import uuid

from authentication import hash_password
import sys

from sqlalchemy.orm import Session

from config import config
from sqlalchemy import create_engine, select, func

from models.user import User


def create_user(db_session: Session, username: str, email: str, password: str):
    """
    Add a new, non-admin user to the database and commit.

    The account is stored without a 2FA secret and with ``must_change_password``
    set. Only the hash of the password (via ``hash_password``) is stored.

    :param db_session: session used to add and commit the new user
    :param username: stored as the user's ``name``
    :param email: stored as the user's ``email``
    :param password: plain-text password, hashed before it is stored
    """
    new_user = User(
        id=uuid.uuid4(),
        name=username,
        email=email,
        password_hash=hash_password(password),
        two_fa_secret=None,
        is_admin=False,
        must_change_password=True,
    )
    db_session.add(new_user)
    db_session.commit()
def delete_user(db_session: Session, username: str):
    """
    Remove the user whose name matches ``username`` and commit.

    The name is compared case-insensitively (both sides are lower-cased in
    the query).

    :param db_session: session used for the lookup, the delete and the commit
    :param username: name of the user to remove
    :raises ValueError: if no user with that name exists
    """
    name_matches = func.lower(User.name) == func.lower(username)
    query = select(User).where(name_matches)
    target = db_session.scalars(query).one_or_none()

    if target is None:
        raise ValueError("user doesn't exist")

    db_session.delete(target)
    db_session.commit()
def reset_password(db_session: Session, username: str, new_password: str):
    """
    Give the named user a new password and clear their second factors.

    The name is compared case-insensitively. Besides replacing the password
    hash, the 2FA secret is cleared and the user's ``webauthn`` entry is
    deleted when one is set; everything is committed together.

    :param db_session: session used for the lookup, the changes and the commit
    :param username: name of the user whose password is reset
    :param new_password: plain-text password, hashed before it is stored
    :raises ValueError: if no user with that name exists
    """
    name_matches = func.lower(User.name) == func.lower(username)
    query = select(User).where(name_matches)
    account = db_session.scalars(query).one_or_none()

    if account is None:
        raise ValueError("user doesn't exist")

    account.password_hash = hash_password(new_password)

    # a user who lost the password has most likely lost the second factors
    # as well, so those are cleared too
    account.two_fa_secret = None
    webauthn = account.webauthn
    if webauthn:
        db_session.delete(webauthn)

    db_session.commit()
def set_user_admin_state(db_session: Session, username: str, is_admin: bool):
    """
    Set the admin flag of the named user and commit.

    The name is compared case-insensitively.

    :param db_session: session used for the lookup, the change and the commit
    :param username: name of the user to promote or demote
    :param is_admin: True to make the user an admin, False to make them a
        normal user
    :raises ValueError: if no user with that name exists
    """
    name_matches = func.lower(User.name) == func.lower(username)
    query = select(User).where(name_matches)
    account = db_session.scalars(query).one_or_none()

    if account is None:
        raise ValueError("user doesn't exist")

    account.is_admin = is_admin
    db_session.commit()
if __name__ == "__main__": # the script doesn't run async, so the aiosqlite extension is not needed db_connection_url = config.database_url.replace("+aiosqlite", "") engine = create_engine(db_connection_url) if len(sys.argv) <= 1: print( """ Usage: - `./maintenance_script.py hash-password "<password>"` - `./maintenance_script.py add-user "<username>" "<email>" "<password>"` - `./maintenance_script.py delete-user "<username>" - `./maintenance_script.py reset-password "<username>" "<password>"` - `./maintenance_script.py promote-user "<username>" - `./maintenance_script.py demote-user "<username>" """.strip() ) sys.exit(0) # first arg is the script name, so skip it command = sys.argv[1] args = sys.argv[2:] with Session(engine) as db_session: match command: case "add-user": assert len(args) == 3, f"expected to get 3 arguments, got {len(args)}" create_user(db_session, *args) case "reset-password": assert len(args) == 2, f"expected to get 2 arguments, got {len(args)}" reset_password(db_session, *args) case "delete-user": assert len(args) == 1, f"expected to get 1 argument, got {len(args)}" delete_user(db_session, *args) case "promote-user": assert len(args) == 1, f"expected to get 1 argument, got {len(args)}" set_user_admin_state(db_session, args[0], True) case "demote-user": assert len(args) == 1, f"expected to get 1 argument, got {len(args)}" set_user_admin_state(db_session, args[0], False) case "hash-password": assert len(args) == 1, f"expected to get 1 argument, got {len(args)}" print(hash_password(*args)) case _: raise ValueError(f"unknown command '{command}'")