133 lines
5.1 KiB
Python
133 lines
5.1 KiB
Python
import os
|
|
from datetime import timedelta
|
|
from flask import Flask, g, redirect, request, session, url_for
|
|
|
|
from .config import Config
|
|
from .db import init_engine, db_session
|
|
|
|
|
|
def create_app(config_class: type = Config) -> Flask:
    """Build and configure the Flask application.

    Loads configuration from *config_class*, initialises the database engine,
    installs the login gate, registers every blueprint, and wires up the
    template context processor, the HTML no-cache hook and the trash-cleanup
    scheduler.

    Args:
        config_class: Object handed to ``app.config.from_object``.

    Returns:
        The configured :class:`flask.Flask` instance.
    """
    # Imported here (not at file level) so this function stays self-contained.
    from flask import abort, jsonify

    app = Flask(__name__, static_folder="static", template_folder="templates")
    app.config.from_object(config_class)

    init_engine(app.config["SQLALCHEMY_DATABASE_URI"])
    app.permanent_session_lifetime = timedelta(days=14)

    @app.teardown_appcontext
    def _remove_session(exc=None):
        """Call ``remove()`` on the ``db_session()`` result at context teardown."""
        db_session().remove()

    # Path prefixes that bypass the login gate.
    # NOTE(review): matching is by prefix, so "/api/health" also exempts paths
    # such as "/api/healthXYZ" — confirm that this is intended.
    PUBLIC_PATHS = ("/auth/", "/static/", "/favicon.ico", "/api/health")

    def _is_admin(email: str | None) -> bool:
        """Return True when *email* appears (case-insensitively) in ``ADMINS``.

        ``ADMINS`` is read from the app config on every call. It may be a
        comma-separated string or an iterable of addresses; blank entries are
        ignored. A missing/empty *email* is never an admin.
        """
        if not email:
            return False
        configured = app.config.get("ADMINS", "") or ""
        # Tolerate a list/tuple/set as well as the comma-separated string so a
        # structured config value does not fail on ``.split``.
        entries = configured.split(",") if isinstance(configured, str) else configured
        admins = {str(entry).strip().lower() for entry in entries if str(entry).strip()}
        return email.lower() in admins

    @app.before_request
    def _require_login():
        """Gate every non-public request behind a valid server-side session.

        Returns ``None`` to let the request proceed, a JSON 401 for
        unauthenticated ``/api/`` requests, or a redirect to ``auth.login``
        otherwise. Aborts with 403 when a non-admin requests an admin path.
        On success, populates ``g.session_token``, ``g.user_email``,
        ``g.user_password`` and ``g.is_admin``.
        """
        from .services import sessions as _sess

        path = request.path or "/"
        # str.startswith accepts a tuple of prefixes.
        if path.startswith(PUBLIC_PATHS):
            return None

        token = session.get("sid")

        # Backwards-compat with old session format (user_email/user_password
        # in cookie): mint a session token and replace the cookie contents.
        if not token and session.get("user_email"):
            email = session.get("user_email")
            pw = session.get("user_password") or ""
            # First X-Forwarded-For entry, falling back to the socket peer.
            forwarded = request.headers.get("X-Forwarded-For", "").split(",")[0].strip()
            ip = forwarded or request.remote_addr or ""
            token = _sess.create(
                email,
                pw,
                ip=ip,
                user_agent=request.headers.get("User-Agent", ""),
            )
            session.clear()
            session.permanent = True
            session["sid"] = token

        record = _sess.load(token)
        if not record:
            session.clear()
            if path.startswith("/api/"):
                return jsonify(ok=False, error="unauthorized"), 401
            # Keep the query string in ``next`` only when there is one.
            next_url = request.full_path if request.query_string else path
            return redirect(url_for("auth.login", next=next_url))

        g.session_token = token
        g.user_email = record["user_email"]
        g.user_password = record["password"]
        g.is_admin = _is_admin(g.user_email)

        # Match the bare "/admin" as well as everything beneath it, so the
        # check does not depend on a trailing slash being present.
        is_admin_path = path == "/admin" or path.startswith("/admin/")
        if is_admin_path and not g.is_admin:
            abort(403)
        return None

    # Blueprints are imported inside the factory rather than at module level.
    from .blueprints.mail import bp as mail_bp
    from .blueprints.admin import bp as admin_bp
    from .blueprints.auth import bp as auth_bp
    from .blueprints.api import bp as api_bp
    from .blueprints.groups import bp as groups_bp
    from .blueprints.rules import bp as rules_bp
    from .blueprints.settings import bp as settings_bp
    from .blueprints.shared import bp as shared_bp

    app.register_blueprint(mail_bp)
    app.register_blueprint(admin_bp, url_prefix="/admin")
    app.register_blueprint(auth_bp, url_prefix="/auth")
    app.register_blueprint(api_bp, url_prefix="/api")
    app.register_blueprint(groups_bp, url_prefix="/groups")
    app.register_blueprint(rules_bp, url_prefix="/rules")
    app.register_blueprint(settings_bp, url_prefix="/settings")
    app.register_blueprint(shared_bp, url_prefix="/shared")

    from .services import mock_mail
    from .services import mail_service as ms

    def _quota_for(email: str) -> dict | None:
        """Return ``{"used", "quota", "pct"}`` for *email*'s mailbox, or None.

        Best-effort: any failure while importing or querying ``dms_config`` is
        logged at debug level and reported as ``None`` instead of propagating.
        """
        try:
            from .services import dms_config

            for mb in dms_config.list_mailboxes():
                if mb.email == email:
                    return {"used": mb.used_human, "quota": mb.quota_human, "pct": mb.pct}
        except Exception:
            # Deliberately broad: a quota failure must not break rendering,
            # but leave a trace instead of swallowing it silently.
            app.logger.debug("quota lookup failed for %s", email, exc_info=True)
        return None

    @app.context_processor
    def inject_nav():
        """Provide navigation/compose variables to every rendered template."""
        # NOTE(review): the user comes from mock_mail.CURRENT_USER_EMAIL, not
        # from g.user_email set by _require_login — confirm this is intended.
        me = mock_mail.CURRENT_USER_EMAIL
        shared_my = mock_mail.list_shared_for(me)

        # Cache-busting version: newest mtime of the two main static assets,
        # or 0 when either file cannot be stat'ed.
        try:
            asset_v = max(
                int(os.path.getmtime(os.path.join(app.static_folder, asset)))
                for asset in ("app.js", "styles.css")
            )
        except OSError:
            asset_v = 0

        return {
            "asset_v": asset_v,
            "is_admin": _is_admin(me),
            "user_quota": _quota_for(me),
            "current_user_email": me,
            "mail_server_hostname": app.config["MAIL_SERVER_HOSTNAME"],
            "all_folders_nav": mock_mail.all_folders(),
            "folder_counts": ms.get_folder_counts(),
            "compose_signatures": mock_mail.list_signatures(),
            "compose_default_sig": mock_mail.default_signature(),
            "shared_my": shared_my,
            "compose_from_options": [(me, me)] + [
                (s.email, s.name + " — " + s.email) for s in shared_my
            ],
        }

    @app.after_request
    def _no_cache_html(resp):
        """Add no-cache headers to ``text/html`` responses; pass others through."""
        ct = resp.headers.get("Content-Type", "")
        if ct.startswith("text/html"):
            resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
            resp.headers["Pragma"] = "no-cache"
        return resp

    # Scheduler start-up is best-effort: a failure is logged and the app
    # is still returned.
    try:
        from .tasks import ensure_trash_cleanup_scheduled

        ensure_trash_cleanup_scheduled()
    except Exception:
        app.logger.warning("trash cleanup scheduler init failed", exc_info=True)

    return app
|