Coverage for slidge / command / user.py: 50%
243 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1# Commands available to users
2import contextlib
3from copy import deepcopy
4from typing import Any, cast
6from slixmpp import JID
7from slixmpp.exceptions import XMPPError
9from slidge.db.meta import JSONSerializable
10from slidge.db.models import Space
12from ..util.types import (
13 AnyMUC,
14 AnySession,
15 MucType,
16 UserPreferences,
17)
18from .base import (
19 Command,
20 CommandAccess,
21 Confirmation,
22 ConfirmationSession,
23 Form,
24 FormField,
25 FormSession,
26 FormValues,
27 SearchResult,
28 TableResult,
29)
30from .categories import CONTACTS, GROUPS, SPACES
class Search(Command[AnySession]):
    """Command that searches for contacts through the session's ``on_search``."""

    NAME = "🔎 Search for contacts"
    HELP = "Search for contacts via this gateway"
    CHAT_COMMAND = "find"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: "AnySession | None", _ifrom: JID, *args: str
    ) -> FormSession[AnySession] | SearchResult | None:
        """
        Start a search.

        Without arguments, a form built from the gateway's search fields is
        returned and :meth:`search` handles its submission. With arguments,
        they are joined with spaces and used as the value of the first search
        field, and the outcome of ``session.on_search`` is returned directly.
        """
        assert session is not None
        await session.ready

        if not args:
            return FormSession[AnySession](
                title=self.xmpp.SEARCH_TITLE,
                instructions=self.xmpp.SEARCH_INSTRUCTIONS,
                fields=self.xmpp.SEARCH_FIELDS,
                handler=self.search,
            )

        # Chat-command shortcut: everything typed goes into the first field.
        first_field = self.xmpp.SEARCH_FIELDS[0]
        query = {first_field.var: " ".join(args)}
        return await session.on_search(query)

    @staticmethod
    async def search(
        form_values: FormValues, session: "AnySession | None", _ifrom: JID
    ) -> SearchResult:
        """
        Handle the submitted search form.

        :raises XMPPError: ``item-not-found`` when ``on_search`` returns ``None``.
        """
        assert session is not None
        found = await session.on_search(form_values)  # type: ignore
        if found is not None:
            return found
        raise XMPPError("item-not-found", "No contact was found")
class SyncContacts(Command[AnySession]):
    """Command reconciling the user's XMPP roster with the session's known contacts."""

    NAME = "🔄 Sync XMPP roster"
    HELP = (
        "Synchronize your XMPP roster with your legacy contacts. "
        "Slidge will only add/remove/modify contacts in its dedicated roster group"
    )
    CHAT_COMMAND = "sync-contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self,
        session: "AnySession | None",
        _ifrom: JID,
        *args: str,
    ) -> ConfirmationSession[AnySession]:
        """Ask for confirmation; :meth:`sync` runs once the user accepts."""
        assert session is not None
        await session.ready
        confirmation = ConfirmationSession[AnySession](
            prompt="Are you sure you want to sync your roster?",
            success=None,
            handler=self.sync,
        )
        return confirmation

    async def sync(self, session: "AnySession | None", _ifrom: JID) -> str:
        """
        Walk the user's roster and align the gateway roster group with the
        contacts known to the session.

        Only roster items that are in ``self.xmpp.ROSTER_GROUP`` are touched.

        :return: A summary of how many entries were added, removed and updated.
        :raises RuntimeError: If there is no session.
        """
        if session is None:
            raise RuntimeError

        privileged = self.xmpp["xep_0356"]
        roster_iq = await privileged.get_roster(session.user_jid.bare)

        # Entries are popped from this mapping as they are matched to roster
        # items; what is left at the end is missing from the XMPP roster.
        unmatched = session.contacts.known_contacts()

        n_added = n_removed = n_updated = 0

        for entry in roster_iq["roster"]:
            entry_groups = set(entry["groups"])
            if self.xmpp.ROSTER_GROUP not in entry_groups:
                continue

            known = unmatched.pop(entry["jid"], None)

            if known is not None:
                # Known contact: only refresh it when the displayed name differs.
                if known.name != entry["name"]:
                    await known.add_to_roster(force=True)
                    n_updated += 1
                continue

            if len(entry_groups) == 1:
                # Only in the gateway group and unknown to us: drop the item.
                await privileged.set_roster(
                    session.user_jid, {entry["jid"]: {"subscription": "remove"}}
                )
                n_removed += 1
                continue

            # Also in other groups: keep the item, just take it out of ours.
            entry_groups.remove(self.xmpp.ROSTER_GROUP)
            await privileged.set_roster(
                session.user_jid,
                {
                    entry["jid"]: {
                        "subscription": entry["subscription"],
                        "name": entry["name"],
                        "groups": entry_groups,
                    }
                },
            )
            n_updated += 1

        # we popped before so this only acts on slidge contacts not in the xmpp roster
        for missing in unmatched.values():
            n_added += 1
            await missing.add_to_roster()

        return f"{n_added} added, {n_removed} removed, {n_updated} updated"
