Coverage for slidge / command / user.py: 50%

244 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-20 19:56 +0000

1# Commands available to users 

2import contextlib 

3from copy import deepcopy 

4from typing import Any, cast 

5 

6from slixmpp import JID 

7from slixmpp.exceptions import XMPPError 

8 

9from slidge.db.meta import JSONSerializable 

10from slidge.db.models import Space 

11 

12from ..util.types import ( 

13 AnyMUC, 

14 AnySession, 

15 LegacyGroupIdType, 

16 MucType, 

17 UserPreferences, 

18) 

19from .base import ( 

20 Command, 

21 CommandAccess, 

22 Confirmation, 

23 ConfirmationSession, 

24 Form, 

25 FormField, 

26 FormSession, 

27 FormValues, 

28 SearchResult, 

29 TableResult, 

30) 

31from .categories import CONTACTS, GROUPS, SPACES 

32 

33 

class Search(Command[AnySession]):
    """Command that looks up contacts by delegating to ``session.on_search``."""

    NAME = "🔎 Search for contacts"
    HELP = "Search for contacts via this gateway"
    CHAT_COMMAND = "find"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: AnySession | None, _ifrom: JID, *args: str
    ) -> FormSession[AnySession] | SearchResult | None:
        """
        Start a search.

        Without arguments, a form built from the gateway's search fields is
        returned. With arguments, they are joined with spaces and submitted
        directly as the value of the first search field.
        """
        assert session is not None
        await session.ready

        if not args:
            # Nothing to search for yet: ask the user to fill the search form.
            return FormSession[AnySession](
                title=self.xmpp.SEARCH_TITLE,
                instructions=self.xmpp.SEARCH_INSTRUCTIONS,
                fields=self.xmpp.SEARCH_FIELDS,
                handler=self.search,
            )

        first_field_var = self.xmpp.SEARCH_FIELDS[0].var
        query = {first_field_var: " ".join(args)}
        return await session.on_search(query)

    @staticmethod
    async def search(
        form_values: FormValues, session: AnySession | None, _ifrom: JID
    ) -> SearchResult:
        """
        Handle the submitted search form.

        :raises XMPPError: ``item-not-found`` when ``on_search`` returns ``None``.
        """
        assert session is not None
        found = await session.on_search(form_values)  # type: ignore
        if found is not None:
            return found
        raise XMPPError("item-not-found", "No contact was found")

68 

69 

class SyncContacts(Command[AnySession]):
    """Command reconciling the user's XMPP roster with the session's known contacts."""

    NAME = "🔄 Sync XMPP roster"
    HELP = (
        "Synchronize your XMPP roster with your legacy contacts. "
        "Slidge will only add/remove/modify contacts in its dedicated roster group"
    )
    CHAT_COMMAND = "sync-contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *args: str,
    ) -> ConfirmationSession[AnySession]:
        """Wait for the session to be ready, then ask for confirmation before syncing."""
        assert session is not None
        await session.ready
        return ConfirmationSession[AnySession](
            prompt="Are you sure you want to sync your roster?",
            success=None,
            handler=self.sync,
        )

    async def sync(self, session: AnySession | None, _ifrom: JID) -> str:
        """
        Perform the roster synchronisation.

        Only roster items that belong to ``self.xmpp.ROSTER_GROUP`` are
        considered. Returns a summary of how many items were added, removed
        and updated.

        :raises RuntimeError: if there is no session.
        """
        if session is None:
            raise RuntimeError

        privileged = self.xmpp["xep_0356"]
        roster_iq = await privileged.get_roster(session.user_jid.bare)

        # Contacts are popped from this mapping as they are matched with a
        # roster item; what remains afterwards is missing from the roster.
        pending = session.contacts.known_contacts()
        slidge_group = self.xmpp.ROSTER_GROUP

        added = removed = updated = 0
        for item in roster_iq["roster"]:
            item_groups = set(item["groups"])
            if slidge_group not in item_groups:
                continue

            known = pending.pop(item["jid"], None)
            if known is not None:
                # Roster item matches a known contact: refresh it if the name differs.
                if known.name != item["name"]:
                    await known.add_to_roster(force=True)
                    updated += 1
                continue

            if len(item_groups) == 1:
                # Item is only in the slidge group and has no matching contact: drop it.
                await privileged.set_roster(
                    session.user_jid, {item["jid"]: {"subscription": "remove"}}
                )
                removed += 1
                continue

            # Item is also in other groups: keep it, but take it out of the slidge group.
            item_groups.remove(slidge_group)
            await privileged.set_roster(
                session.user_jid,
                {
                    item["jid"]: {
                        "subscription": item["subscription"],
                        "name": item["name"],
                        "groups": item_groups,
                    }
                },
            )
            updated += 1

        # we popped before so this only acts on slidge contacts not in the xmpp roster
        for known in pending.values():
            added += 1
            await known.add_to_roster()

        return f"{added} added, {removed} removed, {updated} updated"

