Coverage for slidge / group / participant.py: 87%
372 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import logging
2import string
3import uuid
4import warnings
5from copy import copy
6from datetime import datetime
7from typing import TYPE_CHECKING, Any, Generic, Literal
8from xml.etree import ElementTree as ET
10import sqlalchemy as sa
11from slixmpp import JID, InvalidJID, Message, Presence
12from slixmpp.plugins.xep_0030.stanza.info import DiscoInfo
13from slixmpp.plugins.xep_0045.stanza import MUCAdminItem
14from slixmpp.plugins.xep_0492.stanza import Never
15from slixmpp.types import MessageTypes, OptJid
16from sqlalchemy.orm.exc import DetachedInstanceError
18from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin
19from ..core.mixins.db import DBMixin
20from ..db.models import Participant
21from ..util import SubclassableOnce, strip_illegal_chars
22from ..util.types import (
23 AnyMUC,
24 CachedPresence,
25 Hat,
26 LegacyContactType,
27 MessageOrPresenceTypeVar,
28 MucAffiliation,
29 MucRole,
30)
32if TYPE_CHECKING:
33 from slidge.command.base import ContactCommand
def strip_non_printable(nickname: str) -> str:
    """
    Build a fallback nickname made only of printable ASCII characters.

    Every character of ``nickname`` that is not part of
    ``string.printable`` is dropped, then ``-slidge-`` and the built-in
    ``hash()`` of the original nickname are appended. A warning mentioning
    both the original and the replacement is emitted.

    :param nickname: The nickname that could not be used as-is.
    :return: The replacement nickname.
    """
    printable_part = "".join(filter(string.printable.__contains__, nickname))
    suffix = f"-slidge-{hash(nickname)}"
    new = printable_part + suffix
    warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
    return new
