Coverage for slidge / core / session.py: 83%
229 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import abc
2import asyncio
3import logging
4from asyncio.tasks import Task
5from collections.abc import Coroutine
6from typing import TYPE_CHECKING, Any, Generic, NamedTuple, Self, cast
8import aiohttp
9import sqlalchemy as sa
10from slixmpp import JID, Iq, Message, Presence
11from slixmpp.exceptions import XMPPError
12from slixmpp.types import PresenceShows, ResourceDict
14from slidge.db.meta import JSONSerializable
16from ..command import SearchResult
17from ..contact import LegacyContact
18from ..db.models import Contact, GatewayUser
19from ..util import SubclassableOnce
20from ..util.lock import NamedLockMixin
21from ..util.types import (
22 AnyBookmarks,
23 AnyMUC,
24 AnyParticipant,
25 AnyRoster,
26 AnySession,
27 LegacyContactType,
28 PseudoPresenceShow,
29)
30from ..util.util import noop_coro
32if TYPE_CHECKING:
33 from .gateway import BaseGateway
class CachedPresence(NamedTuple):
    """
    Arguments of the last gateway presence sent to the user.

    Stored by ``BaseSession.send_gateway_status`` and replayed by
    ``BaseSession.send_cached_presence``.
    """

    # Status message, forwarded as ``pstatus``.
    status: str | None
    # Presence ``<show>`` value, forwarded as ``pshow``.
    show: str | None
    # Extra keyword arguments forwarded verbatim to ``send_presence``.
    kwargs: dict[str, Any]
42class BaseSession(
43 Generic[LegacyContactType], NamedLockMixin, SubclassableOnce, abc.ABC
44):
45 """
46 The session of a registered :term:`User`.
48 Represents a gateway user logged in to the legacy network and performing actions.
50 Will be instantiated automatically on slidge startup for each registered user,
51 or upon registration for new (validated) users.
53 Must be subclassed for a functional :term:`Legacy Module`.
54 """
56 """
57 Since we cannot set the XMPP ID of messages sent by XMPP clients, we need to keep a mapping
58 between XMPP IDs and legacy message IDs if we want to further refer to a message that was sent
59 by the user. This also applies to 'carboned' messages, ie, messages sent by the user from
60 the official client of a legacy network.
61 """
63 xmpp: "BaseGateway[Self]"
64 """
65 The gateway instance singleton. Use it for low-level XMPP calls or custom methods that are not
66 session-specific.
67 """
69 MESSAGE_IDS_ARE_THREAD_IDS = False
70 """
71 Set this to True if the legacy service uses message IDs as thread IDs,
72 eg Mattermost, where you can only 'create a thread' by replying to the message,
73 in which case the message ID is also a thread ID (and all messages are potential
74 threads).
75 """
76 SPECIAL_MSG_ID_PREFIX: str | None = None
77 """
78 If you set this, XMPP message IDs starting with this won't be converted to legacy ID,
79 but passed as is to :meth:`LegacyContact.on_react`, and usual checks for emoji restriction won't be
80 applied.
81 This can be used to implement voting in polls in a hacky way.
82 """
84 _roster_cls: type[AnyRoster]
85 _bookmarks_cls: type[AnyBookmarks]
def __init__(self, user: GatewayUser) -> None:
    """
    Build the in-memory state of the session owned by ``user``.

    :param user: The registered gateway user this session belongs to.
    """
    super().__init__()
    self.user = user
    """
    The :term:`slidge user <User>`.
    """
    logger_name = user.jid.bare
    self.log = logging.getLogger(logger_name)

    self.ignore_messages: set[str] = set()

    # The roster is created before the login flags and the ready future.
    self.contacts: AnyRoster = self._roster_cls(self)
    self.is_logging_in = False
    self._logged = False
    self.__reset_ready()

    self.bookmarks = self._bookmarks_cls(self)

    self.thread_creation_lock = asyncio.Lock()

    # No gateway presence has been sent yet.
    self.__cached_presence: CachedPresence | None = None

    # Tasks spawned through ``create_task`` that have not finished yet.
    self.__tasks: set[asyncio.Task[Any]] = set()
@property
def user_jid(self) -> JID:
    """The JID stored on the session's user (``self.user.jid``)."""
    return self.user.jid
@property
def user_pk(self) -> int:
    """The ``id`` of the session's user (``self.user.id``)."""
    return self.user.id
@property
def http(self) -> aiohttp.ClientSession:
    """The HTTP client session exposed by the gateway (``self.xmpp.http``)."""
    return self.xmpp.http