139 

140 

class ListContacts(Command[AnySession]):
    """Command returning a table of the session's contacts (name and bare JID)."""

    NAME = HELP = "👤 List your legacy contacts"
    CHAT_COMMAND = "contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *_: str,
    ) -> TableResult:
        """Return the contacts sorted case-insensitively by name; unnamed ones sort first."""
        assert session is not None
        await session.ready

        def by_name(contact: Any) -> str:  # noqa:ANN401
            return contact.name.casefold() if contact.name else ""

        rows = [
            {"name": contact.name, "jid": contact.jid.bare}
            for contact in sorted(session.contacts, key=by_name)
        ]
        return TableResult(
            description="Your buddies",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=rows,
        )

164 

165 

class ListGroups(Command[AnySession]):
    """Command returning a table of the session's bookmarked groups."""

    NAME = HELP = "👥 List your legacy groups"
    CHAT_COMMAND = "groups"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(
        self, session: AnySession | None, _ifrom: JID, *_: str
    ) -> TableResult:
        """
        Return the groups sorted case-insensitively.

        The sort key is the group name, or the node part of its JID when it
        has no name; the displayed name falls back to the legacy ID instead.
        """
        assert session is not None
        await session.ready

        groups: list[AnyMUC] = sorted(
            session.bookmarks, key=lambda g: (g.name or g.jid.node).casefold()
        )
        rows = []
        for group in groups:
            rows.append(
                {"name": group.name or str(group.legacy_id), "jid": group.jid.bare}
            )
        return TableResult(
            description="Your groups",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=rows,
            jids_are_mucs=True,
        )

189 

190 

class ListSpaces(Command[AnySession]):
    """Command returning a table of the user's spaces with an ``xmpp:`` IRI for each."""

    NAME = "🌐 List my spaces"
    HELP = "List the spaces you are part of. Spaces are collections of rooms."
    CHAT_COMMAND = "spaces"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = SPACES

    related_to_spaces = True

    async def run(
        self, session: "AnySession | None", _ifrom: JID, *_: str
    ) -> TableResult:
        """Refresh the user's spaces and list them by name (or legacy ID) and IRI."""
        assert session is not None
        spaces = await _get_updated_spaces(session)

        rows = []
        for space in spaces:
            # The IRI points at the gateway's bare JID, with the space's node as query.
            node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
            rows.append(
                {
                    "name": space.name or str(space.legacy_id),
                    "iri": f"xmpp:{self.xmpp.boundjid.bare}?;node={node}",
                }
            )

        return TableResult(
            description="Your spaces. If your client does not support spaces, use the 'space-rooms' command.",
            fields=[FormField("name"), FormField("iri")],
            items=rows,
            jids_are_mucs=True,
        )

218 

219 

