Coverage for slidge / core / session.py: 83%

229 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1import abc 

2import asyncio 

3import logging 

4from asyncio.tasks import Task 

5from collections.abc import Coroutine 

6from typing import TYPE_CHECKING, Any, Generic, NamedTuple, Self, cast 

7 

8import aiohttp 

9import sqlalchemy as sa 

10from slixmpp import JID, Iq, Message, Presence 

11from slixmpp.exceptions import XMPPError 

12from slixmpp.types import PresenceShows, ResourceDict 

13 

14from slidge.db.meta import JSONSerializable 

15 

16from ..command import SearchResult 

17from ..contact import LegacyContact 

18from ..db.models import Contact, GatewayUser 

19from ..util import SubclassableOnce 

20from ..util.lock import NamedLockMixin 

21from ..util.types import ( 

22 AnyBookmarks, 

23 AnyMUC, 

24 AnyParticipant, 

25 AnyRoster, 

26 AnySession, 

27 LegacyContactType, 

28 PseudoPresenceShow, 

29) 

30from ..util.util import noop_coro 

31 

32if TYPE_CHECKING: 

33 from .gateway import BaseGateway 

34 

35 

36class CachedPresence(NamedTuple): 

37 status: str | None 

38 show: str | None 

39 kwargs: dict[str, Any] 

40 

41 

42class BaseSession( 

43 Generic[LegacyContactType], NamedLockMixin, SubclassableOnce, abc.ABC 

44): 

45 """ 

46 The session of a registered :term:`User`. 

47 

48 Represents a gateway user logged in to the legacy network and performing actions. 

49 

50 Will be instantiated automatically on slidge startup for each registered user, 

51 or upon registration for new (validated) users. 

52 

53 Must be subclassed for a functional :term:`Legacy Module`. 

54 """ 

55 

56 """ 

57 Since we cannot set the XMPP ID of messages sent by XMPP clients, we need to keep a mapping 

58 between XMPP IDs and legacy message IDs if we want to further refer to a message that was sent 

59 by the user. This also applies to 'carboned' messages, ie, messages sent by the user from 

60 the official client of a legacy network. 

61 """ 

62 

63 xmpp: "BaseGateway[Self]" 

64 """ 

65 The gateway instance singleton. Use it for low-level XMPP calls or custom methods that are not 

66 session-specific. 

67 """ 

68 

69 MESSAGE_IDS_ARE_THREAD_IDS = False 

70 """ 

71 Set this to True if the legacy service uses message IDs as thread IDs, 

72 eg Mattermost, where you can only 'create a thread' by replying to the message, 

73 in which case the message ID is also a thread ID (and all messages are potential 

74 threads). 

75 """ 

76 SPECIAL_MSG_ID_PREFIX: str | None = None 

77 """ 

78 If you set this, XMPP message IDs starting with this won't be converted to legacy ID, 

79 but passed as is to :meth:`LegacyContact.on_react`, and usual checks for emoji restriction won't be 

80 applied. 

81 This can be used to implement voting in polls in a hacky way. 

82 """ 

83 

84 _roster_cls: type[AnyRoster] 

85 _bookmarks_cls: type[AnyBookmarks] 

86 

87 def __init__(self, user: GatewayUser) -> None: 

88 super().__init__() 

89 self.user = user 

90 """ 

91 The :term:`slidge user <User>`. 

92 """ 

93 self.log = logging.getLogger(user.jid.bare) 

94 

95 self.ignore_messages = set[str]() 

96 

97 self.contacts: AnyRoster = self._roster_cls(self) 

98 self.is_logging_in = False 

99 self._logged = False 

100 self.__reset_ready() 

101 

102 self.bookmarks = self._bookmarks_cls(self) 

103 

104 self.thread_creation_lock = asyncio.Lock() 

105 

106 self.__cached_presence: CachedPresence | None = None 

107 

