Coverage for slidge / group / participant.py: 89%

365 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-01-06 15:18 +0000

1import logging 

2import string 

3import uuid 

4import warnings 

5from copy import copy 

6from datetime import datetime 

7from typing import TYPE_CHECKING, Any, Optional, Union 

8from xml.etree import ElementTree as ET 

9 

10import sqlalchemy as sa 

11from slixmpp import JID, InvalidJID, Message, Presence 

12from slixmpp.plugins.xep_0045.stanza import MUCAdminItem 

13from slixmpp.types import MessageTypes, OptJid 

14from sqlalchemy.orm.exc import DetachedInstanceError 

15 

16from ..contact import LegacyContact 

17from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin 

18from ..core.mixins.db import DBMixin 

19from ..db.models import Participant 

20from ..util import SubclassableOnce, strip_illegal_chars 

21from ..util.types import ( 

22 CachedPresence, 

23 Hat, 

24 LegacyMessageType, 

25 LegacyThreadType, 

26 MessageOrPresenceTypeVar, 

27 MucAffiliation, 

28 MucRole, 

29) 

30 

31if TYPE_CHECKING: 

32 from .room import LegacyMUC 

33 

34 

35def strip_non_printable(nickname: str): 

36 new = ( 

37 "".join(x for x in nickname if x in string.printable) 

38 + f"-slidge-{hash(nickname)}" 

39 ) 

40 warnings.warn(f"Could not use {nickname} as a nickname, using {new}") 

41 return new 

42 

43 

44class LegacyParticipant( 

45 PresenceMixin, 

46 MessageMixin, 

47 ChatterDiscoMixin, 

48 DBMixin, 

49 metaclass=SubclassableOnce, 

50): 

51 """ 

52 A legacy participant of a legacy group chat. 

53 """ 

54 

55 is_participant = True 

56 

57 mtype: MessageTypes = "groupchat" 

58 _can_send_carbon = False 

59 USE_STANZA_ID = True 

60 STRIP_SHORT_DELAY = False 

61 stored: Participant 

62 contact: LegacyContact[Any] | None 

63 

64 def __init__( 

65 self, 

66 muc: "LegacyMUC", 

67 stored: Participant, 

68 is_system: bool = False, 

69 contact: LegacyContact[Any] | None = None, 

70 ) -> None: 

71 self.muc = muc 

72 self.session = muc.session 

73 self.xmpp = muc.session.xmpp 

74 self.is_system = is_system 

75 

76 if contact is None and stored.contact is not None: 

77 contact = self.session.contacts.from_store(stored=stored.contact) 

78 if contact is not None and stored.contact is None: 

79 stored.contact = contact.stored 

80 

81 self.stored = stored 

82 self.contact = contact 

83 

84 super().__init__() 

85 

86 if stored.resource is None: 

87 self.__update_resource(stored.nickname) 

88 

89 self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}") 

90 

91 @property 

92 def is_user(self) -> bool: 

93 try: 

94 return self.stored.is_user 

95 except DetachedInstanceError: 

96 self.merge() 

97 return self.stored.is_user 

98 

99 @is_user.setter 

100 def is_user(self, is_user: bool) -> None: 

101 with self.xmpp.store.session(expire_on_commit=True) as orm: 

102 orm.add(self.stored) 

103 self.stored.is_user = is_user 

104 orm.commit() 

105 

106 @property 

107 def jid(self) -> JID: 

108 jid = JID(self.muc.jid) 

109 if self.stored.resource: 

110 jid.resource = self.stored.resource 

111 return jid 

112 

113 @jid.setter 

114 def jid(self, x: JID): 

115 # FIXME: without this, mypy yields 

116 # "Cannot override writeable attribute with read-only property" 

117 # But it does not happen for LegacyContact. WTF? 

118 raise RuntimeError 

119 

120 def __should_commit(self) -> bool: 

121 if self.is_system: 

122 return False 

123 if self.muc.get_lock("fill participants"): 

124 return False 