def __remove_task(self, fut: Task[Any]) -> None:
    """
    Stop tracking a task; registered as done-callback by ``create_task``.

    :param fut: The task to forget.
    """
    self.log.debug("Removing fut %s", fut)
    tracked = self.__tasks
    tracked.remove(fut)
126 def create_task(
127 self, coro: Coroutine[Any, Any, Any], name: str | None = None
128 ) -> asyncio.Task[Any]:
129 task = self.xmpp.loop.create_task(coro, name=name)
130 self.__tasks.add(task)
131 self.log.debug("Creating task %s", task)
132 task.add_done_callback(lambda _: self.__remove_task(task))
133 return task
def cancel_all_tasks(self) -> None:
    """Request cancellation of every task still tracked by this session."""
    pending = self.__tasks
    for tracked in pending:
        tracked.cancel()
@abc.abstractmethod
async def login(self) -> str | None:
    """
    Logs in the gateway user to the legacy network.

    Triggered when the gateway starts and on user registration.
    It is recommended that this function returns once the user is logged in,
    so if you need to await forever (for instance to listen to incoming events),
    it's a good idea to wrap your listener in an asyncio.Task.

    :return: Optionally, a text to use as the gateway status, e.g., "Connected as 'dude@legacy.network'"
    """
    raise NotImplementedError
async def logout(self) -> None:
    """
    Logs out the gateway user from the legacy network.

    Called on gateway shutdown.

    The default implementation raises :class:`NotImplementedError`;
    override it in the legacy module if logging out is supported.
    """
    raise NotImplementedError
async def on_presence(
    self,
    resource: str,
    show: PseudoPresenceShow,
    status: str,
    resources: dict[str, ResourceDict],
    merged_resource: ResourceDict | None,
) -> None:
    """
    Called when the gateway component receives a presence, ie, when
    one of the user's clients goes online or offline, or changes its
    status.

    The default implementation raises :class:`NotImplementedError`.

    :param resource: The XMPP client identifier, arbitrary string.
    :param show: The presence ``<show>``, if available. If the resource is
        just 'available' without any ``<show>`` element, this is an empty
        str.
    :param status: A status message, like a deeply profound quote, eg,
        "Roses are red, violets are blue, [INSERT JOKE]".
    :param resources: A summary of all the resources for this user.
    :param merged_resource: A global presence for the user account,
        following rules described in :meth:`merge_resources`
    """
    raise NotImplementedError
async def on_search(self, form_values: dict[str, str]) -> SearchResult | None:
    """
    Triggered when the user uses Jabber Search (:xep:`0055`) on the component.

    Form values is a dict in which keys are defined in :attr:`.BaseGateway.SEARCH_FIELDS`.

    The default implementation raises :class:`NotImplementedError`.

    :param form_values: search query, defined for a specific plugin by overriding
        in :attr:`.BaseGateway.SEARCH_FIELDS`
    :return: A :class:`SearchResult`, or ``None``.
    """
    raise NotImplementedError
async def on_avatar(
    self,
    bytes_: bytes | None,
    hash_: str | None,
    type_: str | None,
    width: int | None,
    height: int | None,
) -> None:
    """
    Triggered when the user modifies their avatar via :xep:`0084`.

    The default implementation raises :class:`NotImplementedError`.

    :param bytes_: The data of the avatar. According to the spec, this
        should always be a PNG, but some implementations do not respect
        that. If `None` it means the user has unpublished their avatar.
    :param hash_: The SHA1 hash of the avatar data. This is an identifier of
        the avatar.
    :param type_: The MIME type of the avatar.
    :param width: The width of the avatar image.
    :param height: The height of the avatar image.
    """
    raise NotImplementedError
async def on_create_group(
    self, name: str, contacts: list[LegacyContactType]
) -> str:
    """
    Triggered when the user requests the creation of a group via the
    dedicated :term:`Command`.

    The default implementation raises :class:`NotImplementedError`.

    :param name: Name of the group
    :param contacts: list of contacts that should be members of the group
    :return: A string; presumably the legacy ID of the created group
        (NOTE(review): not visible from this file, confirm against callers).
    """
    raise NotImplementedError
async def on_leave_space(self, space_legacy_id: str) -> None:
    """
    Triggered when the user sends a request to leave a :xep:`0503` space.

    The default implementation raises :class:`NotImplementedError`.

    :param space_legacy_id: The legacy ID of the space to leave
    """
    raise NotImplementedError
