Coverage for slidge/group/participant.py: 89%

339 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1import logging 

2import string 

3import uuid 

4import warnings 

5from copy import copy 

6from datetime import datetime 

7from functools import cached_property 

8from typing import TYPE_CHECKING, Any, Optional, Union 

9 

10import sqlalchemy as sa 

11from slixmpp import JID, InvalidJID, Message, Presence 

12from slixmpp.plugins.xep_0045.stanza import MUCAdminItem 

13from slixmpp.types import MessageTypes, OptJid 

14from sqlalchemy.orm.exc import DetachedInstanceError 

15 

16from ..contact import LegacyContact 

17from ..core.mixins import ChatterDiscoMixin, MessageMixin, PresenceMixin 

18from ..core.mixins.db import DBMixin 

19from ..db.models import Participant 

20from ..util import SubclassableOnce, strip_illegal_chars 

21from ..util.types import ( 

22 CachedPresence, 

23 Hat, 

24 LegacyMessageType, 

25 MessageOrPresenceTypeVar, 

26 MucAffiliation, 

27 MucRole, 

28) 

29 

30if TYPE_CHECKING: 

31 from .room import LegacyMUC 

32 

33 

def strip_non_printable(nickname: str) -> str:
    """
    Build a last-resort ASCII nickname out of ``nickname``.

    Only printable ASCII characters are kept (the space is kept, but the
    control whitespace characters tab, newline, carriage return, vertical tab
    and form feed are dropped), and a short digest of the original nickname is
    appended so that distinct inputs remain distinguishable.

    A warning is emitted every time this fallback is used.

    :param nickname: The nickname that could not be used as-is.
    :return: The sanitized nickname followed by ``-slidge-<digest>``.
    """
    # Function-scope import so that this block does not depend on the
    # module-level import list.
    import hashlib

    # string.printable also contains \t \n \r \x0b \x0c, which are control
    # characters; only the plain space is kept from the whitespace set.
    allowed = (set(string.printable) - set(string.whitespace)) | {" "}
    kept = "".join(char for char in nickname if char in allowed)

    # The builtin hash() is salted per process for str, so it cannot be used
    # to derive a suffix that is reproducible from one run to the next.
    digest = hashlib.sha256(
        nickname.encode("utf-8", errors="surrogatepass")
    ).hexdigest()[:16]

    new = f"{kept}-slidge-{digest}"
    warnings.warn(f"Could not use {nickname} as a nickname, using {new}")
    return new

41 

42 

