311 lines
9.7 KiB
Python
311 lines
9.7 KiB
Python
"""Persistent store: signatures, groups, shared mailboxes, autoreply, rules.

Drop-in replacement for the in-memory state previously kept in mock_mail.py.
The mock_mail module re-exports these as a thin wrapper for backwards-compat.
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import select, delete, update
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from ..db import db_session
|
|
from ..models import (
|
|
Signature, Group, GroupMember, SharedMailbox, SharedMember, Autoreply, Rule,
|
|
)
|
|
|
|
|
|
# ── Signatures ─────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class SignatureDTO:
    """Plain-data copy of one signature row, independent of the ORM session."""

    id: int  # copied from Signature.id
    name: str  # copied from Signature.name
    body_md: str  # signature body (name suggests Markdown - confirm with callers)
    is_default: bool  # copied from Signature.is_default
|
|
|
|
|
|
def _sig_to_dto(s: Signature) -> SignatureDTO:
    """Copy the exposed attributes of a ``Signature`` row into a ``SignatureDTO``."""
    return SignatureDTO(
        id=s.id,
        name=s.name,
        body_md=s.body_md,
        is_default=s.is_default,
    )
|
|
|
|
|
|
def list_signatures(owner: str) -> list[SignatureDTO]:
    """Return every signature whose ``owner_email`` equals *owner*, ordered by id."""
    session = db_session()
    stmt = (
        select(Signature)
        .where(Signature.owner_email == owner)
        .order_by(Signature.id)
    )
    found = session.execute(stmt).scalars().all()
    return [_sig_to_dto(row) for row in found]
|
|
|
|
|
|
def default_signature(owner: str) -> Optional[SignatureDTO]:
    """Return the signature flagged ``is_default`` for *owner*, or ``None``.

    The lookup uses ``scalar_one_or_none``, so it raises if more than one
    row of this owner carries the default flag.
    """
    session = db_session()
    stmt = select(Signature).where(
        Signature.owner_email == owner,
        Signature.is_default == True,  # noqa: E712 - builds a SQL expression
    )
    row = session.execute(stmt).scalar_one_or_none()
    if not row:
        return None
    return _sig_to_dto(row)
|
|
|
|
|
|
def get_signature(sid: int, owner: str | None = None) -> Optional[SignatureDTO]:
    """Fetch a single signature by id, optionally restricted to one owner.

    Args:
        sid: Id of the signature to load.
        owner: When not ``None``, the row must also have this
            ``owner_email``. Only ``None`` disables the ownership filter;
            an empty string is treated as a real owner value so that a
            blank identity cannot read other users' signatures.

    Returns:
        The matching signature as a DTO, or ``None`` when no row matches.
    """
    session = db_session()
    stmt = select(Signature).where(Signature.id == sid)
    # Compare against None explicitly: a falsy-but-present owner ("") must
    # still scope the query instead of silently widening it.
    if owner is not None:
        stmt = stmt.where(Signature.owner_email == owner)
    row = session.execute(stmt).scalar_one_or_none()
    return _sig_to_dto(row) if row else None
|
|
|
|
|
|
def add_signature(owner: str, name: str, body_md: str, is_default: bool = False) -> int:
    """Insert a new signature for *owner* and return its id.

    When *is_default* is true, the default flag is first cleared on every
    signature of this owner, so the new row is the only one flagged.
    The name is stored stripped of surrounding whitespace.
    """
    session = db_session()
    if is_default:
        clear_defaults = (
            update(Signature)
            .where(Signature.owner_email == owner)
            .values(is_default=False)
        )
        session.execute(clear_defaults)
    new_sig = Signature(
        owner_email=owner,
        name=name.strip(),
        body_md=body_md,
        is_default=is_default,
    )
    session.add(new_sig)
    session.commit()
    return new_sig.id
|
|
|
|
|
|
def update_signature(sid: int, owner: str, name: str, body_md: str, is_default: bool):
    """Overwrite name, body and default flag of one signature owned by *owner*.

    Args:
        sid: Id of the signature to change.
        owner: ``owner_email`` the signature must belong to.
        name: New name; stored stripped of surrounding whitespace.
        body_md: New body text.
        is_default: New default flag. When true, the flag is cleared on the
            owner's other signatures.

    Returns:
        ``None``. Nothing is modified when *sid* does not identify a
        signature of *owner*.
    """
    session = db_session()
    target_id = session.execute(
        select(Signature.id).where(
            Signature.id == sid,
            Signature.owner_email == owner,
        )
    ).scalar_one_or_none()
    # Only touch the other rows once the target is known to exist; otherwise
    # an unknown/foreign sid would strip the owner's current default and
    # leave them with no default signature at all.
    if target_id is not None:
        if is_default:
            session.execute(
                update(Signature)
                .where(Signature.owner_email == owner, Signature.id != sid)
                .values(is_default=False)
            )
        session.execute(
            update(Signature)
            .where(Signature.id == sid, Signature.owner_email == owner)
            .values(name=name.strip(), body_md=body_md, is_default=is_default)
        )
    session.commit()
|
|
|
|
|
|
def delete_signature(sid: int, owner: str):
    """Delete signature *sid* if it belongs to *owner*, then commit."""
    session = db_session()
    stmt = delete(Signature).where(
        Signature.id == sid,
        Signature.owner_email == owner,
    )
    session.execute(stmt)
    session.commit()
|
|
|
|
|
|
# ── Groups ─────────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class GroupDTO:
    """Plain-data copy of one group together with its member addresses."""

    id: int  # copied from Group.id
    name: str  # copied from Group.name
    members: list[str]  # the ``email`` value of each related member row
|
|
|
|
|
|
def _grp_to_dto(g: Group) -> GroupDTO:
    """Build a ``GroupDTO`` from a ``Group`` row, flattening members to emails."""
    return GroupDTO(
        id=g.id,
        name=g.name,
        members=[member.email for member in g.members],
    )
|
|
|
|
|
|
def list_groups(owner: str) -> list[GroupDTO]:
    """Return every group whose ``owner_email`` equals *owner*, ordered by name."""
    session = db_session()
    stmt = (
        select(Group)
        .where(Group.owner_email == owner)
        .order_by(Group.name)
    )
    found = session.execute(stmt).scalars().all()
    return [_grp_to_dto(row) for row in found]
|
|
|
|
|
|
def get_group(gid: int, owner: str | None = None) -> Optional[GroupDTO]:
    """Fetch a single group by id, optionally restricted to one owner.

    Args:
        gid: Id of the group to load.
        owner: When not ``None``, the row must also have this
            ``owner_email``. Only ``None`` disables the ownership filter;
            an empty string is treated as a real owner value so that a
            blank identity cannot read other users' groups.

    Returns:
        The matching group as a DTO, or ``None`` when no row matches.
    """
    session = db_session()
    stmt = select(Group).where(Group.id == gid)
    # Explicit None check: a falsy-but-present owner ("") must still scope
    # the query instead of silently widening it.
    if owner is not None:
        stmt = stmt.where(Group.owner_email == owner)
    row = session.execute(stmt).scalar_one_or_none()
    return _grp_to_dto(row) if row else None
|
|
|
|
|
|
def save_group(owner: str, gid: int | None, name: str, members: list[str]) -> int:
    """Create a group, or replace the name and member list of an existing one.

    Args:
        owner: ``owner_email`` of the group.
        gid: Id of the group to update; a falsy value creates a new group.
        name: Group name; stored stripped of surrounding whitespace.
        members: Member addresses; each is stripped and blank entries are
            dropped.

    Returns:
        The id of the saved group, or ``0`` when *gid* was given but no
        group with that id belongs to *owner*.
    """
    session = db_session()
    cleaned = [entry.strip() for entry in members if entry.strip()]

    if not gid:
        group = Group(owner_email=owner, name=name.strip())
        session.add(group)
        # Flush before the member rows are built, since they read group.id.
        session.flush()
    else:
        lookup = select(Group).where(Group.id == gid, Group.owner_email == owner)
        group = session.execute(lookup).scalar_one_or_none()
        if not group:
            return 0
        group.name = name.strip()
        # Replace the membership wholesale: drop all old rows first.
        session.execute(delete(GroupMember).where(GroupMember.group_id == group.id))

    for address in cleaned:
        session.add(GroupMember(group_id=group.id, email=address))
    session.commit()
    return group.id
|
|
|
|
|
|
def delete_group(gid: int, owner: str):
    """Delete group *gid* and its member rows if the group belongs to *owner*.

    Args:
        gid: Id of the group to delete.
        owner: ``owner_email`` the group must belong to.

    Returns:
        ``None``. Nothing is deleted when no such group exists for *owner*.
    """
    session = db_session()
    owned_id = session.execute(
        select(Group.id).where(Group.id == gid, Group.owner_email == owner)
    ).scalar_one_or_none()
    if owned_id is not None:
        # A bulk DELETE statement does not run ORM relationship cascades, so
        # member rows are removed explicitly rather than relying on the
        # schema to cascade (same approach save_group uses for members).
        session.execute(delete(GroupMember).where(GroupMember.group_id == owned_id))
        session.execute(
            delete(Group).where(Group.id == owned_id, Group.owner_email == owner)
        )
    session.commit()
|
|
|
|
|
|
# ── Shared mailboxes ───────────────────────────────────────────────────────
|
|
|
|
@dataclass
class SharedDTO:
    """Plain-data copy of one shared mailbox together with its member addresses."""

    id: int  # copied from SharedMailbox.id
    name: str  # copied from SharedMailbox.name
    email: str  # copied from SharedMailbox.email
    members: list[str]  # the ``email`` value of each related member row
|
|
|
|
|
|
def _sh_to_dto(sm: SharedMailbox) -> SharedDTO:
    """Build a ``SharedDTO`` from a ``SharedMailbox`` row, flattening members to emails."""
    return SharedDTO(
        id=sm.id,
        name=sm.name,
        email=sm.email,
        members=[member.email for member in sm.members],
    )
|
|
|
|
|
|
def list_shared_all() -> list[SharedDTO]:
    """Return every shared mailbox, ordered by name."""
    session = db_session()
    stmt = select(SharedMailbox).order_by(SharedMailbox.name)
    found = session.execute(stmt).scalars().all()
    return [_sh_to_dto(row) for row in found]
|
|
|
|
|
|
def list_shared_for(user_email: str) -> list[SharedDTO]:
    """List the shared mailboxes that have *user_email* among their members.

    Results are ordered by mailbox name. Duplicates from the join are
    removed with ``unique()``.
    """
    session = db_session()
    stmt = (
        select(SharedMailbox)
        .join(SharedMember, SharedMember.shared_id == SharedMailbox.id)
        .where(SharedMember.email == user_email)
        .order_by(SharedMailbox.name)
    )
    mailboxes = session.execute(stmt).scalars().unique().all()
    return [_sh_to_dto(mailbox) for mailbox in mailboxes]
|
|
|
|
|
|
def get_shared(sid: int) -> Optional[SharedDTO]:
    """Load the shared mailbox with identity *sid*, or return ``None`` if absent."""
    mailbox = db_session().get(SharedMailbox, sid)
    if not mailbox:
        return None
    return _sh_to_dto(mailbox)
|
|
|
|
|
|
def save_shared(sid: int | None, name: str, email: str, members: list[str]) -> int:
    """Create a shared mailbox, or replace the fields and members of an existing one.

    Args:
        sid: Id of the mailbox to update; a falsy value creates a new one.
        name: Mailbox name; stored stripped of surrounding whitespace.
        email: Mailbox address; stored stripped and lower-cased.
        members: Member addresses; each is stripped and blank entries are
            dropped (member addresses are not lower-cased).

    Returns:
        The id of the saved mailbox, or ``0`` when *sid* was given but no
        such mailbox exists.
    """
    session = db_session()
    cleaned = [entry.strip() for entry in members if entry.strip()]

    if not sid:
        mailbox = SharedMailbox(name=name.strip(), email=email.strip().lower())
        session.add(mailbox)
        # Flush before the member rows are built, since they read mailbox.id.
        session.flush()
    else:
        mailbox = session.get(SharedMailbox, sid)
        if not mailbox:
            return 0
        mailbox.name = name.strip()
        mailbox.email = email.strip().lower()
        # Replace the membership wholesale: drop all old rows first.
        session.execute(delete(SharedMember).where(SharedMember.shared_id == mailbox.id))

    for address in cleaned:
        session.add(SharedMember(shared_id=mailbox.id, email=address))
    session.commit()
    return mailbox.id
|
|
|
|
|
|
def delete_shared(sid: int):
    """Delete shared mailbox *sid* together with its member rows, then commit.

    Args:
        sid: Id of the shared mailbox to delete.

    Returns:
        ``None``. Deleting a non-existent mailbox is a no-op.
    """
    session = db_session()
    # A bulk DELETE statement does not run ORM relationship cascades, so
    # member rows are removed explicitly rather than relying on the schema
    # to cascade (same approach save_shared uses for members).
    session.execute(delete(SharedMember).where(SharedMember.shared_id == sid))
    session.execute(delete(SharedMailbox).where(SharedMailbox.id == sid))
    session.commit()
|
|
|
|
|
|
# ── Autoreply ──────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class AutoreplyDTO:
    """Plain-data copy of one owner's autoreply settings."""

    enabled: bool  # copied from Autoreply.enabled
    subject: str  # copied from Autoreply.subject
    body_md: str  # reply body (name suggests Markdown - confirm with callers)
    date_from: str  # kept as a string; format is not defined in this module
    date_to: str  # kept as a string; format is not defined in this module
    only_contacts: bool  # copied from Autoreply.only_contacts
    reply_once_per_day: bool  # copied from Autoreply.reply_once_per_day
