Coverage for slidge/group/participant.py: 89%
366 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import logging
2import string
3import uuid
4import warnings
5from copy import copy
6from datetime import datetime
7from functools import cached_property
8from typing import TYPE_CHECKING, Any, Optional, Union
9from xml.etree import ElementTree as ET
11import sqlalchemy as sa
12from slixmpp import JID, InvalidJID, Message, Presence
13from slixmpp.plugins.xep_0045.stanza import MUCAdminItem
14from slixmpp.types import MessageTypes, OptJid
15from sqlalchemy.orm.exc import DetachedInstanceError
17from ..contact import LegacyContact
18from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin
19from ..core.mixins.db import DBMixin
20from ..db.models import Participant
21from ..util import SubclassableOnce, strip_illegal_chars
22from ..util.types import (
23 CachedPresence,
24 Hat,
25 LegacyMessageType,
26 LegacyThreadType,
27 MessageOrPresenceTypeVar,
28 MucAffiliation,
29 MucRole,
30)
32if TYPE_CHECKING:
33 from .room import LegacyMUC
36def strip_non_printable(nickname: str):
37 new = (
38 "".join(x for x in nickname if x in string.printable)
39 + f"-slidge-{hash(nickname)}"
40 )
41 warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
42 return new
45class LegacyParticipant(
46 PresenceMixin,
47 MessageMixin,
48 ChatterDiscoMixin,
49 DBMixin,
50 metaclass=SubclassableOnce,
51):
52 """
53 A legacy participant of a legacy group chat.
54 """
56 is_participant = True
58 mtype: MessageTypes = "groupchat"
59 _can_send_carbon = False
60 USE_STANZA_ID = True
61 STRIP_SHORT_DELAY = False
62 stored: Participant
63 contact: LegacyContact[Any] | None
65 def __init__(
66 self,
67 muc: "LegacyMUC",
68 stored: Participant,
69 is_system: bool = False,
70 contact: LegacyContact[Any] | None = None,
71 ) -> None:
72 self.muc = muc
73 self.session = muc.session
74 self.xmpp = muc.session.xmpp
75 self.is_system = is_system
77 if contact is None and stored.contact is not None:
78 contact = self.session.contacts.from_store(stored=stored.contact)
79 if contact is not None and stored.contact is None:
80 stored.contact = contact.stored
82 self.stored = stored
83 self.contact = contact
85 super().__init__()
87 if stored.resource is None:
88 self.__update_resource(stored.nickname)
90 self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")
92 @property
93 def is_user(self) -> bool:
94 try:
95 return self.stored.is_user
96 except DetachedInstanceError:
97 self.merge()
98 return self.stored.is_user
100 @is_user.setter
101 def is_user(self, is_user: bool) -> None:
102 with self.xmpp.store.session(expire_on_commit=True) as orm:
103 orm.add(self.stored)
104 self.stored.is_user = is_user
105 orm.commit()
107 @property
108 def jid(self) -> JID:
109 jid = JID(self.muc.jid)
110 if self.stored.resource:
111 jid.resource = self.stored.resource
112 return jid
114 @jid.setter
115 def jid(self, x: JID):
116 # FIXME: without this, mypy yields
117 # "Cannot override writeable attribute with read-only property"
118 # But it does not happen for LegacyContact. WTF?
119 raise RuntimeError
121 def __should_commit(self) -> bool:
122 if self.is_system:
123 return False
124 if self.muc.get_lock("fill participants"):
125 return False
126 if self.muc.get_lock("fill history"):
127 return False
128 return True
130 def commit(self, *args, **kwargs) -> None:
131 if not self.__should_commit():
132 return
133 super().commit(*args, **kwargs)
135 @property
136 def user_jid(self):
137 return self.session.user_jid
139 def __repr__(self) -> str:
140 return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"
142 @property
143 def _presence_sent(self) -> bool:
144 # we track if we already sent a presence for this participant.
145 # if we didn't, we send it before the first message.
146 # this way, event in plugins that don't map "user has joined" events,
147 # we send a "join"-presence from the participant before the first message
148 return self.stored.presence_sent
150 @_presence_sent.setter
151 def _presence_sent(self, val: bool) -> None:
152 if self._presence_sent == val:
153 return
154 self.stored.presence_sent = val
155 if not self.__should_commit():
156 return
157 with self.xmpp.store.session() as orm:
158 orm.execute(
159 sa.update(Participant)
160 .where(Participant.id == self.stored.id)
161 .values(presence_sent=val)
162 )
163 orm.commit()
165 @property
166 def nickname_no_illegal(self) -> str:
167 return self.stored.nickname_no_illegal
169 @property
170 def affiliation(self):
171 return self.stored.affiliation
173 @affiliation.setter
174 def affiliation(self, affiliation: MucAffiliation) -> None:
175 if self.affiliation == affiliation:
176 return
177 self.stored.affiliation = affiliation
178 if not self.muc.participants_filled:
179 return
180 self.commit()
181 if not self._presence_sent:
182 return
183 self.send_last_presence(force=True, no_cache_online=True)
185 def send_affiliation_change(self) -> None:
186 # internal use by slidge
187 msg = self._make_message()
188 msg["muc"]["affiliation"] = self.affiliation
189 msg["type"] = "normal"
190 if not self.muc.is_anonymous and not self.is_system:
191 if self.contact:
192 msg["muc"]["jid"] = self.contact.jid
193 else:
194 warnings.warn(
195 f"Private group but no 1:1 JID associated to '{self}'",
196 )
197 self._send(msg)
199 @property
200 def role(self):
201 return self.stored.role
203 @role.setter
204 def role(self, role: MucRole) -> None:
205 if self.role == role:
206 return
207 self.stored.role = role
208 if not self.muc.participants_filled:
209 return
210 self.commit()
211 if not self._presence_sent:
212 return
213 self.send_last_presence(force=True, no_cache_online=True)
215 @property
216 def hats(self) -> list[Hat]:
217 return [Hat(*h) for h in self.stored.hats] if self.stored.hats else []
219 def set_hats(self, hats: list[Hat]) -> None:
220 if self.hats == hats:
221 return
222 self.stored.hats = hats # type:ignore[assignment]
223 if not self.muc.participants_filled:
224 return
225 self.commit(merge=True)
226 if not self._presence_sent:
227 return
228 self.send_last_presence(force=True, no_cache_online=True)
230 def __update_resource(self, unescaped_nickname: Optional[str]) -> None:
231 if not unescaped_nickname:
232 self.stored.resource = ""
233 if self.is_system:
234 self.stored.nickname_no_illegal = ""
235 else:
236 warnings.warn(
237 "Only the system participant is allowed to not have a nickname"
238 )
239 nickname = f"unnamed-{uuid.uuid4()}"
240 self.stored.resource = self.stored.nickname_no_illegal = nickname
241 return
243 self.stored.nickname_no_illegal, jid = escape_nickname(
244 self.muc.jid,
245 unescaped_nickname,
246 )
247 self.stored.resource = jid.resource
249 def send_configuration_change(self, codes: tuple[int]):
250 if not self.is_system:
251 raise RuntimeError("This is only possible for the system participant")
252 msg = self._make_message()
253 msg["muc"]["status_codes"] = codes
254 self._send(msg)
256 @property
257 def nickname(self):
258 return self.stored.nickname
260 @nickname.setter
261 def nickname(self, new_nickname: str) -> None:
262 old = self.nickname
263 if new_nickname == old:
264 return
266 cache = getattr(self, "_last_presence", None)
267 if cache:
268 last_seen = cache.last_seen
269 kwargs = cache.presence_kwargs
270 else:
271 last_seen = None
272 kwargs = {}
274 kwargs["status_codes"] = {303}
276 p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
277 # in this order so pfrom=old resource and we actually use the escaped nick
278 # in the muc/item/nick element
279 self.__update_resource(new_nickname)
280 p["muc"]["item"]["nick"] = self.jid.resource
281 self._send(p)
283 self.stored.nickname = new_nickname
284 self.commit()
285 kwargs["status_codes"] = set()
286 p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
287 self._send(p)
289 def _make_presence(
290 self,
291 *,
292 last_seen: Optional[datetime] = None,
293 status_codes: Optional[set[int]] = None,
294 user_full_jid: Optional[JID] = None,
295 **presence_kwargs,
296 ):
297 p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
298 p["muc"]["affiliation"] = self.affiliation
299 p["muc"]["role"] = self.role
300 if self.hats:
301 p["hats"].add_hats(self.hats)
302 codes = status_codes or set()
303 if self.is_user:
304 codes.add(110)
305 if not self.muc.is_anonymous and not self.is_system:
306 if self.is_user:
307 if user_full_jid:
308 p["muc"]["jid"] = user_full_jid
309 else:
310 jid = JID(self.user_jid)
311 try:
312 jid.resource = next(iter(self.muc.get_user_resources()))
313 except StopIteration:
314 jid.resource = "pseudo-resource"
315 p["muc"]["jid"] = self.user_jid
316 codes.add(100)
317 elif self.contact:
318 p["muc"]["jid"] = self.contact.jid
319 if a := self.contact.get_avatar():
320 p["vcard_temp_update"]["photo"] = a.id
321 else:
322 warnings.warn(
323 f"Private group but no 1:1 JID associated to '{self}'",
324 )
325 if self.is_user and (hash_ := self.session.user.avatar_hash):
326 p["vcard_temp_update"]["photo"] = hash_
327 p["muc"]["status_codes"] = codes
328 return p
330 @property
331 def DISCO_NAME(self): # type:ignore[override]
332 return self.nickname
334 def __send_presence_if_needed(
335 self, stanza: Union[Message, Presence], full_jid: JID, archive_only: bool
336 ) -> None:
337 if (
338 archive_only
339 or self.is_system
340 or self.is_user
341 or self._presence_sent
342 or stanza["subject"]
343 ):
344 return
345 if isinstance(stanza, Message):
346 if stanza.get_plugin("muc", check=True):
347 return
348 self.send_initial_presence(full_jid)
350 @cached_property
351 def __occupant_id(self):
352 if self.contact:
353 return self.contact.jid
354 elif self.is_user:
355 return "slidge-user"
356 elif self.is_system:
357 return "room"
358 else:
359 return str(uuid.uuid4())
361 def _send(
362 self,
363 stanza: MessageOrPresenceTypeVar,
364 full_jid: Optional[JID] = None,
365 archive_only: bool = False,
366 legacy_msg_id=None,
367 initial_presence=False,
368 **send_kwargs,
369 ) -> MessageOrPresenceTypeVar:
370 if stanza.get_from().resource:
371 stanza["occupant-id"]["id"] = self.__occupant_id
372 else:
373 stanza["occupant-id"]["id"] = "room"
374 self.__add_nick_element(stanza)
375 if not self.is_user and isinstance(stanza, Presence):
376 if stanza["type"] == "unavailable" and not self._presence_sent:
377 return stanza # type:ignore
378 if initial_presence:
379 self.stored.presence_sent = True
380 else:
381 self._presence_sent = True
382 if full_jid:
383 stanza["to"] = full_jid
384 self.__send_presence_if_needed(stanza, full_jid, archive_only)
385 if self.is_user:
386 assert stanza.stream is not None
387 stanza.stream.send(stanza, use_filters=False)
388 else:
389 stanza.send()
390 else:
391 if hasattr(self.muc, "archive") and isinstance(stanza, Message):
392 self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
393 if archive_only:
394 return stanza
395 for user_full_jid in self.muc.user_full_jids():
396 stanza = copy(stanza)
397 stanza["to"] = user_full_jid
398 self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
399 stanza.send()
400 return stanza
402 def mucadmin_item(self):
403 item = MUCAdminItem()
404 item["nick"] = self.nickname
405 item["affiliation"] = self.affiliation
406 item["role"] = self.role
407 if not self.muc.is_anonymous:
408 if self.is_user:
409 item["jid"] = self.user_jid.bare
410 elif self.contact:
411 item["jid"] = self.contact.jid.bare
412 else:
413 warnings.warn(
414 (
415 f"Public group but no contact JID associated to {self.jid} in"
416 f" {self}"
417 ),
418 )
419 return item
421 def __add_nick_element(self, stanza: Union[Presence, Message]) -> None:
422 if (nick := self.nickname_no_illegal) != self.jid.resource:
423 n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
424 n["nick"] = nick
425 stanza.append(n)
427 def _get_last_presence(self) -> Optional[CachedPresence]:
428 own = super()._get_last_presence()
429 if own is None and self.contact:
430 return self.contact._get_last_presence()
431 return own
433 def send_initial_presence(
434 self,
435 full_jid: JID,
436 nick_change: bool = False,
437 presence_id: Optional[str] = None,
438 ) -> None:
439 """
440 Called when the user joins a MUC, as a mechanism
441 to indicate to the joining XMPP client the list of "participants".
443 Can be called this to trigger a "participant has joined the group" event.
445 :param full_jid: Set this to only send to a specific user XMPP resource.
446 :param nick_change: Used when the user joins and the MUC renames them (code 210)
447 :param presence_id: set the presence ID. used internally by slidge
448 """
449 # MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
450 codes = set()
451 if nick_change:
452 codes.add(210)
454 if self.is_user:
455 # the "initial presence" of the user has to be vanilla, as it is
456 # a crucial part of the MUC join sequence for XMPP clients.
457 kwargs = {}
458 else:
459 cache = self._get_last_presence()
460 self.log.debug("Join muc, initial presence: %s", cache)
461 if cache:
462 ptype = cache.ptype
463 if ptype == "unavailable":
464 return
465 kwargs = dict(
466 last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
467 )
468 else:
469 kwargs = {}
470 p = self._make_presence(
471 status_codes=codes,
472 user_full_jid=full_jid,
473 **kwargs, # type:ignore
474 )
475 if presence_id:
476 p["id"] = presence_id
477 self._send(p, full_jid, initial_presence=True)
479 def leave(self) -> None:
480 """
481 Call this when the participant leaves the room
482 """
483 self.muc.remove_participant(self)
485 def kick(self, reason: str | None = None) -> None:
486 """
487 Call this when the participant is kicked from the room
488 """
489 self.muc.remove_participant(self, kick=True, reason=reason)
491 def ban(self, reason: str | None = None) -> None:
492 """
493 Call this when the participant is banned from the room
494 """
495 self.muc.remove_participant(self, ban=True, reason=reason)
497 def get_disco_info(self, jid: OptJid = None, node: Optional[str] = None):
498 if self.contact is not None:
499 return self.contact.get_disco_info()
500 return super().get_disco_info()
502 def moderate(
503 self, legacy_msg_id: LegacyMessageType, reason: Optional[str] = None
504 ) -> None:
505 for i in self._legacy_to_xmpp(legacy_msg_id):
506 m = self.muc.get_system_participant()._make_message()
507 m["retract"]["id"] = i
508 if self.is_system:
509 m["retract"].enable("moderated")
510 else:
511 m["retract"]["moderated"]["by"] = self.jid
512 m["retract"]["moderated"]["occupant-id"]["id"] = self.__occupant_id
513 if reason:
514 m["retract"]["reason"] = reason
515 self._send(m)
517 def set_room_subject(
518 self,
519 subject: str,
520 full_jid: Optional[JID] = None,
521 when: Optional[datetime] = None,
522 update_muc: bool = True,
523 ) -> None:
524 if update_muc:
525 self.muc._subject = subject # type: ignore
526 self.muc.subject_setter = self.nickname
527 self.muc.subject_date = when
529 msg = self._make_message()
530 if when is not None:
531 msg["delay"].set_stamp(when)
532 msg["delay"]["from"] = self.muc.jid
533 if subject:
534 msg["subject"] = subject
535 else:
536 # may be simplified if slixmpp lets it do it more easily some day
537 msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject"))
538 self._send(msg, full_jid)
540 def set_thread_subject(
541 self,
542 thread: LegacyThreadType,
543 subject: str | None,
544 when: Optional[datetime] = None,
545 ) -> None:
546 msg = self._make_message()
547 msg["thread"] = str(thread)
548 if when is not None:
549 msg["delay"].set_stamp(when)
550 msg["delay"]["from"] = self.muc.jid
551 if subject:
552 msg["subject"] = subject
553 else:
554 # may be simplified if slixmpp lets it do it more easily some day
555 msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject"))
556 self._send(msg)
559def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]:
560 nickname = nickname_no_illegal = strip_illegal_chars(nickname)
562 jid = JID(muc_jid)
564 try:
565 jid.resource = nickname
566 except InvalidJID:
567 nickname = nickname.encode("punycode").decode()
568 try:
569 jid.resource = nickname
570 except InvalidJID:
571 # at this point there still might be control chars
572 jid.resource = strip_non_printable(nickname)
574 return nickname_no_illegal, jid
577log = logging.getLogger(__name__)