108 self.__tasks = set[asyncio.Task[Any]]() 

109 

110 @property 

111 def user_jid(self) -> JID: 

112 return self.user.jid 

113 

114 @property 

115 def user_pk(self) -> int: 

116 return self.user.id 

117 

118 @property 

119 def http(self) -> aiohttp.ClientSession: 

120 return self.xmpp.http 

121 

122 def __remove_task(self, fut: Task[Any]) -> None: 

123 self.log.debug("Removing fut %s", fut) 

124 self.__tasks.remove(fut) 

125 

126 def create_task( 

127 self, coro: Coroutine[Any, Any, Any], name: str | None = None 

128 ) -> asyncio.Task[Any]: 

129 task = self.xmpp.loop.create_task(coro, name=name) 

130 self.__tasks.add(task) 

131 self.log.debug("Creating task %s", task) 

132 task.add_done_callback(lambda _: self.__remove_task(task)) 

133 return task 

134 

135 def cancel_all_tasks(self) -> None: 

136 for task in self.__tasks: 

137 task.cancel() 

138 

139 @abc.abstractmethod 

140 async def login(self) -> str | None: 

141 """ 

142 Logs in the gateway user to the legacy network. 

143 

144 Triggered when the gateway start and on user registration. 

145 It is recommended that this function returns once the user is logged in, 

146 so if you need to await forever (for instance to listen to incoming events), 

147 it's a good idea to wrap your listener in an asyncio.Task. 

148 

149 :return: Optionally, a text to use as the gateway status, e.g., "Connected as 'dude@legacy.network'" 

150 """ 

151 raise NotImplementedError 

152 

153 async def logout(self) -> None: 

154 """ 

155 Logs out the gateway user from the legacy network. 

156 

157 Called on gateway shutdown. 

158 """ 

159 raise NotImplementedError 

160 

161 async def on_presence( 

162 self, 

163 resource: str, 

164 show: PseudoPresenceShow, 

165 status: str, 

166 resources: dict[str, ResourceDict], 

167 merged_resource: ResourceDict | None, 

168 ) -> None: 

169 """ 

170 Called when the gateway component receives a presence, ie, when 

171 one of the user's clients goes online of offline, or changes its 

172 status. 

173 

174 :param resource: The XMPP client identifier, arbitrary string. 

175 :param show: The presence ``<show>``, if available. If the resource is 

176 just 'available' without any ``<show>`` element, this is an empty 

177 str. 

178 :param status: A status message, like a deeply profound quote, eg, 

179 "Roses are red, violets are blue, [INSERT JOKE]". 

180 :param resources: A summary of all the resources for this user. 

181 :param merged_resource: A global presence for the user account, 

182 following rules described in :meth:`merge_resources` 

183 """ 

184 raise NotImplementedError 

185 

186 async def on_search(self, form_values: dict[str, str]) -> SearchResult | None: 

187 """ 

188 Triggered when the user uses Jabber Search (:xep:`0055`) on the component 

189 

190 Form values is a dict in which keys are defined in :attr:`.BaseGateway.SEARCH_FIELDS` 

191 

192 :param form_values: search query, defined for a specific plugin by overriding 

193 in :attr:`.BaseGateway.SEARCH_FIELDS` 

194 :return: 

195 """ 

196 raise NotImplementedError 

197 

198 async def on_avatar( 

199 self, 

200 bytes_: bytes | None, 

201 hash_: str | None, 

202 type_: str | None, 

203 width: int | None, 

204 height: int | None, 

205 ) -> None: 

206 """ 

207 Triggered when the user uses modifies their avatar via :xep:`0084`. 

208 

209 :param bytes_: The data of the avatar. According to the spec, this 

210 should always be a PNG, but some implementations do not respect 

211 that. If `None` it means the user has unpublished their avatar. 

212 :param hash_: The SHA1 hash of the avatar data. This is an identifier of 

213 the avatar. 

214 :param type_: The MIME type of the avatar. 

215 :param width: The width of the avatar image. 

216 :param height: The height of the avatar image. 

217 """ 