async def on_preferences(
    self, previous: dict[str, Any], new: dict[str, Any]
) -> None:
    """
    This is called when the user updates their preferences.

    Override this if you need to set custom preference fields and need to
    trigger something when a preference has changed.

    The default implementation raises :class:`NotImplementedError`.

    :param previous: Presumably the preferences before the update (TODO confirm).
    :param new: Presumably the preferences after the update (TODO confirm).
    """
    raise NotImplementedError
def __reset_ready(self) -> None:
    # (Re)create the future signalling that the session is ready. It is
    # resolved by the ``logged`` setter and awaited by ``wait_for_ready``.
    self.ready = self.xmpp.loop.create_future()
@property
def logged(self) -> bool:
    """Whether the session is currently marked as logged in (``_logged``)."""
    return self._logged
@logged.setter
def logged(self, v: bool) -> None:
    # Setting the flag always ends the "logging in" phase.
    self.is_logging_in = False
    self._logged = v

    if not self.ready.done():
        # First successful login: wake up everyone waiting on ``ready``.
        if v:
            self.ready.set_result(True)
        return

    if v:
        # Already ready and still logged in: nothing else to do.
        return

    # The session was ready and is now logged out: arm a fresh ``ready``
    # future, shut the session down without calling ``logout`` and reset
    # the stored MAM/room/contact state.
    self.__reset_ready()
    self.shutdown(logout=False)
    store = self.xmpp.store
    with store.session() as orm:
        store.mam.reset_source(orm)
        store.rooms.reset_updated(orm)
        store.contacts.reset_updated(orm)
        orm.commit()
def __repr__(self) -> str:
    """Return a short debug representation containing the user's JID."""
    return f"<Session of {self.user_jid}>"
def shutdown(self, logout: bool = True) -> asyncio.Task[None]:
    """
    Shut down the session's rooms and mark all friends as unavailable.

    :param logout: If true, the returned task also runs the legacy
        ``logout``; otherwise it runs a no-op coroutine.
    :return: A task scheduled on the gateway's event loop.
    """
    for room in self.bookmarks:
        room.shutdown()

    with self.xmpp.store.session() as orm:
        friends = orm.execute(
            sa.select(Contact.jid).filter_by(user=self.user, is_friend=True)
        ).scalars()
        for friend_jid in friends:
            # Each friend appears offline to the user from now on.
            self.xmpp.make_presence(
                pfrom=friend_jid,
                pto=self.user_jid,
                ptype="unavailable",
                pstatus="Gateway has shut down.",
            ).send()

    final_step = self.__logout() if logout else noop_coro()
    return self.xmpp.loop.create_task(final_step)
async def __logout(self) -> None:
    """Run ``logout``, ignoring a missing implementation or an interrupt."""
    try:
        await self.logout()
    except (NotImplementedError, KeyboardInterrupt):
        # Best effort: legacy modules are not required to implement logout.
        pass
def raise_if_not_logged(self) -> None:
    """
    Abort with an XMPP error unless the session is logged in.

    :raises XMPPError: ``internal-server-error`` when ``self.logged`` is falsy.
    """
    if self.logged:
        return
    raise XMPPError(
        "internal-server-error",
        text="You are not logged to the legacy network",
    )
@classmethod
def _from_user_or_none(cls, user: GatewayUser | None) -> Self:
    """
    Return the session of ``user``, creating and registering it if needed.

    :param user: A gateway user, or ``None`` if the lookup found nobody.
    :return: The cached or newly created session.
    :raises XMPPError: ``subscription-required`` when ``user`` is ``None``.
    """
    if user is None:
        log.debug("user not found")
        raise XMPPError(text="User not found", condition="subscription-required")

    existing = _sessions.get(user.jid.bare)
    if existing is None:
        # First access for this user: build the session and cache it
        # under the user's bare JID.
        existing = cls(user)
        _sessions[user.jid.bare] = existing
    assert isinstance(existing, cls)
    return existing
@classmethod
def from_user(cls, user: GatewayUser) -> Self:
    # Return the session of ``user``, creating it on first access
    # (delegates to ``_from_user_or_none``).
    return cls._from_user_or_none(user)
@classmethod
def from_stanza(cls, s: Message | Iq | Presence) -> Self:
    # """
    # Get a user's :class:`.LegacySession` using the "from" field of a stanza
    #
    # Meant to be called from :class:`BaseGateway` only.
    #
    # :param s: The stanza whose "from" JID identifies the user
    # :return: The session resolved by :meth:`from_jid`
    # """
    return cls.from_jid(s.get_from())
