Coverage for slidge/group/participant.py: 90%
321 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1import logging
2import string
3import stringprep
4import uuid
5import warnings
6from copy import copy
7from datetime import datetime
8from functools import cached_property
9from typing import TYPE_CHECKING, Optional, Self, Union
11from slixmpp import JID, InvalidJID, Message, Presence
12from slixmpp.plugins.xep_0045.stanza import MUCAdminItem
13from slixmpp.stringprep import StringprepError, resourceprep
14from slixmpp.types import MessageTypes, OptJid
15from slixmpp.util.stringprep_profiles import StringPrepError, prohibit_output
17from ..contact import LegacyContact
18from ..core.mixins import (
19 ChatterDiscoMixin,
20 MessageMixin,
21 PresenceMixin,
22 StoredAttributeMixin,
23)
24from ..db.models import Participant
25from ..util import SubclassableOnce, strip_illegal_chars
26from ..util.types import (
27 CachedPresence,
28 Hat,
29 LegacyMessageType,
30 MessageOrPresenceTypeVar,
31 MucAffiliation,
32 MucRole,
33)
35if TYPE_CHECKING:
36 from .room import LegacyMUC
def strip_non_printable(nickname: str):
    """
    Build a fallback nickname made of printable ASCII characters only.

    Every character of ``nickname`` that is not a printable ASCII character
    is dropped, then a ``-slidge-<hash>`` suffix derived from the original
    nickname is appended. A warning is emitted to signal the substitution.

    :param nickname: The nickname that could not be used as-is.
    :return: The sanitized nickname, including the ``-slidge-<hash>`` suffix.
    """
    # string.printable also contains "\t\n\r\x0b\x0c". Those are control
    # characters, so among whitespace characters only the plain space is kept.
    allowed = (set(string.printable) - set(string.whitespace)) | {" "}
    # NOTE: hash() of a str is salted per interpreter process unless
    # PYTHONHASHSEED is set, so the suffix is not stable across restarts.
    new = (
        "".join(x for x in nickname if x in allowed)
        + f"-slidge-{hash(nickname)}"
    )
    warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
    return new
