Coverage for slidge/command/user.py: 49%

173 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1# Commands available to users 

2from copy import deepcopy 

3from typing import TYPE_CHECKING, Any, Optional, Union, cast 

4 

5from slixmpp import JID 

6from slixmpp.exceptions import XMPPError 

7 

8from ..group.room import LegacyMUC 

9from ..util.types import AnyBaseSession, LegacyGroupIdType, UserPreferences 

10from .base import ( 

11 Command, 

12 CommandAccess, 

13 Confirmation, 

14 Form, 

15 FormField, 

16 FormValues, 

17 SearchResult, 

18 TableResult, 

19) 

20from .categories import CONTACTS, GROUPS 

21 

22if TYPE_CHECKING: 

23 pass 

24 

25 

class Search(Command):
    """Command that looks up contacts through the gateway's search fields.

    Called without arguments, :meth:`run` hands back the gateway-defined
    search form. Called with arguments, the words are joined with spaces and
    submitted as the value of the first search field.
    """

    NAME = "🔎 Search for contacts"
    HELP = "Search for contacts via this gateway"
    CHAT_COMMAND = "find"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *args: str
    ) -> Union[Form, SearchResult, None]:
        """Return the search form, or search right away when terms are given.

        :param session: Session of the requester; asserted to be set when
            ``args`` is non-empty.
        :param _ifrom: JID of the requester (unused).
        :param args: Optional search terms.
        :return: The result of ``session.on_search`` when terms were passed,
            otherwise a :class:`Form` whose handler is :meth:`search`.
        """
        if not args:
            # No terms supplied: let the user fill in the gateway's form.
            return Form(
                title=self.xmpp.SEARCH_TITLE,
                instructions=self.xmpp.SEARCH_INSTRUCTIONS,
                fields=self.xmpp.SEARCH_FIELDS,
                handler=self.search,
            )

        assert session is not None
        first_field = self.xmpp.SEARCH_FIELDS[0].var
        query = " ".join(args)
        return await session.on_search({first_field: query})

    @staticmethod
    async def search(
        form_values: FormValues, session: Optional[AnyBaseSession], _ifrom: JID
    ) -> SearchResult:
        """Form handler: forward the submitted values to ``session.on_search``.

        :param form_values: Values submitted through the search form.
        :param session: Session of the requester; asserted to be set.
        :param _ifrom: JID of the requester (unused).
        :return: The non-``None`` search result.
        :raises XMPPError: ``item-not-found`` when the search yields ``None``.
        """
        assert session is not None
        found = await session.on_search(form_values)  # type: ignore
        if found is not None:
            return found
        raise XMPPError("item-not-found", "No contact was found")

59 

60 

