Coverage for slidge / group / participant.py: 89%
365 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
1import logging
2import string
3import uuid
4import warnings
5from copy import copy
6from datetime import datetime
7from typing import TYPE_CHECKING, Any, Optional, Union
8from xml.etree import ElementTree as ET
10import sqlalchemy as sa
11from slixmpp import JID, InvalidJID, Message, Presence
12from slixmpp.plugins.xep_0045.stanza import MUCAdminItem
13from slixmpp.types import MessageTypes, OptJid
14from sqlalchemy.orm.exc import DetachedInstanceError
16from ..contact import LegacyContact
17from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin
18from ..core.mixins.db import DBMixin
19from ..db.models import Participant
20from ..util import SubclassableOnce, strip_illegal_chars
21from ..util.types import (
22 CachedPresence,
23 Hat,
24 LegacyMessageType,
25 LegacyThreadType,
26 MessageOrPresenceTypeVar,
27 MucAffiliation,
28 MucRole,
29)
31if TYPE_CHECKING:
32 from .room import LegacyMUC
def strip_non_printable(nickname: str) -> str:
    """
    Build a fallback nickname made only of visible ASCII characters.

    Characters that are not in ``string.printable`` are dropped, and so are
    the whitespace control characters it contains (tab, newline, carriage
    return, vertical tab, form feed). A ``-slidge-<hash>`` suffix derived from
    the original nickname is appended, and a warning is emitted.

    :param nickname: The nickname that could not be used as-is.
    :return: The sanitized nickname with its suffix.
    """
    # string.printable includes "\t\n\r\x0b\x0c"; they are control characters
    # too, so only the plain space is kept among whitespace.
    kept = "".join(
        char
        for char in nickname
        if char in string.printable and (char == " " or not char.isspace())
    )
    # NOTE: hash() of a str is salted per interpreter process unless
    # PYTHONHASHSEED is set, so this suffix is not stable across restarts.
    new = f"{kept}-slidge-{hash(nickname)}"
    warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
    return new
class LegacyParticipant(
    PresenceMixin,
    MessageMixin,
    ChatterDiscoMixin,
    DBMixin,
    metaclass=SubclassableOnce,
):
    """
    A participant of a legacy group chat.
    """

    # database row backing this participant
    stored: Participant
    # 1:1 contact associated to this participant, if any
    contact: LegacyContact[Any] | None

    # stanzas sent by a participant are of type "groupchat"
    mtype: MessageTypes = "groupchat"

    is_participant = True
    USE_STANZA_ID = True
    STRIP_SHORT_DELAY = False
    _can_send_carbon = False
def __init__(
    self,
    muc: "LegacyMUC",
    stored: Participant,
    is_system: bool = False,
    contact: LegacyContact[Any] | None = None,
) -> None:
    """
    :param muc: The room this participant belongs to.
    :param stored: The database row backing this participant.
    :param is_system: Whether this is the system participant of the room.
    :param contact: The contact associated to this participant, if any.
        When omitted, it is loaded from ``stored.contact`` if that is set.
    """
    self.muc = muc
    self.session = muc.session
    self.xmpp = muc.session.xmpp
    self.is_system = is_system

    if contact is None:
        # no contact given: rebuild it from the stored row when possible
        if stored.contact is not None:
            contact = self.session.contacts.from_store(stored=stored.contact)
    elif stored.contact is None:
        # contact given but not linked in the store yet: link it
        stored.contact = contact.stored

    self.stored = stored
    self.contact = contact

    super().__init__()

    # a row without a resource gets one derived from its nickname
    if stored.resource is None:
        self.__update_resource(stored.nickname)

    logger_name = f"{self.user_jid.bare}:{self.jid}"
    self.log = logging.getLogger(logger_name)
@property
def is_user(self) -> bool:
    """
    The ``is_user`` flag of the stored row.

    If the row is detached from its ORM session, it is merged first and the
    flag is read again.
    """
    try:
        flag = self.stored.is_user
    except DetachedInstanceError:
        self.merge()
        flag = self.stored.is_user
    return flag
@is_user.setter
def is_user(self, is_user: bool) -> None:
    """
    Set the ``is_user`` flag on the stored row and commit it immediately.
    """
    store = self.xmpp.store
    with store.session(expire_on_commit=True) as orm:
        # attach the row to this session before modifying it
        orm.add(self.stored)
        self.stored.is_user = is_user
        orm.commit()