125 if self.muc.get_lock("fill history"): 

126 return False 

127 return True 

128 

129 def commit(self, *args, **kwargs) -> None: 

130 if not self.__should_commit(): 

131 return 

132 super().commit(*args, **kwargs) 

133 

134 @property 

135 def user_jid(self): 

136 return self.session.user_jid 

137 

138 def __repr__(self) -> str: 

139 return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>" 

140 

141 @property 

142 def _presence_sent(self) -> bool: 

143 # we track if we already sent a presence for this participant. 

144 # if we didn't, we send it before the first message. 

145 # this way, event in plugins that don't map "user has joined" events, 

146 # we send a "join"-presence from the participant before the first message 

147 return self.stored.presence_sent 

148 

149 @_presence_sent.setter 

150 def _presence_sent(self, val: bool) -> None: 

151 if self._presence_sent == val: 

152 return 

153 self.stored.presence_sent = val 

154 if not self.__should_commit(): 

155 return 

156 with self.xmpp.store.session() as orm: 

157 orm.execute( 

158 sa.update(Participant) 

159 .where(Participant.id == self.stored.id) 

160 .values(presence_sent=val) 

161 ) 

162 orm.commit() 

163 

164 @property 

165 def nickname_no_illegal(self) -> str: 

166 return self.stored.nickname_no_illegal 

167 

168 @property 

169 def affiliation(self): 

170 return self.stored.affiliation 

171 

172 @affiliation.setter 

173 def affiliation(self, affiliation: MucAffiliation) -> None: 

174 if self.affiliation == affiliation: 

175 return 

176 self.stored.affiliation = affiliation 

177 if not self.muc.participants_filled: 

178 return 

179 self.commit() 

180 if not self._presence_sent: 

181 return 

182 self.send_last_presence(force=True, no_cache_online=True) 

183 

184 def send_affiliation_change(self) -> None: 

185 # internal use by slidge 

186 msg = self._make_message() 

187 msg["muc"]["affiliation"] = self.affiliation 

188 msg["type"] = "normal" 

189 if not self.muc.is_anonymous and not self.is_system: 

190 if self.contact: 

191 msg["muc"]["jid"] = self.contact.jid 

192 else: 

193 warnings.warn( 

194 f"Private group but no 1:1 JID associated to '{self}'", 

195 ) 

196 self._send(msg) 

197 

198 @property 

199 def role(self): 

200 return self.stored.role 

201 

202 @role.setter 

203 def role(self, role: MucRole) -> None: 

204 if self.role == role: 

205 return 

206 self.stored.role = role 

207 if not self.muc.participants_filled: 

208 return 

209 self.commit() 

210 if not self._presence_sent: 

211 return 

212 self.send_last_presence(force=True, no_cache_online=True) 

213 

214 @property 

215 def hats(self) -> list[Hat]: 

216 return [Hat(*h) for h in self.stored.hats] if self.stored.hats else [] 

217 

218 def set_hats(self, hats: list[Hat]) -> None: 

219 if self.hats == hats: 

220 return 

221 self.stored.hats = hats # type:ignore[assignment] 

222 if not self.muc.participants_filled: 

223 return 

224 self.commit(merge=True) 

225 if not self._presence_sent: 

226 return 

227 self.send_last_presence(force=True, no_cache_online=True) 

228 

229 def __update_resource(self, unescaped_nickname: Optional[str]) -> None: 

230 if not unescaped_nickname: 

231 self.stored.resource = "" 

232 if self.is_system: 

233 self.stored.nickname_no_illegal = "" 

234 else: 

235 warnings.warn( 

236 "Only the system participant is allowed to not have a nickname" 

237 ) 

238 nickname = f"unnamed-{uuid.uuid4()}" 

239 self.stored.resource = self.stored.nickname_no_illegal = nickname 

240 return 

241 

242 self.stored.nickname_no_illegal, jid = escape_nickname( 

243 self.muc.jid, 

244 unescaped_nickname, 

245 ) 

