Coverage for slidge/group/participant.py: 89%
339 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1import logging
2import string
3import uuid
4import warnings
5from copy import copy
6from datetime import datetime
7from functools import cached_property
8from typing import TYPE_CHECKING, Any, Optional, Union
10import sqlalchemy as sa
11from slixmpp import JID, InvalidJID, Message, Presence
12from slixmpp.plugins.xep_0045.stanza import MUCAdminItem
13from slixmpp.types import MessageTypes, OptJid
14from sqlalchemy.orm.exc import DetachedInstanceError
16from ..contact import LegacyContact
17from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin
18from ..core.mixins.db import DBMixin
19from ..db.models import Participant
20from ..util import SubclassableOnce, strip_illegal_chars
21from ..util.types import (
22 CachedPresence,
23 Hat,
24 LegacyMessageType,
25 MessageOrPresenceTypeVar,
26 MucAffiliation,
27 MucRole,
28)
30if TYPE_CHECKING:
31 from .room import LegacyMUC
def strip_non_printable(nickname: str) -> str:
    """
    Build a last-resort, ASCII-only replacement for an unusable nickname.

    Only ASCII letters, digits, punctuation and the plain space are kept.
    Unlike ``string.printable``, this excludes tab, newline and the other
    control whitespace characters. A short digest of the original nickname
    is appended so that distinct nicknames yield distinct replacements.
    The digest is stable across processes, which the salted built-in
    ``hash()`` is not.

    A warning is emitted with both the original and the replacement.

    :param nickname: The nickname that could not be used as-is.
    :return: The replacement nickname.
    """
    # Local import so this function does not depend on a file-level import.
    import hashlib

    allowed = set(string.digits + string.ascii_letters + string.punctuation + " ")
    kept = "".join(x for x in nickname if x in allowed)
    # "replace" keeps this from raising on text that cannot be encoded.
    digest = hashlib.sha256(nickname.encode("utf-8", "replace")).hexdigest()[:16]
    new = f"{kept}-slidge-{digest}"
    warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
    return new