@property
def jid(self) -> JID:
    """
    The JID of this participant: the room JID, with the stored resource
    set as resource part when there is one.
    """
    occupant_jid = JID(self.muc.jid)
    if self.stored.resource:
        occupant_jid.resource = self.stored.resource
    return occupant_jid
@jid.setter
def jid(self, x: JID):
    """
    Always raises: the participant JID cannot be assigned.

    :raises RuntimeError: unconditionally.
    """
    # FIXME: this setter only exists because, without it, mypy yields
    #        "Cannot override writeable attribute with read-only property".
    #        It does not happen for LegacyContact, for an unknown reason.
    raise RuntimeError
def __should_commit(self) -> bool:
    """
    Tell whether changes to this participant should be written to the store.

    :return: ``False`` for the system participant, or while the room holds
        the "fill participants" or the "fill history" lock; ``True`` otherwise.
    """
    if self.is_system:
        return False
    room = self.muc
    return not (room.get_lock("fill participants") or room.get_lock("fill history"))
def commit(self, *args, **kwargs) -> None:
    """
    Forward to the parent ``commit()``, unless committing is currently
    disabled for this participant, in which case nothing happens.
    """
    if self.__should_commit():
        super().commit(*args, **kwargs)
@property
def user_jid(self):
    """
    The ``user_jid`` of the session this participant belongs to.
    """
    session = self.session
    return session.user_jid
def __repr__(self) -> str:
    """
    Debug representation showing the nickname, the JID and the room.
    """
    nick, jid, room = self.nickname, self.jid, self.muc
    return f"<Participant '{nick}'/'{jid}' of '{room}'>"
@property
def _presence_sent(self) -> bool:
    """
    Whether a presence has already been sent for this participant.
    """
    # This is tracked so that a presence can be sent before the first
    # message when none was sent yet. That way, even in plugins that do not
    # map "user has joined" events, a "join"-presence from the participant
    # precedes its first message.
    stored = self.stored
    return stored.presence_sent
@_presence_sent.setter
def _presence_sent(self, val: bool) -> None:
    """
    Update the "presence sent" flag of the stored row, and persist it with
    an UPDATE statement when committing is currently enabled.
    """
    if self._presence_sent == val:
        # nothing changed, avoid a useless write
        return
    self.stored.presence_sent = val
    if self.__should_commit():
        with self.xmpp.store.session() as orm:
            statement = (
                sa.update(Participant)
                .where(Participant.id == self.stored.id)
                .values(presence_sent=val)
            )
            orm.execute(statement)
            orm.commit()
@property
def nickname_no_illegal(self) -> str:
    """
    The ``nickname_no_illegal`` value of the stored row.
    """
    stored = self.stored
    return stored.nickname_no_illegal
@property
def affiliation(self):
    """
    The affiliation recorded in the stored row.
    """
    stored = self.stored
    return stored.affiliation
@affiliation.setter
def affiliation(self, affiliation: MucAffiliation) -> None:
    """
    Change the affiliation of this participant.

    The new value is always written on the stored row. It is committed only
    once the room's participants are filled, and the last presence is then
    re-sent only if a presence was already sent for this participant.
    """
    unchanged = self.affiliation == affiliation
    if unchanged:
        return
    self.stored.affiliation = affiliation
    if self.muc.participants_filled:
        self.commit()
        if self._presence_sent:
            self.send_last_presence(force=True, no_cache_online=True)
def send_affiliation_change(self) -> None:
    """
    Send a message of type "normal" carrying this participant's affiliation.

    Internal use by slidge.
    """
    msg = self._make_message()
    msg["muc"]["affiliation"] = self.affiliation
    msg["type"] = "normal"
    # the JID is only included in non-anonymous rooms, and never for the
    # system participant
    include_jid = not (self.muc.is_anonymous or self.is_system)
    if include_jid:
        if self.contact:
            msg["muc"]["jid"] = self.contact.jid
        else:
            warnings.warn(
                f"Private group but no 1:1 JID associated to '{self}'",
            )
    self._send(msg)
@property
def role(self):
    """
    The role recorded in the stored row.
    """
    stored = self.stored
    return stored.role
