Coverage for slidge / command / user.py: 50%

243 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1# Commands available to users 

2import contextlib 

3from copy import deepcopy 

4from typing import Any, cast 

5 

6from slixmpp import JID 

7from slixmpp.exceptions import XMPPError 

8 

9from slidge.db.meta import JSONSerializable 

10from slidge.db.models import Space 

11 

12from ..util.types import ( 

13 AnyMUC, 

14 AnySession, 

15 MucType, 

16 UserPreferences, 

17) 

18from .base import ( 

19 Command, 

20 CommandAccess, 

21 Confirmation, 

22 ConfirmationSession, 

23 Form, 

24 FormField, 

25 FormSession, 

26 FormValues, 

27 SearchResult, 

28 TableResult, 

29) 

30from .categories import CONTACTS, GROUPS, SPACES 

31 

32 

33class Search(Command[AnySession]): 

34 NAME = "🔎 Search for contacts" 

35 HELP = "Search for contacts via this gateway" 

36 CHAT_COMMAND = "find" 

37 NODE = CONTACTS.node + "/" + CHAT_COMMAND 

38 ACCESS = CommandAccess.USER_LOGGED 

39 CATEGORY = CONTACTS 

40 

41 async def run( 

42 self, session: "AnySession | None", _ifrom: JID, *args: str 

43 ) -> FormSession[AnySession] | SearchResult | None: 

44 assert session is not None 

45 await session.ready 

46 if args: 

47 return await session.on_search( 

48 {self.xmpp.SEARCH_FIELDS[0].var: " ".join(args)} 

49 ) 

50 return FormSession[AnySession]( 

51 title=self.xmpp.SEARCH_TITLE, 

52 instructions=self.xmpp.SEARCH_INSTRUCTIONS, 

53 fields=self.xmpp.SEARCH_FIELDS, 

54 handler=self.search, 

55 ) 

56 

57 @staticmethod 

58 async def search( 

59 form_values: FormValues, session: "AnySession | None", _ifrom: JID 

60 ) -> SearchResult: 

61 assert session is not None 

62 results = await session.on_search(form_values) # type: ignore 

63 if results is None: 

64 raise XMPPError("item-not-found", "No contact was found") 

65 

66 return results 

67 

68 

69class SyncContacts(Command[AnySession]): 

70 NAME = "🔄 Sync XMPP roster" 

71 HELP = ( 

72 "Synchronize your XMPP roster with your legacy contacts. " 

73 "Slidge will only add/remove/modify contacts in its dedicated roster group" 

74 ) 

75 CHAT_COMMAND = "sync-contacts" 

76 NODE = CONTACTS.node + "/" + CHAT_COMMAND 

77 ACCESS = CommandAccess.USER_LOGGED 

78 CATEGORY = CONTACTS 

79 

80 async def run( 

81 self, 

82 session: "AnySession | None", 

83 _ifrom: JID, 

84 *args: str, 

85 ) -> ConfirmationSession[AnySession]: 

86 assert session is not None 

87 await session.ready 

88 return ConfirmationSession[AnySession]( 

89 prompt="Are you sure you want to sync your roster?", 

90 success=None, 

91 handler=self.sync, 

92 ) 

93 

94 async def sync(self, session: "AnySession | None", _ifrom: JID) -> str: 

95 if session is None: 

96 raise RuntimeError 

97 roster_iq = await self.xmpp["xep_0356"].get_roster(session.user_jid.bare) 

98 

99 contacts = session.contacts.known_contacts() 

100 

101 added = 0 

102 removed = 0 

103 updated = 0 

104 for item in roster_iq["roster"]: 

105 groups = set(item["groups"]) 

106 if self.xmpp.ROSTER_GROUP in groups: 

107 contact = contacts.pop(item["jid"], None) 

108 if contact is None: 

109 if len(groups) == 1: 

110 await self.xmpp["xep_0356"].set_roster( 

111 session.user_jid, {item["jid"]: {"subscription": "remove"}} 

112 ) 