class LegacyParticipant(
    PresenceMixin,
    MessageMixin,
    ChatterDiscoMixin,
    DBMixin,
    metaclass=SubclassableOnce,
):
    """
    A legacy participant of a legacy group chat.
    """

    mtype: MessageTypes = "groupchat"
    _can_send_carbon = False
    USE_STANZA_ID = True
    STRIP_SHORT_DELAY = False
    stored: Participant
    contact: LegacyContact[Any] | None

    def __init__(
        self,
        muc: "LegacyMUC",
        stored: Participant,
        is_system: bool = False,
        contact: LegacyContact[Any] | None = None,
    ) -> None:
        """
        :param muc: The group chat this participant belongs to.
        :param stored: The database row backing this participant.
        :param is_system: Whether this is the "system" participant of the room.
        :param contact: The contact associated to this participant, if any.
            When omitted, it is built from ``stored.contact`` if that is set.
        """
        self.muc = muc
        self.session = muc.session
        self.xmpp = muc.session.xmpp
        self.is_system = is_system

        # Keep the contact and the stored row in sync, whichever one we got.
        if contact is None and stored.contact is not None:
            contact = self.session.contacts.from_store(stored=stored.contact)
        if contact is not None and stored.contact is None:
            stored.contact = contact.stored

        self.stored = stored
        self.contact = contact

        super().__init__()

        # No resource stored yet: derive it from the nickname.
        if stored.resource is None:
            self.__update_resource(stored.nickname)

        self.log = logging.getLogger(f"{self.user_jid.bare}:{self.jid}")

    @property
    def is_user(self) -> bool:
        """
        Whether this participant is the user, as recorded in the stored row.

        If the stored row is detached, it is merged and read again.
        """
        try:
            return self.stored.is_user
        except DetachedInstanceError:
            self.merge()
            return self.stored.is_user

    @property
    def jid(self) -> JID:
        """
        The JID of the room, with the stored resource set if it is not empty.
        """
        jid = JID(self.muc.jid)
        if self.stored.resource:
            jid.resource = self.stored.resource
        return jid

    @jid.setter
    def jid(self, x: JID):
        # FIXME: without this, mypy yields
        #        "Cannot override writeable attribute with read-only property"
        #        But it does not happen for LegacyContact. WTF?
        raise RuntimeError

    def commit(self, *args, **kwargs) -> None:
        """
        Commit this participant, unless it is the system participant or the
        room currently holds the "fill participants" or "fill history" lock.
        """
        if self.is_system:
            return
        if self.muc.get_lock("fill participants") or self.muc.get_lock("fill history"):
            return
        super().commit(*args, **kwargs)

    @property
    def user_jid(self):
        """The JID of the user owning the session."""
        return self.session.user_jid

    def __repr__(self) -> str:
        return f"<Participant '{self.nickname}'/'{self.jid}' of '{self.muc}'>"

    @property
    def _presence_sent(self) -> bool:
        # we track if we already sent a presence for this participant.
        # if we didn't, we send it before the first message.
        # this way, even in plugins that don't map "user has joined" events,
        # we send a "join"-presence from the participant before the first message
        return self.stored.presence_sent

    @_presence_sent.setter
    def _presence_sent(self, val: bool) -> None:
        if self._presence_sent == val:
            return
        self.stored.presence_sent = val
        self.commit(merge=True)

    @property
    def nickname_no_illegal(self) -> str:
        """The stored nickname with illegal characters stripped."""
        return self.stored.nickname_no_illegal

    @property
    def affiliation(self):
        """The stored MUC affiliation of this participant."""
        return self.stored.affiliation

    @affiliation.setter
    def affiliation(self, affiliation: MucAffiliation) -> None:
        if self.affiliation == affiliation:
            return
        self.stored.affiliation = affiliation
        # Nothing is committed or broadcast while the participant list is
        # still being filled.
        if not self.muc.participants_filled:
            return
        self.commit()
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def send_affiliation_change(self) -> None:
        """
        Send a "normal" message carrying this participant's affiliation.

        Warns if the room is not anonymous and this (non-system) participant
        has no associated contact.
        """
        # internal use by slidge
        msg = self._make_message()
        msg["muc"]["affiliation"] = self.affiliation
        msg["type"] = "normal"
        if not self.muc.is_anonymous and not self.is_system:
            if self.contact:
                msg["muc"]["jid"] = self.contact.jid
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        self._send(msg)

    @property
    def role(self):
        """The stored MUC role of this participant."""
        return self.stored.role

    @role.setter
    def role(self, role: MucRole) -> None:
        if self.role == role:
            return
        self.stored.role = role
        # Nothing is committed or broadcast while the participant list is
        # still being filled.
        if not self.muc.participants_filled:
            return
        self.commit()
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    @property
    def hats(self) -> list[Hat]:
        """The stored hats of this participant, or an empty list."""
        return [Hat(*h) for h in self.stored.hats] if self.stored.hats else []

    def set_hats(self, hats: list[Hat]) -> None:
        """
        Replace the hats of this participant.

        Does nothing if the hats are unchanged. The change is only committed
        once the participant list of the room has been filled, and the last
        presence is only re-sent if a presence was already sent.
        """
        if self.hats == hats:
            return
        self.stored.hats = hats  # type:ignore[assignment]
        if not self.muc.participants_filled:
            return
        self.commit(merge=True)
        if not self._presence_sent:
            return
        self.send_last_presence(force=True, no_cache_online=True)

    def __update_resource(self, unescaped_nickname: Optional[str]) -> None:
        """
        Set the stored resource and ``nickname_no_illegal`` from a nickname.

        An empty nickname is only expected for the system participant; any
        other participant gets a generated ``unnamed-<uuid>`` nickname and a
        warning is emitted.
        """
        if not unescaped_nickname:
            self.stored.resource = ""
            if self.is_system:
                self.stored.nickname_no_illegal = ""
            else:
                warnings.warn(
                    "Only the system participant is allowed to not have a nickname"
                )
                nickname = f"unnamed-{uuid.uuid4()}"
                self.stored.resource = self.stored.nickname_no_illegal = nickname
            return

        self.stored.nickname_no_illegal, jid = escape_nickname(
            self.muc.jid,
            unescaped_nickname,
        )
        self.stored.resource = jid.resource

    def send_configuration_change(self, codes: tuple[int, ...]):
        """
        Send a message carrying the given MUC status codes.

        :param codes: The status codes to attach to the message.
        :raises RuntimeError: If this is not the system participant.
        """
        if not self.is_system:
            raise RuntimeError("This is only possible for the system participant")
        msg = self._make_message()
        msg["muc"]["status_codes"] = codes
        self._send(msg)

    @property
    def nickname(self):
        """The stored (unescaped) nickname of this participant."""
        return self.stored.nickname

    @nickname.setter
    def nickname(self, new_nickname: str) -> None:
        old = self.nickname
        if new_nickname == old:
            return

        cache = getattr(self, "_last_presence", None)
        if cache:
            last_seen = cache.last_seen
            # Work on a copy: status codes are written into this mapping
            # below and must not end up in the cached presence.
            kwargs = dict(cache.presence_kwargs)
        else:
            last_seen = None
            kwargs = {}

        kwargs["status_codes"] = {303}

        p = self._make_presence(ptype="unavailable", last_seen=last_seen, **kwargs)
        # in this order so pfrom=old resource and we actually use the escaped nick
        # in the muc/item/nick element
        self.__update_resource(new_nickname)
        p["muc"]["item"]["nick"] = self.jid.resource
        self._send(p)

        self.stored.nickname = new_nickname
        self.commit()
        kwargs["status_codes"] = set()
        p = self._make_presence(ptype="available", last_seen=last_seen, **kwargs)
        self._send(p)

    def _make_presence(
        self,
        *,
        last_seen: Optional[datetime] = None,
        status_codes: Optional[set[int]] = None,
        user_full_jid: Optional[JID] = None,
        **presence_kwargs,
    ):
        """
        Build a presence for this participant, with its MUC-specific payload.

        :param last_seen: Passed through to the parent implementation.
        :param status_codes: MUC status codes to include. The set passed by
            the caller is not modified.
        :param user_full_jid: For the user's own participant in a
            non-anonymous room, the JID to advertise. When omitted, one is
            built from the user JID and one of the resources returned by
            ``muc.get_user_resources()`` (or a placeholder resource), and
            status code 100 is added.
        :param presence_kwargs: Passed through to the parent implementation.
        :return: The presence stanza.
        """
        p = super()._make_presence(last_seen=last_seen, **presence_kwargs)
        p["muc"]["affiliation"] = self.affiliation
        p["muc"]["role"] = self.role
        if self.hats:
            p["hats"].add_hats(self.hats)
        # Copy so that the codes added below never leak into the caller's set.
        codes = set(status_codes) if status_codes else set()
        if self.is_user:
            codes.add(110)
        if not self.muc.is_anonymous and not self.is_system:
            if self.is_user:
                if user_full_jid:
                    p["muc"]["jid"] = user_full_jid
                else:
                    jid = JID(self.user_jid)
                    try:
                        jid.resource = next(iter(self.muc.get_user_resources()))
                    except StopIteration:
                        jid.resource = "pseudo-resource"
                    # Advertise the JID that was just built, resource included,
                    # rather than the user JID it was derived from.
                    p["muc"]["jid"] = jid
                    codes.add(100)
            elif self.contact:
                p["muc"]["jid"] = self.contact.jid
                if a := self.contact.get_avatar():
                    p["vcard_temp_update"]["photo"] = a.id
            else:
                warnings.warn(
                    f"Private group but no 1:1 JID associated to '{self}'",
                )
        if self.is_user and (hash_ := self.session.user.avatar_hash):
            p["vcard_temp_update"]["photo"] = hash_
        p["muc"]["status_codes"] = codes
        return p

    @property
    def DISCO_NAME(self):
        return self.nickname

    def __send_presence_if_needed(
        self, stanza: Union[Message, Presence], full_jid: JID, archive_only: bool
    ) -> None:
        """
        Send the initial presence of this participant before ``stanza`` if
        none was sent yet.

        Skipped for archive-only stanzas, for the system and user
        participants, for stanzas with a subject, and for messages that
        carry a "muc" plugin.
        """
        if (
            archive_only
            or self.is_system
            or self.is_user
            or self._presence_sent
            or stanza["subject"]
        ):
            return
        if isinstance(stanza, Message):
            if stanza.get_plugin("muc", check=True):
                return
        self.send_initial_presence(full_jid)

    @cached_property
    def __occupant_id(self):
        # Computed once per instance: the contact JID if there is a contact,
        # a fixed marker for the user or the room, else a random UUID.
        if self.contact:
            return self.contact.jid
        elif self.is_user:
            return "slidge-user"
        elif self.is_system:
            return "room"
        else:
            return str(uuid.uuid4())

    def _send(
        self,
        stanza: MessageOrPresenceTypeVar,
        full_jid: Optional[JID] = None,
        archive_only: bool = False,
        legacy_msg_id=None,
        initial_presence=False,
        **send_kwargs,
    ) -> MessageOrPresenceTypeVar:
        """
        Send a stanza on behalf of this participant.

        :param stanza: The message or presence to send.
        :param full_jid: If set, the stanza is only sent to this JID and is
            not added to the room archive. Otherwise messages are archived
            (when the room has an archive) and a copy of the stanza is sent
            to every JID returned by ``muc.user_full_jids()``.
        :param archive_only: Archive the message without sending it.
        :param legacy_msg_id: Passed to the archive along with the message.
        :param initial_presence: When true, the "presence sent" flag is set
            on the stored row without committing.
        :return: The stanza (the last copy sent when broadcasting).
        """
        if stanza.get_from().resource:
            stanza["occupant-id"]["id"] = self.__occupant_id
        else:
            stanza["occupant-id"]["id"] = "room"
        self.__add_nick_element(stanza)
        if not self.is_user and isinstance(stanza, Presence):
            # No point in announcing the departure of a participant that was
            # never announced in the first place.
            if stanza["type"] == "unavailable" and not self._presence_sent:
                return stanza  # type:ignore
            if initial_presence:
                self.stored.presence_sent = True
            else:
                self._presence_sent = True
        if full_jid:
            stanza["to"] = full_jid
            self.__send_presence_if_needed(stanza, full_jid, archive_only)
            if self.is_user:
                assert stanza.stream is not None
                stanza.stream.send(stanza, use_filters=False)
            else:
                stanza.send()
        else:
            if hasattr(self.muc, "archive") and isinstance(stanza, Message):
                self.muc.archive.add(stanza, self, archive_only, legacy_msg_id)
            if archive_only:
                return stanza
            for user_full_jid in self.muc.user_full_jids():
                stanza = copy(stanza)
                stanza["to"] = user_full_jid
                self.__send_presence_if_needed(stanza, user_full_jid, archive_only)
                stanza.send()
        return stanza

    def mucadmin_item(self):
        """
        Build a ``MUCAdminItem`` describing this participant.

        In non-anonymous rooms, the bare JID of the user or of the associated
        contact is included; a warning is emitted if neither is available.
        """
        item = MUCAdminItem()
        item["nick"] = self.nickname
        item["affiliation"] = self.affiliation
        item["role"] = self.role
        if not self.muc.is_anonymous:
            if self.is_user:
                item["jid"] = self.user_jid.bare
            elif self.contact:
                item["jid"] = self.contact.jid.bare
            else:
                warnings.warn(
                    (
                        f"Public group but no contact JID associated to {self.jid} in"
                        f" {self}"
                    ),
                )
        return item

    def __add_nick_element(self, stanza: Union[Presence, Message]) -> None:
        # Only add a XEP-0172 nick element when the resource differs from the
        # nickname, ie, when the nickname had to be escaped.
        if (nick := self.nickname_no_illegal) != self.jid.resource:
            n = self.xmpp.plugin["xep_0172"].stanza.UserNick()
            n["nick"] = nick
            stanza.append(n)

    def _get_last_presence(self) -> Optional[CachedPresence]:
        """
        Return the last presence of this participant, falling back to the
        last presence of the associated contact if there is none.
        """
        own = super()._get_last_presence()
        if own is None and self.contact:
            return self.contact._get_last_presence()
        return own

    def send_initial_presence(
        self,
        full_jid: JID,
        nick_change: bool = False,
        presence_id: Optional[str] = None,
    ) -> None:
        """
        Called when the user joins a MUC, as a mechanism
        to indicate to the joining XMPP client the list of "participants".

        Can also be called to trigger a "participant has joined the group" event.

        Nothing is sent for a participant other than the user whose last
        known presence is "unavailable".

        :param full_jid: Set this to only send to a specific user XMPP resource.
        :param nick_change: Used when the user joins and the MUC renames them (code 210)
        :param presence_id: set the presence ID. used internally by slidge
        """
        #  MUC status codes: https://xmpp.org/extensions/xep-0045.html#registrar-statuscodes
        codes = set()
        if nick_change:
            codes.add(210)

        if self.is_user:
            # the "initial presence" of the user has to be vanilla, as it is
            # a crucial part of the MUC join sequence for XMPP clients.
            kwargs = {}
        else:
            cache = self._get_last_presence()
            self.log.debug("Join muc, initial presence: %s", cache)
            if cache:
                ptype = cache.ptype
                if ptype == "unavailable":
                    return
                kwargs = dict(
                    last_seen=cache.last_seen, pstatus=cache.pstatus, pshow=cache.pshow
                )
            else:
                kwargs = {}
        p = self._make_presence(
            status_codes=codes,
            user_full_jid=full_jid,
            **kwargs,  # type:ignore
        )
        if presence_id:
            p["id"] = presence_id
        self._send(p, full_jid, initial_presence=True)

    def leave(self) -> None:
        """
        Call this when the participant leaves the room
        """
        self.muc.remove_participant(self)

    def kick(self, reason: str | None = None) -> None:
        """
        Call this when the participant is kicked from the room
        """
        self.muc.remove_participant(self, kick=True, reason=reason)

    def ban(self, reason: str | None = None) -> None:
        """
        Call this when the participant is banned from the room
        """
        self.muc.remove_participant(self, ban=True, reason=reason)

    def get_disco_info(self, jid: OptJid = None, node: Optional[str] = None):
        """
        Return the disco info of the associated contact if there is one, else
        the disco info of this participant. ``jid`` and ``node`` are unused.
        """
        if self.contact is not None:
            return self.contact.get_disco_info()
        return super().get_disco_info()

    def moderate(
        self, legacy_msg_id: LegacyMessageType, reason: Optional[str] = None
    ) -> None:
        """
        Send retractions for a message, as moderated by this participant.

        One retraction is sent for each XMPP ID known for the legacy message.

        :param legacy_msg_id: The legacy ID of the moderated message.
        :param reason: Optional reason to include in the retraction.
        """
        xmpp_id = self._legacy_to_xmpp(legacy_msg_id)
        with self.xmpp.store.session() as orm:
            msg_ids = self.xmpp.store.id_map.get_xmpp(
                orm, self.muc.stored.id, str(legacy_msg_id), True
            )

        for i in set(msg_ids + [xmpp_id]):
            # The retraction message is built by the system participant but
            # sent through this participant's _send().
            m = self.muc.get_system_participant()._make_message()
            m["retract"]["id"] = i
            if self.is_system:
                m["retract"].enable("moderated")
            else:
                m["retract"]["moderated"]["by"] = self.jid
                m["retract"]["moderated"]["occupant-id"]["id"] = self.__occupant_id
            if reason:
                m["retract"]["reason"] = reason
            self._send(m)

    def set_room_subject(
        self,
        subject: str,
        full_jid: Optional[JID] = None,
        when: Optional[datetime] = None,
        update_muc: bool = True,
    ) -> None:
        """
        Send a subject message from this participant.

        :param subject: The new subject. If empty, the room name is sent
            instead.
        :param full_jid: If set, only send to this JID.
        :param when: If set, added as a delay stamp to the message.
        :param update_muc: Whether to also record the subject, its setter and
            its date on the room.
        """
        if update_muc:
            self.muc._subject = subject  # type: ignore
            self.muc.subject_setter = self.nickname
            self.muc.subject_date = when

        msg = self._make_message()
        if when is not None:
            msg["delay"].set_stamp(when)
            msg["delay"]["from"] = self.muc.jid
        msg["subject"] = subject or str(self.muc.name)
        self._send(msg, full_jid)

515 

516 

def escape_nickname(muc_jid: JID, nickname: str) -> tuple[str, JID]:
    """
    Turn a nickname into a valid occupant JID for the given room.

    The nickname is first stripped of illegal characters. If it still cannot
    be used as a resource, its punycode encoding is tried, and as a last
    resort the punycode form is passed through ``strip_non_printable``.

    :param muc_jid: The JID of the room.
    :param nickname: The nickname to escape.
    :return: The nickname stripped of illegal characters, and a copy of the
        room JID with a usable resource.
    """
    cleaned = strip_illegal_chars(nickname)
    occupant_jid = JID(muc_jid)

    try:
        occupant_jid.resource = cleaned
    except InvalidJID:
        pass
    else:
        # Happy path: the cleaned nickname is a valid resource as-is.
        return cleaned, occupant_jid

    encoded = cleaned.encode("punycode").decode()
    try:
        occupant_jid.resource = encoded
    except InvalidJID:
        # at this point there still might be control chars
        occupant_jid.resource = strip_non_printable(encoded)

    return cleaned, occupant_jid

533 

534 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)