class LegacyParticipant(
    Generic[LegacyContactType],
    PresenceMixin,
    MessageMixin,
    ChatterDiscoMixin,
    DBMixin,
    SubclassableOnce,
):
    """
    A legacy participant of a legacy group chat.

    A participant wraps a stored ``Participant`` model and, optionally, the
    legacy contact it corresponds to. Its JID is the JID of the group chat
    with the (escaped) nickname as resource.
    """

    is_participant: Literal[True] = True

    mtype: MessageTypes = "groupchat"
    _can_send_carbon = False
    USE_STANZA_ID = True
    STRIP_SHORT_DELAY = False
    stored: Participant
    contact: LegacyContactType | None

    def __init__(
        self,
        muc: AnyMUC,
        stored: Participant,
        is_system: bool = False,
        contact: LegacyContactType | None = None,
    ) -> None:
        """
        :param muc: The group chat this participant is part of.
        :param stored: The stored model instance backing this participant.
        :param is_system: Whether this is the "system" participant of the
            group, the only one allowed to have no nickname.
        :param contact: The legacy contact associated to this participant.
            When omitted, it is built from ``stored.contact`` if there is one.
        """
        self.muc = muc
        self.session = muc.session
        self.xmpp = muc.session.xmpp
        self.is_system = is_system

        # Keep the python-side contact and the stored contact in sync,
        # whichever of the two we were given.
        if contact is None and stored.contact is not None:
            contact = self.session.contacts.from_store(stored=stored.contact)
        if contact is not None and stored.contact is None:
            stored.contact = contact.stored

        self.stored = stored
        self.contact = contact

        super().__init__()

        if stored.resource is None:
            self.__update_resource(stored.nickname)

        self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")

    # NOTE(review): ``__eq__`` is defined without a matching ``__hash__`` in
    # this class body, which makes instances unhashable unless a subclass
    # defines ``__hash__`` -- confirm this is intended.
    def __eq__(self, other: object) -> bool:
        """Two participants are equal when they have the same JID."""
        return isinstance(other, LegacyParticipant) and self.jid == other.jid

    @property
    def is_user(self) -> bool:
        """
        Whether this participant is the slidge user themselves.

        If the stored instance is detached from its ORM session, it is merged
        back and the attribute is read again.
        """
        try:
            return self.stored.is_user
        except DetachedInstanceError:
            self.merge()
            return self.stored.is_user

    @is_user.setter
    def is_user(self, is_user: bool) -> None:
        with self.xmpp.store.session(expire_on_commit=True) as orm:
            orm.add(self.stored)
            self.stored.is_user = is_user
            orm.commit()

    @property
    def jid(self) -> JID:
        """
        The occupant JID: the JID of the group with the stored resource,
        or the JID of the group itself when the stored resource is empty.
        """
        jid = JID(self.muc.jid)
        if self.stored.resource:
            jid.resource = self.stored.resource
        return jid

    @jid.setter
    def jid(self, x: JID) -> None:
        # FIXME: without this, mypy yields
        #        "Cannot override writeable attribute with read-only property"
        #        But it does not happen for LegacyContact. WTF?
        raise RuntimeError

    @property
    def commands(self) -> dict[str, "type[ContactCommand[Any]]"]:  # type:ignore[override]
        """The commands of the associated contact, or nothing without contact."""
        if self.contact is None:
            return {}
        return self.contact.commands

    def __should_commit(self) -> bool:
        """
        Tell whether changes to this participant should be written to the store.

        Never for the system participant, nor while the group holds its
        "fill participants" or "fill history" lock.
        """
        if self.is_system:
            return False
        if self.muc.get_lock("fill participants"):
            return False
        return not self.muc.get_lock("fill history")

    def commit(self) -> None:
        """Persist this participant, unless committing is currently disabled."""
        if not self.__should_commit():
            return
        super().commit()

    @property
    def user_jid(self) -> JID:
        """The JID of the user of the session this participant belongs to."""
        return self.session.user_jid

    def __repr__(self) -> str:
        return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"

    @property
    def _presence_sent(self) -> bool:
        # we track if we already sent a presence for this participant.
        # if we didn't, we send it before the first message.
        # this way, even in plugins that don't map "user has joined" events,
        # we send a "join"-presence from the participant before the first message
        return self.stored.presence_sent

    @_presence_sent.setter
    def _presence_sent(self, val: bool) -> None:
        if self._presence_sent == val:
            return
        self.stored.presence_sent = val
        if not self.__should_commit():
            return
        # Only this column is updated, instead of committing the whole row.
        with self.xmpp.store.session() as orm:
            orm.execute(
                sa.update(Participant)
                .where(Participant.id == self.stored.id)
                .values(presence_sent=val)
            )
            orm.commit()

    @property
    def nickname_no_illegal(self) -> str:
        """The stored nickname stripped of illegal characters."""
        return self.stored.nickname_no_illegal

    @property
    def affiliation(self) -> MucAffiliation:
        """The stored MUC affiliation of this participant."""
        return self.stored.affiliation

    @affiliation.setter
    def affiliation(self, affiliation: MucAffiliation) -> None:
        if self.affiliation == affiliation:
            return
        was = self.stored.affiliation
        self.stored.affiliation = affiliation
        # While the participants list is being filled, only update the
        # in-memory value: nothing is committed nor sent.
        if not self.muc.participants_filled:
            return
        self.commit()
        if self.cached_presence is None or self.cached_presence.ptype == "unavailable":
            self.muc.send_affiliation_change(self, was)
        self.send_last_presence(force=True, no_cache_online=True)

    @property
    def role(self) -> MucRole:
        """The stored MUC role of this participant."""
        return self.stored.role

    @role.setter
    def role(self, role: MucRole) -> None:
        if self.role == role:
            return
        self.stored.role = role
        if not self.muc.participants_filled:
            return
        self.commit()
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    @property
    def hats(self) -> list[Hat]:
        """The hats of this participant, an empty list when none are stored."""
        return [Hat(*h) for h in self.stored.hats] if self.stored.hats else []

    def set_hats(self, hats: list[Hat]) -> None:
        """
        Replace the hats of this participant.

        Nothing happens if the hats are unchanged. The new presence is only
        re-sent if the participants of the group have been filled and a
        presence was already sent for this participant.

        :param hats: The new hats.
        """
        if self.hats == hats:
            return
        self.stored.hats = hats
        if not self.muc.participants_filled:
            return
        self.commit()
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def __update_resource(self, unescaped_nickname: str | None) -> None:
        """
        Derive and store the JID resource and the "no illegal chars" nickname.

        :param unescaped_nickname: The nickname as given by the legacy
            network. When empty, the system participant gets an empty
            resource and any other participant gets a generated
            ``unnamed-<uuid4>`` nickname, with a warning.
        """
        if not unescaped_nickname:
            self.stored.resource = ""
            if self.is_system:
                self.stored.nickname_no_illegal = ""
            else:
                warnings.warn(
                    "Only the system participant is allowed to not have a nickname"
                )
                nickname = f"unnamed-{uuid.uuid4()}"
                self.stored.resource = self.stored.nickname_no_illegal = nickname
            return

        self.stored.nickname_no_illegal, jid = escape_nickname(
            self.muc.jid,
            unescaped_nickname,
        )
        self.stored.resource = jid.resource

    def send_configuration_change(self, codes: tuple[int, ...]) -> None:
        """
        Send a message carrying MUC status codes.

        :param codes: The status codes to attach to the message.
        :raises RuntimeError: If this is not the system participant.
        """
        if not self.is_system:
            raise RuntimeError("This is only possible for the system participant")
        msg = self._make_message()
        msg["muc"]["status_codes"] = codes
        self._send(msg)

    @property
    def nickname(self) -> str:
        """The stored nickname of this participant."""
        return self.stored.nickname

    @nickname.setter
    def nickname(self, new_nickname: str) -> None:
        old = self.nickname
        if new_nickname == old:
            return

        if self.muc.stored.id is not None:
            with self.xmpp.store.session() as orm:
                if not self.xmpp.store.rooms.nick_available(
                    orm, self.muc.stored.id, new_nickname
                ):
                    # Disambiguate a nickname that is already taken.
                    if self.contact is None:
                        new_nickname = f"{new_nickname} ({self.occupant_id})"
                    else:
                        new_nickname = f"{new_nickname} ({self.contact.legacy_id})"

        cache = getattr(self, "_last_presence", None)
        if cache:
            last_seen = cache.last_seen
            # Copy, so that the status codes we add below never end up in
            # the cached presence.
            kwargs = dict(cache.presence_kwargs)
        else:
            last_seen = None
            kwargs = {}

        kwargs["status_codes"] = {303}

        p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
        # in this order so pfrom=old resource and we actually use the escaped nick
        # in the muc/item/nick element
        self.__update_resource(new_nickname)
        p["muc"]["item"]["nick"] = self.jid.resource
        self._send(p)

        self.stored.nickname = new_nickname
        self.commit()
        kwargs["status_codes"] = set()
        p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
        self._send(p)

    def _make_presence(  # type:ignore[no-untyped-def]
        self,
        *,
        last_seen: datetime | None = None,
        status_codes: set[int] | None = None,
        user_full_jid: JID | None = None,
        **presence_kwargs,  # noqa type:ignore[no-untyped-def]
    ) -> Presence:
        """
        Build a presence stanza for this participant, with its MUC payload.

        :param last_seen: Passed to the parent ``_make_presence``.
        :param status_codes: MUC status codes to include. The set is copied,
            the caller's object is left untouched.
        :param user_full_jid: For the user's own participant in a
            non-anonymous group, the full JID to advertise. When omitted,
            the first known user resource is used, or ``pseudo-resource``
            when there is none.
        :param presence_kwargs: Passed to the parent ``_make_presence``.
        :return: The presence stanza.
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        p["muc"]["affiliation"] = self.affiliation
        p["muc"]["role"] = self.role
        if self.hats:
            p["hats"].add_hats(self.hats)
        # Work on a copy: codes are added below and must not leak into the
        # set owned by the caller.
        codes = set(status_codes or ())
        if self.is_user:
            codes.add(110)
        if not self.muc.is_anonymous and not self.is_system:
            if self.is_user:
                if user_full_jid:
                    p["muc"]["jid"] = user_full_jid
                else:
                    jid = JID(self.user_jid)
                    jid.resource = next(
                        iter(self.muc.get_user_resources()), "pseudo-resource"
                    )
                    # Advertise the full JID computed above; the bare
                    # user JID was previously used, discarding it.
                    p["muc"]["jid"] = jid
                codes.add(100)
            elif self.contact:
                p["muc"]["jid"] = self.contact.jid
                if a := self.contact.get_avatar():
                    p["vcard_temp_update"]["photo"] = a.id
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        if self.is_user and (hash_ := self.session.user.avatar_hash):
            p["vcard_temp_update"]["photo"] = hash_
        p["muc"]["status_codes"] = codes
        return p

    @property
    def DISCO_NAME(self) -> str:
        """The name used in service discovery: the nickname."""
        return self.nickname

    # NOTE(review): ``Never`` is imported from
    # ``slixmpp.plugins.xep_0492.stanza``, not from ``typing`` -- presumably
    # ``typing.Never``/``NoReturn`` was intended; confirm.
    @DISCO_NAME.setter
    def DISCO_NAME(self, _: str) -> Never:
        raise RuntimeError

    def __send_presence_if_needed(
        self, stanza: Message | Presence, full_jid: JID, archive_only: bool
    ) -> None:
        """
        Send the initial presence of this participant before ``stanza``,
        if no presence was sent for it yet.

        Nothing is sent for archive-only stanzas, for the system participant,
        for the user, for stanzas with a subject, nor for messages that have
        a ``muc`` payload.
        """
        if (
            archive_only
            or self.is_system
            or self.is_user
            or self._presence_sent
            or stanza["subject"]
        ):
            return
        if isinstance(stanza, Message):
            if "muc" in stanza:
                return
        self.send_initial_presence(full_jid)

    @property
    def occupant_id(self) -> str:
        """The stored occupant ID of this participant."""
        return self.stored.occupant_id

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        full_jid: JID | None = None,
        archive_only: bool = False,
        legacy_msg_id: str | None = None,
        initial_presence: bool = False,
        **send_kwargs: Any,  # noqa:ANN401
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza from this participant.

        :param stanza: The message or presence to send.
        :param full_jid: Send to this full JID only. When omitted, a copy of
            the stanza is sent to each full JID returned by
            ``muc.user_full_jids()`` and, for messages, the stanza is added
            to the archive of the group if it has one.
        :param archive_only: Add the stanza to the archive without sending it.
        :param legacy_msg_id: Passed to the archive along with the message.
        :param initial_presence: Kept for interface compatibility; a sent
            presence is recorded in the same way whether it is set or not.
        :param send_kwargs: Unused here.
        :return: The stanza (the last copy sent when several were sent).
        """
        if stanza.get_from().resource:
            stanza["occupant-id"]["id"] = self.occupant_id
        else:
            stanza["occupant-id"]["id"] = "room"
        self.__add_nick_element(stanza)
        if not self.is_user and isinstance(stanza, Presence):
            # Do not announce the departure of someone we never announced.
            if stanza["type"] == "unavailable" and not self._presence_sent:
                return stanza
            self._presence_sent = True
        if full_jid:
            stanza["to"] = full_jid
            self.__send_presence_if_needed(stanza, full_jid, archive_only)
            if self.is_user:
                assert stanza.stream is not None
                stanza.stream.send(stanza, use_filters=False)
            else:
                stanza.send()
        else:
            if hasattr(self.muc, "archive") and isinstance(stanza, Message):
                self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
            if archive_only:
                return stanza
            for user_full_jid in self.muc.user_full_jids():
                stanza = copy(stanza)
                stanza["to"] = user_full_jid
                self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
                stanza.send()
        return stanza

    def mucadmin_item(self) -> MUCAdminItem:
        """
        Build the MUC admin item describing this participant.

        The item carries the nickname, affiliation and role and, for
        non-anonymous groups, the bare JID of the user or of the associated
        contact. A warning is emitted if there is neither.
        """
        item = MUCAdminItem()
        item["nick"] = self.nickname
        item["affiliation"] = self.affiliation
        item["role"] = self.role
        if not self.muc.is_anonymous:
            if self.is_user:
                item["jid"] = self.user_jid.bare
            elif self.contact:
                item["jid"] = self.contact.jid.bare
            else:
                warnings.warn(
                    (
                        f"Private group but no contact JID associated to {self.jid} in"
                        f" {self}"
                    ),
                )
        return item

    def __add_nick_element(self, stanza: Presence | Message) -> None:
        """
        Append a XEP-0172 nick element to ``stanza`` when the nickname
        differs from the resource of the occupant JID.
        """
        if (nick := self.nickname_no_illegal) != self.jid.resource:
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = nick
            stanza.append(n)

    def _get_last_presence(self) -> CachedPresence | None:
        """
        Return the cached presence of this participant, falling back to the
        cached presence of the associated contact if there is one.
        """
        own = super()._get_last_presence()
        if own is None and self.contact:
            return self.contact._get_last_presence()
        return own

    def send_initial_presence(
        self,
        full_jid: JID,
        nick_change: bool = False,
        presence_id: str | None = None,
        mav_until: str | None = None,
    ) -> None:
        """
        Called when the user joins a MUC, as a mechanism
        to indicate to the joining XMPP client the list of "participants".

        Can also be called to trigger a "participant has joined the group"
        event.

        Nothing is sent for a participant other than the user whose cached
        presence is "unavailable".

        :param full_jid: Set this to only send to a specific user XMPP resource.
        :param nick_change: Used when the user joins and the MUC renames them (code 210)
        :param presence_id: set the presence ID. used internally by slidge
        :param mav_until: For the presence of the user only, value of the
            ``until`` attribute of the ``mav`` element of the MUC payload.
        """
        #  MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
        codes = set()
        if nick_change:
            codes.add(210)

        if self.is_user:
            # the "initial presence" of the user has to be vanilla, as it is
            # a crucial part of the MUC join sequence for XMPP clients.
            kwargs = {}
        else:
            cache = self._get_last_presence()
            self.log.debug("Join muc, initial presence: %s", cache)
            if cache:
                ptype = cache.ptype
                if ptype == "unavailable":
                    return
                kwargs = dict(
                    last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
                )
            else:
                kwargs = {}
        p = self._make_presence(
            status_codes=codes,
            user_full_jid=full_jid,
            **kwargs,  # type:ignore
        )
        if presence_id:
            p["id"] = presence_id
        if self.is_user and mav_until is not None:
            p["muc"]["mav"]["until"] = mav_until
        self._send(p, full_jid, initial_presence=True)

    def leave(self) -> None:
        """
        Call this when the participant leaves the room
        """
        self.muc.remove_participant(self)

    def kick(self, reason: str | None = None) -> None:
        """
        Call this when the participant is kicked from the room

        :param reason: Optionally, a reason for the kick
        """
        self.muc.remove_participant(self, kick=True, reason=reason)

    def ban(self, reason: str | None = None) -> None:
        """
        Call this when the participant is banned from the room

        :param reason: Optionally, a reason for the ban
        """
        self.muc.remove_participant(self, ban=True, reason=reason)

    async def get_disco_info(
        self, jid: OptJid = None, node: str | None = None
    ) -> DiscoInfo:
        """
        Return the disco info of the associated contact if there is one,
        the default disco info otherwise. ``jid`` and ``node`` are not used.
        """
        if self.contact is not None:
            return await self.contact.get_disco_info()
        return await super().get_disco_info()

    def moderate(self, legacy_msg_id: str, reason: str | None = None) -> None:
        """
        Send retractions, marked as moderated by this participant, for a
        message.

        One retraction is sent for each XMPP ID that ``_legacy_to_xmpp``
        yields for the legacy message ID. Each one is built by the system
        participant of the group.

        :param legacy_msg_id: The legacy ID of the moderated message.
        :param reason: Optionally, a reason for the moderation.
        """
        for i in self._legacy_to_xmpp(legacy_msg_id):
            m = self.muc.get_system_participant()._make_message()
            m["retract"]["id"] = i
            if self.is_system:
                m["retract"].enable("moderated")
            else:
                m["retract"]["moderated"]["by"] = self.jid
                m["retract"]["moderated"]["occupant-id"]["id"] = self.occupant_id
            if reason:
                m["retract"]["reason"] = reason
            self._send(m)

    def __apply_subject(
        self, msg: Message, subject: str | None, when: datetime | None
    ) -> None:
        """Add the optional delay and the (possibly empty) subject to ``msg``."""
        if when is not None:
            msg["delay"].set_stamp(when)
            msg["delay"]["from"] = self.muc.jid
        if subject:
            msg["subject"] = subject
        else:
            # An empty subject still needs an explicit, empty <subject/>.
            # may be simplified if slixmpp lets it do it more easily some day
            msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject"))

    def set_room_subject(
        self,
        subject: str,
        full_jid: JID | None = None,
        when: datetime | None = None,
        update_muc: bool = True,
    ) -> None:
        """
        Send a subject message for the room from this participant.

        :param subject: The new subject, possibly empty.
        :param full_jid: Set this to only send to a specific user XMPP resource.
        :param when: When the subject was set, added as a delay to the message.
        :param update_muc: Also record the subject, its setter (the nickname
            of this participant) and its date on the group.
        """
        if update_muc:
            self.muc._subject = subject  # type: ignore
            self.muc.subject_setter = self.nickname
            self.muc.subject_date = when

        msg = self._make_message()
        self.__apply_subject(msg, subject, when)
        self._send(msg, full_jid)

    def set_thread_subject(
        self,
        thread: str,
        subject: str | None,
        when: datetime | None = None,
    ) -> None:
        """
        Send a subject message for a thread from this participant.

        :param thread: The identifier of the thread.
        :param subject: The new subject, empty or ``None`` for no subject.
        :param when: When the subject was set, added as a delay to the message.
        """
        msg = self._make_message()
        msg["thread"] = str(thread)
        self.__apply_subject(msg, subject, when)
        self._send(msg)

    async def on_set_affiliation(
        self,
        affiliation: MucAffiliation,
        reason: str | None,
        nickname: str | None,
    ) -> None:
        """
        Triggered when the user requests changing the affiliation of this
        participant for this group.

        Examples: promoting them to moderator, ban (affiliation=outcast).

        :param affiliation: The new affiliation
        :param reason: A reason for this affiliation change
        :param nickname:
        """
        raise NotImplementedError

    async def on_kick(self, reason: str | None) -> None:
        """
        Triggered when the user requests changing the role of this participant
        to "none" for this group. Action commonly known as "kick".

        :param reason: A reason for this kick
        """
        raise NotImplementedError

    async def on_invitation(self, reason: str | None) -> None:
        """
        Triggered when the user invites this :term:`Contact <Legacy Contact>`
        to a legacy MUC via :xep:`0249`.

        The default implementation calls :meth:`on_set_affiliation` of this
        participant with the 'member' affiliation. Override if you want to
        customize this behaviour.

        :param reason: Optionally, a reason
        """
        await self.on_set_affiliation("member", reason, None)
def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]:
    """
    Turn a legacy nickname into something usable as a JID resource.

    Illegal characters are stripped and newlines replaced by `` | ``. If
    the result is not accepted as a resource, its punycode encoding is
    tried, then the output of ``strip_non_printable`` on that encoding.

    :param muc_jid: The JID of the group chat.
    :param nickname: The nickname, as given by the legacy network.
    :return: The nickname without illegal characters, and a copy of
        ``muc_jid`` with the escaped nickname as resource.
    """
    cleaned = strip_illegal_chars(nickname).replace("\n", " | ")
    occupant_jid = JID(muc_jid)

    try:
        occupant_jid.resource = cleaned
    except InvalidJID:
        pass
    else:
        return cleaned, occupant_jid

    encoded = cleaned.encode("punycode").decode()
    try:
        occupant_jid.resource = encoded
    except InvalidJID:
        # at this point there still might be control chars
        occupant_jid.resource = strip_non_printable(encoded)

    return cleaned, occupant_jid
# Module-level logger, named after this module.
log = logging.getLogger(__name__)