Coverage for slidge / group / participant.py: 89%
370 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1import logging
2import string
3import uuid
4import warnings
5from copy import copy
6from datetime import datetime
7from typing import Any, Literal
8from xml.etree import ElementTree as ET
10import sqlalchemy as sa
11from slixmpp import JID, InvalidJID, Message, Presence
12from slixmpp.plugins.xep_0030.stanza.info import DiscoInfo
13from slixmpp.plugins.xep_0045.stanza import MUCAdminItem
14from slixmpp.plugins.xep_0492.stanza import Never
15from slixmpp.types import MessageTypes, OptJid
16from sqlalchemy.orm.exc import DetachedInstanceError
18from ..contact import LegacyContact
19from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin
20from ..core.mixins.db import DBMixin
21from ..db.models import Participant
22from ..util import SubclassableOnce, strip_illegal_chars
23from ..util.types import (
24 AnyMUC,
25 CachedPresence,
26 Hat,
27 LegacyMessageType,
28 LegacyThreadType,
29 MessageOrPresenceTypeVar,
30 MucAffiliation,
31 MucRole,
32)
35def strip_non_printable(nickname: str) -> str:
36 new = (
37 "".join(x for x in nickname if x in string.printable)
38 + f"-slidge-{hash(nickname)}"
39 )
40 warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
41 return new
44class LegacyParticipant(
45 PresenceMixin,
46 MessageMixin,
47 ChatterDiscoMixin,
48 DBMixin,
49 SubclassableOnce,
50):
51 """
52 A legacy participant of a legacy group chat.
53 """
55 is_participant: Literal[True] = True
57 mtype: MessageTypes = "groupchat"
58 _can_send_carbon = False
59 USE_STANZA_ID = True
60 STRIP_SHORT_DELAY = False
61 stored: Participant
62 contact: LegacyContact[Any] | None
64 def __init__(
65 self,
66 muc: AnyMUC,
67 stored: Participant,
68 is_system: bool = False,
69 contact: LegacyContact[Any] | None = None,
70 ) -> None:
71 self.muc = muc
72 self.session = muc.session
73 self.xmpp = muc.session.xmpp
74 self.is_system = is_system
76 if contact is None and stored.contact is not None:
77 contact = self.session.contacts.from_store(stored=stored.contact)
78 if contact is not None and stored.contact is None:
79 stored.contact = contact.stored
81 self.stored = stored
82 self.contact = contact
84 super().__init__()
86 if stored.resource is None:
87 self.__update_resource(stored.nickname)
89 self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")
91 @property
92 def is_user(self) -> bool:
93 try:
94 return self.stored.is_user
95 except DetachedInstanceError:
96 self.merge()
97 return self.stored.is_user
99 @is_user.setter
100 def is_user(self, is_user: bool) -> None:
101 with self.xmpp.store.session(expire_on_commit=True) as orm:
102 orm.add(self.stored)
103 self.stored.is_user = is_user
104 orm.commit()
106 @property
107 def jid(self) -> JID:
108 jid = JID(self.muc.jid)
109 if self.stored.resource:
110 jid.resource = self.stored.resource
111 return jid
113 @jid.setter
114 def jid(self, x: JID) -> None:
115 # FIXME: without this, mypy yields
116 # "Cannot override writeable attribute with read-only property"
117 # But it does not happen for LegacyContact. WTF?
118 raise RuntimeError
120 def __should_commit(self) -> bool:
121 if self.is_system:
122 return False
123 if self.muc.get_lock("fill participants"):
124 return False
125 if self.muc.get_lock("fill history"):
126 return False
127 return True
129 def commit(self, merge: bool = False) -> None:
130 if not self.__should_commit():
131 return
132 super().commit(merge)
134 @property
135 def user_jid(self) -> JID:
136 return self.session.user_jid
138 def __repr__(self) -> str:
139 return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"
141 @property
142 def _presence_sent(self) -> bool:
143 # we track if we already sent a presence for this participant.
144 # if we didn't, we send it before the first message.
145 # this way, event in plugins that don't map "user has joined" events,
146 # we send a "join"-presence from the participant before the first message
147 return self.stored.presence_sent
149 @_presence_sent.setter
150 def _presence_sent(self, val: bool) -> None:
151 if self._presence_sent == val:
152 return
153 self.stored.presence_sent = val
154 if not self.__should_commit():
155 return
156 with self.xmpp.store.session() as orm:
157 orm.execute(
158 sa.update(Participant)
159 .where(Participant.id == self.stored.id)
160 .values(presence_sent=val)
161 )
162 orm.commit()
164 @property
165 def nickname_no_illegal(self) -> str:
166 return self.stored.nickname_no_illegal
168 @property
169 def affiliation(self) -> MucAffiliation:
170 return self.stored.affiliation
172 @affiliation.setter
173 def affiliation(self, affiliation: MucAffiliation) -> None:
174 if self.affiliation == affiliation:
175 return
176 self.stored.affiliation = affiliation
177 if not self.muc.participants_filled:
178 return
179 self.commit()
180 if not self._presence_sent:
181 return
182 self.send_last_presence(force=True, no_cache_online=True)
184 def send_affiliation_change(self) -> None:
185 # internal use by slidge
186 msg = self._make_message()
187 msg["muc"]["affiliation"] = self.affiliation
188 msg["type"] = "normal"
189 if not self.muc.is_anonymous and not self.is_system:
190 if self.contact:
191 msg["muc"]["jid"] = self.contact.jid
192 else:
193 warnings.warn(
194 f"Private group but no 1:1 JID associated to '{self}'",
195 )
196 self._send(msg)
198 @property
199 def role(self) -> MucRole:
200 return self.stored.role
202 @role.setter
203 def role(self, role: MucRole) -> None:
204 if self.role == role:
205 return
206 self.stored.role = role
207 if not self.muc.participants_filled:
208 return
209 self.commit()
210 if not self._presence_sent:
211 return
212 self.send_last_presence(force=True, no_cache_online=True)
214 @property
215 def hats(self) -> list[Hat]:
216 return [Hat(*h) for h in self.stored.hats] if self.stored.hats else []
218 def set_hats(self, hats: list[Hat]) -> None:
219 if self.hats == hats:
220 return
221 self.stored.hats = hats
222 if not self.muc.participants_filled:
223 return
224 self.commit(merge=True)
225 if not self._presence_sent:
226 return
227 self.send_last_presence(force=True, no_cache_online=True)
229 def __update_resource(self, unescaped_nickname: str | None) -> None:
230 if not unescaped_nickname:
231 self.stored.resource = ""
232 if self.is_system:
233 self.stored.nickname_no_illegal = ""
234 else:
235 warnings.warn(
236 "Only the system participant is allowed to not have a nickname"
237 )
238 nickname = f"unnamed-{uuid.uuid4()}"
239 self.stored.resource = self.stored.nickname_no_illegal = nickname
240 return
242 self.stored.nickname_no_illegal, jid = escape_nickname(
243 self.muc.jid,
244 unescaped_nickname,
245 )
246 self.stored.resource = jid.resource
248 def send_configuration_change(self, codes: tuple[int, ...]) -> None:
249 if not self.is_system:
250 raise RuntimeError("This is only possible for the system participant")
251 msg = self._make_message()
252 msg["muc"]["status_codes"] = codes
253 self._send(msg)
255 @property
256 def nickname(self) -> str:
257 return self.stored.nickname
259 @nickname.setter
260 def nickname(self, new_nickname: str) -> None:
261 old = self.nickname
262 if new_nickname == old:
263 return
265 if self.muc.stored.id is not None:
266 with self.xmpp.store.session() as orm:
267 if not self.xmpp.store.rooms.nick_available(
268 orm, self.muc.stored.id, new_nickname
269 ):
270 if self.contact is None:
271 new_nickname = f"{new_nickname} ({self.occupant_id})"
272 else:
273 new_nickname = f"{new_nickname} ({self.contact.legacy_id})"
275 cache = getattr(self, "_last_presence", None)
276 if cache:
277 last_seen = cache.last_seen
278 kwargs = cache.presence_kwargs
279 else:
280 last_seen = None
281 kwargs = {}
283 kwargs["status_codes"] = {303}
285 p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
286 # in this order so pfrom=old resource and we actually use the escaped nick
287 # in the muc/item/nick element
288 self.__update_resource(new_nickname)
289 p["muc"]["item"]["nick"] = self.jid.resource
290 self._send(p)
292 self.stored.nickname = new_nickname
293 self.commit()
294 kwargs["status_codes"] = set()
295 p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
296 self._send(p)
298 def _make_presence( # type:ignore[no-untyped-def]
299 self,
300 *,
301 last_seen: datetime | None = None,
302 status_codes: set[int] | None = None,
303 user_full_jid: JID | None = None,
304 **presence_kwargs, # noqa type:ignore[no-untyped-def]
305 ) -> Presence:
306 p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
307 p["muc"]["affiliation"] = self.affiliation
308 p["muc"]["role"] = self.role
309 if self.hats:
310 p["hats"].add_hats(self.hats)
311 codes = status_codes or set()
312 if self.is_user:
313 codes.add(110)
314 if not self.muc.is_anonymous and not self.is_system:
315 if self.is_user:
316 if user_full_jid:
317 p["muc"]["jid"] = user_full_jid
318 else:
319 jid = JID(self.user_jid)
320 try:
321 jid.resource = next(iter(self.muc.get_user_resources()))
322 except StopIteration:
323 jid.resource = "pseudo-resource"
324 p["muc"]["jid"] = self.user_jid
325 codes.add(100)
326 elif self.contact:
327 p["muc"]["jid"] = self.contact.jid
328 if a := self.contact.get_avatar():
329 p["vcard_temp_update"]["photo"] = a.id
330 else:
331 warnings.warn(
332 f"Private group but no 1:1 JID associated to '{self}'",
333 )
334 if self.is_user and (hash_ := self.session.user.avatar_hash):
335 p["vcard_temp_update"]["photo"] = hash_
336 p["muc"]["status_codes"] = codes
337 return p
339 @property
340 def DISCO_NAME(self) -> str:
341 return self.nickname
343 @DISCO_NAME.setter
344 def DISCO_NAME(self, _: str) -> Never:
345 raise RuntimeError
347 def __send_presence_if_needed(
348 self, stanza: Message | Presence, full_jid: JID, archive_only: bool
349 ) -> None:
350 if (
351 archive_only
352 or self.is_system
353 or self.is_user
354 or self._presence_sent
355 or stanza["subject"]
356 ):
357 return
358 if isinstance(stanza, Message):
359 if stanza.get_plugin("muc", check=True):
360 return
361 self.send_initial_presence(full_jid)
363 @property
364 def occupant_id(self) -> str:
365 return self.stored.occupant_id
367 def _send(
368 self,
369 stanza: MessageOrPresenceTypeVar,
370 full_jid: JID | None = None,
371 archive_only: bool = False,
372 legacy_msg_id: LegacyMessageType | None = None,
373 initial_presence: bool = False,
374 **send_kwargs: Any, # noqa:ANN401
375 ) -> MessageOrPresenceTypeVar:
376 if stanza.get_from().resource:
377 stanza["occupant-id"]["id"] = self.occupant_id
378 else:
379 stanza["occupant-id"]["id"] = "room"
380 self.__add_nick_element(stanza)
381 if not self.is_user and isinstance(stanza, Presence):
382 if stanza["type"] == "unavailable" and not self._presence_sent:
383 return stanza
384 if initial_presence:
385 self.stored.presence_sent = True
386 else:
387 self._presence_sent = True
388 if full_jid:
389 stanza["to"] = full_jid
390 self.__send_presence_if_needed(stanza, full_jid, archive_only)
391 if self.is_user:
392 assert stanza.stream is not None
393 stanza.stream.send(stanza, use_filters=False)
394 else:
395 stanza.send()
396 else:
397 if hasattr(self.muc, "archive") and isinstance(stanza, Message):
398 self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
399 if archive_only:
400 return stanza
401 for user_full_jid in self.muc.user_full_jids():
402 stanza = copy(stanza)
403 stanza["to"] = user_full_jid
404 self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
405 stanza.send()
406 return stanza
408 def mucadmin_item(self) -> MUCAdminItem:
409 item = MUCAdminItem()
410 item["nick"] = self.nickname
411 item["affiliation"] = self.affiliation
412 item["role"] = self.role
413 if not self.muc.is_anonymous:
414 if self.is_user:
415 item["jid"] = self.user_jid.bare
416 elif self.contact:
417 item["jid"] = self.contact.jid.bare
418 else:
419 warnings.warn(
420 (
421 f"Private group but no contact JID associated to {self.jid} in"
422 f" {self}"
423 ),
424 )
425 return item
427 def __add_nick_element(self, stanza: Presence | Message) -> None:
428 if (nick := self.nickname_no_illegal) != self.jid.resource:
429 n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
430 n["nick"] = nick
431 stanza.append(n)
433 def _get_last_presence(self) -> CachedPresence | None:
434 own = super()._get_last_presence()
435 if own is None and self.contact:
436 return self.contact._get_last_presence()
437 return own
439 def send_initial_presence(
440 self,
441 full_jid: JID,
442 nick_change: bool = False,
443 presence_id: str | None = None,
444 ) -> None:
445 """
446 Called when the user joins a MUC, as a mechanism
447 to indicate to the joining XMPP client the list of "participants".
449 Can be called this to trigger a "participant has joined the group" event.
451 :param full_jid: Set this to only send to a specific user XMPP resource.
452 :param nick_change: Used when the user joins and the MUC renames them (code 210)
453 :param presence_id: set the presence ID. used internally by slidge
454 """
455 # MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
456 codes = set()
457 if nick_change:
458 codes.add(210)
460 if self.is_user:
461 # the "initial presence" of the user has to be vanilla, as it is
462 # a crucial part of the MUC join sequence for XMPP clients.
463 kwargs = {}
464 else:
465 cache = self._get_last_presence()
466 self.log.debug("Join muc, initial presence: %s", cache)
467 if cache:
468 ptype = cache.ptype
469 if ptype == "unavailable":
470 return
471 kwargs = dict(
472 last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
473 )
474 else:
475 kwargs = {}
476 p = self._make_presence(
477 status_codes=codes,
478 user_full_jid=full_jid,
479 **kwargs, # type:ignore
480 )
481 if presence_id:
482 p["id"] = presence_id
483 self._send(p, full_jid, initial_presence=True)
485 def leave(self) -> None:
486 """
487 Call this when the participant leaves the room
488 """
489 self.muc.remove_participant(self)
491 def kick(self, reason: str | None = None) -> None:
492 """
493 Call this when the participant is kicked from the room
494 """
495 self.muc.remove_participant(self, kick=True, reason=reason)
497 def ban(self, reason: str | None = None) -> None:
498 """
499 Call this when the participant is banned from the room
500 """
501 self.muc.remove_participant(self, ban=True, reason=reason)
503 async def get_disco_info(
504 self, jid: OptJid = None, node: str | None = None
505 ) -> DiscoInfo:
506 if self.contact is not None:
507 return await self.contact.get_disco_info()
508 return await super().get_disco_info()
510 def moderate(
511 self, legacy_msg_id: LegacyMessageType, reason: str | None = None
512 ) -> None:
513 for i in self._legacy_to_xmpp(legacy_msg_id):
514 m = self.muc.get_system_participant()._make_message()
515 m["retract"]["id"] = i
516 if self.is_system:
517 m["retract"].enable("moderated")
518 else:
519 m["retract"]["moderated"]["by"] = self.jid
520 m["retract"]["moderated"]["occupant-id"]["id"] = self.occupant_id
521 if reason:
522 m["retract"]["reason"] = reason
523 self._send(m)
525 def set_room_subject(
526 self,
527 subject: str,
528 full_jid: JID | None = None,
529 when: datetime | None = None,
530 update_muc: bool = True,
531 ) -> None:
532 if update_muc:
533 self.muc._subject = subject # type: ignore
534 self.muc.subject_setter = self.nickname
535 self.muc.subject_date = when
537 msg = self._make_message()
538 if when is not None:
539 msg["delay"].set_stamp(when)
540 msg["delay"]["from"] = self.muc.jid
541 if subject:
542 msg["subject"] = subject
543 else:
544 # may be simplified if slixmpp lets it do it more easily some day
545 msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject"))
546 self._send(msg, full_jid)
548 def set_thread_subject(
549 self,
550 thread: LegacyThreadType,
551 subject: str | None,
552 when: datetime | None = None,
553 ) -> None:
554 msg = self._make_message()
555 msg["thread"] = str(thread)
556 if when is not None:
557 msg["delay"].set_stamp(when)
558 msg["delay"]["from"] = self.muc.jid
559 if subject:
560 msg["subject"] = subject
561 else:
562 # may be simplified if slixmpp lets it do it more easily some day
563 msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject"))
564 self._send(msg)
567def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]:
568 nickname = nickname_no_illegal = strip_illegal_chars(nickname).replace("\n", " | ")
570 jid = JID(muc_jid)
572 try:
573 jid.resource = nickname
574 except InvalidJID:
575 nickname = nickname.encode("punycode").decode()
576 try:
577 jid.resource = nickname
578 except InvalidJID:
579 # at this point there still might be control chars
580 jid.resource = strip_non_printable(nickname)
582 return nickname_no_illegal, jid
585log = logging.getLogger(__name__)