Coverage for slidge/command/user.py: 47%
197 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1# Commands available to users
2from copy import deepcopy
3from typing import TYPE_CHECKING, Any, Optional, Union, cast
5from slixmpp import JID
6from slixmpp.exceptions import XMPPError
8from ..group.room import LegacyMUC
9from ..util.types import AnyBaseSession, LegacyGroupIdType, MucType, UserPreferences
10from .base import (
11 Command,
12 CommandAccess,
13 Confirmation,
14 Form,
15 FormField,
16 FormValues,
17 SearchResult,
18 TableResult,
19)
20from .categories import CONTACTS, GROUPS
22if TYPE_CHECKING:
23 pass
class Search(Command):
    """Command that forwards a contact search to the user's session."""

    NAME = "🔎 Search for contacts"
    HELP = "Search for contacts via this gateway"
    CHAT_COMMAND = "find"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *args: str
    ) -> Union[Form, SearchResult, None]:
        """
        Start a search.

        Without arguments, a search form built from the gateway's
        ``SEARCH_*`` attributes is returned and its submission is handled by
        :meth:`search`. With arguments, they are joined with single spaces and
        submitted directly as the value of the first search field.

        :param session: Session of the user running the command; must not be
            ``None``.
        :param _ifrom: JID the command was issued from (unused).
        :param args: Optional words of an inline search query.
        :return: The search form, or whatever ``session.on_search`` returns.
        """
        assert session is not None
        await session.ready

        if not args:
            # Nothing to search for yet: ask the user to fill in the form.
            return Form(
                title=self.xmpp.SEARCH_TITLE,
                instructions=self.xmpp.SEARCH_INSTRUCTIONS,
                fields=self.xmpp.SEARCH_FIELDS,
                handler=self.search,
            )

        first_field = self.xmpp.SEARCH_FIELDS[0]
        query = {first_field.var: " ".join(args)}
        return await session.on_search(query)

    @staticmethod
    async def search(
        form_values: FormValues, session: Optional[AnyBaseSession], _ifrom: JID
    ) -> SearchResult:
        """
        Handle the submitted search form.

        :param form_values: Values of the submitted search form.
        :param session: Session of the user running the command; must not be
            ``None``.
        :param _ifrom: JID the command was issued from (unused).
        :return: The result of ``session.on_search``.
        :raises XMPPError: ``item-not-found`` when ``on_search`` returns
            ``None``.
        """
        assert session is not None
        found = await session.on_search(form_values)  # type: ignore
        if found is not None:
            return found
        raise XMPPError("item-not-found", "No contact was found")
