import os from datetime import timedelta from flask import Flask, g, redirect, request, session, url_for from .config import Config from .db import init_engine, db_session def create_app(config_class: type = Config) -> Flask: app = Flask(__name__, static_folder="static", template_folder="templates") app.config.from_object(config_class) init_engine(app.config["SQLALCHEMY_DATABASE_URI"]) app.permanent_session_lifetime = timedelta(days=14) @app.teardown_appcontext def _remove_session(exc=None): db_session().remove() PUBLIC_PATHS = ("/auth/", "/static/", "/favicon.ico", "/api/health") def _is_admin(email: str | None) -> bool: if not email: return False admins = {a.strip().lower() for a in (app.config.get("ADMINS", "") or "").split(",") if a.strip()} return email.lower() in admins @app.before_request def _require_login(): from .services import sessions as _sess path = request.path or "/" if any(path.startswith(p) for p in PUBLIC_PATHS): return None token = session.get("sid") # Backwards-compat with old session format (user_email/user_password in cookie). if not token and session.get("user_email"): email = session.get("user_email") pw = session.get("user_password") or "" ip = (request.headers.get("X-Forwarded-For", "").split(",")[0].strip() or request.remote_addr or "") token = _sess.create(email, pw, ip=ip, user_agent=request.headers.get("User-Agent", "")) session.clear() session.permanent = True session["sid"] = token record = _sess.load(token) if not record: session.clear() if path.startswith("/api/"): from flask import jsonify return jsonify(ok=False, error="unauthorized"), 401 return redirect(url_for("auth.login", next=request.full_path if request.query_string else path)) g.session_token = token g.user_email = record["user_email"] g.user_password = record["password"] g.is_admin = _is_admin(g.user_email) if path.startswith("/admin/") and not g.is_admin: from flask import abort abort(403) return None from .blueprints.mail import bp as mail_bp from .blueprints.admin import bp as admin_bp from .blueprints.auth import bp as auth_bp from .blueprints.api import bp as api_bp from .blueprints.groups import bp as groups_bp from .blueprints.rules import bp as rules_bp from .blueprints.settings import bp as settings_bp from .blueprints.shared import bp as shared_bp app.register_blueprint(mail_bp) app.register_blueprint(admin_bp, url_prefix="/admin") app.register_blueprint(auth_bp, url_prefix="/auth") app.register_blueprint(api_bp, url_prefix="/api") app.register_blueprint(groups_bp, url_prefix="/groups") app.register_blueprint(rules_bp, url_prefix="/rules") app.register_blueprint(settings_bp, url_prefix="/settings") app.register_blueprint(shared_bp, url_prefix="/shared") from .services import mock_mail from .services import mail_service as ms def _quota_for(email: str) -> dict | None: try: from .services import dms_config for mb in dms_config.list_mailboxes(): if mb.email == email: return {"used": mb.used_human, "quota": mb.quota_human, "pct": mb.pct} except Exception: pass return None @app.context_processor def inject_nav(): me = mock_mail.CURRENT_USER_EMAIL shared_my = mock_mail.list_shared_for(me) try: jsmt = int(os.path.getmtime(os.path.join(app.static_folder, "app.js"))) cssmt = int(os.path.getmtime(os.path.join(app.static_folder, "styles.css"))) asset_v = max(jsmt, cssmt) except Exception: asset_v = 0 return { "asset_v": asset_v, "is_admin": _is_admin(me), "user_quota": _quota_for(me), "current_user_email": me, "mail_server_hostname": app.config["MAIL_SERVER_HOSTNAME"], "all_folders_nav": mock_mail.all_folders(), "folder_counts": ms.get_folder_counts(), "compose_signatures": mock_mail.list_signatures(), "compose_default_sig": mock_mail.default_signature(), "shared_my": shared_my, "compose_from_options": [(me, me)] + [ (s.email, s.name + " — " + s.email) for s in shared_my ], } @app.after_request def _no_cache_html(resp): ct = resp.headers.get("Content-Type", "") if ct.startswith("text/html"): resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" resp.headers["Pragma"] = "no-cache" return resp try: from .tasks import ensure_trash_cleanup_scheduled ensure_trash_cleanup_scheduled() except Exception: app.logger.warning("trash cleanup scheduler init failed", exc_info=True) return app