Coverage for slidge / group / participant.py: 87%

372 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1import logging 

2import string 

3import uuid 

4import warnings 

5from copy import copy 

6from datetime import datetime 

7from typing import TYPE_CHECKING, Any, Generic, Literal 

8from xml.etree import ElementTree as ET 

9 

10import sqlalchemy as sa 

11from slixmpp import JID, InvalidJID, Message, Presence 

12from slixmpp.plugins.xep_0030.stanza.info import DiscoInfo 

13from slixmpp.plugins.xep_0045.stanza import MUCAdminItem 

14from slixmpp.plugins.xep_0492.stanza import Never 

15from slixmpp.types import MessageTypes, OptJid 

16from sqlalchemy.orm.exc import DetachedInstanceError 

17 

18from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin 

19from ..core.mixins.db import DBMixin 

20from ..db.models import Participant 

21from ..util import SubclassableOnce, strip_illegal_chars 

22from ..util.types import ( 

23 AnyMUC, 

24 CachedPresence, 

25 Hat, 

26 LegacyContactType, 

27 MessageOrPresenceTypeVar, 

28 MucAffiliation, 

29 MucRole, 

30) 

31 

32if TYPE_CHECKING: 

33 from slidge.command.base import ContactCommand 

34 

35 

def strip_non_printable(nickname: str) -> str:
    """
    Build a last-resort, ASCII-only replacement for an unusable nickname.

    Only printable ASCII characters are kept; ASCII control whitespace (tab,
    line feed, carriage return, vertical tab, form feed) is dropped even
    though it is part of ``string.printable``. A short digest of the original
    nickname is appended so that different nicknames that collapse to the same
    printable characters still produce different results.

    The digest is derived from SHA-256 rather than the builtin ``hash()``,
    because ``hash()`` of a ``str`` is salted per process and would yield a
    different result after every restart.

    A warning is emitted each time this fallback is used.

    :param nickname: The nickname that could not be used as-is.
    :return: The sanitized nickname with a ``-slidge-<digest>`` suffix.
    """
    # Local import: this is the only place in the module needing hashlib.
    import hashlib

    control_whitespace = "\t\n\r\x0b\x0c"
    allowed = set(string.printable) - set(control_whitespace)
    kept = "".join(char for char in nickname if char in allowed)

    # "surrogatepass" guarantees that encoding never fails, whatever the input.
    digest = hashlib.sha256(
        nickname.encode("utf-8", errors="surrogatepass")
    ).hexdigest()[:16]

    new = f"{kept}-slidge-{digest}"
    warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
    return new

43 

44 