class SyncContacts(Command):
    """Command reconciling the user's XMPP roster with the session's contacts."""

    NAME = "🔄 Sync XMPP roster"
    HELP = (
        "Synchronize your XMPP roster with your legacy contacts. "
        "Slidge will only add/remove/modify contacts in its dedicated roster group"
    )
    CHAT_COMMAND = "sync-contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_) -> Confirmation:
        """
        Ask the user to confirm the synchronization.

        :param session: Session of the user running the command; must not be
            ``None``.
        :return: A confirmation prompt whose acceptance triggers :meth:`sync`.
        """
        assert session is not None
        await session.ready
        return Confirmation(
            prompt="Are you sure you want to sync your roster?",
            success=None,
            handler=self.sync,
        )

    async def sync(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str:
        """
        Perform the roster synchronization.

        Only roster items that belong to the gateway's ``ROSTER_GROUP`` are
        considered. For each of them:

        - unknown to the session and in no other group: removed from the roster;
        - unknown to the session but in other groups too: kept, minus the
          gateway's group;
        - known to the session with a different name: re-added with
          ``force=True``.

        Contacts known to the session that were not matched by any such roster
        item are then added to the roster.

        :param session: Session of the user running the command.
        :param _ifrom: JID the command was issued from (unused).
        :return: A human-readable summary of the added/removed/updated counts.
        :raises RuntimeError: If ``session`` is ``None``.
        """
        if session is None:
            raise RuntimeError

        privilege = self.xmpp["xep_0356"]
        roster_iq = await privilege.get_roster(session.user_jid.bare)

        # Contacts still in this mapping after the loop have no roster entry
        # in the gateway's roster group.
        unmatched = session.contacts.known_contacts()

        n_added = n_removed = n_updated = 0

        for entry in roster_iq["roster"]:
            entry_groups = set(entry["groups"])
            if self.xmpp.ROSTER_GROUP not in entry_groups:
                continue

            known = unmatched.pop(entry["jid"], None)

            if known is not None:
                if known.name != entry["name"]:
                    await known.add_to_roster(force=True)
                    n_updated += 1
                continue

            if len(entry_groups) == 1:
                # The gateway's group was the only one: drop the item.
                await privilege.set_roster(
                    session.user_jid, {entry["jid"]: {"subscription": "remove"}}
                )
                n_removed += 1
                continue

            # The item also lives in other groups: only take it out of ours.
            entry_groups.remove(self.xmpp.ROSTER_GROUP)
            await privilege.set_roster(
                session.user_jid,
                {
                    entry["jid"]: {
                        "subscription": entry["subscription"],
                        "name": entry["name"],
                        "groups": entry_groups,
                    }
                },
            )
            n_updated += 1

        # we popped before so this only acts on slidge contacts not in the xmpp roster
        for missing in unmatched.values():
            n_added += 1
            await missing.add_to_roster()

        return f"{n_added} added, {n_removed} removed, {n_updated} updated"
class ListContacts(Command):
    """Command returning a table of the contacts of the user's session."""

    NAME = HELP = "👤 List your legacy contacts"
    CHAT_COMMAND = "contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_
    ) -> TableResult:
        """
        Build a name/JID table of ``session.contacts``.

        Rows are sorted case-insensitively by name; contacts with a falsy
        name sort as the empty string.

        :param session: Session of the user running the command; must not be
            ``None``.
        :param _ifrom: JID the command was issued from (unused).
        :return: A table with one ``name``/``jid`` row per contact.
        """
        assert session is not None
        await session.ready

        def by_name(contact) -> str:
            if contact.name:
                return contact.name.casefold()
            return ""

        rows = [
            {"name": contact.name, "jid": contact.jid.bare}
            for contact in sorted(session.contacts, key=by_name)
        ]
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        return TableResult(description="Your buddies", fields=columns, items=rows)
class ListGroups(Command):
    """Command returning a table of the groups bookmarked in the user's session."""

    NAME = HELP = "👥 List your legacy groups"
    CHAT_COMMAND = "groups"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """
        Build a name/JID table of ``session.bookmarks``.

        Rows are sorted case-insensitively by group name, falling back to the
        node part of the group's JID when the name is falsy.

        :param session: Session of the user running the command; must not be
            ``None``.
        :return: A table with one ``name``/``jid`` row per group, flagged with
            ``jids_are_mucs=True``.
        """
        assert session is not None
        await session.ready

        def by_name(group) -> str:
            label = group.name or group.jid.node
            return label.casefold()

        rows = [
            {"name": group.name, "jid": group.jid.bare}
            for group in sorted(session.bookmarks, key=by_name)
        ]
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        return TableResult(
            description="Your groups",
            fields=columns,
            items=rows,
            jids_are_mucs=True,
        )
class Login(Command):
    """Command calling ``session.login()`` again for a user who is not logged in."""

    NAME = "🔐 Re-login to the legacy network"
    HELP = "Login to the legacy service"
    CHAT_COMMAND = "re-login"
    NODE = "https://slidge.im/command/core/" + CHAT_COMMAND

    ACCESS = CommandAccess.USER_NON_LOGGED

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_):
        """
        Attempt to log the session in.

        ``session.is_logging_in`` is used as a re-entrancy flag: it is set for
        the duration of the attempt and always cleared afterwards, whether the
        login succeeded or not.

        :param session: Session of the user running the command; must not be
            ``None``.
        :return: Whatever ``session.login()`` returned.
        :raises XMPPError: ``bad-request`` if a login is already in progress;
            ``internal-server-error`` (type ``wait``) if ``session.login()``
            raised, with the original exception attached as ``__cause__``.
        """
        assert session is not None
        if session.is_logging_in:
            raise XMPPError("bad-request", "You are already logging in.")
        session.is_logging_in = True
        try:
            msg = await session.login()
        except Exception as e:
            session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
            # Chain the original exception so its traceback is not lost.
            raise XMPPError(
                "internal-server-error", etype="wait", text=f"Could not login: {e}"
            ) from e
        finally:
            session.is_logging_in = False
        session.logged = True
        session.send_gateway_status(msg or "Re-connected", show="chat")
        session.send_gateway_message(msg or "Re-connected")
        return msg