246 self.stored.resource = jid.resource 

247 

248 def send_configuration_change(self, codes: tuple[int]): 

249 if not self.is_system: 

250 raise RuntimeError("This is only possible for the system participant") 

251 msg = self._make_message() 

252 msg["muc"]["status_codes"] = codes 

253 self._send(msg) 

254 

255 @property 

256 def nickname(self): 

257 return self.stored.nickname 

258 

259 @nickname.setter 

260 def nickname(self, new_nickname: str) -> None: 

261 old = self.nickname 

262 if new_nickname == old: 

263 return 

264 

265 if self.muc.stored.id is not None: 

266 with self.xmpp.store.session() as orm: 

267 if not self.xmpp.store.rooms.nick_available( 

268 orm, self.muc.stored.id, new_nickname 

269 ): 

270 if self.contact is None: 

271 new_nickname = f"{new_nickname} ({self.occupant_id})" 

272 else: 

273 new_nickname = f"{new_nickname} ({self.contact.legacy_id})" 

274 

275 cache = getattr(self, "_last_presence", None) 

276 if cache: 

277 last_seen = cache.last_seen 

278 kwargs = cache.presence_kwargs 

279 else: 

280 last_seen = None 

281 kwargs = {} 

282 

283 kwargs["status_codes"] = {303} 

284 

285 p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs) 

286 # in this order so pfrom=old resource and we actually use the escaped nick 

287 # in the muc/item/nick element 

288 self.__update_resource(new_nickname) 

289 p["muc"]["item"]["nick"] = self.jid.resource 

290 self._send(p) 

291 

292 self.stored.nickname = new_nickname 

293 self.commit() 

294 kwargs["status_codes"] = set() 

295 p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs) 

296 self._send(p) 

297 

298 def _make_presence( 

299 self, 

300 *, 

301 last_seen: Optional[datetime] = None, 

302 status_codes: Optional[set[int]] = None, 

303 user_full_jid: Optional[JID] = None, 

304 **presence_kwargs, 

305 ): 

306 p = super()._make_presence(last_seen=last_seen, **presence_kwargs) 

307 p["muc"]["affiliation"] = self.affiliation 

308 p["muc"]["role"] = self.role 

309 if self.hats: 

310 p["hats"].add_hats(self.hats) 

311 codes = status_codes or set() 

312 if self.is_user: 

313 codes.add(110) 

314 if not self.muc.is_anonymous and not self.is_system: 

315 if self.is_user: 

316 if user_full_jid: 

317 p["muc"]["jid"] = user_full_jid 

318 else: 

319 jid = JID(self.user_jid) 

320 try: 

321 jid.resource = next(iter(self.muc.get_user_resources())) 

322 except StopIteration: 

323 jid.resource = "pseudo-resource" 

324 p["muc"]["jid"] = self.user_jid 

325 codes.add(100) 

326 elif self.contact: 

327 p["muc"]["jid"] = self.contact.jid 

328 if a := self.contact.get_avatar(): 

329 p["vcard_temp_update"]["photo"] = a.id 

330 else: 

331 warnings.warn( 

332 f"Private group but no 1:1 JID associated to '{self}'", 

333 ) 

334 if self.is_user and (hash_ := self.session.user.avatar_hash): 

335 p["vcard_temp_update"]["photo"] = hash_ 

336 p["muc"]["status_codes"] = codes 

337 return p 

338 

339 @property 

340 def DISCO_NAME(self): # type:ignore[override] 

341 return self.nickname 

342 

343 def __send_presence_if_needed( 

344 self, stanza: Union[Message, Presence], full_jid: JID, archive_only: bool 

345 ) -> None: 

346 if ( 

347 archive_only 

348 or self.is_system 

349 or self.is_user 

350 or self._presence_sent 

351 or stanza["subject"] 

352 ): 

353 return 

354 if isinstance(stanza, Message): 

355 if stanza.get_plugin("muc", check=True): 

356 return 

