Coverage for slidge / command / user.py: 47%

197 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1# Commands available to users 

2from copy import deepcopy 

3from typing import TYPE_CHECKING, Any, cast 

4 

5from slixmpp import JID 

6from slixmpp.exceptions import XMPPError 

7 

8from slidge.db.meta import JSONSerializable 

9 

10from ..util.types import ( 

11 AnyMUC, 

12 AnySession, 

13 LegacyGroupIdType, 

14 MucType, 

15 UserPreferences, 

16) 

17from .base import ( 

18 Command, 

19 CommandAccess, 

20 Confirmation, 

21 ConfirmationSession, 

22 Form, 

23 FormField, 

24 FormSession, 

25 FormValues, 

26 SearchResult, 

27 TableResult, 

28) 

29from .categories import CONTACTS, GROUPS 

30 

31if TYPE_CHECKING: 

32 pass 

33 

34 

35class Search(Command[AnySession]): 

36 NAME = "🔎 Search for contacts" 

37 HELP = "Search for contacts via this gateway" 

38 CHAT_COMMAND = "find" 

39 NODE = CONTACTS.node + "/" + CHAT_COMMAND 

40 ACCESS = CommandAccess.USER_LOGGED 

41 CATEGORY = CONTACTS 

42 

43 async def run( 

44 self, session: AnySession | None, _ifrom: JID, *args: str 

45 ) -> FormSession[AnySession] | SearchResult | None: 

46 assert session is not None 

47 await session.ready 

48 if args: 

49 return await session.on_search( 

50 {self.xmpp.SEARCH_FIELDS[0].var: " ".join(args)} 

51 ) 

52 return FormSession[AnySession]( 

53 title=self.xmpp.SEARCH_TITLE, 

54 instructions=self.xmpp.SEARCH_INSTRUCTIONS, 

55 fields=self.xmpp.SEARCH_FIELDS, 

56 handler=self.search, 

57 ) 

58 

59 @staticmethod 

60 async def search( 

61 form_values: FormValues, session: AnySession | None, _ifrom: JID 

62 ) -> SearchResult: 

63 assert session is not None 

64 results = await session.on_search(form_values) # type: ignore 

65 if results is None: 

66 raise XMPPError("item-not-found", "No contact was found") 

67 

68 return results 

69 

70 

71class SyncContacts(Command[AnySession]): 

72 NAME = "🔄 Sync XMPP roster" 

73 HELP = ( 

74 "Synchronize your XMPP roster with your legacy contacts. " 

75 "Slidge will only add/remove/modify contacts in its dedicated roster group" 

76 ) 

77 CHAT_COMMAND = "sync-contacts" 

78 NODE = CONTACTS.node + "/" + CHAT_COMMAND 

79 ACCESS = CommandAccess.USER_LOGGED 

80 CATEGORY = CONTACTS 

81 

82 async def run( 

83 self, 

84 session: AnySession | None, 

85 _ifrom: JID, 

86 *args: str, 

87 ) -> ConfirmationSession[AnySession]: 

88 assert session is not None 

89 await session.ready 

90 return ConfirmationSession[AnySession]( 

91 prompt="Are you sure you want to sync your roster?", 

92 success=None, 

93 handler=self.sync, 

94 ) 

95 

96 async def sync(self, session: AnySession | None, _ifrom: JID) -> str: 

97 if session is None: 

98 raise RuntimeError 

99 roster_iq = await self.xmpp["xep_0356"].get_roster(session.user_jid.bare) 

100 

101 contacts = session.contacts.known_contacts() 

102 

103 added = 0 

104 removed = 0 

105 updated = 0 

106 for item in roster_iq["roster"]: 

107 groups = set(item["groups"]) 

108 if self.xmpp.ROSTER_GROUP in groups: 

109 contact = contacts.pop(item["jid"], None) 

110 if contact is None: 

111 if len(groups) == 1: 