218 raise NotImplementedError 

219 

220 async def on_create_group( 

221 self, name: str, contacts: list[LegacyContactType] 

222 ) -> str: 

223 """ 

224 Triggered when the user request the creation of a group via the 

225 dedicated :term:`Command`. 

226 

227 :param name: Name of the group 

228 :param contacts: list of contacts that should be members of the group 

229 """ 

230 raise NotImplementedError 

231 

232 async def on_leave_space(self, space_legacy_id: str) -> None: 

233 """ 

234 Triggered when the user sends a request to leave a :xep:`0503` space. 

235 

236 :param space_legacy_id: The legacy ID of the space to leave 

237 """ 

238 raise NotImplementedError 

239 

240 async def on_preferences( 

241 self, previous: dict[str, Any], new: dict[str, Any] 

242 ) -> None: 

243 """ 

244 This is called when the user updates their preferences. 

245 

246 Override this if you need set custom preferences field and need to trigger 

247 something when a preference has changed. 

248 """ 

249 raise NotImplementedError 

250 

251 def __reset_ready(self) -> None: 

252 self.ready = self.xmpp.loop.create_future() 

253 

254 @property 

255 def logged(self) -> bool: 

256 return self._logged 

257 

258 @logged.setter 

259 def logged(self, v: bool) -> None: 

260 self.is_logging_in = False 

261 self._logged = v 

262 if self.ready.done(): 

263 if v: 

264 return 

265 self.__reset_ready() 

266 self.shutdown(logout=False) 

267 with self.xmpp.store.session() as orm: 

268 self.xmpp.store.mam.reset_source(orm) 

269 self.xmpp.store.rooms.reset_updated(orm) 

270 self.xmpp.store.contacts.reset_updated(orm) 

271 orm.commit() 

272 else: 

273 if v: 

274 self.ready.set_result(True) 

275 

276 def __repr__(self) -> str: 

277 return f"<Session of {self.user_jid}>" 

278 

279 def shutdown(self, logout: bool = True) -> asyncio.Task[None]: 

280 for m in self.bookmarks: 

281 m.shutdown() 

282 with self.xmpp.store.session() as orm: 

283 for jid in orm.execute( 

284 sa.select(Contact.jid).filter_by(user=self.user, is_friend=True) 

285 ).scalars(): 

286 pres = self.xmpp.make_presence( 

287 pfrom=jid, 

288 pto=self.user_jid, 

289 ptype="unavailable", 

290 pstatus="Gateway has shut down.", 

291 ) 

292 pres.send() 

293 if logout: 

294 return self.xmpp.loop.create_task(self.__logout()) 

295 else: 

296 return self.xmpp.loop.create_task(noop_coro()) 

297 

298 async def __logout(self) -> None: 

299 try: 

300 await self.logout() 

301 except NotImplementedError: 

302 pass 

303 except KeyboardInterrupt: 

304 pass 

305 

306 def raise_if_not_logged(self) -> None: 

307 if not self.logged: 

308 raise XMPPError( 

309 "internal-server-error", 

310 text="You are not logged to the legacy network", 

311 ) 

312 

313 @classmethod 

314 def _from_user_or_none(cls, user: GatewayUser | None) -> Self: 

315 if user is None: 

316 log.debug("user not found") 

317 raise XMPPError(text="User not found", condition="subscription-required") 

318 

319 session = _sessions.get(user.jid.bare) 

320 if session is None: 

321 _sessions[user.jid.bare] = session = cls(user) 

322 assert isinstance(session, cls) 

323 return session 

324 

325 @classmethod 

326 def from_user(cls, user: GatewayUser) -> Self: 

327 return cls._from_user_or_none(user) 

328 

329 @classmethod 