class SyncContacts(Command):
    """Command that reconciles the user's XMPP roster with known contacts.

    Only roster items that belong to ``self.xmpp.ROSTER_GROUP`` are touched.
    """

    NAME = "🔄 Sync XMPP roster"
    HELP = (
        "Synchronize your XMPP roster with your legacy contacts. "
        "Slidge will only add/remove/modify contacts in its dedicated roster group"
    )
    CHAT_COMMAND = "sync-contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_) -> Confirmation:
        """Ask the user to confirm before :meth:`sync` is executed.

        :param session: Session of the requester (unused here).
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`Confirmation` whose handler is :meth:`sync`.
        """
        return Confirmation(
            prompt="Are you sure you want to sync your roster?",
            success=None,
            handler=self.sync,
        )

    async def sync(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str:
        """Walk the roster and add, remove or update entries as needed.

        :param session: Session of the requester.
        :param _ifrom: JID of the requester (unused).
        :return: A summary string with the added/removed/updated counts.
        :raises RuntimeError: When ``session`` is ``None``.
        """
        if session is None:
            raise RuntimeError

        privileged = self.xmpp["xep_0356"]
        roster_iq = await privileged.get_roster(session.user_jid.bare)

        # Contacts still in this mapping after the roster walk are the ones
        # the roster does not contain yet.
        pending = session.contacts.known_contacts()

        removed = 0
        updated = 0
        for entry in roster_iq["roster"]:
            groups = set(entry["groups"])
            if self.xmpp.ROSTER_GROUP not in groups:
                # Not in the gateway's roster group: leave it alone.
                continue

            contact = pending.pop(entry["jid"], None)
            if contact is not None:
                # Known contact: refresh the roster entry if the name differs.
                if contact.name != entry["name"]:
                    await contact.add_to_roster(force=True)
                    updated += 1
                continue

            if len(groups) == 1:
                # Unknown contact that lives only in the gateway's group.
                await privileged.set_roster(
                    session.user_jid, {entry["jid"]: {"subscription": "remove"}}
                )
                removed += 1
                continue

            # Unknown contact that is also in other groups: keep the item but
            # take it out of the gateway's group.
            groups.remove(self.xmpp.ROSTER_GROUP)
            await privileged.set_roster(
                session.user_jid,
                {
                    entry["jid"]: {
                        "subscription": entry["subscription"],
                        "name": entry["name"],
                        "groups": groups,
                    }
                },
            )
            updated += 1

        # Matched contacts were popped above, so only contacts absent from
        # the XMPP roster remain.
        added = 0
        for added, contact in enumerate(pending.values(), start=1):
            await contact.add_to_roster()

        return f"{added} added, {removed} removed, {updated} updated"

123 

124 

class ListContacts(Command):
    """Command that returns the session's contacts as a name/JID table."""

    NAME = HELP = "👤 List your legacy contacts"
    CHAT_COMMAND = "contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_
    ) -> TableResult:
        """Build a table of contacts sorted case-insensitively by name.

        :param session: Session of the requester; asserted to be set.
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`TableResult` with one row per contact.
        """
        assert session is not None

        def sort_key(contact) -> str:
            # Contacts without a name sort as the empty string.
            name = contact.name
            return name.casefold() if name else ""

        rows = [
            {"name": contact.name, "jid": contact.jid.bare}
            for contact in sorted(session.contacts, key=sort_key)
        ]
        return TableResult(
            description="Your buddies",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=rows,
        )

144 

145 

class ListGroups(Command):
    """Command that returns the session's bookmarked groups as a table."""

    NAME = HELP = "👥 List your legacy groups"
    CHAT_COMMAND = "groups"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
    ) -> TableResult:
        """Fill the bookmarks, then list them sorted case-insensitively.

        :param session: Session of the requester; asserted to be set.
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`TableResult` with one row per group, flagged as
            MUC JIDs.
        """
        assert session is not None
        await session.bookmarks.fill()
        # A group without a name must not break the listing; treat a falsy
        # DISCO_NAME as the empty string, the way ListContacts treats
        # unnamed contacts.
        groups = sorted(
            session.bookmarks, key=lambda g: (g.DISCO_NAME or "").casefold()
        )
        return TableResult(
            description="Your groups",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=[{"name": g.name, "jid": g.jid.bare} for g in groups],
            jids_are_mucs=True,
        )

163 

164 

class Login(Command):
    """Command that calls ``session.login()`` again for the requester."""

    NAME = "🔐 Re-login to the legacy network"
    HELP = "Login to the legacy service"
    CHAT_COMMAND = "re-login"
    NODE = "https://slidge.im/command/core/" + CHAT_COMMAND

    ACCESS = CommandAccess.USER_NON_LOGGED

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_):
        """Log the session in and report the outcome to the user.

        :param session: Session of the requester; asserted to be set.
        :param _ifrom: JID of the requester (unused).
        :return: Whatever ``session.login()`` returned.
        :raises XMPPError: ``bad-request`` if a login is already in progress;
            ``internal-server-error`` (type ``wait``) if ``session.login()``
            raises, chained to the original exception.
        """
        assert session is not None
        if session.is_logging_in:
            raise XMPPError("bad-request", "You are already logging in.")
        session.is_logging_in = True
        try:
            msg = await session.login()
        except Exception as e:
            session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
            # Chain explicitly so the underlying failure stays attached as
            # the cause of the error reported to the user.
            raise XMPPError(
                "internal-server-error", etype="wait", text=f"Could not login: {e}"
            ) from e
        finally:
            # Always clear the flag so a failed attempt can be retried.
            session.is_logging_in = False
        session.logged = True
        session.send_gateway_status(msg or "Re-connected", show="chat")
        session.send_gateway_message(msg or "Re-connected")
        return msg

