Coverage for slidge / command / user.py: 47%

197 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-02-15 09:02 +0000

1# Commands available to users 

2from copy import deepcopy 

3from typing import TYPE_CHECKING, Any, cast 

4 

5from slixmpp import JID 

6from slixmpp.exceptions import XMPPError 

7 

8from ..group.room import LegacyMUC 

9from ..util.types import AnyBaseSession, LegacyGroupIdType, MucType, UserPreferences 

10from .base import ( 

11 Command, 

12 CommandAccess, 

13 Confirmation, 

14 Form, 

15 FormField, 

16 FormValues, 

17 SearchResult, 

18 TableResult, 

19) 

20from .categories import CONTACTS, GROUPS 

21 

22if TYPE_CHECKING: 

23 pass 

24 

25 

class Search(Command):
    """Command that lets a user look up contacts through the gateway.

    Invoked with arguments, it runs the search right away; without
    arguments it hands back a form built from the gateway's search settings.
    """

    NAME = "🔎 Search for contacts"
    HELP = "Search for contacts via this gateway"
    CHAT_COMMAND = "find"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: AnyBaseSession | None, _ifrom: JID, *args: str
    ) -> Form | SearchResult | None:
        """Start a contact search.

        :param session: Session of the user running the command; must not be None.
        :param _ifrom: JID that issued the command (unused here).
        :param args: Optional search words; they are joined with single spaces
            and used as the value of the first search field.
        :return: Whatever ``session.on_search`` returns when ``args`` is given,
            otherwise a :class:`Form` whose handler is :meth:`search`.
        """
        assert session is not None
        await session.ready

        if not args:
            # No inline query: ask the user to fill in the search form.
            gateway = self.xmpp
            return Form(
                title=gateway.SEARCH_TITLE,
                instructions=gateway.SEARCH_INSTRUCTIONS,
                fields=gateway.SEARCH_FIELDS,
                handler=self.search,
            )

        # Inline query: everything typed after the command goes into the
        # first declared search field.
        first_field = self.xmpp.SEARCH_FIELDS[0]
        query = " ".join(args)
        return await session.on_search({first_field.var: query})

    @staticmethod
    async def search(
        form_values: FormValues, session: AnyBaseSession | None, _ifrom: JID
    ) -> SearchResult:
        """Form handler: run the search with the submitted values.

        :param form_values: Values submitted through the search form.
        :param session: Session of the user; must not be None.
        :param _ifrom: JID that submitted the form (unused here).
        :return: The result returned by ``session.on_search``.
        :raises XMPPError: ``item-not-found`` when ``on_search`` returns None.
        """
        assert session is not None
        found = await session.on_search(form_values)  # type: ignore
        if found is not None:
            return found
        raise XMPPError("item-not-found", "No contact was found")

60 

61 