class ListRoomsInSpace(Command[AnySession]):
    """Two-step command: pick one of your spaces, then get a table of its rooms."""

    NAME = "🌐 List the rooms in a space"
    HELP = "List the rooms of a space you are part of. Spaces are collections of rooms."
    CHAT_COMMAND = "space-rooms"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = SPACES

    async def run(
        self, session: "AnySession | None", _ifrom: JID, *_: str
    ) -> "FormSession[AnySession]":
        """Return a single-choice form listing the user's (refreshed) spaces."""
        assert session is not None
        spaces = await _get_updated_spaces(session)

        choices = [
            {"label": space.name or str(space.legacy_id), "value": str(space.legacy_id)}
            for space in spaces
        ]
        space_field = FormField(
            var="space_legacy_id",
            label="Space",
            required=True,
            type="list-single",
            options=choices,
        )
        # Passed to the handler so it can show the space name in its result.
        names_by_id = {str(space.legacy_id): space.name for space in spaces}

        return FormSession(
            title="Your spaces",
            instructions="Select a space to view its rooms",
            fields=[space_field],
            handler=self.list_rooms,
            handler_args=(names_by_id,),
        )

    async def list_rooms(
        self,
        form_values: FormValues,
        session: AnySession | None,
        _ifrom: JID,
        space_names: dict[str, str],
    ) -> TableResult:
        """
        Handle the space selection and list the rooms of the chosen space.

        :param space_names: mapping of stringified space legacy ID to space name,
            as built by :meth:`run`.
        :raises XMPPError: ``bad-request`` when no space was selected.
        """
        assert session is not None
        selected = form_values.get("space_legacy_id")
        if selected is None:
            raise XMPPError("bad-request", "You need to specify a space")
        assert isinstance(selected, str)

        await session.ready
        space_legacy_id = self.xmpp.LEGACY_ROOM_ID_TYPE(selected)

        store = self.xmpp.store
        with store.session() as orm:
            node = await session.bookmarks.space_legacy_id_to_node(space_legacy_id)
            rooms = sorted(
                store.spaces.get_rooms(orm, session.user_pk, node),
                key=lambda r: (r.name or str(r.jid.node)).casefold(),
            )

        name = space_names.get(selected)
        rows = [
            {"name": room.name or str(room.legacy_id), "jid": room.jid}
            for room in rooms
        ]
        return TableResult(
            fields=[
                FormField("name", "Name"),
                FormField("jid", "JID", type="jid-single"),
            ],
            description=f"Rooms of '{name or space_legacy_id}'",
            items=rows,
            jids_are_mucs=True,
        )

292 

293 

class Login(Command[AnySession]):
    """Command letting a registered but not logged-in user retry the login."""

    NAME = "🔐 Re-login to the legacy network"
    HELP = "Login to the legacy service"
    CHAT_COMMAND = "re-login"
    NODE = "https://slidge.im/command/core/" + CHAT_COMMAND

    ACCESS = CommandAccess.USER_NON_LOGGED

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *_: str,
    ) -> str:
        """
        Attempt to log in through ``self.xmpp.login_wrap``.

        On success the session is marked as logged and the gateway status and
        a gateway message are sent to the user.

        :return: whatever ``login_wrap`` returned.
        :raises XMPPError: ``bad-request`` if a login is already in progress;
            ``internal-server-error`` (chained to the original exception) if
            the login attempt fails.
        """
        assert session is not None
        if session.is_logging_in:
            raise XMPPError("bad-request", "You are already logging in.")

        session.is_logging_in = True
        try:
            msg = await self.xmpp.login_wrap(session)
        except Exception as e:
            session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
            # Chain explicitly so the original traceback is kept as the cause.
            raise XMPPError(
                "internal-server-error", etype="wait", text=f"Could not login: {e}"
            ) from e
        finally:
            # Always release the guard, whether the login succeeded or not.
            session.is_logging_in = False

        session.logged = True
        notice = msg or "Re-connected"
        session.send_gateway_status(notice, show="chat")
        session.send_gateway_message(notice)
        # NOTE(review): msg may be falsy here (hence the fallback above) while
        # the annotation promises str; returned unchanged to preserve behaviour.
        return msg

325 

326 

