Coverage for slidge / group / participant.py: 87%

367 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-20 19:56 +0000

1import logging 

2import string 

3import uuid 

4import warnings 

5from copy import copy 

6from datetime import datetime 

7from typing import TYPE_CHECKING, Any, Literal 

8from xml.etree import ElementTree as ET 

9 

10import sqlalchemy as sa 

11from slixmpp import JID, InvalidJID, Message, Presence 

12from slixmpp.plugins.xep_0030.stanza.info import DiscoInfo 

13from slixmpp.plugins.xep_0045.stanza import MUCAdminItem 

14from slixmpp.plugins.xep_0492.stanza import Never 

15from slixmpp.types import MessageTypes, OptJid 

16from sqlalchemy.orm.exc import DetachedInstanceError 

17 

18from ..contact import LegacyContact 

19from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin 

20from ..core.mixins.db import DBMixin 

21from ..db.models import Participant 

22from ..util import SubclassableOnce, strip_illegal_chars 

23from ..util.types import ( 

24 AnyMUC, 

25 CachedPresence, 

26 Hat, 

27 LegacyMessageType, 

28 LegacyThreadType, 

29 MessageOrPresenceTypeVar, 

30 MucAffiliation, 

31 MucRole, 

32) 

33 

34if TYPE_CHECKING: 

35 from slidge.command.base import ContactCommand 

36 

37 

38def strip_non_printable(nickname: str) -> str: 

39 new = ( 

40 "".join(x for x in nickname if x in string.printable) 

41 + f"-slidge-{hash(nickname)}" 

42 ) 

43 warnings.warn(f"Could not use {nickname} as a nickname, using {new}") 

44 return new 

45 

46 

47class LegacyParticipant( 

48 PresenceMixin, 

49 MessageMixin, 

50 ChatterDiscoMixin, 

51 DBMixin, 

52 SubclassableOnce, 

53): 

54 """ 

55 A legacy participant of a legacy group chat. 

56 """ 

57 

58 is_participant: Literal[True] = True 

59 

60 mtype: MessageTypes = "groupchat" 

61 _can_send_carbon = False 

62 USE_STANZA_ID = True 

63 STRIP_SHORT_DELAY = False 

64 stored: Participant 

65 contact: LegacyContact[Any] | None 

66 

67 def __init__( 

68 self, 

69 muc: AnyMUC, 

70 stored: Participant, 

71 is_system: bool = False, 

72 contact: LegacyContact[Any] | None = None, 

73 ) -> None: 

74 self.muc = muc 

75 self.session = muc.session 

76 self.xmpp = muc.session.xmpp 

77 self.is_system = is_system 

78 

79 if contact is None and stored.contact is not None: 

80 contact = self.session.contacts.from_store(stored=stored.contact) 

81 if contact is not None and stored.contact is None: 

82 stored.contact = contact.stored 

83 

84 self.stored = stored 

85 self.contact = contact 

86 

87 super().__init__() 

88 

89 if stored.resource is None: 

90 self.__update_resource(stored.nickname) 

91 

92 self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}") 

93 

94 @property 

95 def is_user(self) -> bool: 

96 try: 

97 return self.stored.is_user 

98 except DetachedInstanceError: 

99 self.merge() 

100 return self.stored.is_user 

101 

102 @is_user.setter 

103 def is_user(self, is_user: bool) -> None: 

104 with self.xmpp.store.session(expire_on_commit=True) as orm: 

105 orm.add(self.stored) 

106 self.stored.is_user = is_user 

107 orm.commit() 

108 

109 @property 

110 def jid(self) -> JID: 

111 jid = JID(self.muc.jid) 

112 if self.stored.resource: 

113 jid.resource = self.stored.resource 

114 return jid 

115 

116 @jid.setter 

117 def jid(self, x: JID) -> None: 

118 # FIXME: without this, mypy yields 

119 # "Cannot override writeable attribute with read-only property" 

120 # But it does not happen for LegacyContact. WTF? 

121 raise RuntimeError 

122 

123 @property 