113 removed += 1 

114 else: 

115 groups.remove(self.xmpp.ROSTER_GROUP) 

116 await self.xmpp["xep_0356"].set_roster( 

117 session.user_jid, 

118 { 

119 item["jid"]: { 

120 "subscription": item["subscription"], 

121 "name": item["name"], 

122 "groups": groups, 

123 } 

124 }, 

125 ) 

126 updated += 1 

127 else: 

128 if contact.name != item["name"]: 

129 await contact.add_to_roster(force=True) 

130 updated += 1 

131 

132 # we popped before so this only acts on slidge contacts not in the xmpp roster 

133 for contact in contacts.values(): 

134 added += 1 

135 await contact.add_to_roster() 

136 

137 return f"{added} added, {removed} removed, {updated} updated" 

138 

139 

140class ListContacts(Command[AnySession]): 

141 NAME = HELP = "👤 List your legacy contacts" 

142 CHAT_COMMAND = "contacts" 

143 NODE = CONTACTS.node + "/" + CHAT_COMMAND 

144 ACCESS = CommandAccess.USER_LOGGED 

145 CATEGORY = CONTACTS 

146 

147 async def run( 

148 self, 

149 session: "AnySession | None", 

150 _ifrom: JID, 

151 *_: str, 

152 ) -> TableResult: 

153 assert session is not None 

154 await session.ready 

155 contacts = sorted( 

156 session.contacts, key=lambda c: c.name.casefold() if c.name else "" 

157 ) 

158 return TableResult( 

159 description="Your buddies", 

160 fields=[FormField("name"), FormField("jid", type="jid-single")], 

161 items=[{"name": c.name, "jid": c.jid.bare} for c in contacts], 

162 ) 

163 

164 

165class ListGroups(Command[AnySession]): 

166 NAME = HELP = "👥 List your legacy groups" 

167 CHAT_COMMAND = "groups" 

168 NODE = GROUPS.node + "/" + CHAT_COMMAND 

169 ACCESS = CommandAccess.USER_LOGGED 

170 CATEGORY = GROUPS 

171 

172 async def run( 

173 self, session: "AnySession | None", _ifrom: JID, *_: str 

174 ) -> TableResult: 

175 assert session is not None 

176 await session.ready 

177 groups: list[AnyMUC] = sorted( 

178 session.bookmarks, key=lambda g: (g.name or g.jid.node).casefold() 

179 ) 

180 return TableResult( 

181 description="Your groups", 

182 fields=[FormField("name"), FormField("jid", type="jid-single")], 

183 items=[ 

184 {"name": g.name or str(g.legacy_id), "jid": g.jid.bare} for g in groups 

185 ], 

186 jids_are_mucs=True, 

187 ) 

188 

189 

190class ListSpaces(Command[AnySession]): 

191 NAME = "🌐 List my spaces" 

192 HELP = "List the spaces you are part of. Spaces are collections of rooms." 

193 CHAT_COMMAND = "spaces" 

194 NODE = GROUPS.node + "/" + CHAT_COMMAND 

195 ACCESS = CommandAccess.USER_LOGGED 

196 CATEGORY = SPACES 

197 

198 related_to_spaces = True 

199 

200 async def run( 

201 self, session: "AnySession | None", _ifrom: JID, *_: str 

202 ) -> TableResult: 

203 assert session is not None 

204 spaces = await _get_updated_spaces(session) 

205 return TableResult( 

206 description="Your spaces. If your client does not support spaces, use the 'space-rooms' command.", 

207 fields=[FormField("name"), FormField("iri")], 

208 items=[ 

209 { 

210 "name": s.name or str(s.legacy_id), 

211 "iri": f"xmpp:{self.xmpp.boundjid.bare}?;node={await session.bookmarks.space_legacy_id_to_node(s.legacy_id)}", 

212 } 

213 for s in spaces 

214 ], 

215 jids_are_mucs=True, 

216 ) 

217 

218 

219class ListRoomsInSpace(Command[AnySession]): 