class SyncContacts(Command):
    """Command that reconciles the user's XMPP roster with the known legacy contacts.

    Only roster items that belong to ``self.xmpp.ROSTER_GROUP`` are touched.
    """

    NAME = "🔄 Sync XMPP roster"
    HELP = (
        "Synchronize your XMPP roster with your legacy contacts. "
        "Slidge will only add/remove/modify contacts in its dedicated roster group"
    )
    CHAT_COMMAND = "sync-contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(self, session: AnyBaseSession | None, _ifrom, *_) -> Confirmation:
        """Ask the user to confirm before syncing.

        :param session: Session of the user running the command; must not be None.
        :return: A :class:`Confirmation` whose handler is :meth:`sync`.
        """
        assert session is not None
        await session.ready
        return Confirmation(
            prompt="Are you sure you want to sync your roster?",
            success=None,
            handler=self.sync,
        )

    async def sync(self, session: AnyBaseSession | None, _ifrom: JID) -> str:
        """Confirmation handler: bring the XMPP roster in line with known contacts.

        For every roster item in the gateway's roster group:

        * no matching known contact and no other group: the item is removed;
        * no matching known contact but other groups: only the gateway's
          group is dropped from the item;
        * matching contact whose name differs: the contact is re-added with
          ``force=True``.

        Known contacts that did not match any such roster item are added.

        :param session: Session of the user; a None session raises RuntimeError.
        :param _ifrom: JID that confirmed the command (unused here).
        :return: A summary string with the added/removed/updated counts.
        :raises RuntimeError: If ``session`` is None.
        """
        if session is None:
            raise RuntimeError

        privileged = self.xmpp["xep_0356"]
        roster_iq = await privileged.get_roster(session.user_jid.bare)

        # Contacts get popped from this mapping as they are matched, so what
        # is left afterwards is exactly the set missing from the XMPP roster.
        pending = session.contacts.known_contacts()

        added = removed = updated = 0

        for item in roster_iq["roster"]:
            groups = set(item["groups"])
            if self.xmpp.ROSTER_GROUP not in groups:
                # Not managed by the gateway: leave it alone.
                continue

            contact = pending.pop(item["jid"], None)

            if contact is not None:
                # Known contact: refresh the roster entry only on a name mismatch.
                if contact.name != item["name"]:
                    await contact.add_to_roster(force=True)
                    updated += 1
                continue

            if len(groups) == 1:
                # Item only lives in the gateway's group: drop it entirely.
                await privileged.set_roster(
                    session.user_jid, {item["jid"]: {"subscription": "remove"}}
                )
                removed += 1
                continue

            # Item is also in other groups: keep it, minus the gateway's group.
            groups.remove(self.xmpp.ROSTER_GROUP)
            await privileged.set_roster(
                session.user_jid,
                {
                    item["jid"]: {
                        "subscription": item["subscription"],
                        "name": item["name"],
                        "groups": groups,
                    }
                },
            )
            updated += 1

        # Everything still pending is a known contact absent from the roster.
        for contact in pending.values():
            added += 1
            await contact.add_to_roster()

        return f"{added} added, {removed} removed, {updated} updated"

126 

127 

class ListContacts(Command):
    """Command that returns the session's contacts as a name/JID table."""

    NAME = HELP = "👤 List your legacy contacts"
    CHAT_COMMAND = "contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(self, session: AnyBaseSession | None, _ifrom: JID, *_) -> TableResult:
        """Build the table of contacts, sorted case-insensitively by name.

        :param session: Session of the user running the command; must not be None.
        :param _ifrom: JID that issued the command (unused here).
        :return: A :class:`TableResult` with one ``name``/``jid`` row per contact.
        """
        assert session is not None
        await session.ready

        def by_name(contact) -> str:
            # Contacts without a name sort first, under the empty string.
            return contact.name.casefold() if contact.name else ""

        ordered = sorted(session.contacts, key=by_name)
        rows = [{"name": c.name, "jid": c.jid.bare} for c in ordered]
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        return TableResult(
            description="Your buddies",
            fields=columns,
            items=rows,
        )

146 

147 

class ListGroups(Command):
    """Command that returns the session's bookmarked groups as a name/JID table."""

    NAME = HELP = "👥 List your legacy groups"
    CHAT_COMMAND = "groups"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """Build the table of groups, sorted case-insensitively.

        Groups without a name are ordered by the node part of their JID.

        :param session: Session of the user running the command; must not be None.
        :param _ifrom: JID that issued the command (unused here).
        :return: A :class:`TableResult` flagged with ``jids_are_mucs=True``.
        """
        assert session is not None
        await session.ready

        def by_label(group) -> str:
            label = group.name or group.jid.node
            return label.casefold()

        ordered = sorted(session.bookmarks, key=by_label)
        rows = [{"name": g.name, "jid": g.jid.bare} for g in ordered]
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        return TableResult(
            description="Your groups",
            fields=columns,
            items=rows,
            jids_are_mucs=True,
        )

167 

168 

