Coverage for slidge/command/user.py: 47%

197 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1# Commands available to users 

2from copy import deepcopy 

3from typing import TYPE_CHECKING, Any, Optional, Union, cast 

4 

5from slixmpp import JID 

6from slixmpp.exceptions import XMPPError 

7 

8from ..group.room import LegacyMUC 

9from ..util.types import AnyBaseSession, LegacyGroupIdType, MucType, UserPreferences 

10from .base import ( 

11 Command, 

12 CommandAccess, 

13 Confirmation, 

14 Form, 

15 FormField, 

16 FormValues, 

17 SearchResult, 

18 TableResult, 

19) 

20from .categories import CONTACTS, GROUPS 

21 

22if TYPE_CHECKING: 

23 pass 

24 

25 

class Search(Command):
    """Command that lets a logged-in user search for contacts."""

    NAME = "🔎 Search for contacts"
    HELP = "Search for contacts via this gateway"
    CHAT_COMMAND = "find"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *args: str
    ) -> Union[Form, SearchResult, None]:
        """
        Start a search.

        Without extra arguments, a form built from the gateway's search
        title, instructions and fields is returned, with :meth:`search` as
        its handler. With arguments, they are joined with single spaces and
        submitted directly as the value of the first search field; whatever
        ``session.on_search`` returns is passed through unchanged.
        """
        assert session is not None
        await session.ready

        if not args:
            return Form(
                title=self.xmpp.SEARCH_TITLE,
                instructions=self.xmpp.SEARCH_INSTRUCTIONS,
                fields=self.xmpp.SEARCH_FIELDS,
                handler=self.search,
            )

        # Shortcut: the whole argument list becomes the value of the first
        # search field.
        first_field = self.xmpp.SEARCH_FIELDS[0]
        query = {first_field.var: " ".join(args)}
        return await session.on_search(query)

    @staticmethod
    async def search(
        form_values: FormValues, session: Optional[AnyBaseSession], _ifrom: JID
    ) -> SearchResult:
        """
        Form handler: forward the submitted values to ``session.on_search``.

        :raises XMPPError: ``item-not-found`` when the search returns ``None``.
        """
        assert session is not None
        found = await session.on_search(form_values)  # type: ignore
        if found is not None:
            return found
        raise XMPPError("item-not-found", "No contact was found")

60 

61 

class SyncContacts(Command):
    """Command that reconciles the user's XMPP roster with legacy contacts."""

    NAME = "🔄 Sync XMPP roster"
    HELP = (
        "Synchronize your XMPP roster with your legacy contacts. "
        "Slidge will only add/remove/modify contacts in its dedicated roster group"
    )
    CHAT_COMMAND = "sync-contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_) -> Confirmation:
        """Wait for the session to be ready, then ask for confirmation."""
        assert session is not None
        await session.ready
        return Confirmation(
            prompt="Are you sure you want to sync your roster?",
            success=None,
            handler=self.sync,
        )

    async def sync(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str:
        """
        Confirmation handler doing the actual synchronisation.

        Only roster items that belong to ``self.xmpp.ROSTER_GROUP`` are
        considered. For each of them:

        - no matching known contact and no other group: the item is removed;
        - no matching known contact but other groups: only the gateway's
          group is dropped from the item;
        - matching known contact whose name differs: the contact is re-added
          to the roster with ``force=True``.

        Known contacts that were not matched by any such roster item are then
        added to the roster.

        :return: A summary of the form ``"N added, N removed, N updated"``.
        :raises RuntimeError: if ``session`` is ``None``.
        """
        if session is None:
            raise RuntimeError

        privileged = self.xmpp["xep_0356"]
        roster_iq = await privileged.get_roster(session.user_jid.bare)

        pending = session.contacts.known_contacts()

        n_added = n_removed = n_updated = 0

        for entry in roster_iq["roster"]:
            entry_groups = set(entry["groups"])
            if self.xmpp.ROSTER_GROUP not in entry_groups:
                continue

            known = pending.pop(entry["jid"], None)

            if known is not None:
                # Contact exists on both sides: only the name may need a refresh.
                if known.name != entry["name"]:
                    await known.add_to_roster(force=True)
                    n_updated += 1
                continue

            if len(entry_groups) == 1:
                # The gateway's group is the item's only group: drop the item.
                await privileged.set_roster(
                    session.user_jid, {entry["jid"]: {"subscription": "remove"}}
                )
                n_removed += 1
                continue

            # The item has other groups too: keep it, minus the gateway's group.
            entry_groups.remove(self.xmpp.ROSTER_GROUP)
            await privileged.set_roster(
                session.user_jid,
                {
                    entry["jid"]: {
                        "subscription": entry["subscription"],
                        "name": entry["name"],
                        "groups": entry_groups,
                    }
                },
            )
            n_updated += 1

        # Matched contacts were popped above, so what remains are the known
        # contacts that no processed roster item referred to.
        for known in pending.values():
            n_added += 1
            await known.add_to_roster()

        return f"{n_added} added, {n_removed} removed, {n_updated} updated"

126 

127 

class ListContacts(Command):
    """Command that returns a table of the session's contacts."""

    NAME = HELP = "👤 List your legacy contacts"
    CHAT_COMMAND = "contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_
    ) -> TableResult:
        """
        Build a two-column (name, bare JID) table of ``session.contacts``.

        Rows are ordered by case-folded name; contacts with a falsy name
        sort as the empty string.
        """
        assert session is not None
        await session.ready

        def by_name(contact) -> str:
            if contact.name:
                return contact.name.casefold()
            return ""

        ordered = sorted(session.contacts, key=by_name)
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        rows = [
            {"name": contact.name, "jid": contact.jid.bare} for contact in ordered
        ]
        return TableResult(description="Your buddies", fields=columns, items=rows)

