Coverage for slidge/group/participant.py: 90%

321 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1import logging 

2import string 

3import stringprep 

4import uuid 

5import warnings 

6from copy import copy 

7from datetime import datetime 

8from functools import cached_property 

9from typing import TYPE_CHECKING, Optional, Self, Union 

10 

11from slixmpp import JID, InvalidJID, Message, Presence 

12from slixmpp.plugins.xep_0045.stanza import MUCAdminItem 

13from slixmpp.stringprep import StringprepError, resourceprep 

14from slixmpp.types import MessageTypes, OptJid 

15from slixmpp.util.stringprep_profiles import StringPrepError, prohibit_output 

16 

17from ..contact import LegacyContact 

18from ..core.mixins import ( 

19 ChatterDiscoMixin, 

20 MessageMixin, 

21 PresenceMixin, 

22 StoredAttributeMixin, 

23) 

24from ..db.models import Participant 

25from ..util import SubclassableOnce, strip_illegal_chars 

26from ..util.types import ( 

27 CachedPresence, 

28 Hat, 

29 LegacyMessageType, 

30 MessageOrPresenceTypeVar, 

31 MucAffiliation, 

32 MucRole, 

33) 

34 

35if TYPE_CHECKING: 

36 from .room import LegacyMUC 

37 

38 

def strip_non_printable(nickname: str) -> str:
    """
    Build a fallback nickname usable as a JID resource.

    Keeps only the ASCII printable characters of ``nickname`` (minus ASCII
    control whitespace such as tab or newline, which are still control
    characters) and appends a suffix derived from the *original* nickname so
    that two different nicknames reducing to the same printable text remain
    distinct.

    The suffix is a truncated SHA-256 digest rather than the built-in
    ``hash()``: ``hash()`` of a ``str`` is salted per process, which made the
    resulting nickname change on every restart.

    :param nickname: The nickname that could not be used as-is.
    :return: The replacement nickname. A warning is emitted as a side effect.
    """
    # local import: keeps this fix self-contained, no file-level import needed
    import hashlib

    kept = "".join(
        char
        for char in nickname
        # string.printable contains "\t\n\r\x0b\x0c"; only the plain space is
        # an acceptable whitespace character here.
        if char in string.printable and (char == " " or not char.isspace())
    )
    digest = hashlib.sha256(nickname.encode("utf-8", errors="replace")).hexdigest()
    new = f"{kept}-slidge-{digest[:16]}"
    warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
    return new

46 

47 