124 def commands(self) -> dict[str, "type[ContactCommand[Any]]"]: # type:ignore[override] 

125 if self.contact is None: 

126 return {} 

127 else: 

128 return self.contact.commands 

129 

130 def __should_commit(self) -> bool: 

131 if self.is_system: 

132 return False 

133 if self.muc.get_lock("fill participants"): 

134 return False 

135 return not self.muc.get_lock("fill history") 

136 

137 def commit(self) -> None: 

138 if not self.__should_commit(): 

139 return 

140 super().commit() 

141 

142 @property 

143 def user_jid(self) -> JID: 

144 return self.session.user_jid 

145 

146 def __repr__(self) -> str: 

147 return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>" 

148 

149 @property 

150 def _presence_sent(self) -> bool: 

151 # we track if we already sent a presence for this participant. 

152 # if we didn't, we send it before the first message. 

153 # this way, event in plugins that don't map "user has joined" events, 

154 # we send a "join"-presence from the participant before the first message 

155 return self.stored.presence_sent 

156 

157 @_presence_sent.setter 

158 def _presence_sent(self, val: bool) -> None: 

159 if self._presence_sent == val: 

160 return 

161 self.stored.presence_sent = val 

162 if not self.__should_commit(): 

163 return 

164 with self.xmpp.store.session() as orm: 

165 orm.execute( 

166 sa.update(Participant) 

167 .where(Participant.id == self.stored.id) 

168 .values(presence_sent=val) 

169 ) 

170 orm.commit() 

171 

172 @property 

173 def nickname_no_illegal(self) -> str: 

174 return self.stored.nickname_no_illegal 

175 

176 @property 

177 def affiliation(self) -> MucAffiliation: 

178 return self.stored.affiliation 

179 

180 @affiliation.setter 

181 def affiliation(self, affiliation: MucAffiliation) -> None: 

182 if self.affiliation == affiliation: 

183 return 

184 was = self.stored.affiliation 

185 self.stored.affiliation = affiliation 

186 if not self.muc.participants_filled: 

187 return 

188 self.commit() 

189 if self.cached_presence is None or self.cached_presence.ptype == "unavailable": 

190 self.muc.send_affiliation_change(self, was) 

191 self.send_last_presence(force=True, no_cache_online=True) 

192 

193 @property 

194 def role(self) -> MucRole: 

195 return self.stored.role 

196 

197 @role.setter 

198 def role(self, role: MucRole) -> None: 

199 if self.role == role: 

200 return 

201 self.stored.role = role 

202 if not self.muc.participants_filled: 

203 return 

204 self.commit() 

205 if not self._presence_sent: 

206 return 

207 self.send_last_presence(force=True, no_cache_online=True) 

208 

209 @property 

210 def hats(self) -> list[Hat]: 

211 return [Hat(*h) for h in self.stored.hats] if self.stored.hats else [] 

212 

213 def set_hats(self, hats: list[Hat]) -> None: 

214 if self.hats == hats: 

215 return 

216 self.stored.hats = hats 

217 if not self.muc.participants_filled: 

218 return 

219 self.commit() 

220 if not self._presence_sent: 

221 return 

222 self.send_last_presence(force=True, no_cache_online=True) 

223 

224 def __update_resource(self, unescaped_nickname: str | None) -> None: 

225 if not unescaped_nickname: 

226 self.stored.resource = "" 

227 if self.is_system: 

228 self.stored.nickname_no_illegal = "" 

229 else: 

230 warnings.warn( 

231 "Only the system participant is allowed to not have a nickname" 

232 ) 

233 nickname = f"unnamed-{uuid.uuid4()}" 

234 self.stored.resource = self.stored.nickname_no_illegal = nickname 

235 return 

236 

237 self.stored.nickname_no_illegal, jid = escape_nickname( 

238 self.muc.jid, 

239 unescaped_nickname, 

240 ) 

241 self.stored.resource = jid.resource 

242 

243 def send_configuration_change(self, codes: tuple[int, ...]) -> None: 

244 if not self.is_system: 