220 NAME = "🌐 List the rooms in a space" 

221 HELP = "List the rooms of a space you are part of. Spaces are collections of rooms." 

222 CHAT_COMMAND = "space-rooms" 

223 NODE = GROUPS.node + "/" + CHAT_COMMAND 

224 ACCESS = CommandAccess.USER_LOGGED 

225 CATEGORY = SPACES 

226 

227 async def run( 

228 self, session: "AnySession | None", _ifrom: JID, *_: str 

229 ) -> "FormSession[AnySession]": 

230 assert session is not None 

231 spaces = await _get_updated_spaces(session) 

232 return FormSession( 

233 title="Your spaces", 

234 instructions="Select a space to view its rooms", 

235 fields=[ 

236 FormField( 

237 var="space_legacy_id", 

238 label="Space", 

239 required=True, 

240 type="list-single", 

241 options=[ 

242 { 

243 "label": s.name or str(s.legacy_id), 

244 "value": str(s.legacy_id), 

245 } 

246 for s in spaces 

247 ], 

248 ) 

249 ], 

250 handler=self.list_rooms, 

251 handler_args=({str(s.legacy_id): s.name for s in spaces},), 

252 ) 

253 

254 async def list_rooms( 

255 self, 

256 form_values: FormValues, 

257 session: "AnySession | None", 

258 _ifrom: JID, 

259 space_names: dict[str, str], 

260 ) -> TableResult: 

261 assert session is not None 

262 space_legacy_id = form_values.get("space_legacy_id") 

263 if space_legacy_id is None: 

264 raise XMPPError("bad-request", "You need to specify a space") 

265 assert isinstance(space_legacy_id, str) 

266 await session.ready 

267 with self.xmpp.store.session() as orm: 

268 rooms = sorted( 

269 list( 

270 self.xmpp.store.spaces.get_rooms( 

271 orm, 

272 session.user_pk, 

273 await session.bookmarks.space_legacy_id_to_node( 

274 space_legacy_id 

275 ), 

276 ) 

277 ), 

278 key=lambda r: (r.name or str(r.jid.node)).casefold(), 

279 ) 

280 name = space_names.get(space_legacy_id) 

281 return TableResult( 

282 fields=[ 

283 FormField("name", "Name"), 

284 FormField("jid", "JID", type="jid-single"), 

285 ], 

286 description=f"Rooms of '{name or space_legacy_id}'", 

287 items=[{"name": r.name or str(r.legacy_id), "jid": r.jid} for r in rooms], 

288 jids_are_mucs=True, 

289 ) 

290 

291 

292class Login(Command[AnySession]): 

293 NAME = "🔐 Re-login to the legacy network" 

294 HELP = "Login to the legacy service" 

295 CHAT_COMMAND = "re-login" 

296 NODE = "https://slidge.im/command/core/" + CHAT_COMMAND 

297 

298 ACCESS = CommandAccess.USER_NON_LOGGED 

299 

300 async def run( 

301 self, 

302 session: "AnySession | None", 

303 _ifrom: JID, 

304 *_: str, 

305 ) -> str: 

306 assert session is not None 

307 if session.is_logging_in: 

308 raise XMPPError("bad-request", "You are already logging in.") 

309 session.is_logging_in = True 

310 try: 

311 msg = await self.xmpp.login_wrap(session) 

312 except Exception as e: 

313 session.send_gateway_status(f"Re-login failed: {e}", show="dnd") 

314 raise XMPPError( 

315 "internal-server-error", etype="wait", text=f"Could not login: {e}" 

316 ) 

317 finally: 

318 session.is_logging_in = False 

319 session.logged = True 

320 session.send_gateway_status(msg or "Re-connected", show="chat") 

321 session.send_gateway_message(msg or "Re-connected") 

322 return msg 

323 

324 

325class CreateGroup(Command[AnySession]): 

326 NAME = "🆕 New legacy group" 

327 HELP = "Create a group on the legacy service" 

328 CHAT_COMMAND = "create-group" 

