aboutsummaryrefslogtreecommitdiffstats
path: root/src/expense/auth
diff options
context:
space:
mode:
Diffstat (limited to 'src/expense/auth')
-rw-r--r--src/expense/auth/__init__.py5
-rw-r--r--src/expense/auth/blueprint.py20
-rw-r--r--src/expense/auth/delete.py31
-rw-r--r--src/expense/auth/login.py63
-rw-r--r--src/expense/auth/logout.py14
-rw-r--r--src/expense/auth/password.py45
-rw-r--r--src/expense/auth/register.py80
7 files changed, 258 insertions, 0 deletions
diff --git a/src/expense/auth/__init__.py b/src/expense/auth/__init__.py
new file mode 100644
index 0000000..ebd4e03
--- /dev/null
+++ b/src/expense/auth/__init__.py
@@ -0,0 +1,5 @@
+"""
+The authentication blueprint
+"""
+
+from .blueprint import auth
diff --git a/src/expense/auth/blueprint.py b/src/expense/auth/blueprint.py
new file mode 100644
index 0000000..4f36c12
--- /dev/null
+++ b/src/expense/auth/blueprint.py
@@ -0,0 +1,20 @@
+from typing import Optional
+from flask import Blueprint
+
+from ..model import User
+from .. import loginManager, db
+
+auth = Blueprint("auth", __name__, url_prefix="/auth")
+
+
+@loginManager.user_loader
+def load_user(id: str) -> Optional[User]:
+ """
+ Callback to load user from id which is a number
+ """
+ if id is not None:
+ return db.session.scalars(db.select(User).where(User.id == id)).one_or_none()
+ return None
+
+
+from . import login, logout, register, delete
diff --git a/src/expense/auth/delete.py b/src/expense/auth/delete.py
new file mode 100644
index 0000000..aaa7477
--- /dev/null
+++ b/src/expense/auth/delete.py
@@ -0,0 +1,31 @@
+from typing import Union
+from flask import redirect, url_for, render_template
+from flask_login import current_user, login_required, logout_user
+from flask_wtf import FlaskForm
+from werkzeug import Response
+from wtforms import SubmitField
+
+from ..model import User
+from .blueprint import auth, db
+
+
+class DeleteForm(FlaskForm):
+ """To Handle Delete the user"""
+ submit = SubmitField("Delete")
+
+
+@auth.route("/delete", methods=["GET", "POST"])
+@login_required
+def delete() -> Union[str, Response]:
+ """
+ Delete the user account. Has both GET and POST
+ """
+ form = DeleteForm()
+ if form.validate_on_submit():
+ user_id = current_user.id # type: ignore
+ logout_user()
+ user = db.session.scalars(db.select(User).where(User.id == user_id)).one()
+ db.session.delete(user)
+ db.session.commit()
+ return redirect(url_for("index"))
+ return render_template("auth/delete.html", form=form)
diff --git a/src/expense/auth/login.py b/src/expense/auth/login.py
new file mode 100644
index 0000000..00f78eb
--- /dev/null
+++ b/src/expense/auth/login.py
@@ -0,0 +1,63 @@
+from typing import Optional, Union
+from flask import (
+ redirect,
+ render_template,
+ request,
+ url_for,
+)
+from flask_login import current_user, login_user
+
+from flask_wtf import FlaskForm
+from werkzeug import Response
+from wtforms import StringField, PasswordField, SubmitField
+from wtforms.validators import DataRequired, InputRequired
+
+from .blueprint import auth
+from ..model import User
+from .. import db
+
+
+class LoginForm(FlaskForm):
+ """
+ The Login Form.
+ The validate method has be improved to do validating the user
+ """
+ username = StringField(validators=[DataRequired(), InputRequired()])
+ password = PasswordField(validators=[DataRequired(), InputRequired()])
+ submit = SubmitField("Log In")
+
+ def validate(self, *args, **kwargs):
+ if not super().validate(*args, **kwargs):
+ return False
+
+ username = self.username.data
+ password = self.password.data
+
+ user = db.session.scalars(
+ db.select(User).where(User.name == username)
+ ).one_or_none()
+ if user is None:
+ self.username.errors.append("Invalid username") # type: ignore
+ return False
+
+ if not user.password == password:
+ self.password.errors.append("Invalid password") # type: ignore
+ return False
+
+ self.user = user
+ return True
+
+
+@auth.route("/login", methods=("GET", "POST"))
+def login() -> Union[str, Response]:
+ """
+ The view for Loging in the User
+ """
+ if current_user.is_authenticated: # type: ignore
+ return redirect(request.args.get("next", default=url_for("index")))
+ form = LoginForm()
+ if form.validate_on_submit():
+ login_user(form.user)
+ return redirect(request.args.get("next", default=url_for("index")))
+
+ return render_template("auth/login.html", form=form)
diff --git a/src/expense/auth/logout.py b/src/expense/auth/logout.py
new file mode 100644
index 0000000..47fa306
--- /dev/null
+++ b/src/expense/auth/logout.py
@@ -0,0 +1,14 @@
+from flask import redirect, url_for
+from flask_login import login_required, logout_user
+from werkzeug import Response
+
+from .blueprint import auth
+
+@auth.route("/logout")
+@login_required
+def logout() -> Response:
+ """
+ Logout user View.
+ """
+ logout_user()
+ return redirect(url_for("index"))
diff --git a/src/expense/auth/password.py b/src/expense/auth/password.py
new file mode 100644
index 0000000..0c7090c
--- /dev/null
+++ b/src/expense/auth/password.py
@@ -0,0 +1,45 @@
+from dataclasses import dataclass
+from sqlalchemy import Text, TypeDecorator
+from werkzeug.security import check_password_hash, generate_password_hash
+
+
+@dataclass(eq=False)
+class PasswordHash:
+ """
+ Generate hash for password storage. It wraps arround the werkzeug.security
+ module.
+ """
+
+ hash: str
+
+ def __eq__(self, candidate) -> bool:
+ return check_password_hash(self.hash, candidate)
+
+ @classmethod
+ def new(cls, password: str, method: str = "pbkdf2", salt_length: int = 16):
+ """Wrapper over generate_password"""
+ hashed_password = generate_password_hash(password, method, salt_length)
+ return cls(hashed_password)
+
+
+class Password(TypeDecorator):
+ """The SQLAlchemy Addapter for PasswordHash"""
+
+ impl = Text
+
+ def process_bind_param(self, value, dialect) -> str | None:
+ if value is not None:
+ if isinstance(value, PasswordHash):
+ return value.hash
+ raise ValueError("Invalid value type. Expected PasswordHash object.")
+ return None
+
+ def process_result_value(self, value: str, dialect) -> PasswordHash | None:
+ if value is not None:
+ return PasswordHash(value)
+ return None
+
+ def coerce_compared_value(self, op, value):
+ if isinstance(value, PasswordHash):
+ return value.hash
+ return super().coerce_compared_value(op, value)
diff --git a/src/expense/auth/register.py b/src/expense/auth/register.py
new file mode 100644
index 0000000..0f43c9f
--- /dev/null
+++ b/src/expense/auth/register.py
@@ -0,0 +1,80 @@
+"""Register User"""
+from typing import Union
+from flask import (
+ redirect,
+ render_template,
+ url_for,
+)
+from flask_login import current_user, login_user
+
+from flask_wtf import FlaskForm
+from werkzeug import Response
+from wtforms import StringField, PasswordField, SubmitField
+from wtforms.validators import (
+ DataRequired,
+ InputRequired,
+ Length,
+ ValidationError,
+ EqualTo,
+)
+
+from ..model import User, Category
+
+from .password import PasswordHash
+
+from .blueprint import auth
+from .. import db
+
+
+class RegisterForm(FlaskForm):
+ """User Registeration form"""
+ username = StringField(validators=[DataRequired(), InputRequired(), Length(max=64)])
+ password = PasswordField(
+ validators=[
+ DataRequired(),
+ InputRequired(),
+ Length(min=8, message="Select a stronger password."),
+ ]
+ )
+ confirm = PasswordField(
+ "Repeat Password",
+ validators=[
+ DataRequired(),
+ InputRequired(),
+ EqualTo("password", message="Passwords must match."),
+ ],
+ )
+ submit = SubmitField("Register")
+
+ def validate_username(self, field: StringField) -> None:
+ user = db.session.execute(
+ db.select(User).where(User.name == field.data)
+ ).first()
+ if user is not None:
+ raise ValidationError("Please use a different username.")
+
+
+@auth.route("/register", methods=("GET", "POST"))
+def register() -> Union[str, Response]:
+ """
+ Registering User Route
+ Support both POST and GET
+ """
+ if current_user.is_authenticated: # type: ignore
+ return redirect(url_for("index"))
+ form = RegisterForm()
+ if form.validate_on_submit():
+ user = User(name=form.username.data, password=PasswordHash.new(form.password.data)) # type: ignore
+
+ for cat in ["Misc", "Income"]:
+ category = db.session.scalars(
+ db.select(Category).where(Category.name == cat)
+ ).one()
+ user.categories.append(category)
+
+ db.session.add(user)
+ db.session.commit()
+ login_user(user)
+ return redirect(url_for("index"))
+ print(form.confirm.errors)
+ return render_template("auth/register.html", form=form)