Coverage for slidge/command/user.py: 49%
173 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1# Commands available to users
2from copy import deepcopy
3from typing import TYPE_CHECKING, Any, Optional, Union, cast
5from slixmpp import JID
6from slixmpp.exceptions import XMPPError
8from ..group.room import LegacyMUC
9from ..util.types import AnyBaseSession, LegacyGroupIdType, UserPreferences
10from .base import (
11 Command,
12 CommandAccess,
13 Confirmation,
14 Form,
15 FormField,
16 FormValues,
17 SearchResult,
18 TableResult,
19)
20from .categories import CONTACTS, GROUPS
22if TYPE_CHECKING:
23 pass
26class Search(Command):
27 NAME = "🔎 Search for contacts"
28 HELP = "Search for contacts via this gateway"
29 CHAT_COMMAND = "find"
30 NODE = CONTACTS.node + "/" + CHAT_COMMAND
31 ACCESS = CommandAccess.USER_LOGGED
32 CATEGORY = CONTACTS
34 async def run(
35 self, session: Optional[AnyBaseSession], _ifrom: JID, *args: str
36 ) -> Union[Form, SearchResult, None]:
37 if args:
38 assert session is not None
39 return await session.on_search(
40 {self.xmpp.SEARCH_FIELDS[0].var: " ".join(args)}
41 )
42 return Form(
43 title=self.xmpp.SEARCH_TITLE,
44 instructions=self.xmpp.SEARCH_INSTRUCTIONS,
45 fields=self.xmpp.SEARCH_FIELDS,
46 handler=self.search,
47 )
49 @staticmethod
50 async def search(
51 form_values: FormValues, session: Optional[AnyBaseSession], _ifrom: JID
52 ) -> SearchResult:
53 assert session is not None
54 results = await session.on_search(form_values) # type: ignore
55 if results is None:
56 raise XMPPError("item-not-found", "No contact was found")
58 return results
61class SyncContacts(Command):
62 NAME = "🔄 Sync XMPP roster"
63 HELP = (
64 "Synchronize your XMPP roster with your legacy contacts. "
65 "Slidge will only add/remove/modify contacts in its dedicated roster group"
66 )
67 CHAT_COMMAND = "sync-contacts"
68 NODE = CONTACTS.node + "/" + CHAT_COMMAND
69 ACCESS = CommandAccess.USER_LOGGED
70 CATEGORY = CONTACTS
72 async def run(self, session: Optional[AnyBaseSession], _ifrom, *_) -> Confirmation:
73 return Confirmation(
74 prompt="Are you sure you want to sync your roster?",
75 success=None,
76 handler=self.sync,
77 )
79 async def sync(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str:
80 if session is None:
81 raise RuntimeError
82 roster_iq = await self.xmpp["xep_0356"].get_roster(session.user_jid.bare)
84 contacts = session.contacts.known_contacts()
86 added = 0
87 removed = 0
88 updated = 0
89 for item in roster_iq["roster"]:
90 groups = set(item["groups"])
91 if self.xmpp.ROSTER_GROUP in groups:
92 contact = contacts.pop(item["jid"], None)
93 if contact is None:
94 if len(groups) == 1:
95 await self.xmpp["xep_0356"].set_roster(
96 session.user_jid, {item["jid"]: {"subscription": "remove"}}
97 )
98 removed += 1
99 else:
100 groups.remove(self.xmpp.ROSTER_GROUP)
101 await self.xmpp["xep_0356"].set_roster(
102 session.user_jid,
103 {
104 item["jid"]: {
105 "subscription": item["subscription"],
106 "name": item["name"],
107 "groups": groups,
108 }
109 },
110 )
111 updated += 1
112 else:
113 if contact.name != item["name"]:
114 await contact.add_to_roster(force=True)
115 updated += 1
117 # we popped before so this only acts on slidge contacts not in the xmpp roster
118 for contact in contacts.values():
119 added += 1
120 await contact.add_to_roster()
122 return f"{added} added, {removed} removed, {updated} updated"
125class ListContacts(Command):
126 NAME = HELP = "👤 List your legacy contacts"
127 CHAT_COMMAND = "contacts"
128 NODE = CONTACTS.node + "/" + CHAT_COMMAND
129 ACCESS = CommandAccess.USER_LOGGED
130 CATEGORY = CONTACTS
132 async def run(
133 self, session: Optional[AnyBaseSession], _ifrom: JID, *_
134 ) -> TableResult:
135 assert session is not None
136 contacts = sorted(
137 session.contacts, key=lambda c: c.name.casefold() if c.name else ""
138 )
139 return TableResult(
140 description="Your buddies",
141 fields=[FormField("name"), FormField("jid", type="jid-single")],
142 items=[{"name": c.name, "jid": c.jid.bare} for c in contacts],
143 )
146class ListGroups(Command):
147 NAME = HELP = "👥 List your legacy groups"
148 CHAT_COMMAND = "groups"
149 NODE = GROUPS.node + "/" + CHAT_COMMAND
150 ACCESS = CommandAccess.USER_LOGGED
151 CATEGORY = GROUPS
153 async def run(self, session, _ifrom, *_):
154 assert session is not None
155 await session.bookmarks.fill()
156 groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold())
157 return TableResult(
158 description="Your groups",
159 fields=[FormField("name"), FormField("jid", type="jid-single")],
160 items=[{"name": g.name, "jid": g.jid.bare} for g in groups],
161 jids_are_mucs=True,
162 )
165class Login(Command):
166 NAME = "🔐 Re-login to the legacy network"
167 HELP = "Login to the legacy service"
168 CHAT_COMMAND = "re-login"
169 NODE = "https://slidge.im/command/core/" + CHAT_COMMAND
171 ACCESS = CommandAccess.USER_NON_LOGGED
173 async def run(self, session: Optional[AnyBaseSession], _ifrom, *_):
174 assert session is not None
175 if session.is_logging_in:
176 raise XMPPError("bad-request", "You are already logging in.")
177 session.is_logging_in = True
178 try:
179 msg = await session.login()
180 except Exception as e:
181 session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
182 raise XMPPError(
183 "internal-server-error", etype="wait", text=f"Could not login: {e}"
184 )
185 finally:
186 session.is_logging_in = False
187 session.logged = True
188 session.send_gateway_status(msg or "Re-connected", show="chat")
189 session.send_gateway_message(msg or "Re-connected")
190 return msg
193class CreateGroup(Command):
194 NAME = "🆕 New legacy group"
195 HELP = "Create a group on the legacy service"
196 CHAT_COMMAND = "create-group"
197 NODE = GROUPS.node + "/" + CHAT_COMMAND
198 CATEGORY = GROUPS
200 ACCESS = CommandAccess.USER_LOGGED
202 async def run(self, session: Optional[AnyBaseSession], _ifrom, *_):
203 assert session is not None
204 contacts = session.contacts.known_contacts(only_friends=True)
205 return Form(
206 title="Create a new group",
207 instructions="Pick contacts that should be part of this new group",
208 fields=[
209 FormField(var="group_name", label="Name of the group", required=True),
210 FormField(
211 var="contacts",
212 label="Contacts to add to the new group",
213 type="list-multi",
214 options=[
215 {"value": str(contact.jid), "label": contact.name}
216 for contact in sorted(contacts.values(), key=lambda c: c.name)
217 ],
218 required=False,
219 ),
220 ],
221 handler=self.finish,
222 )
224 @staticmethod
225 async def finish(form_values: FormValues, session: Optional[AnyBaseSession], *_):
226 assert session is not None
227 legacy_id: LegacyGroupIdType = await session.on_create_group( # type:ignore
228 cast(str, form_values["group_name"]),
229 [
230 await session.contacts.by_jid(JID(j))
231 for j in form_values.get("contacts", []) # type:ignore
232 ],
233 )
234 muc = await session.bookmarks.by_legacy_id(legacy_id)
235 return TableResult(
236 description=f"Your new group: xmpp:{muc.jid}?join",
237 fields=[FormField("name"), FormField("jid", type="jid-single")],
238 items=[{"name": muc.name, "jid": muc.jid}],
239 jids_are_mucs=True,
240 )
243class Preferences(Command):
244 NAME = "⚙️ Preferences"
245 HELP = "Customize the gateway behaviour to your liking"
246 CHAT_COMMAND = "preferences"
247 NODE = "https://slidge.im/command/core/preferences"
248 ACCESS = CommandAccess.USER
250 async def run(
251 self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
252 ) -> Form:
253 fields = deepcopy(self.xmpp.PREFERENCES)
254 assert session is not None
255 current = session.user.preferences
256 for field in fields:
257 field.value = current.get(field.var, field.value) # type:ignore
258 return Form(
259 title="Preferences",
260 instructions=self.HELP,
261 fields=fields,
262 handler=self.finish, # type:ignore
263 )
265 async def finish(
266 self, form_values: UserPreferences, session: Optional[AnyBaseSession], *_
267 ) -> str:
268 assert session is not None
269 user = session.user
270 user.preferences.update(form_values) # type:ignore
271 if form_values["sync_avatar"]:
272 await self.xmpp.fetch_user_avatar(session)
273 else:
274 user.avatar_hash = None
276 self.xmpp.store.users.update(user)
277 return "Your preferences have been updated."
280class Unregister(Command):
281 NAME = "❌ Unregister from the gateway"
282 HELP = "Unregister from the gateway"
283 CHAT_COMMAND = "unregister"
284 NODE = "https://slidge.im/command/core/unregister"
285 ACCESS = CommandAccess.USER
287 async def run(
288 self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
289 ) -> Confirmation:
290 return Confirmation(
291 prompt=f"Are you sure you want to unregister from '{self.xmpp.boundjid}'?",
292 success=f"You are not registered to '{self.xmpp.boundjid}' anymore.",
293 handler=self.unregister,
294 )
296 async def unregister(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str:
297 assert session is not None
298 await self.xmpp.unregister_user(session.user)
299 return "You are not registered anymore. Bye!"
302class LeaveGroup(Command):
303 NAME = HELP = "❌ Leave a legacy group"
304 CHAT_COMMAND = "leave-group"
305 NODE = GROUPS.node + "/" + CHAT_COMMAND
306 ACCESS = CommandAccess.USER_LOGGED
307 CATEGORY = GROUPS
309 async def run(self, session, _ifrom, *_):
310 assert session is not None
311 await session.bookmarks.fill()
312 groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold())
313 return Form(
314 title="Leave a group",
315 instructions="Select the group you want to leave",
316 fields=[
317 FormField(
318 "group",
319 "Group name",
320 type="list-single",
321 options=[
322 {"label": g.name, "value": str(i)} for i, g in enumerate(groups)
323 ],
324 )
325 ],
326 handler=self.confirm, # type:ignore
327 handler_args=(groups,),
328 )
330 async def confirm(
331 self,
332 form_values: FormValues,
333 _session: AnyBaseSession,
334 _ifrom,
335 groups: list[LegacyMUC],
336 ):
337 group = groups[int(form_values["group"])] # type:ignore
338 return Confirmation(
339 prompt=f"Are you sure you want to leave the group '{group.name}'?",
340 handler=self.finish, # type:ignore
341 handler_args=(group,),
342 )
344 @staticmethod
345 async def finish(session: AnyBaseSession, _ifrom, group: LegacyMUC) -> None:
346 await session.on_leave_group(group.legacy_id)
347 await session.bookmarks.remove(group, reason="You left this group via slidge.")