@role.setter
def role(self, role: MucRole) -> None:
    """
    Change the role of this participant.

    The new value is always written on the stored row. It is committed only
    once the room's participants are filled, and the last presence is then
    re-sent only if a presence was already sent for this participant.
    """
    unchanged = self.role == role
    if unchanged:
        return
    self.stored.role = role
    if self.muc.participants_filled:
        self.commit()
        if self._presence_sent:
            self.send_last_presence(force=True, no_cache_online=True)
@property
def hats(self) -> list[Hat]:
    """
    The hats of this participant, built from the stored row.

    :return: One ``Hat`` per stored entry, or an empty list when the stored
        value is empty or unset.
    """
    if not self.stored.hats:
        return []
    return [Hat(*fields) for fields in self.stored.hats]
def set_hats(self, hats: list[Hat]) -> None:
    """
    Replace the hats of this participant.

    The new value is always written on the stored row. It is committed (with
    ``merge=True``) only once the room's participants are filled, and the
    last presence is then re-sent only if a presence was already sent.

    :param hats: The new list of hats.
    """
    unchanged = self.hats == hats
    if unchanged:
        return
    self.stored.hats = hats  # type:ignore[assignment]
    if self.muc.participants_filled:
        self.commit(merge=True)
        if self._presence_sent:
            self.send_last_presence(force=True, no_cache_online=True)
def __update_resource(self, unescaped_nickname: Optional[str]) -> None:
    """
    Set ``resource`` and ``nickname_no_illegal`` on the stored row from a
    raw nickname.

    :param unescaped_nickname: The raw nickname. When empty or ``None``, the
        system participant gets empty values, and any other participant gets
        a generated ``unnamed-<uuid4>`` value (with a warning).
    """
    if unescaped_nickname:
        self.stored.nickname_no_illegal, jid = escape_nickname(
            self.muc.jid,
            unescaped_nickname,
        )
        self.stored.resource = jid.resource
        return

    # no nickname at all
    self.stored.resource = ""
    if self.is_system:
        self.stored.nickname_no_illegal = ""
        return

    warnings.warn("Only the system participant is allowed to not have a nickname")
    nickname = f"unnamed-{uuid.uuid4()}"
    self.stored.resource = self.stored.nickname_no_illegal = nickname
def send_configuration_change(self, codes: tuple[int, ...]) -> None:
    """
    Send a message carrying MUC status codes from the system participant.

    :param codes: The status codes to set on the message. Annotated as a
        tuple of any length; the previous ``tuple[int]`` annotation only
        described a 1-tuple.
    :raises RuntimeError: if this is not the system participant.
    """
    if not self.is_system:
        raise RuntimeError("This is only possible for the system participant")
    msg = self._make_message()
    msg["muc"]["status_codes"] = codes
    self._send(msg)
@property
def nickname(self):
    """
    The nickname recorded in the stored row.
    """
    stored = self.stored
    return stored.nickname
@nickname.setter
def nickname(self, new_nickname: str) -> None:
    """
    Rename this participant.

    An "unavailable" presence with status code 303 is sent from the old
    resource, announcing the new (escaped) nick; the new nickname is then
    stored and committed, and an "available" presence is sent.

    If the room is stored and the requested nickname is not available in it,
    a suffix is appended: the contact's legacy ID in parentheses, or the
    occupant ID when there is no contact.

    :param new_nickname: The nickname to switch to. Nothing happens when it
        equals the current one.
    """
    old = self.nickname
    if new_nickname == old:
        return

    if self.muc.stored.id is not None:
        with self.xmpp.store.session() as orm:
            if not self.xmpp.store.rooms.nick_available(
                orm, self.muc.stored.id, new_nickname
            ):
                if self.contact is None:
                    new_nickname = f"{new_nickname} ({self.occupant_id})"
                else:
                    new_nickname = f"{new_nickname} ({self.contact.legacy_id})"

    cache = getattr(self, "_last_presence", None)
    if cache:
        last_seen = cache.last_seen
        # Work on a copy: "status_codes" is written into this dict below and
        # must not leak into the cached presence.
        kwargs = dict(cache.presence_kwargs)
    else:
        last_seen = None
        kwargs = {}

    kwargs["status_codes"] = {303}

    p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
    # in this order so pfrom=old resource and we actually use the escaped nick
    # in the muc/item/nick element
    self.__update_resource(new_nickname)
    p["muc"]["item"]["nick"] = self.jid.resource
    self._send(p)

    self.stored.nickname = new_nickname
    self.commit()
    kwargs["status_codes"] = set()
    p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
    self._send(p)