245 raise RuntimeError("This is only possible for the system participant") 

246 msg = self._make_message() 

247 msg["muc"]["status_codes"] = codes 

248 self._send(msg) 

249 

250 @property 

251 def nickname(self) -> str: 

252 return self.stored.nickname 

253 

254 @nickname.setter 

255 def nickname(self, new_nickname: str) -> None: 

256 old = self.nickname 

257 if new_nickname == old: 

258 return 

259 

260 if self.muc.stored.id is not None: 

261 with self.xmpp.store.session() as orm: 

262 if not self.xmpp.store.rooms.nick_available( 

263 orm, self.muc.stored.id, new_nickname 

264 ): 

265 if self.contact is None: 

266 new_nickname = f"{new_nickname} ({self.occupant_id})" 

267 else: 

268 new_nickname = f"{new_nickname} ({self.contact.legacy_id})" 

269 

270 cache = getattr(self, "_last_presence", None) 

271 if cache: 

272 last_seen = cache.last_seen 

273 kwargs = cache.presence_kwargs 

274 else: 

275 last_seen = None 

276 kwargs = {} 

277 

278 kwargs["status_codes"] = {303} 

279 

280 p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs) 

281 # in this order so pfrom=old resource and we actually use the escaped nick 

282 # in the muc/item/nick element 

283 self.__update_resource(new_nickname) 

284 p["muc"]["item"]["nick"] = self.jid.resource 

285 self._send(p) 

286 

287 self.stored.nickname = new_nickname 

288 self.commit() 

289 kwargs["status_codes"] = set() 

290 p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs) 

291 self._send(p) 

292 

293 def _make_presence( # type:ignore[no-untyped-def] 

294 self, 

295 *, 

296 last_seen: datetime | None = None, 

297 status_codes: set[int] | None = None, 

298 user_full_jid: JID | None = None, 

299 **presence_kwargs, # noqa type:ignore[no-untyped-def] 

300 ) -> Presence: 

301 p = super()._make_presence(last_seen=last_seen, **presence_kwargs) 

302 p["muc"]["affiliation"] = self.affiliation 

303 p["muc"]["role"] = self.role 

304 if self.hats: 

305 p["hats"].add_hats(self.hats) 

306 codes = status_codes or set() 

307 if self.is_user: 

308 codes.add(110) 

309 if not self.muc.is_anonymous and not self.is_system: 

310 if self.is_user: 

311 if user_full_jid: 

312 p["muc"]["jid"] = user_full_jid 

313 else: 

314 jid = JID(self.user_jid) 

315 try: 

316 jid.resource = next(iter(self.muc.get_user_resources())) 

317 except StopIteration: 

318 jid.resource = "pseudo-resource" 

319 p["muc"]["jid"] = self.user_jid 

320 codes.add(100) 

321 elif self.contact: 

322 p["muc"]["jid"] = self.contact.jid 

323 if a := self.contact.get_avatar(): 

324 p["vcard_temp_update"]["photo"] = a.id 

325 else: 

326 warnings.warn( 

327 f"Private group but no 1:1 JID associated to '{self}'", 

328 ) 

329 if self.is_user and (hash_ := self.session.user.avatar_hash): 

330 p["vcard_temp_update"]["photo"] = hash_ 

331 p["muc"]["status_codes"] = codes 

332 return p 

333 

334 @property 

335 def DISCO_NAME(self) -> str: 

336 return self.nickname 

337 

338 @DISCO_NAME.setter 

339 def DISCO_NAME(self, _: str) -> Never: 

340 raise RuntimeError 

341 

342 def __send_presence_if_needed( 

343 self, stanza: Message | Presence, full_jid: JID, archive_only: bool 

344 ) -> None: 

345 if ( 

346 archive_only 

347 or self.is_system 

348 or self.is_user 

349 or self._presence_sent 

350 or stanza["subject"] 

351 ): 

352 return 

353 if isinstance(stanza, Message): 

354 if "muc" in stanza: 

355 return 

356 self.send_initial_presence(full_jid) 

