Coverage for slidge / command / user.py: 47%
197 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-15 09:02 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-02-15 09:02 +0000
1# Commands available to users
2from copy import deepcopy
3from typing import TYPE_CHECKING, Any, cast
5from slixmpp import JID
6from slixmpp.exceptions import XMPPError
8from ..group.room import LegacyMUC
9from ..util.types import AnyBaseSession, LegacyGroupIdType, MucType, UserPreferences
10from .base import (
11 Command,
12 CommandAccess,
13 Confirmation,
14 Form,
15 FormField,
16 FormValues,
17 SearchResult,
18 TableResult,
19)
20from .categories import CONTACTS, GROUPS
22if TYPE_CHECKING:
23 pass
class Search(Command):
    """Command that searches for contacts through the gateway.

    Invoked with arguments, the words are joined with single spaces and handed
    to ``session.on_search`` under the ``var`` of the first entry of
    ``self.xmpp.SEARCH_FIELDS``. Invoked without arguments, a form built from
    the gateway's ``SEARCH_TITLE``, ``SEARCH_INSTRUCTIONS`` and
    ``SEARCH_FIELDS`` is returned instead.
    """

    NAME = "🔎 Search for contacts"
    HELP = "Search for contacts via this gateway"
    CHAT_COMMAND = "find"
    NODE = f"{CONTACTS.node}/{CHAT_COMMAND}"
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: AnyBaseSession | None, _ifrom: JID, *args: str
    ) -> Form | SearchResult | None:
        """Search directly when arguments are given, else return the form.

        :param session: Session of the user running the command; must not be
            ``None``.
        :param _ifrom: JID the command originates from (unused).
        :param args: Optional search terms.
        :return: Whatever ``session.on_search`` returns when terms were given,
            otherwise the search form whose handler is :meth:`search`.
        """
        assert session is not None
        await session.ready
        if not args:
            # No terms given: let the user fill in the full search form.
            return Form(
                title=self.xmpp.SEARCH_TITLE,
                instructions=self.xmpp.SEARCH_INSTRUCTIONS,
                fields=self.xmpp.SEARCH_FIELDS,
                handler=self.search,
            )
        # Terms given: only the first search field is used for the query.
        query = {self.xmpp.SEARCH_FIELDS[0].var: " ".join(args)}
        return await session.on_search(query)

    @staticmethod
    async def search(
        form_values: FormValues, session: AnyBaseSession | None, _ifrom: JID
    ) -> SearchResult:
        """Form handler: run the search with the submitted form values.

        :raises XMPPError: ``item-not-found`` when ``session.on_search``
            returns ``None``.
        """
        assert session is not None
        found = await session.on_search(form_values)  # type: ignore
        if found is not None:
            return found
        raise XMPPError("item-not-found", "No contact was found")
class SyncContacts(Command):
    """Command that reconciles the user's XMPP roster with legacy contacts.

    After a confirmation step, :meth:`sync` fetches the roster through the
    ``xep_0356`` plugin and only acts on roster items that belong to
    ``self.xmpp.ROSTER_GROUP``.
    """

    NAME = "🔄 Sync XMPP roster"
    HELP = (
        "Synchronize your XMPP roster with your legacy contacts. "
        "Slidge will only add/remove/modify contacts in its dedicated roster group"
    )
    CHAT_COMMAND = "sync-contacts"
    NODE = f"{CONTACTS.node}/{CHAT_COMMAND}"
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(self, session: AnyBaseSession | None, _ifrom, *_) -> Confirmation:
        """Wait for the session to be ready, then ask for confirmation."""
        assert session is not None
        await session.ready
        confirmation = Confirmation(
            prompt="Are you sure you want to sync your roster?",
            success=None,
            handler=self.sync,
        )
        return confirmation

    async def sync(self, session: AnyBaseSession | None, _ifrom: JID) -> str:
        """Confirmation handler: bring the roster in line with known contacts.

        For every roster item in the gateway's roster group:

        * no matching known contact and no other group: the item is removed;
        * no matching known contact but other groups: only the gateway group
          is stripped from the item;
        * matching contact whose name differs from the roster name: the
          contact is re-added with ``force=True``.

        Known contacts that matched no roster item are added afterwards.

        :return: A summary string with the added/removed/updated counts.
        :raises RuntimeError: If ``session`` is ``None``.
        """
        if session is None:
            raise RuntimeError

        privileged = self.xmpp["xep_0356"]
        owner = session.user_jid
        roster_iq = await privileged.get_roster(owner.bare)

        # Contacts are popped from this mapping as they are matched, so what
        # remains after the loop is exactly the set missing from the roster.
        unmatched = session.contacts.known_contacts()
        gateway_group = self.xmpp.ROSTER_GROUP

        added = removed = updated = 0
        for item in roster_iq["roster"]:
            groups = set(item["groups"])
            if gateway_group not in groups:
                # Not managed by the gateway: leave the item untouched.
                continue

            contact = unmatched.pop(item["jid"], None)
            if contact is not None:
                if contact.name != item["name"]:
                    await contact.add_to_roster(force=True)
                    updated += 1
                continue

            if len(groups) == 1:
                # The gateway group was the item's only group: drop the item.
                await privileged.set_roster(
                    owner, {item["jid"]: {"subscription": "remove"}}
                )
                removed += 1
                continue

            # The item also lives in other groups: keep it, minus our group.
            groups.remove(gateway_group)
            await privileged.set_roster(
                owner,
                {
                    item["jid"]: {
                        "subscription": item["subscription"],
                        "name": item["name"],
                        "groups": groups,
                    }
                },
            )
            updated += 1

        for contact in unmatched.values():
            added += 1
            await contact.add_to_roster()

        return f"{added} added, {removed} removed, {updated} updated"