class ListContacts(Command[AnySession]):
    """Command returning a table of the session's contacts, sorted by name."""

    NAME = HELP = "👤 List your legacy contacts"
    CHAT_COMMAND = "contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self,
        session: "AnySession | None",
        _ifrom: JID,
        *_: str,
    ) -> TableResult:
        """Return one ``name``/``jid`` row per contact, ordered case-insensitively."""
        assert session is not None
        await session.ready

        def by_name(contact):  # noqa: ANN001, ANN202
            # Contacts without a name sort first, under the empty string.
            if contact.name:
                return contact.name.casefold()
            return ""

        rows = []
        for contact in sorted(session.contacts, key=by_name):
            rows.append({"name": contact.name, "jid": contact.jid.bare})

        return TableResult(
            description="Your buddies",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=rows,
        )
class ListGroups(Command[AnySession]):
    """Command returning a table of the session's bookmarked groups."""

    NAME = HELP = "👥 List your legacy groups"
    CHAT_COMMAND = "groups"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(
        self, session: "AnySession | None", _ifrom: JID, *_: str
    ) -> TableResult:
        """Return one ``name``/``jid`` row per group, ordered case-insensitively."""
        assert session is not None
        await session.ready

        def display_order(muc: AnyMUC) -> str:
            # Unnamed groups are ordered by the local part of their JID.
            return (muc.name or muc.jid.node).casefold()

        ordered: list[AnyMUC] = sorted(session.bookmarks, key=display_order)
        rows = [
            {"name": muc.name or str(muc.legacy_id), "jid": muc.jid.bare}
            for muc in ordered
        ]
        return TableResult(
            description="Your groups",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=rows,
            jids_are_mucs=True,
        )
class ListSpaces(Command[AnySession]):
    """Command returning a table of the user's spaces with an XMPP IRI for each."""

    NAME = "🌐 List my spaces"
    HELP = "List the spaces you are part of. Spaces are collections of rooms."
    CHAT_COMMAND = "spaces"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = SPACES

    related_to_spaces = True

    async def run(
        self, session: "AnySession | None", _ifrom: JID, *_: str
    ) -> TableResult:
        """
        Refresh the user's spaces, then return one ``name``/``iri`` row per space.

        The IRI points at the gateway's bare JID with the space's node as the
        ``node`` query parameter.
        """
        assert session is not None
        spaces = await _get_updated_spaces(session)

        rows = []
        for space in spaces:
            label = space.name or str(space.legacy_id)
            gateway_jid = self.xmpp.boundjid.bare
            node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
            rows.append({"name": label, "iri": f"xmpp:{gateway_jid}?;node={node}"})

        return TableResult(
            description="Your spaces. If your client does not support spaces, use the 'space-rooms' command.",
            fields=[FormField("name"), FormField("iri")],
            items=rows,
            jids_are_mucs=True,
        )
