Coverage for slidge / command / user.py: 50%
244 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1# Commands available to users
2import contextlib
3from copy import deepcopy
4from typing import Any, cast
6from slixmpp import JID
7from slixmpp.exceptions import XMPPError
9from slidge.db.meta import JSONSerializable
10from slidge.db.models import Space
12from ..util.types import (
13 AnyMUC,
14 AnySession,
15 LegacyGroupIdType,
16 MucType,
17 UserPreferences,
18)
19from .base import (
20 Command,
21 CommandAccess,
22 Confirmation,
23 ConfirmationSession,
24 Form,
25 FormField,
26 FormSession,
27 FormValues,
28 SearchResult,
29 TableResult,
30)
31from .categories import CONTACTS, GROUPS, SPACES
class Search(Command[AnySession]):
    """Ad-hoc/chat command that searches for contacts via ``session.on_search``."""

    NAME = "🔎 Search for contacts"
    HELP = "Search for contacts via this gateway"
    CHAT_COMMAND = "find"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: AnySession | None, _ifrom: JID, *args: str
    ) -> FormSession[AnySession] | SearchResult | None:
        """
        Search right away when arguments are given, else return a search form.

        :param session: The session of the user running the command.
        :param _ifrom: Unused.
        :param args: Optional search terms; they are joined with single spaces
            and used as the value of the first search field.
        :return: Whatever ``session.on_search`` returns when ``args`` is not
            empty, otherwise a form whose handler is :meth:`search`.
        """
        assert session is not None
        await session.ready
        if not args:
            # No inline query: let the user fill in the gateway's search form.
            return FormSession[AnySession](
                title=self.xmpp.SEARCH_TITLE,
                instructions=self.xmpp.SEARCH_INSTRUCTIONS,
                fields=self.xmpp.SEARCH_FIELDS,
                handler=self.search,
            )
        first_field_var = self.xmpp.SEARCH_FIELDS[0].var
        query = " ".join(args)
        return await session.on_search({first_field_var: query})

    @staticmethod
    async def search(
        form_values: FormValues, session: AnySession | None, _ifrom: JID
    ) -> SearchResult:
        """
        Form handler: forward the submitted values to ``session.on_search``.

        :raises XMPPError: ``item-not-found`` when the search returns ``None``.
        """
        assert session is not None
        found = await session.on_search(form_values)  # type: ignore
        if found is not None:
            return found
        raise XMPPError("item-not-found", "No contact was found")
class SyncContacts(Command[AnySession]):
    """Command that reconciles the user's XMPP roster with the known contacts."""

    NAME = "🔄 Sync XMPP roster"
    HELP = (
        "Synchronize your XMPP roster with your legacy contacts. "
        "Slidge will only add/remove/modify contacts in its dedicated roster group"
    )
    CHAT_COMMAND = "sync-contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *args: str,
    ) -> ConfirmationSession[AnySession]:
        """Wait for the session to be ready, then ask for confirmation."""
        assert session is not None
        await session.ready
        return ConfirmationSession[AnySession](
            prompt="Are you sure you want to sync your roster?",
            success=None,
            handler=self.sync,
        )

    async def sync(self, session: AnySession | None, _ifrom: JID) -> str:
        """
        Confirmation handler: align the roster with ``known_contacts()``.

        Only roster items that are in ``self.xmpp.ROSTER_GROUP`` are examined.

        :return: A summary string with the added/removed/updated counts.
        :raises RuntimeError: If ``session`` is ``None``.
        """
        if session is None:
            raise RuntimeError
        privileged = self.xmpp["xep_0356"]
        roster_iq = await privileged.get_roster(session.user_jid.bare)

        pending = session.contacts.known_contacts()
        slidge_group = self.xmpp.ROSTER_GROUP

        n_added = n_removed = n_updated = 0
        for entry in roster_iq["roster"]:
            entry_groups = set(entry["groups"])
            if slidge_group not in entry_groups:
                continue

            known = pending.pop(entry["jid"], None)
            if known is not None:
                # Known contact already in the roster: refresh it only when
                # the name differs.
                if known.name != entry["name"]:
                    await known.add_to_roster(force=True)
                    n_updated += 1
                continue

            if len(entry_groups) == 1:
                # Unknown contact that is only in the dedicated group: remove.
                await privileged.set_roster(
                    session.user_jid, {entry["jid"]: {"subscription": "remove"}}
                )
                n_removed += 1
                continue

            # Unknown contact that is also in other groups: keep the roster
            # item and only take it out of the dedicated group.
            entry_groups.remove(slidge_group)
            await privileged.set_roster(
                session.user_jid,
                {
                    entry["jid"]: {
                        "subscription": entry["subscription"],
                        "name": entry["name"],
                        "groups": entry_groups,
                    }
                },
            )
            n_updated += 1

        # Matched contacts were popped above, so what is left are the
        # contacts that are not in the XMPP roster yet.
        for missing in pending.values():
            n_added += 1
            await missing.add_to_roster()

        return f"{n_added} added, {n_removed} removed, {n_updated} updated"