class LegacyParticipant(
    Generic[LegacyContactType],
    PresenceMixin,
    MessageMixin,
    ChatterDiscoMixin,
    DBMixin,
    SubclassableOnce,
):
    """
    A legacy participant of a legacy group chat.

    Wraps a stored :class:`Participant` row together with the group chat it
    belongs to and, when there is one, the contact it corresponds to.
    """

    is_participant: Literal[True] = True

    mtype: MessageTypes = "groupchat"
    _can_send_carbon = False
    USE_STANZA_ID = True
    STRIP_SHORT_DELAY = False
    stored: Participant
    contact: LegacyContactType | None

    def __init__(
        self,
        muc: AnyMUC,
        stored: Participant,
        is_system: bool = False,
        contact: LegacyContactType | None = None,
    ) -> None:
        """
        :param muc: The group chat this participant belongs to.
        :param stored: The stored row backing this participant.
        :param is_system: Whether this is the "system" participant of the
            group chat.
        :param contact: The contact this participant corresponds to, if any.
            When omitted and ``stored`` references a contact, it is loaded
            through the session's contact store.
        """
        self.muc = muc
        self.session = muc.session
        self.xmpp = muc.session.xmpp
        self.is_system = is_system

        # Make the wrapper and the stored row agree about the contact,
        # whichever side the information came from.
        if contact is None and stored.contact is not None:
            contact = self.session.contacts.from_store(stored=stored.contact)
        if contact is not None and stored.contact is None:
            stored.contact = contact.stored

        self.stored = stored
        self.contact = contact

        super().__init__()

        if stored.resource is None:
            self.__update_resource(stored.nickname)

        self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")

    def __eq__(self, other: object) -> bool:
        """Two participants are equal when they have the same JID."""
        return isinstance(other, LegacyParticipant) and self.jid == other.jid

    @property
    def is_user(self) -> bool:
        """
        Whether the stored row is flagged as being the user.

        If the stored row is detached from its ORM session, it is merged
        back before reading the flag again.
        """
        try:
            return self.stored.is_user
        except DetachedInstanceError:
            self.merge()
            return self.stored.is_user

    @is_user.setter
    def is_user(self, is_user: bool) -> None:
        with self.xmpp.store.session(expire_on_commit=True) as orm:
            orm.add(self.stored)
            self.stored.is_user = is_user
            orm.commit()

    @property
    def jid(self) -> JID:
        """
        The JID of the group chat, with the stored resource of this
        participant when it is not empty.
        """
        jid = JID(self.muc.jid)
        if self.stored.resource:
            jid.resource = self.stored.resource
        return jid

    @jid.setter
    def jid(self, x: JID) -> None:
        # FIXME: without this, mypy yields
        #        "Cannot override writeable attribute with read-only property"
        #        But it does not happen for LegacyContact. WTF?
        raise RuntimeError

    @property
    def commands(self) -> dict[str, "type[ContactCommand[Any]]"]:  # type:ignore[override]
        """The commands of the associated contact, or nothing without one."""
        if self.contact is None:
            return {}
        else:
            return self.contact.commands

    def __should_commit(self) -> bool:
        """
        Tell whether changes to this participant should be written to the
        store right now.

        This is never the case for the system participant, nor while the
        group chat holds the "fill participants" or "fill history" lock.
        """
        if self.is_system:
            return False
        if self.muc.get_lock("fill participants"):
            return False
        return not self.muc.get_lock("fill history")

    def commit(self) -> None:
        """Commit through the parent class, unless committing is suppressed."""
        if not self.__should_commit():
            return
        super().commit()

    @property
    def user_jid(self) -> JID:
        """The JID of the user owning the session."""
        return self.session.user_jid

    def __repr__(self) -> str:
        return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"

    @property
    def _presence_sent(self) -> bool:
        # we track if we already sent a presence for this participant.
        # if we didn't, we send it before the first message.
        # this way, even in plugins that don't map "user has joined" events,
        # we send a "join"-presence from the participant before the first message
        return self.stored.presence_sent

    @_presence_sent.setter
    def _presence_sent(self, val: bool) -> None:
        if self._presence_sent == val:
            return
        self.stored.presence_sent = val
        if not self.__should_commit():
            return
        # Targeted UPDATE of this single column, keyed by the row id.
        with self.xmpp.store.session() as orm:
            orm.execute(
                sa.update(Participant)
                .where(Participant.id == self.stored.id)
                .values(presence_sent=val)
            )
            orm.commit()

    @property
    def nickname_no_illegal(self) -> str:
        """The stored nickname variant named ``nickname_no_illegal``."""
        return self.stored.nickname_no_illegal

    @property
    def affiliation(self) -> MucAffiliation:
        """The stored affiliation of this participant."""
        return self.stored.affiliation

    @affiliation.setter
    def affiliation(self, affiliation: MucAffiliation) -> None:
        if self.affiliation == affiliation:
            return
        was = self.stored.affiliation
        self.stored.affiliation = affiliation
        # Nothing is committed or announced while the participants list is
        # still being filled.
        if not self.muc.participants_filled:
            return
        self.commit()
        if self.cached_presence is None or self.cached_presence.ptype == "unavailable":
            self.muc.send_affiliation_change(self, was)
        # NOTE(review): the presence refresh below runs in both cases —
        # confirm this is the intended behaviour for absent participants.
        self.send_last_presence(force=True, no_cache_online=True)

    @property
    def role(self) -> MucRole:
        """The stored role of this participant."""
        return self.stored.role

    @role.setter
    def role(self, role: MucRole) -> None:
        if self.role == role:
            return
        self.stored.role = role
        if not self.muc.participants_filled:
            return
        self.commit()
        # Only refresh the presence if one was already sent.
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    @property
    def hats(self) -> list[Hat]:
        """The stored hats as :class:`Hat` objects; empty when none are stored."""
        return [Hat(*h) for h in self.stored.hats] if self.stored.hats else []

    def set_hats(self, hats: list[Hat]) -> None:
        """
        Replace the hats of this participant.

        Does nothing if the hats are unchanged. The change is committed and
        the last presence re-sent only once the participants list of the
        group chat is filled, and the presence only if one was already sent.

        :param hats: The new hats.
        """
        if self.hats == hats:
            return
        self.stored.hats = hats
        if not self.muc.participants_filled:
            return
        self.commit()
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def __update_resource(self, unescaped_nickname: str | None) -> None:
        """
        Derive the stored resource and ``nickname_no_illegal`` from a nickname.

        An empty nickname is only expected for the system participant; any
        other participant gets a generated ``unnamed-<uuid>`` nickname and a
        warning is emitted.

        :param unescaped_nickname: The raw nickname, possibly empty or None.
        """
        if not unescaped_nickname:
            self.stored.resource = ""
            if self.is_system:
                self.stored.nickname_no_illegal = ""
            else:
                warnings.warn(
                    "Only the system participant is allowed to not have a nickname"
                )
                nickname = f"unnamed-{uuid.uuid4()}"
                self.stored.resource = self.stored.nickname_no_illegal = nickname
            return

        self.stored.nickname_no_illegal, jid = escape_nickname(
            self.muc.jid,
            unescaped_nickname,
        )
        self.stored.resource = jid.resource

    def send_configuration_change(self, codes: tuple[int, ...]) -> None:
        """
        Send a message carrying MUC status codes.

        :param codes: The status codes to attach to the message.
        :raises RuntimeError: If this is not the system participant.
        """
        if not self.is_system:
            raise RuntimeError("This is only possible for the system participant")
        msg = self._make_message()
        msg["muc"]["status_codes"] = codes
        self._send(msg)

    @property
    def nickname(self) -> str:
        """The stored nickname of this participant."""
        return self.stored.nickname

    @nickname.setter
    def nickname(self, new_nickname: str) -> None:
        old = self.nickname
        if new_nickname == old:
            return

        if self.muc.stored.id is not None:
            with self.xmpp.store.session() as orm:
                if not self.xmpp.store.rooms.nick_available(
                    orm, self.muc.stored.id, new_nickname
                ):
                    # Nickname already taken in this room: disambiguate it.
                    if self.contact is None:
                        new_nickname = f"{new_nickname} ({self.occupant_id})"
                    else:
                        new_nickname = f"{new_nickname} ({self.contact.legacy_id})"

        cache = getattr(self, "_last_presence", None)
        if cache:
            last_seen = cache.last_seen
            # Work on a copy so that the status codes set below never end up
            # in the cached presence.
            kwargs = dict(cache.presence_kwargs)
        else:
            last_seen = None
            kwargs = {}

        kwargs["status_codes"] = {303}

        p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
        # in this order so pfrom=old resource and we actually use the escaped nick
        # in the muc/item/nick element
        self.__update_resource(new_nickname)
        p["muc"]["item"]["nick"] = self.jid.resource
        self._send(p)

        self.stored.nickname = new_nickname
        self.commit()
        kwargs["status_codes"] = set()
        p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
        self._send(p)

    def _make_presence(  # type:ignore[no-untyped-def]
        self,
        *,
        last_seen: datetime | None = None,
        status_codes: set[int] | None = None,
        user_full_jid: JID | None = None,
        **presence_kwargs,  # noqa type:ignore[no-untyped-def]
    ) -> Presence:
        """
        Build a presence for this participant, with its MUC payload.

        On top of what the parent class builds, this sets the affiliation,
        the role, the hats, the MUC status codes and, in non-anonymous
        group chats, the real JID of the participant.

        :param last_seen: Passed through to the parent implementation.
        :param status_codes: MUC status codes to include. The passed set is
            not modified.
        :param user_full_jid: For the user's own participant, the full JID to
            advertise. When omitted, one is built from the user JID and the
            first known user resource, or ``pseudo-resource`` if there is
            none.
        :param presence_kwargs: Passed through to the parent implementation.
        :return: The presence stanza.
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        p["muc"]["affiliation"] = self.affiliation
        p["muc"]["role"] = self.role
        hats = self.hats
        if hats:
            p["hats"].add_hats(hats)
        # Copy: codes are added below and the caller's set must stay intact.
        codes = set(status_codes) if status_codes else set()
        if self.is_user:
            codes.add(110)
        if not self.muc.is_anonymous and not self.is_system:
            if self.is_user:
                if user_full_jid:
                    p["muc"]["jid"] = user_full_jid
                else:
                    jid = JID(self.user_jid)
                    try:
                        jid.resource = next(iter(self.muc.get_user_resources()))
                    except StopIteration:
                        jid.resource = "pseudo-resource"
                    # Use the JID built just above; it was previously
                    # computed and then discarded.
                    p["muc"]["jid"] = jid
                codes.add(100)
            elif self.contact:
                p["muc"]["jid"] = self.contact.jid
                if a := self.contact.get_avatar():
                    p["vcard_temp_update"]["photo"] = a.id
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        if self.is_user and (hash_ := self.session.user.avatar_hash):
            p["vcard_temp_update"]["photo"] = hash_
        p["muc"]["status_codes"] = codes
        return p

    @property
    def DISCO_NAME(self) -> str:
        """The disco name of a participant is its nickname."""
        return self.nickname

    @DISCO_NAME.setter
    def DISCO_NAME(self, _: str) -> Never:
        raise RuntimeError

    def __send_presence_if_needed(
        self, stanza: Message | Presence, full_jid: JID, archive_only: bool
    ) -> None:
        """
        Send the initial presence of this participant before ``stanza`` if
        no presence was sent yet.

        Nothing is sent for archive-only stanzas, for the system participant,
        for the user, for stanzas with a subject or for messages with a
        "muc" payload.

        :param stanza: The stanza about to be sent.
        :param full_jid: The recipient of the initial presence.
        :param archive_only: Whether ``stanza`` is only meant to be archived.
        """
        if (
            archive_only
            or self.is_system
            or self.is_user
            or self._presence_sent
            or stanza["subject"]
        ):
            return
        if isinstance(stanza, Message):
            if "muc" in stanza:
                return
            self.send_initial_presence(full_jid)

    @property
    def occupant_id(self) -> str:
        """The stored occupant ID of this participant."""
        return self.stored.occupant_id

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        full_jid: JID | None = None,
        archive_only: bool = False,
        legacy_msg_id: str | None = None,
        initial_presence: bool = False,
        **send_kwargs: Any,  # noqa:ANN401
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza from this participant.

        :param stanza: The message or presence to send.
        :param full_jid: If set, only send to this JID. Otherwise, messages
            are added to the archive of the group chat (if it has one) and the
            stanza is sent to every full JID of the user in the group chat.
        :param archive_only: Do not send anything, only archive the message.
            Only honoured when ``full_jid`` is not set.
        :param legacy_msg_id: Passed to the archive along with the message.
        :param initial_presence: Accepted for interface compatibility; it does
            not alter what this method does.
        :param send_kwargs: Accepted for interface compatibility; unused here.
        :return: The stanza (the last copy sent, when sent to several JIDs).
        """
        if stanza.get_from().resource:
            stanza["occupant-id"]["id"] = self.occupant_id
        else:
            stanza["occupant-id"]["id"] = "room"
        self.__add_nick_element(stanza)
        if not self.is_user and isinstance(stanza, Presence):
            # Do not announce the departure of a participant that was never
            # announced in the first place.
            if stanza["type"] == "unavailable" and not self._presence_sent:
                return stanza
            self._presence_sent = True
        if full_jid:
            stanza["to"] = full_jid
            self.__send_presence_if_needed(stanza, full_jid, archive_only)
            if self.is_user:
                assert stanza.stream is not None
                stanza.stream.send(stanza, use_filters=False)
            else:
                stanza.send()
        else:
            if hasattr(self.muc, "archive") and isinstance(stanza, Message):
                self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
            if archive_only:
                return stanza
            for user_full_jid in self.muc.user_full_jids():
                # One copy per recipient, since "to" differs for each.
                stanza = copy(stanza)
                stanza["to"] = user_full_jid
                self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
                stanza.send()
        return stanza

    def mucadmin_item(self) -> MUCAdminItem:
        """
        Build the MUC admin item describing this participant.

        In non-anonymous group chats, the bare JID of the user or of the
        associated contact is included; a warning is emitted if there is
        neither.

        :return: The item, with nick, affiliation and role set.
        """
        item = MUCAdminItem()
        item["nick"] = self.nickname
        item["affiliation"] = self.affiliation
        item["role"] = self.role
        if not self.muc.is_anonymous:
            if self.is_user:
                item["jid"] = self.user_jid.bare
            elif self.contact:
                item["jid"] = self.contact.jid.bare
            else:
                warnings.warn(
                    (
                        f"Private group but no contact JID associated to {self.jid} in"
                        f" {self}"
                    ),
                )
        return item

    def __add_nick_element(self, stanza: Presence | Message) -> None:
        """
        Append a XEP-0172 nick element to ``stanza`` when the resource of
        this participant differs from its ``nickname_no_illegal``.
        """
        if (nick := self.nickname_no_illegal) != self.jid.resource:
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = nick
            stanza.append(n)

    def _get_last_presence(self) -> CachedPresence | None:
        """
        Return the last presence of this participant, falling back to the
        one of the associated contact when there is none.
        """
        own = super()._get_last_presence()
        if own is None and self.contact:
            return self.contact._get_last_presence()
        return own

    def send_initial_presence(
        self,
        full_jid: JID,
        nick_change: bool = False,
        presence_id: str | None = None,
        mav_until: str | None = None,
    ) -> None:
        """
        Called when the user joins a MUC, as a mechanism
        to indicate to the joining XMPP client the list of "participants".

        Can also be called to trigger a "participant has joined the group"
        event.

        Nothing is sent for a participant other than the user whose last
        known presence is "unavailable".

        :param full_jid: Set this to only send to a specific user XMPP resource.
        :param nick_change: Used when the user joins and the MUC renames them (code 210)
        :param presence_id: set the presence ID. used internally by slidge
        :param mav_until: If set, and this participant is the user, value of
            the ``until`` attribute of the ``mav`` element of the MUC payload.
        """
        #  MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
        codes = set()
        if nick_change:
            codes.add(210)

        if self.is_user:
            # the "initial presence" of the user has to be vanilla, as it is
            # a crucial part of the MUC join sequence for XMPP clients.
            kwargs = {}
        else:
            cache = self._get_last_presence()
            self.log.debug("Join muc, initial presence: %s", cache)
            if cache:
                ptype = cache.ptype
                if ptype == "unavailable":
                    return
                kwargs = dict(
                    last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
                )
            else:
                kwargs = {}
        p = self._make_presence(
            status_codes=codes,
            user_full_jid=full_jid,
            **kwargs,  # type:ignore
        )
        if presence_id:
            p["id"] = presence_id
        if self.is_user and mav_until is not None:
            p["muc"]["mav"]["until"] = mav_until
        self._send(p, full_jid, initial_presence=True)

    def leave(self) -> None:
        """
        Call this when the participant leaves the room
        """
        self.muc.remove_participant(self)

    def kick(self, reason: str | None = None) -> None:
        """
        Call this when the participant is kicked from the room

        :param reason: Optionally, a reason for the kick
        """
        self.muc.remove_participant(self, kick=True, reason=reason)

    def ban(self, reason: str | None = None) -> None:
        """
        Call this when the participant is banned from the room

        :param reason: Optionally, a reason for the ban
        """
        self.muc.remove_participant(self, ban=True, reason=reason)

    async def get_disco_info(
        self, jid: OptJid = None, node: str | None = None
    ) -> DiscoInfo:
        """
        Return the disco info of the associated contact if there is one,
        of the parent class otherwise.

        :param jid: Unused by this implementation.
        :param node: Unused by this implementation.
        """
        if self.contact is not None:
            return await self.contact.get_disco_info()
        return await super().get_disco_info()

    def moderate(self, legacy_msg_id: str, reason: str | None = None) -> None:
        """
        Send a retraction, attributed to this participant as moderator, for
        each XMPP message ID matching a legacy message ID.

        The retraction message is built by the system participant of the
        group chat, and sent through this participant.

        :param legacy_msg_id: Legacy ID of the moderated message.
        :param reason: Optionally, a reason for the moderation.
        """
        for i in self._legacy_to_xmpp(legacy_msg_id):
            m = self.muc.get_system_participant()._make_message()
            m["retract"]["id"] = i
            if self.is_system:
                m["retract"].enable("moderated")
            else:
                m["retract"]["moderated"]["by"] = self.jid
                m["retract"]["moderated"]["occupant-id"]["id"] = self.occupant_id
            if reason:
                m["retract"]["reason"] = reason
            self._send(m)

    def set_room_subject(
        self,
        subject: str,
        full_jid: JID | None = None,
        when: datetime | None = None,
        update_muc: bool = True,
    ) -> None:
        """
        Send a room subject message from this participant.

        :param subject: The subject. If empty, an empty subject element is
            sent.
        :param full_jid: If set, only send to this JID.
        :param when: If set, used as the delay stamp of the message and, with
            ``update_muc``, as the subject date of the group chat.
        :param update_muc: Also record subject, setter and date on the group
            chat.
        """
        if update_muc:
            self.muc._subject = subject  # type: ignore
            self.muc.subject_setter = self.nickname
            self.muc.subject_date = when

        msg = self._make_message()
        if when is not None:
            msg["delay"].set_stamp(when)
            msg["delay"]["from"] = self.muc.jid
        if subject:
            msg["subject"] = subject
        else:
            # may be simplified if slixmpp lets it do it more easily some day
            msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject"))
        self._send(msg, full_jid)

    def set_thread_subject(
        self,
        thread: str,
        subject: str | None,
        when: datetime | None = None,
    ) -> None:
        """
        Send a subject message for a thread from this participant.

        :param thread: The thread identifier.
        :param subject: The subject. If empty or None, an empty subject
            element is sent.
        :param when: If set, used as the delay stamp of the message.
        """
        msg = self._make_message()
        msg["thread"] = str(thread)
        if when is not None:
            msg["delay"].set_stamp(when)
            msg["delay"]["from"] = self.muc.jid
        if subject:
            msg["subject"] = subject
        else:
            # may be simplified if slixmpp lets it do it more easily some day
            msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject"))
        self._send(msg)

    async def on_set_affiliation(
        self,
        affiliation: MucAffiliation,
        reason: str | None,
        nickname: str | None,
    ) -> None:
        """
        Triggered when the user requests changing the affiliation of this
        participant for this group.

        Examples: promoting them to moderator, ban (affiliation=outcast).

        :param affiliation: The new affiliation
        :param reason: A reason for this affiliation change
        :param nickname:
        """
        raise NotImplementedError

    async def on_kick(self, reason: str | None) -> None:
        """
        Triggered when the user requests changing the role of this
        participant to "none" for this group. Action commonly known as "kick".

        :param reason: A reason for this kick
        """
        raise NotImplementedError

    async def on_invitation(self, reason: str | None) -> None:
        """
        Triggered when the user invites this participant to a legacy MUC
        via :xep:`0249`.

        The default implementation calls :meth:`on_set_affiliation`
        with the 'member' affiliation. Override if you want to customize this
        behaviour.

        :param reason: Optionally, a reason
        """
        await self.on_set_affiliation("member", reason, None)

607 

608 

def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]:
    """
    Turn a nickname into a JID resource for a given group chat.

    Three candidates are tried in turn until one is accepted as a resource:
    the cleaned nickname itself, its punycode encoding, and finally the
    output of :func:`strip_non_printable` applied to the punycode form.

    :param muc_jid: JID of the group chat; it is copied, not modified.
    :param nickname: The raw nickname.
    :return: The cleaned nickname (illegal characters stripped, line feeds
        replaced by ``" | "``) and a JID whose resource is the first accepted
        candidate.
    """
    nickname_no_illegal = strip_illegal_chars(nickname).replace("\n", " | ")
    jid = JID(muc_jid)

    try:
        jid.resource = nickname_no_illegal
    except InvalidJID:
        pass
    else:
        return nickname_no_illegal, jid

    encoded = nickname_no_illegal.encode("punycode").decode()
    try:
        jid.resource = encoded
    except InvalidJID:
        # at this point there still might be control chars
        jid.resource = strip_non_printable(encoded)

    return nickname_no_illegal, jid

625 

626 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)