Coverage for slidge / group / participant.py: 89%

370 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

import hashlib
import logging
import string
import uuid
import warnings
from copy import copy
from datetime import datetime
from typing import Any, Literal, NoReturn
from xml.etree import ElementTree as ET

import sqlalchemy as sa
from slixmpp import JID, InvalidJID, Message, Presence
from slixmpp.plugins.xep_0030.stanza.info import DiscoInfo
from slixmpp.plugins.xep_0045.stanza import MUCAdminItem
from slixmpp.plugins.xep_0492.stanza import Never
from slixmpp.types import MessageTypes, OptJid
from sqlalchemy.orm.exc import DetachedInstanceError

from ..contact import LegacyContact
from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin
from ..core.mixins.db import DBMixin
from ..db.models import Participant
from ..util import SubclassableOnce, strip_illegal_chars
from ..util.types import (
    AnyMUC,
    CachedPresence,
    Hat,
    LegacyMessageType,
    LegacyThreadType,
    MessageOrPresenceTypeVar,
    MucAffiliation,
    MucRole,
)

33 

34 

def strip_non_printable(nickname: str) -> str:
    """
    Build a fallback nickname made only of printable ASCII characters.

    Every character outside the printable ASCII range (space to ``~``) is
    dropped, which also removes ASCII control whitespace such as tabs and
    line breaks. A short digest of the original nickname is appended so that
    different nicknames that reduce to the same visible text stay distinct.

    The digest is derived with SHA-256 rather than the builtin ``hash()``,
    because ``hash()`` of a ``str`` is salted per interpreter process and
    would yield a different fallback nickname after every restart.

    A warning is emitted each time this fallback is used.

    :param nickname: The nickname that could not be used as-is.
    :return: The sanitized nickname, suffixed with ``-slidge-<digest>``.
    """
    visible = "".join(ch for ch in nickname if ch.isascii() and ch.isprintable())
    # "surrogatepass" keeps this from raising on lone surrogates.
    digest = hashlib.sha256(nickname.encode("utf-8", "surrogatepass")).hexdigest()
    new = f"{visible}-slidge-{digest[:16]}"
    warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
    return new

42 

43 