class ListRoomsInSpace(Command[AnySession]):
    """Two-step command: pick one of your spaces, then get a table of its rooms."""

    NAME = "🌐 List the rooms in a space"
    HELP = "List the rooms of a space you are part of. Spaces are collections of rooms."
    CHAT_COMMAND = "space-rooms"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = SPACES

    async def run(
        self, session: "AnySession | None", _ifrom: JID, *_: str
    ) -> "FormSession[AnySession]":
        """
        Refresh the user's spaces and return a form to choose one of them.

        The mapping of stringified legacy IDs to space names is forwarded to
        :meth:`list_rooms` so it can label its result.
        """
        assert session is not None
        spaces = await _get_updated_spaces(session)
        return FormSession(
            title="Your spaces",
            instructions="Select a space to view its rooms",
            fields=[
                FormField(
                    var="space_legacy_id",
                    label="Space",
                    required=True,
                    type="list-single",
                    options=[
                        {
                            "label": s.name or str(s.legacy_id),
                            "value": str(s.legacy_id),
                        }
                        for s in spaces
                    ],
                )
            ],
            handler=self.list_rooms,
            handler_args=({str(s.legacy_id): s.name for s in spaces},),
        )

    async def list_rooms(
        self,
        form_values: FormValues,
        session: "AnySession | None",
        _ifrom: JID,
        space_names: dict[str, str],
    ) -> TableResult:
        """
        Return the rooms of the selected space, sorted case-insensitively.

        :param form_values: Submitted form; must contain ``space_legacy_id``.
        :param space_names: Stringified legacy ID to space name, as built by
            :meth:`run`; only used for the description of the result.
        :raises XMPPError: ``bad-request`` when no space was specified.
        """
        assert session is not None
        space_legacy_id = form_values.get("space_legacy_id")
        if space_legacy_id is None:
            raise XMPPError("bad-request", "You need to specify a space")
        assert isinstance(space_legacy_id, str)
        await session.ready

        # Resolve the node *before* opening the store session, so that the
        # session is never held open across an await point.
        node = await session.bookmarks.space_legacy_id_to_node(space_legacy_id)
        with self.xmpp.store.session() as orm:
            rooms = sorted(
                self.xmpp.store.spaces.get_rooms(orm, session.user_pk, node),
                key=lambda r: (r.name or str(r.jid.node)).casefold(),
            )

        name = space_names.get(space_legacy_id)
        return TableResult(
            fields=[
                FormField("name", "Name"),
                FormField("jid", "JID", type="jid-single"),
            ],
            description=f"Rooms of '{name or space_legacy_id}'",
            items=[{"name": r.name or str(r.legacy_id), "jid": r.jid} for r in rooms],
            jids_are_mucs=True,
        )
class Login(Command[AnySession]):
    """Command triggering a new login attempt for a user who is not logged in."""

    NAME = "🔐 Re-login to the legacy network"
    HELP = "Login to the legacy service"
    CHAT_COMMAND = "re-login"
    NODE = "https://slidge.im/command/core/" + CHAT_COMMAND

    ACCESS = CommandAccess.USER_NON_LOGGED

    async def run(
        self,
        session: "AnySession | None",
        _ifrom: JID,
        *_: str,
    ) -> str:
        """
        Log the session in through ``self.xmpp.login_wrap``.

        :return: The message produced by the login, or ``"Re-connected"`` when
            the login produced none.
        :raises XMPPError: ``bad-request`` if a login is already in progress;
            ``internal-server-error`` (type ``wait``) if the login fails. The
            original exception is chained as the cause.
        """
        assert session is not None
        if session.is_logging_in:
            raise XMPPError("bad-request", "You are already logging in.")
        session.is_logging_in = True
        try:
            msg = await self.xmpp.login_wrap(session)
        except Exception as e:
            session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
            # Chain the cause so the real failure stays visible in tracebacks.
            raise XMPPError(
                "internal-server-error", etype="wait", text=f"Could not login: {e}"
            ) from e
        finally:
            # Always release the flag, otherwise later attempts are rejected.
            session.is_logging_in = False
        session.logged = True
        # Use one fallback for the status, the message and the returned value,
        # so that the declared ``str`` return type always holds.
        status = msg or "Re-connected"
        session.send_gateway_status(status, show="chat")
        session.send_gateway_message(status)
        return status