class CreateGroup(Command):
    """Command creating a new group through ``session.on_create_group``."""

    NAME = "🆕 New legacy group"
    HELP = "Create a group on the legacy service"
    CHAT_COMMAND = "create-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    CATEGORY = GROUPS

    ACCESS = CommandAccess.USER_LOGGED

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_):
        """
        Return the group creation form.

        The form asks for a group name and offers the contacts returned by
        ``session.contacts.known_contacts(only_friends=True)`` as optional
        initial members, sorted case-insensitively by name.

        :param session: Session of the user running the command; must not be
            ``None``.
        :return: A form whose submission is handled by :meth:`finish`.
        """
        assert session is not None
        await session.ready
        contacts = session.contacts.known_contacts(only_friends=True)

        def by_name(contact) -> str:
            # A missing name must not break the sort (None is not orderable
            # against str); same case-insensitive ordering as ListContacts.
            return (contact.name or "").casefold()

        return Form(
            title="Create a new group",
            instructions="Pick contacts that should be part of this new group",
            fields=[
                FormField(var="group_name", label="Name of the group", required=True),
                FormField(
                    var="contacts",
                    label="Contacts to add to the new group",
                    type="list-multi",
                    options=[
                        {"value": str(contact.jid), "label": contact.name}
                        for contact in sorted(contacts.values(), key=by_name)
                    ],
                    required=False,
                ),
            ],
            handler=self.finish,
        )

    @staticmethod
    async def finish(form_values: FormValues, session: Optional[AnyBaseSession], *_):
        """
        Create the group from the submitted form.

        :param form_values: Submitted form; ``group_name`` is required,
            ``contacts`` is an optional collection of contact JIDs.
        :param session: Session of the user running the command; must not be
            ``None``.
        :return: A one-row table describing the new group.
        """
        assert session is not None
        group_name = cast(str, form_values["group_name"])
        # The field is optional: treat both a missing key and an empty/None
        # value as "no contact selected".
        selected = form_values.get("contacts") or []
        legacy_id: LegacyGroupIdType = await session.on_create_group(  # type:ignore
            group_name,
            [
                await session.contacts.by_jid(JID(j))
                for j in selected  # type:ignore
            ],
        )
        muc = await session.bookmarks.by_legacy_id(legacy_id)
        return TableResult(
            description=f"Your new group: xmpp:{muc.jid}?join",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=[{"name": muc.name, "jid": muc.jid}],
            jids_are_mucs=True,
        )
class Preferences(Command):
    """Command letting a user view and change their stored preferences."""

    NAME = "⚙️ Preferences"
    HELP = "Customize the gateway behaviour to your liking"
    CHAT_COMMAND = "preferences"
    NODE = "https://slidge.im/command/core/preferences"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
    ) -> Form:
        """
        Return the preferences form, pre-filled with the user's current values.

        The fields are a deep copy of the gateway's ``PREFERENCES`` so that
        setting their values does not alter the shared definitions.

        :param session: Session of the user running the command; must not be
            ``None``.
        :param _ifrom: JID the command was issued from (unused).
        :return: A form whose submission is handled by :meth:`finish`.
        """
        fields = deepcopy(self.xmpp.PREFERENCES)
        assert session is not None
        current = session.user.preferences
        for field in fields:
            field.value = current.get(field.var, field.value)  # type:ignore
        return Form(
            title="Preferences",
            instructions=self.HELP,
            fields=fields,
            handler=self.finish,  # type:ignore
            handler_kwargs={"previous": current},
        )

    async def finish(
        self,
        form_values: UserPreferences,
        session: Optional[AnyBaseSession],
        *_,
        previous,
    ) -> str:
        """
        Store the submitted preferences and notify the session.

        :param form_values: Preferences submitted by the user.
        :param session: Session of the user running the command; must not be
            ``None``.
        :param previous: Preferences as they were when the form was built.
        :return: A human-readable status message.
        """
        assert session is not None
        # ``run`` passes the user's live preferences mapping as ``previous``.
        # Detach it before the in-place update below, otherwise "previous"
        # would silently become equal to the new values and every comparison
        # after the update would see no change.
        previous = deepcopy(previous)
        if previous == form_values:
            return "No preference was changed"

        user = session.user
        user.preferences.update(form_values)  # type:ignore
        self.xmpp.store.users.update(user)

        try:
            await session.on_preferences(previous, form_values)  # type:ignore[arg-type]
        except NotImplementedError:
            # The session does not react to preference changes.
            pass

        # ``.get`` tolerates preference sets that lack the key.
        if not previous.get("sync_avatar") and form_values.get("sync_avatar"):
            # Avatar sync was just switched on.
            await self.xmpp.fetch_user_avatar(session)
        else:
            # NOTE(review): this also clears the hash when sync_avatar was
            # already enabled and stays enabled - confirm that is intended.
            user.avatar_hash = None

        return "Your preferences have been updated."