class Login(Command):
    """Command that triggers a new login attempt on the legacy network."""

    NAME = "🔐 Re-login to the legacy network"
    HELP = "Login to the legacy service"
    CHAT_COMMAND = "re-login"
    NODE = "https://slidge.im/command/core/" + CHAT_COMMAND

    ACCESS = CommandAccess.USER_NON_LOGGED

    async def run(self, session: AnyBaseSession | None, _ifrom, *_):
        """Attempt to log the session in again.

        :param session: Session of the user running the command; must not be None.
        :param _ifrom: JID that issued the command (unused here).
        :return: The message returned by ``self.xmpp.login_wrap`` (may be falsy).
        :raises XMPPError: ``bad-request`` if a login is already in progress;
            ``internal-server-error`` (type ``wait``) if the login attempt
            raises, chained to the original exception.
        """
        assert session is not None
        if session.is_logging_in:
            raise XMPPError("bad-request", "You are already logging in.")

        session.is_logging_in = True
        try:
            msg = await self.xmpp.login_wrap(session)
        except Exception as e:
            session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
            # Chain the original exception so its traceback is not lost when
            # the failure is reported as a stanza error.
            raise XMPPError(
                "internal-server-error", etype="wait", text=f"Could not login: {e}"
            ) from e
        finally:
            # Always clear the in-progress flag, whether login succeeded or not.
            session.is_logging_in = False

        session.logged = True
        session.send_gateway_status(msg or "Re-connected", show="chat")
        session.send_gateway_message(msg or "Re-connected")
        return msg

195 

196 

class CreateGroup(Command):
    """Command that creates a new group on the legacy service."""

    NAME = "🆕 New legacy group"
    HELP = "Create a group on the legacy service"
    CHAT_COMMAND = "create-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    CATEGORY = GROUPS

    ACCESS = CommandAccess.USER_LOGGED

    async def run(self, session: AnyBaseSession | None, _ifrom, *_):
        """Return the form asking for a group name and its initial members.

        The contact choices come from
        ``session.contacts.known_contacts(only_friends=True)`` and are sorted
        case-insensitively by name, the same way ``ListContacts`` orders its
        table; contacts without a name sort first instead of breaking the sort.

        :param session: Session of the user running the command; must not be None.
        :param _ifrom: JID that issued the command (unused here).
        :return: A :class:`Form` whose handler is :meth:`finish`.
        """
        assert session is not None
        await session.ready
        contacts = session.contacts.known_contacts(only_friends=True)
        ordered = sorted(
            contacts.values(),
            # Guard against empty/None names so the sort cannot raise TypeError.
            key=lambda c: c.name.casefold() if c.name else "",
        )
        return Form(
            title="Create a new group",
            instructions="Pick contacts that should be part of this new group",
            fields=[
                FormField(var="group_name", label="Name of the group", required=True),
                FormField(
                    var="contacts",
                    label="Contacts to add to the new group",
                    type="list-multi",
                    options=[
                        {"value": str(contact.jid), "label": contact.name}
                        for contact in ordered
                    ],
                    required=False,
                ),
            ],
            handler=self.finish,
        )

    @staticmethod
    async def finish(form_values: FormValues, session: AnyBaseSession | None, *_):
        """Form handler: create the group and report its address.

        :param form_values: Submitted values; ``group_name`` is required,
            ``contacts`` is an optional collection of JID strings.
        :param session: Session of the user; must not be None.
        :return: A one-row :class:`TableResult` describing the new group.
        """
        assert session is not None
        legacy_id: LegacyGroupIdType = await session.on_create_group(  # type:ignore
            cast(str, form_values["group_name"]),
            [
                await session.contacts.by_jid(JID(j))
                for j in form_values.get("contacts", [])  # type:ignore
            ],
        )
        # Resolve the id returned by the legacy side into the bookmarked MUC.
        muc = await session.bookmarks.by_legacy_id(legacy_id)
        return TableResult(
            description=f"Your new group: xmpp:{muc.jid}?join",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=[{"name": muc.name, "jid": muc.jid}],
            jids_are_mucs=True,
        )

