"""Generate Sieve scripts from app rules and deploy to user's Dovecot.

Rule semantics → Sieve mapping:
  field:  from/to/cc/subject/body/forwarded_from/forwarded_to/size
  op:     contains, not_contains, equals, starts, ends, matches, gt
  action: move, star, important, mark_read, delete, forward_copy, autoreply

Resulting script lives at /var/mail/<domain>/<local>/sieve/managesieve.sieve
inside DMS (or /var/mail/<local>/sieve/managesieve.sieve when the address has
no domain part).
"""
from __future__ import annotations

import os
import re
import subprocess
from io import StringIO
from typing import TYPE_CHECKING, Iterable

if TYPE_CHECKING:
    # Only used in (lazy) annotations, so it is not needed at runtime.
    from .store import RuleDTO

# Rule field name -> message header inspected by the generated `header` test.
_FIELD_TO_HEADER = {
    "from": "From",
    "to": "To",
    "cc": "Cc",
    "subject": "Subject",
    "forwarded_from": "X-Forwarded-From",
    "forwarded_to": "X-Forwarded-To",
}

# Rule action -> IMAP flag/keyword set through `addflag`.
_FLAG_FOR_ACTION = {
    "mark_read": "\\Seen",
    "star": "\\Flagged",
    "important": "$Important",
}


def _q(s: str) -> str:
    """Return *s* as a Sieve quoted string (backslash and `"` escaped)."""
    return '"' + str(s).replace("\\", "\\\\").replace('"', '\\"') + '"'


def _glob_literal(s: str) -> str:
    """Escape `*`, `?` and `\\` so *s* is matched literally by `:matches`."""
    return re.sub(r"([\\*?])", r"\\\1", str(s))


def _comment_text(s: object) -> str:
    """Fold line breaks in *s* so it cannot terminate a `#` comment early."""
    return re.sub(r"[\r\n]+", " ", str(s))


def _condition(rule: RuleDTO) -> str | None:
    """Return Sieve test expression (without `if `) or None if unsupported.

    Reads `rule.field`, `rule.op` and `rule.value`. None is returned for an
    unknown field and for a `size` rule whose value is not a non-negative
    integer; `render` skips such rules.
    """
    f, op, v = rule.field, rule.op, rule.value

    if f == "body":
        if op == "not_contains":
            # Must be negated explicitly; the fallback below would otherwise
            # turn "does not contain" into a positive `:contains` test.
            return f"not body :text :contains {_q(v)}"
        match = ":matches" if op == "matches" else ":contains"
        return f"body :text {match} {_q(v)}"

    if f == "size":
        try:
            kb = int(v)
        except (TypeError, ValueError):
            # An unparsable size used to become 0, i.e. `size :over 0K`,
            # which matches practically every message.
            return None
        if kb < 0:
            return None  # a negative number is not valid in a Sieve script
        return f"size :over {kb}K" if op == "gt" else f"size :under {kb}K"

    header = _FIELD_TO_HEADER.get(f)
    if not header:
        return None
    h = _q(header)

    if op == "equals":
        return f"header :is {h} {_q(v)}"
    if op == "starts":
        # Only the trailing `*` is a wildcard; the user text stays literal.
        return f'header :matches {h} {_q(_glob_literal(v) + "*")}'
    if op == "ends":
        return f'header :matches {h} {_q("*" + _glob_literal(v))}'
    if op == "matches":
        # The value is a user-supplied glob: wildcards are passed through.
        return f"header :matches {h} {_q(v)}"
    if op == "not_contains":
        return f"not header :contains {h} {_q(v)}"
    # default — contains
    return f"header :contains {h} {_q(v)}"