357 

358 @property 

359 def occupant_id(self) -> str: 

360 return self.stored.occupant_id 

361 

362 def _send( 

363 self, 

364 stanza: MessageOrPresenceTypeVar, 

365 full_jid: JID | None = None, 

366 archive_only: bool = False, 

367 legacy_msg_id: LegacyMessageType | None = None, 

368 initial_presence: bool = False, 

369 **send_kwargs: Any, # noqa:ANN401 

370 ) -> MessageOrPresenceTypeVar: 

371 if stanza.get_from().resource: 

372 stanza["occupant-id"]["id"] = self.occupant_id 

373 else: 

374 stanza["occupant-id"]["id"] = "room" 

375 self.__add_nick_element(stanza) 

376 if not self.is_user and isinstance(stanza, Presence): 

377 if stanza["type"] == "unavailable" and not self._presence_sent: 

378 return stanza 

379 if initial_presence: 

380 self._presence_sent = True 

381 else: 

382 self._presence_sent = True 

383 if full_jid: 

384 stanza["to"] = full_jid 

385 self.__send_presence_if_needed(stanza, full_jid, archive_only) 

386 if self.is_user: 

387 assert stanza.stream is not None 

388 stanza.stream.send(stanza, use_filters=False) 

389 else: 

390 stanza.send() 

391 else: 

392 if hasattr(self.muc, "archive") and isinstance(stanza, Message): 

393 self.muc.archive.add(stanza, self, archive_only, legacy_msg_id) 

394 if archive_only: 

395 return stanza 

396 for user_full_jid in self.muc.user_full_jids(): 

397 stanza = copy(stanza) 

398 stanza["to"] = user_full_jid 

399 self.__send_presence_if_needed(stanza, user_full_jid, archive_only) 

400 stanza.send() 

401 return stanza 

402 

403 def mucadmin_item(self) -> MUCAdminItem: 

404 item = MUCAdminItem() 

405 item["nick"] = self.nickname 

406 item["affiliation"] = self.affiliation 

407 item["role"] = self.role 

408 if not self.muc.is_anonymous: 

409 if self.is_user: 

410 item["jid"] = self.user_jid.bare 

411 elif self.contact: 

412 item["jid"] = self.contact.jid.bare 

413 else: 

414 warnings.warn( 

415 ( 

416 f"Private group but no contact JID associated to {self.jid} in" 

417 f" {self}" 

418 ), 

419 ) 

420 return item 

421 

422 def __add_nick_element(self, stanza: Presence | Message) -> None: 

423 if (nick := self.nickname_no_illegal) != self.jid.resource: 

424 n = self.xmpp.plugin["xep_0172"].stanza.UserNick() 

425 n["nick"] = nick 

426 stanza.append(n) 

427 

428 def _get_last_presence(self) -> CachedPresence | None: 

429 own = super()._get_last_presence() 

430 if own is None and self.contact: 

431 return self.contact._get_last_presence() 

432 return own 

433 

434 def send_initial_presence( 

435 self, 

436 full_jid: JID, 

437 nick_change: bool = False, 

438 presence_id: str | None = None, 

439 mav_until: str | None = None, 

440 ) -> None: 

441 """ 

442 Called when the user joins a MUC, as a mechanism 

443 to indicate to the joining XMPP client the list of "participants". 

444 

445 Can be called this to trigger a "participant has joined the group" event. 

446 

447 :param full_jid: Set this to only send to a specific user XMPP resource. 

448 :param nick_change: Used when the user joins and the MUC renames them (code 210) 

449 :param presence_id: set the presence ID. used internally by slidge 

450 """ 

451 # MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes 

452 codes = set() 

453 if nick_change: 

454 codes.add(210) 

455 

456 if self.is_user: 

457 # the "initial presence" of the user has to be vanilla, as it is 

458 # a crucial part of the MUC join sequence for XMPP clients. 

459 kwargs = {} 

460 else: 

461 cache = self._get_last_presence() 

462 self.log.debug("Join muc, initial presence: %s", cache) 