112 await self.xmpp["xep_0356"].set_roster( 

113 session.user_jid, {item["jid"]: {"subscription": "remove"}} 

114 ) 

115 removed += 1 

116 else: 

117 groups.remove(self.xmpp.ROSTER_GROUP) 

118 await self.xmpp["xep_0356"].set_roster( 

119 session.user_jid, 

120 { 

121 item["jid"]: { 

122 "subscription": item["subscription"], 

123 "name": item["name"], 

124 "groups": groups, 

125 } 

126 }, 

127 ) 

128 updated += 1 

129 else: 

130 if contact.name != item["name"]: 

131 await contact.add_to_roster(force=True) 

132 updated += 1 

133 

134 # we popped before so this only acts on slidge contacts not in the xmpp roster 

135 for contact in contacts.values(): 

136 added += 1 

137 await contact.add_to_roster() 

138 

139 return f"{added} added, {removed} removed, {updated} updated" 

140 

141 

142class ListContacts(Command[AnySession]): 

143 NAME = HELP = "👤 List your legacy contacts" 

144 CHAT_COMMAND = "contacts" 

145 NODE = CONTACTS.node + "/" + CHAT_COMMAND 

146 ACCESS = CommandAccess.USER_LOGGED 

147 CATEGORY = CONTACTS 

148 

149 async def run( 

150 self, 

151 session: AnySession | None, 

152 _ifrom: JID, 

153 *_: str, 

154 ) -> TableResult: 

155 assert session is not None 

156 await session.ready 

157 contacts = sorted( 

158 session.contacts, key=lambda c: c.name.casefold() if c.name else "" 

159 ) 

160 return TableResult( 

161 description="Your buddies", 

162 fields=[FormField("name"), FormField("jid", type="jid-single")], 

163 items=[{"name": c.name, "jid": c.jid.bare} for c in contacts], 

164 ) 

165 

166 

167class ListGroups(Command[AnySession]): 

168 NAME = HELP = "👥 List your legacy groups" 

169 CHAT_COMMAND = "groups" 

170 NODE = GROUPS.node + "/" + CHAT_COMMAND 

171 ACCESS = CommandAccess.USER_LOGGED 

172 CATEGORY = GROUPS 

173 

174 async def run( 

175 self, session: AnySession | None, _ifrom: JID, *_: str 

176 ) -> TableResult: 

177 assert session is not None 

178 await session.ready 

179 groups: list[AnyMUC] = sorted( 

180 session.bookmarks, key=lambda g: (g.name or g.jid.node).casefold() 

181 ) 

182 return TableResult( 

183 description="Your groups", 

184 fields=[FormField("name"), FormField("jid", type="jid-single")], 

185 items=[ 

186 {"name": g.name or str(g.legacy_id), "jid": g.jid.bare} for g in groups 

187 ], 

188 jids_are_mucs=True, 

189 ) 

190 

191 

192class Login(Command[AnySession]): 

193 NAME = "🔐 Re-login to the legacy network" 

194 HELP = "Login to the legacy service" 

195 CHAT_COMMAND = "re-login" 

196 NODE = "https://slidge.im/command/core/" + CHAT_COMMAND 

197 

198 ACCESS = CommandAccess.USER_NON_LOGGED 

199 

200 async def run( 

201 self, 

202 session: AnySession | None, 

203 _ifrom: JID, 

204 *_: str, 

205 ) -> str: 

206 assert session is not None 

207 if session.is_logging_in: 

208 raise XMPPError("bad-request", "You are already logging in.") 

209 session.is_logging_in = True 

210 try: 

211 msg = await self.xmpp.login_wrap(session) 

212 except Exception as e: 

213 session.send_gateway_status(f"Re-login failed: {e}", show="dnd") 

214 raise XMPPError( 

215 "internal-server-error", etype="wait", text=f"Could not login: {e}" 

216 ) 

217 finally: 