357 self.send_initial_presence(full_jid) 

358 

359 @property 

360 def occupant_id(self) -> str: 

361 return self.stored.occupant_id 

362 

363 def _send( 

364 self, 

365 stanza: MessageOrPresenceTypeVar, 

366 full_jid: Optional[JID] = None, 

367 archive_only: bool = False, 

368 legacy_msg_id=None, 

369 initial_presence=False, 

370 **send_kwargs, 

371 ) -> MessageOrPresenceTypeVar: 

372 if stanza.get_from().resource: 

373 stanza["occupant-id"]["id"] = self.occupant_id 

374 else: 

375 stanza["occupant-id"]["id"] = "room" 

376 self.__add_nick_element(stanza) 

377 if not self.is_user and isinstance(stanza, Presence): 

378 if stanza["type"] == "unavailable" and not self._presence_sent: 

379 return stanza # type:ignore 

380 if initial_presence: 

381 self.stored.presence_sent = True 

382 else: 

383 self._presence_sent = True 

384 if full_jid: 

385 stanza["to"] = full_jid 

386 self.__send_presence_if_needed(stanza, full_jid, archive_only) 

387 if self.is_user: 

388 assert stanza.stream is not None 

389 stanza.stream.send(stanza, use_filters=False) 

390 else: 

391 stanza.send() 

392 else: 

393 if hasattr(self.muc, "archive") and isinstance(stanza, Message): 

394 self.muc.archive.add(stanza, self, archive_only, legacy_msg_id) 

395 if archive_only: 

396 return stanza 

397 for user_full_jid in self.muc.user_full_jids(): 

398 stanza = copy(stanza) 

399 stanza["to"] = user_full_jid 

400 self.__send_presence_if_needed(stanza, user_full_jid, archive_only) 

401 stanza.send() 

402 return stanza 

403 

404 def mucadmin_item(self): 

405 item = MUCAdminItem() 

406 item["nick"] = self.nickname 

407 item["affiliation"] = self.affiliation 

408 item["role"] = self.role 

409 if not self.muc.is_anonymous: 

410 if self.is_user: 

411 item["jid"] = self.user_jid.bare 

412 elif self.contact: 

413 item["jid"] = self.contact.jid.bare 

414 else: 

415 warnings.warn( 

416 ( 

417 f"Private group but no contact JID associated to {self.jid} in" 

418 f" {self}" 

419 ), 

420 ) 

421 return item 

422 

423 def __add_nick_element(self, stanza: Union[Presence, Message]) -> None: 

424 if (nick := self.nickname_no_illegal) != self.jid.resource: 

425 n = self.xmpp.plugin["xep_0172"].stanza.UserNick() 

426 n["nick"] = nick 

427 stanza.append(n) 

428 

429 def _get_last_presence(self) -> Optional[CachedPresence]: 

430 own = super()._get_last_presence() 

431 if own is None and self.contact: 

432 return self.contact._get_last_presence() 

433 return own 

434 

435 def send_initial_presence( 

436 self, 

437 full_jid: JID, 

438 nick_change: bool = False, 

439 presence_id: Optional[str] = None, 

440 ) -> None: 

441 """ 

442 Called when the user joins a MUC, as a mechanism 

443 to indicate to the joining XMPP client the list of "participants". 

444 

445 Can be called this to trigger a "participant has joined the group" event. 

446 

447 :param full_jid: Set this to only send to a specific user XMPP resource. 

448 :param nick_change: Used when the user joins and the MUC renames them (code 210) 

449 :param presence_id: set the presence ID. used internally by slidge 

450 """ 

451 # MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes 

452 codes = set() 

453 if nick_change: 

454 codes.add(210) 

455 

456 if self.is_user: 

457 # the "initial presence" of the user has to be vanilla, as it is 

458 # a crucial part of the MUC join sequence for XMPP clients. 

459 kwargs = {} 

460 else: 

461 cache = self._get_last_presence() 

462 self.log.debug("Join muc, initial presence: %s", cache) 