class LegacyParticipant(
    PresenceMixin,
    MessageMixin,
    ChatterDiscoMixin,
    DBMixin,
    metaclass=SubclassableOnce,
):
    """
    A legacy participant of a legacy group chat.

    Wraps a stored :class:`Participant` row and, optionally, the
    :class:`LegacyContact` it corresponds to. Stanzas are sent from the
    MUC JID with the participant's (escaped) nickname as resource.
    """

    mtype: MessageTypes = "groupchat"
    _can_send_carbon = False
    USE_STANZA_ID = True
    STRIP_SHORT_DELAY = False
    stored: Participant
    contact: LegacyContact[Any] | None

    def __init__(
        self,
        muc: "LegacyMUC",
        stored: Participant,
        is_system: bool = False,
        contact: LegacyContact[Any] | None = None,
    ) -> None:
        """
        :param muc: The room this participant belongs to.
        :param stored: The stored participant this object wraps.
        :param is_system: Whether this is the room's "system" participant.
        :param contact: The contact associated to this participant, if any.
            When omitted, it is built from ``stored.contact`` if that is set.
        """
        self.muc = muc
        self.session = muc.session
        self.xmpp = muc.session.xmpp
        self.is_system = is_system

        # Keep the contact object and the stored contact in sync, whichever
        # of the two was provided.
        if contact is None and stored.contact is not None:
            contact = self.session.contacts.from_store(stored=stored.contact)
        if contact is not None and stored.contact is None:
            stored.contact = contact.stored

        self.stored = stored
        self.contact = contact

        super().__init__()

        if stored.resource is None:
            self.__update_resource(stored.nickname)

        self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")

    @property
    def is_user(self) -> bool:
        """
        Whether the stored participant is flagged as the user.

        If the stored instance is detached, ``merge()`` is called and the
        attribute is read again.
        """
        try:
            return self.stored.is_user
        except DetachedInstanceError:
            self.merge()
            return self.stored.is_user

    @property
    def jid(self) -> JID:
        """
        A copy of the MUC JID with the stored resource set, when it is not empty.
        """
        jid = JID(self.muc.jid)
        if self.stored.resource:
            jid.resource = self.stored.resource
        return jid

    @jid.setter
    def jid(self, x: JID):
        # FIXME: without this, mypy yields
        #        "Cannot override writeable attribute with read-only property"
        #        But it does not happen for LegacyContact. WTF?
        raise RuntimeError

    def commit(self, *args, **kwargs) -> None:
        """
        Forward to the parent ``commit()``, unless this is the system
        participant or ``muc.get_lock()`` returns a truthy value for
        "fill participants" or "fill history".
        """
        if self.is_system:
            return
        if self.muc.get_lock("fill participants") or self.muc.get_lock("fill history"):
            return
        super().commit(*args, **kwargs)

    @property
    def user_jid(self):
        """The ``user_jid`` of the session this participant belongs to."""
        return self.session.user_jid

    def __repr__(self) -> str:
        return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"

    @property
    def _presence_sent(self) -> bool:
        # we track if we already sent a presence for this participant.
        # if we didn't, we send it before the first message.
        # this way, even in plugins that don't map "user has joined" events,
        # we send a "join"-presence from the participant before the first message
        return self.stored.presence_sent

    @_presence_sent.setter
    def _presence_sent(self, val: bool) -> None:
        # Only touch the store when the value actually changes.
        if self._presence_sent == val:
            return
        self.stored.presence_sent = val
        self.commit(merge=True)

    @property
    def nickname_no_illegal(self) -> str:
        """The stored ``nickname_no_illegal`` value."""
        return self.stored.nickname_no_illegal

    @property
    def affiliation(self):
        """The stored MUC affiliation of this participant."""
        return self.stored.affiliation

    @affiliation.setter
    def affiliation(self, affiliation: MucAffiliation) -> None:
        # No-op when unchanged. Otherwise update the stored value; commit only
        # once the room's participants are filled, and re-send the last
        # presence only if a presence was already sent.
        if self.affiliation == affiliation:
            return
        self.stored.affiliation = affiliation
        if not self.muc.participants_filled:
            return
        self.commit()
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def send_affiliation_change(self) -> None:
        """
        Send a "normal" message carrying this participant's affiliation.

        In a non-anonymous room, and unless this is the system participant,
        the contact JID is included; a warning is emitted if there is no
        associated contact.
        """
        # internal use by slidge
        msg = self._make_message()
        msg["muc"]["affiliation"] = self.affiliation
        msg["type"] = "normal"
        if not self.muc.is_anonymous and not self.is_system:
            if self.contact:
                msg["muc"]["jid"] = self.contact.jid
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        self._send(msg)

    @property
    def role(self):
        """The stored MUC role of this participant."""
        return self.stored.role

    @role.setter
    def role(self, role: MucRole) -> None:
        # Same sequence as the affiliation setter: store, then commit and
        # re-send presence only when applicable.
        if self.role == role:
            return
        self.stored.role = role
        if not self.muc.participants_filled:
            return
        self.commit()
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    @property
    def hats(self) -> list[Hat]:
        """The stored hats as :class:`Hat` objects (empty list if none)."""
        return [Hat(*h) for h in self.stored.hats] if self.stored.hats else []

    def set_hats(self, hats: list[Hat]) -> None:
        """
        Replace the hats of this participant.

        Does nothing if ``hats`` equals the current hats. The change is
        committed only once the room's participants are filled, and the last
        presence is re-sent only if a presence was already sent.

        :param hats: The new list of hats.
        """
        if self.hats == hats:
            return
        self.stored.hats = hats  # type:ignore[assignment]
        if not self.muc.participants_filled:
            return
        self.commit(merge=True)
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def __update_resource(self, unescaped_nickname: Optional[str]) -> None:
        """
        Set the stored resource and ``nickname_no_illegal`` from a nickname.

        An empty nickname is only expected for the system participant; any
        other participant gets a generated ``unnamed-<uuid4>`` nickname and a
        warning is emitted.
        """
        if not unescaped_nickname:
            self.stored.resource = ""
            if self.is_system:
                self.stored.nickname_no_illegal = ""
            else:
                warnings.warn(
                    "Only the system participant is allowed to not have a nickname"
                )
                nickname = f"unnamed-{uuid.uuid4()}"
                self.stored.resource = self.stored.nickname_no_illegal = nickname
            return

        self.stored.nickname_no_illegal, jid = escape_nickname(
            self.muc.jid,
            unescaped_nickname,
        )
        self.stored.resource = jid.resource

    def send_configuration_change(self, codes: tuple[int, ...]):
        """
        Send a message carrying MUC status codes.

        :param codes: The status codes to attach to the message.
        :raises RuntimeError: If this is not the system participant.
        """
        if not self.is_system:
            raise RuntimeError("This is only possible for the system participant")
        msg = self._make_message()
        msg["muc"]["status_codes"] = codes
        self._send(msg)

    @property
    def nickname(self):
        """The stored (unescaped) nickname of this participant."""
        return self.stored.nickname

    @nickname.setter
    def nickname(self, new_nickname: str) -> None:
        # Nick change: send an "unavailable" presence with status code 303
        # from the old resource, announcing the new nick, then an "available"
        # presence from the new resource.
        old = self.nickname
        if new_nickname == old:
            return

        cache = getattr(self, "_last_presence", None)
        if cache:
            last_seen = cache.last_seen
            # Copy, so that adding "status_codes" below cannot alter the
            # cached presence.
            kwargs = dict(cache.presence_kwargs)
        else:
            last_seen = None
            kwargs = {}

        kwargs["status_codes"] = {303}

        p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
        # in this order so pfrom=old resource and we actually use the escaped nick
        # in the muc/item/nick element
        self.__update_resource(new_nickname)
        p["muc"]["item"]["nick"] = self.jid.resource
        self._send(p)

        self.stored.nickname = new_nickname
        self.commit()
        kwargs["status_codes"] = set()
        p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
        self._send(p)

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime] = None,
        status_codes: Optional[set[int]] = None,
        user_full_jid: Optional[JID] = None,
        **presence_kwargs,
    ):
        """
        Build a presence with MUC affiliation, role, hats and status codes.

        :param last_seen: Passed through to the parent ``_make_presence()``.
        :param status_codes: Status codes to include. The set is copied, not
            modified. 110 is added when this participant is the user.
        :param user_full_jid: For the user in a non-anonymous room, the full
            JID to advertise. When omitted, one is built from the user JID
            and one of the MUC's user resources, and code 100 is added.
        :param presence_kwargs: Passed through to the parent
            ``_make_presence()``.
        :return: The presence stanza.
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        p["muc"]["affiliation"] = self.affiliation
        p["muc"]["role"] = self.role
        if self.hats:
            p["hats"].add_hats(self.hats)
        # Work on a copy so the caller's set is never mutated.
        codes = set(status_codes) if status_codes else set()
        if self.is_user:
            codes.add(110)
        if not self.muc.is_anonymous and not self.is_system:
            if self.is_user:
                if user_full_jid:
                    p["muc"]["jid"] = user_full_jid
                else:
                    jid = JID(self.user_jid)
                    try:
                        jid.resource = next(iter(self.muc.get_user_resources()))
                    except StopIteration:
                        # No known user resource: fall back to a placeholder.
                        jid.resource = "pseudo-resource"
                    # Advertise the full JID built above; it used to be
                    # computed and then dropped in favour of the bare JID.
                    p["muc"]["jid"] = jid
                    codes.add(100)
            elif self.contact:
                p["muc"]["jid"] = self.contact.jid
                if a := self.contact.get_avatar():
                    p["vcard_temp_update"]["photo"] = a.id
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        if self.is_user and (hash_ := self.session.user.avatar_hash):
            p["vcard_temp_update"]["photo"] = hash_
        p["muc"]["status_codes"] = codes
        return p

    @property
    def DISCO_NAME(self):
        """The nickname of this participant."""
        return self.nickname

    def __send_presence_if_needed(
        self, stanza: Union[Message, Presence], full_jid: JID, archive_only: bool
    ) -> None:
        """
        Send the initial presence before ``stanza`` if none was sent yet.

        Nothing is sent for archive-only stanzas, for the system participant
        or the user, when a presence was already sent, for stanzas with a
        subject, or for messages that carry a "muc" plugin.
        """
        if (
            archive_only
            or self.is_system
            or self.is_user
            or self._presence_sent
            or stanza["subject"]
        ):
            return
        if isinstance(stanza, Message):
            if stanza.get_plugin("muc", check=True):
                return
        self.send_initial_presence(full_jid)

    @cached_property
    def __occupant_id(self):
        # Computed once per instance: the contact JID if there is a contact,
        # a fixed value for the user or the system participant, and a random
        # UUID otherwise.
        if self.contact:
            return self.contact.jid
        elif self.is_user:
            return "slidge-user"
        elif self.is_system:
            return "room"
        else:
            return str(uuid.uuid4())

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        full_jid: Optional[JID] = None,
        archive_only: bool = False,
        legacy_msg_id=None,
        initial_presence=False,
        **send_kwargs,
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza on behalf of this participant.

        :param stanza: The message or presence to send.
        :param full_jid: If set, send only to this JID; otherwise send a copy
            to each of ``muc.user_full_jids()``.
        :param archive_only: If true and no ``full_jid`` is given, the stanza
            is only added to the MUC archive (if any), not sent.
        :param legacy_msg_id: Passed to the archive along with the message.
        :param initial_presence: If true, the "presence sent" flag is set
            directly on the stored participant, bypassing the
            ``_presence_sent`` setter and the commit it does.
        :param send_kwargs: Accepted but not used here.
        :return: The stanza (the last copy sent, when sent to several JIDs).
        """
        if stanza.get_from().resource:
            stanza["occupant-id"]["id"] = self.__occupant_id
        else:
            stanza["occupant-id"]["id"] = "room"
        self.__add_nick_element(stanza)
        if not self.is_user and isinstance(stanza, Presence):
            # Do not announce the departure of someone who never "joined".
            if stanza["type"] == "unavailable" and not self._presence_sent:
                return stanza  # type:ignore
            if initial_presence:
                self.stored.presence_sent = True
            else:
                self._presence_sent = True
        if full_jid:
            stanza["to"] = full_jid
            self.__send_presence_if_needed(stanza, full_jid, archive_only)
            if self.is_user:
                assert stanza.stream is not None
                stanza.stream.send(stanza, use_filters=False)
            else:
                stanza.send()
        else:
            if hasattr(self.muc, "archive") and isinstance(stanza, Message):
                self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
            if archive_only:
                return stanza
            for user_full_jid in self.muc.user_full_jids():
                # One copy per recipient, since "to" differs for each.
                stanza = copy(stanza)
                stanza["to"] = user_full_jid
                self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
                stanza.send()
        return stanza

    def mucadmin_item(self):
        """
        Build a :class:`MUCAdminItem` describing this participant.

        In a non-anonymous room, the bare JID of the user or of the contact
        is included; a warning is emitted if neither applies.
        """
        item = MUCAdminItem()
        item["nick"] = self.nickname
        item["affiliation"] = self.affiliation
        item["role"] = self.role
        if not self.muc.is_anonymous:
            if self.is_user:
                item["jid"] = self.user_jid.bare
            elif self.contact:
                item["jid"] = self.contact.jid.bare
            else:
                warnings.warn(
                    (
                        f"Public group but no contact JID associated to {self.jid} in"
                        f" {self}"
                    ),
                )
        return item

    def __add_nick_element(self, stanza: Union[Presence, Message]) -> None:
        # When the JID resource differs from the nickname (because it had to
        # be escaped), carry the nickname in a XEP-0172 UserNick element.
        if (nick := self.nickname_no_illegal) != self.jid.resource:
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = nick
            stanza.append(n)

    def _get_last_presence(self) -> Optional[CachedPresence]:
        """
        Return this participant's last presence, falling back to the one of
        its contact when there is none of its own.
        """
        own = super()._get_last_presence()
        if own is None and self.contact:
            return self.contact._get_last_presence()
        return own

    def send_initial_presence(
        self,
        full_jid: JID,
        nick_change: bool = False,
        presence_id: Optional[str] = None,
    ) -> None:
        """
        Called when the user joins a MUC, as a mechanism
        to indicate to the joining XMPP client the list of "participants".

        Can be called to trigger a "participant has joined the group" event.

        Nothing is sent for a participant other than the user whose last
        known presence is "unavailable".

        :param full_jid: Set this to only send to a specific user XMPP resource.
        :param nick_change: Used when the user joins and the MUC renames them (code 210)
        :param presence_id: set the presence ID. used internally by slidge
        """
        #  MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
        codes = set()
        if nick_change:
            codes.add(210)

        if self.is_user:
            # the "initial presence" of the user has to be vanilla, as it is
            # a crucial part of the MUC join sequence for XMPP clients.
            kwargs = {}
        else:
            cache = self._get_last_presence()
            self.log.debug("Join muc, initial presence: %s", cache)
            if cache:
                ptype = cache.ptype
                if ptype == "unavailable":
                    return
                kwargs = dict(
                    last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
                )
            else:
                kwargs = {}
        p = self._make_presence(
            status_codes=codes,
            user_full_jid=full_jid,
            **kwargs,  # type:ignore
        )
        if presence_id:
            p["id"] = presence_id
        self._send(p, full_jid, initial_presence=True)

    def leave(self) -> None:
        """
        Call this when the participant leaves the room
        """
        self.muc.remove_participant(self)

    def kick(self, reason: str | None = None) -> None:
        """
        Call this when the participant is kicked from the room

        :param reason: Optional reason, passed to ``muc.remove_participant()``.
        """
        self.muc.remove_participant(self, kick=True, reason=reason)

    def ban(self, reason: str | None = None) -> None:
        """
        Call this when the participant is banned from the room

        :param reason: Optional reason, passed to ``muc.remove_participant()``.
        """
        self.muc.remove_participant(self, ban=True, reason=reason)

    def get_disco_info(self, jid: OptJid = None, node: Optional[str] = None):
        """
        Return the disco info of the associated contact if there is one,
        else the parent implementation's. ``jid`` and ``node`` are not used.
        """
        if self.contact is not None:
            return self.contact.get_disco_info()
        return super().get_disco_info()

    def moderate(
        self, legacy_msg_id: LegacyMessageType, reason: Optional[str] = None
    ) -> None:
        """
        Send retractions, marked as moderated, for a message.

        One retraction is sent per distinct XMPP ID known for the legacy
        message: those found in the ID map of the store, plus the one
        derived from ``legacy_msg_id``. The retractions are built by the
        system participant and sent through this participant.

        :param legacy_msg_id: Legacy ID of the message to retract.
        :param reason: Optional reason to include in the retraction.
        """
        xmpp_id = self._legacy_to_xmpp(legacy_msg_id)
        with self.xmpp.store.session() as orm:
            msg_ids = self.xmpp.store.id_map.get_xmpp(
                orm, self.muc.stored.id, str(legacy_msg_id), True
            )

        for i in set(msg_ids + [xmpp_id]):
            m = self.muc.get_system_participant()._make_message()
            m["retract"]["id"] = i
            if self.is_system:
                m["retract"].enable("moderated")
            else:
                m["retract"]["moderated"]["by"] = self.jid
                m["retract"]["moderated"]["occupant-id"]["id"] = self.__occupant_id
            if reason:
                m["retract"]["reason"] = reason
            self._send(m)

    def set_room_subject(
        self,
        subject: str,
        full_jid: Optional[JID] = None,
        when: Optional[datetime] = None,
        update_muc: bool = True,
    ) -> None:
        """
        Send a room subject message from this participant.

        :param subject: The subject. If empty, the room name is sent instead.
        :param full_jid: If set, only send to this JID.
        :param when: When the subject was set; added as a delay stamp.
        :param update_muc: Whether to also record the subject, its setter
            (this participant's nickname) and its date on the MUC.
        """
        if update_muc:
            self.muc._subject = subject  # type: ignore
            self.muc.subject_setter = self.nickname
            self.muc.subject_date = when

        msg = self._make_message()
        if when is not None:
            msg["delay"].set_stamp(when)
            msg["delay"]["from"] = self.muc.jid
        msg["subject"] = subject or str(self.muc.name)
        self._send(msg, full_jid)
def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]:
    """
    Compute a JID for a participant from a MUC JID and a nickname.

    The nickname is first passed through ``strip_illegal_chars()``. The
    result is tried as the resource; if the JID is rejected, its punycode
    encoding is tried; if that is rejected too, ``strip_non_printable()`` is
    applied to the punycode form.

    :param muc_jid: The JID of the room; it is copied, not modified.
    :param nickname: The nickname to use.
    :return: The nickname without illegal characters, and a copy of the MUC
        JID with the resource set.
    """
    cleaned = strip_illegal_chars(nickname)
    result = JID(muc_jid)

    # First choice: the cleaned nickname, untouched.
    try:
        result.resource = cleaned
    except InvalidJID:
        pass
    else:
        return cleaned, result

    # Second choice: its punycode form.
    punycoded = cleaned.encode("punycode").decode()
    try:
        result.resource = punycoded
    except InvalidJID:
        # at this point there still might be control chars
        result.resource = strip_non_printable(punycoded)

    return cleaned, result
# Module-level logger, named after this module.
log = logging.getLogger(__name__)