def _action(rule: RuleDTO) -> list[str]:
    """Return the Sieve commands for `rule.action`, or [] if unsupported.

    `forward_copy` and `autoreply` need a non-empty `rule.action_param`
    (target address / reply text); `move` falls back to "INBOX".
    """
    a, p = rule.action, rule.action_param

    if a == "move":
        return [f"fileinto {_q(p or 'INBOX')};"]
    if a == "delete":
        return ["discard;", "stop;"]
    if a in _FLAG_FOR_ACTION:
        return [f"addflag {_q(_FLAG_FOR_ACTION[a])};"]
    if a == "forward_copy" and p:
        return [f"redirect :copy {_q(p)};"]
    if a == "autoreply" and p:
        # NOTE(review): `${subject}` is only expanded when the script
        # requires the "variables" extension and sets that variable; render()
        # does neither, so this text is emitted as-is. Left unchanged because
        # enabling "variables" would also affect user-supplied strings.
        return [
            'vacation :days 1 :subject "Re: ${subject}" '
            f"{_q(p)};"
        ]
    return []


def render(rules: Iterable[RuleDTO]) -> str:
    """Produce a complete sieve script for the given rules.

    Disabled rules and rules without a supported condition or action are
    skipped. Unless `apply_to_spam` is set, a rule is additionally guarded by
    a "not X-Spam-Flag: YES" test. A trailing `stop;` is added unless
    `continue_chain` is set.
    """
    out = StringIO()
    out.write('require ["fileinto","imap4flags","copy","body","vacation","envelope"];\n\n')
    for r in rules:
        if not r.enabled:
            continue
        cond = _condition(r)
        if not cond:
            continue
        actions = _action(r)
        if not actions:
            continue
        # Rule names are free text: a raw line break would end the comment
        # and let the rest of the name run as Sieve commands.
        out.write(f"# rule {_comment_text(r.id)}: {_comment_text(r.name)}\n")
        if not r.apply_to_spam:
            cond = f'allof(not header :contains "X-Spam-Flag" "YES", {cond})'
        out.write(f"if {cond} {{\n")
        for a in actions:
            out.write(f" {a}\n")
        if not r.continue_chain:
            out.write(" stop;\n")
        out.write("}\n\n")
    return out.getvalue()


# ── Deploy to Dovecot ─────────────────────────────────────────────────────


def _container() -> str:
    """Return the DMS container name (env `DMS_CONTAINER`, with a default)."""
    return os.getenv("DMS_CONTAINER", "docker-mailserver-1")


def deploy(user_email: str, script: str) -> tuple[bool, str]:
    """Write the script to the user's Sieve dir inside the DMS container.

    The script is piped to `docker exec` on stdin, written to the target
    path, chowned to 5000:5000 and compiled with `sievec`.

    Returns `(True, target_path)` on success, otherwise `(False, message)`;
    failures are reported through the return value rather than raised.
    """
    safe = re.sub(r"[^a-zA-Z0-9._@-]", "", user_email)
    if not user_email or safe != user_email:
        return False, "invalid user_email"

    local, sep, domain = user_email.partition("@")
    parts = [local, domain] if sep else [local]
    # The character whitelist still lets "..", "." and empty components
    # through; those would escape /var/mail or produce a malformed path.
    if any(part in ("", ".", "..") or "@" in part for part in parts):
        return False, "invalid user_email"

    if sep:
        target = f"/var/mail/{domain}/{local}/sieve/managesieve.sieve"
    else:
        target = f"/var/mail/{local}/sieve/managesieve.sieve"

    # The path is handed to bash as "$1" instead of being pasted into the
    # command text, so it is never re-parsed by the shell.
    shell = (
        'dir=$(dirname "$1") && mkdir -p "$dir" && cat > "$1" && '
        'chown -R 5000:5000 "$dir" && '
        'sievec "$1"'
    )
    cmd = [
        "docker", "exec", "-i", _container(),
        "bash", "-c", shell, "bash", target,
    ]
    try:
        p = subprocess.run(cmd, input=script, text=True, capture_output=True, timeout=20)
    except Exception as exc:
        # Boundary function: missing docker binary, timeout, etc. are all
        # reported to the caller as a failed deploy.
        return False, str(exc)
    if p.returncode != 0:
        return False, (p.stderr or p.stdout).strip() or "error"
    return True, target