class Unregister(Command):
    """Command removing the user's registration after a confirmation."""

    NAME = "❌ Unregister from the gateway"
    HELP = "Unregister from the gateway"
    CHAT_COMMAND = "unregister"
    NODE = "https://slidge.im/command/core/unregister"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
    ) -> Confirmation:
        """
        Ask the user to confirm that they want to unregister.

        :param session: Session of the user running the command (unused here).
        :param _ifrom: JID the command was issued from (unused).
        :return: A confirmation prompt whose acceptance triggers
            :meth:`unregister`.
        """
        gateway_jid = self.xmpp.boundjid
        question = f"Are you sure you want to unregister from '{gateway_jid}'?"
        farewell = f"You are not registered to '{gateway_jid}' anymore."
        return Confirmation(
            prompt=question,
            success=farewell,
            handler=self.unregister,
        )

    async def unregister(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str:
        """
        Unregister the user owning ``session``.

        :param session: Session of the user running the command; must not be
            ``None``.
        :param _ifrom: JID the command was issued from (unused).
        :return: A goodbye message.
        """
        assert session is not None
        user = session.user
        await self.xmpp.unregister_user(user)
        return "You are not registered anymore. Bye!"
class LeaveGroup(Command):
    """Command letting the user pick one of their groups and leave it."""

    NAME = HELP = "❌ Leave a legacy group"
    CHAT_COMMAND = "leave-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """
        Return a form listing the groups in ``session.bookmarks``.

        Each option's value is the index of the group in the sorted list that
        is handed over to :meth:`confirm`.

        :param session: Session of the user running the command; must not be
            ``None``.
        :return: A form whose submission is handled by :meth:`confirm`.
        """
        assert session is not None
        await session.ready
        groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold())
        return Form(
            title="Leave a group",
            instructions="Select the group you want to leave",
            fields=[
                FormField(
                    "group",
                    "Group name",
                    type="list-single",
                    options=[
                        {"label": g.name, "value": str(i)} for i, g in enumerate(groups)
                    ],
                )
            ],
            handler=self.confirm,  # type:ignore
            handler_args=(groups,),
        )

    async def confirm(
        self,
        form_values: FormValues,
        _session: AnyBaseSession,
        _ifrom,
        groups: list[LegacyMUC],
    ):
        """
        Ask for confirmation before leaving the selected group.

        :param form_values: Submitted form; ``group`` holds the index of the
            chosen group in ``groups``.
        :param groups: The list of groups the form was built from.
        :return: A confirmation prompt whose acceptance triggers
            :meth:`finish`.
        :raises XMPPError: ``bad-request`` if the submitted value is missing,
            is not an integer, or does not designate an entry of ``groups``.
        """
        # The index comes from the client: validate it instead of letting a
        # raw KeyError/ValueError/IndexError escape.
        try:
            index = int(form_values["group"])  # type:ignore
        except (KeyError, TypeError, ValueError) as e:
            raise XMPPError("bad-request", "No valid group was selected") from e
        # Reject negative values too: they would otherwise silently select a
        # group counted from the end of the list.
        if not 0 <= index < len(groups):
            raise XMPPError("bad-request", "The selected group does not exist")

        group = groups[index]
        return Confirmation(
            prompt=f"Are you sure you want to leave the group '{group.name}'?",
            handler=self.finish,  # type:ignore
            handler_args=(group,),
        )

    @staticmethod
    async def finish(session: AnyBaseSession, _ifrom, group: LegacyMUC) -> None:
        """
        Leave ``group`` and remove it from the session's bookmarks.

        :param session: Session of the user running the command.
        :param group: The group to leave.
        """
        await session.on_leave_group(group.legacy_id)
        await session.bookmarks.remove(group, reason="You left this group via slidge.")
class InviteInGroups(Command):
    """Command re-sending gateway invitations for the user's groups."""

    NAME = "💌 Re-invite me in my groups"
    HELP = "Ask the gateway to send invitations for all your private groups"
    CHAT_COMMAND = "re-invite"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """
        Send an invitation for every bookmark whose type is ``MucType.GROUP``.

        :param session: Session of the user running the command; must not be
            ``None``.
        """
        assert session is not None
        await session.ready
        for bookmark in session.bookmarks:
            if muc_is_not_group := bookmark.type != MucType.GROUP:
                # Other kinds of MUC are skipped.
                continue
            session.send_gateway_invite(
                bookmark, reason="You asked to be re-invited in all groups."
            )