Coverage for slidge / command / user.py: 47%
197 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1# Commands available to users
2from copy import deepcopy
3from typing import TYPE_CHECKING, Any, cast
5from slixmpp import JID
6from slixmpp.exceptions import XMPPError
8from slidge.db.meta import JSONSerializable
10from ..util.types import (
11 AnyMUC,
12 AnySession,
13 LegacyGroupIdType,
14 MucType,
15 UserPreferences,
16)
17from .base import (
18 Command,
19 CommandAccess,
20 Confirmation,
21 ConfirmationSession,
22 Form,
23 FormField,
24 FormSession,
25 FormValues,
26 SearchResult,
27 TableResult,
28)
29from .categories import CONTACTS, GROUPS
31if TYPE_CHECKING:
32 pass
35class Search(Command[AnySession]):
36 NAME = "🔎 Search for contacts"
37 HELP = "Search for contacts via this gateway"
38 CHAT_COMMAND = "find"
39 NODE = CONTACTS.node + "/" + CHAT_COMMAND
40 ACCESS = CommandAccess.USER_LOGGED
41 CATEGORY = CONTACTS
43 async def run(
44 self, session: AnySession | None, _ifrom: JID, *args: str
45 ) -> FormSession[AnySession] | SearchResult | None:
46 assert session is not None
47 await session.ready
48 if args:
49 return await session.on_search(
50 {self.xmpp.SEARCH_FIELDS[0].var: " ".join(args)}
51 )
52 return FormSession[AnySession](
53 title=self.xmpp.SEARCH_TITLE,
54 instructions=self.xmpp.SEARCH_INSTRUCTIONS,
55 fields=self.xmpp.SEARCH_FIELDS,
56 handler=self.search,
57 )
59 @staticmethod
60 async def search(
61 form_values: FormValues, session: AnySession | None, _ifrom: JID
62 ) -> SearchResult:
63 assert session is not None
64 results = await session.on_search(form_values) # type: ignore
65 if results is None:
66 raise XMPPError("item-not-found", "No contact was found")
68 return results
71class SyncContacts(Command[AnySession]):
72 NAME = "🔄 Sync XMPP roster"
73 HELP = (
74 "Synchronize your XMPP roster with your legacy contacts. "
75 "Slidge will only add/remove/modify contacts in its dedicated roster group"
76 )
77 CHAT_COMMAND = "sync-contacts"
78 NODE = CONTACTS.node + "/" + CHAT_COMMAND
79 ACCESS = CommandAccess.USER_LOGGED
80 CATEGORY = CONTACTS
82 async def run(
83 self,
84 session: AnySession | None,
85 _ifrom: JID,
86 *args: str,
87 ) -> ConfirmationSession[AnySession]:
88 assert session is not None
89 await session.ready
90 return ConfirmationSession[AnySession](
91 prompt="Are you sure you want to sync your roster?",
92 success=None,
93 handler=self.sync,
94 )
96 async def sync(self, session: AnySession | None, _ifrom: JID) -> str:
97 if session is None:
98 raise RuntimeError
99 roster_iq = await self.xmpp["xep_0356"].get_roster(session.user_jid.bare)
101 contacts = session.contacts.known_contacts()
103 added = 0
104 removed = 0
105 updated = 0
106 for item in roster_iq["roster"]:
107 groups = set(item["groups"])
108 if self.xmpp.ROSTER_GROUP in groups:
109 contact = contacts.pop(item["jid"], None)
110 if contact is None:
111 if len(groups) == 1:
112 await self.xmpp["xep_0356"].set_roster(
113 session.user_jid, {item["jid"]: {"subscription": "remove"}}
114 )
115 removed += 1
116 else:
117 groups.remove(self.xmpp.ROSTER_GROUP)
118 await self.xmpp["xep_0356"].set_roster(
119 session.user_jid,
120 {
121 item["jid"]: {
122 "subscription": item["subscription"],
123 "name": item["name"],
124 "groups": groups,
125 }
126 },
127 )
128 updated += 1
129 else:
130 if contact.name != item["name"]:
131 await contact.add_to_roster(force=True)
132 updated += 1
134 # we popped before so this only acts on slidge contacts not in the xmpp roster
135 for contact in contacts.values():
136 added += 1
137 await contact.add_to_roster()
139 return f"{added} added, {removed} removed, {updated} updated"
142class ListContacts(Command[AnySession]):
143 NAME = HELP = "👤 List your legacy contacts"
144 CHAT_COMMAND = "contacts"
145 NODE = CONTACTS.node + "/" + CHAT_COMMAND
146 ACCESS = CommandAccess.USER_LOGGED
147 CATEGORY = CONTACTS
149 async def run(
150 self,
151 session: AnySession | None,
152 _ifrom: JID,
153 *_: str,
154 ) -> TableResult:
155 assert session is not None
156 await session.ready
157 contacts = sorted(
158 session.contacts, key=lambda c: c.name.casefold() if c.name else ""
159 )
160 return TableResult(
161 description="Your buddies",
162 fields=[FormField("name"), FormField("jid", type="jid-single")],
163 items=[{"name": c.name, "jid": c.jid.bare} for c in contacts],
164 )
167class ListGroups(Command[AnySession]):
168 NAME = HELP = "👥 List your legacy groups"
169 CHAT_COMMAND = "groups"
170 NODE = GROUPS.node + "/" + CHAT_COMMAND
171 ACCESS = CommandAccess.USER_LOGGED
172 CATEGORY = GROUPS
174 async def run(
175 self, session: AnySession | None, _ifrom: JID, *_: str
176 ) -> TableResult:
177 assert session is not None
178 await session.ready
179 groups: list[AnyMUC] = sorted(
180 session.bookmarks, key=lambda g: (g.name or g.jid.node).casefold()
181 )
182 return TableResult(
183 description="Your groups",
184 fields=[FormField("name"), FormField("jid", type="jid-single")],
185 items=[
186 {"name": g.name or str(g.legacy_id), "jid": g.jid.bare} for g in groups
187 ],
188 jids_are_mucs=True,
189 )
192class Login(Command[AnySession]):
193 NAME = "🔐 Re-login to the legacy network"
194 HELP = "Login to the legacy service"
195 CHAT_COMMAND = "re-login"
196 NODE = "https://slidge.im/command/core/" + CHAT_COMMAND
198 ACCESS = CommandAccess.USER_NON_LOGGED
200 async def run(
201 self,
202 session: AnySession | None,
203 _ifrom: JID,
204 *_: str,
205 ) -> str:
206 assert session is not None
207 if session.is_logging_in:
208 raise XMPPError("bad-request", "You are already logging in.")
209 session.is_logging_in = True
210 try:
211 msg = await self.xmpp.login_wrap(session)
212 except Exception as e:
213 session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
214 raise XMPPError(
215 "internal-server-error", etype="wait", text=f"Could not login: {e}"
216 )
217 finally:
218 session.is_logging_in = False
219 session.logged = True
220 session.send_gateway_status(msg or "Re-connected", show="chat")
221 session.send_gateway_message(msg or "Re-connected")
222 return msg
225class CreateGroup(Command[AnySession]):
226 NAME = "🆕 New legacy group"
227 HELP = "Create a group on the legacy service"
228 CHAT_COMMAND = "create-group"
229 NODE = GROUPS.node + "/" + CHAT_COMMAND
230 CATEGORY = GROUPS
232 ACCESS = CommandAccess.USER_LOGGED
234 async def run(
235 self,
236 session: AnySession | None,
237 _ifrom: JID,
238 *_: str,
239 ) -> FormSession[AnySession]:
240 assert session is not None
241 await session.ready
242 contacts = session.contacts.known_contacts(only_friends=True)
243 return FormSession(
244 title="Create a new group",
245 instructions="Pick contacts that should be part of this new group",
246 fields=[
247 FormField(var="group_name", label="Name of the group", required=True),
248 FormField(
249 var="contacts",
250 label="Contacts to add to the new group",
251 type="list-multi",
252 options=[
253 {"value": str(contact.jid), "label": contact.name}
254 for contact in sorted(contacts.values(), key=lambda c: c.name)
255 ],
256 required=False,
257 ),
258 ],
259 handler=self.finish,
260 )
262 @staticmethod
263 async def finish(
264 form_values: FormValues,
265 session: AnySession | None,
266 *_: Any, # noqa:ANN401
267 ) -> TableResult:
268 assert session is not None
269 legacy_id: LegacyGroupIdType = await session.on_create_group( # type:ignore
270 cast(str, form_values["group_name"]),
271 [
272 await session.contacts.by_jid(JID(j))
273 for j in form_values.get("contacts", []) # type:ignore
274 ],
275 )
276 muc = await session.bookmarks.by_legacy_id(legacy_id)
277 return TableResult(
278 description=f"Your new group: xmpp:{muc.jid}?join",
279 fields=[FormField("name"), FormField("jid", type="jid-single")],
280 items=[{"name": muc.name, "jid": muc.jid}],
281 jids_are_mucs=True,
282 )
285class Preferences(Command[AnySession]):
286 NAME = "⚙️ Preferences"
287 HELP = "Customize the gateway behaviour to your liking"
288 CHAT_COMMAND = "preferences"
289 NODE = "https://slidge.im/command/core/preferences"
290 ACCESS = CommandAccess.USER
292 async def run(
293 self,
294 session: AnySession | None,
295 _ifrom: JID,
296 *_: str,
297 ) -> FormSession[AnySession]:
298 fields = deepcopy(self.xmpp.PREFERENCES)
299 assert session is not None
300 current = session.user.preferences
301 for field in fields:
302 field.value = current.get(field.var, field.value) # type:ignore
303 return Form(
304 title="Preferences",
305 instructions=self.HELP,
306 fields=fields,
307 handler=self.finish, # type:ignore
308 handler_kwargs={"previous": current},
309 )
311 async def finish(
312 self,
313 form_values: UserPreferences,
314 session: AnySession | None,
315 *_: Any, # noqa:ANN401
316 previous: JSONSerializable,
317 ) -> str:
318 assert session is not None
319 if previous == form_values:
320 return "No preference was changed"
322 user = session.user
323 user.preferences.update(form_values) # type:ignore
324 self.xmpp.store.users.update(user)
326 try:
327 await session.on_preferences(previous, form_values) # type:ignore[arg-type]
328 except NotImplementedError:
329 pass
331 if not previous["sync_avatar"] and form_values["sync_avatar"]:
332 await self.xmpp.fetch_user_avatar(session)
333 else:
334 user.avatar_hash = None
336 return "Your preferences have been updated."
339class Unregister(Command[AnySession]):
340 NAME = "❌ Unregister from the gateway"
341 HELP = "Unregister from the gateway"
342 CHAT_COMMAND = "unregister"
343 NODE = "https://slidge.im/command/core/unregister"
344 ACCESS = CommandAccess.USER
346 async def run(
347 self, session: AnySession | None, _ifrom: JID, *_: str
348 ) -> ConfirmationSession[AnySession]:
349 return ConfirmationSession[AnySession](
350 prompt=f"Are you sure you want to unregister from '{self.xmpp.boundjid}'?",
351 success=f"You are not registered to '{self.xmpp.boundjid}' anymore.",
352 handler=self.unregister,
353 )
355 async def unregister(self, session: AnySession | None, _ifrom: JID) -> str:
356 assert session is not None
357 await self.xmpp.unregister_user(session.user)
358 return "You are not registered anymore. Bye!"
361class LeaveGroup(Command[AnySession]):
362 NAME = HELP = "❌ Leave a legacy group"
363 CHAT_COMMAND = "leave-group"
364 NODE = GROUPS.node + "/" + CHAT_COMMAND
365 ACCESS = CommandAccess.USER_LOGGED
366 CATEGORY = GROUPS
368 async def run(
369 self,
370 session: AnySession | None,
371 _ifrom: JID,
372 *_: str,
373 ) -> FormSession[AnySession]:
374 assert session is not None
375 await session.ready
376 groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold())
377 return FormSession(
378 title="Leave a group",
379 instructions="Select the group you want to leave",
380 fields=[
381 FormField(
382 "group",
383 "Group name",
384 type="list-single",
385 options=[
386 {"label": g.name or str(g.legacy_id), "value": str(i)}
387 for i, g in enumerate(groups)
388 ],
389 )
390 ],
391 handler=self.confirm,
392 handler_args=(groups,),
393 )
395 async def confirm(
396 self,
397 form_values: FormValues,
398 _session: AnySession | None,
399 _ifrom: JID,
400 groups: list[AnyMUC],
401 ) -> Confirmation:
402 group = groups[int(form_values["group"])] # type:ignore
403 return Confirmation(
404 prompt=f"Are you sure you want to leave the group '{group.name}'?",
405 handler=self.finish,
406 handler_args=(group,),
407 )
409 @staticmethod
410 async def finish(session: AnySession, _ifrom: JID, group: AnyMUC) -> None:
411 await session.on_leave_group(group.legacy_id)
412 await session.bookmarks.remove(group, reason="You left this group via slidge.")
415class InviteInGroups(Command[AnySession]):
416 NAME = "💌 Re-invite me in my groups"
417 HELP = "Ask the gateway to send invitations for all your private groups"
418 CHAT_COMMAND = "re-invite"
419 NODE = GROUPS.node + "/" + CHAT_COMMAND
420 ACCESS = CommandAccess.USER_LOGGED
421 CATEGORY = GROUPS
423 async def run(self, session: "AnySession | None", _ifrom: JID, *_: str) -> None:
424 assert session is not None
425 await session.ready
426 for muc in session.bookmarks:
427 if muc.type == MucType.GROUP:
428 session.send_gateway_invite(
429 muc, reason="You asked to be re-invited in all groups."
430 )