Coverage for slidge/command/user.py: 49%

171 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1# Commands available to users 

2from copy import deepcopy 

3from typing import TYPE_CHECKING, Any, Optional, Union, cast 

4 

5from slixmpp import JID # type:ignore[attr-defined] 

6from slixmpp.exceptions import XMPPError 

7 

8from ..group.room import LegacyMUC 

9from ..util.types import AnyBaseSession, LegacyGroupIdType, UserPreferences 

10from .base import ( 

11 Command, 

12 CommandAccess, 

13 Confirmation, 

14 Form, 

15 FormField, 

16 FormValues, 

17 SearchResult, 

18 TableResult, 

19) 

20from .categories import CONTACTS, GROUPS 

21 

22if TYPE_CHECKING: 

23 pass 

24 

25 

26class Search(Command): 

27 NAME = "🔎 Search for contacts" 

28 HELP = "Search for contacts via this gateway" 

29 CHAT_COMMAND = "find" 

30 NODE = CONTACTS.node + "/" + CHAT_COMMAND 

31 ACCESS = CommandAccess.USER_LOGGED 

32 CATEGORY = CONTACTS 

33 

34 async def run( 

35 self, session: Optional[AnyBaseSession], _ifrom: JID, *args: str 

36 ) -> Union[Form, SearchResult, None]: 

37 if args: 

38 assert session is not None 

39 return await session.on_search( 

40 {self.xmpp.SEARCH_FIELDS[0].var: " ".join(args)} 

41 ) 

42 return Form( 

43 title=self.xmpp.SEARCH_TITLE, 

44 instructions=self.xmpp.SEARCH_INSTRUCTIONS, 

45 fields=self.xmpp.SEARCH_FIELDS, 

46 handler=self.search, 

47 ) 

48 

49 @staticmethod 

50 async def search( 

51 form_values: FormValues, session: Optional[AnyBaseSession], _ifrom: JID 

52 ) -> SearchResult: 

53 assert session is not None 

54 results = await session.on_search(form_values) # type: ignore 

55 if results is None: 

56 raise XMPPError("item-not-found", "No contact was found") 

57 

58 return results 

59 

60 

61class SyncContacts(Command): 

62 NAME = "🔄 Sync XMPP roster" 

63 HELP = ( 

64 "Synchronize your XMPP roster with your legacy contacts. " 

65 "Slidge will only add/remove/modify contacts in its dedicated roster group" 

66 ) 

67 CHAT_COMMAND = "sync-contacts" 

68 NODE = CONTACTS.node + "/" + CHAT_COMMAND 

69 ACCESS = CommandAccess.USER_LOGGED 

70 CATEGORY = CONTACTS 

71 

72 async def run(self, session: Optional[AnyBaseSession], _ifrom, *_) -> Confirmation: 

73 return Confirmation( 

74 prompt="Are you sure you want to sync your roster?", 

75 success=None, 

76 handler=self.sync, 

77 ) 

78 

