mail/app/services/store.py
deeily 5024bf9a8d init: full mail stack — phases 0..8 (web client, admin, IMAP/SMTP,
sieve, search, sessions, dramatiq, deploy/install, ELK, monitoring)
2026-04-29 16:30:43 +03:00

311 lines
9.7 KiB
Python

"""Persistent store: signatures, groups, shared mailboxes, autoreply, rules.
Drop-in replacement for the in-memory state previously kept in mock_mail.py.
The mock_mail module re-exports these as a thin wrapper for backwards-compat.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from sqlalchemy import select, delete, update
from sqlalchemy.exc import SQLAlchemyError
from ..db import db_session
from ..models import (
Signature, Group, GroupMember, SharedMailbox, SharedMember, Autoreply, Rule,
)
# ── Signatures ─────────────────────────────────────────────────────────────
@dataclass
class SignatureDTO:
id: int
name: str
body_md: str
is_default: bool
def _sig_to_dto(s: Signature) -> SignatureDTO:
return SignatureDTO(id=s.id, name=s.name, body_md=s.body_md, is_default=s.is_default)
def list_signatures(owner: str) -> list[SignatureDTO]:
s = db_session()
rows = s.execute(select(Signature).where(Signature.owner_email == owner).order_by(Signature.id)).scalars().all()
return [_sig_to_dto(r) for r in rows]
def default_signature(owner: str) -> Optional[SignatureDTO]:
s = db_session()
r = s.execute(select(Signature).where(Signature.owner_email == owner, Signature.is_default == True)).scalar_one_or_none()
return _sig_to_dto(r) if r else None
def get_signature(sid: int, owner: str | None = None) -> Optional[SignatureDTO]:
s = db_session()
q = select(Signature).where(Signature.id == sid)
if owner:
q = q.where(Signature.owner_email == owner)
r = s.execute(q).scalar_one_or_none()
return _sig_to_dto(r) if r else None
def add_signature(owner: str, name: str, body_md: str, is_default: bool = False) -> int:
s = db_session()
if is_default:
s.execute(update(Signature).where(Signature.owner_email == owner).values(is_default=False))
sig = Signature(owner_email=owner, name=name.strip(), body_md=body_md, is_default=is_default)
s.add(sig); s.commit()
return sig.id
def update_signature(sid: int, owner: str, name: str, body_md: str, is_default: bool):
s = db_session()
if is_default:
s.execute(update(Signature).where(Signature.owner_email == owner, Signature.id != sid).values(is_default=False))
s.execute(update(Signature).where(Signature.id == sid, Signature.owner_email == owner).values(
name=name.strip(), body_md=body_md, is_default=is_default,
))
s.commit()
def delete_signature(sid: int, owner: str):
s = db_session()
s.execute(delete(Signature).where(Signature.id == sid, Signature.owner_email == owner))
s.commit()
# ── Groups ─────────────────────────────────────────────────────────────────
@dataclass
class GroupDTO:
id: int
name: str
members: list[str]
def _grp_to_dto(g: Group) -> GroupDTO:
return GroupDTO(id=g.id, name=g.name, members=[m.email for m in g.members])
def list_groups(owner: str) -> list[GroupDTO]:
s = db_session()
rows = s.execute(select(Group).where(Group.owner_email == owner).order_by(Group.name)).scalars().all()
return [_grp_to_dto(r) for r in rows]
def get_group(gid: int, owner: str | None = None) -> Optional[GroupDTO]:
s = db_session()
q = select(Group).where(Group.id == gid)
if owner:
q = q.where(Group.owner_email == owner)
r = s.execute(q).scalar_one_or_none()
return _grp_to_dto(r) if r else None
def save_group(owner: str, gid: int | None, name: str, members: list[str]) -> int:
s = db_session()
members = [m.strip() for m in members if m.strip()]
if gid:
g = s.execute(select(Group).where(Group.id == gid, Group.owner_email == owner)).scalar_one_or_none()
if not g:
return 0
g.name = name.strip()
s.execute(delete(GroupMember).where(GroupMember.group_id == g.id))
for m in members:
s.add(GroupMember(group_id=g.id, email=m))
else:
g = Group(owner_email=owner, name=name.strip())
s.add(g); s.flush()
for m in members:
s.add(GroupMember(group_id=g.id, email=m))
s.commit()
return g.id
def delete_group(gid: int, owner: str):
s = db_session()
s.execute(delete(Group).where(Group.id == gid, Group.owner_email == owner))
s.commit()
# ── Shared mailboxes ───────────────────────────────────────────────────────
@dataclass
class SharedDTO:
id: int
name: str
email: str
members: list[str]
def _sh_to_dto(sm: SharedMailbox) -> SharedDTO:
return SharedDTO(id=sm.id, name=sm.name, email=sm.email, members=[m.email for m in sm.members])
def list_shared_all() -> list[SharedDTO]:
s = db_session()
rows = s.execute(select(SharedMailbox).order_by(SharedMailbox.name)).scalars().all()
return [_sh_to_dto(r) for r in rows]
def list_shared_for(user_email: str) -> list[SharedDTO]:
"""Return shared mailboxes where user_email is a member."""
s = db_session()
rows = s.execute(
select(SharedMailbox)
.join(SharedMember, SharedMember.shared_id == SharedMailbox.id)
.where(SharedMember.email == user_email)
.order_by(SharedMailbox.name)
).scalars().unique().all()
return [_sh_to_dto(r) for r in rows]
def get_shared(sid: int) -> Optional[SharedDTO]:
s = db_session()
r = s.get(SharedMailbox, sid)
return _sh_to_dto(r) if r else None
def save_shared(sid: int | None, name: str, email: str, members: list[str]) -> int:
s = db_session()
members = [m.strip() for m in members if m.strip()]
if sid:
sm = s.get(SharedMailbox, sid)
if not sm:
return 0
sm.name = name.strip()
sm.email = email.strip().lower()
s.execute(delete(SharedMember).where(SharedMember.shared_id == sm.id))
for m in members:
s.add(SharedMember(shared_id=sm.id, email=m))
else:
sm = SharedMailbox(name=name.strip(), email=email.strip().lower())
s.add(sm); s.flush()
for m in members:
s.add(SharedMember(shared_id=sm.id, email=m))
s.commit()
return sm.id
def delete_shared(sid: int):
s = db_session()
s.execute(delete(SharedMailbox).where(SharedMailbox.id == sid))
s.commit()
# ── Autoreply ──────────────────────────────────────────────────────────────
@dataclass
class AutoreplyDTO:
enabled: bool
subject: str
body_md: str
date_from: str
date_to: str
only_contacts: bool
reply_once_per_day: bool
def get_autoreply(owner: str) -> AutoreplyDTO:
s = db_session()
r = s.get(Autoreply, owner)
if not r:
return AutoreplyDTO(False, "", "", "", "", False, True)
return AutoreplyDTO(
enabled=r.enabled, subject=r.subject, body_md=r.body_md,
date_from=r.date_from, date_to=r.date_to,
only_contacts=r.only_contacts, reply_once_per_day=r.reply_once_per_day,
)
def update_autoreply(owner: str, **fields):
s = db_session()
r = s.get(Autoreply, owner)
if r is None:
r = Autoreply(owner_email=owner)
s.add(r)
for k, v in fields.items():
if hasattr(r, k):
setattr(r, k, v)
s.commit()
# ── Rules ──────────────────────────────────────────────────────────────────
@dataclass
class RuleDTO:
id: int
name: str
field: str
op: str
value: str
action: str
action_param: str
enabled: bool
apply_to_existing: bool
continue_chain: bool
apply_to_spam: bool
position: int
def _rule_to_dto(r: Rule) -> RuleDTO:
return RuleDTO(
id=r.id, name=r.name, field=r.field, op=r.op, value=r.value,
action=r.action, action_param=r.action_param,
enabled=r.enabled,
apply_to_existing=r.apply_to_existing, continue_chain=r.continue_chain,
apply_to_spam=r.apply_to_spam, position=r.position,
)
def toggle_rule(rid: int, owner: str) -> bool:
s = db_session()
r = s.execute(select(Rule).where(Rule.id == rid, Rule.owner_email == owner)).scalar_one_or_none()
if not r:
return False
r.enabled = not r.enabled
s.commit()
return True
def list_rules(owner: str) -> list[RuleDTO]:
s = db_session()
rows = s.execute(
select(Rule).where(Rule.owner_email == owner).order_by(Rule.position, Rule.id)
).scalars().all()
return [_rule_to_dto(r) for r in rows]
def get_rule(rid: int, owner: str | None = None) -> Optional[RuleDTO]:
s = db_session()
q = select(Rule).where(Rule.id == rid)
if owner:
q = q.where(Rule.owner_email == owner)
r = s.execute(q).scalar_one_or_none()
return _rule_to_dto(r) if r else None
def save_rule(owner: str, rid: int | None, **fields) -> int:
s = db_session()
if rid:
r = s.execute(select(Rule).where(Rule.id == rid, Rule.owner_email == owner)).scalar_one_or_none()
if not r:
return 0
for k, v in fields.items():
if hasattr(r, k):
setattr(r, k, v)
else:
r = Rule(owner_email=owner, **{k: v for k, v in fields.items() if hasattr(Rule, k)})
s.add(r); s.flush()
s.commit()
return r.id
def delete_rule(rid: int, owner: str):
s = db_session()
s.execute(delete(Rule).where(Rule.id == rid, Rule.owner_email == owner))
s.commit()