def _make_presence(
    self,
    *,
    last_seen: Optional[datetime] = None,
    status_codes: Optional[set[int]] = None,
    user_full_jid: Optional[JID] = None,
    **presence_kwargs,
):
    """
    Build a presence for this participant, with its MUC-specific content.

    On top of what the parent ``_make_presence()`` builds, this sets the
    affiliation, the role, the hats (if any), the MUC status codes, and, in
    non-anonymous rooms, the real JID and the avatar hash.

    :param last_seen: Passed through to the parent implementation.
    :param status_codes: MUC status codes to include. The given set is not
        modified; 110 and 100 are added to a copy where applicable.
    :param user_full_jid: For the user's own participant, the JID to put in
        the MUC item instead of ``self.user_jid``.
    :param presence_kwargs: Passed through to the parent implementation.
    :return: The presence stanza.
    """
    p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
    p["muc"]["affiliation"] = self.affiliation
    p["muc"]["role"] = self.role
    if self.hats:
        p["hats"].add_hats(self.hats)
    # copy, so that the codes added below never end up in the caller's set
    codes = set(status_codes) if status_codes else set()
    if self.is_user:
        codes.add(110)
    if not self.muc.is_anonymous and not self.is_system:
        if self.is_user:
            if user_full_jid:
                p["muc"]["jid"] = user_full_jid
            else:
                # NOTE(review): a full JID is computed here, but the item
                # JID below is set to self.user_jid and `jid` is never used.
                # Behavior is kept as-is; confirm which one is intended.
                jid = JID(self.user_jid)
                try:
                    jid.resource = next(iter(self.muc.get_user_resources()))
                except StopIteration:
                    jid.resource = "pseudo-resource"
                p["muc"]["jid"] = self.user_jid
                codes.add(100)
        elif self.contact:
            p["muc"]["jid"] = self.contact.jid
            if a := self.contact.get_avatar():
                p["vcard_temp_update"]["photo"] = a.id
        else:
            warnings.warn(
                f"Private group but no 1:1 JID associated to '{self}'",
            )
    if self.is_user and (hash_ := self.session.user.avatar_hash):
        p["vcard_temp_update"]["photo"] = hash_
    p["muc"]["status_codes"] = codes
    return p
@property
def DISCO_NAME(self):  # type:ignore[override]
    """
    The disco name of a participant is its nickname.
    """
    name = self.nickname
    return name
def __send_presence_if_needed(
    self, stanza: Union[Message, Presence], full_jid: JID, archive_only: bool
) -> None:
    """
    Send the initial presence of this participant to ``full_jid`` when the
    stanza about to be sent requires it.

    Nothing is sent for archive-only stanzas, for the system participant,
    for the user, when a presence was already sent, when the stanza has a
    subject, or when it is a message with a "muc" plugin.
    """
    skip = (
        archive_only
        or self.is_system
        or self.is_user
        or self._presence_sent
        or stanza["subject"]
    )
    if skip:
        return
    if isinstance(stanza, Message) and stanza.get_plugin("muc", check=True):
        return
    self.send_initial_presence(full_jid)
@property
def occupant_id(self) -> str:
    """
    The occupant ID recorded in the stored row.
    """
    stored = self.stored
    return stored.occupant_id
def _send(
    self,
    stanza: MessageOrPresenceTypeVar,
    full_jid: Optional[JID] = None,
    archive_only: bool = False,
    legacy_msg_id=None,
    initial_presence=False,
    **send_kwargs,
) -> MessageOrPresenceTypeVar:
    """
    Send a stanza on behalf of this participant.

    :param stanza: The message or presence to send.
    :param full_jid: When set, the stanza is only sent to this JID and is
        not archived. Otherwise, messages are added to the room archive (if
        the room has one) and a copy is sent to each of the room's user
        full JIDs.
    :param archive_only: Archive the message without sending it.
    :param legacy_msg_id: Passed to the archive along with the message.
    :param initial_presence: When true, the "presence sent" flag is only set
        on the stored row instead of going through the property setter.
    :param send_kwargs: Accepted but not used here.
    :return: The stanza given, or the last copy that was sent.
    """
    # stanzas without a resource in their "from" come from the room itself
    stanza["occupant-id"]["id"] = (
        self.occupant_id if stanza.get_from().resource else "room"
    )
    self.__add_nick_element(stanza)

    if not self.is_user and isinstance(stanza, Presence):
        if stanza["type"] == "unavailable" and not self._presence_sent:
            # no need to announce that someone left if they never "joined"
            return stanza  # type:ignore
        if initial_presence:
            self.stored.presence_sent = True
        else:
            self._presence_sent = True

    if full_jid:
        stanza["to"] = full_jid
        self.__send_presence_if_needed(stanza, full_jid, archive_only)
        if self.is_user:
            assert stanza.stream is not None
            stanza.stream.send(stanza, use_filters=False)
        else:
            stanza.send()
        return stanza

    if hasattr(self.muc, "archive") and isinstance(stanza, Message):
        self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
    if archive_only:
        return stanza
    for recipient in self.muc.user_full_jids():
        # each recipient gets its own copy of the stanza
        stanza = copy(stanza)
        stanza["to"] = recipient
        self.__send_presence_if_needed(stanza, recipient, archive_only)
        stanza.send()
    return stanza