class ListContacts(Command[AnySession]):
    """Command that returns a table of the session's contacts."""

    NAME = HELP = "👤 List your legacy contacts"
    CHAT_COMMAND = "contacts"
    NODE = CONTACTS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *_: str,
    ) -> TableResult:
        """
        Return the contacts sorted case-insensitively by name.

        Contacts without a name sort as the empty string.
        """
        assert session is not None
        await session.ready

        def by_name(contact: Any) -> str:  # noqa:ANN401
            if contact.name:
                return contact.name.casefold()
            return ""

        ordered = sorted(session.contacts, key=by_name)
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        rows = [
            {"name": contact.name, "jid": contact.jid.bare} for contact in ordered
        ]
        return TableResult(description="Your buddies", fields=columns, items=rows)
class ListGroups(Command[AnySession]):
    """Command that returns a table of the session's bookmarked groups."""

    NAME = HELP = "👥 List your legacy groups"
    CHAT_COMMAND = "groups"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(
        self, session: AnySession | None, _ifrom: JID, *_: str
    ) -> TableResult:
        """
        Return the groups sorted case-insensitively.

        The sort key is the group name, or the node part of its JID when the
        name is falsy; the displayed name falls back to the legacy ID instead.
        """
        assert session is not None
        await session.ready

        def sort_key(group: AnyMUC) -> str:
            return (group.name or group.jid.node).casefold()

        groups: list[AnyMUC] = sorted(session.bookmarks, key=sort_key)
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        rows = [
            {"name": group.name or str(group.legacy_id), "jid": group.jid.bare}
            for group in groups
        ]
        return TableResult(
            description="Your groups",
            fields=columns,
            items=rows,
            jids_are_mucs=True,
        )
class ListSpaces(Command[AnySession]):
    """Command that returns a table of the user's spaces with their IRIs."""

    NAME = "🌐 List my spaces"
    HELP = "List the spaces you are part of. Spaces are collections of rooms."
    CHAT_COMMAND = "spaces"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = SPACES

    related_to_spaces = True

    async def run(
        self, session: "AnySession | None", _ifrom: JID, *_: str
    ) -> TableResult:
        """
        Build one row per space: its name (or legacy ID) and an ``xmpp:`` IRI.

        The IRI points at the gateway's bare JID, with the node obtained from
        ``session.bookmarks.space_legacy_id_to_node``.
        """
        assert session is not None
        spaces = await _get_updated_spaces(session)

        rows = []
        for space in spaces:
            label = space.name or str(space.legacy_id)
            gateway = self.xmpp.boundjid.bare
            node = await session.bookmarks.space_legacy_id_to_node(space.legacy_id)
            rows.append({"name": label, "iri": f"xmpp:{gateway}?;node={node}"})

        return TableResult(
            description="Your spaces. If your client does not support spaces, use the 'space-rooms' command.",
            fields=[FormField("name"), FormField("iri")],
            items=rows,
            jids_are_mucs=True,
        )
