Coverage for slidge/group/participant.py: 89%

366 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import logging 

2import string 

3import uuid 

4import warnings 

5from copy import copy 

6from datetime import datetime 

7from functools import cached_property 

8from typing import TYPE_CHECKING, Any, Optional, Union 

9from xml.etree import ElementTree as ET 

10 

11import sqlalchemy as sa 

12from slixmpp import JID, InvalidJID, Message, Presence 

13from slixmpp.plugins.xep_0045.stanza import MUCAdminItem 

14from slixmpp.types import MessageTypes, OptJid 

15from sqlalchemy.orm.exc import DetachedInstanceError 

16 

17from ..contact import LegacyContact 

18from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin 

19from ..core.mixins.db import DBMixin 

20from ..db.models import Participant 

21from ..util import SubclassableOnce, strip_illegal_chars 

22from ..util.types import ( 

23 CachedPresence, 

24 Hat, 

25 LegacyMessageType, 

26 LegacyThreadType, 

27 MessageOrPresenceTypeVar, 

28 MucAffiliation, 

29 MucRole, 

30) 

31 

32if TYPE_CHECKING: 

33 from .room import LegacyMUC 

34 

35 

def strip_non_printable(nickname: str):
    """
    Build a last-resort, ASCII-only replacement for an unusable nickname.

    Only printable ASCII characters (space through ``~``) of ``nickname`` are
    kept; tabs, newlines and other control characters are dropped along with
    all non-ASCII characters. A ``-slidge-<digest>`` suffix derived from the
    *original* nickname is appended so that different nicknames that reduce to
    the same ASCII text still yield different results.

    A warning is emitted every time this function is called.

    :param nickname: The nickname that could not be used as-is.
    :return: The sanitized nickname with its disambiguating suffix.
    """
    # Local import: keeps this function self-contained without touching the
    # module-level import block.
    import hashlib

    kept = "".join(c for c in nickname if c.isascii() and c.isprintable())
    # The built-in hash() of a str is salted per process, which made the
    # suffix change on every restart. A truncated SHA-256 is stable.
    # "surrogatepass" prevents a UnicodeEncodeError on lone surrogates.
    digest = hashlib.sha256(nickname.encode("utf-8", "surrogatepass")).hexdigest()
    new = f"{kept}-slidge-{digest[:16]}"
    warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
    return new

43 

44 