|
|
|
|
|
|
def get_autoreply(owner: str) -> AutoreplyDTO:
    """Return the autoreply settings stored for *owner*.

    When no row exists, a disabled default is returned: empty strings,
    ``only_contacts`` off and ``reply_once_per_day`` on.
    """
    row = db_session().get(Autoreply, owner)
    if row:
        return AutoreplyDTO(
            enabled=row.enabled,
            subject=row.subject,
            body_md=row.body_md,
            date_from=row.date_from,
            date_to=row.date_to,
            only_contacts=row.only_contacts,
            reply_once_per_day=row.reply_once_per_day,
        )
    return AutoreplyDTO(
        enabled=False,
        subject="",
        body_md="",
        date_from="",
        date_to="",
        only_contacts=False,
        reply_once_per_day=True,
    )
|
|
|
|
|
|
def update_autoreply(owner: str, **fields):
    """Create or update the autoreply row of *owner* from keyword fields.

    Args:
        owner: Identity of the autoreply row; a row is created when none
            exists yet.
        **fields: Attribute values to assign. Names that are not attributes
            of the row are ignored. ``owner_email`` and names starting with
            an underscore are ignored as well, so the call can never move
            the row to a different owner or overwrite ORM internals.

    Returns:
        ``None``.
    """
    session = db_session()
    row = session.get(Autoreply, owner)
    if row is None:
        row = Autoreply(owner_email=owner)
        session.add(row)
    for key, value in fields.items():
        # The owner is fixed by the explicit argument; letting **fields
        # overwrite it would rewrite another user's settings.
        if key == "owner_email" or key.startswith("_"):
            continue
        if hasattr(row, key):
            setattr(row, key, value)
    session.commit()