148 

149 

class ListGroups(Command):
    """Command that returns a table of the session's groups."""

    NAME = HELP = "👥 List your legacy groups"
    CHAT_COMMAND = "groups"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """
        Build a two-column (name, bare JID) table of ``session.bookmarks``.

        Rows are ordered by case-folded name, falling back to the node part
        of the group's JID when the name is falsy.
        """
        assert session is not None
        await session.ready

        def by_label(group) -> str:
            label = group.name or group.jid.node
            return label.casefold()

        ordered = sorted(session.bookmarks, key=by_label)
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        rows = [{"name": group.name, "jid": group.jid.bare} for group in ordered]
        return TableResult(
            description="Your groups",
            fields=columns,
            items=rows,
            jids_are_mucs=True,
        )

169 

170 

class Login(Command):
    """Command that lets a registered but not logged-in user log in again."""

    NAME = "🔐 Re-login to the legacy network"
    HELP = "Login to the legacy service"
    CHAT_COMMAND = "re-login"
    NODE = "https://slidge.im/command/core/" + CHAT_COMMAND

    ACCESS = CommandAccess.USER_NON_LOGGED

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_):
        """
        Call ``session.login()`` and report the outcome to the user.

        On success the session is flagged as logged, and the message returned
        by ``login()`` (or ``"Re-connected"`` when it is falsy) is published
        both as gateway status and as a gateway message.

        :return: Whatever ``session.login()`` returned.
        :raises XMPPError: ``bad-request`` if a login is already in progress;
            ``internal-server-error`` (type ``wait``) if ``session.login()``
            raised, with the original exception attached as ``__cause__``.
        """
        assert session is not None
        if session.is_logging_in:
            raise XMPPError("bad-request", "You are already logging in.")
        session.is_logging_in = True
        try:
            msg = await session.login()
        except Exception as e:
            session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
            # Chain explicitly so the traceback shows the login failure as the
            # direct cause rather than as an error raised while handling it.
            raise XMPPError(
                "internal-server-error", etype="wait", text=f"Could not login: {e}"
            ) from e
        finally:
            # Always release the guard, whether the login succeeded or not.
            session.is_logging_in = False
        session.logged = True
        session.send_gateway_status(msg or "Re-connected", show="chat")
        session.send_gateway_message(msg or "Re-connected")
        return msg

197 

198 