329 NODE = GROUPS.node + "/" + CHAT_COMMAND 

330 CATEGORY = GROUPS 

331 

332 ACCESS = CommandAccess.USER_LOGGED 

333 

334 async def run( 

335 self, 

336 session: "AnySession | None", 

337 _ifrom: JID, 

338 *_: str, 

339 ) -> FormSession[AnySession]: 

340 assert session is not None 

341 await session.ready 

342 contacts = session.contacts.known_contacts(only_friends=True) 

343 return FormSession( 

344 title="Create a new group", 

345 instructions="Pick contacts that should be part of this new group", 

346 fields=[ 

347 FormField(var="group_name", label="Name of the group", required=True), 

348 FormField( 

349 var="contacts", 

350 label="Contacts to add to the new group", 

351 type="list-multi", 

352 options=[ 

353 {"value": str(contact.jid), "label": contact.name} 

354 for contact in sorted(contacts.values(), key=lambda c: c.name) 

355 ], 

356 required=False, 

357 ), 

358 ], 

359 handler=self.finish, 

360 ) 

361 

362 @staticmethod 

363 async def finish( 

364 form_values: FormValues, 

365 session: "AnySession | None", 

366 *_: Any, # noqa:ANN401 

367 ) -> TableResult: 

368 assert session is not None 

369 legacy_id: str = await session.on_create_group( 

370 cast(str, form_values["group_name"]), 

371 [ 

372 await session.contacts.by_jid(JID(j)) 

373 for j in form_values.get("contacts", []) # type:ignore 

374 ], 

375 ) 

376 muc = await session.bookmarks.by_legacy_id(legacy_id) 

377 return TableResult( 

378 description=f"Your new group: xmpp:{muc.jid}?join", 

379 fields=[FormField("name"), FormField("jid", type="jid-single")], 

380 items=[{"name": muc.name, "jid": muc.jid}], 

381 jids_are_mucs=True, 

382 ) 

383 

384 

385class Preferences(Command[AnySession]): 

386 NAME = "⚙️ Preferences" 

387 HELP = "Customize the gateway behaviour to your liking" 

388 CHAT_COMMAND = "preferences" 

389 NODE = "https://slidge.im/command/core/preferences" 

390 ACCESS = CommandAccess.USER 

391 

392 async def run( 

393 self, 

394 session: "AnySession | None", 

395 _ifrom: JID, 

396 *_: str, 

397 ) -> FormSession[AnySession]: 

398 fields = deepcopy(self.xmpp.PREFERENCES) 

399 assert session is not None 

400 current = session.user.preferences 

401 for field in fields: 

402 field.value = current.get(field.var, field.value) # type:ignore 

403 return Form( 

404 title="Preferences", 

405 instructions=self.HELP, 

406 fields=fields, 

407 handler=self.finish, # type:ignore 

408 handler_kwargs={"previous": current}, 

409 ) 

410 

411 async def finish( 

412 self, 

413 form_values: UserPreferences, 

414 session: "AnySession | None", 

415 *_: Any, # noqa:ANN401 

416 previous: JSONSerializable, 

417 ) -> str: 

418 assert session is not None 

419 if previous == form_values: 

420 return "No preference was changed" 

421 

422 previous = previous.copy() 

423 user = session.user 

424 user.preferences.update(form_values) # type:ignore 

425 self.xmpp.store.users.update(user) 

426 

427 with contextlib.suppress(NotImplementedError): 

428 await session.on_preferences(previous, form_values) # type:ignore[arg-type] 

429 

430 if not previous["sync_avatar"] and form_values["sync_avatar"]: 

431 await self.xmpp.fetch_user_avatar(session) 

432 else: 

433 user.avatar_hash = None 

434 

435 return "Your preferences have been updated." 

436 

437 

438class Unregister(Command[AnySession]): 

439 NAME = "❌ Unregister from the gateway" 

440 HELP = "Unregister from the gateway" 

441 CHAT_COMMAND = "unregister" 