246 

247 

class Preferences(Command):
    """Command that lets a user view and change their gateway preferences."""

    NAME = "⚙️ Preferences"
    HELP = "Customize the gateway behaviour to your liking"
    CHAT_COMMAND = "preferences"
    NODE = "https://slidge.im/command/core/preferences"
    ACCESS = CommandAccess.USER

    async def run(self, session: AnyBaseSession | None, _ifrom: JID, *_: Any) -> Form:
        """Return the preferences form, pre-filled with the user's current values.

        :param session: Session of the user running the command; must not be None.
        :param _ifrom: JID that issued the command (unused here).
        :return: A :class:`Form` whose handler is :meth:`finish`; the handler
            receives a snapshot of the current preferences as ``previous``.
        """
        # Work on a copy so the gateway-wide field definitions are not mutated.
        fields = deepcopy(self.xmpp.PREFERENCES)
        assert session is not None
        current = session.user.preferences
        for field in fields:
            field.value = current.get(field.var, field.value)  # type:ignore
        return Form(
            title="Preferences",
            instructions=self.HELP,
            fields=fields,
            handler=self.finish,  # type:ignore
            # Pass a snapshot, not the live mapping: finish() updates the
            # user's preferences in place, which would otherwise also rewrite
            # "previous" and make the before/after comparison meaningless.
            handler_kwargs={"previous": dict(current)},
        )

    async def finish(
        self,
        form_values: UserPreferences,
        session: AnyBaseSession | None,
        *_,
        previous,
    ) -> str:
        """Form handler: store the submitted preferences and notify the session.

        :param form_values: Preferences submitted through the form.
        :param session: Session of the user; must not be None.
        :param previous: Preferences as they were when the form was built.
        :return: A human-readable status message.
        """
        assert session is not None
        if previous == form_values:
            return "No preference was changed"

        user = session.user
        # Defensive snapshot: if the caller handed us the live preferences
        # mapping, the update below must not alter the "before" values.
        previous = dict(previous)
        user.preferences.update(form_values)  # type:ignore
        self.xmpp.store.users.update(user)

        try:
            await session.on_preferences(previous, form_values)  # type:ignore[arg-type]
        except NotImplementedError:
            # The hook is optional for the session implementation.
            pass

        if not previous["sync_avatar"] and form_values["sync_avatar"]:
            # Avatar sync was just switched on: fetch the avatar now.
            await self.xmpp.fetch_user_avatar(session)
        else:
            # NOTE(review): this also clears the hash when sync_avatar was and
            # stays enabled — confirm that is intended.
            user.avatar_hash = None

        return "Your preferences have been updated."

295 

296 

class Unregister(Command):
    """Command that removes the user's registration from the gateway."""

    NAME = "❌ Unregister from the gateway"
    HELP = "Unregister from the gateway"
    CHAT_COMMAND = "unregister"
    NODE = "https://slidge.im/command/core/unregister"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: AnyBaseSession | None, _ifrom: JID, *_: Any
    ) -> Confirmation:
        """Ask the user to confirm the unregistration.

        :param session: Session of the user running the command (unused here).
        :param _ifrom: JID that issued the command (unused here).
        :return: A :class:`Confirmation` whose handler is :meth:`unregister`.
        """
        gateway_jid = self.xmpp.boundjid
        question = f"Are you sure you want to unregister from '{gateway_jid}'?"
        farewell = f"You are not registered to '{gateway_jid}' anymore."
        return Confirmation(
            prompt=question,
            success=farewell,
            handler=self.unregister,
        )

    async def unregister(self, session: AnyBaseSession | None, _ifrom: JID) -> str:
        """Confirmation handler: unregister the session's user.

        :param session: Session of the user; must not be None.
        :param _ifrom: JID that confirmed the command (unused here).
        :return: A goodbye message.
        """
        assert session is not None
        registered_user = session.user
        await self.xmpp.unregister_user(registered_user)
        return "You are not registered anymore. Bye!"