class LegacyParticipant(
    StoredAttributeMixin,
    PresenceMixin,
    MessageMixin,
    ChatterDiscoMixin,
    metaclass=SubclassableOnce,
):
    """
    A legacy participant of a legacy group chat.
    """

    mtype: MessageTypes = "groupchat"
    _can_send_carbon = False
    USE_STANZA_ID = True
    STRIP_SHORT_DELAY = False
    pk: int

    def __init__(
        self,
        muc: "LegacyMUC",
        nickname: Optional[str] = None,
        is_user=False,
        is_system=False,
        role: MucRole = "participant",
        affiliation: MucAffiliation = "member",
    ):
        """
        :param muc: The group chat this participant is part of.
        :param nickname: The (unescaped) nickname of the participant.
            Only the system participant is expected not to have one.
        :param is_user: Whether this participant is the slidge user.
        :param is_system: Whether this is the "system" participant,
            ie, the one whose JID is the JID of the room itself.
        :param role: Initial MUC role.
        :param affiliation: Initial MUC affiliation.
        """
        self.session = session = muc.session
        self.xmpp = session.xmpp
        super().__init__()
        self._hats = list[Hat]()
        self.muc = muc
        self._role = role
        self._affiliation = affiliation
        self.is_user: bool = is_user
        self.is_system: bool = is_system

        self._nickname = nickname

        self.__update_jid(nickname)
        log.debug("Instantiation of: %r", self)

        self.contact: Optional["LegacyContact"] = None
        # we track if we already sent a presence for this participant.
        # if we didn't, we send it before the first message.
        # this way, even in plugins that don't map "user has joined" events,
        # we send a "join"-presence from the participant before the first message
        self._presence_sent: bool = False
        self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")
        self.__part_store = self.xmpp.store.participants

    @property
    def contact_pk(self) -> Optional[int]:  # type:ignore
        """
        The ``contact_pk`` of the associated contact, or None without contact.
        """
        if self.contact:
            return self.contact.contact_pk
        return None

    @property
    def user_jid(self):
        """
        The JID of the user, as known by the session.
        """
        return self.session.user_jid

    def __repr__(self):
        return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"

    @property
    def affiliation(self):
        """
        The MUC affiliation of this participant.
        """
        return self._affiliation

    @affiliation.setter
    def affiliation(self, affiliation: MucAffiliation):
        if self._affiliation == affiliation:
            return
        self._affiliation = affiliation
        # Nothing is persisted or broadcast until the participants list of
        # the MUC has been filled.
        if not self.muc._participants_filled:
            return
        self.__part_store.set_affiliation(self.pk, affiliation)
        # No presence has been sent for this participant yet, so there is
        # nothing to refresh.
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def send_affiliation_change(self):
        """
        Send a "normal" message carrying the current affiliation.

        In non-anonymous rooms, the JID of the associated contact is
        included; a warning is emitted if there is no such contact.
        """
        # internal use by slidge
        msg = self._make_message()
        msg["muc"]["affiliation"] = self._affiliation
        msg["type"] = "normal"
        if not self.muc.is_anonymous and not self.is_system:
            if self.contact:
                msg["muc"]["jid"] = self.contact.jid
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        self._send(msg)

    @property
    def role(self):
        """
        The MUC role of this participant.
        """
        return self._role

    @role.setter
    def role(self, role: MucRole):
        if self._role == role:
            return
        self._role = role
        # Same sequence as the affiliation setter: store only once the
        # participants are filled, re-send presence only if one was sent.
        if not self.muc._participants_filled:
            return
        self.__part_store.set_role(self.pk, role)
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def set_hats(self, hats: list[Hat]):
        """
        Replace the hats of this participant.

        Does nothing if the hats are unchanged. Otherwise follows the same
        store-then-refresh-presence sequence as the role and affiliation
        setters.

        :param hats: The new list of hats.
        """
        if self._hats == hats:
            return
        self._hats = hats
        if not self.muc._participants_filled:
            return
        self.__part_store.set_hats(self.pk, hats)
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def __update_jid(self, unescaped_nickname: Optional[str]):
        """
        Compute ``self.jid`` (room JID + nickname as resource) and
        ``self._nickname_no_illegal`` from a raw nickname.

        The system participant uses the room JID without resource.

        :param unescaped_nickname: The nickname, before any sanitization.
        """
        j: JID = copy(self.muc.jid)

        if self.is_system:
            self.jid = j
            self._nickname_no_illegal = ""
            return

        nickname = unescaped_nickname

        if nickname:
            nickname = self._nickname_no_illegal = strip_illegal_chars(nickname)
        else:
            warnings.warn(
                "Only the system participant is allowed to not have a nickname"
            )
            # Also set _nickname_no_illegal here: __add_nick_element() reads
            # it on every send, and it would otherwise be missing (or stale
            # after a nickname change) for unnamed participants.
            nickname = self._nickname_no_illegal = f"unnamed-{uuid.uuid4()}"

        assert isinstance(nickname, str)

        try:
            # workaround for https://codeberg.org/poezio/slixmpp/issues/3480
            prohibit_output(nickname, [stringprep.in_table_a1])
            resourceprep(nickname)
        except (StringPrepError, StringprepError):
            nickname = nickname.encode("punycode").decode()

        # at this point there still might be control chars
        try:
            j.resource = nickname
        except InvalidJID:
            j.resource = strip_non_printable(nickname)

        self.jid = j

    def send_configuration_change(self, codes: tuple[int, ...]):
        """
        Send a message carrying MUC status codes from the system participant.

        :param codes: The status codes to attach to the message.
        :raises RuntimeError: If called on anything but the system participant.
        """
        if not self.is_system:
            raise RuntimeError("This is only possible for the system participant")
        msg = self._make_message()
        msg["muc"]["status_codes"] = codes
        self._send(msg)

    @property
    def nickname(self):
        """
        The nickname of this participant, as given (not sanitized).
        """
        return self._nickname

    @nickname.setter
    def nickname(self, new_nickname: str):
        old = self._nickname
        if new_nickname == old:
            return

        cache = getattr(self, "_last_presence", None)
        if cache:
            last_seen = cache.last_seen
            kwargs = cache.presence_kwargs
        else:
            last_seen = None
            kwargs = {}

        # First an "unavailable" presence with status code 303 from the old
        # resource...
        kwargs["status_codes"] = {303}

        p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
        # in this order so pfrom=old resource and we actually use the escaped nick
        # in the muc/item/nick element
        self.__update_jid(new_nickname)
        p["muc"]["item"]["nick"] = self.jid.resource
        self._send(p)

        self._nickname = new_nickname

        # ...then an "available" presence from the new resource.
        kwargs["status_codes"] = set()
        p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
        self._send(p)

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime] = None,
        status_codes: Optional[set[int]] = None,
        user_full_jid: Optional[JID] = None,
        **presence_kwargs,
    ):
        """
        Build a presence stanza with the MUC-specific payloads.

        :param last_seen: Passed through to the parent implementation.
        :param status_codes: MUC status codes to include. This set is not
            modified; extra codes are added to a copy.
        :param user_full_jid: For the user participant in a non-anonymous
            room, the full JID to advertise in the MUC item.
        :param presence_kwargs: Passed through to the parent implementation.
        :return: The presence stanza (not sent).
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        p["muc"]["affiliation"] = self.affiliation
        p["muc"]["role"] = self.role
        if self._hats:
            p["hats"].add_hats(self._hats)
        # Work on a copy so the caller's set is never mutated by the
        # additions below.
        codes = set(status_codes) if status_codes else set()
        if self.is_user:
            codes.add(110)
        if not self.muc.is_anonymous and not self.is_system:
            if self.is_user:
                if user_full_jid:
                    p["muc"]["jid"] = user_full_jid
                else:
                    jid = copy(self.user_jid)
                    try:
                        jid.resource = next(
                            iter(self.muc.get_user_resources())  # type:ignore
                        )
                    except StopIteration:
                        jid.resource = "pseudo-resource"
                    # NOTE(review): the full JID built just above is not
                    # used; self.user_jid is what ends up in the stanza.
                    # Presumably `jid` was intended here - TODO confirm
                    # before changing, as it alters what clients receive.
                    p["muc"]["jid"] = self.user_jid
                    codes.add(100)
            elif self.contact:
                p["muc"]["jid"] = self.contact.jid
                if a := self.contact.get_avatar():
                    p["vcard_temp_update"]["photo"] = a.id
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        if self.is_user and (hash_ := self.session.user.avatar_hash):
            p["vcard_temp_update"]["photo"] = hash_
        p["muc"]["status_codes"] = codes
        return p

    @property
    def DISCO_NAME(self):
        """
        The nickname is used as the disco name.
        """
        return self.nickname

    def __send_presence_if_needed(
        self, stanza: Union[Message, Presence], full_jid: JID, archive_only: bool
    ):
        """
        Send the initial presence of this participant before ``stanza``,
        unless it is not needed.

        Nothing is sent for archive-only stanzas, for the system or user
        participant, if a presence was already sent, for stanzas with a
        subject, or for messages carrying a "muc" payload.
        """
        if (
            archive_only
            or self.is_system
            or self.is_user
            or self._presence_sent
            or stanza["subject"]
        ):
            return
        if isinstance(stanza, Message):
            if stanza.get_plugin("muc", check=True):
                return
        self.send_initial_presence(full_jid)

    @cached_property
    def __occupant_id(self):
        # Computed once per instance (cached_property): the contact JID if
        # there is a contact, a fixed string for the user and the system
        # participant, a random UUID otherwise.
        if self.contact:
            return self.contact.jid
        elif self.is_user:
            return "slidge-user"
        elif self.is_system:
            return "room"
        else:
            return str(uuid.uuid4())

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        full_jid: Optional[JID] = None,
        archive_only=False,
        legacy_msg_id=None,
        **send_kwargs,
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza on behalf of this participant.

        :param stanza: The message or presence to send.
        :param full_jid: If set, only send to this JID; otherwise send a
            copy to each of the JIDs returned by ``muc.user_full_jids()``.
        :param archive_only: If true (and no ``full_jid`` is given), only
            add the message to the MUC archive without sending it.
        :param legacy_msg_id: Passed to the MUC archive along with messages.
        :param send_kwargs: Accepted but not used by this implementation.
        :return: The stanza (the last copy sent when broadcasting).
        """
        stanza["occupant-id"]["id"] = self.__occupant_id
        self.__add_nick_element(stanza)
        if not self.is_user and isinstance(stanza, Presence):
            # Do not send "unavailable" for a participant that was never
            # announced as present.
            if stanza["type"] == "unavailable" and not self._presence_sent:
                return stanza  # type:ignore
            self._presence_sent = True
            self.__part_store.set_presence_sent(self.pk)
        if full_jid:
            stanza["to"] = full_jid
            self.__send_presence_if_needed(stanza, full_jid, archive_only)
            if self.is_user:
                assert stanza.stream is not None
                stanza.stream.send(stanza, use_filters=False)
            else:
                stanza.send()
        else:
            if hasattr(self.muc, "archive") and isinstance(stanza, Message):
                self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
            if archive_only:
                return stanza
            for user_full_jid in self.muc.user_full_jids():
                stanza = copy(stanza)
                stanza["to"] = user_full_jid
                self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
                stanza.send()
        return stanza

    def mucadmin_item(self):
        """
        Build a ``MUCAdminItem`` describing this participant.

        In non-anonymous rooms, the bare JID of the user or of the
        associated contact is included; a warning is emitted if neither
        is available.
        """
        item = MUCAdminItem()
        item["nick"] = self.nickname
        item["affiliation"] = self.affiliation
        item["role"] = self.role
        if not self.muc.is_anonymous:
            if self.is_user:
                item["jid"] = self.user_jid.bare
            elif self.contact:
                item["jid"] = self.contact.jid.bare
            else:
                warnings.warn(
                    (
                        f"Public group but no contact JID associated to {self.jid} in"
                        f" {self}"
                    ),
                )
        return item

    def __add_nick_element(self, stanza: Union[Presence, Message]):
        # When the resource part of the JID differs from the nickname
        # (stripped of illegal chars), attach the nickname in a XEP-0172
        # element.
        if (nick := self._nickname_no_illegal) != self.jid.resource:
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = nick
            stanza.append(n)

    def _get_last_presence(self) -> Optional[CachedPresence]:
        """
        Return the cached presence of this participant, falling back to the
        cached presence of the associated contact, if any.
        """
        own = super()._get_last_presence()
        if own is None and self.contact:
            return self.contact._get_last_presence()
        return own

    def send_initial_presence(
        self,
        full_jid: JID,
        nick_change=False,
        presence_id: Optional[str] = None,
    ):
        """
        Called when the user joins a MUC, as a mechanism
        to indicate to the joining XMPP client the list of "participants".

        Can also be called to trigger a "participant has joined the group" event.

        :param full_jid: Set this to only send to a specific user XMPP resource.
        :param nick_change: Used when the user joins and the MUC renames them (code 210)
        :param presence_id: set the presence ID. used internally by slidge
        """
        #  MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
        codes = set()
        if nick_change:
            codes.add(210)

        if self.is_user:
            # the "initial presence" of the user has to be vanilla, as it is
            # a crucial part of the MUC join sequence for XMPP clients.
            kwargs = {}
        else:
            cache = self._get_last_presence()
            self.log.debug("Join muc, initial presence: %s", cache)
            if cache:
                ptype = cache.ptype
                # An offline participant is not announced at all.
                if ptype == "unavailable":
                    return
                kwargs = dict(
                    last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
                )
            else:
                kwargs = {}
        p = self._make_presence(
            status_codes=codes,
            user_full_jid=full_jid,
            **kwargs,  # type:ignore
        )
        if presence_id:
            p["id"] = presence_id
        self._send(p, full_jid)

    def leave(self):
        """
        Call this when the participant leaves the room
        """
        self.muc.remove_participant(self)

    def kick(self, reason: str | None = None):
        """
        Call this when the participant is kicked from the room

        :param reason: Optional reason, passed to the MUC.
        """
        self.muc.remove_participant(self, kick=True, reason=reason)

    def ban(self, reason: str | None = None):
        """
        Call this when the participant is banned from the room

        :param reason: Optional reason, passed to the MUC.
        """
        self.muc.remove_participant(self, ban=True, reason=reason)

    def get_disco_info(self, jid: OptJid = None, node: Optional[str] = None):
        """
        Return the disco info of the associated contact if there is one,
        else the default disco info of this participant.

        ``jid`` and ``node`` are accepted but not used here.
        """
        if self.contact is not None:
            return self.contact.get_disco_info()
        return super().get_disco_info()

    def moderate(self, legacy_msg_id: LegacyMessageType, reason: Optional[str] = None):
        """
        Send message moderation (retraction) notices attributed to this
        participant.

        One notice is sent per XMPP message ID associated with the legacy
        message ID.

        :param legacy_msg_id: The legacy ID of the moderated message.
        :param reason: Optional reason for the moderation.
        """
        xmpp_id = self._legacy_to_xmpp(legacy_msg_id)
        multi = self.xmpp.store.multi.get_xmpp_ids(self.session.user_pk, xmpp_id)
        if multi is None:
            msg_ids = [xmpp_id]
        else:
            msg_ids = multi + [xmpp_id]

        for i in msg_ids:
            # The notice is built by the system participant but names this
            # participant as the moderator.
            m = self.muc.get_system_participant()._make_message()
            m["apply_to"]["id"] = i
            m["apply_to"]["moderated"].enable("retract")
            m["apply_to"]["moderated"]["by"] = self.jid
            if reason:
                m["apply_to"]["moderated"]["reason"] = reason
            self._send(m)

    def set_room_subject(
        self,
        subject: str,
        full_jid: Optional[JID] = None,
        when: Optional[datetime] = None,
        update_muc=True,
    ):
        """
        Send a room subject message from this participant.

        :param subject: The subject. If empty, the name of the MUC is sent
            as the subject instead.
        :param full_jid: If set, only send to this JID.
        :param when: When the subject was set; added as a delay stamp.
        :param update_muc: Whether to also update the subject, subject
            setter and subject date stored on the MUC.
        """
        if update_muc:
            self.muc._subject = subject  # type: ignore
            self.muc.subject_setter = self.nickname
            self.muc.subject_date = when

        msg = self._make_message()
        if when is not None:
            msg["delay"].set_stamp(when)
            msg["delay"]["from"] = self.muc.jid
        msg["subject"] = subject or str(self.muc.name)
        self._send(msg, full_jid)

    @classmethod
    def from_store(
        cls,
        session,
        stored: Participant,
        contact: Optional[LegacyContact] = None,
        muc: Optional["LegacyMUC"] = None,
    ) -> Self:
        """
        Build a participant from its stored representation.

        :param session: The session this participant belongs to.
        :param stored: The stored participant.
        :param contact: The associated contact, if already available;
            otherwise it is built from ``stored.contact`` when set.
        :param muc: The MUC, if already available; otherwise it is built
            from ``stored.room``.
        :return: The participant.
        """
        # Imported here rather than at module level, where this name is only
        # imported under TYPE_CHECKING.
        from slidge.group.room import LegacyMUC

        if muc is None:
            muc = LegacyMUC.get_self_or_unique_subclass().from_store(
                session, stored.room
            )
        part = cls(
            muc,
            stored.nickname,
            role=stored.role,
            affiliation=stored.affiliation,
        )
        part.pk = stored.id
        if contact is not None:
            part.contact = contact
        elif stored.contact is not None:
            contact = LegacyContact.get_self_or_unique_subclass().from_store(
                session, stored.contact
            )
            part.contact = contact

        part.is_user = stored.is_user
        if (data := stored.extra_attributes) is not None:
            # NOTE(review): the participant's stored extra attributes are
            # deserialized into the MUC, not into `part` - presumably `part`
            # was intended; TODO confirm.
            muc.deserialize_extra_attributes(data)
        part._presence_sent = stored.presence_sent
        part._hats = [Hat(h.uri, h.title) for h in stored.hats]
        return part
# Module-level logger, named after this module; used by
# LegacyParticipant.__init__ to log instantiations.
log = logging.getLogger(__name__)