class CreateGroup(Command[AnySession]):
    """Two-step command: fill a form, then create a group via ``on_create_group``."""

    NAME = "🆕 New legacy group"
    HELP = "Create a group on the legacy service"
    CHAT_COMMAND = "create-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    CATEGORY = GROUPS

    ACCESS = CommandAccess.USER_LOGGED

    async def run(
        self,
        session: "AnySession | None",
        _ifrom: JID,
        *_: str,
    ) -> FormSession[AnySession]:
        """
        Return a form asking for a group name and the contacts to add.

        The selectable contacts are those returned by
        ``known_contacts(only_friends=True)``, sorted case-insensitively.
        """
        assert session is not None
        await session.ready
        contacts = session.contacts.known_contacts(only_friends=True)
        # A contact may have no name: sort those under the empty string rather
        # than letting ``None`` be compared with ``str`` (TypeError).
        ordered = sorted(contacts.values(), key=lambda c: (c.name or "").casefold())
        return FormSession(
            title="Create a new group",
            instructions="Pick contacts that should be part of this new group",
            fields=[
                FormField(var="group_name", label="Name of the group", required=True),
                FormField(
                    var="contacts",
                    label="Contacts to add to the new group",
                    type="list-multi",
                    options=[
                        {
                            "value": str(contact.jid),
                            # Unnamed contacts are labelled with their JID.
                            "label": contact.name or str(contact.jid),
                        }
                        for contact in ordered
                    ],
                    required=False,
                ),
            ],
            handler=self.finish,
        )

    @staticmethod
    async def finish(
        form_values: FormValues,
        session: "AnySession | None",
        *_: Any,  # noqa:ANN401
    ) -> TableResult:
        """
        Create the group and return a one-row table describing it.

        The selected JIDs are resolved to contacts with ``contacts.by_jid``
        before being handed to ``session.on_create_group``; the new group is
        then looked up in the bookmarks by the returned legacy ID.
        """
        assert session is not None
        legacy_id: str = await session.on_create_group(
            cast(str, form_values["group_name"]),
            [
                await session.contacts.by_jid(JID(j))
                for j in form_values.get("contacts", [])  # type:ignore
            ],
        )
        muc = await session.bookmarks.by_legacy_id(legacy_id)
        return TableResult(
            description=f"Your new group: xmpp:{muc.jid}?join",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=[{"name": muc.name, "jid": muc.jid}],
            jids_are_mucs=True,
        )
class Preferences(Command[AnySession]):
    """Command showing the user's preferences in a form and storing the changes."""

    NAME = "⚙️ Preferences"
    HELP = "Customize the gateway behaviour to your liking"
    CHAT_COMMAND = "preferences"
    NODE = "https://slidge.im/command/core/preferences"
    ACCESS = CommandAccess.USER

    async def run(
        self,
        session: "AnySession | None",
        _ifrom: JID,
        *_: str,
    ) -> FormSession[AnySession]:
        """
        Return the preferences form, pre-filled with the user's stored values.

        The gateway's field definitions are deep-copied so that filling in
        per-user values does not alter the shared ``PREFERENCES`` list. Fields
        without a stored value keep their default.
        """
        fields = deepcopy(self.xmpp.PREFERENCES)
        assert session is not None
        current = session.user.preferences
        for field in fields:
            field.value = current.get(field.var, field.value)  # type:ignore
        return Form(
            title="Preferences",
            instructions=self.HELP,
            fields=fields,
            handler=self.finish,  # type:ignore
            handler_kwargs={"previous": current},
        )

    async def finish(
        self,
        form_values: UserPreferences,
        session: "AnySession | None",
        *_: Any,  # noqa:ANN401
        previous: JSONSerializable,
    ) -> str:
        """
        Store the submitted preferences and notify the session.

        :param form_values: The submitted preferences.
        :param previous: The preferences as they were when the form was built.
        :return: A message telling whether anything was changed.
        """
        assert session is not None
        if previous == form_values:
            return "No preference was changed"

        # Snapshot before updating: ``run`` passes the user's own preferences
        # object, which is updated right below.
        previous = previous.copy()
        user = session.user
        user.preferences.update(form_values)  # type:ignore
        self.xmpp.store.users.update(user)

        # ``on_preferences`` is optional: NotImplementedError is ignored.
        with contextlib.suppress(NotImplementedError):
            await session.on_preferences(previous, form_values)  # type:ignore[arg-type]

        # ``.get`` rather than indexing: stored preferences may lack keys
        # (``run`` reads them with a default too), which used to raise KeyError.
        if not previous.get("sync_avatar") and form_values.get("sync_avatar"):  # type:ignore
            await self.xmpp.fetch_user_avatar(session)
        else:
            # NOTE(review): this also runs when ``sync_avatar`` stays enabled,
            # and after the store update above; confirm this is intended.
            user.avatar_hash = None

        return "Your preferences have been updated."
class Unregister(Command[AnySession]):
    """Command removing the user's registration after a confirmation prompt."""

    NAME = "❌ Unregister from the gateway"
    HELP = "Unregister from the gateway"
    CHAT_COMMAND = "unregister"
    NODE = "https://slidge.im/command/core/unregister"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: "AnySession | None", _ifrom: JID, *_: str
    ) -> ConfirmationSession[AnySession]:
        """Ask for confirmation; :meth:`unregister` runs once the user accepts."""
        gateway_jid = self.xmpp.boundjid
        question = f"Are you sure you want to unregister from '{gateway_jid}'?"
        farewell = f"You are not registered to '{gateway_jid}' anymore."
        return ConfirmationSession[AnySession](
            prompt=question,
            success=farewell,
            handler=self.unregister,
        )

    async def unregister(self, session: "AnySession | None", _ifrom: JID) -> str:
        """Unregister the session's user through ``self.xmpp.unregister_user``."""
        assert session is not None
        account = session.user
        await self.xmpp.unregister_user(account)
        return "You are not registered anymore. Bye!"
