"""Persistent store: signatures, groups, shared mailboxes, autoreply, rules. Drop-in replacement for the in-memory state previously kept in mock_mail.py. The mock_mail module re-exports these as a thin wrapper for backwards-compat. """ from __future__ import annotations from dataclasses import dataclass from typing import Optional from sqlalchemy import select, delete, update from sqlalchemy.exc import SQLAlchemyError from ..db import db_session from ..models import ( Signature, Group, GroupMember, SharedMailbox, SharedMember, Autoreply, Rule, ) # ── Signatures ───────────────────────────────────────────────────────────── @dataclass class SignatureDTO: id: int name: str body_md: str is_default: bool def _sig_to_dto(s: Signature) -> SignatureDTO: return SignatureDTO(id=s.id, name=s.name, body_md=s.body_md, is_default=s.is_default) def list_signatures(owner: str) -> list[SignatureDTO]: s = db_session() rows = s.execute(select(Signature).where(Signature.owner_email == owner).order_by(Signature.id)).scalars().all() return [_sig_to_dto(r) for r in rows] def default_signature(owner: str) -> Optional[SignatureDTO]: s = db_session() r = s.execute(select(Signature).where(Signature.owner_email == owner, Signature.is_default == True)).scalar_one_or_none() return _sig_to_dto(r) if r else None def get_signature(sid: int, owner: str | None = None) -> Optional[SignatureDTO]: s = db_session() q = select(Signature).where(Signature.id == sid) if owner: q = q.where(Signature.owner_email == owner) r = s.execute(q).scalar_one_or_none() return _sig_to_dto(r) if r else None def add_signature(owner: str, name: str, body_md: str, is_default: bool = False) -> int: s = db_session() if is_default: s.execute(update(Signature).where(Signature.owner_email == owner).values(is_default=False)) sig = Signature(owner_email=owner, name=name.strip(), body_md=body_md, is_default=is_default) s.add(sig); s.commit() return sig.id def update_signature(sid: int, owner: str, name: str, body_md: str, is_default: bool): s = db_session() if is_default: s.execute(update(Signature).where(Signature.owner_email == owner, Signature.id != sid).values(is_default=False)) s.execute(update(Signature).where(Signature.id == sid, Signature.owner_email == owner).values( name=name.strip(), body_md=body_md, is_default=is_default, )) s.commit() def delete_signature(sid: int, owner: str): s = db_session() s.execute(delete(Signature).where(Signature.id == sid, Signature.owner_email == owner)) s.commit() # ── Groups ───────────────────────────────────────────────────────────────── @dataclass class GroupDTO: id: int name: str members: list[str] def _grp_to_dto(g: Group) -> GroupDTO: return GroupDTO(id=g.id, name=g.name, members=[m.email for m in g.members]) def list_groups(owner: str) -> list[GroupDTO]: s = db_session() rows = s.execute(select(Group).where(Group.owner_email == owner).order_by(Group.name)).scalars().all() return [_grp_to_dto(r) for r in rows] def get_group(gid: int, owner: str | None = None) -> Optional[GroupDTO]: s = db_session() q = select(Group).where(Group.id == gid) if owner: q = q.where(Group.owner_email == owner) r = s.execute(q).scalar_one_or_none() return _grp_to_dto(r) if r else None def save_group(owner: str, gid: int | None, name: str, members: list[str]) -> int: s = db_session() members = [m.strip() for m in members if m.strip()] if gid: g = s.execute(select(Group).where(Group.id == gid, Group.owner_email == owner)).scalar_one_or_none() if not g: return 0 g.name = name.strip() s.execute(delete(GroupMember).where(GroupMember.group_id == g.id)) for m in members: s.add(GroupMember(group_id=g.id, email=m)) else: g = Group(owner_email=owner, name=name.strip()) s.add(g); s.flush() for m in members: s.add(GroupMember(group_id=g.id, email=m)) s.commit() return g.id def delete_group(gid: int, owner: str): s = db_session() s.execute(delete(Group).where(Group.id == gid, Group.owner_email == owner)) s.commit() # ── Shared mailboxes ─────────────────────────────────────────────────────── @dataclass class SharedDTO: id: int name: str email: str members: list[str] def _sh_to_dto(sm: SharedMailbox) -> SharedDTO: return SharedDTO(id=sm.id, name=sm.name, email=sm.email, members=[m.email for m in sm.members]) def list_shared_all() -> list[SharedDTO]: s = db_session() rows = s.execute(select(SharedMailbox).order_by(SharedMailbox.name)).scalars().all() return [_sh_to_dto(r) for r in rows] def list_shared_for(user_email: str) -> list[SharedDTO]: """Return shared mailboxes where user_email is a member.""" s = db_session() rows = s.execute( select(SharedMailbox) .join(SharedMember, SharedMember.shared_id == SharedMailbox.id) .where(SharedMember.email == user_email) .order_by(SharedMailbox.name) ).scalars().unique().all() return [_sh_to_dto(r) for r in rows] def get_shared(sid: int) -> Optional[SharedDTO]: s = db_session() r = s.get(SharedMailbox, sid) return _sh_to_dto(r) if r else None def save_shared(sid: int | None, name: str, email: str, members: list[str]) -> int: s = db_session() members = [m.strip() for m in members if m.strip()] if sid: sm = s.get(SharedMailbox, sid) if not sm: return 0 sm.name = name.strip() sm.email = email.strip().lower() s.execute(delete(SharedMember).where(SharedMember.shared_id == sm.id)) for m in members: s.add(SharedMember(shared_id=sm.id, email=m)) else: sm = SharedMailbox(name=name.strip(), email=email.strip().lower()) s.add(sm); s.flush() for m in members: s.add(SharedMember(shared_id=sm.id, email=m)) s.commit() return sm.id def delete_shared(sid: int): s = db_session() s.execute(delete(SharedMailbox).where(SharedMailbox.id == sid)) s.commit() # ── Autoreply ────────────────────────────────────────────────────────────── @dataclass class AutoreplyDTO: enabled: bool subject: str body_md: str date_from: str date_to: str only_contacts: bool reply_once_per_day: bool def get_autoreply(owner: str) -> AutoreplyDTO: s = db_session() r = s.get(Autoreply, owner) if not r: return AutoreplyDTO(False, "", "", "", "", False, True) return AutoreplyDTO( enabled=r.enabled, subject=r.subject, body_md=r.body_md, date_from=r.date_from, date_to=r.date_to, only_contacts=r.only_contacts, reply_once_per_day=r.reply_once_per_day, ) def update_autoreply(owner: str, **fields): s = db_session() r = s.get(Autoreply, owner) if r is None: r = Autoreply(owner_email=owner) s.add(r) for k, v in fields.items(): if hasattr(r, k): setattr(r, k, v) s.commit() # ── Rules ────────────────────────────────────────────────────────────────── @dataclass class RuleDTO: id: int name: str field: str op: str value: str action: str action_param: str enabled: bool apply_to_existing: bool continue_chain: bool apply_to_spam: bool position: int def _rule_to_dto(r: Rule) -> RuleDTO: return RuleDTO( id=r.id, name=r.name, field=r.field, op=r.op, value=r.value, action=r.action, action_param=r.action_param, enabled=r.enabled, apply_to_existing=r.apply_to_existing, continue_chain=r.continue_chain, apply_to_spam=r.apply_to_spam, position=r.position, ) def toggle_rule(rid: int, owner: str) -> bool: s = db_session() r = s.execute(select(Rule).where(Rule.id == rid, Rule.owner_email == owner)).scalar_one_or_none() if not r: return False r.enabled = not r.enabled s.commit() return True def list_rules(owner: str) -> list[RuleDTO]: s = db_session() rows = s.execute( select(Rule).where(Rule.owner_email == owner).order_by(Rule.position, Rule.id) ).scalars().all() return [_rule_to_dto(r) for r in rows] def get_rule(rid: int, owner: str | None = None) -> Optional[RuleDTO]: s = db_session() q = select(Rule).where(Rule.id == rid) if owner: q = q.where(Rule.owner_email == owner) r = s.execute(q).scalar_one_or_none() return _rule_to_dto(r) if r else None def save_rule(owner: str, rid: int | None, **fields) -> int: s = db_session() if rid: r = s.execute(select(Rule).where(Rule.id == rid, Rule.owner_email == owner)).scalar_one_or_none() if not r: return 0 for k, v in fields.items(): if hasattr(r, k): setattr(r, k, v) else: r = Rule(owner_email=owner, **{k: v for k, v in fields.items() if hasattr(Rule, k)}) s.add(r); s.flush() s.commit() return r.id def delete_rule(rid: int, owner: str): s = db_session() s.execute(delete(Rule).where(Rule.id == rid, Rule.owner_email == owner)) s.commit()