class LegacyParticipant(
    StoredAttributeMixin,
    PresenceMixin,
    MessageMixin,
    ChatterDiscoMixin,
    metaclass=SubclassableOnce,
):
    """
    A legacy participant of a legacy group chat.

    A participant belongs to exactly one ``LegacyMUC``. Its JID is the room
    JID with the (escaped) nickname as resource; the "system" participant has
    no nickname and uses the room JID unchanged.
    """

    mtype: MessageTypes = "groupchat"
    _can_send_carbon = False
    USE_STANZA_ID = True
    STRIP_SHORT_DELAY = False
    # identifier of this participant in the participants store; not set by
    # __init__, assigned afterwards (e.g. by from_store)
    pk: int

    def __init__(
        self,
        muc: "LegacyMUC",
        nickname: Optional[str] = None,
        is_user=False,
        is_system=False,
        role: MucRole = "participant",
        affiliation: MucAffiliation = "member",
    ):
        """
        :param muc: The room this participant is part of.
        :param nickname: Nickname in the room. Only the system participant is
            expected to have none; otherwise a warning is emitted and a
            random ``unnamed-<uuid>`` nickname is used for the JID.
        :param is_user: Whether this participant represents the slidge user.
        :param is_system: Whether this is the room's "system" participant.
        :param role: Initial MUC role.
        :param affiliation: Initial MUC affiliation.
        """
        # session and xmpp are set before super().__init__(), keep this order
        self.session = session = muc.session
        self.xmpp = session.xmpp
        super().__init__()
        self._hats = list[Hat]()
        self.muc = muc
        self._role = role
        self._affiliation = affiliation
        self.is_user: bool = is_user
        self.is_system: bool = is_system

        self._nickname = nickname

        self.__update_jid(nickname)
        log.debug("Instantiation of: %r", self)

        self.contact: Optional["LegacyContact"] = None
        # we track if we already sent a presence for this participant.
        # if we didn't, we send it before the first message.
        # this way, even in plugins that don't map "user has joined" events,
        # we send a "join"-presence from the participant before the first message
        self._presence_sent: bool = False
        self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")
        self.__part_store = self.xmpp.store.participants

    @property
    def contact_pk(self) -> Optional[int]:  # type:ignore
        """``contact_pk`` of the associated contact, or ``None`` without contact."""
        if self.contact:
            return self.contact.contact_pk
        return None

    @property
    def user_jid(self):
        """JID of the user owning the session this participant belongs to."""
        return self.session.user_jid

    def __repr__(self):
        return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"

    @property
    def affiliation(self):
        """Current MUC affiliation of this participant."""
        return self._affiliation

    @affiliation.setter
    def affiliation(self, affiliation: MucAffiliation):
        # No-op when unchanged. The store is only updated once the room's
        # participants have been filled, and the presence is only re-sent if
        # a presence was already sent for this participant.
        if self._affiliation == affiliation:
            return
        self._affiliation = affiliation
        if not self.muc._participants_filled:
            return
        self.__part_store.set_affiliation(self.pk, affiliation)
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def send_affiliation_change(self):
        """
        Send a message of type "normal" carrying this participant's affiliation.

        In non-anonymous rooms the contact JID is included for non-system
        participants; a warning is emitted if there is no associated contact.
        """
        # internal use by slidge
        msg = self._make_message()
        msg["muc"]["affiliation"] = self._affiliation
        msg["type"] = "normal"
        if not self.muc.is_anonymous and not self.is_system:
            if self.contact:
                msg["muc"]["jid"] = self.contact.jid
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        self._send(msg)

    @property
    def role(self):
        """Current MUC role of this participant."""
        return self._role

    @role.setter
    def role(self, role: MucRole):
        # Same update sequence as the affiliation setter.
        if self._role == role:
            return
        self._role = role
        if not self.muc._participants_filled:
            return
        self.__part_store.set_role(self.pk, role)
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def set_hats(self, hats: list[Hat]):
        """
        Replace the hats of this participant.

        Follows the same update sequence as the role and affiliation setters:
        nothing happens if ``hats`` equals the current list.

        :param hats: The new list of hats.
        """
        if self._hats == hats:
            return
        self._hats = hats
        if not self.muc._participants_filled:
            return
        self.__part_store.set_hats(self.pk, hats)
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def __update_jid(self, unescaped_nickname: Optional[str]):
        """
        Compute ``self.jid`` (room JID + nickname as resource) and
        ``self._nickname_no_illegal`` from a raw nickname.

        The nickname goes through successive fallbacks until it is accepted
        as a JID resource: illegal characters stripped, then punycode if
        resourceprep rejects it, then ``strip_non_printable`` if the JID
        still refuses it.

        :param unescaped_nickname: The nickname as provided by the caller.
        """
        j: JID = copy(self.muc.jid)

        if self.is_system:
            # the system participant speaks from the room JID itself
            self.jid = j
            self._nickname_no_illegal = ""
            return

        nickname = unescaped_nickname

        if nickname:
            nickname = self._nickname_no_illegal = strip_illegal_chars(nickname)
        else:
            warnings.warn(
                "Only the system participant is allowed to not have a nickname"
            )
            # Also record the generated name in _nickname_no_illegal:
            # __add_nick_element reads that attribute on every stanza and
            # would raise AttributeError if it were left unset here.
            nickname = self._nickname_no_illegal = f"unnamed-{uuid.uuid4()}"

        assert isinstance(nickname, str)

        try:
            # workaround for https://codeberg.org/poezio/slixmpp/issues/3480
            prohibit_output(nickname, [stringprep.in_table_a1])
            resourceprep(nickname)
        except (StringPrepError, StringprepError):
            nickname = nickname.encode("punycode").decode()

        # at this point there still might be control chars
        try:
            j.resource = nickname
        except InvalidJID:
            j.resource = strip_non_printable(nickname)

        self.jid = j

    def send_configuration_change(self, codes: tuple[int, ...]):
        """
        Send a message carrying MUC status codes from the system participant.

        :param codes: The status codes to attach to the message.
        :raises RuntimeError: If called on a participant that is not the
            system participant.
        """
        if not self.is_system:
            raise RuntimeError("This is only possible for the system participant")
        msg = self._make_message()
        msg["muc"]["status_codes"] = codes
        self._send(msg)

    @property
    def nickname(self):
        """The nickname as given by the caller (not escaped)."""
        return self._nickname

    @nickname.setter
    def nickname(self, new_nickname: str):
        # Announces the rename with two presences: "unavailable" with status
        # code 303 from the old JID, then "available" from the new JID.
        old = self._nickname
        if new_nickname == old:
            return

        cache = getattr(self, "_last_presence", None)
        if cache:
            last_seen = cache.last_seen
            kwargs = cache.presence_kwargs
        else:
            last_seen = None
            kwargs = {}

        kwargs["status_codes"] = {303}

        p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
        # in this order so pfrom=old resource and we actually use the escaped nick
        # in the muc/item/nick element
        self.__update_jid(new_nickname)
        p["muc"]["item"]["nick"] = self.jid.resource
        self._send(p)

        self._nickname = new_nickname

        kwargs["status_codes"] = set()
        p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
        self._send(p)

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime] = None,
        status_codes: Optional[set[int]] = None,
        user_full_jid: Optional[JID] = None,
        **presence_kwargs,
    ):
        """
        Build a presence stanza with the MUC-specific payload added.

        :param last_seen: Forwarded to the parent ``_make_presence``.
        :param status_codes: MUC status codes to include. The set passed by
            the caller is not modified.
        :param user_full_jid: For the user's own participant in a
            non-anonymous room, the JID to advertise in the MUC item.
        :param presence_kwargs: Forwarded to the parent ``_make_presence``.
        :return: The presence stanza.
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        p["muc"]["affiliation"] = self.affiliation
        p["muc"]["role"] = self.role
        if self._hats:
            p["hats"].add_hats(self._hats)
        # work on a copy: codes are added below and must not leak into the
        # set owned by the caller
        codes: set[int] = set(status_codes) if status_codes else set()
        if self.is_user:
            codes.add(110)
        if not self.muc.is_anonymous and not self.is_system:
            if self.is_user:
                if user_full_jid:
                    p["muc"]["jid"] = user_full_jid
                else:
                    # no explicit full JID: use the first known resource of
                    # the user in this room, or a placeholder resource
                    jid = copy(self.user_jid)
                    try:
                        jid.resource = next(
                            iter(self.muc.get_user_resources())  # type:ignore
                        )
                    except StopIteration:
                        jid.resource = "pseudo-resource"
                    # use the JID built above; it was previously computed and
                    # then discarded in favour of the unmodified user JID
                    p["muc"]["jid"] = jid
                codes.add(100)
            elif self.contact:
                p["muc"]["jid"] = self.contact.jid
                if a := self.contact.get_avatar():
                    p["vcard_temp_update"]["photo"] = a.id
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        if self.is_user and (hash_ := self.session.user.avatar_hash):
            p["vcard_temp_update"]["photo"] = hash_
        p["muc"]["status_codes"] = codes
        return p

    @property
    def DISCO_NAME(self):
        """Name used for service discovery: the participant's nickname."""
        return self.nickname

    def __send_presence_if_needed(
        self, stanza: Union[Message, Presence], full_jid: JID, archive_only: bool
    ):
        """
        Send the initial presence of this participant before ``stanza`` if no
        presence was sent yet.

        Skipped for archive-only stanzas, the system participant, the user,
        stanzas with a subject, and messages that carry a "muc" plugin.
        """
        if (
            archive_only
            or self.is_system
            or self.is_user
            or self._presence_sent
            or stanza["subject"]
        ):
            return
        if isinstance(stanza, Message):
            if stanza.get_plugin("muc", check=True):
                return
        self.send_initial_presence(full_jid)

    @cached_property
    def __occupant_id(self):
        # Computed once per instance: contact JID when there is a contact,
        # fixed markers for the user and the system participant, a random
        # UUID otherwise.
        if self.contact:
            return self.contact.jid
        elif self.is_user:
            return "slidge-user"
        elif self.is_system:
            return "room"
        else:
            return str(uuid.uuid4())

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        full_jid: Optional[JID] = None,
        archive_only=False,
        legacy_msg_id=None,
        **send_kwargs,
    ) -> MessageOrPresenceTypeVar:
        """
        Send ``stanza`` from this participant.

        :param stanza: The message or presence to send.
        :param full_jid: If given, send only to this JID. Otherwise the
            stanza is (for messages) added to the room archive if the room
            has one, then a copy is sent to each of the user's full JIDs
            known to the room.
        :param archive_only: Without ``full_jid``: stop after archiving.
        :param legacy_msg_id: Passed to the archive along with the message.
        :param send_kwargs: Accepted but not used by this implementation.
        :return: The stanza (the last copy sent when fanned out).
        """
        stanza["occupant-id"]["id"] = self.__occupant_id
        self.__add_nick_element(stanza)
        if not self.is_user and isinstance(stanza, Presence):
            # never announce the departure of someone who never "joined"
            if stanza["type"] == "unavailable" and not self._presence_sent:
                return stanza  # type:ignore
            self._presence_sent = True
            self.__part_store.set_presence_sent(self.pk)
        if full_jid:
            stanza["to"] = full_jid
            self.__send_presence_if_needed(stanza, full_jid, archive_only)
            if self.is_user:
                assert stanza.stream is not None
                stanza.stream.send(stanza, use_filters=False)
            else:
                stanza.send()
        else:
            if hasattr(self.muc, "archive") and isinstance(stanza, Message):
                self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
            if archive_only:
                return stanza
            for user_full_jid in self.muc.user_full_jids():
                stanza = copy(stanza)
                stanza["to"] = user_full_jid
                self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
                stanza.send()
        return stanza

    def mucadmin_item(self):
        """
        Build a ``MUCAdminItem`` describing this participant.

        The bare JID of the user or of the contact is included in
        non-anonymous rooms; a warning is emitted when neither is available.
        """
        item = MUCAdminItem()
        item["nick"] = self.nickname
        item["affiliation"] = self.affiliation
        item["role"] = self.role
        if not self.muc.is_anonymous:
            if self.is_user:
                item["jid"] = self.user_jid.bare
            elif self.contact:
                item["jid"] = self.contact.jid.bare
            else:
                warnings.warn(
                    (
                        f"Public group but no contact JID associated to {self.jid} in"
                        f" {self}"
                    ),
                )
        return item

    def __add_nick_element(self, stanza: Union[Presence, Message]):
        # When the JID resource differs from the nickname (escaping took
        # place), attach the nickname in a XEP-0172 nick element.
        if (nick := self._nickname_no_illegal) != self.jid.resource:
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = nick
            stanza.append(n)

    def _get_last_presence(self) -> Optional[CachedPresence]:
        """Return our own cached presence, falling back to the contact's."""
        own = super()._get_last_presence()
        if own is None and self.contact:
            return self.contact._get_last_presence()
        return own

    def send_initial_presence(
        self,
        full_jid: JID,
        nick_change=False,
        presence_id: Optional[str] = None,
    ):
        """
        Called when the user joins a MUC, as a mechanism
        to indicate to the joining XMPP client the list of "participants".

        Can also be called to trigger a "participant has joined the group" event.

        Nothing is sent for a participant (other than the user) whose cached
        presence is "unavailable".

        :param full_jid: Set this to only send to a specific user XMPP resource.
        :param nick_change: Used when the user joins and the MUC renames them (code 210)
        :param presence_id: set the presence ID. used internally by slidge
        """
        #  MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
        codes = set()
        if nick_change:
            codes.add(210)

        if self.is_user:
            # the "initial presence" of the user has to be vanilla, as it is
            # a crucial part of the MUC join sequence for XMPP clients.
            kwargs = {}
        else:
            cache = self._get_last_presence()
            self.log.debug("Join muc, initial presence: %s", cache)
            if cache:
                ptype = cache.ptype
                if ptype == "unavailable":
                    return
                kwargs = dict(
                    last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
                )
            else:
                kwargs = {}
        p = self._make_presence(
            status_codes=codes,
            user_full_jid=full_jid,
            **kwargs,  # type:ignore
        )
        if presence_id:
            p["id"] = presence_id
        self._send(p, full_jid)

    def leave(self):
        """
        Call this when the participant leaves the room
        """
        self.muc.remove_participant(self)

    def kick(self, reason: str | None = None):
        """
        Call this when the participant is kicked from the room

        :param reason: Optional reason, passed on to the room.
        """
        self.muc.remove_participant(self, kick=True, reason=reason)

    def ban(self, reason: str | None = None):
        """
        Call this when the participant is banned from the room

        :param reason: Optional reason, passed on to the room.
        """
        self.muc.remove_participant(self, ban=True, reason=reason)

    def get_disco_info(self, jid: OptJid = None, node: Optional[str] = None):
        """
        Return the disco info of the associated contact if there is one,
        otherwise our own. ``jid`` and ``node`` are accepted but not used here.
        """
        if self.contact is not None:
            return self.contact.get_disco_info()
        return super().get_disco_info()

    def moderate(self, legacy_msg_id: LegacyMessageType, reason: Optional[str] = None):
        """
        Announce that this participant moderated (retracted) a message.

        One message is sent per XMPP ID associated with the legacy message.

        :param legacy_msg_id: Legacy ID of the moderated message.
        :param reason: Optional reason to include.
        """
        xmpp_id = self._legacy_to_xmpp(legacy_msg_id)
        multi = self.xmpp.store.multi.get_xmpp_ids(self.session.user_pk, xmpp_id)
        if multi is None:
            msg_ids = [xmpp_id]
        else:
            msg_ids = multi + [xmpp_id]

        for i in msg_ids:
            # the stanza is built by the system participant, and marked as
            # moderated "by" this participant
            m = self.muc.get_system_participant()._make_message()
            m["apply_to"]["id"] = i
            m["apply_to"]["moderated"].enable("retract")
            m["apply_to"]["moderated"]["by"] = self.jid
            if reason:
                m["apply_to"]["moderated"]["reason"] = reason
            self._send(m)

    def set_room_subject(
        self,
        subject: str,
        full_jid: Optional[JID] = None,
        when: Optional[datetime] = None,
        update_muc=True,
    ):
        """
        Send a subject message from this participant.

        :param subject: The new subject. If empty, the room name is sent instead.
        :param full_jid: Only send to this JID if given.
        :param when: If given, added to the message as a delay stamp.
        :param update_muc: Whether to also record the subject, its setter
            and its date on the room object.
        """
        if update_muc:
            self.muc._subject = subject  # type: ignore
            self.muc.subject_setter = self.nickname
            self.muc.subject_date = when

        msg = self._make_message()
        if when is not None:
            msg["delay"].set_stamp(when)
            msg["delay"]["from"] = self.muc.jid
        msg["subject"] = subject or str(self.muc.name)
        self._send(msg, full_jid)

    @classmethod
    def from_store(
        cls,
        session,
        stored: Participant,
        contact: Optional[LegacyContact] = None,
        muc: Optional["LegacyMUC"] = None,
    ) -> Self:
        """
        Rebuild a participant from its stored row.

        :param session: The session the participant belongs to.
        :param stored: The stored participant.
        :param contact: The associated contact if already available;
            otherwise it is rebuilt from ``stored.contact`` when present.
        :param muc: The room if already available; otherwise it is rebuilt
            from ``stored.room``.
        :return: The participant instance.
        """
        # imported here, only under TYPE_CHECKING at module level
        from slidge.group.room import LegacyMUC

        if muc is None:
            muc = LegacyMUC.get_self_or_unique_subclass().from_store(
                session, stored.room
            )
        part = cls(
            muc,
            stored.nickname,
            role=stored.role,
            affiliation=stored.affiliation,
        )
        part.pk = stored.id
        if contact is not None:
            part.contact = contact
        elif stored.contact is not None:
            contact = LegacyContact.get_self_or_unique_subclass().from_store(
                session, stored.contact
            )
            part.contact = contact

        part.is_user = stored.is_user
        if (data := stored.extra_attributes) is not None:
            # these are the participant's attributes: restore them on the
            # participant, not on the room
            part.deserialize_extra_attributes(data)
        part._presence_sent = stored.presence_sent
        part._hats = [Hat(h.uri, h.title) for h in stored.hats]
        return part

523 

524 

# Module-level logger, named after this module; used for the instantiation
# debug message in LegacyParticipant.__init__.
log = logging.getLogger(__name__)