160 lines
5.8 KiB
Python
160 lines
5.8 KiB
Python
"""IMAP IDLE manager.
|
|
|
|
One background thread per (user, folder) pair keeps an IMAP IDLE connection
|
|
open. When Dovecot pushes EXISTS/RECENT/FETCH notifications, the thread puts
|
|
an event into every registered SSE queue for that user so the browser
|
|
receives a push without polling.
|
|
|
|
Usage:
|
|
manager = IdleManager()
|
|
manager.start(user_email, password, folder_key) # call once per session
|
|
q = manager.subscribe(user_email, folder_key) # per SSE connection
|
|
manager.unsubscribe(user_email, folder_key, q) # on disconnect
|
|
"""
|
|
|
|
import queue
|
|
import threading
|
|
import time
|
|
import logging
|
|
from imapclient import IMAPClient
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# IMAP IDLE must be renewed every <29 min per RFC 2177
|
|
_IDLE_REFRESH_SEC = 25 * 60
|
|
_IDLE_POLL_SEC = 10 # check for server responses every N seconds
|
|
|
|
|
|
class IdleManager:
|
|
def __init__(self):
|
|
self._lock = threading.Lock()
|
|
# (user, folder) -> set of queue.Queue
|
|
self._subscribers: dict[tuple, set] = {}
|
|
# (user, folder) -> threading.Thread
|
|
self._threads: dict[tuple, threading.Thread] = {}
|
|
# (user, folder) -> stop event
|
|
self._stops: dict[tuple, threading.Event] = {}
|
|
# credentials cache: user -> password
|
|
self._creds: dict[str, str] = {}
|
|
|
|
# ── public API ────────────────────────────────────────────────────────
|
|
|
|
def start(self, user: str, password: str, folder_key: str,
|
|
host: str = "127.0.0.1", port: int = 143):
|
|
"""Ensure an IDLE thread is running for this user+folder."""
|
|
key = (user, folder_key)
|
|
with self._lock:
|
|
self._creds[user] = password
|
|
if key in self._threads and self._threads[key].is_alive():
|
|
return
|
|
stop = threading.Event()
|
|
self._stops[key] = stop
|
|
self._subscribers.setdefault(key, set())
|
|
t = threading.Thread(
|
|
target=self._idle_loop,
|
|
args=(user, password, folder_key, host, port, stop),
|
|
daemon=True,
|
|
name=f"idle-{user}-{folder_key}",
|
|
)
|
|
self._threads[key] = t
|
|
t.start()
|
|
|
|
def subscribe(self, user: str, folder_key: str) -> queue.Queue:
|
|
"""Return a new queue that receives events for this user+folder."""
|
|
key = (user, folder_key)
|
|
q: queue.Queue = queue.Queue(maxsize=20)
|
|
with self._lock:
|
|
self._subscribers.setdefault(key, set()).add(q)
|
|
return q
|
|
|
|
def unsubscribe(self, user: str, folder_key: str, q: queue.Queue):
|
|
key = (user, folder_key)
|
|
with self._lock:
|
|
self._subscribers.get(key, set()).discard(q)
|
|
|
|
def stop(self, user: str, folder_key: str):
|
|
key = (user, folder_key)
|
|
with self._lock:
|
|
ev = self._stops.get(key)
|
|
if ev:
|
|
ev.set()
|
|
|
|
# ── internal ──────────────────────────────────────────────────────────
|
|
|
|
def _notify(self, user: str, folder_key: str):
|
|
try:
|
|
from . import imap_client
|
|
imap_client.invalidate_folder_counts(user)
|
|
except Exception:
|
|
pass
|
|
# Enqueue Meili indexing of the new messages.
|
|
try:
|
|
password = self._creds.get(user)
|
|
if password:
|
|
from ..tasks import index_new_messages
|
|
index_new_messages.send(user, password, folder_key)
|
|
except Exception:
|
|
log.warning("queue index_new_messages failed", exc_info=True)
|
|
key = (user, folder_key)
|
|
with self._lock:
|
|
queues = list(self._subscribers.get(key, set()))
|
|
for q in queues:
|
|
try:
|
|
q.put_nowait("new_mail")
|
|
except queue.Full:
|
|
pass
|
|
|
|
def _idle_loop(self, user: str, password: str, folder_key: str,
|
|
host: str, port: int, stop: threading.Event):
|
|
imap_folder = _imap_folder_name(folder_key)
|
|
while not stop.is_set():
|
|
conn = None
|
|
try:
|
|
conn = IMAPClient(host, port=port, use_uid=True, ssl=False)
|
|
conn.login(user, password)
|
|
conn.select_folder(imap_folder, readonly=True)
|
|
conn.idle()
|
|
deadline = time.monotonic() + _IDLE_REFRESH_SEC
|
|
|
|
while not stop.is_set():
|
|
remaining = deadline - time.monotonic()
|
|
if remaining <= 0:
|
|
# renew IDLE before 29-min RFC limit
|
|
conn.idle_done()
|
|
conn.idle()
|
|
deadline = time.monotonic() + _IDLE_REFRESH_SEC
|
|
continue
|
|
|
|
responses = conn.idle_check(timeout=min(_IDLE_POLL_SEC, remaining))
|
|
if responses:
|
|
kinds = {r[1] for r in responses if len(r) > 1}
|
|
if kinds & {b"EXISTS", b"RECENT", b"FETCH"}:
|
|
self._notify(user, folder_key)
|
|
|
|
conn.idle_done()
|
|
except Exception as exc:
|
|
log.warning("IDLE error for %s/%s: %s — reconnecting in 5s",
|
|
user, folder_key, exc)
|
|
time.sleep(5)
|
|
finally:
|
|
if conn:
|
|
try:
|
|
conn.logout()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _imap_folder_name(folder_key: str) -> str:
|
|
return {
|
|
"inbox": "INBOX",
|
|
"sent": "Sent",
|
|
"drafts": "Drafts",
|
|
"scheduled": "Drafts",
|
|
"spam": "Junk",
|
|
"trash": "Trash",
|
|
}.get(folder_key, folder_key)
|
|
|
|
|
|
# Singleton — shared across all requests
|
|
manager = IdleManager()
|