mail/app/services/imap_idle.py
deeily 5024bf9a8d init: full mail stack — phases 0..8 (web client, admin, IMAP/SMTP,
sieve, search, sessions, dramatiq, deploy/install, ELK, monitoring)
2026-04-29 16:30:43 +03:00

160 lines
5.8 KiB
Python

"""IMAP IDLE manager.

One background thread per (user, folder) pair keeps an IMAP IDLE connection
open. When Dovecot pushes EXISTS/RECENT/FETCH notifications, the thread puts
an event into every SSE queue registered for that user and folder, so the
browser receives a push without polling.

Usage:
    manager = IdleManager()
    manager.start(user_email, password, folder_key)   # call once per session
    q = manager.subscribe(user_email, folder_key)     # per SSE connection
    manager.unsubscribe(user_email, folder_key, q)    # on disconnect
"""
import queue
import threading
import time
import logging
from imapclient import IMAPClient
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# RFC 2177 advises clients to terminate and re-issue IDLE at least every
# 29 minutes; the IDLE loop renews after 25 minutes to stay under that limit.
_IDLE_REFRESH_SEC = 25 * 60
# Upper bound, in seconds, on each idle_check() wait. Because the IDLE loop
# re-tests its stop event between waits, this is also roughly how long a
# stop request can take to be noticed.
_IDLE_POLL_SEC = 10
class IdleManager:
    """Fan IMAP IDLE notifications out to per-connection subscriber queues.

    One daemon thread per ``(user, folder_key)`` pair holds an IMAP connection
    in IDLE. When the server reports EXISTS, RECENT or FETCH, the thread
    invalidates the cached folder counts, enqueues an indexing task and puts
    the string ``"new_mail"`` on every queue subscribed to that pair.

    The registry dictionaries are only read or written while holding
    ``self._lock``.
    """

    # Untagged response kinds that are treated as "the folder changed".
    _NOTIFY_KINDS = frozenset({b"EXISTS", b"RECENT", b"FETCH"})

    def __init__(self):
        self._lock = threading.Lock()
        # (user, folder) -> set of queue.Queue
        self._subscribers: dict[tuple, set] = {}
        # (user, folder) -> threading.Thread
        self._threads: dict[tuple, threading.Thread] = {}
        # (user, folder) -> stop event
        self._stops: dict[tuple, threading.Event] = {}
        # credentials cache: user -> password
        self._creds: dict[str, str] = {}

    # ── public API ────────────────────────────────────────────────────────

    def start(self, user: str, password: str, folder_key: str,
              host: str = "127.0.0.1", port: int = 143) -> None:
        """Ensure an IDLE thread is running for this user+folder.

        The password is always cached (replacing any earlier one), even when
        a thread is already running. A thread that is still alive but has
        already been asked to stop does not count as running: a fresh thread
        is started so that ``stop()`` followed quickly by ``start()`` does
        not leave the pair without a listener.

        Args:
            user: Login name used for the IMAP connection.
            password: Password used for the IMAP connection.
            folder_key: UI folder key; mapped to a mailbox name by the loop.
            host: IMAP server host.
            port: IMAP server port (plain-text connection).
        """
        key = (user, folder_key)
        with self._lock:
            self._creds[user] = password
            running = self._threads.get(key)
            current_stop = self._stops.get(key)
            if (running is not None and running.is_alive()
                    and current_stop is not None
                    and not current_stop.is_set()):
                return
            stop = threading.Event()
            self._stops[key] = stop
            self._subscribers.setdefault(key, set())
            t = threading.Thread(
                target=self._idle_loop,
                args=(user, password, folder_key, host, port, stop),
                daemon=True,
                name=f"idle-{user}-{folder_key}",
            )
            self._threads[key] = t
            t.start()

    def subscribe(self, user: str, folder_key: str) -> queue.Queue:
        """Return a new queue that receives events for this user+folder.

        The queue holds at most 20 pending events; further events are dropped
        until the consumer drains it.
        """
        key = (user, folder_key)
        q: queue.Queue = queue.Queue(maxsize=20)
        with self._lock:
            self._subscribers.setdefault(key, set()).add(q)
        return q

    def unsubscribe(self, user: str, folder_key: str, q: queue.Queue) -> None:
        """Stop delivering events to ``q``; unknown pairs/queues are ignored."""
        key = (user, folder_key)
        with self._lock:
            self._subscribers.get(key, set()).discard(q)

    def stop(self, user: str, folder_key: str) -> None:
        """Ask the IDLE thread for this user+folder to exit.

        Only the stop event is set; the thread notices it on its next poll.
        Subscriptions and cached credentials are left in place.
        """
        key = (user, folder_key)
        with self._lock:
            ev = self._stops.get(key)
        if ev:
            ev.set()

    # ── internal ──────────────────────────────────────────────────────────

    def _notify(self, user: str, folder_key: str) -> None:
        """React to a folder change: refresh caches, index, wake subscribers.

        Each step is best-effort so that a failure in one never prevents the
        subscriber queues from being notified.
        """
        try:
            from . import imap_client
            imap_client.invalidate_folder_counts(user)
        except Exception:
            # Best-effort cache invalidation; record the failure instead of
            # hiding it, but never let it block the push to subscribers.
            log.debug("invalidate_folder_counts failed for %s", user,
                      exc_info=True)
        # Enqueue Meili indexing of the new messages.
        try:
            with self._lock:
                password = self._creds.get(user)
            if password:
                from ..tasks import index_new_messages
                index_new_messages.send(user, password, folder_key)
        except Exception:
            log.warning("queue index_new_messages failed", exc_info=True)
        key = (user, folder_key)
        # Snapshot under the lock, deliver outside it.
        with self._lock:
            queues = list(self._subscribers.get(key, ()))
        for q in queues:
            try:
                q.put_nowait("new_mail")
            except queue.Full:
                # The consumer already has pending "new_mail" events queued,
                # so dropping this one loses no information.
                pass

    def _idle_loop(self, user: str, password: str, folder_key: str,
                   host: str, port: int, stop: threading.Event) -> None:
        """Thread body: keep an IDLE connection open until ``stop`` is set.

        Connects, selects the folder read-only and enters IDLE. IDLE is
        re-issued every ``_IDLE_REFRESH_SEC`` seconds, and server responses
        are polled in slices of at most ``_IDLE_POLL_SEC`` seconds so the
        stop event is observed promptly. Any error is logged and followed by
        a reconnect after 5 seconds (cut short if ``stop`` is set).

        Args:
            password: Fallback password; the most recent password cached by
                ``start()`` for this user is preferred on every (re)connect.
        """
        imap_folder = _imap_folder_name(folder_key)
        while not stop.is_set():
            conn = None
            try:
                # Prefer the latest cached password so a credential change
                # passed to start() is picked up on the next reconnect.
                with self._lock:
                    current_password = self._creds.get(user, password)
                conn = IMAPClient(host, port=port, use_uid=True, ssl=False)
                conn.login(user, current_password)
                conn.select_folder(imap_folder, readonly=True)
                conn.idle()
                deadline = time.monotonic() + _IDLE_REFRESH_SEC
                while not stop.is_set():
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        # renew IDLE before 29-min RFC limit
                        conn.idle_done()
                        conn.idle()
                        deadline = time.monotonic() + _IDLE_REFRESH_SEC
                        continue
                    responses = conn.idle_check(
                        timeout=min(_IDLE_POLL_SEC, remaining))
                    if responses:
                        kinds = {r[1] for r in responses if len(r) > 1}
                        if kinds & self._NOTIFY_KINDS:
                            self._notify(user, folder_key)
                conn.idle_done()
            except Exception as exc:
                log.warning("IDLE error for %s/%s: %s — reconnecting in 5s",
                            user, folder_key, exc)
                # Event.wait() instead of sleep(): returns immediately when a
                # stop is requested during the back-off.
                stop.wait(5)
            finally:
                if conn:
                    try:
                        conn.logout()
                    except Exception:
                        # The connection may already be broken; nothing more
                        # can be done with it.
                        pass
def _imap_folder_name(folder_key: str) -> str:
return {
"inbox": "INBOX",
"sent": "Sent",
"drafts": "Drafts",
"scheduled": "Drafts",
"spam": "Junk",
"trash": "Trash",
}.get(folder_key, folder_key)
# Singleton — shared across all requests.
# Constructing the manager only creates its lock and empty registries; IDLE
# threads are created later, by manager.start().
manager = IdleManager()