class ListContacts(Command):
    """Command that returns the session's contacts as a name/JID table."""

    NAME = HELP = "👤 List your legacy contacts"
    CHAT_COMMAND = "contacts"
    NODE = f"{CONTACTS.node}/{CHAT_COMMAND}"
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(self, session: AnyBaseSession | None, _ifrom: JID, *_) -> TableResult:
        """Return the contacts sorted case-insensitively by name.

        Contacts with a falsy name sort as the empty string.
        """
        assert session is not None
        await session.ready

        def by_name(contact) -> str:
            return contact.name.casefold() if contact.name else ""

        ordered = sorted(session.contacts, key=by_name)
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        rows = [
            {"name": contact.name, "jid": contact.jid.bare} for contact in ordered
        ]
        return TableResult(description="Your buddies", fields=columns, items=rows)
class ListGroups(Command):
    """Command that returns the session's bookmarked groups as a table."""

    NAME = HELP = "👥 List your legacy groups"
    CHAT_COMMAND = "groups"
    NODE = f"{GROUPS.node}/{CHAT_COMMAND}"
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """Return the groups sorted case-insensitively.

        The sort key is the group name, or the node part of the group JID
        when the name is falsy.
        """
        assert session is not None
        await session.ready

        def by_name(group) -> str:
            return (group.name or group.jid.node).casefold()

        ordered = sorted(session.bookmarks, key=by_name)
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        rows = [{"name": group.name, "jid": group.jid.bare} for group in ordered]
        return TableResult(
            description="Your groups",
            fields=columns,
            items=rows,
            jids_are_mucs=True,
        )
class Login(Command):
    """Command that logs the user in to the legacy network again."""

    NAME = "🔐 Re-login to the legacy network"
    HELP = "Login to the legacy service"
    CHAT_COMMAND = "re-login"
    NODE = "https://slidge.im/command/core/" + CHAT_COMMAND

    ACCESS = CommandAccess.USER_NON_LOGGED

    async def run(self, session: AnyBaseSession | None, _ifrom, *_):
        """Attempt a login through ``self.xmpp.login_wrap``.

        ``session.is_logging_in`` is set for the duration of the attempt and
        always reset afterwards, whether the attempt succeeds or fails.

        :return: The message returned by ``login_wrap`` (may be falsy).
        :raises XMPPError: ``bad-request`` if a login is already in progress;
            ``internal-server-error`` if ``login_wrap`` raises, with the
            original exception attached as the cause.
        """
        assert session is not None
        if session.is_logging_in:
            raise XMPPError("bad-request", "You are already logging in.")
        session.is_logging_in = True
        try:
            msg = await self.xmpp.login_wrap(session)
        except Exception as e:
            session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
            # Chain explicitly so the original failure is kept as __cause__.
            raise XMPPError(
                "internal-server-error", etype="wait", text=f"Could not login: {e}"
            ) from e
        finally:
            session.is_logging_in = False
        # Only reached when login_wrap did not raise.
        session.logged = True
        session.send_gateway_status(msg or "Re-connected", show="chat")
        session.send_gateway_message(msg or "Re-connected")
        return msg