|
|
|
|
|
|
# ── Rules ──────────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class RuleDTO:
    """Plain-data copy of one mail rule row; each field mirrors a ``Rule`` attribute."""

    id: int
    name: str
    field: str  # what the rule inspects (valid values not defined in this module)
    op: str  # comparison operator name (valid values not defined in this module)
    value: str  # operand for ``op``
    action: str  # what to do on match (valid values not defined in this module)
    action_param: str  # argument for ``action``
    enabled: bool
    apply_to_existing: bool
    continue_chain: bool
    apply_to_spam: bool
    position: int  # primary sort key used when listing rules
|
|
|
|
|
|
def _rule_to_dto(r: Rule) -> RuleDTO:
    """Copy the exposed attributes of a ``Rule`` row into a ``RuleDTO``."""
    return RuleDTO(
        id=r.id,
        name=r.name,
        field=r.field,
        op=r.op,
        value=r.value,
        action=r.action,
        action_param=r.action_param,
        enabled=r.enabled,
        apply_to_existing=r.apply_to_existing,
        continue_chain=r.continue_chain,
        apply_to_spam=r.apply_to_spam,
        position=r.position,
    )
|
|
|
|
|
|
def toggle_rule(rid: int, owner: str) -> bool:
    """Flip the ``enabled`` flag of rule *rid* owned by *owner*.

    Returns ``True`` after committing the change, or ``False`` when no such
    rule exists for this owner (nothing is committed in that case).
    """
    session = db_session()
    lookup = select(Rule).where(Rule.id == rid, Rule.owner_email == owner)
    rule = session.execute(lookup).scalar_one_or_none()
    if rule:
        rule.enabled = not rule.enabled
        session.commit()
        return True
    return False