79 async def sync(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str: 

80 if session is None: 

81 raise RuntimeError 

82 roster_iq = await self.xmpp["xep_0356"].get_roster(session.user_jid.bare) 

83 

84 contacts = session.contacts.known_contacts() 

85 

86 added = 0 

87 removed = 0 

88 updated = 0 

89 for item in roster_iq["roster"]: 

90 groups = set(item["groups"]) 

91 if self.xmpp.ROSTER_GROUP in groups: 

92 contact = contacts.pop(item["jid"], None) 

93 if contact is None: 

94 if len(groups) == 1: 

95 await self.xmpp["xep_0356"].set_roster( 

96 session.user_jid, {item["jid"]: {"subscription": "remove"}} 

97 ) 

98 removed += 1 

99 else: 

100 groups.remove(self.xmpp.ROSTER_GROUP) 

101 await self.xmpp["xep_0356"].set_roster( 

102 session.user_jid, 

103 { 

104 item["jid"]: { 

105 "subscription": item["subscription"], 

106 "name": item["name"], 

107 "groups": groups, 

108 } 

109 }, 

110 ) 

111 updated += 1 

112 else: 

113 if contact.name != item["name"]: 

114 await contact.add_to_roster(force=True) 

115 updated += 1 

116 

117 # we popped before so this only acts on slidge contacts not in the xmpp roster 

118 for contact in contacts.values(): 

119 added += 1 

120 await contact.add_to_roster() 

121 

122 return f"{added} added, {removed} removed, {updated} updated" 

123 

124 

125class ListContacts(Command): 

126 NAME = HELP = "👤 List your legacy contacts" 

127 CHAT_COMMAND = "contacts" 

128 NODE = CONTACTS.node + "/" + CHAT_COMMAND 

129 ACCESS = CommandAccess.USER_LOGGED 

130 CATEGORY = CONTACTS 

131 

132 async def run( 

133 self, session: Optional[AnyBaseSession], _ifrom: JID, *_ 

134 ) -> TableResult: 

135 assert session is not None 

136 contacts = sorted( 

137 session.contacts, key=lambda c: c.name.casefold() if c.name else "" 

138 ) 

139 return TableResult( 

140 description="Your buddies", 

141 fields=[FormField("name"), FormField("jid", type="jid-single")], 

142 items=[{"name": c.name, "jid": c.jid.bare} for c in contacts], 

143 ) 

144 

145 

146class ListGroups(Command): 

147 NAME = HELP = "👥 List your legacy groups" 

148 CHAT_COMMAND = "groups" 

149 NODE = GROUPS.node + "/" + CHAT_COMMAND 

150 ACCESS = CommandAccess.USER_LOGGED 

151 CATEGORY = GROUPS 

152 

153 async def run(self, session, _ifrom, *_): 

154 assert session is not None 

155 await session.bookmarks.fill() 

156 groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold()) 

157 return TableResult( 

158 description="Your groups", 

159 fields=[FormField("name"), FormField("jid", type="jid-single")], 

160 items=[{"name": g.name, "jid": g.jid.bare} for g in groups], 

161 jids_are_mucs=True, 

162 ) 

163 

164 

165class Login(Command): 

166 NAME = "🔐 Re-login to the legacy network" 

167 HELP = "Login to the legacy service" 

168 CHAT_COMMAND = "re-login" 

169 NODE = "https://slidge.im/command/core/" + CHAT_COMMAND 

170 

171 ACCESS = CommandAccess.USER_NON_LOGGED 

172 

173 async def run(self, session: Optional[AnyBaseSession], _ifrom, *_): 

174 assert session is not None 

175 try: 

176 msg = await session.login() 

177 except Exception as e: 

178 session.send_gateway_status(f"Re-login failed: {e}", show="dnd") 

179 raise XMPPError( 

180 "internal-server-error", etype="wait", text=f"Could not login: {e}" 

181 ) 

182 session.logged = True 

183 session.send_gateway_status(msg or "Re-connected", show="chat") 

184 session.send_gateway_message(msg or "Re-connected") 

185 return msg 

186 

187 

188class CreateGroup(Command): 

189 NAME = "🆕 New legacy group" 

190 HELP = "Create a group on the legacy service" 

191 CHAT_COMMAND = "create-group" 

192 NODE = GROUPS.node + "/" + CHAT_COMMAND 

193 CATEGORY = GROUPS 

194 

195 ACCESS = CommandAccess.USER_LOGGED 

196 

197 async def run(self, session: Optional[AnyBaseSession], _ifrom, *_): 

198 assert session is not None 

199 contacts = session.contacts.known_contacts(only_friends=True) 

200 return Form( 

201 title="Create a new group", 

202 instructions="Pick contacts that should be part of this new group", 

203 fields=[ 

204 FormField(var="group_name", label="Name of the group", required=True), 

205 FormField( 

206 var="contacts", 

207 label="Contacts to add to the new group", 

208 type="list-multi", 

209 options=[ 

210 {"value": str(contact.jid), "label": contact.name} 

211 for contact in sorted(contacts.values(), key=lambda c: c.name) 

212 ], 

213 required=False, 

214 ), 

215 ], 

216 handler=self.finish, 

217 ) 

218 

219 @staticmethod 

220 async def finish(form_values: FormValues, session: Optional[AnyBaseSession], *_): 

221 assert session is not None 

222 legacy_id: LegacyGroupIdType = await session.on_create_group( # type:ignore 

223 cast(str, form_values["group_name"]), 

224 [ 

225 await session.contacts.by_jid(JID(j)) 

226 for j in form_values.get("contacts", []) # type:ignore 

227 ], 

228 ) 