class CreateGroup(Command[AnySession]):
    """Two-step command: fill in a name and pick contacts, then create the group."""

    NAME = "🆕 New legacy group"
    HELP = "Create a group on the legacy service"
    CHAT_COMMAND = "create-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    CATEGORY = GROUPS

    ACCESS = CommandAccess.USER_LOGGED

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *_: str,
    ) -> FormSession[AnySession]:
        """
        Return the group creation form.

        The selectable contacts are the session's known contacts restricted to
        friends, sorted case-insensitively by name.
        """
        assert session is not None
        await session.ready
        contacts = session.contacts.known_contacts(only_friends=True)

        # A contact without a name must not break the sort (None is not
        # orderable against str) and still needs a usable label.
        ordered = sorted(contacts.values(), key=lambda c: (c.name or "").casefold())
        options = [
            {"value": str(contact.jid), "label": contact.name or str(contact.jid)}
            for contact in ordered
        ]

        return FormSession(
            title="Create a new group",
            instructions="Pick contacts that should be part of this new group",
            fields=[
                FormField(var="group_name", label="Name of the group", required=True),
                FormField(
                    var="contacts",
                    label="Contacts to add to the new group",
                    type="list-multi",
                    options=options,
                    required=False,
                ),
            ],
            handler=self.finish,
        )

    @staticmethod
    async def finish(
        form_values: FormValues,
        session: AnySession | None,
        *_: Any,  # noqa:ANN401
    ) -> TableResult:
        """
        Create the group via ``session.on_create_group`` and describe the result.

        The selected JIDs are resolved to contacts with
        ``session.contacts.by_jid``; the returned table holds the name and
        JID of the new group.
        """
        assert session is not None
        members = [
            await session.contacts.by_jid(JID(j))
            for j in form_values.get("contacts", [])  # type:ignore
        ]
        legacy_id: LegacyGroupIdType = await session.on_create_group(  # type:ignore
            cast(str, form_values["group_name"]),
            members,
        )
        muc = await session.bookmarks.by_legacy_id(legacy_id)
        return TableResult(
            description=f"Your new group: xmpp:{muc.jid}?join",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=[{"name": muc.name, "jid": muc.jid}],
            jids_are_mucs=True,
        )

385 

386 

class Preferences(Command[AnySession]):
    """Command showing the user's preferences in a form and saving the changes."""

    NAME = "⚙️ Preferences"
    HELP = "Customize the gateway behaviour to your liking"
    CHAT_COMMAND = "preferences"
    NODE = "https://slidge.im/command/core/preferences"
    ACCESS = CommandAccess.USER

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *_: str,
    ) -> FormSession[AnySession]:
        """
        Return the preferences form, pre-filled with the user's current values.

        Fields are deep-copied from ``self.xmpp.PREFERENCES`` so the shared
        definitions are not mutated; a preference missing from the stored
        values keeps the field's own default.
        """
        fields = deepcopy(self.xmpp.PREFERENCES)
        assert session is not None
        current = session.user.preferences
        for field in fields:
            field.value = current.get(field.var, field.value)  # type:ignore
        return Form(
            title="Preferences",
            instructions=self.HELP,
            fields=fields,
            handler=self.finish,  # type:ignore
            handler_kwargs={"previous": current},
        )

    async def finish(
        self,
        form_values: UserPreferences,
        session: AnySession | None,
        *_: Any,  # noqa:ANN401
        previous: JSONSerializable,
    ) -> str:
        """
        Store the submitted preferences and notify the session.

        :param previous: the preferences as they were when the form was built.
        :return: a human-readable outcome message.
        """
        assert session is not None
        if previous == form_values:
            return "No preference was changed"

        # ``previous`` is the user's live preferences mapping (see ``run``), so
        # snapshot it before the update below mutates it.
        previous = previous.copy()
        user = session.user
        user.preferences.update(form_values)  # type:ignore
        self.xmpp.store.users.update(user)

        # Sessions that do not implement the hook raise NotImplementedError.
        with contextlib.suppress(NotImplementedError):
            await session.on_preferences(previous, form_values)  # type:ignore[arg-type]

        # Stored preferences may lack a key (``run`` falls back to field
        # defaults), so read with .get() rather than failing with KeyError
        # after the new values have already been saved.
        was_syncing = previous.get("sync_avatar")  # type:ignore
        now_syncing = form_values.get("sync_avatar")  # type:ignore
        if not was_syncing and now_syncing:
            await self.xmpp.fetch_user_avatar(session)
        else:
            # NOTE(review): this also clears the hash when sync_avatar stays
            # enabled, and happens after the store update above — confirm
            # this is intended.
            user.avatar_hash = None

        return "Your preferences have been updated."

438 

439 

class Unregister(Command[AnySession]):
    """Command removing the user's registration, after a confirmation prompt."""

    NAME = "❌ Unregister from the gateway"
    HELP = "Unregister from the gateway"
    CHAT_COMMAND = "unregister"
    NODE = "https://slidge.im/command/core/unregister"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: AnySession | None, _ifrom: JID, *_: str
    ) -> ConfirmationSession[AnySession]:
        """Ask the user to confirm before unregistering from this gateway."""
        gateway_jid = self.xmpp.boundjid
        question = f"Are you sure you want to unregister from '{gateway_jid}'?"
        farewell = f"You are not registered to '{gateway_jid}' anymore."
        return ConfirmationSession[AnySession](
            prompt=question,
            success=farewell,
            handler=self.unregister,
        )

    async def unregister(self, session: AnySession | None, _ifrom: JID) -> str:
        """Unregister the session's user through ``self.xmpp.unregister_user``."""
        assert session is not None
        await self.xmpp.unregister_user(session.user)
        return "You are not registered anymore. Bye!"