class LegacyParticipant(
    PresenceMixin,
    MessageMixin,
    ChatterDiscoMixin,
    DBMixin,
    SubclassableOnce,
):
    """
    A legacy participant of a legacy group chat.
    """

    is_participant: Literal[True] = True

    mtype: MessageTypes = "groupchat"
    _can_send_carbon = False
    USE_STANZA_ID = True
    STRIP_SHORT_DELAY = False
    stored: Participant
    contact: LegacyContact[Any] | None

    def __init__(
        self,
        muc: AnyMUC,
        stored: Participant,
        is_system: bool = False,
        contact: LegacyContact[Any] | None = None,
    ) -> None:
        """
        :param muc: The group chat this participant belongs to.
        :param stored: The database row backing this participant.
        :param is_system: Whether this is the "system" participant of the room.
        :param contact: The contact associated to this participant, if any.
            When omitted, it is loaded from ``stored.contact`` if set.
        """
        self.muc = muc
        self.session = muc.session
        self.xmpp = muc.session.xmpp
        self.is_system = is_system

        # Keep the in-memory contact and the stored contact in sync,
        # whichever of the two was provided.
        if contact is None and stored.contact is not None:
            contact = self.session.contacts.from_store(stored=stored.contact)
        if contact is not None and stored.contact is None:
            stored.contact = contact.stored

        self.stored = stored
        self.contact = contact

        super().__init__()

        if stored.resource is None:
            self.__update_resource(stored.nickname)

        self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")

    @property
    def is_user(self) -> bool:
        """Whether this participant is the slidge user themselves."""
        try:
            return self.stored.is_user
        except DetachedInstanceError:
            # the stored row is detached from its ORM session: merge and retry
            self.merge()
            return self.stored.is_user

    @is_user.setter
    def is_user(self, is_user: bool) -> None:
        with self.xmpp.store.session(expire_on_commit=True) as orm:
            orm.add(self.stored)
            self.stored.is_user = is_user
            orm.commit()

    @property
    def jid(self) -> JID:
        """The MUC JID, with this participant's resource if it has one."""
        jid = JID(self.muc.jid)
        if self.stored.resource:
            jid.resource = self.stored.resource
        return jid

    @jid.setter
    def jid(self, x: JID) -> None:
        # FIXME: without this, mypy yields
        #        "Cannot override writeable attribute with read-only property"
        #        But it does not happen for LegacyContact. WTF?
        raise RuntimeError

    def __should_commit(self) -> bool:
        """
        Tell whether changes to this participant should be written to the store.

        Nothing is committed for the system participant, nor while the
        "fill participants" or "fill history" locks of the MUC are held.
        """
        if self.is_system:
            return False
        if self.muc.get_lock("fill participants"):
            return False
        if self.muc.get_lock("fill history"):
            return False
        return True

    def commit(self, merge: bool = False) -> None:
        """Commit to the store, unless committing is currently disabled."""
        if not self.__should_commit():
            return
        super().commit(merge)

    @property
    def user_jid(self) -> JID:
        """The JID of the user owning the session of this participant."""
        return self.session.user_jid

    def __repr__(self) -> str:
        return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"

    @property
    def _presence_sent(self) -> bool:
        # we track if we already sent a presence for this participant.
        # if we didn't, we send it before the first message.
        # this way, even in plugins that don't map "user has joined" events,
        # we send a "join"-presence from the participant before the first message
        return self.stored.presence_sent

    @_presence_sent.setter
    def _presence_sent(self, val: bool) -> None:
        if self._presence_sent == val:
            return
        self.stored.presence_sent = val
        if not self.__should_commit():
            return
        # targeted UPDATE of this single column
        with self.xmpp.store.session() as orm:
            orm.execute(
                sa.update(Participant)
                .where(Participant.id == self.stored.id)
                .values(presence_sent=val)
            )
            orm.commit()

    @property
    def nickname_no_illegal(self) -> str:
        """The stored nickname variant with illegal characters removed."""
        return self.stored.nickname_no_illegal

    @property
    def affiliation(self) -> MucAffiliation:
        """The MUC affiliation of this participant."""
        return self.stored.affiliation

    @affiliation.setter
    def affiliation(self, affiliation: MucAffiliation) -> None:
        if self.affiliation == affiliation:
            return
        self.stored.affiliation = affiliation
        if not self.muc.participants_filled:
            return
        self.commit()
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def send_affiliation_change(self) -> None:
        """
        Send a message of type "normal" announcing this participant's affiliation.
        """
        # internal use by slidge
        msg = self._make_message()
        msg["muc"]["affiliation"] = self.affiliation
        msg["type"] = "normal"
        if not self.muc.is_anonymous and not self.is_system:
            if self.contact:
                msg["muc"]["jid"] = self.contact.jid
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        self._send(msg)

    @property
    def role(self) -> MucRole:
        """The MUC role of this participant."""
        return self.stored.role

    @role.setter
    def role(self, role: MucRole) -> None:
        if self.role == role:
            return
        self.stored.role = role
        if not self.muc.participants_filled:
            return
        self.commit()
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    @property
    def hats(self) -> list[Hat]:
        """The hats of this participant; an empty list if none are stored."""
        return [Hat(*h) for h in self.stored.hats] if self.stored.hats else []

    def set_hats(self, hats: list[Hat]) -> None:
        """
        Replace the hats of this participant.

        If the participants of the MUC are already filled, the change is
        committed, and the last presence is re-sent when a presence was
        already sent for this participant.

        :param hats: The new list of hats.
        """
        if self.hats == hats:
            return
        self.stored.hats = hats
        if not self.muc.participants_filled:
            return
        self.commit(merge=True)
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def __update_resource(self, unescaped_nickname: str | None) -> None:
        """
        Derive and store the JID resource and the "no illegal chars" nickname.

        :param unescaped_nickname: The raw nickname. If empty or ``None``,
            the system participant gets an empty resource; any other
            participant gets a generated ``unnamed-<uuid>`` nickname and a
            warning is emitted.
        """
        if not unescaped_nickname:
            self.stored.resource = ""
            if self.is_system:
                self.stored.nickname_no_illegal = ""
            else:
                warnings.warn(
                    "Only the system participant is allowed to not have a nickname"
                )
                nickname = f"unnamed-{uuid.uuid4()}"
                self.stored.resource = self.stored.nickname_no_illegal = nickname
            return

        self.stored.nickname_no_illegal, jid = escape_nickname(
            self.muc.jid,
            unescaped_nickname,
        )
        self.stored.resource = jid.resource

    def send_configuration_change(self, codes: tuple[int, ...]) -> None:
        """
        Send a message carrying MUC status codes.

        :param codes: The status codes to include.
        :raises RuntimeError: If this is not the system participant.
        """
        if not self.is_system:
            raise RuntimeError("This is only possible for the system participant")
        msg = self._make_message()
        msg["muc"]["status_codes"] = codes
        self._send(msg)

    @property
    def nickname(self) -> str:
        """The stored nickname of this participant."""
        return self.stored.nickname

    @nickname.setter
    def nickname(self, new_nickname: str) -> None:
        old = self.nickname
        if new_nickname == old:
            return

        if self.muc.stored.id is not None:
            with self.xmpp.store.session() as orm:
                if not self.xmpp.store.rooms.nick_available(
                    orm, self.muc.stored.id, new_nickname
                ):
                    # nickname already taken in this room: disambiguate it
                    if self.contact is None:
                        new_nickname = f"{new_nickname} ({self.occupant_id})"
                    else:
                        new_nickname = f"{new_nickname} ({self.contact.legacy_id})"

        cache = getattr(self, "_last_presence", None)
        if cache:
            last_seen = cache.last_seen
            # Copy: "status_codes" is added below and must not end up in
            # the cached presence.
            kwargs = dict(cache.presence_kwargs)
        else:
            last_seen = None
            kwargs = {}

        kwargs["status_codes"] = {303}

        p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
        # in this order so pfrom=old resource and we actually use the escaped nick
        # in the muc/item/nick element
        self.__update_resource(new_nickname)
        p["muc"]["item"]["nick"] = self.jid.resource
        self._send(p)

        self.stored.nickname = new_nickname
        self.commit()
        kwargs["status_codes"] = set()
        p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
        self._send(p)

    def _make_presence(  # type:ignore[no-untyped-def]
        self,
        *,
        last_seen: datetime | None = None,
        status_codes: set[int] | None = None,
        user_full_jid: JID | None = None,
        **presence_kwargs,  # noqa type:ignore[no-untyped-def]
    ) -> Presence:
        """
        Build a presence stanza for this participant, with MUC-specific content.

        On top of what the parent class builds, this adds affiliation, role,
        hats, MUC status codes and, for non-anonymous rooms, the real JID.

        :param last_seen: Forwarded to the parent implementation.
        :param status_codes: MUC status codes to include. This set is copied,
            never modified.
        :param user_full_jid: For the user's own participant in a
            non-anonymous room, the full JID to advertise.
        :param presence_kwargs: Forwarded to the parent implementation.
        :return: The presence stanza.
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        p["muc"]["affiliation"] = self.affiliation
        p["muc"]["role"] = self.role
        if self.hats:
            p["hats"].add_hats(self.hats)
        # Copy so that codes added below never leak into the caller's set.
        codes = set(status_codes) if status_codes else set()
        if self.is_user:
            codes.add(110)
        if not self.muc.is_anonymous and not self.is_system:
            if self.is_user:
                if user_full_jid:
                    p["muc"]["jid"] = user_full_jid
                else:
                    jid = JID(self.user_jid)
                    try:
                        jid.resource = next(iter(self.muc.get_user_resources()))
                    except StopIteration:
                        jid.resource = "pseudo-resource"
                    # Advertise the full JID built just above, not the bare
                    # user JID.
                    p["muc"]["jid"] = jid
                    codes.add(100)
            elif self.contact:
                p["muc"]["jid"] = self.contact.jid
                if a := self.contact.get_avatar():
                    p["vcard_temp_update"]["photo"] = a.id
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        if self.is_user and (hash_ := self.session.user.avatar_hash):
            p["vcard_temp_update"]["photo"] = hash_
        p["muc"]["status_codes"] = codes
        return p

    @property
    def DISCO_NAME(self) -> str:
        """The name advertised via service discovery: the nickname."""
        return self.nickname

    @DISCO_NAME.setter
    def DISCO_NAME(self, _: str) -> NoReturn:
        # read-only: the name always follows the nickname
        raise RuntimeError

    def __send_presence_if_needed(
        self, stanza: Message | Presence, full_jid: JID, archive_only: bool
    ) -> None:
        """
        Send the initial presence of this participant before ``stanza`` if needed.

        Nothing is sent for archive-only stanzas, for the system participant,
        for the user, if a presence was already sent, if the stanza has a
        subject, or if it is a message carrying a "muc" plugin.
        """
        if (
            archive_only
            or self.is_system
            or self.is_user
            or self._presence_sent
            or stanza["subject"]
        ):
            return
        if isinstance(stanza, Message):
            if stanza.get_plugin("muc", check=True):
                return
        self.send_initial_presence(full_jid)

    @property
    def occupant_id(self) -> str:
        """The stored occupant ID of this participant."""
        return self.stored.occupant_id

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        full_jid: JID | None = None,
        archive_only: bool = False,
        legacy_msg_id: LegacyMessageType | None = None,
        initial_presence: bool = False,
        **send_kwargs: Any,  # noqa:ANN401
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza on behalf of this participant.

        :param stanza: The message or presence to send.
        :param full_jid: If set, only send to this JID and skip archiving.
            Otherwise a copy is sent to every user full JID of the MUC.
        :param archive_only: If true (and ``full_jid`` is not set), only add
            the message to the MUC archive without sending it.
        :param legacy_msg_id: Passed to the archive along with the message.
        :param initial_presence: If true, only flag the presence as sent on
            the in-memory stored object instead of going through the
            ``_presence_sent`` setter.
        :param send_kwargs: Accepted but not used by this implementation.
        :return: The stanza that was handled last.
        """
        if stanza.get_from().resource:
            stanza["occupant-id"]["id"] = self.occupant_id
        else:
            stanza["occupant-id"]["id"] = "room"
        self.__add_nick_element(stanza)
        if not self.is_user and isinstance(stanza, Presence):
            # no point in announcing the departure of someone never announced
            if stanza["type"] == "unavailable" and not self._presence_sent:
                return stanza
            if initial_presence:
                self.stored.presence_sent = True
            else:
                self._presence_sent = True
        if full_jid:
            stanza["to"] = full_jid
            self.__send_presence_if_needed(stanza, full_jid, archive_only)
            if self.is_user:
                assert stanza.stream is not None
                stanza.stream.send(stanza, use_filters=False)
            else:
                stanza.send()
        else:
            if hasattr(self.muc, "archive") and isinstance(stanza, Message):
                self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
            if archive_only:
                return stanza
            for user_full_jid in self.muc.user_full_jids():
                stanza = copy(stanza)
                stanza["to"] = user_full_jid
                self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
                stanza.send()
        return stanza

    def mucadmin_item(self) -> MUCAdminItem:
        """
        Build a MUC admin item describing this participant.

        In non-anonymous rooms, the bare JID of the user or of the associated
        contact is included; a warning is emitted if neither is available.
        """
        item = MUCAdminItem()
        item["nick"] = self.nickname
        item["affiliation"] = self.affiliation
        item["role"] = self.role
        if not self.muc.is_anonymous:
            if self.is_user:
                item["jid"] = self.user_jid.bare
            elif self.contact:
                item["jid"] = self.contact.jid.bare
            else:
                warnings.warn(
                    (
                        f"Private group but no contact JID associated to {self.jid} in"
                        f" {self}"
                    ),
                )
        return item

    def __add_nick_element(self, stanza: Presence | Message) -> None:
        """
        Append a XEP-0172 nick element when the nickname (illegal characters
        removed) differs from the JID resource.
        """
        if (nick := self.nickname_no_illegal) != self.jid.resource:
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = nick
            stanza.append(n)

    def _get_last_presence(self) -> CachedPresence | None:
        """Return the last cached presence, falling back to the contact's."""
        own = super()._get_last_presence()
        if own is None and self.contact:
            return self.contact._get_last_presence()
        return own

    def send_initial_presence(
        self,
        full_jid: JID,
        nick_change: bool = False,
        presence_id: str | None = None,
    ) -> None:
        """
        Called when the user joins a MUC, as a mechanism
        to indicate to the joining XMPP client the list of "participants".

        Can also be called to trigger a "participant has joined the group" event.

        :param full_jid: Set this to only send to a specific user XMPP resource.
        :param nick_change: Used when the user joins and the MUC renames them (code 210)
        :param presence_id: set the presence ID. used internally by slidge
        """
        #  MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
        codes = set()
        if nick_change:
            codes.add(210)

        if self.is_user:
            # the "initial presence" of the user has to be vanilla, as it is
            # a crucial part of the MUC join sequence for XMPP clients.
            kwargs = {}
        else:
            cache = self._get_last_presence()
            self.log.debug("Join muc, initial presence: %s", cache)
            if cache:
                ptype = cache.ptype
                if ptype == "unavailable":
                    return
                kwargs = dict(
                    last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
                )
            else:
                kwargs = {}
        p = self._make_presence(
            status_codes=codes,
            user_full_jid=full_jid,
            **kwargs,  # type:ignore
        )
        if presence_id:
            p["id"] = presence_id
        self._send(p, full_jid, initial_presence=True)

    def leave(self) -> None:
        """
        Call this when the participant leaves the room
        """
        self.muc.remove_participant(self)

    def kick(self, reason: str | None = None) -> None:
        """
        Call this when the participant is kicked from the room
        """
        self.muc.remove_participant(self, kick=True, reason=reason)

    def ban(self, reason: str | None = None) -> None:
        """
        Call this when the participant is banned from the room
        """
        self.muc.remove_participant(self, ban=True, reason=reason)

    async def get_disco_info(
        self, jid: OptJid = None, node: str | None = None
    ) -> DiscoInfo:
        """
        Return the disco info of the associated contact if there is one,
        otherwise the one of the parent class.

        ``jid`` and ``node`` are accepted but not used by this implementation.
        """
        if self.contact is not None:
            return await self.contact.get_disco_info()
        return await super().get_disco_info()

    def moderate(
        self, legacy_msg_id: LegacyMessageType, reason: str | None = None
    ) -> None:
        """
        Retract a message, with this participant as the moderator.

        One retraction, built by the system participant of the MUC, is sent
        for each XMPP ID matching ``legacy_msg_id``.

        :param legacy_msg_id: Legacy ID of the message to retract.
        :param reason: Optional reason to include in the retraction.
        """
        for i in self._legacy_to_xmpp(legacy_msg_id):
            m = self.muc.get_system_participant()._make_message()
            m["retract"]["id"] = i
            if self.is_system:
                m["retract"].enable("moderated")
            else:
                m["retract"]["moderated"]["by"] = self.jid
                m["retract"]["moderated"]["occupant-id"]["id"] = self.occupant_id
            if reason:
                m["retract"]["reason"] = reason
            self._send(m)

    def __fill_subject(
        self, msg: Message, subject: str | None, when: datetime | None
    ) -> None:
        """Set the optional delay and the (possibly empty) subject on ``msg``."""
        if when is not None:
            msg["delay"].set_stamp(when)
            msg["delay"]["from"] = self.muc.jid
        if subject:
            msg["subject"] = subject
        else:
            # an empty <subject/> element must be present in the stanza
            # may be simplified if slixmpp lets it do it more easily some day
            msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject"))

    def set_room_subject(
        self,
        subject: str,
        full_jid: JID | None = None,
        when: datetime | None = None,
        update_muc: bool = True,
    ) -> None:
        """
        Send a room subject change authored by this participant.

        :param subject: The new subject; if empty, an empty subject element
            is sent.
        :param full_jid: If set, only send to this JID.
        :param when: If set, added to the message as a delay stamp.
        :param update_muc: Whether to also update the subject, its setter and
            its date on the MUC object.
        """
        if update_muc:
            self.muc._subject = subject  # type: ignore
            self.muc.subject_setter = self.nickname
            self.muc.subject_date = when

        msg = self._make_message()
        self.__fill_subject(msg, subject, when)
        self._send(msg, full_jid)

    def set_thread_subject(
        self,
        thread: LegacyThreadType,
        subject: str | None,
        when: datetime | None = None,
    ) -> None:
        """
        Send a subject change for a thread, authored by this participant.

        :param thread: The thread identifier; sent as ``str(thread)``.
        :param subject: The new subject; if empty or ``None``, an empty
            subject element is sent.
        :param when: If set, added to the message as a delay stamp.
        """
        msg = self._make_message()
        msg["thread"] = str(thread)
        self.__fill_subject(msg, subject, when)
        self._send(msg)

565 

566 

def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]:
    """
    Turn a nickname into a JID resource usable in a given MUC.

    Three candidates are tried in turn as the resource, stopping at the
    first one accepted: the nickname with illegal characters stripped and
    newlines replaced by ``" | "``; its punycode encoding; and finally the
    punycode form passed through ``strip_non_printable``.

    :param muc_jid: The JID of the MUC; it is copied, not modified.
    :param nickname: The raw nickname.
    :return: A tuple of the cleaned nickname (first candidate above) and a
        copy of ``muc_jid`` carrying the accepted resource.
    """
    cleaned = strip_illegal_chars(nickname).replace("\n", " | ")
    occupant_jid = JID(muc_jid)

    try:
        occupant_jid.resource = cleaned
    except InvalidJID:
        encoded = cleaned.encode("punycode").decode()
        try:
            occupant_jid.resource = encoded
        except InvalidJID:
            # at this point there still might be control chars
            occupant_jid.resource = strip_non_printable(encoded)

    return cleaned, occupant_jid

583 

584 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)