class LegacyParticipant(
    PresenceMixin,
    MessageMixin,
    ChatterDiscoMixin,
    DBMixin,
    metaclass=SubclassableOnce,
):
    """
    A legacy participant of a legacy group chat.

    Wraps a stored :class:`Participant` row and builds/sends the MUC presences
    and messages emitted on behalf of this occupant.
    """

    is_participant = True

    mtype: MessageTypes = "groupchat"
    _can_send_carbon = False
    USE_STANZA_ID = True
    STRIP_SHORT_DELAY = False
    stored: Participant
    contact: LegacyContact[Any] | None

    def __init__(
        self,
        muc: "LegacyMUC",
        stored: Participant,
        is_system: bool = False,
        contact: LegacyContact[Any] | None = None,
    ) -> None:
        """
        :param muc: The room this participant belongs to.
        :param stored: The database row backing this participant.
        :param is_system: Whether this is the room's system participant.
        :param contact: The contact associated with this participant, if any.
            When omitted, it is loaded from ``stored.contact`` if that is set.
        """
        self.muc = muc
        self.session = muc.session
        self.xmpp = muc.session.xmpp
        self.is_system = is_system

        # Keep the in-memory contact and the stored contact in sync, whichever
        # of the two was provided.
        if contact is None and stored.contact is not None:
            contact = self.session.contacts.from_store(stored=stored.contact)
        if contact is not None and stored.contact is None:
            stored.contact = contact.stored

        self.stored = stored
        self.contact = contact

        super().__init__()

        if stored.resource is None:
            self.__update_resource(stored.nickname)

        self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")

    @property
    def is_user(self) -> bool:
        """Whether this participant is the slidge user themselves."""
        try:
            return self.stored.is_user
        except DetachedInstanceError:
            # The stored row is detached from its ORM session: merge, retry.
            self.merge()
            return self.stored.is_user

    @is_user.setter
    def is_user(self, is_user: bool) -> None:
        with self.xmpp.store.session(expire_on_commit=True) as orm:
            orm.add(self.stored)
            self.stored.is_user = is_user
            orm.commit()

    @property
    def jid(self) -> JID:
        """The room JID, with the stored resource set when it is non-empty."""
        jid = JID(self.muc.jid)
        if self.stored.resource:
            jid.resource = self.stored.resource
        return jid

    @jid.setter
    def jid(self, x: JID) -> None:
        # FIXME: without this, mypy yields
        #        "Cannot override writeable attribute with read-only property"
        #        But it does not happen for LegacyContact. WTF?
        raise RuntimeError

    def __should_commit(self) -> bool:
        """
        Tell whether changes should be written to the store right now.

        Nothing is committed for the system participant, nor while the room
        holds the "fill participants" or "fill history" lock.
        """
        if self.is_system:
            return False
        if self.muc.get_lock("fill participants"):
            return False
        if self.muc.get_lock("fill history"):
            return False
        return True

    def commit(self, *args, **kwargs) -> None:
        """Commit through the parent class, unless commits are suppressed."""
        if not self.__should_commit():
            return
        super().commit(*args, **kwargs)

    @property
    def user_jid(self):
        """The JID of the user owning the session."""
        return self.session.user_jid

    def __repr__(self) -> str:
        return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"

    @property
    def _presence_sent(self) -> bool:
        # we track if we already sent a presence for this participant.
        # if we didn't, we send it before the first message.
        # this way, even in plugins that don't map "user has joined" events,
        # we send a "join"-presence from the participant before the first message
        return self.stored.presence_sent

    @_presence_sent.setter
    def _presence_sent(self, val: bool) -> None:
        if self._presence_sent == val:
            return
        self.stored.presence_sent = val
        if not self.__should_commit():
            return
        # Targeted UPDATE of the single column rather than a full commit.
        with self.xmpp.store.session() as orm:
            orm.execute(
                sa.update(Participant)
                .where(Participant.id == self.stored.id)
                .values(presence_sent=val)
            )
            orm.commit()

    @property
    def nickname_no_illegal(self) -> str:
        """The stored nickname variant with illegal characters stripped."""
        return self.stored.nickname_no_illegal

    def __propagate_change(self, **commit_kwargs) -> None:
        """
        Persist an occupant attribute change and re-broadcast the presence.

        Does nothing until the room's participants are filled. The presence is
        only re-sent if one was already sent for this participant.

        :param commit_kwargs: Passed through to :meth:`commit`.
        """
        if not self.muc.participants_filled:
            return
        self.commit(**commit_kwargs)
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    @property
    def affiliation(self):
        """The stored MUC affiliation of this participant."""
        return self.stored.affiliation

    @affiliation.setter
    def affiliation(self, affiliation: MucAffiliation) -> None:
        if self.affiliation == affiliation:
            return
        self.stored.affiliation = affiliation
        self.__propagate_change()

    def send_affiliation_change(self) -> None:
        """Send a "normal" message carrying this participant's affiliation."""
        # internal use by slidge
        msg = self._make_message()
        msg["muc"]["affiliation"] = self.affiliation
        msg["type"] = "normal"
        if not self.muc.is_anonymous and not self.is_system:
            if self.contact:
                msg["muc"]["jid"] = self.contact.jid
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        self._send(msg)

    @property
    def role(self):
        """The stored MUC role of this participant."""
        return self.stored.role

    @role.setter
    def role(self, role: MucRole) -> None:
        if self.role == role:
            return
        self.stored.role = role
        self.__propagate_change()

    @property
    def hats(self) -> list[Hat]:
        """The stored hats as :class:`Hat` objects (empty list if none)."""
        return [Hat(*h) for h in self.stored.hats] if self.stored.hats else []

    def set_hats(self, hats: list[Hat]) -> None:
        """
        Replace the hats of this participant.

        :param hats: The new, complete list of hats.
        """
        if self.hats == hats:
            return
        self.stored.hats = hats  # type:ignore[assignment]
        self.__propagate_change(merge=True)

    def __update_resource(self, unescaped_nickname: Optional[str]) -> None:
        """
        Derive and store the resource and the stripped nickname.

        An empty nickname is only expected for the system participant; any
        other participant gets a generated ``unnamed-<uuid>`` nickname and a
        warning is emitted.
        """
        if not unescaped_nickname:
            self.stored.resource = ""
            if self.is_system:
                self.stored.nickname_no_illegal = ""
            else:
                warnings.warn(
                    "Only the system participant is allowed to not have a nickname"
                )
                nickname = f"unnamed-{uuid.uuid4()}"
                self.stored.resource = self.stored.nickname_no_illegal = nickname
            return

        self.stored.nickname_no_illegal, jid = escape_nickname(
            self.muc.jid,
            unescaped_nickname,
        )
        self.stored.resource = jid.resource

    def send_configuration_change(self, codes: tuple[int, ...]):
        """
        Send a message carrying MUC status codes.

        :param codes: The status codes to attach to the message.
        :raises RuntimeError: If this is not the system participant.
        """
        if not self.is_system:
            raise RuntimeError("This is only possible for the system participant")
        msg = self._make_message()
        msg["muc"]["status_codes"] = codes
        self._send(msg)

    @property
    def nickname(self):
        """The stored (unescaped) nickname of this participant."""
        return self.stored.nickname

    @nickname.setter
    def nickname(self, new_nickname: str) -> None:
        old = self.nickname
        if new_nickname == old:
            return

        cache = getattr(self, "_last_presence", None)
        if cache:
            last_seen = cache.last_seen
            # Copy, so adding "status_codes" below cannot leak into the
            # cached presence.
            kwargs = dict(cache.presence_kwargs)
        else:
            last_seen = None
            kwargs = {}

        kwargs["status_codes"] = {303}

        p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
        # in this order so pfrom=old resource and we actually use the escaped nick
        # in the muc/item/nick element
        self.__update_resource(new_nickname)
        p["muc"]["item"]["nick"] = self.jid.resource
        self._send(p)

        self.stored.nickname = new_nickname
        self.commit()
        kwargs["status_codes"] = set()
        p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
        self._send(p)

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime] = None,
        status_codes: Optional[set[int]] = None,
        user_full_jid: Optional[JID] = None,
        **presence_kwargs,
    ):
        """
        Build a presence with the MUC-specific payload of this participant.

        :param last_seen: Passed to the parent implementation.
        :param status_codes: MUC status codes to include. The set is copied,
            never modified in place.
        :param user_full_jid: For the user's own participant in a
            non-anonymous room, the full JID to advertise in the MUC item.
        :param presence_kwargs: Passed to the parent implementation.
        :return: The presence stanza, not sent yet.
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        p["muc"]["affiliation"] = self.affiliation
        p["muc"]["role"] = self.role
        if self.hats:
            p["hats"].add_hats(self.hats)
        # Copy: codes are added below and the caller's set must stay untouched.
        codes = set(status_codes) if status_codes else set()
        if self.is_user:
            codes.add(110)
        if not self.muc.is_anonymous and not self.is_system:
            if self.is_user:
                if user_full_jid:
                    p["muc"]["jid"] = user_full_jid
                else:
                    jid = JID(self.user_jid)
                    try:
                        jid.resource = next(iter(self.muc.get_user_resources()))
                    except StopIteration:
                        jid.resource = "pseudo-resource"
                    # Use the full JID built above; it was previously
                    # computed and then discarded in favour of the bare JID.
                    p["muc"]["jid"] = jid
                    codes.add(100)
            elif self.contact:
                p["muc"]["jid"] = self.contact.jid
                if a := self.contact.get_avatar():
                    p["vcard_temp_update"]["photo"] = a.id
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        if self.is_user and (hash_ := self.session.user.avatar_hash):
            p["vcard_temp_update"]["photo"] = hash_
        p["muc"]["status_codes"] = codes
        return p

    @property
    def DISCO_NAME(self):  # type:ignore[override]
        return self.nickname

    def __send_presence_if_needed(
        self, stanza: Union[Message, Presence], full_jid: JID, archive_only: bool
    ) -> None:
        """
        Send an initial presence before ``stanza`` if none was sent yet.

        Skipped for archive-only stanzas, the system participant, the user,
        stanzas with a subject, and messages carrying a "muc" plugin.
        """
        if (
            archive_only
            or self.is_system
            or self.is_user
            or self._presence_sent
            or stanza["subject"]
        ):
            return
        if isinstance(stanza, Message):
            if stanza.get_plugin("muc", check=True):
                return
        self.send_initial_presence(full_jid)

    @cached_property
    def __occupant_id(self):
        # Computed once per instance: the random fallback must stay the same
        # for the lifetime of this object.
        if self.contact:
            return self.contact.jid
        elif self.is_user:
            return "slidge-user"
        elif self.is_system:
            return "room"
        else:
            return str(uuid.uuid4())

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        full_jid: Optional[JID] = None,
        archive_only: bool = False,
        legacy_msg_id=None,
        initial_presence=False,
        **send_kwargs,
    ) -> MessageOrPresenceTypeVar:
        """
        Send ``stanza`` on behalf of this participant.

        :param stanza: The message or presence to send.
        :param full_jid: If set, send only to this JID; otherwise messages are
            added to the room archive (when the room has one) and a copy is
            sent to each of the room's user full JIDs.
        :param archive_only: Only archive, do not actually send. Only
            honoured when ``full_jid`` is not set.
        :param legacy_msg_id: Passed to the archive along with the message.
        :param initial_presence: If true, the "presence sent" flag is set on
            the stored object directly instead of via the property setter.
        :param send_kwargs: Accepted but unused here.
        :return: The stanza (the last copy sent when broadcasting).
        """
        if stanza.get_from().resource:
            stanza["occupant-id"]["id"] = self.__occupant_id
        else:
            stanza["occupant-id"]["id"] = "room"
        self.__add_nick_element(stanza)
        if not self.is_user and isinstance(stanza, Presence):
            # Never announce a departure for someone who never "joined".
            if stanza["type"] == "unavailable" and not self._presence_sent:
                return stanza  # type:ignore
            if initial_presence:
                self.stored.presence_sent = True
            else:
                self._presence_sent = True
        if full_jid:
            stanza["to"] = full_jid
            self.__send_presence_if_needed(stanza, full_jid, archive_only)
            if self.is_user:
                assert stanza.stream is not None
                stanza.stream.send(stanza, use_filters=False)
            else:
                stanza.send()
        else:
            if hasattr(self.muc, "archive") and isinstance(stanza, Message):
                self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
            if archive_only:
                return stanza
            for user_full_jid in self.muc.user_full_jids():
                stanza = copy(stanza)
                stanza["to"] = user_full_jid
                self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
                stanza.send()
        return stanza

    def mucadmin_item(self):
        """
        Build a :class:`MUCAdminItem` describing this participant.

        In non-anonymous rooms, the bare JID of the user or of the associated
        contact is included; a warning is emitted if neither applies.
        """
        item = MUCAdminItem()
        item["nick"] = self.nickname
        item["affiliation"] = self.affiliation
        item["role"] = self.role
        if not self.muc.is_anonymous:
            if self.is_user:
                item["jid"] = self.user_jid.bare
            elif self.contact:
                item["jid"] = self.contact.jid.bare
            else:
                warnings.warn(
                    (
                        f"Public group but no contact JID associated to {self.jid} in"
                        f" {self}"
                    ),
                )
        return item

    def __add_nick_element(self, stanza: Union[Presence, Message]) -> None:
        # Only add a XEP-0172 nick element when the resource differs from the
        # stripped nickname.
        if (nick := self.nickname_no_illegal) != self.jid.resource:
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = nick
            stanza.append(n)

    def _get_last_presence(self) -> Optional[CachedPresence]:
        """Return our cached presence, falling back to the contact's."""
        own = super()._get_last_presence()
        if own is None and self.contact:
            return self.contact._get_last_presence()
        return own

    def send_initial_presence(
        self,
        full_jid: JID,
        nick_change: bool = False,
        presence_id: Optional[str] = None,
    ) -> None:
        """
        Called when the user joins a MUC, as a mechanism
        to indicate to the joining XMPP client the list of "participants".

        Can also be called to trigger a "participant has joined the group" event.

        Nothing is sent for a non-user participant whose cached presence is
        "unavailable".

        :param full_jid: Set this to only send to a specific user XMPP resource.
        :param nick_change: Used when the user joins and the MUC renames them (code 210)
        :param presence_id: set the presence ID. used internally by slidge
        """
        #  MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
        codes = set()
        if nick_change:
            codes.add(210)

        if self.is_user:
            # the "initial presence" of the user has to be vanilla, as it is
            # a crucial part of the MUC join sequence for XMPP clients.
            kwargs = {}
        else:
            cache = self._get_last_presence()
            self.log.debug("Join muc, initial presence: %s", cache)
            if cache:
                ptype = cache.ptype
                if ptype == "unavailable":
                    return
                kwargs = dict(
                    last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
                )
            else:
                kwargs = {}
        p = self._make_presence(
            status_codes=codes,
            user_full_jid=full_jid,
            **kwargs,  # type:ignore
        )
        if presence_id:
            p["id"] = presence_id
        self._send(p, full_jid, initial_presence=True)

    def leave(self) -> None:
        """
        Call this when the participant leaves the room
        """
        self.muc.remove_participant(self)

    def kick(self, reason: str | None = None) -> None:
        """
        Call this when the participant is kicked from the room
        """
        self.muc.remove_participant(self, kick=True, reason=reason)

    def ban(self, reason: str | None = None) -> None:
        """
        Call this when the participant is banned from the room
        """
        self.muc.remove_participant(self, ban=True, reason=reason)

    def get_disco_info(self, jid: OptJid = None, node: Optional[str] = None):
        """
        Return the disco info of the associated contact if there is one,
        else our own. ``jid`` and ``node`` are accepted but not used.
        """
        if self.contact is not None:
            return self.contact.get_disco_info()
        return super().get_disco_info()

    def moderate(
        self, legacy_msg_id: LegacyMessageType, reason: Optional[str] = None
    ) -> None:
        """
        Retract a message, with this participant as the moderator.

        One retraction is sent per XMPP ID mapped to ``legacy_msg_id``. The
        stanza is built by the system participant but sent by this one.

        :param legacy_msg_id: Legacy ID of the message to retract.
        :param reason: Optional reason to include in the retraction.
        """
        for i in self._legacy_to_xmpp(legacy_msg_id):
            m = self.muc.get_system_participant()._make_message()
            m["retract"]["id"] = i
            if self.is_system:
                m["retract"].enable("moderated")
            else:
                m["retract"]["moderated"]["by"] = self.jid
                m["retract"]["moderated"]["occupant-id"]["id"] = self.__occupant_id
            if reason:
                m["retract"]["reason"] = reason
            self._send(m)

    def __apply_subject(
        self, msg: Message, subject: str | None, when: Optional[datetime]
    ) -> None:
        """Add the optional delay stamp and the subject element to ``msg``."""
        if when is not None:
            msg["delay"].set_stamp(when)
            msg["delay"]["from"] = self.muc.jid
        if subject:
            msg["subject"] = subject
        else:
            # An empty subject still needs an explicit, empty <subject/>.
            # may be simplified if slixmpp lets it do it more easily some day
            msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject"))

    def set_room_subject(
        self,
        subject: str,
        full_jid: Optional[JID] = None,
        when: Optional[datetime] = None,
        update_muc: bool = True,
    ) -> None:
        """
        Send a room subject message from this participant.

        :param subject: The subject; if empty, an empty subject element is sent.
        :param full_jid: If set, only send to this JID.
        :param when: If set, added as the delay stamp of the message.
        :param update_muc: Whether to also record the subject, its setter and
            its date on the room object.
        """
        if update_muc:
            self.muc._subject = subject  # type: ignore
            self.muc.subject_setter = self.nickname
            self.muc.subject_date = when

        msg = self._make_message()
        self.__apply_subject(msg, subject, when)
        self._send(msg, full_jid)

    def set_thread_subject(
        self,
        thread: LegacyThreadType,
        subject: str | None,
        when: Optional[datetime] = None,
    ) -> None:
        """
        Send a subject message for a thread from this participant.

        :param thread: The thread identifier, sent as ``str(thread)``.
        :param subject: The subject; if empty or ``None``, an empty subject
            element is sent.
        :param when: If set, added as the delay stamp of the message.
        """
        msg = self._make_message()
        msg["thread"] = str(thread)
        self.__apply_subject(msg, subject, when)
        self._send(msg)

557 

558 

def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]:
    """
    Turn a nickname into a valid occupant JID for the given room.

    Three candidates are tried in turn as the JID resource: the nickname with
    illegal characters stripped, then its punycode encoding, then the result
    of :func:`strip_non_printable` applied to the punycode form.

    :param muc_jid: The JID of the room.
    :param nickname: The raw nickname.
    :return: The nickname with illegal characters stripped, and a copy of the
        room JID carrying the first candidate resource that was accepted.
    """
    cleaned = strip_illegal_chars(nickname)
    occupant_jid = JID(muc_jid)

    try:
        occupant_jid.resource = cleaned
    except InvalidJID:
        pass
    else:
        return cleaned, occupant_jid

    punycoded = cleaned.encode("punycode").decode()
    try:
        occupant_jid.resource = punycoded
    except InvalidJID:
        # at this point there still might be control chars
        occupant_jid.resource = strip_non_printable(punycoded)

    return cleaned, occupant_jid

575 

576 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)