class CreateGroup(Command):
    """Command that creates a new group on the legacy service."""

    NAME = "🆕 New legacy group"
    HELP = "Create a group on the legacy service"
    CHAT_COMMAND = "create-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    CATEGORY = GROUPS

    ACCESS = CommandAccess.USER_LOGGED

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_):
        """
        Return the group creation form.

        The form asks for a mandatory group name and an optional multiple
        selection among ``session.contacts.known_contacts(only_friends=True)``.
        :meth:`finish` handles the submitted form.
        """
        assert session is not None
        await session.ready
        contacts = session.contacts.known_contacts(only_friends=True)

        def by_name(contact) -> str:
            # Same ordering rule as ListContacts: case-insensitive, and a
            # contact without a name sorts first instead of making sorted()
            # raise TypeError when comparing None with a str.
            return contact.name.casefold() if contact.name else ""

        return Form(
            title="Create a new group",
            instructions="Pick contacts that should be part of this new group",
            fields=[
                FormField(var="group_name", label="Name of the group", required=True),
                FormField(
                    var="contacts",
                    label="Contacts to add to the new group",
                    type="list-multi",
                    options=[
                        {"value": str(contact.jid), "label": contact.name}
                        for contact in sorted(contacts.values(), key=by_name)
                    ],
                    required=False,
                ),
            ],
            handler=self.finish,
        )

    @staticmethod
    async def finish(form_values: FormValues, session: Optional[AnyBaseSession], *_):
        """
        Form handler: create the group and return a one-row table describing it.

        The selected JIDs (none when the ``contacts`` key is absent) are
        resolved through ``session.contacts.by_jid`` and passed, with the
        group name, to ``session.on_create_group``. The group is then looked
        up in ``session.bookmarks`` by the legacy ID that call returned.
        """
        assert session is not None
        legacy_id: LegacyGroupIdType = await session.on_create_group(  # type:ignore
            cast(str, form_values["group_name"]),
            [
                await session.contacts.by_jid(JID(j))
                for j in form_values.get("contacts", [])  # type:ignore
            ],
        )
        muc = await session.bookmarks.by_legacy_id(legacy_id)
        return TableResult(
            description=f"Your new group: xmpp:{muc.jid}?join",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=[{"name": muc.name, "jid": muc.jid}],
            jids_are_mucs=True,
        )

248 

249 

class Preferences(Command):
    """Command that lets a user view and change their gateway preferences."""

    NAME = "⚙️ Preferences"
    HELP = "Customize the gateway behaviour to your liking"
    CHAT_COMMAND = "preferences"
    NODE = "https://slidge.im/command/core/preferences"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
    ) -> Form:
        """
        Return the preferences form, pre-filled with the user's current values.

        The fields are a deep copy of ``self.xmpp.PREFERENCES`` so that
        setting their values does not modify the shared field definitions.
        The current preferences are handed to :meth:`finish` as ``previous``.
        """
        fields = deepcopy(self.xmpp.PREFERENCES)
        assert session is not None
        current = session.user.preferences
        for field in fields:
            # Fall back to the field's own default when nothing is stored.
            field.value = current.get(field.var, field.value)  # type:ignore
        return Form(
            title="Preferences",
            instructions=self.HELP,
            fields=fields,
            handler=self.finish,  # type:ignore
            handler_kwargs={"previous": current},
        )

    async def finish(
        self,
        form_values: UserPreferences,
        session: Optional[AnyBaseSession],
        *_,
        previous,
    ) -> str:
        """
        Form handler: store the new preferences and notify the session.

        :param form_values: The preferences submitted by the user.
        :param session: The session of the user.
        :param previous: The preferences as they were when the form was built.
        :return: A human-readable outcome message.
        """
        assert session is not None
        if previous == form_values:
            return "No preference was changed"

        user = session.user
        # run() passes ``session.user.preferences`` itself as ``previous``. If
        # that attribute yields the same dict object here, the in-place update
        # below would also overwrite ``previous``, so on_preferences() and the
        # sync_avatar check would never see the old values. Snapshot it first.
        previous = dict(previous)
        user.preferences.update(form_values)  # type:ignore
        self.xmpp.store.users.update(user)

        try:
            await session.on_preferences(previous, form_values)  # type:ignore[arg-type]
        except NotImplementedError:
            # Sessions that do not implement this hook are simply not notified.
            pass

        if not previous["sync_avatar"] and form_values["sync_avatar"]:
            # Avatar sync was just switched on: fetch the user's avatar now.
            await self.xmpp.fetch_user_avatar(session)
        else:
            # NOTE(review): this also runs when sync_avatar was already on and
            # stays on -- confirm that clearing the hash is intended then.
            user.avatar_hash = None

        return "Your preferences have been updated."

