Coverage for slidge / group / participant.py: 87%
367 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1import logging
2import string
3import uuid
4import warnings
5from copy import copy
6from datetime import datetime
7from typing import TYPE_CHECKING, Any, Literal
8from xml.etree import ElementTree as ET
10import sqlalchemy as sa
11from slixmpp import JID, InvalidJID, Message, Presence
12from slixmpp.plugins.xep_0030.stanza.info import DiscoInfo
13from slixmpp.plugins.xep_0045.stanza import MUCAdminItem
14from slixmpp.plugins.xep_0492.stanza import Never
15from slixmpp.types import MessageTypes, OptJid
16from sqlalchemy.orm.exc import DetachedInstanceError
18from ..contact import LegacyContact
19from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin
20from ..core.mixins.db import DBMixin
21from ..db.models import Participant
22from ..util import SubclassableOnce, strip_illegal_chars
23from ..util.types import (
24 AnyMUC,
25 CachedPresence,
26 Hat,
27 LegacyMessageType,
28 LegacyThreadType,
29 MessageOrPresenceTypeVar,
30 MucAffiliation,
31 MucRole,
32)
34if TYPE_CHECKING:
35 from slidge.command.base import ContactCommand
def strip_non_printable(nickname: str) -> str:
    """
    Build a fallback nickname made only of visible ASCII characters.

    Every character that is not printable ASCII is dropped and a
    ``-slidge-<hash>`` suffix is appended, so that two nicknames differing
    only by dropped characters do not collapse into the same string. A
    warning is emitted because the result differs from what was asked for.

    :param nickname: The nickname that could not be used as-is.
    :return: The sanitized nickname followed by ``-slidge-{hash(nickname)}``.
    """
    # ``string.printable`` also contains "\t\n\r\x0b\x0c". Those are control
    # characters, so ``str.isprintable()`` is required as well to actually
    # remove them (the plain space is kept: it is printable).
    kept = "".join(
        char for char in nickname if char in string.printable and char.isprintable()
    )
    # NOTE(review): ``hash()`` of a str is salted per interpreter process
    # unless PYTHONHASHSEED is set, so the suffix is not stable across runs.
    new = kept + f"-slidge-{hash(nickname)}"
    warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
    return new