@classmethod
def from_jid(cls, jid: JID) -> Self:
    # """
    # Get a user's :class:`.LegacySession` using its jid
    #
    # Meant to be called from :class:`BaseGateway` only.
    #
    # :param jid: Any JID of the user; only its bare part is used
    # :return: The cached session, or one built from the stored user
    # """
    cached = _sessions.get(jid.bare)
    if cached is None:
        # Not in memory yet: look the user up in the store. A missing user
        # is turned into an XMPPError by ``_from_user_or_none``.
        with cls.xmpp.store.session() as orm:
            user = orm.query(GatewayUser).filter_by(jid=jid.bare).one_or_none()
        return cls._from_user_or_none(user)
    assert isinstance(cached, cls)
    return cached
@classmethod
async def kill_by_jid(cls, jid: JID) -> None:
    # """
    # Terminate a user session and delete its user from the store.
    #
    # Meant to be called from :class:`BaseGateway` only.
    #
    # :param jid: Any JID of the user; only its bare part is used
    # :return:
    # """
    log.debug("Killing session of %s", jid)
    # Sessions are keyed by bare JID, so a direct dict lookup is enough;
    # no need to scan every registered session.
    session = _sessions.get(jid.bare)
    if session is None:
        log.debug("Did not find a session for %s", jid)
        return
    for c in session.contacts:
        c.unsubscribe()
    for m in session.bookmarks:
        m.shutdown()

    # The entry is removed only after the teardown above. Stay tolerant in
    # case the teardown itself already dropped the session.
    try:
        session = _sessions.pop(jid.bare)
    except KeyError:
        log.warning("User not found during unregistration")
        return

    session.cancel_all_tasks()

    await cls.xmpp.unregister(cast(Self, session))
    with cls.xmpp.store.session() as orm:
        orm.delete(session.user)
        orm.commit()
def __ack(self, msg: Message) -> None:
    """
    Acknowledge ``msg`` through the gateway's delivery receipt helper.

    Nothing is sent when the gateway declares ``PROPER_RECEIPTS``.
    """
    if self.xmpp.PROPER_RECEIPTS:
        return
    self.xmpp.delivery_receipt.ack(msg)
def send_gateway_status(
    self,
    status: str | None = None,
    show: PresenceShows | None = None,
    **kwargs: Any,  # noqa
) -> None:
    """
    Send a presence from the gateway to the user, and remember it.

    Can be used to indicate the user session status, ie "SMS code required", "connected", …

    The arguments are cached so that :meth:`send_cached_presence` can
    replay the same presence later.

    :param status: A status message
    :param show: Presence stanza 'show' element. I suggest using "dnd" to show
        that the gateway is not fully functional
    :param kwargs: Extra keyword arguments passed to ``send_presence``
    """
    remembered = CachedPresence(status, show, kwargs)
    self.__cached_presence = remembered
    recipient = self.user_jid.bare
    self.xmpp.send_presence(pto=recipient, pstatus=status, pshow=show, **kwargs)
def send_cached_presence(self, to: JID) -> None:
    """
    Replay the last gateway presence to ``to``.

    If no presence has been cached yet, an ``unavailable`` presence is sent
    instead.

    :param to: The JID that should receive the presence.
    """
    cached = self.__cached_presence
    if cached:
        self.xmpp.send_presence(
            pto=to,
            pstatus=cached.status,
            pshow=cached.show,
            **cached.kwargs,
        )
    else:
        self.xmpp.send_presence(pto=to, ptype="unavailable")
def send_gateway_message(
    self,
    text: str,
    **msg_kwargs: Any,  # noqa
) -> None:
    """
    Send a text message from the gateway component to the user.

    Can be used to indicate the user session status, ie "SMS code required", "connected", …

    :param text: A text
    :param msg_kwargs: Extra keyword arguments passed to the gateway's ``send_text``
    """
    recipient = self.user_jid
    self.xmpp.send_text(text, mto=recipient, **msg_kwargs)
def send_gateway_invite(
    self,
    muc: AnyMUC,
    reason: str | None = None,
    password: str | None = None,
) -> None:
    """
    Send an invitation to join a MUC, emanating from the gateway component.

    The invitation is addressed to the session's user.

    :param muc: The MUC the user is invited to
    :param reason: Optional reason, passed through to the gateway's ``invite_to``
    :param password: Optional password, passed through to the gateway's ``invite_to``
    """
    self.xmpp.invite_to(muc, reason=reason, password=password, mto=self.user_jid)