299 

300 

class Unregister(Command):
    """Command that removes the user's registration, after confirmation."""

    NAME = "❌ Unregister from the gateway"
    HELP = "Unregister from the gateway"
    CHAT_COMMAND = "unregister"
    NODE = "https://slidge.im/command/core/unregister"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
    ) -> Confirmation:
        """Ask the user to confirm; :meth:`unregister` runs on confirmation."""
        question = f"Are you sure you want to unregister from '{self.xmpp.boundjid}'?"
        farewell = f"You are not registered to '{self.xmpp.boundjid}' anymore."
        return Confirmation(
            prompt=question,
            success=farewell,
            handler=self.unregister,
        )

    async def unregister(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str:
        """Confirmation handler: call ``self.xmpp.unregister_user`` for the user."""
        assert session is not None
        account = session.user
        await self.xmpp.unregister_user(account)
        return "You are not registered anymore. Bye!"

321 

322 

class LeaveGroup(Command):
    """Three-step command (pick, confirm, leave) to leave a legacy group."""

    NAME = HELP = "❌ Leave a legacy group"
    CHAT_COMMAND = "leave-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """
        Return a form listing the groups in ``session.bookmarks``.

        Groups are sorted by case-folded ``DISCO_NAME``. Each option's value
        is the group's index in that sorted list, which is passed on to
        :meth:`confirm` so the selection can be resolved back to a group.
        """
        assert session is not None
        await session.ready

        def by_disco_name(group) -> str:
            return group.DISCO_NAME.casefold()

        groups = sorted(session.bookmarks, key=by_disco_name)
        choices = [
            {"label": group.name, "value": str(index)}
            for index, group in enumerate(groups)
        ]
        picker = FormField(
            "group",
            "Group name",
            type="list-single",
            options=choices,
        )
        return Form(
            title="Leave a group",
            instructions="Select the group you want to leave",
            fields=[picker],
            handler=self.confirm,  # type:ignore
            handler_args=(groups,),
        )

    async def confirm(
        self,
        form_values: FormValues,
        _session: AnyBaseSession,
        _ifrom,
        groups: list[LegacyMUC],
    ):
        """Form handler: resolve the chosen index and ask for confirmation."""
        chosen_index = int(form_values["group"])  # type:ignore
        group = groups[chosen_index]
        return Confirmation(
            prompt=f"Are you sure you want to leave the group '{group.name}'?",
            handler=self.finish,  # type:ignore
            handler_args=(group,),
        )

    @staticmethod
    async def finish(session: AnyBaseSession, _ifrom, group: LegacyMUC) -> None:
        """Confirmation handler: leave the group, then remove its bookmark."""
        legacy_id = group.legacy_id
        await session.on_leave_group(legacy_id)
        await session.bookmarks.remove(group, reason="You left this group via slidge.")

369 

370 

class InviteInGroups(Command):
    """Command that re-sends invitations for the user's private groups."""

    NAME = "💌 Re-invite me in my groups"
    HELP = "Ask the gateway to send invitations for all your private groups"
    CHAT_COMMAND = "re-invite"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """Send a gateway invite for every bookmark of type ``MucType.GROUP``."""
        assert session is not None
        await session.ready
        # Lazy filter: each bookmark is checked and invited in turn.
        private_groups = (
            muc for muc in session.bookmarks if muc.type == MucType.GROUP
        )
        for muc in private_groups:
            session.send_gateway_invite(
                muc, reason="You asked to be re-invited in all groups."
            )