class ListRoomsInSpace(Command[AnySession]):
    """Two-step command: pick a space in a form, then list its rooms."""

    NAME = "🌐 List the rooms in a space"
    HELP = "List the rooms of a space you are part of. Spaces are collections of rooms."
    CHAT_COMMAND = "space-rooms"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = SPACES

    async def run(
        self, session: "AnySession | None", _ifrom: JID, *_: str
    ) -> "FormSession[AnySession]":
        """
        Return a form with a single required ``list-single`` field of spaces.

        A mapping of stringified legacy ID to space name is passed on to
        :meth:`list_rooms` through ``handler_args``.
        """
        assert session is not None
        spaces = await _get_updated_spaces(session)

        choices = [
            {
                "label": space.name or str(space.legacy_id),
                "value": str(space.legacy_id),
            }
            for space in spaces
        ]
        space_field = FormField(
            var="space_legacy_id",
            label="Space",
            required=True,
            type="list-single",
            options=choices,
        )
        names_by_id = {str(space.legacy_id): space.name for space in spaces}
        return FormSession(
            title="Your spaces",
            instructions="Select a space to view its rooms",
            fields=[space_field],
            handler=self.list_rooms,
            handler_args=(names_by_id,),
        )

    async def list_rooms(
        self,
        form_values: FormValues,
        session: AnySession | None,
        _ifrom: JID,
        space_names: dict[str, str],
    ) -> TableResult:
        """
        Form handler: return the rooms of the selected space as a table.

        :param form_values: Submitted form; ``space_legacy_id`` is read.
        :param space_names: Stringified legacy ID to space name, used for the
            table description.
        :raises XMPPError: ``bad-request`` when no space was submitted.
        """
        assert session is not None
        chosen = form_values.get("space_legacy_id")
        if chosen is None:
            raise XMPPError("bad-request", "You need to specify a space")
        assert isinstance(chosen, str)
        await session.ready
        space_legacy_id = self.xmpp.LEGACY_ROOM_ID_TYPE(chosen)

        def sort_key(room: Any) -> str:  # noqa:ANN401
            return (room.name or str(room.jid.node)).casefold()

        # Fetching and sorting both happen while the store session is open.
        with self.xmpp.store.session() as orm:
            node = await session.bookmarks.space_legacy_id_to_node(space_legacy_id)
            rooms = list(
                self.xmpp.store.spaces.get_rooms(orm, session.user_pk, node)
            )
            rooms.sort(key=sort_key)

        name = space_names.get(chosen)
        columns = [
            FormField("name", "Name"),
            FormField("jid", "JID", type="jid-single"),
        ]
        return TableResult(
            fields=columns,
            description=f"Rooms of '{name or space_legacy_id}'",
            items=[
                {"name": room.name or str(room.legacy_id), "jid": room.jid}
                for room in rooms
            ],
            jids_are_mucs=True,
        )
class Login(Command[AnySession]):
    """Command that triggers a new login attempt through ``login_wrap``."""

    NAME = "🔐 Re-login to the legacy network"
    HELP = "Login to the legacy service"
    CHAT_COMMAND = "re-login"
    NODE = "https://slidge.im/command/core/" + CHAT_COMMAND

    ACCESS = CommandAccess.USER_NON_LOGGED

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *_: str,
    ) -> str:
        """
        Attempt to log the session in and report the outcome.

        ``session.is_logging_in`` is set for the duration of the attempt and
        always reset afterwards, whether the login succeeded or not.

        :param session: The session to log in.
        :param _ifrom: Unused.
        :return: The value returned by ``self.xmpp.login_wrap``.
            NOTE(review): the ``msg or "Re-connected"`` fallbacks below
            suggest it may be falsy; confirm against the ``-> str`` annotation.
        :raises XMPPError: ``bad-request`` if a login is already in progress;
            ``internal-server-error`` (type ``wait``) if the login attempt
            raised, with the original exception attached as ``__cause__``.
        """
        assert session is not None
        if session.is_logging_in:
            raise XMPPError("bad-request", "You are already logging in.")
        session.is_logging_in = True
        try:
            msg = await self.xmpp.login_wrap(session)
        except Exception as e:
            session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
            # Chain explicitly so the traceback states that the stanza error
            # was caused by the login failure.
            raise XMPPError(
                "internal-server-error", etype="wait", text=f"Could not login: {e}"
            ) from e
        finally:
            session.is_logging_in = False
        session.logged = True
        session.send_gateway_status(msg or "Re-connected", show="chat")
        session.send_gateway_message(msg or "Re-connected")
        return msg