|
|
|
|
|
|
def list_rules(owner: str) -> list[RuleDTO]:
    """Return the rules of *owner*, ordered by ``position`` and then by id."""
    session = db_session()
    stmt = (
        select(Rule)
        .where(Rule.owner_email == owner)
        .order_by(Rule.position, Rule.id)
    )
    found = session.execute(stmt).scalars().all()
    return [_rule_to_dto(row) for row in found]
|
|
|
|
|
|
def get_rule(rid: int, owner: str | None = None) -> Optional[RuleDTO]:
    """Fetch a single rule by id, optionally restricted to one owner.

    Args:
        rid: Id of the rule to load.
        owner: When not ``None``, the row must also have this
            ``owner_email``. Only ``None`` disables the ownership filter;
            an empty string is treated as a real owner value so that a
            blank identity cannot read other users' rules.

    Returns:
        The matching rule as a DTO, or ``None`` when no row matches.
    """
    session = db_session()
    stmt = select(Rule).where(Rule.id == rid)
    # Explicit None check: a falsy-but-present owner ("") must still scope
    # the query instead of silently widening it.
    if owner is not None:
        stmt = stmt.where(Rule.owner_email == owner)
    row = session.execute(stmt).scalar_one_or_none()
    return _rule_to_dto(row) if row else None