442 NODE = "https://slidge.im/command/core/unregister" 

443 ACCESS = CommandAccess.USER 

444 

445 async def run( 

446 self, session: "AnySession | None", _ifrom: JID, *_: str 

447 ) -> ConfirmationSession[AnySession]: 

448 return ConfirmationSession[AnySession]( 

449 prompt=f"Are you sure you want to unregister from '{self.xmpp.boundjid}'?", 

450 success=f"You are not registered to '{self.xmpp.boundjid}' anymore.", 

451 handler=self.unregister, 

452 ) 

453 

454 async def unregister(self, session: "AnySession | None", _ifrom: JID) -> str: 

455 assert session is not None 

456 await self.xmpp.unregister_user(session.user) 

457 return "You are not registered anymore. Bye!" 

458 

459 

460class LeaveGroup(Command[AnySession]): 

461 NAME = HELP = "❌ Leave a legacy group" 

462 CHAT_COMMAND = "leave-group" 

463 NODE = GROUPS.node + "/" + CHAT_COMMAND 

464 ACCESS = CommandAccess.USER_LOGGED 

465 CATEGORY = GROUPS 

466 

467 async def run( 

468 self, 

469 session: "AnySession | None", 

470 ifrom: JID, 

471 *_: str, 

472 ) -> FormSession[AnySession]: 

473 assert session is not None 

474 await session.ready 

475 groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold()) 

476 return FormSession( 

477 title="Leave a group", 

478 instructions="Select the group you want to leave", 

479 fields=[ 

480 FormField( 

481 "group", 

482 "Group name", 

483 type="list-single", 

484 options=[ 

485 {"label": g.name or str(g.legacy_id), "value": str(i)} 

486 for i, g in enumerate(groups) 

487 ], 

488 ) 

489 ], 

490 handler=self.confirm, 

491 handler_args=(groups,), 

492 ) 

493 

494 async def confirm( 

495 self, 

496 form_values: FormValues, 

497 _session: "AnySession | None", 

498 _ifrom: JID, 

499 groups: list[AnyMUC], 

500 ) -> Confirmation: 

501 group = groups[int(form_values["group"])] # type:ignore 

502 return Confirmation( 

503 prompt=f"Are you sure you want to leave the group '{group.name}'?", 

504 handler=self.finish, 

505 handler_args=(group,), 

506 ) 

507 

508 @staticmethod 

509 async def finish(session: AnySession, _ifrom: JID, group: AnyMUC) -> None: 

510 await group.on_leave() 

511 await session.bookmarks.remove(group, reason="You left this group via slidge.") 

512 

513 

514class InviteInGroups(Command[AnySession]): 

515 NAME = "💌 Re-invite me in my groups" 

516 HELP = "Ask the gateway to send invitations for all your private groups" 

517 CHAT_COMMAND = "re-invite" 

518 NODE = GROUPS.node + "/" + CHAT_COMMAND 

519 ACCESS = CommandAccess.USER_LOGGED 

520 CATEGORY = GROUPS 

521 

522 async def run(self, session: "AnySession | None", _ifrom: JID, *_: str) -> None: 

523 assert session is not None 

524 await session.ready 

525 for muc in session.bookmarks: 

526 if muc.type == MucType.GROUP: 

527 session.send_gateway_invite( 

528 muc, reason="You asked to be re-invited in all groups." 

529 ) 

530 

531 

532async def _get_updated_spaces(session: AnySession) -> list[Space]: 

533 await session.ready 

534 with session.xmpp.store.session() as orm: 

535 spaces = list(session.xmpp.store.spaces.get_all(orm, session.user_pk)) 

536 updated_spaces: list[Space] = [] 

537 for space in spaces: 

538 try: 

539 updated_spaces.append(await session.bookmarks.update_space_if_needed(space)) 

540 except Exception: 

541 session.log.exception( 

542 "Something went wrong trying to update space '%r'", space 

543 ) 

544 

545 return sorted(updated_spaces, key=lambda s: s.name or str(s.legacy_id))