class CreateGroup(Command):
    """Command that creates a new group on the legacy service."""

    NAME = "🆕 New legacy group"
    HELP = "Create a group on the legacy service"
    CHAT_COMMAND = "create-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    CATEGORY = GROUPS

    ACCESS = CommandAccess.USER_LOGGED

    async def run(self, session: AnyBaseSession | None, _ifrom, *_):
        """Return a form asking for a group name and the contacts to add.

        The contact options come from
        ``session.contacts.known_contacts(only_friends=True)`` and are sorted
        case-insensitively by name; contacts with a falsy name sort first.
        """
        assert session is not None
        await session.ready
        contacts = session.contacts.known_contacts(only_friends=True)
        return Form(
            title="Create a new group",
            instructions="Pick contacts that should be part of this new group",
            fields=[
                FormField(var="group_name", label="Name of the group", required=True),
                FormField(
                    var="contacts",
                    label="Contacts to add to the new group",
                    type="list-multi",
                    options=[
                        {"value": str(contact.jid), "label": contact.name}
                        for contact in sorted(
                            contacts.values(),
                            # Null-safe key: comparing None with str would
                            # raise TypeError and break the whole command.
                            key=lambda c: (c.name or "").casefold(),
                        )
                    ],
                    required=False,
                ),
            ],
            handler=self.finish,
        )

    @staticmethod
    async def finish(form_values: FormValues, session: AnyBaseSession | None, *_):
        """Form handler: create the group and return a one-row table for it.

        The submitted contact JIDs are resolved through
        ``session.contacts.by_jid`` and passed, together with the group name,
        to ``session.on_create_group``. The returned legacy ID is then looked
        up in ``session.bookmarks``.
        """
        assert session is not None
        # The "contacts" field is optional. Treat a missing or None value as
        # "no contacts" (assumes the form layer may yield None for an empty
        # list-multi; verify against the form parsing code).
        selected_jids = form_values.get("contacts") or []
        legacy_id: LegacyGroupIdType = await session.on_create_group(  # type:ignore
            cast(str, form_values["group_name"]),
            [
                await session.contacts.by_jid(JID(j))
                for j in selected_jids  # type:ignore
            ],
        )
        muc = await session.bookmarks.by_legacy_id(legacy_id)
        return TableResult(
            description=f"Your new group: xmpp:{muc.jid}?join",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=[{"name": muc.name, "jid": muc.jid}],
            jids_are_mucs=True,
        )
class Preferences(Command):
    """Command that lets a user view and change their gateway preferences."""

    NAME = "⚙️ Preferences"
    HELP = "Customize the gateway behaviour to your liking"
    CHAT_COMMAND = "preferences"
    NODE = "https://slidge.im/command/core/preferences"
    ACCESS = CommandAccess.USER

    async def run(self, session: AnyBaseSession | None, _ifrom: JID, *_: Any) -> Form:
        """Return the preferences form pre-filled with the user's values.

        The fields are a deep copy of ``self.xmpp.PREFERENCES`` so that
        setting per-user values does not alter the shared definitions.
        """
        fields = deepcopy(self.xmpp.PREFERENCES)
        assert session is not None
        current = session.user.preferences
        for field in fields:
            field.value = current.get(field.var, field.value)  # type:ignore
        return Form(
            title="Preferences",
            instructions=self.HELP,
            fields=fields,
            handler=self.finish,  # type:ignore
            # Pass a snapshot, not the live mapping: finish() updates
            # ``user.preferences`` in place, which would otherwise also
            # rewrite "previous" and defeat every before/after comparison.
            handler_kwargs={"previous": dict(current)},
        )

    async def finish(
        self,
        form_values: UserPreferences,
        session: AnyBaseSession | None,
        *_,
        previous,
    ) -> str:
        """Form handler: store the submitted preferences and notify the session.

        :param form_values: Preferences submitted by the user.
        :param session: Session of the user; must not be ``None``.
        :param previous: Preferences as they were when the form was built.
        :return: A human-readable status message.
        """
        assert session is not None
        if previous == form_values:
            return "No preference was changed"

        user = session.user
        user.preferences.update(form_values)  # type:ignore
        self.xmpp.store.users.update(user)

        try:
            await session.on_preferences(previous, form_values)  # type:ignore[arg-type]
        except NotImplementedError:
            # The session does not implement this hook; nothing to do.
            pass

        if not previous["sync_avatar"] and form_values["sync_avatar"]:
            # Avatar sync was just switched on.
            await self.xmpp.fetch_user_avatar(session)
        else:
            # NOTE(review): this also runs when sync_avatar was and stays
            # enabled, and happens after the store update above, so the reset
            # is presumably in-memory only. Confirm this is intended.
            user.avatar_hash = None

        return "Your preferences have been updated."