class CreateGroup(Command[AnySession]):
    """Two-step command: fill in a form, then create a group from it."""

    NAME = "🆕 New legacy group"
    HELP = "Create a group on the legacy service"
    CHAT_COMMAND = "create-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    CATEGORY = GROUPS

    ACCESS = CommandAccess.USER_LOGGED

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *_: str,
    ) -> FormSession[AnySession]:
        """
        Return the group creation form.

        The form has a required group name and an optional multi-selection of
        contacts, taken from ``known_contacts(only_friends=True)``.
        """
        assert session is not None
        await session.ready
        contacts = session.contacts.known_contacts(only_friends=True)

        def by_name(contact: Any) -> str:  # noqa:ANN401
            # A contact name may be None (the contact listing command guards
            # against it too); sorting on the raw name would then raise
            # TypeError when comparing None with str.
            return contact.name.casefold() if contact.name else ""

        return FormSession(
            title="Create a new group",
            instructions="Pick contacts that should be part of this new group",
            fields=[
                FormField(var="group_name", label="Name of the group", required=True),
                FormField(
                    var="contacts",
                    label="Contacts to add to the new group",
                    type="list-multi",
                    options=[
                        {
                            "value": str(contact.jid),
                            # Fall back to the JID so that every option has a
                            # readable label.
                            "label": contact.name or str(contact.jid),
                        }
                        for contact in sorted(contacts.values(), key=by_name)
                    ],
                    required=False,
                ),
            ],
            handler=self.finish,
        )

    @staticmethod
    async def finish(
        form_values: FormValues,
        session: AnySession | None,
        *_: Any,  # noqa:ANN401
    ) -> TableResult:
        """
        Form handler: create the group and return a one-row table for it.

        The selected contact JIDs are resolved with ``session.contacts.by_jid``
        and passed with the group name to ``session.on_create_group``; the
        returned legacy ID is then looked up in ``session.bookmarks``.
        """
        assert session is not None
        legacy_id: LegacyGroupIdType = await session.on_create_group(  # type:ignore
            cast(str, form_values["group_name"]),
            [
                await session.contacts.by_jid(JID(j))
                for j in form_values.get("contacts", [])  # type:ignore
            ],
        )
        muc = await session.bookmarks.by_legacy_id(legacy_id)
        return TableResult(
            description=f"Your new group: xmpp:{muc.jid}?join",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=[{"name": muc.name, "jid": muc.jid}],
            jids_are_mucs=True,
        )
class Preferences(Command[AnySession]):
    """Command that shows and updates the user's gateway preferences."""

    NAME = "⚙️ Preferences"
    HELP = "Customize the gateway behaviour to your liking"
    CHAT_COMMAND = "preferences"
    NODE = "https://slidge.im/command/core/preferences"
    ACCESS = CommandAccess.USER

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *_: str,
    ) -> FormSession[AnySession]:
        """
        Return the preferences form, pre-filled with the stored values.

        The fields are a deep copy of ``self.xmpp.PREFERENCES``, so setting
        their values here does not modify the shared definitions.
        """
        form_fields = deepcopy(self.xmpp.PREFERENCES)
        assert session is not None
        stored = session.user.preferences
        for form_field in form_fields:
            # Keep the field's own value when nothing is stored for it.
            form_field.value = stored.get(form_field.var, form_field.value)  # type:ignore
        return Form(
            title="Preferences",
            instructions=self.HELP,
            fields=form_fields,
            handler=self.finish,  # type:ignore
            handler_kwargs={"previous": stored},
        )

    async def finish(
        self,
        form_values: UserPreferences,
        session: AnySession | None,
        *_: Any,  # noqa:ANN401
        previous: JSONSerializable,
    ) -> str:
        """
        Form handler: store the submitted preferences.

        :param form_values: The submitted preferences.
        :param previous: The preferences as they were when the form was built.
        :return: A status message for the user.
        """
        assert session is not None
        if previous == form_values:
            return "No preference was changed"

        # Snapshot the old values before the update below; ``run`` passes the
        # user's own preferences mapping as ``previous``.
        before = previous.copy()
        user = session.user
        user.preferences.update(form_values)  # type:ignore
        self.xmpp.store.users.update(user)

        try:
            await session.on_preferences(before, form_values)  # type:ignore[arg-type]
        except NotImplementedError:
            pass

        # NOTE(review): the hash is reset in every case except "sync_avatar
        # was just switched on" - confirm this is intended when sync_avatar
        # was already on and stays on.
        if before["sync_avatar"] or not form_values["sync_avatar"]:
            user.avatar_hash = None
        else:
            await self.xmpp.fetch_user_avatar(session)

        return "Your preferences have been updated."
class Unregister(Command[AnySession]):
    """Command that unregisters the user after a confirmation step."""

    NAME = "❌ Unregister from the gateway"
    HELP = "Unregister from the gateway"
    CHAT_COMMAND = "unregister"
    NODE = "https://slidge.im/command/core/unregister"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: AnySession | None, _ifrom: JID, *_: str
    ) -> ConfirmationSession[AnySession]:
        """Return the confirmation prompt; :meth:`unregister` is its handler."""
        question = f"Are you sure you want to unregister from '{self.xmpp.boundjid}'?"
        farewell = f"You are not registered to '{self.xmpp.boundjid}' anymore."
        return ConfirmationSession[AnySession](
            prompt=question,
            success=farewell,
            handler=self.unregister,
        )

    async def unregister(self, session: AnySession | None, _ifrom: JID) -> str:
        """Confirmation handler: call ``unregister_user`` for the session's user."""
        assert session is not None
        user = session.user
        await self.xmpp.unregister_user(user)
        return "You are not registered anymore. Bye!"