463 if cache: 

464 ptype = cache.ptype 

465 if ptype == "unavailable": 

466 return 

467 kwargs = dict( 

468 last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow 

469 ) 

470 else: 

471 kwargs = {} 

472 p = self._make_presence( 

473 status_codes=codes, 

474 user_full_jid=full_jid, 

475 **kwargs, # type:ignore 

476 ) 

477 if presence_id: 

478 p["id"] = presence_id 

479 if self.is_user and mav_until is not None: 

480 p["muc"]["mav"]["until"] = mav_until 

481 self._send(p, full_jid, initial_presence=True) 

482 

483 def leave(self) -> None: 

484 """ 

485 Call this when the participant leaves the room 

486 """ 

487 self.muc.remove_participant(self) 

488 

489 def kick(self, reason: str | None = None) -> None: 

490 """ 

491 Call this when the participant is kicked from the room 

492 """ 

493 self.muc.remove_participant(self, kick=True, reason=reason) 

494 

495 def ban(self, reason: str | None = None) -> None: 

496 """ 

497 Call this when the participant is banned from the room 

498 """ 

499 self.muc.remove_participant(self, ban=True, reason=reason) 

500 

501 async def get_disco_info( 

502 self, jid: OptJid = None, node: str | None = None 

503 ) -> DiscoInfo: 

504 if self.contact is not None: 

505 return await self.contact.get_disco_info() 

506 return await super().get_disco_info() 

507 

508 def moderate( 

509 self, legacy_msg_id: LegacyMessageType, reason: str | None = None 

510 ) -> None: 

511 for i in self._legacy_to_xmpp(legacy_msg_id): 

512 m = self.muc.get_system_participant()._make_message() 

513 m["retract"]["id"] = i 

514 if self.is_system: 

515 m["retract"].enable("moderated") 

516 else: 

517 m["retract"]["moderated"]["by"] = self.jid 

518 m["retract"]["moderated"]["occupant-id"]["id"] = self.occupant_id 

519 if reason: 

520 m["retract"]["reason"] = reason 

521 self._send(m) 

522 

523 def set_room_subject( 

524 self, 

525 subject: str, 

526 full_jid: JID | None = None, 

527 when: datetime | None = None, 

528 update_muc: bool = True, 

529 ) -> None: 

530 if update_muc: 

531 self.muc._subject = subject # type: ignore 

532 self.muc.subject_setter = self.nickname 

533 self.muc.subject_date = when 

534 

535 msg = self._make_message() 

536 if when is not None: 

537 msg["delay"].set_stamp(when) 

538 msg["delay"]["from"] = self.muc.jid 

539 if subject: 

540 msg["subject"] = subject 

541 else: 

542 # may be simplified if slixmpp lets it do it more easily some day 

543 msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject")) 

544 self._send(msg, full_jid) 

545 

546 def set_thread_subject( 

547 self, 

548 thread: LegacyThreadType, 

549 subject: str | None, 

550 when: datetime | None = None, 

551 ) -> None: 

552 msg = self._make_message() 

553 msg["thread"] = str(thread) 

554 if when is not None: 

555 msg["delay"].set_stamp(when) 

556 msg["delay"]["from"] = self.muc.jid 

557 if subject: 

558 msg["subject"] = subject 

559 else: 

560 # may be simplified if slixmpp lets it do it more easily some day 

561 msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject")) 

562 self._send(msg) 

563 

564 

565def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]: 

566 nickname = nickname_no_illegal = strip_illegal_chars(nickname).replace("\n", " | ") 

567 

568 jid = JID(muc_jid) 

569 

570 try: 

571 jid.resource = nickname 

572 except InvalidJID: 

573 nickname = nickname.encode("punycode").decode() 

574 try: 

575 jid.resource = nickname 

576 except InvalidJID: 

577 # at this point there still might be control chars 

578 jid.resource = strip_non_printable(nickname) 

579 

580 return nickname_no_illegal, jid 

581 

582 

583log = logging.getLogger(__name__)