218 session.is_logging_in = False 

219 session.logged = True 

220 session.send_gateway_status(msg or "Re-connected", show="chat") 

221 session.send_gateway_message(msg or "Re-connected") 

222 return msg 

223 

224 

225class CreateGroup(Command[AnySession]): 

226 NAME = "🆕 New legacy group" 

227 HELP = "Create a group on the legacy service" 

228 CHAT_COMMAND = "create-group" 

229 NODE = GROUPS.node + "/" + CHAT_COMMAND 

230 CATEGORY = GROUPS 

231 

232 ACCESS = CommandAccess.USER_LOGGED 

233 

234 async def run( 

235 self, 

236 session: AnySession | None, 

237 _ifrom: JID, 

238 *_: str, 

239 ) -> FormSession[AnySession]: 

240 assert session is not None 

241 await session.ready 

242 contacts = session.contacts.known_contacts(only_friends=True) 

243 return FormSession( 

244 title="Create a new group", 

245 instructions="Pick contacts that should be part of this new group", 

246 fields=[ 

247 FormField(var="group_name", label="Name of the group", required=True), 

248 FormField( 

249 var="contacts", 

250 label="Contacts to add to the new group", 

251 type="list-multi", 

252 options=[ 

253 {"value": str(contact.jid), "label": contact.name} 

254 for contact in sorted(contacts.values(), key=lambda c: c.name) 

255 ], 

256 required=False, 

257 ), 

258 ], 

259 handler=self.finish, 

260 ) 

261 

262 @staticmethod 

263 async def finish( 

264 form_values: FormValues, 

265 session: AnySession | None, 

266 *_: Any, # noqa:ANN401 

267 ) -> TableResult: 

268 assert session is not None 

269 legacy_id: LegacyGroupIdType = await session.on_create_group( # type:ignore 

270 cast(str, form_values["group_name"]), 

271 [ 

272 await session.contacts.by_jid(JID(j)) 

273 for j in form_values.get("contacts", []) # type:ignore 

274 ], 

275 ) 

276 muc = await session.bookmarks.by_legacy_id(legacy_id) 

277 return TableResult( 

278 description=f"Your new group: xmpp:{muc.jid}?join", 

279 fields=[FormField("name"), FormField("jid", type="jid-single")], 

280 items=[{"name": muc.name, "jid": muc.jid}], 

281 jids_are_mucs=True, 

282 ) 

283 

284 

285class Preferences(Command[AnySession]): 

286 NAME = "⚙️ Preferences" 

287 HELP = "Customize the gateway behaviour to your liking" 

288 CHAT_COMMAND = "preferences" 

289 NODE = "https://slidge.im/command/core/preferences" 

290 ACCESS = CommandAccess.USER 

291 

292 async def run( 

293 self, 

294 session: AnySession | None, 

295 _ifrom: JID, 

296 *_: str, 

297 ) -> FormSession[AnySession]: 

298 fields = deepcopy(self.xmpp.PREFERENCES) 

299 assert session is not None 

300 current = session.user.preferences 

301 for field in fields: 

302 field.value = current.get(field.var, field.value) # type:ignore 

303 return Form( 

304 title="Preferences", 

305 instructions=self.HELP, 

306 fields=fields, 

307 handler=self.finish, # type:ignore 

308 handler_kwargs={"previous": current}, 

309 ) 

310 

311 async def finish( 

312 self, 

313 form_values: UserPreferences, 

314 session: AnySession | None, 

315 *_: Any, # noqa:ANN401 

316 previous: JSONSerializable, 

317 ) -> str: 

318 assert session is not None 

319 if previous == form_values: 

320 return "No preference was changed" 

321 

322 user = session.user 

323 user.preferences.update(form_values) # type:ignore 

324 self.xmpp.store.users.update(user) 

325 

326 try: 

327 await session.on_preferences(previous, form_values) # type:ignore[arg-type] 

