"""IMAP IDLE manager. One background thread per (user, folder) pair keeps an IMAP IDLE connection open. When Dovecot pushes EXISTS/RECENT/FETCH notifications, the thread puts an event into every registered SSE queue for that user so the browser receives a push without polling. Usage: manager = IdleManager() manager.start(user_email, password, folder_key) # call once per session q = manager.subscribe(user_email, folder_key) # per SSE connection manager.unsubscribe(user_email, folder_key, q) # on disconnect """ import queue import threading import time import logging from imapclient import IMAPClient log = logging.getLogger(__name__) # IMAP IDLE must be renewed every <29 min per RFC 2177 _IDLE_REFRESH_SEC = 25 * 60 _IDLE_POLL_SEC = 10 # check for server responses every N seconds class IdleManager: def __init__(self): self._lock = threading.Lock() # (user, folder) -> set of queue.Queue self._subscribers: dict[tuple, set] = {} # (user, folder) -> threading.Thread self._threads: dict[tuple, threading.Thread] = {} # (user, folder) -> stop event self._stops: dict[tuple, threading.Event] = {} # credentials cache: user -> password self._creds: dict[str, str] = {} # ── public API ──────────────────────────────────────────────────────── def start(self, user: str, password: str, folder_key: str, host: str = "127.0.0.1", port: int = 143): """Ensure an IDLE thread is running for this user+folder.""" key = (user, folder_key) with self._lock: self._creds[user] = password if key in self._threads and self._threads[key].is_alive(): return stop = threading.Event() self._stops[key] = stop self._subscribers.setdefault(key, set()) t = threading.Thread( target=self._idle_loop, args=(user, password, folder_key, host, port, stop), daemon=True, name=f"idle-{user}-{folder_key}", ) self._threads[key] = t t.start() def subscribe(self, user: str, folder_key: str) -> queue.Queue: """Return a new queue that receives events for this user+folder.""" key = (user, folder_key) q: queue.Queue = queue.Queue(maxsize=20) with self._lock: self._subscribers.setdefault(key, set()).add(q) return q def unsubscribe(self, user: str, folder_key: str, q: queue.Queue): key = (user, folder_key) with self._lock: self._subscribers.get(key, set()).discard(q) def stop(self, user: str, folder_key: str): key = (user, folder_key) with self._lock: ev = self._stops.get(key) if ev: ev.set() # ── internal ────────────────────────────────────────────────────────── def _notify(self, user: str, folder_key: str): try: from . import imap_client imap_client.invalidate_folder_counts(user) except Exception: pass # Enqueue Meili indexing of the new messages. try: password = self._creds.get(user) if password: from ..tasks import index_new_messages index_new_messages.send(user, password, folder_key) except Exception: log.warning("queue index_new_messages failed", exc_info=True) key = (user, folder_key) with self._lock: queues = list(self._subscribers.get(key, set())) for q in queues: try: q.put_nowait("new_mail") except queue.Full: pass def _idle_loop(self, user: str, password: str, folder_key: str, host: str, port: int, stop: threading.Event): imap_folder = _imap_folder_name(folder_key) while not stop.is_set(): conn = None try: conn = IMAPClient(host, port=port, use_uid=True, ssl=False) conn.login(user, password) conn.select_folder(imap_folder, readonly=True) conn.idle() deadline = time.monotonic() + _IDLE_REFRESH_SEC while not stop.is_set(): remaining = deadline - time.monotonic() if remaining <= 0: # renew IDLE before 29-min RFC limit conn.idle_done() conn.idle() deadline = time.monotonic() + _IDLE_REFRESH_SEC continue responses = conn.idle_check(timeout=min(_IDLE_POLL_SEC, remaining)) if responses: kinds = {r[1] for r in responses if len(r) > 1} if kinds & {b"EXISTS", b"RECENT", b"FETCH"}: self._notify(user, folder_key) conn.idle_done() except Exception as exc: log.warning("IDLE error for %s/%s: %s — reconnecting in 5s", user, folder_key, exc) time.sleep(5) finally: if conn: try: conn.logout() except Exception: pass def _imap_folder_name(folder_key: str) -> str: return { "inbox": "INBOX", "sent": "Sent", "drafts": "Drafts", "scheduled": "Drafts", "spam": "Junk", "trash": "Trash", }.get(folder_key, folder_key) # Singleton — shared across all requests manager = IdleManager()