async def input(self, text: str, **msg_kwargs: Any) -> str:  # noqa
    """
    Request user input via direct messages from the gateway component.

    Wraps call to :meth:`.BaseGateway.input`, addressed to this session's user.

    :param text: The prompt to send to the user
    :param msg_kwargs: Extra attributes
    :return: Whatever the gateway's ``input`` returns for this prompt
    """
    answer = await self.xmpp.input(self.user_jid, text, **msg_kwargs)
    return answer
async def send_qr(self, text: str) -> None:
    """
    Sends a QR code generated from 'text' via HTTP Upload and send the URL to
    ``self.user``

    The work is delegated to the gateway's ``send_qr``.

    :param text: Text to encode as a QR code
    """
    recipient = self.user_jid
    await self.xmpp.send_qr(text, mto=recipient)
async def get_contact_or_group_or_participant(
    self, jid: JID, create: bool = True
) -> "LegacyContact | AnyMUC | AnyParticipant | None":
    """
    Resolve ``jid`` to a contact, a MUC or a MUC participant.

    Already-known contacts and MUCs are tried first. Only when nothing
    is known and ``create`` is true are the roster and then the bookmarks
    asked to resolve the JID.

    :param jid: The JID to resolve; a resource on a MUC JID selects a participant.
    :param create: If false, only already-known entities are returned.
    :return: The matching entity, or ``None`` if nothing could be resolved.
    """
    known_contact = self.contacts.by_jid_only_if_exists(jid)
    if known_contact is not None:
        return known_contact  # type:ignore[no-any-return]

    known_muc = self.bookmarks.by_jid_only_if_exists(JID(jid.bare))
    if known_muc is not None:
        return await self.__get_muc_or_participant(known_muc, jid)

    if not create:
        return None

    try:
        return await self.contacts.by_jid(jid)  # type:ignore[no-any-return]
    except XMPPError:
        # Not a contact: no MUC is known at this point, so ask the bookmarks.
        try:
            resolved_muc = await self.bookmarks.by_jid(jid)
        except XMPPError:
            return None
        return await self.__get_muc_or_participant(resolved_muc, jid)
@staticmethod
async def __get_muc_or_participant(
    muc: AnyMUC, jid: JID
) -> "AnyMUC | AnyParticipant | None":
    """
    Return ``muc`` itself, or one of its participants if ``jid`` has a resource.

    :param muc: The MUC matching the bare part of ``jid``.
    :param jid: The JID being resolved; its resource is used as nickname.
    :return: The MUC when ``jid`` has no resource, otherwise the result of
        ``muc.get_participant`` (called with ``create=False``).
    """
    nick = jid.resource
    if not nick:
        return muc
    return await muc.get_participant(nick, create=False, fill_first=True)
510 async def wait_for_ready(self, timeout: int | float | None = 10) -> None:
511 # """
512 # Wait until session, contacts and bookmarks are ready
513 #
514 # (slidge internal use)
515 #
516 # :param timeout:
517 # :return:
518 # """
519 try:
520 await asyncio.wait_for(asyncio.shield(self.ready), timeout)
521 await asyncio.wait_for(asyncio.shield(self.contacts.ready), timeout)
522 await asyncio.wait_for(asyncio.shield(self.bookmarks.ready), timeout)
523 except TimeoutError:
524 raise XMPPError(
525 "recipient-unavailable",
526 "Legacy session is not fully initialized, retry later",
527 )
def legacy_module_data_update(self, data: JSONSerializable) -> None:
    """
    Merge ``data`` into the user's legacy module data and store the user.

    :param data: Entries passed to ``legacy_module_data.update``.
    """
    owner = self.user
    owner.legacy_module_data.update(data)
    users_store = self.xmpp.store.users
    users_store.update(owner)
def legacy_module_data_set(self, data: JSONSerializable) -> None:
    """
    Replace the user's legacy module data with ``data`` and store the user.

    :param data: The new legacy module data.
    """
    owner = self.user
    owner.legacy_module_data = data
    users_store = self.xmpp.store.users
    users_store.update(owner)
def legacy_module_data_clear(self) -> None:
    """Empty the user's legacy module data in place and store the user."""
    owner = self.user
    owner.legacy_module_data.clear()
    users_store = self.xmpp.store.users
    users_store.update(owner)
# Module-wide registry of live sessions. Keys are ``user.jid.bare``; entries
# are added by ``BaseSession._from_user_or_none`` and removed by
# ``BaseSession.kill_by_jid``.
_sessions: dict[str, AnySession] = {}
# Module-level logger, used by the classmethods that have no session logger.
log = logging.getLogger(__name__)