328 except NotImplementedError: 

329 pass 

330 

331 if not previous["sync_avatar"] and form_values["sync_avatar"]: 

332 await self.xmpp.fetch_user_avatar(session) 

333 else: 

334 user.avatar_hash = None 

335 

336 return "Your preferences have been updated." 

337 

338 

339class Unregister(Command[AnySession]): 

340 NAME = "❌ Unregister from the gateway" 

341 HELP = "Unregister from the gateway" 

342 CHAT_COMMAND = "unregister" 

343 NODE = "https://slidge.im/command/core/unregister" 

344 ACCESS = CommandAccess.USER 

345 

346 async def run( 

347 self, session: AnySession | None, _ifrom: JID, *_: str 

348 ) -> ConfirmationSession[AnySession]: 

349 return ConfirmationSession[AnySession]( 

350 prompt=f"Are you sure you want to unregister from '{self.xmpp.boundjid}'?", 

351 success=f"You are not registered to '{self.xmpp.boundjid}' anymore.", 

352 handler=self.unregister, 

353 ) 

354 

355 async def unregister(self, session: AnySession | None, _ifrom: JID) -> str: 

356 assert session is not None 

357 await self.xmpp.unregister_user(session.user) 

358 return "You are not registered anymore. Bye!" 

359 

360 

361class LeaveGroup(Command[AnySession]): 

362 NAME = HELP = "❌ Leave a legacy group" 

363 CHAT_COMMAND = "leave-group" 

364 NODE = GROUPS.node + "/" + CHAT_COMMAND 

365 ACCESS = CommandAccess.USER_LOGGED 

366 CATEGORY = GROUPS 

367 

368 async def run( 

369 self, 

370 session: AnySession | None, 

371 _ifrom: JID, 

372 *_: str, 

373 ) -> FormSession[AnySession]: 

374 assert session is not None 

375 await session.ready 

376 groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold()) 

377 return FormSession( 

378 title="Leave a group", 

379 instructions="Select the group you want to leave", 

380 fields=[ 

381 FormField( 

382 "group", 

383 "Group name", 

384 type="list-single", 

385 options=[ 

386 {"label": g.name or str(g.legacy_id), "value": str(i)} 

387 for i, g in enumerate(groups) 

388 ], 

389 ) 

390 ], 

391 handler=self.confirm, 

392 handler_args=(groups,), 

393 ) 

394 

395 async def confirm( 

396 self, 

397 form_values: FormValues, 

398 _session: AnySession | None, 

399 _ifrom: JID, 

400 groups: list[AnyMUC], 

401 ) -> Confirmation: 

402 group = groups[int(form_values["group"])] # type:ignore 

403 return Confirmation( 

404 prompt=f"Are you sure you want to leave the group '{group.name}'?", 

405 handler=self.finish, 

406 handler_args=(group,), 

407 ) 

408 

409 @staticmethod 

410 async def finish(session: AnySession, _ifrom: JID, group: AnyMUC) -> None: 

411 await session.on_leave_group(group.legacy_id) 

412 await session.bookmarks.remove(group, reason="You left this group via slidge.") 

413 

414 

415class InviteInGroups(Command[AnySession]): 

416 NAME = "💌 Re-invite me in my groups" 

417 HELP = "Ask the gateway to send invitations for all your private groups" 

418 CHAT_COMMAND = "re-invite" 

419 NODE = GROUPS.node + "/" + CHAT_COMMAND 

420 ACCESS = CommandAccess.USER_LOGGED 

421 CATEGORY = GROUPS 

422 

423 async def run(self, session: "AnySession | None", _ifrom: JID, *_: str) -> None: 

424 assert session is not None 

425 await session.ready 

426 for muc in session.bookmarks: 

427 if muc.type == MucType.GROUP: 

428 session.send_gateway_invite( 

429 muc, reason="You asked to be re-invited in all groups." 

430 )