def mucadmin_item(self):
    """
    Build a ``MUCAdminItem`` describing this participant.

    :return: An item with the nick, affiliation and role, plus the bare JID
        of the user or of the contact when the room is not anonymous.
    """
    item = MUCAdminItem()
    item["nick"] = self.nickname
    item["affiliation"] = self.affiliation
    item["role"] = self.role
    if self.muc.is_anonymous:
        return item

    if self.is_user:
        item["jid"] = self.user_jid.bare
    elif self.contact:
        item["jid"] = self.contact.jid.bare
    else:
        warnings.warn(
            f"Private group but no contact JID associated to {self.jid} in {self}",
        )
    return item
def __add_nick_element(self, stanza: Union[Presence, Message]) -> None:
    """
    Append a XEP-0172 ``UserNick`` element to the stanza when the nickname
    (without illegal characters) differs from the resource of the JID.
    """
    nick = self.nickname_no_illegal
    if nick == self.jid.resource:
        return
    element = self.xmpp.plugin["xep_0172"].stanza.UserNick()
    element["nick"] = nick
    stanza.append(element)
def _get_last_presence(self) -> Optional[CachedPresence]:
    """
    Return the last cached presence of this participant, falling back to the
    one of its contact when the participant has none.
    """
    cached = super()._get_last_presence()
    if cached is not None or not self.contact:
        return cached
    return self.contact._get_last_presence()
def send_initial_presence(
    self,
    full_jid: JID,
    nick_change: bool = False,
    presence_id: Optional[str] = None,
) -> None:
    """
    Called when the user joins a MUC, as a mechanism
    to indicate to the joining XMPP client the list of "participants".

    It can also be called to trigger a "participant has joined the group"
    event.

    Nothing is sent for a participant other than the user whose last cached
    presence is of type "unavailable".

    :param full_jid: Set this to only send to a specific user XMPP resource.
    :param nick_change: Used when the user joins and the MUC renames them (code 210)
    :param presence_id: set the presence ID. used internally by slidge
    """
    # MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
    codes = {210} if nick_change else set()

    # The "initial presence" of the user has to be vanilla, as it is a
    # crucial part of the MUC join sequence for XMPP clients: no extra
    # keyword arguments in that case.
    kwargs = {}
    if not self.is_user:
        cache = self._get_last_presence()
        self.log.debug("Join muc, initial presence: %s", cache)
        if cache:
            if cache.ptype == "unavailable":
                return
            kwargs = {
                "last_seen": cache.last_seen,
                "pstatus": cache.pstatus,
                "pshow": cache.pshow,
            }

    p = self._make_presence(
        status_codes=codes,
        user_full_jid=full_jid,
        **kwargs,  # type:ignore
    )
    if presence_id:
        p["id"] = presence_id
    self._send(p, full_jid, initial_presence=True)
def leave(self) -> None:
    """
    Call this when the participant leaves the room.

    Delegates to the room's ``remove_participant()``.
    """
    room = self.muc
    room.remove_participant(self)
def kick(self, reason: str | None = None) -> None:
    """
    Call this when the participant is kicked from the room.

    :param reason: Optional reason, passed to the room's
        ``remove_participant()``.
    """
    room = self.muc
    room.remove_participant(self, kick=True, reason=reason)