330 def from_stanza(cls, s: Message | Iq | Presence) -> Self: 

331 # """ 

332 # Get a user's :class:`.LegacySession` using the "from" field of a stanza 

333 # 

334 # Meant to be called from :class:`BaseGateway` only. 

335 # 

336 # :param s: 

337 # :return: 

338 # """ 

339 return cls.from_jid(s.get_from()) 

340 

341 @classmethod 

342 def from_jid(cls, jid: JID) -> Self: 

343 # """ 

344 # Get a user's :class:`.LegacySession` using its jid 

345 # 

346 # Meant to be called from :class:`BaseGateway` only. 

347 # 

348 # :param jid: 

349 # :return: 

350 # """ 

351 session = _sessions.get(jid.bare) 

352 if session is not None: 

353 assert isinstance(session, cls) 

354 return session 

355 with cls.xmpp.store.session() as orm: 

356 user = orm.query(GatewayUser).filter_by(jid=jid.bare).one_or_none() 

357 return cls._from_user_or_none(user) 

358 

359 @classmethod 

360 async def kill_by_jid(cls, jid: JID) -> None: 

361 # """ 

362 # Terminate a user session. 

363 # 

364 # Meant to be called from :class:`BaseGateway` only. 

365 # 

366 # :param jid: 

367 # :return: 

368 # """ 

369 log.debug("Killing session of %s", jid) 

370 for user_jid, session in _sessions.items(): 

371 if user_jid == jid.bare: 

372 break 

373 else: 

374 log.debug("Did not find a session for %s", jid) 

375 return 

376 for c in session.contacts: 

377 c.unsubscribe() 

378 for m in session.bookmarks: 

379 m.shutdown() 

380 

381 try: 

382 session = _sessions.pop(jid.bare) 

383 except KeyError: 

384 log.warning("User not found during unregistration") 

385 return 

386 

387 session.cancel_all_tasks() 

388 

389 await cls.xmpp.unregister(cast(Self, session)) 

390 with cls.xmpp.store.session() as orm: 

391 orm.delete(session.user) 

392 orm.commit() 

393 

394 def __ack(self, msg: Message) -> None: 

395 if not self.xmpp.PROPER_RECEIPTS: 

396 self.xmpp.delivery_receipt.ack(msg) 

397 

398 def send_gateway_status( 

399 self, 

400 status: str | None = None, 

401 show: PresenceShows | None = None, 

402 **kwargs: Any, # noqa 

403 ) -> None: 

404 """ 

405 Send a presence from the gateway to the user. 

406 

407 Can be used to indicate the user session status, ie "SMS code required", "connected", … 

408 

409 :param status: A status message 

410 :param show: Presence stanza 'show' element. I suggest using "dnd" to show 

411 that the gateway is not fully functional 

412 """ 

413 self.__cached_presence = CachedPresence(status, show, kwargs) 

414 self.xmpp.send_presence( 

415 pto=self.user_jid.bare, pstatus=status, pshow=show, **kwargs 

416 ) 

417 

418 def send_cached_presence(self, to: JID) -> None: 

419 if not self.__cached_presence: 

420 self.xmpp.send_presence(pto=to, ptype="unavailable") 

421 return 

422 self.xmpp.send_presence( 

423 pto=to, 

424 pstatus=self.__cached_presence.status, 

425 pshow=self.__cached_presence.show, 

426 **self.__cached_presence.kwargs, 

427 ) 

428 

429 def send_gateway_message( 

430 self, 

431 text: str, 

432 **msg_kwargs: Any, # noqa 

433 ) -> None: 

434 """ 

435 Send a message from the gateway component to the user. 

436 

437 Can be used to indicate the user session status, ie "SMS code required", "connected", … 

438 

439 :param text: A text 

440 """ 

441 self.xmpp.send_text(text, mto=self.user_jid, **msg_kwargs) 

442 

