from fastapi import APIRouter, HTTPException, status
from api.database import database, user_table
from api.models.user import UserIn
from api.security import authenticate_user, get_password_hash, get_user, create_access_token
# Router instance that the endpoint decorators below register their routes on.
router = APIRouter()
@router.post("/register", status_code=201)
async def register(user: UserIn):
    """Create a new account from the submitted email and password.

    Responds with HTTP 400 when ``get_user`` already finds a row for the
    email; otherwise inserts the email together with the hashed password and
    returns a confirmation payload.
    """
    existing = await get_user(user.email)
    if existing:
        raise HTTPException(status_code=400, detail="A user with that email already exists")

    # Only the hash produced by get_password_hash is written to the table.
    insert_stmt = user_table.insert().values(
        email=user.email,
        password=get_password_hash(user.password),
    )
    await database.execute(insert_stmt)
    return {"detail": "User created"}
@router.post("/login")
async def login(user: UserIn):
    """Exchange an email/password pair for a bearer access token.

    ``authenticate_user`` raises when the credentials are rejected, so
    reaching the return statement means they were accepted.
    """
    # Keep the request model and the stored record under separate names.
    account = await authenticate_user(user.email, user.password)
    return {
        "access_token": create_access_token(account.email),
        "token_type": "bearer",
    }
User router
import datetime
import os
from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import ExpiredSignatureError, JWTError, jwt
from passlib.context import CryptContext

from api.database import database, user_table
# Key used to sign and verify JWTs. It is read from the JWT_SECRET_KEY
# environment variable so the real key does not have to live in source
# control. The literal fallback keeps existing setups working, but it is
# short and visible to anyone with the code, so rely on it for local
# development only.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "2b31f2a")
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# NOTE(review): tokenUrl says "token", while the login route shown alongside
# this module is "/login" -- confirm which URL clients are meant to use.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"])
# Generic 401 response raised whenever credentials or a token are rejected.
credentials_exception = HTTPException(
    status_code=401,
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
)
def access_token_expire_minutes() -> int:
    """Return the access-token lifetime, expressed in minutes."""
    minutes = 30
    return minutes
def create_access_token(email: str) -> str:
    """Build a signed JWT access token for *email*.

    The token carries the email in the ``sub`` claim and an ``exp`` claim set
    ``access_token_expire_minutes()`` minutes after the current UTC time. It
    is signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Take the lifetime from the shared helper instead of repeating the
    # literal 30, so the two values cannot drift apart.
    lifetime = datetime.timedelta(minutes=access_token_expire_minutes())
    expire = datetime.datetime.now(datetime.timezone.utc) + lifetime
    jwt_data = {"sub": email, "exp": expire}
    return jwt.encode(jwt_data, key=SECRET_KEY, algorithm=ALGORITHM)
def get_password_hash(password: str) -> str:
    """Hash *password* with the module's ``pwd_context`` and return the hash."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Report whether *plain_password* matches *hashed_password*.

    The comparison is delegated to the module's ``pwd_context``.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
async def get_user(email: str):
    """Fetch the row of ``user_table`` whose email column equals *email*.

    Returns the fetched row when it is truthy, otherwise ``None``.
    """
    lookup = user_table.select().where(user_table.c.email == email)
    row = await database.fetch_one(lookup)
    return row if row else None
async def authenticate_user(email: str, password: str):
    """Return the stored user for *email* if *password* matches its hash.

    Raises ``credentials_exception`` both when no user is found and when the
    password check fails, so the two cases look the same to the caller.
    """
    account = await get_user(email)
    # Short-circuit: the hash is only checked when a user was actually found.
    if account and verify_password(password, account.password):
        return account
    raise credentials_exception
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the bearer *token* to the user it was issued for.

    Raises:
        HTTPException: 401 with detail "Token has expired" when decoding
            reports an expired signature; ``credentials_exception`` when the
            token cannot be decoded, has no ``sub`` claim, or names an email
            that ``get_user`` does not find.
    """
    try:
        claims = jwt.decode(token, key=SECRET_KEY, algorithms=[ALGORITHM])
    except ExpiredSignatureError as exc:
        # Handled before the generic JWTError clause so that expiry gets its
        # own message.
        expired = HTTPException(
            status_code=401,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        )
        raise expired from exc
    except JWTError as exc:
        raise credentials_exception from exc

    subject = claims.get("sub")
    if subject is None:
        raise credentials_exception

    account = await get_user(email=subject)
    if account is None:
        raise credentials_exception
    return account
Security module
The way authentication is configured right now makes it difficult to implement a logout. If we added a column called session_active to track whether a session is still valid, or a column called secret to hold a per-user signing secret so that tokens can be invalidated on logout, the current implementation would force us to call get_user several times. Given that, I am wondering whether there are any issues with the code.