Coverage for slidge/group/participant.py: 90%
325 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-10 09:11 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-10 09:11 +0000
1import logging
2import string
3import stringprep
4import uuid
5import warnings
6from copy import copy
7from datetime import datetime
8from functools import cached_property
9from typing import TYPE_CHECKING, Optional, Self, Union
11from slixmpp import JID, InvalidJID, Message, Presence
12from slixmpp.plugins.xep_0045.stanza import MUCAdminItem
13from slixmpp.stringprep import StringprepError, resourceprep
14from slixmpp.types import MessageTypes, OptJid
15from slixmpp.util.stringprep_profiles import StringPrepError, prohibit_output
17from ..contact import LegacyContact
18from ..core.mixins import (
19 ChatterDiscoMixin,
20 MessageMixin,
21 PresenceMixin,
22 StoredAttributeMixin,
23)
24from ..db.models import Participant
25from ..util import SubclassableOnce, strip_illegal_chars
26from ..util.types import (
27 CachedPresence,
28 Hat,
29 LegacyMessageType,
30 MessageOrPresenceTypeVar,
31 MucAffiliation,
32 MucRole,
33)
35if TYPE_CHECKING:
36 from .room import LegacyMUC
39def strip_non_printable(nickname: str):
40 new = (
41 "".join(x for x in nickname if x in string.printable)
42 + f"-slidge-{hash(nickname)}"
43 )
44 warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
45 return new
48class LegacyParticipant(
49 StoredAttributeMixin,
50 PresenceMixin,
51 MessageMixin,
52 ChatterDiscoMixin,
53 metaclass=SubclassableOnce,
54):
55 """
56 A legacy participant of a legacy group chat.
57 """
59 mtype: MessageTypes = "groupchat"
60 _can_send_carbon = False
61 USE_STANZA_ID = True
62 STRIP_SHORT_DELAY = False
63 pk: int
65 def __init__(
66 self,
67 muc: "LegacyMUC",
68 nickname: Optional[str] = None,
69 is_user=False,
70 is_system=False,
71 role: MucRole = "participant",
72 affiliation: MucAffiliation = "member",
73 ):
74 self.session = session = muc.session
75 self.xmpp = session.xmpp
76 super().__init__()
77 self._hats = list[Hat]()
78 self.muc = muc
79 self._role = role
80 self._affiliation = affiliation
81 self.is_user: bool = is_user
82 self.is_system: bool = is_system
84 self._nickname = nickname
86 self.__update_jid(nickname)
87 log.debug("Instantiation of: %r", self)
89 self.contact: Optional["LegacyContact"] = None
90 # we track if we already sent a presence for this participant.
91 # if we didn't, we send it before the first message.
92 # this way, event in plugins that don't map "user has joined" events,
93 # we send a "join"-presence from the participant before the first message
94 self._presence_sent: bool = False
95 self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")
96 self.__part_store = self.xmpp.store.participants
98 @property
99 def contact_pk(self) -> Optional[int]: # type:ignore
100 if self.contact:
101 return self.contact.contact_pk
102 return None
104 @property
105 def user_jid(self):
106 return self.session.user_jid
108 def __repr__(self):
109 return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"
111 @property
112 def affiliation(self):
113 return self._affiliation
115 @affiliation.setter
116 def affiliation(self, affiliation: MucAffiliation):
117 if self._affiliation == affiliation:
118 return
119 self._affiliation = affiliation
120 if not self.muc._participants_filled:
121 return
122 self.__part_store.set_affiliation(self.pk, affiliation)
123 if not self._presence_sent:
124 return
125 self.send_last_presence(force=True, no_cache_online=True)
127 def send_affiliation_change(self):
128 # internal use by slidge
129 msg = self._make_message()
130 msg["muc"]["affiliation"] = self._affiliation
131 msg["type"] = "normal"
132 if not self.muc.is_anonymous and not self.is_system:
133 if self.contact:
134 msg["muc"]["jid"] = self.contact.jid
135 else:
136 warnings.warn(
137 f"Private group but no 1:1 JID associated to '{self}'",
138 )
139 self._send(msg)
141 @property
142 def role(self):
143 return self._role
145 @role.setter
146 def role(self, role: MucRole):
147 if self._role == role:
148 return
149 self._role = role
150 if not self.muc._participants_filled:
151 return
152 self.__part_store.set_role(self.pk, role)
153 if not self._presence_sent:
154 return
155 self.send_last_presence(force=True, no_cache_online=True)
157 def set_hats(self, hats: list[Hat]):
158 if self._hats == hats:
159 return
160 self._hats = hats
161 if not self.muc._participants_filled:
162 return
163 self.__part_store.set_hats(self.pk, hats)
164 if not self._presence_sent:
165 return
166 self.send_last_presence(force=True, no_cache_online=True)
168 def __update_jid(self, unescaped_nickname: Optional[str]):
169 j: JID = copy(self.muc.jid)
171 if self.is_system:
172 self.jid = j
173 self._nickname_no_illegal = ""
174 return
176 nickname = unescaped_nickname
178 if nickname:
179 nickname = self._nickname_no_illegal = strip_illegal_chars(nickname)
180 else:
181 warnings.warn(
182 "Only the system participant is allowed to not have a nickname"
183 )
184 nickname = f"unnamed-{uuid.uuid4()}"
186 assert isinstance(nickname, str)
188 try:
189 # workaround for https://codeberg.org/poezio/slixmpp/issues/3480
190 prohibit_output(nickname, [stringprep.in_table_a1])
191 resourceprep(nickname)
192 except (StringPrepError, StringprepError):
193 nickname = nickname.encode("punycode").decode()
195 # at this point there still might be control chars
196 try:
197 j.resource = nickname
198 except InvalidJID:
199 j.resource = strip_non_printable(nickname)
201 self.jid = j
203 def send_configuration_change(self, codes: tuple[int]):
204 if not self.is_system:
205 raise RuntimeError("This is only possible for the system participant")
206 msg = self._make_message()
207 msg["muc"]["status_codes"] = codes
208 self._send(msg)
210 @property
211 def nickname(self):
212 return self._nickname
214 @nickname.setter
215 def nickname(self, new_nickname: str):
216 old = self._nickname
217 if new_nickname == old:
218 return
220 cache = getattr(self, "_last_presence", None)
221 if cache:
222 last_seen = cache.last_seen
223 kwargs = cache.presence_kwargs
224 else:
225 last_seen = None
226 kwargs = {}
228 kwargs["status_codes"] = {303}
230 p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
231 # in this order so pfrom=old resource and we actually use the escaped nick
232 # in the muc/item/nick element
233 self.__update_jid(new_nickname)
234 p["muc"]["item"]["nick"] = self.jid.resource
235 self._send(p)
237 self._nickname = new_nickname
239 kwargs["status_codes"] = set()
240 p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
241 self._send(p)
243 def _make_presence(
244 self,
245 *,
246 last_seen: Optional[datetime] = None,
247 status_codes: Optional[set[int]] = None,
248 user_full_jid: Optional[JID] = None,
249 **presence_kwargs,
250 ):
251 p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
252 p["muc"]["affiliation"] = self.affiliation
253 p["muc"]["role"] = self.role
254 if self._hats:
255 p["hats"].add_hats(self._hats)
256 codes = status_codes or set()
257 if self.is_user:
258 codes.add(110)
259 if not self.muc.is_anonymous and not self.is_system:
260 if self.is_user:
261 if user_full_jid:
262 p["muc"]["jid"] = user_full_jid
263 else:
264 jid = copy(self.user_jid)
265 try:
266 jid.resource = next(
267 iter(self.muc.get_user_resources()) # type:ignore
268 )
269 except StopIteration:
270 jid.resource = "pseudo-resource"
271 p["muc"]["jid"] = self.user_jid
272 codes.add(100)
273 elif self.contact:
274 p["muc"]["jid"] = self.contact.jid
275 if a := self.contact.get_avatar():
276 p["vcard_temp_update"]["photo"] = a.id
277 else:
278 warnings.warn(
279 f"Private group but no 1:1 JID associated to '{self}'",
280 )
281 if self.is_user and (hash_ := self.session.user.avatar_hash):
282 p["vcard_temp_update"]["photo"] = hash_
283 p["muc"]["status_codes"] = codes
284 return p
286 @property
287 def DISCO_NAME(self):
288 return self.nickname
290 def __send_presence_if_needed(
291 self, stanza: Union[Message, Presence], full_jid: JID, archive_only: bool
292 ):
293 if (
294 archive_only
295 or self.is_system
296 or self.is_user
297 or self._presence_sent
298 or stanza["subject"]
299 ):
300 return
301 if isinstance(stanza, Message):
302 if stanza.get_plugin("muc", check=True):
303 return
304 self.send_initial_presence(full_jid)
306 @cached_property
307 def __occupant_id(self):
308 if self.contact:
309 return self.contact.jid
310 elif self.is_user:
311 return "slidge-user"
312 elif self.is_system:
313 return "room"
314 else:
315 return str(uuid.uuid4())
317 def _send(
318 self,
319 stanza: MessageOrPresenceTypeVar,
320 full_jid: Optional[JID] = None,
321 archive_only=False,
322 legacy_msg_id=None,
323 **send_kwargs,
324 ) -> MessageOrPresenceTypeVar:
325 if stanza.get_from().resource:
326 stanza["occupant-id"]["id"] = self.__occupant_id
327 else:
328 stanza["occupant-id"]["id"] = "room"
329 self.__add_nick_element(stanza)
330 if not self.is_user and isinstance(stanza, Presence):
331 if stanza["type"] == "unavailable" and not self._presence_sent:
332 return stanza # type:ignore
333 self._presence_sent = True
334 self.__part_store.set_presence_sent(self.pk)
335 if full_jid:
336 stanza["to"] = full_jid
337 self.__send_presence_if_needed(stanza, full_jid, archive_only)
338 if self.is_user:
339 assert stanza.stream is not None
340 stanza.stream.send(stanza, use_filters=False)
341 else:
342 stanza.send()
343 else:
344 if hasattr(self.muc, "archive") and isinstance(stanza, Message):
345 self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
346 if archive_only:
347 return stanza
348 for user_full_jid in self.muc.user_full_jids():
349 stanza = copy(stanza)
350 stanza["to"] = user_full_jid
351 self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
352 stanza.send()
353 return stanza
355 def mucadmin_item(self):
356 item = MUCAdminItem()
357 item["nick"] = self.nickname
358 item["affiliation"] = self.affiliation
359 item["role"] = self.role
360 if not self.muc.is_anonymous:
361 if self.is_user:
362 item["jid"] = self.user_jid.bare
363 elif self.contact:
364 item["jid"] = self.contact.jid.bare
365 else:
366 warnings.warn(
367 (
368 f"Public group but no contact JID associated to {self.jid} in"
369 f" {self}"
370 ),
371 )
372 return item
374 def __add_nick_element(self, stanza: Union[Presence, Message]):
375 if (nick := self._nickname_no_illegal) != self.jid.resource:
376 n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
377 n["nick"] = nick
378 stanza.append(n)
380 def _get_last_presence(self) -> Optional[CachedPresence]:
381 own = super()._get_last_presence()
382 if own is None and self.contact:
383 return self.contact._get_last_presence()
384 return own
386 def send_initial_presence(
387 self,
388 full_jid: JID,
389 nick_change=False,
390 presence_id: Optional[str] = None,
391 ):
392 """
393 Called when the user joins a MUC, as a mechanism
394 to indicate to the joining XMPP client the list of "participants".
396 Can be called this to trigger a "participant has joined the group" event.
398 :param full_jid: Set this to only send to a specific user XMPP resource.
399 :param nick_change: Used when the user joins and the MUC renames them (code 210)
400 :param presence_id: set the presence ID. used internally by slidge
401 """
402 # MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
403 codes = set()
404 if nick_change:
405 codes.add(210)
407 if self.is_user:
408 # the "initial presence" of the user has to be vanilla, as it is
409 # a crucial part of the MUC join sequence for XMPP clients.
410 kwargs = {}
411 else:
412 cache = self._get_last_presence()
413 self.log.debug("Join muc, initial presence: %s", cache)
414 if cache:
415 ptype = cache.ptype
416 if ptype == "unavailable":
417 return
418 kwargs = dict(
419 last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
420 )
421 else:
422 kwargs = {}
423 p = self._make_presence(
424 status_codes=codes,
425 user_full_jid=full_jid,
426 **kwargs, # type:ignore
427 )
428 if presence_id:
429 p["id"] = presence_id
430 self._send(p, full_jid)
432 def leave(self):
433 """
434 Call this when the participant leaves the room
435 """
436 self.muc.remove_participant(self)
438 def kick(self, reason: str | None = None):
439 """
440 Call this when the participant is kicked from the room
441 """
442 self.muc.remove_participant(self, kick=True, reason=reason)
444 def ban(self, reason: str | None = None):
445 """
446 Call this when the participant is banned from the room
447 """
448 self.muc.remove_participant(self, ban=True, reason=reason)
450 def get_disco_info(self, jid: OptJid = None, node: Optional[str] = None):
451 if self.contact is not None:
452 return self.contact.get_disco_info()
453 return super().get_disco_info()
455 def moderate(self, legacy_msg_id: LegacyMessageType, reason: Optional[str] = None):
456 xmpp_id = self._legacy_to_xmpp(legacy_msg_id)
457 multi = self.xmpp.store.multi.get_xmpp_ids(self.session.user_pk, xmpp_id)
458 if multi is None:
459 msg_ids = [xmpp_id]
460 else:
461 msg_ids = multi + [xmpp_id]
463 for i in msg_ids:
464 m = self.muc.get_system_participant()._make_message()
465 m["retract"]["id"] = i
466 if self.is_system:
467 m["retract"].enable("moderated")
468 else:
469 m["retract"]["moderated"]["by"] = self.jid
470 m["retract"]["moderated"]["occupant-id"]["id"] = self.__occupant_id
471 if reason:
472 m["retract"]["reason"] = reason
473 self._send(m)
475 def set_room_subject(
476 self,
477 subject: str,
478 full_jid: Optional[JID] = None,
479 when: Optional[datetime] = None,
480 update_muc=True,
481 ):
482 if update_muc:
483 self.muc._subject = subject # type: ignore
484 self.muc.subject_setter = self.nickname
485 self.muc.subject_date = when
487 msg = self._make_message()
488 if when is not None:
489 msg["delay"].set_stamp(when)
490 msg["delay"]["from"] = self.muc.jid
491 msg["subject"] = subject or str(self.muc.name)
492 self._send(msg, full_jid)
494 @classmethod
495 def from_store(
496 cls,
497 session,
498 stored: Participant,
499 contact: Optional[LegacyContact] = None,
500 muc: Optional["LegacyMUC"] = None,
501 ) -> Self:
502 from slidge.group.room import LegacyMUC
504 if muc is None:
505 muc = LegacyMUC.get_self_or_unique_subclass().from_store(
506 session, stored.room
507 )
508 part = cls(
509 muc,
510 stored.nickname,
511 role=stored.role,
512 affiliation=stored.affiliation,
513 )
514 part.pk = stored.id
515 if contact is not None:
516 part.contact = contact
517 elif stored.contact is not None:
518 contact = LegacyContact.get_self_or_unique_subclass().from_store(
519 session, stored.contact
520 )
521 part.contact = contact
523 part.is_user = stored.is_user
524 if (data := stored.extra_attributes) is not None:
525 muc.deserialize_extra_attributes(data)
526 part._presence_sent = stored.presence_sent
527 part._hats = [Hat(h.uri, h.title) for h in stored.hats]
528 return part
531log = logging.getLogger(__name__)