443 def send_gateway_invite( 

444 self, 

445 muc: AnyMUC, 

446 reason: str | None = None, 

447 password: str | None = None, 

448 ) -> None: 

449 """ 

450 Send an invitation to join a MUC, emanating from the gateway component. 

451 

452 :param muc: 

453 :param reason: 

454 :param password: 

455 """ 

456 self.xmpp.invite_to(muc, reason=reason, password=password, mto=self.user_jid) 

457 

458 async def input(self, text: str, **msg_kwargs: Any) -> str: # noqa 

459 """ 

460 Request user input via direct messages from the gateway component. 

461 

462 Wraps call to :meth:`.BaseSession.input` 

463 

464 :param text: The prompt to send to the user 

465 :param msg_kwargs: Extra attributes 

466 :return: 

467 """ 

468 return await self.xmpp.input(self.user_jid, text, **msg_kwargs) 

469 

470 async def send_qr(self, text: str) -> None: 

471 """ 

472 Sends a QR code generated from 'text' via HTTP Upload and send the URL to 

473 ``self.user`` 

474 

475 :param text: Text to encode as a QR code 

476 """ 

477 await self.xmpp.send_qr(text, mto=self.user_jid) 

478 

479 async def get_contact_or_group_or_participant( 

480 self, jid: JID, create: bool = True 

481 ) -> "LegacyContact | AnyMUC | AnyParticipant | None": 

482 if (contact := self.contacts.by_jid_only_if_exists(jid)) is not None: 

483 return contact # type:ignore[no-any-return] 

484 if (muc := self.bookmarks.by_jid_only_if_exists(JID(jid.bare))) is not None: 

485 return await self.__get_muc_or_participant(muc, jid) 

486 else: 

487 muc = None 

488 

489 if not create: 

490 return None 

491 

492 try: 

493 return await self.contacts.by_jid(jid) # type:ignore[no-any-return] 

494 except XMPPError: 

495 if muc is None: 

496 try: 

497 muc = await self.bookmarks.by_jid(jid) 

498 except XMPPError: 

499 return None 

500 return await self.__get_muc_or_participant(muc, jid) 

501 

502 @staticmethod 

503 async def __get_muc_or_participant( 

504 muc: AnyMUC, jid: JID 

505 ) -> "AnyMUC | AnyParticipant | None": 

506 if nick := jid.resource: 

507 return await muc.get_participant(nick, create=False, fill_first=True) 

508 return muc 

509 

510 async def wait_for_ready(self, timeout: int | float | None = 10) -> None: 

511 # """ 

512 # Wait until session, contacts and bookmarks are ready 

513 # 

514 # (slidge internal use) 

515 # 

516 # :param timeout: 

517 # :return: 

518 # """ 

519 try: 

520 await asyncio.wait_for(asyncio.shield(self.ready), timeout) 

521 await asyncio.wait_for(asyncio.shield(self.contacts.ready), timeout) 

522 await asyncio.wait_for(asyncio.shield(self.bookmarks.ready), timeout) 

523 except TimeoutError: 

524 raise XMPPError( 

525 "recipient-unavailable", 

526 "Legacy session is not fully initialized, retry later", 

527 ) 

528 

529 def legacy_module_data_update(self, data: JSONSerializable) -> None: 

530 user = self.user 

531 user.legacy_module_data.update(data) 

532 self.xmpp.store.users.update(user) 

533 

534 def legacy_module_data_set(self, data: JSONSerializable) -> None: 

535 user = self.user 

536 user.legacy_module_data = data 

537 self.xmpp.store.users.update(user) 

538 

539 def legacy_module_data_clear(self) -> None: 

540 user = self.user 

541 user.legacy_module_data.clear() 

542 self.xmpp.store.users.update(user) 

543 

544 

545# keys = user.jid.bare 

546_sessions: dict[str, AnySession] = {} 

547log = logging.getLogger(__name__)