191 

192 

class CreateGroup(Command):
    """Command that creates a new group from a name and a contact selection."""

    NAME = "🆕 New legacy group"
    HELP = "Create a group on the legacy service"
    CHAT_COMMAND = "create-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    CATEGORY = GROUPS

    ACCESS = CommandAccess.USER_LOGGED

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_):
        """Return the form asking for the group name and its members.

        :param session: Session of the requester; asserted to be set.
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`Form` whose handler is :meth:`finish`.
        """
        assert session is not None
        contacts = session.contacts.known_contacts(only_friends=True)
        return Form(
            title="Create a new group",
            instructions="Pick contacts that should be part of this new group",
            fields=[
                FormField(var="group_name", label="Name of the group", required=True),
                FormField(
                    var="contacts",
                    label="Contacts to add to the new group",
                    type="list-multi",
                    options=[
                        {"value": str(contact.jid), "label": contact.name}
                        # `or ""` keeps the sort from raising TypeError when
                        # a contact has no name (None cannot be compared to
                        # str); the order of named contacts is unchanged.
                        for contact in sorted(
                            contacts.values(), key=lambda c: c.name or ""
                        )
                    ],
                    required=False,
                ),
            ],
            handler=self.finish,
        )

    @staticmethod
    async def finish(form_values: FormValues, session: Optional[AnyBaseSession], *_):
        """Form handler: create the group and return a table pointing to it.

        :param form_values: Submitted values; reads ``group_name`` and the
            optional ``contacts`` list of JID strings.
        :param session: Session of the requester; asserted to be set.
        :return: A one-row :class:`TableResult` describing the new group.
        """
        assert session is not None
        legacy_id: LegacyGroupIdType = await session.on_create_group(  # type:ignore
            cast(str, form_values["group_name"]),
            [
                await session.contacts.by_jid(JID(j))
                for j in form_values.get("contacts", [])  # type:ignore
            ],
        )
        muc = await session.bookmarks.by_legacy_id(legacy_id)
        return TableResult(
            description=f"Your new group: xmpp:{muc.jid}?join",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=[{"name": muc.name, "jid": muc.jid}],
            jids_are_mucs=True,
        )

241 

242 

class Preferences(Command):
    """Command that shows and stores the per-user gateway preferences."""

    NAME = "⚙️ Preferences"
    HELP = "Customize the gateway behaviour to your liking"
    CHAT_COMMAND = "preferences"
    NODE = "https://slidge.im/command/core/preferences"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
    ) -> Form:
        """Return the preferences form pre-filled with the stored values.

        :param session: Session of the requester; asserted to be set.
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`Form` whose handler is :meth:`finish`.
        """
        # Work on a copy so the gateway-wide field definitions keep their
        # own default values.
        form_fields = deepcopy(self.xmpp.PREFERENCES)
        assert session is not None
        stored = session.user.preferences
        for form_field in form_fields:
            default = form_field.value
            form_field.value = stored.get(form_field.var, default)  # type:ignore
        return Form(
            title="Preferences",
            instructions=self.HELP,
            fields=form_fields,
            handler=self.finish,  # type:ignore
        )

    async def finish(
        self, form_values: UserPreferences, session: Optional[AnyBaseSession], *_
    ) -> str:
        """Form handler: merge the submitted values and persist the user.

        :param form_values: Submitted preferences; ``sync_avatar`` is read.
        :param session: Session of the requester; asserted to be set.
        :return: A confirmation message.
        """
        assert session is not None
        user = session.user
        user.preferences.update(form_values)  # type:ignore

        if not form_values["sync_avatar"]:
            # Avatar sync is off: drop the stored hash.
            user.avatar_hash = None
        else:
            await self.xmpp.fetch_user_avatar(session)

        self.xmpp.store.users.update(user)
        return "Your preferences have been updated."

278 

279 