class LeaveGroup(Command[AnySession]):
    """Three-step command: pick a group, confirm, then leave it."""

    NAME = HELP = "❌ Leave a legacy group"
    CHAT_COMMAND = "leave-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(
        self,
        session: "AnySession | None",
        ifrom: JID,
        *_: str,
    ) -> FormSession[AnySession]:
        """
        Return a form listing the user's groups.

        Each option's value is the index of the group in the sorted list, and
        that same list is forwarded to :meth:`confirm`.
        """
        assert session is not None
        await session.ready
        groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold())
        return FormSession(
            title="Leave a group",
            instructions="Select the group you want to leave",
            fields=[
                FormField(
                    "group",
                    "Group name",
                    type="list-single",
                    options=[
                        {"label": g.name or str(g.legacy_id), "value": str(i)}
                        for i, g in enumerate(groups)
                    ],
                )
            ],
            handler=self.confirm,
            handler_args=(groups,),
        )

    async def confirm(
        self,
        form_values: FormValues,
        _session: "AnySession | None",
        _ifrom: JID,
        groups: list[AnyMUC],
    ) -> Confirmation:
        """
        Resolve the selected group and ask the user to confirm leaving it.

        :param groups: The list built by :meth:`run`; the submitted value is
            an index into it.
        :raises XMPPError: ``bad-request`` if the submitted value is missing,
            is not an integer, or is not a valid index into ``groups``.
        """
        # The value comes from the client, so it is validated instead of
        # trusted: a negative index would silently select another group.
        try:
            index = int(form_values.get("group"))  # type:ignore
        except (TypeError, ValueError):
            raise XMPPError("bad-request", "You need to select a group") from None
        if not 0 <= index < len(groups):
            raise XMPPError("bad-request", "This group does not exist")

        group = groups[index]
        # Same fallback as the option label, so unnamed groups do not show
        # up as 'None' in the prompt.
        label = group.name or str(group.legacy_id)
        return Confirmation(
            prompt=f"Are you sure you want to leave the group '{label}'?",
            handler=self.finish,
            handler_args=(group,),
        )

    @staticmethod
    async def finish(session: AnySession, _ifrom: JID, group: AnyMUC) -> None:
        """Call the group's ``on_leave`` hook, then remove it from the bookmarks."""
        await group.on_leave()
        await session.bookmarks.remove(group, reason="You left this group via slidge.")
class InviteInGroups(Command[AnySession]):
    """Command sending a gateway invitation for each bookmark of type ``GROUP``."""

    NAME = "💌 Re-invite me in my groups"
    HELP = "Ask the gateway to send invitations for all your private groups"
    CHAT_COMMAND = "re-invite"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session: "AnySession | None", _ifrom: JID, *_: str) -> None:
        """Invite the user to every bookmarked room whose type is ``MucType.GROUP``."""
        assert session is not None
        await session.ready
        # Lazy filter: bookmarks of any other type are skipped.
        private_groups = (
            room for room in session.bookmarks if room.type == MucType.GROUP
        )
        for room in private_groups:
            session.send_gateway_invite(
                room, reason="You asked to be re-invited in all groups."
            )
async def _get_updated_spaces(session: AnySession) -> list[Space]:
    """
    Load the user's stored spaces, update each one if needed and return them
    sorted by name (or by stringified legacy ID when a space has no name).

    A space whose update raises is logged and left out of the result.
    """
    await session.ready

    store = session.xmpp.store
    # Materialise the rows while the store session is open; the updates
    # below are awaited only after it has been closed.
    with store.session() as orm:
        stored = list(store.spaces.get_all(orm, session.user_pk))

    refreshed: list[Space] = []
    for candidate in stored:
        try:
            fresh = await session.bookmarks.update_space_if_needed(candidate)
        except Exception:
            session.log.exception(
                "Something went wrong trying to update space '%r'", candidate
            )
            continue
        refreshed.append(fresh)

    refreshed.sort(key=lambda s: s.name or str(s.legacy_id))
    return refreshed