|
|
|
|
|
|
def save_rule(owner: str, rid: int | None, **fields) -> int:
    """Create a rule for *owner*, or update an existing rule of that owner.

    Args:
        owner: ``owner_email`` of the rule.
        rid: Id of the rule to update; a falsy value creates a new rule.
        **fields: Attribute values to assign. Names that are not attributes
            of ``Rule`` are ignored. ``id``, ``owner_email`` and names
            starting with an underscore are ignored too: the row's identity
            comes only from *owner* and *rid*.

    Returns:
        The id of the saved rule, or ``0`` when *rid* was given but no rule
        with that id belongs to *owner*.
    """
    session = db_session()
    # Drop identity keys up front. Previously ``owner_email`` in **fields
    # raised TypeError (duplicate keyword) on create and silently handed the
    # rule to another owner on update.
    assignable = {
        key: value
        for key, value in fields.items()
        if key not in ("id", "owner_email") and not key.startswith("_")
    }
    if rid:
        rule = session.execute(
            select(Rule).where(Rule.id == rid, Rule.owner_email == owner)
        ).scalar_one_or_none()
        if not rule:
            return 0
        for key, value in assignable.items():
            if hasattr(rule, key):
                setattr(rule, key, value)
    else:
        known = {key: value for key, value in assignable.items() if hasattr(Rule, key)}
        rule = Rule(owner_email=owner, **known)
        session.add(rule)
        session.flush()
    session.commit()
    return rule.id
|
|
|
|
|
|
def delete_rule(rid: int, owner: str):
    """Delete rule *rid* if it belongs to *owner*, then commit."""
    session = db_session()
    stmt = delete(Rule).where(
        Rule.id == rid,
        Rule.owner_email == owner,
    )
    session.execute(stmt)
    session.commit()
|