class Unregister(Command):
    """Command that unregisters the user from the gateway after confirmation."""

    NAME = "❌ Unregister from the gateway"
    HELP = "Unregister from the gateway"
    CHAT_COMMAND = "unregister"
    NODE = "https://slidge.im/command/core/unregister"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: AnyBaseSession | None, _ifrom: JID, *_: Any
    ) -> Confirmation:
        """Return a confirmation prompt naming the gateway's bound JID."""
        gateway_jid = self.xmpp.boundjid
        question = f"Are you sure you want to unregister from '{gateway_jid}'?"
        farewell = f"You are not registered to '{gateway_jid}' anymore."
        return Confirmation(prompt=question, success=farewell, handler=self.unregister)

    async def unregister(self, session: AnyBaseSession | None, _ifrom: JID) -> str:
        """Confirmation handler: unregister the session's user."""
        assert session is not None
        account = session.user
        await self.xmpp.unregister_user(account)
        return "You are not registered anymore. Bye!"
class LeaveGroup(Command):
    """Command that lets the user pick one of their groups and leave it.

    The flow has three steps: :meth:`run` shows the group picker,
    :meth:`confirm` asks for confirmation, and :meth:`finish` leaves the group.
    """

    NAME = HELP = "❌ Leave a legacy group"
    CHAT_COMMAND = "leave-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """Return a single-choice form listing the session's groups.

        Each option's value is the index of the group in the sorted list
        that is handed on to :meth:`confirm`.
        """
        assert session is not None
        await session.ready
        groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold())
        return Form(
            title="Leave a group",
            instructions="Select the group you want to leave",
            fields=[
                FormField(
                    "group",
                    "Group name",
                    type="list-single",
                    options=[
                        {"label": g.name, "value": str(i)} for i, g in enumerate(groups)
                    ],
                )
            ],
            handler=self.confirm,  # type:ignore
            handler_args=(groups,),
        )

    async def confirm(
        self,
        form_values: FormValues,
        _session: AnyBaseSession,
        _ifrom,
        groups: list[LegacyMUC],
    ):
        """Form handler: validate the selection and ask for confirmation.

        :param form_values: Submitted form; ``"group"`` holds the index of the
            chosen group as offered by :meth:`run`.
        :param groups: The sorted group list the indices refer to.
        :raises XMPPError: ``bad-request`` if the submitted value is missing,
            not an integer, or outside the range of offered options.
        """
        # The value comes from the client, so do not trust it to be one of
        # the offered options.
        try:
            index = int(form_values["group"])  # type:ignore
        except (KeyError, TypeError, ValueError) as exc:
            raise XMPPError("bad-request", "No valid group was selected") from exc
        # Reject negatives explicitly: Python would otherwise index from the
        # end of the list and silently pick a different group.
        if not 0 <= index < len(groups):
            raise XMPPError("bad-request", "No valid group was selected")
        group = groups[index]
        return Confirmation(
            prompt=f"Are you sure you want to leave the group '{group.name}'?",
            handler=self.finish,  # type:ignore
            handler_args=(group,),
        )

    @staticmethod
    async def finish(session: AnyBaseSession, _ifrom, group: LegacyMUC) -> None:
        """Confirmation handler: leave the group, then remove its bookmark."""
        await session.on_leave_group(group.legacy_id)
        await session.bookmarks.remove(group, reason="You left this group via slidge.")
class InviteInGroups(Command):
    """Command that re-sends invitations for the user's groups."""

    NAME = "💌 Re-invite me in my groups"
    HELP = "Ask the gateway to send invitations for all your private groups"
    CHAT_COMMAND = "re-invite"
    NODE = f"{GROUPS.node}/{CHAT_COMMAND}"
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """Send a gateway invite for every bookmark of type ``MucType.GROUP``."""
        assert session is not None
        await session.ready
        # Lazy filter so invites are sent as the bookmarks are iterated.
        group_mucs = (muc for muc in session.bookmarks if muc.type == MucType.GROUP)
        for muc in group_mucs:
            session.send_gateway_invite(
                muc, reason="You asked to be re-invited in all groups."
            )