class Unregister(Command):
    """Command that unregisters the requester after a confirmation step."""

    NAME = "❌ Unregister from the gateway"
    HELP = "Unregister from the gateway"
    CHAT_COMMAND = "unregister"
    NODE = "https://slidge.im/command/core/unregister"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
    ) -> Confirmation:
        """Ask the user to confirm before :meth:`unregister` is executed.

        :param session: Session of the requester (unused here).
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`Confirmation` whose handler is :meth:`unregister`.
        """
        question = f"Are you sure you want to unregister from '{self.xmpp.boundjid}'?"
        farewell = f"You are not registered to '{self.xmpp.boundjid}' anymore."
        return Confirmation(
            prompt=question,
            success=farewell,
            handler=self.unregister,
        )

    async def unregister(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str:
        """Confirmation handler: unregister the session's user.

        :param session: Session of the requester; asserted to be set.
        :param _ifrom: JID of the requester (unused).
        :return: A goodbye message.
        """
        assert session is not None
        departing_user = session.user
        await self.xmpp.unregister_user(departing_user)
        return "You are not registered anymore. Bye!"

300 

301 

class LeaveGroup(Command):
    """Command that lets the user pick a bookmarked group and leave it.

    The flow is: :meth:`run` (group selection form) -> :meth:`confirm`
    (confirmation prompt) -> :meth:`finish` (leaves the group).
    """

    NAME = HELP = "❌ Leave a legacy group"
    CHAT_COMMAND = "leave-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """Return a form listing the groups the user can leave.

        :param session: Session of the requester; asserted to be set.
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`Form` whose handler is :meth:`confirm`; the sorted
            group list is passed along as a handler argument.
        """
        assert session is not None
        await session.bookmarks.fill()
        # Treat a falsy DISCO_NAME as the empty string so a group without a
        # name cannot break the sort.
        groups = sorted(
            session.bookmarks, key=lambda g: (g.DISCO_NAME or "").casefold()
        )
        return Form(
            title="Leave a group",
            instructions="Select the group you want to leave",
            fields=[
                FormField(
                    "group",
                    "Group name",
                    type="list-single",
                    # The option value is the index into `groups`, which
                    # `confirm` receives back unchanged.
                    options=[
                        {"label": g.name, "value": str(i)} for i, g in enumerate(groups)
                    ],
                )
            ],
            handler=self.confirm,  # type:ignore
            handler_args=(groups,),
        )

    async def confirm(
        self,
        form_values: FormValues,
        _session: AnyBaseSession,
        _ifrom,
        groups: list[LegacyMUC],
    ):
        """Form handler: resolve the selected group and ask for confirmation.

        :param form_values: Submitted values; ``group`` holds the index of
            the chosen entry in ``groups``.
        :param _session: Session of the requester (unused).
        :param _ifrom: JID of the requester (unused).
        :param groups: The group list that was offered by :meth:`run`.
        :return: A :class:`Confirmation` whose handler is :meth:`finish`.
        :raises XMPPError: ``bad-request`` when the submitted value is
            missing, not an integer, or not a valid index into ``groups``.
        """
        # The index comes from the client, so validate it rather than let a
        # raw KeyError/ValueError/IndexError escape. Negative values are
        # rejected too, since they would silently select another group.
        try:
            index = int(form_values["group"])  # type:ignore
        except (KeyError, TypeError, ValueError) as e:
            raise XMPPError("bad-request", "Invalid group selection") from e
        if not 0 <= index < len(groups):
            raise XMPPError("bad-request", "Invalid group selection")

        group = groups[index]
        return Confirmation(
            prompt=f"Are you sure you want to leave the group '{group.name}'?",
            handler=self.finish,  # type:ignore
            handler_args=(group,),
        )

    @staticmethod
    async def finish(session: AnyBaseSession, _ifrom, group: LegacyMUC) -> None:
        """Confirmation handler: leave the group and remove its bookmark.

        :param session: Session of the requester.
        :param _ifrom: JID of the requester (unused).
        :param group: The group selected in :meth:`confirm`.
        """
        await session.on_leave_group(group.legacy_id)
        await session.bookmarks.remove(group, reason="You left this group via slidge.")