def ban(self, reason: str | None = None) -> None:
    """
    Call this when the participant is banned from the room.

    :param reason: Optional reason, passed to the room's
        ``remove_participant()``.
    """
    room = self.muc
    room.remove_participant(self, ban=True, reason=reason)
def get_disco_info(self, jid: OptJid = None, node: Optional[str] = None):
    """
    Return the disco info of the associated contact when there is one, and
    the participant's own disco info otherwise.

    :param jid: Not used by this implementation.
    :param node: Not used by this implementation.
    """
    contact = self.contact
    if contact is None:
        return super().get_disco_info()
    return contact.get_disco_info()
def moderate(
    self, legacy_msg_id: LegacyMessageType, reason: Optional[str] = None
) -> None:
    """
    Send a moderated retraction for a message, with this participant as the
    moderator.

    One retraction is sent for each XMPP ID that ``_legacy_to_xmpp()`` yields
    for the legacy message ID. The message itself is built by the system
    participant of the room.

    :param legacy_msg_id: The legacy ID of the message to retract.
    :param reason: Optional reason to include in the retraction.
    """
    for xmpp_id in self._legacy_to_xmpp(legacy_msg_id):
        m = self.muc.get_system_participant()._make_message()
        m["retract"]["id"] = xmpp_id
        if not self.is_system:
            # identify the moderator
            m["retract"]["moderated"]["by"] = self.jid
            m["retract"]["moderated"]["occupant-id"]["id"] = self.occupant_id
        else:
            m["retract"].enable("moderated")
        if reason:
            m["retract"]["reason"] = reason
        self._send(m)
def set_room_subject(
    self,
    subject: str,
    full_jid: Optional[JID] = None,
    when: Optional[datetime] = None,
    update_muc: bool = True,
) -> None:
    """
    Send a room subject message from this participant.

    :param subject: The new subject. When empty, an empty ``<subject/>``
        element is sent.
    :param full_jid: Passed to ``_send()`` as the recipient.
    :param when: When set, a delay with this timestamp, "from" the room JID,
        is added to the message.
    :param update_muc: Also record the subject, its setter (this
        participant's nickname) and its date on the room.
    """
    if update_muc:
        room = self.muc
        room._subject = subject  # type: ignore
        room.subject_setter = self.nickname
        room.subject_date = when

    msg = self._make_message()
    if when is not None:
        msg["delay"].set_stamp(when)
        msg["delay"]["from"] = self.muc.jid
    if not subject:
        # may be simplified if slixmpp lets it do it more easily some day
        empty_subject = ET.Element(f"{{{msg.namespace}}}subject")
        msg.xml.append(empty_subject)
    else:
        msg["subject"] = subject
    self._send(msg, full_jid)
def set_thread_subject(
    self,
    thread: LegacyThreadType,
    subject: str | None,
    when: Optional[datetime] = None,
) -> None:
    """
    Send a subject message for a thread from this participant.

    :param thread: The thread identifier; its ``str()`` is used as the
        message thread.
    :param subject: The new subject. When empty or ``None``, an empty
        ``<subject/>`` element is sent.
    :param when: When set, a delay with this timestamp, "from" the room JID,
        is added to the message.
    """
    msg = self._make_message()
    msg["thread"] = str(thread)
    if when is not None:
        msg["delay"].set_stamp(when)
        msg["delay"]["from"] = self.muc.jid
    if not subject:
        # may be simplified if slixmpp lets it do it more easily some day
        empty_subject = ET.Element(f"{{{msg.namespace}}}subject")
        msg.xml.append(empty_subject)
    else:
        msg["subject"] = subject
    self._send(msg)
def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]:
    """
    Turn a nickname into a JID usable for a participant of a room.

    Illegal characters are stripped and newlines replaced with " | ". If the
    result is rejected as a resource, its punycode encoding is tried, and
    finally ``strip_non_printable()`` is applied to that encoding.

    :param muc_jid: The JID of the room.
    :param nickname: The raw nickname.
    :return: The nickname without illegal characters, and the room JID with
        the resource that was accepted.
    """
    display_nick = strip_illegal_chars(nickname).replace("\n", " | ")

    jid = JID(muc_jid)

    try:
        jid.resource = display_nick
    except InvalidJID:
        encoded = display_nick.encode("punycode").decode()
        try:
            jid.resource = encoded
        except InvalidJID:
            # at this point there still might be control chars
            jid.resource = strip_non_printable(encoded)

    return display_nick, jid
# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)