229 muc = await session.bookmarks.by_legacy_id(legacy_id) 

230 return TableResult( 

231 description=f"Your new group: xmpp:{muc.jid}?join", 

232 fields=[FormField("name"), FormField("jid", type="jid-single")], 

233 items=[{"name": muc.name, "jid": muc.jid}], 

234 jids_are_mucs=True, 

235 ) 

236 

237 

238class Preferences(Command): 

239 NAME = "⚙️ Preferences" 

240 HELP = "Customize the gateway behaviour to your liking" 

241 CHAT_COMMAND = "preferences" 

242 NODE = "https://slidge.im/command/core/preferences" 

243 ACCESS = CommandAccess.USER 

244 

245 async def run( 

246 self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any 

247 ) -> Form: 

248 fields = deepcopy(self.xmpp.PREFERENCES) 

249 assert session is not None 

250 current = session.user.preferences 

251 for field in fields: 

252 field.value = current.get(field.var) # type:ignore 

253 return Form( 

254 title="Preferences", 

255 instructions=self.HELP, 

256 fields=fields, 

257 handler=self.finish, # type:ignore 

258 ) 

259 

260 async def finish( 

261 self, form_values: UserPreferences, session: Optional[AnyBaseSession], *_ 

262 ) -> str: 

263 assert session is not None 

264 user = session.user 

265 user.preferences.update(form_values) # type:ignore 

266 self.xmpp.store.users.update(user) 

267 if form_values["sync_avatar"]: 

268 await self.xmpp.fetch_user_avatar(session) 

269 else: 

270 session.xmpp.store.users.set_avatar_hash(session.user_pk, None) 

271 return "Your preferences have been updated." 

272 

273 

274class Unregister(Command): 

275 NAME = "❌ Unregister from the gateway" 

276 HELP = "Unregister from the gateway" 

277 CHAT_COMMAND = "unregister" 

278 NODE = "https://slidge.im/command/core/unregister" 

279 ACCESS = CommandAccess.USER 

280 

281 async def run( 

282 self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any 

283 ) -> Confirmation: 

284 return Confirmation( 

285 prompt=f"Are you sure you want to unregister from '{self.xmpp.boundjid}'?", 

286 success=f"You are not registered to '{self.xmpp.boundjid}' anymore.", 

287 handler=self.unregister, 

288 ) 

289 

290 async def unregister(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str: 

291 assert session is not None 

292 user = self.xmpp.store.users.get(session.user_jid) 

293 assert user is not None 

294 await self.xmpp.unregister_user(user) 

295 return "You are not registered anymore. Bye!" 

296 

297 

298class LeaveGroup(Command): 

299 NAME = HELP = "❌ Leave a legacy group" 

300 CHAT_COMMAND = "leave-group" 

301 NODE = GROUPS.node + "/" + CHAT_COMMAND 

302 ACCESS = CommandAccess.USER_LOGGED 

303 CATEGORY = GROUPS 

304 

305 async def run(self, session, _ifrom, *_): 

306 assert session is not None 

307 await session.bookmarks.fill() 

308 groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold()) 

309 return Form( 

310 title="Leave a group", 

311 instructions="Select the group you want to leave", 

312 fields=[ 

313 FormField( 

314 "group", 

315 "Group name", 

316 type="list-single", 

317 options=[ 

318 {"label": g.name, "value": str(i)} for i, g in enumerate(groups) 

319 ], 

320 ) 

321 ], 

322 handler=self.confirm, # type:ignore 

323 handler_args=(groups,), 

324 ) 

325 

326 async def confirm( 

327 self, 

328 form_values: FormValues, 

329 _session: AnyBaseSession, 

330 _ifrom, 

331 groups: list[LegacyMUC], 

332 ): 

333 group = groups[int(form_values["group"])] # type:ignore 

334 return Confirmation( 

335 prompt=f"Are you sure you want to leave the group '{group.name}'?", 

336 handler=self.finish, # type:ignore 

337 handler_args=(group,), 

338 ) 

339 

340 @staticmethod 

341 async def finish(session: AnyBaseSession, _ifrom, group: LegacyMUC): 

342 await session.on_leave_group(group.legacy_id) 

343 await session.bookmarks.remove(group, reason="You left this group via slidge.")