class LeaveGroup(Command[AnySession]):
    """Three-step command: pick a group, confirm, then leave it."""

    NAME = HELP = "❌ Leave a legacy group"
    CHAT_COMMAND = "leave-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(
        self,
        session: AnySession | None,
        _ifrom: JID,
        *_: str,
    ) -> FormSession[AnySession]:
        """
        Return a form listing the bookmarked groups.

        Each option's value is the index of the group in the sorted list; the
        same list is handed to :meth:`confirm` through ``handler_args``.
        """
        assert session is not None
        await session.ready
        groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold())
        return FormSession(
            title="Leave a group",
            instructions="Select the group you want to leave",
            fields=[
                FormField(
                    "group",
                    "Group name",
                    type="list-single",
                    options=[
                        {"label": g.name or str(g.legacy_id), "value": str(i)}
                        for i, g in enumerate(groups)
                    ],
                )
            ],
            handler=self.confirm,
            handler_args=(groups,),
        )

    async def confirm(
        self,
        form_values: FormValues,
        _session: AnySession | None,
        _ifrom: JID,
        groups: list[AnyMUC],
    ) -> Confirmation:
        """
        Form handler: resolve the selected group and ask for confirmation.

        :param form_values: Submitted form; ``group`` holds the index of the
            selected entry of ``groups``.
        :param groups: The list the form options were built from.
        :raises XMPPError: ``bad-request`` when the selection is missing, is
            not an integer, or is outside of ``groups``.
        """
        # The "group" field is not marked as required, so the value can be
        # absent; report that as a client error rather than letting
        # KeyError/TypeError/ValueError/IndexError escape.
        choice = form_values.get("group")
        try:
            index = int(choice)  # type:ignore
        except (TypeError, ValueError):
            raise XMPPError(
                "bad-request", "You need to select a group"
            ) from None
        # Explicit bounds check: a negative index would otherwise silently
        # select a group counted from the end of the list.
        if not 0 <= index < len(groups):
            raise XMPPError("bad-request", "You need to select a group")
        group = groups[index]
        # Same fallback as the option labels, so an unnamed group is not
        # displayed as 'None'.
        display_name = group.name or str(group.legacy_id)
        return Confirmation(
            prompt=f"Are you sure you want to leave the group '{display_name}'?",
            handler=self.finish,
            handler_args=(group,),
        )

    @staticmethod
    async def finish(session: AnySession, _ifrom: JID, group: AnyMUC) -> None:
        """Confirmation handler: leave the group, then remove its bookmark."""
        await session.on_leave_group(group.legacy_id)
        await session.bookmarks.remove(group, reason="You left this group via slidge.")
class InviteInGroups(Command[AnySession]):
    """Command that sends a gateway invite for every ``MucType.GROUP`` room."""

    NAME = "💌 Re-invite me in my groups"
    HELP = "Ask the gateway to send invitations for all your private groups"
    CHAT_COMMAND = "re-invite"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session: "AnySession | None", _ifrom: JID, *_: str) -> None:
        """Wait for the session, then invite the user to each matching room."""
        assert session is not None
        await session.ready
        # Lazy filter: rooms are checked and invited one at a time, in
        # bookmark iteration order.
        matching = (muc for muc in session.bookmarks if muc.type == MucType.GROUP)
        for muc in matching:
            session.send_gateway_invite(
                muc, reason="You asked to be re-invited in all groups."
            )
async def _get_updated_spaces(session: AnySession) -> list[Space]:
    """
    Load the user's stored spaces, refresh each one, and return them sorted.

    A space whose refresh raises is logged and left out of the result. The
    result is sorted by name, or by stringified legacy ID when the name is
    falsy.
    """
    await session.ready
    store = session.xmpp.store
    with store.session() as orm:
        stored_spaces = [*store.spaces.get_all(orm, session.user_pk)]

    refreshed: list[Space] = []
    for stored in stored_spaces:
        try:
            fresh = await session.bookmarks.update_space_if_needed(stored)
        except Exception:
            session.log.exception(
                "Something went wrong trying to update space '%r'", stored
            )
        else:
            refreshed.append(fresh)

    refreshed.sort(key=lambda s: s.name or str(s.legacy_id))
    return refreshed