317 

318 

class LeaveGroup(Command):
    """Command that lets a user leave one of their bookmarked groups.

    The flow has three steps: pick a group (:meth:`run`), confirm
    (:meth:`confirm`), then actually leave (:meth:`finish`).
    """

    NAME = HELP = "❌ Leave a legacy group"
    CHAT_COMMAND = "leave-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """Return the form listing the groups the user can leave.

        Each option's value is the group's index in the sorted list, and the
        same list is forwarded to :meth:`confirm` so the index can be resolved.

        :param session: Session of the user running the command; must not be None.
        :param _ifrom: JID that issued the command (unused here).
        :return: A :class:`Form` whose handler is :meth:`confirm`.
        """
        assert session is not None
        await session.ready
        groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold())
        return Form(
            title="Leave a group",
            instructions="Select the group you want to leave",
            fields=[
                FormField(
                    "group",
                    "Group name",
                    type="list-single",
                    options=[
                        {"label": g.name, "value": str(i)} for i, g in enumerate(groups)
                    ],
                )
            ],
            handler=self.confirm,  # type:ignore
            handler_args=(groups,),
        )

    async def confirm(
        self,
        form_values: FormValues,
        _session: AnyBaseSession,
        _ifrom,
        groups: list[LegacyMUC],
    ):
        """Form handler: resolve the selected group and ask for confirmation.

        :param form_values: Submitted values; ``group`` holds the index of the
            chosen entry in ``groups``.
        :param _session: Session of the user (unused here).
        :param _ifrom: JID that submitted the form (unused here).
        :param groups: The list of groups that was offered in the form.
        :return: A :class:`Confirmation` whose handler is :meth:`finish`.
        :raises XMPPError: ``bad-request`` if no group was selected or the
            submitted value does not designate one of the offered groups.
        """
        # The "group" field is not marked required and its value comes from
        # the client, so validate it instead of letting a KeyError/ValueError/
        # IndexError escape (or a negative index silently pick another group).
        try:
            index = int(form_values["group"])  # type:ignore
        except (KeyError, TypeError, ValueError):
            raise XMPPError(
                "bad-request", "You must select a group to leave"
            ) from None
        if not 0 <= index < len(groups):
            raise XMPPError("bad-request", "The selected group is not valid")

        group = groups[index]
        return Confirmation(
            prompt=f"Are you sure you want to leave the group '{group.name}'?",
            handler=self.finish,  # type:ignore
            handler_args=(group,),
        )

    @staticmethod
    async def finish(session: AnyBaseSession, _ifrom, group: LegacyMUC) -> None:
        """Confirmation handler: leave the group and drop it from the bookmarks.

        :param session: Session of the user.
        :param _ifrom: JID that confirmed the command (unused here).
        :param group: The group selected in :meth:`confirm`.
        """
        await session.on_leave_group(group.legacy_id)
        await session.bookmarks.remove(group, reason="You left this group via slidge.")

365 

366 

class InviteInGroups(Command):
    """Command that re-sends gateway invitations for the user's groups."""

    NAME = "💌 Re-invite me in my groups"
    HELP = "Ask the gateway to send invitations for all your private groups"
    CHAT_COMMAND = "re-invite"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """Send an invitation for every bookmark whose type is ``MucType.GROUP``.

        :param session: Session of the user running the command; must not be None.
        :param _ifrom: JID that issued the command (unused here).
        """
        assert session is not None
        await session.ready
        # Lazy filter: bookmarks of any other type are skipped.
        group_mucs = (muc for muc in session.bookmarks if muc.type == MucType.GROUP)
        for group_muc in group_mucs:
            session.send_gateway_invite(
                group_muc, reason="You asked to be re-invited in all groups."
            )