463 if cache: 

464 ptype = cache.ptype 

465 if ptype == "unavailable": 

466 return 

467 kwargs = dict( 

468 last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow 

469 ) 

470 else: 

471 kwargs = {} 

472 p = self._make_presence( 

473 status_codes=codes, 

474 user_full_jid=full_jid, 

475 **kwargs, # type:ignore 

476 ) 

477 if presence_id: 

478 p["id"] = presence_id 

479 self._send(p, full_jid, initial_presence=True) 

480 

481 def leave(self) -> None: 

482 """ 

483 Call this when the participant leaves the room 

484 """ 

485 self.muc.remove_participant(self) 

486 

487 def kick(self, reason: str | None = None) -> None: 

488 """ 

489 Call this when the participant is kicked from the room 

490 """ 

491 self.muc.remove_participant(self, kick=True, reason=reason) 

492 

493 def ban(self, reason: str | None = None) -> None: 

494 """ 

495 Call this when the participant is banned from the room 

496 """ 

497 self.muc.remove_participant(self, ban=True, reason=reason) 

498 

499 def get_disco_info(self, jid: OptJid = None, node: Optional[str] = None): 

500 if self.contact is not None: 

501 return self.contact.get_disco_info() 

502 return super().get_disco_info() 

503 

504 def moderate( 

505 self, legacy_msg_id: LegacyMessageType, reason: Optional[str] = None 

506 ) -> None: 

507 for i in self._legacy_to_xmpp(legacy_msg_id): 

508 m = self.muc.get_system_participant()._make_message() 

509 m["retract"]["id"] = i 

510 if self.is_system: 

511 m["retract"].enable("moderated") 

512 else: 

513 m["retract"]["moderated"]["by"] = self.jid 

514 m["retract"]["moderated"]["occupant-id"]["id"] = self.occupant_id 

515 if reason: 

516 m["retract"]["reason"] = reason 

517 self._send(m) 

518 

519 def set_room_subject( 

520 self, 

521 subject: str, 

522 full_jid: Optional[JID] = None, 

523 when: Optional[datetime] = None, 

524 update_muc: bool = True, 

525 ) -> None: 

526 if update_muc: 

527 self.muc._subject = subject # type: ignore 

528 self.muc.subject_setter = self.nickname 

529 self.muc.subject_date = when 

530 

531 msg = self._make_message() 

532 if when is not None: 

533 msg["delay"].set_stamp(when) 

534 msg["delay"]["from"] = self.muc.jid 

535 if subject: 

536 msg["subject"] = subject 

537 else: 

538 # may be simplified if slixmpp lets it do it more easily some day 

539 msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject")) 

540 self._send(msg, full_jid) 

541 

542 def set_thread_subject( 

543 self, 

544 thread: LegacyThreadType, 

545 subject: str | None, 

546 when: Optional[datetime] = None, 

547 ) -> None: 

548 msg = self._make_message() 

549 msg["thread"] = str(thread) 

550 if when is not None: 

551 msg["delay"].set_stamp(when) 

552 msg["delay"]["from"] = self.muc.jid 

553 if subject: 

554 msg["subject"] = subject 

555 else: 

556 # may be simplified if slixmpp lets it do it more easily some day 

557 msg.xml.append(ET.Element(f"{{{msg.namespace}}}subject")) 

558 self._send(msg) 

559 

560 

561def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]: 

562 nickname = nickname_no_illegal = strip_illegal_chars(nickname).replace("\n", " | ") 

563 

564 jid = JID(muc_jid) 

565 

566 try: 

567 jid.resource = nickname 

568 except InvalidJID: 

569 nickname = nickname.encode("punycode").decode() 

570 try: 

571 jid.resource = nickname 

572 except InvalidJID: 

573 # at this point there still might be control chars 

574 jid.resource = strip_non_printable(nickname) 

575 

576 return nickname_no_illegal, jid 

577 

578 

579log = logging.getLogger(__name__)