class LegacyParticipant(
    PresenceMixin,
    MessageMixin,
    ChatterDiscoMixin,
    DBMixin,
    SubclassableOnce,
):
    """
    A legacy participant of a legacy group chat.

    An instance wraps a stored :class:`Participant` row (``self.stored``),
    belongs to one MUC (``self.muc``) and may be linked to a
    :class:`LegacyContact` (``self.contact``). Presences and messages it
    emits are addressed from ``self.jid``, i.e. the MUC JID with the
    participant's resource.
    """

    is_participant: Literal[True] = True

    mtype: MessageTypes = "groupchat"
    _can_send_carbon = False
    USE_STANZA_ID = True
    STRIP_SHORT_DELAY = False
    stored: Participant
    contact: LegacyContact[Any] | None

    def __init__(
        self,
        muc: AnyMUC,
        stored: Participant,
        is_system: bool = False,
        contact: LegacyContact[Any] | None = None,
    ) -> None:
        """
        :param muc: The group chat this participant belongs to.
        :param stored: The database row backing this participant.
        :param is_system: Whether this is the "system" participant of the room.
        :param contact: The contact linked to this participant, if any. When
            omitted and ``stored`` references a contact, it is loaded through
            the session's contact store.
        """
        self.muc = muc
        self.session = muc.session
        self.xmpp = muc.session.xmpp
        self.is_system = is_system

        # Keep the python-side contact and the stored relationship in sync,
        # whichever of the two was provided.
        if contact is None and stored.contact is not None:
            contact = self.session.contacts.from_store(stored=stored.contact)
        if contact is not None and stored.contact is None:
            stored.contact = contact.stored

        self.stored = stored
        self.contact = contact

        super().__init__()

        # A row without a resource has never had its nickname escaped.
        if stored.resource is None:
            self.__update_resource(stored.nickname)

        self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")

    @property
    def is_user(self) -> bool:
        """
        Whether this participant is the gateway user themselves.

        If the stored row is detached from its ORM session, ``self.merge()``
        is called and the attribute is read again.
        """
        try:
            return self.stored.is_user
        except DetachedInstanceError:
            self.merge()
            return self.stored.is_user

    @is_user.setter
    def is_user(self, is_user: bool) -> None:
        with self.xmpp.store.session(expire_on_commit=True) as orm:
            orm.add(self.stored)
            self.stored.is_user = is_user
            orm.commit()

    @property
    def jid(self) -> JID:
        """
        The MUC JID, with the stored resource set when there is one.
        """
        jid = JID(self.muc.jid)
        if self.stored.resource:
            jid.resource = self.stored.resource
        return jid

    @jid.setter
    def jid(self, x: JID) -> None:
        # FIXME: without this, mypy yields
        # "Cannot override writeable attribute with read-only property"
        # But it does not happen for LegacyContact. WTF?
        raise RuntimeError

    @property
    def commands(self) -> dict[str, "type[ContactCommand[Any]]"]:  # type:ignore[override]
        """
        The commands of the linked contact, or an empty dict without contact.
        """
        if self.contact is None:
            return {}
        else:
            return self.contact.commands

    def __should_commit(self) -> bool:
        """
        Tell whether changes should be written to the database right now.

        Never for the system participant, nor while the MUC holds its
        "fill participants" or "fill history" lock.
        """
        if self.is_system:
            return False
        if self.muc.get_lock("fill participants"):
            return False
        return not self.muc.get_lock("fill history")

    def commit(self) -> None:
        """
        Delegate to the parent ``commit()`` unless committing is currently
        suppressed (see ``__should_commit``).
        """
        if not self.__should_commit():
            return
        super().commit()

    @property
    def user_jid(self) -> JID:
        """
        The JID of the gateway user owning the session.
        """
        return self.session.user_jid

    def __repr__(self) -> str:
        return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"

    @property
    def _presence_sent(self) -> bool:
        # we track if we already sent a presence for this participant.
        # if we didn't, we send it before the first message.
        # this way, even in plugins that don't map "user has joined" events,
        # we send a "join"-presence from the participant before the first message
        return self.stored.presence_sent

    @_presence_sent.setter
    def _presence_sent(self, val: bool) -> None:
        if self._presence_sent == val:
            return
        self.stored.presence_sent = val
        if not self.__should_commit():
            return
        # Update only this column with a direct UPDATE statement.
        with self.xmpp.store.session() as orm:
            orm.execute(
                sa.update(Participant)
                .where(Participant.id == self.stored.id)
                .values(presence_sent=val)
            )
            orm.commit()

    @property
    def nickname_no_illegal(self) -> str:
        """
        The stored nickname variant produced by ``escape_nickname``.
        """
        return self.stored.nickname_no_illegal

    @property
    def affiliation(self) -> MucAffiliation:
        """
        The MUC affiliation of this participant.
        """
        return self.stored.affiliation

    @affiliation.setter
    def affiliation(self, affiliation: MucAffiliation) -> None:
        if self.affiliation == affiliation:
            return
        was = self.stored.affiliation
        self.stored.affiliation = affiliation
        # Nothing to broadcast while the participant list is being filled.
        if not self.muc.participants_filled:
            return
        self.commit()
        # Without a cached presence, or with an "unavailable" one, the MUC is
        # additionally asked to announce the affiliation change.
        if self.cached_presence is None or self.cached_presence.ptype == "unavailable":
            self.muc.send_affiliation_change(self, was)
        self.send_last_presence(force=True, no_cache_online=True)

    @property
    def role(self) -> MucRole:
        """
        The MUC role of this participant.
        """
        return self.stored.role

    @role.setter
    def role(self, role: MucRole) -> None:
        if self.role == role:
            return
        self.stored.role = role
        if not self.muc.participants_filled:
            return
        self.commit()
        # No presence was ever sent: there is nothing to refresh.
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    @property
    def hats(self) -> list[Hat]:
        """
        The hats of this participant, rebuilt from their stored form.
        """
        return [Hat(*h) for h in self.stored.hats] if self.stored.hats else []

    def set_hats(self, hats: list[Hat]) -> None:
        """
        Replace the hats of this participant.

        The change is committed and the last presence re-sent only once the
        MUC's participants are filled and a presence was already sent.

        :param hats: The new, complete list of hats.
        """
        if self.hats == hats:
            return
        self.stored.hats = hats
        if not self.muc.participants_filled:
            return
        self.commit()
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def __update_resource(self, unescaped_nickname: str | None) -> None:
        """
        Recompute the stored resource and ``nickname_no_illegal``.

        :param unescaped_nickname: The raw nickname. When empty or ``None``,
            the system participant gets empty values and any other
            participant gets a generated ``unnamed-<uuid4>`` nickname (with a
            warning).
        """
        if not unescaped_nickname:
            self.stored.resource = ""
            if self.is_system:
                self.stored.nickname_no_illegal = ""
            else:
                warnings.warn(
                    "Only the system participant is allowed to not have a nickname"
                )
                nickname = f"unnamed-{uuid.uuid4()}"
                self.stored.resource = self.stored.nickname_no_illegal = nickname
            return

        self.stored.nickname_no_illegal, jid = escape_nickname(
            self.muc.jid,
            unescaped_nickname,
        )
        self.stored.resource = jid.resource

    def send_configuration_change(self, codes: tuple[int, ...]) -> None:
        """
        Send a message carrying MUC status codes.

        :param codes: The status codes to attach to the message.
        :raises RuntimeError: If this is not the system participant.
        """
        if not self.is_system:
            raise RuntimeError("This is only possible for the system participant")
        msg = self._make_message()
        msg["muc"]["status_codes"] = codes
        self._send(msg)

    @property
    def nickname(self) -> str:
        """
        The stored (unescaped) nickname of this participant.
        """
        return self.stored.nickname

    @nickname.setter
    def nickname(self, new_nickname: str) -> None:
        old = self.nickname
        if new_nickname == old:
            return

        # If the nickname is already taken in this room, disambiguate it with
        # the occupant ID, or the legacy ID of the linked contact.
        if self.muc.stored.id is not None:
            with self.xmpp.store.session() as orm:
                if not self.xmpp.store.rooms.nick_available(
                    orm, self.muc.stored.id, new_nickname
                ):
                    if self.contact is None:
                        new_nickname = f"{new_nickname} ({self.occupant_id})"
                    else:
                        new_nickname = f"{new_nickname} ({self.contact.legacy_id})"

        cache = getattr(self, "_last_presence", None)
        if cache:
            last_seen = cache.last_seen
            # Copy: "status_codes" is added below and must not end up in the
            # cached object, should ``presence_kwargs`` hand out a shared dict.
            kwargs = dict(cache.presence_kwargs)
        else:
            last_seen = None
            kwargs = {}

        # 303: "unavailable" presence announcing the new nickname.
        kwargs["status_codes"] = {303}

        p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
        # in this order so pfrom=old resource and we actually use the escaped nick
        # in the muc/item/nick element
        self.__update_resource(new_nickname)
        p["muc"]["item"]["nick"] = self.jid.resource
        self._send(p)

        self.stored.nickname = new_nickname
        self.commit()
        # Then an "available" presence, now sent from the new resource.
        kwargs["status_codes"] = set()
        p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
        self._send(p)

    def _make_presence(  # type:ignore[no-untyped-def]
        self,
        *,
        last_seen: datetime | None = None,
        status_codes: set[int] | None = None,
        user_full_jid: JID | None = None,
        **presence_kwargs,  # noqa type:ignore[no-untyped-def]
    ) -> Presence:
        """
        Build a presence with the MUC-specific payload of this participant.

        :param last_seen: Forwarded to the parent implementation.
        :param status_codes: MUC status codes to include. The set is not
            modified; 110 and 100 are added to a copy when they apply.
        :param user_full_jid: For the user's own participant in a
            non-anonymous room, the full JID to advertise. When omitted, one
            is built from the user JID and one of the MUC's user resources.
        :param presence_kwargs: Forwarded to the parent implementation.
        :return: The presence, not sent yet.
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        p["muc"]["affiliation"] = self.affiliation
        p["muc"]["role"] = self.role
        if self.hats:
            p["hats"].add_hats(self.hats)
        # Copy so that the caller's set is never mutated.
        codes = set(status_codes) if status_codes else set()
        if self.is_user:
            # 110: this presence refers to the receiving user themselves.
            codes.add(110)
        if not self.muc.is_anonymous and not self.is_system:
            if self.is_user:
                if user_full_jid:
                    p["muc"]["jid"] = user_full_jid
                else:
                    jid = JID(self.user_jid)
                    try:
                        jid.resource = next(iter(self.muc.get_user_resources()))
                    except StopIteration:
                        jid.resource = "pseudo-resource"
                    # Advertise the full JID built above; it used to be
                    # computed and then discarded in favour of the bare JID.
                    p["muc"]["jid"] = jid
                # 100: the room is not anonymous.
                codes.add(100)
            elif self.contact:
                p["muc"]["jid"] = self.contact.jid
                if a := self.contact.get_avatar():
                    p["vcard_temp_update"]["photo"] = a.id
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        if self.is_user and (hash_ := self.session.user.avatar_hash):
            p["vcard_temp_update"]["photo"] = hash_
        p["muc"]["status_codes"] = codes
        return p

    @property
    def DISCO_NAME(self) -> str:
        """
        The name advertised in service discovery: the nickname.
        """
        return self.nickname

    @DISCO_NAME.setter
    def DISCO_NAME(self, _: str) -> Never:
        raise RuntimeError

    def __send_presence_if_needed(
        self, stanza: Message | Presence, full_jid: JID, archive_only: bool
    ) -> None:
        """
        Send the initial presence of this participant to ``full_jid`` before
        ``stanza``, unless one of the conditions below makes it unnecessary.
        """
        if (
            archive_only
            or self.is_system
            or self.is_user
            or self._presence_sent
            or stanza["subject"]
        ):
            return
        if isinstance(stanza, Message) and "muc" in stanza:
            return
        self.send_initial_presence(full_jid)

    @property
    def occupant_id(self) -> str:
        """
        The stored occupant ID of this participant.
        """
        return self.stored.occupant_id

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        full_jid: JID | None = None,
        archive_only: bool = False,
        legacy_msg_id: LegacyMessageType | None = None,
        initial_presence: bool = False,
        **send_kwargs: Any,  # noqa:ANN401
    ) -> MessageOrPresenceTypeVar:
        """
        Send ``stanza`` on behalf of this participant.

        :param stanza: The message or presence to send.
        :param full_jid: Send only to this JID. Otherwise the stanza is
            copied to every full JID returned by ``muc.user_full_jids()``.
        :param archive_only: Only add the message to the MUC archive, without
            sending it (only honoured when ``full_jid`` is not set).
        :param legacy_msg_id: Passed to the MUC archive along with a message.
        :param initial_presence: Accepted for compatibility; it does not
            change what this method does.
        :param send_kwargs: Accepted for compatibility; unused here.
        :return: The stanza (the last copy made when fanned out to several
            JIDs).
        """
        if stanza.get_from().resource:
            stanza["occupant-id"]["id"] = self.occupant_id
        else:
            stanza["occupant-id"]["id"] = "room"
        self.__add_nick_element(stanza)
        if not self.is_user and isinstance(stanza, Presence):
            # Never announce the departure of someone who was never announced.
            if stanza["type"] == "unavailable" and not self._presence_sent:
                return stanza
            self._presence_sent = True
        if full_jid:
            stanza["to"] = full_jid
            self.__send_presence_if_needed(stanza, full_jid, archive_only)
            if self.is_user:
                assert stanza.stream is not None
                stanza.stream.send(stanza, use_filters=False)
            else:
                stanza.send()
        else:
            if hasattr(self.muc, "archive") and isinstance(stanza, Message):
                self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
            if archive_only:
                return stanza
            for user_full_jid in self.muc.user_full_jids():
                stanza = copy(stanza)
                stanza["to"] = user_full_jid
                self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
                stanza.send()
        return stanza

    def mucadmin_item(self) -> MUCAdminItem:
        """
        Build the ``MUCAdminItem`` describing this participant.

        The bare JID of the user or of the linked contact is included only
        when the room is not anonymous; a warning is emitted if neither is
        available.
        """
        item = MUCAdminItem()
        item["nick"] = self.nickname
        item["affiliation"] = self.affiliation
        item["role"] = self.role
        if not self.muc.is_anonymous:
            if self.is_user:
                item["jid"] = self.user_jid.bare
            elif self.contact:
                item["jid"] = self.contact.jid.bare
            else:
                warnings.warn(
                    (
                        f"Private group but no contact JID associated to {self.jid} in"
                        f" {self}"
                    ),
                )
        return item

    def __add_nick_element(self, stanza: Presence | Message) -> None:
        """
        Append a XEP-0172 nick element when the resource differs from
        ``nickname_no_illegal``, i.e. when the nickname had to be altered to
        fit in the JID.
        """
        if (nick := self.nickname_no_illegal) != self.jid.resource:
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = nick
            stanza.append(n)

    def _get_last_presence(self) -> CachedPresence | None:
        """
        Return this participant's last presence, falling back to the one of
        the linked contact when there is none.
        """
        own = super()._get_last_presence()
        if own is None and self.contact:
            return self.contact._get_last_presence()
        return own

    def send_initial_presence(
        self,
        full_jid: JID,
        nick_change: bool = False,
        presence_id: str | None = None,
        mav_until: str | None = None,
    ) -> None:
        """
        Called when the user joins a MUC, as a mechanism
        to indicate to the joining XMPP client the list of "participants".

        Can also be called to trigger a "participant has joined the group" event.

        Nothing is sent for a participant (other than the user) whose last
        known presence is "unavailable".

        :param full_jid: Set this to only send to a specific user XMPP resource.
        :param nick_change: Used when the user joins and the MUC renames them (code 210)
        :param presence_id: set the presence ID. used internally by slidge
        :param mav_until: For the user's own presence only: value written to
            the ``until`` attribute of the ``mav`` element of the MUC payload.
        """
        # MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
        codes = set()
        if nick_change:
            codes.add(210)

        if self.is_user:
            # the "initial presence" of the user has to be vanilla, as it is
            # a crucial part of the MUC join sequence for XMPP clients.
            kwargs = {}
        else:
            cache = self._get_last_presence()
            self.log.debug("Join muc, initial presence: %s", cache)
            if cache:
                ptype = cache.ptype
                if ptype == "unavailable":
                    return
                kwargs = dict(
                    last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
                )
            else:
                kwargs = {}
        p = self._make_presence(
            status_codes=codes,
            user_full_jid=full_jid,
            **kwargs,  # type:ignore
        )
        if presence_id:
            p["id"] = presence_id
        if self.is_user and mav_until is not None:
            p["muc"]["mav"]["until"] = mav_until
        self._send(p, full_jid, initial_presence=True)

    def leave(self) -> None:
        """
        Call this when the participant leaves the room
        """
        self.muc.remove_participant(self)

    def kick(self, reason: str | None = None) -> None:
        """
        Call this when the participant is kicked from the room

        :param reason: Optional reason, passed on to the MUC.
        """
        self.muc.remove_participant(self, kick=True, reason=reason)

    def ban(self, reason: str | None = None) -> None:
        """
        Call this when the participant is banned from the room

        :param reason: Optional reason, passed on to the MUC.
        """
        self.muc.remove_participant(self, ban=True, reason=reason)

    async def get_disco_info(
        self, jid: OptJid = None, node: str | None = None
    ) -> DiscoInfo:
        """
        Return the disco info of the linked contact if there is one, the
        parent implementation's otherwise. ``jid`` and ``node`` are not used.
        """
        if self.contact is not None:
            return await self.contact.get_disco_info()
        return await super().get_disco_info()

    def moderate(
        self, legacy_msg_id: LegacyMessageType, reason: str | None = None
    ) -> None:
        """
        Retract a message, with this participant as the moderator.

        One retraction message is built from the system participant for each
        XMPP ID matching ``legacy_msg_id`` and sent through this participant.

        :param legacy_msg_id: Legacy ID of the message to retract.
        :param reason: Optional reason attached to the retraction.
        """
        for i in self._legacy_to_xmpp(legacy_msg_id):
            m = self.muc.get_system_participant()._make_message()
            m["retract"]["id"] = i
            if self.is_system:
                # The system participant moderates anonymously.
                m["retract"].enable("moderated")
            else:
                m["retract"]["moderated"]["by"] = self.jid
                m["retract"]["moderated"]["occupant-id"]["id"] = self.occupant_id
            if reason:
                m["retract"]["reason"] = reason
            self._send(m)

    def __fill_subject(
        self, msg: Message, subject: str | None, when: datetime | None
    ) -> None:
        """
        Add the optional delay and the (possibly empty) subject to ``msg``.
        """
        if when is not None:
            msg["delay"].set_stamp(when)
            msg["delay"]["from"] = self.muc.jid
        if subject:
            msg["subject"] = subject
        else:
            # may be simplified if slixmpp lets it do it more easily some day
            msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject"))

    def set_room_subject(
        self,
        subject: str,
        full_jid: JID | None = None,
        when: datetime | None = None,
        update_muc: bool = True,
    ) -> None:
        """
        Send a room subject message from this participant.

        :param subject: The new subject; when empty, an empty ``<subject/>``
            element is sent.
        :param full_jid: Only send to this JID instead of all user resources.
        :param when: Date of the subject change, added as a delay element.
        :param update_muc: Also record subject, setter nickname and date on
            the MUC object.
        """
        if update_muc:
            self.muc._subject = subject  # type: ignore
            self.muc.subject_setter = self.nickname
            self.muc.subject_date = when

        msg = self._make_message()
        self.__fill_subject(msg, subject, when)
        self._send(msg, full_jid)

    def set_thread_subject(
        self,
        thread: LegacyThreadType,
        subject: str | None,
        when: datetime | None = None,
    ) -> None:
        """
        Send a subject message for a thread from this participant.

        :param thread: The thread identifier, sent as ``str(thread)``.
        :param subject: The new subject; when empty or ``None``, an empty
            ``<subject/>`` element is sent.
        :param when: Date of the subject change, added as a delay element.
        """
        msg = self._make_message()
        msg["thread"] = str(thread)
        self.__fill_subject(msg, subject, when)
        self._send(msg)
def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]:
    """
    Turn a nickname into a usable occupant JID for ``muc_jid``.

    Illegal characters are stripped and newlines replaced by ``" | "``. The
    result is tried as the resource as-is, then punycode-encoded, then passed
    through ``strip_non_printable``; the first form that does not raise
    ``InvalidJID`` is kept (the last attempt is not guarded).

    :param muc_jid: JID of the room; it is copied, not modified.
    :param nickname: The raw nickname.
    :return: The cleaned-up nickname (before any punycode fallback) and the
        occupant JID.
    """
    display_nick = strip_illegal_chars(nickname).replace("\n", " | ")
    occupant_jid = JID(muc_jid)

    def accepts(resource: str) -> bool:
        # True if ``resource`` could be set on the occupant JID.
        try:
            occupant_jid.resource = resource
        except InvalidJID:
            return False
        return True

    if accepts(display_nick):
        return display_nick, occupant_jid

    encoded_nick = display_nick.encode("punycode").decode()
    if not accepts(encoded_nick):
        # at this point there still might be control chars
        occupant_jid.resource = strip_non_printable(encoded_nick)

    return display_nick, occupant_jid
# Module-level logger. Participants log through their own ``self.log``; this
# one is not referenced in the visible part of the module.
log = logging.getLogger(__name__)