460 

461 

class LeaveGroup(Command[AnySession]):
    """Three-step command: pick a group, confirm, then leave it."""

    NAME = HELP = "❌ Leave a legacy group"
    CHAT_COMMAND = "leave-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *_: str,
    ) -> FormSession[AnySession]:
        """
        Return a single-choice form listing the session's bookmarked groups.

        Each option's value is the index of the group in the sorted list that
        is handed over to :meth:`confirm`.
        """
        assert session is not None
        await session.ready
        groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold())
        return FormSession(
            title="Leave a group",
            instructions="Select the group you want to leave",
            fields=[
                FormField(
                    "group",
                    "Group name",
                    type="list-single",
                    options=[
                        {"label": g.name or str(g.legacy_id), "value": str(i)}
                        for i, g in enumerate(groups)
                    ],
                )
            ],
            handler=self.confirm,
            handler_args=(groups,),
        )

    async def confirm(
        self,
        form_values: FormValues,
        _session: AnySession | None,
        _ifrom: JID,
        groups: list[AnyMUC],
    ) -> Confirmation:
        """
        Resolve the selected group and ask for confirmation.

        :param groups: the list built by :meth:`run`; the submitted value is
            an index into it.
        :raises XMPPError: ``bad-request`` when the submitted value is missing,
            is not an integer, or is out of range.
        """
        # The index comes from the client, so validate it instead of letting
        # int()/indexing fail (or a negative index silently pick another group).
        try:
            index = int(form_values["group"])  # type:ignore
        except (KeyError, TypeError, ValueError) as e:
            raise XMPPError("bad-request", "You need to select a group") from e
        if not 0 <= index < len(groups):
            raise XMPPError("bad-request", "The selected group does not exist")

        group = groups[index]
        # Same fallback as the option labels, so unnamed groups are not shown as 'None'.
        display_name = group.name or str(group.legacy_id)
        return Confirmation(
            prompt=f"Are you sure you want to leave the group '{display_name}'?",
            handler=self.finish,
            handler_args=(group,),
        )

    @staticmethod
    async def finish(session: AnySession, _ifrom: JID, group: AnyMUC) -> None:
        """Leave the group via ``session.on_leave_group``, then remove its bookmark."""
        await session.on_leave_group(group.legacy_id)
        await session.bookmarks.remove(group, reason="You left this group via slidge.")

514 

515 

class InviteInGroups(Command[AnySession]):
    """Command sending a gateway invitation for every bookmark of type ``MucType.GROUP``."""

    NAME = "💌 Re-invite me in my groups"
    HELP = "Ask the gateway to send invitations for all your private groups"
    CHAT_COMMAND = "re-invite"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session: "AnySession | None", _ifrom: JID, *_: str) -> None:
        """Wait for the session to be ready, then invite the user to each group."""
        assert session is not None
        await session.ready
        for muc in session.bookmarks:
            if muc.type != MucType.GROUP:
                # Other room types are skipped.
                continue
            session.send_gateway_invite(
                muc, reason="You asked to be re-invited in all groups."
            )

532 

533 

async def _get_updated_spaces(session: AnySession) -> list[Space]:
    """
    Load the user's stored spaces and refresh each one.

    Every stored space goes through ``session.bookmarks.update_space_if_needed``.
    A space whose update raises is logged and left out of the result. The
    returned list is sorted by name, falling back to the stringified legacy ID.
    """
    await session.ready

    store = session.xmpp.store
    with store.session() as orm:
        stored = list(store.spaces.get_all(orm, session.user_pk))

    refreshed: list[Space] = []
    for space in stored:
        try:
            fresh = await session.bookmarks.update_space_if_needed(space)
        except Exception:
            # Best effort: one failing space must not prevent listing the others.
            session.log.exception(
                "Something went wrong trying to update space '%r'", space
            )
        else:
            refreshed.append(fresh)

    refreshed.sort(key=lambda s: s.name or str(s.legacy_id))
    return refreshed