Coverage for slidge/command/user.py: 49%
171 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1# Commands available to users
2from copy import deepcopy
3from typing import TYPE_CHECKING, Any, Optional, Union, cast
5from slixmpp import JID # type:ignore[attr-defined]
6from slixmpp.exceptions import XMPPError
8from ..group.room import LegacyMUC
9from ..util.types import AnyBaseSession, LegacyGroupIdType, UserPreferences
10from .base import (
11 Command,
12 CommandAccess,
13 Confirmation,
14 Form,
15 FormField,
16 FormValues,
17 SearchResult,
18 TableResult,
19)
20from .categories import CONTACTS, GROUPS
22if TYPE_CHECKING:
23 pass
class Search(Command):
    """Chat/ad-hoc command used to look up contacts through the gateway.

    Called with arguments, it searches right away; called without, it
    answers with the gateway's search form.
    """

    NAME = "🔎 Search for contacts"
    HELP = "Search for contacts via this gateway"
    CHAT_COMMAND = "find"
    NODE = "/".join((CONTACTS.node, CHAT_COMMAND))
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *args: str
    ) -> Union[Form, SearchResult, None]:
        """Start a search.

        :param session: Session of the requesting user; must not be ``None``
            when ``args`` are given.
        :param _ifrom: JID of the requester (unused).
        :param args: Optional search words; they are joined with spaces and
            submitted as the value of the first search field.
        :return: The search form when no argument is given, otherwise
            whatever ``session.on_search()`` returns (possibly ``None``).
        """
        if not args:
            # Nothing to search for yet: let the user fill in the form.
            return Form(
                title=self.xmpp.SEARCH_TITLE,
                instructions=self.xmpp.SEARCH_INSTRUCTIONS,
                fields=self.xmpp.SEARCH_FIELDS,
                handler=self.search,
            )

        assert session is not None
        first_field = self.xmpp.SEARCH_FIELDS[0].var
        query = {first_field: " ".join(args)}
        return await session.on_search(query)

    @staticmethod
    async def search(
        form_values: FormValues, session: Optional[AnyBaseSession], _ifrom: JID
    ) -> SearchResult:
        """Form handler: run the search with the submitted values.

        :param form_values: Values submitted through the search form.
        :param session: Session of the requesting user; must not be ``None``.
        :param _ifrom: JID of the requester (unused).
        :return: The result of ``session.on_search()``.
        :raises XMPPError: ``item-not-found`` when the search returns ``None``.
        """
        assert session is not None
        found = await session.on_search(form_values)  # type: ignore
        if found is not None:
            return found
        raise XMPPError("item-not-found", "No contact was found")
class SyncContacts(Command):
    """Command reconciling the user's XMPP roster with the legacy contacts.

    Only roster items belonging to ``self.xmpp.ROSTER_GROUP`` are touched.
    """

    NAME = "🔄 Sync XMPP roster"
    HELP = (
        "Synchronize your XMPP roster with your legacy contacts. "
        "Slidge will only add/remove/modify contacts in its dedicated roster group"
    )
    CHAT_COMMAND = "sync-contacts"
    NODE = "/".join((CONTACTS.node, CHAT_COMMAND))
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_) -> Confirmation:
        """Ask the user to confirm before the roster is modified.

        :return: A confirmation prompt whose handler is :meth:`sync`.
        """
        return Confirmation(
            prompt="Are you sure you want to sync your roster?",
            success=None,
            handler=self.sync,
        )

    async def sync(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str:
        """Walk the XMPP roster and align it with the known legacy contacts.

        :param session: Session of the requesting user.
        :param _ifrom: JID of the requester (unused).
        :return: A summary of how many entries were added, removed and updated.
        :raises RuntimeError: If ``session`` is ``None``.
        """
        if session is None:
            raise RuntimeError

        privileged = self.xmpp["xep_0356"]
        roster_iq = await privileged.get_roster(session.user_jid.bare)

        # Contacts are popped from this mapping as they are matched against
        # roster items; whatever is left at the end is missing from the roster.
        pending = session.contacts.known_contacts()

        added = removed = updated = 0

        for entry in roster_iq["roster"]:
            entry_groups = set(entry["groups"])
            if self.xmpp.ROSTER_GROUP not in entry_groups:
                # Not in the gateway's roster group: leave it alone.
                continue

            known = pending.pop(entry["jid"], None)

            if known is not None:
                # Contact exists on both sides: refresh it if the name differs.
                if known.name != entry["name"]:
                    await known.add_to_roster(force=True)
                    updated += 1
                continue

            if len(entry_groups) == 1:
                # The item is only in the gateway's group: drop it entirely.
                await privileged.set_roster(
                    session.user_jid, {entry["jid"]: {"subscription": "remove"}}
                )
                removed += 1
                continue

            # The item is also in other groups: only take it out of the
            # gateway's group and keep the rest of the item as it was.
            entry_groups.remove(self.xmpp.ROSTER_GROUP)
            await privileged.set_roster(
                session.user_jid,
                {
                    entry["jid"]: {
                        "subscription": entry["subscription"],
                        "name": entry["name"],
                        "groups": entry_groups,
                    }
                },
            )
            updated += 1

        # we popped before so this only acts on slidge contacts not in the xmpp roster
        for missing in pending.values():
            added += 1
            await missing.add_to_roster()

        return f"{added} added, {removed} removed, {updated} updated"
class ListContacts(Command):
    """Command returning a table of the user's legacy contacts."""

    NAME = HELP = "👤 List your legacy contacts"
    CHAT_COMMAND = "contacts"
    NODE = "/".join((CONTACTS.node, CHAT_COMMAND))
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = CONTACTS

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_
    ) -> TableResult:
        """Build the contact table, ordered by case-folded name.

        :param session: Session of the requesting user; must not be ``None``.
        :param _ifrom: JID of the requester (unused).
        :return: A table with one ``name``/``jid`` row per contact.
        """
        assert session is not None

        def by_name(contact) -> str:
            # Contacts without a (non-empty) name sort first.
            label = contact.name
            return label.casefold() if label else ""

        rows = [
            {"name": contact.name, "jid": contact.jid.bare}
            for contact in sorted(session.contacts, key=by_name)
        ]
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        return TableResult(description="Your buddies", fields=columns, items=rows)
class ListGroups(Command):
    """Command returning a table of the user's legacy groups."""

    NAME = HELP = "👥 List your legacy groups"
    CHAT_COMMAND = "groups"
    NODE = "/".join((GROUPS.node, CHAT_COMMAND))
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """Build the group table, ordered by case-folded ``DISCO_NAME``.

        :param session: Session of the requesting user; must not be ``None``.
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`TableResult` with one ``name``/``jid`` row per
            group, flagged as containing MUC JIDs.
        """
        assert session is not None
        # Populate the bookmarks before iterating over them.
        await session.bookmarks.fill()

        def by_disco_name(group):
            return group.DISCO_NAME.casefold()

        ordered = list(session.bookmarks)
        ordered.sort(key=by_disco_name)

        rows = [{"name": group.name, "jid": group.jid.bare} for group in ordered]
        columns = [FormField("name"), FormField("jid", type="jid-single")]
        return TableResult(
            description="Your groups",
            fields=columns,
            items=rows,
            jids_are_mucs=True,
        )
class Login(Command):
    """Command that calls ``session.login()`` again for the requesting user."""

    NAME = "🔐 Re-login to the legacy network"
    HELP = "Login to the legacy service"
    CHAT_COMMAND = "re-login"
    NODE = "https://slidge.im/command/core/" + CHAT_COMMAND

    ACCESS = CommandAccess.USER_NON_LOGGED

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_):
        """Attempt to log in and report the outcome to the user.

        :param session: Session of the requesting user; must not be ``None``.
        :param _ifrom: JID of the requester (unused).
        :return: Whatever ``session.login()`` returned.
        :raises XMPPError: ``internal-server-error`` (type ``wait``) if the
            login attempt raises; the original exception is chained as the
            cause so it is not lost from tracebacks.
        """
        assert session is not None
        try:
            msg = await session.login()
        except Exception as e:
            # Surface the failure in the gateway status before erroring out.
            session.send_gateway_status(f"Re-login failed: {e}", show="dnd")
            raise XMPPError(
                "internal-server-error", etype="wait", text=f"Could not login: {e}"
            ) from e
        session.logged = True
        # Fall back to a generic notice when login() gave no message.
        session.send_gateway_status(msg or "Re-connected", show="chat")
        session.send_gateway_message(msg or "Re-connected")
        return msg
class CreateGroup(Command):
    """Command creating a new group from a name and a selection of contacts."""

    NAME = "🆕 New legacy group"
    HELP = "Create a group on the legacy service"
    CHAT_COMMAND = "create-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    CATEGORY = GROUPS

    ACCESS = CommandAccess.USER_LOGGED

    async def run(self, session: Optional[AnyBaseSession], _ifrom, *_):
        """Return the form asking for the group name and its members.

        :param session: Session of the requesting user; must not be ``None``.
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`Form` whose handler is :meth:`finish`.
        """
        assert session is not None
        contacts = session.contacts.known_contacts(only_friends=True)

        def by_name(contact) -> str:
            # Same ordering rule as ListContacts: case-insensitive, and a
            # contact without a name must not break the sort (comparing
            # None with str raises TypeError).
            return contact.name.casefold() if contact.name else ""

        return Form(
            title="Create a new group",
            instructions="Pick contacts that should be part of this new group",
            fields=[
                FormField(var="group_name", label="Name of the group", required=True),
                FormField(
                    var="contacts",
                    label="Contacts to add to the new group",
                    type="list-multi",
                    options=[
                        {"value": str(contact.jid), "label": contact.name}
                        for contact in sorted(contacts.values(), key=by_name)
                    ],
                    required=False,
                ),
            ],
            handler=self.finish,
        )

    @staticmethod
    async def finish(form_values: FormValues, session: Optional[AnyBaseSession], *_):
        """Form handler: create the group and present it to the user.

        :param form_values: Submitted values; ``group_name`` is required,
            ``contacts`` (an iterable of JID strings) is optional.
        :param session: Session of the requesting user; must not be ``None``.
        :return: A one-row :class:`TableResult` describing the new group.
        """
        assert session is not None
        legacy_id: LegacyGroupIdType = await session.on_create_group(  # type:ignore
            cast(str, form_values["group_name"]),
            [
                await session.contacts.by_jid(JID(j))
                for j in form_values.get("contacts", [])  # type:ignore
            ],
        )
        # Resolve the group object from the identifier returned above.
        muc = await session.bookmarks.by_legacy_id(legacy_id)
        return TableResult(
            description=f"Your new group: xmpp:{muc.jid}?join",
            fields=[FormField("name"), FormField("jid", type="jid-single")],
            items=[{"name": muc.name, "jid": muc.jid}],
            jids_are_mucs=True,
        )
class Preferences(Command):
    """Command letting a user view and change their gateway preferences."""

    NAME = "⚙️ Preferences"
    HELP = "Customize the gateway behaviour to your liking"
    CHAT_COMMAND = "preferences"
    NODE = "https://slidge.im/command/core/preferences"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
    ) -> Form:
        """Return the preferences form, pre-filled with the stored values.

        :param session: Session of the requesting user; must not be ``None``.
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`Form` whose handler is :meth:`finish`.
        """
        # Work on a copy so the shared field definitions are never mutated.
        form_fields = deepcopy(self.xmpp.PREFERENCES)
        assert session is not None
        stored = session.user.preferences
        for form_field in form_fields:
            form_field.value = stored.get(form_field.var)  # type:ignore
        return Form(
            title="Preferences",
            instructions=self.HELP,
            fields=form_fields,
            handler=self.finish,  # type:ignore
        )

    async def finish(
        self, form_values: UserPreferences, session: Optional[AnyBaseSession], *_
    ) -> str:
        """Form handler: persist the submitted preferences.

        :param form_values: Submitted preferences; ``sync_avatar`` is read.
        :param session: Session of the requesting user; must not be ``None``.
        :return: A confirmation message.
        """
        assert session is not None
        account = session.user
        account.preferences.update(form_values)  # type:ignore
        self.xmpp.store.users.update(account)
        if not form_values["sync_avatar"]:
            # Avatar sync is off: clear the stored avatar hash.
            session.xmpp.store.users.set_avatar_hash(session.user_pk, None)
        else:
            await self.xmpp.fetch_user_avatar(session)
        return "Your preferences have been updated."
class Unregister(Command):
    """Command removing the user's registration, after confirmation."""

    NAME = "❌ Unregister from the gateway"
    HELP = "Unregister from the gateway"
    CHAT_COMMAND = "unregister"
    NODE = "https://slidge.im/command/core/unregister"
    ACCESS = CommandAccess.USER

    async def run(
        self, session: Optional[AnyBaseSession], _ifrom: JID, *_: Any
    ) -> Confirmation:
        """Ask the user to confirm the unregistration.

        :return: A confirmation prompt whose handler is :meth:`unregister`.
        """
        gateway_jid = self.xmpp.boundjid
        question = f"Are you sure you want to unregister from '{gateway_jid}'?"
        farewell = f"You are not registered to '{gateway_jid}' anymore."
        return Confirmation(
            prompt=question,
            success=farewell,
            handler=self.unregister,
        )

    async def unregister(self, session: Optional[AnyBaseSession], _ifrom: JID) -> str:
        """Confirmation handler: look up the stored user and unregister it.

        :param session: Session of the requesting user; must not be ``None``.
        :param _ifrom: JID of the requester (unused).
        :return: A goodbye message.
        """
        assert session is not None
        stored_user = self.xmpp.store.users.get(session.user_jid)
        assert stored_user is not None
        await self.xmpp.unregister_user(stored_user)
        return "You are not registered anymore. Bye!"
class LeaveGroup(Command):
    """Three-step command: pick a group, confirm, then leave it."""

    NAME = HELP = "❌ Leave a legacy group"
    CHAT_COMMAND = "leave-group"
    NODE = GROUPS.node + "/" + CHAT_COMMAND
    ACCESS = CommandAccess.USER_LOGGED
    CATEGORY = GROUPS

    async def run(self, session, _ifrom, *_):
        """Return a form listing the user's groups.

        :param session: Session of the requesting user; must not be ``None``.
        :param _ifrom: JID of the requester (unused).
        :return: A :class:`Form` whose handler is :meth:`confirm`; the sorted
            group list is forwarded to it through ``handler_args``.
        """
        assert session is not None
        await session.bookmarks.fill()
        groups = sorted(session.bookmarks, key=lambda g: g.DISCO_NAME.casefold())
        return Form(
            title="Leave a group",
            instructions="Select the group you want to leave",
            fields=[
                FormField(
                    "group",
                    "Group name",
                    type="list-single",
                    # The option value is the index into ``groups``; it is
                    # mapped back to a group in confirm().
                    options=[
                        {"label": g.name, "value": str(i)} for i, g in enumerate(groups)
                    ],
                )
            ],
            handler=self.confirm,  # type:ignore
            handler_args=(groups,),
        )

    async def confirm(
        self,
        form_values: FormValues,
        _session: AnyBaseSession,
        _ifrom,
        groups: list[LegacyMUC],
    ):
        """Form handler: resolve the chosen group and ask for confirmation.

        :param form_values: Submitted values; ``group`` holds the index of
            the chosen group as offered by :meth:`run`.
        :param _session: Session of the requesting user (unused).
        :param _ifrom: JID of the requester (unused).
        :param groups: The group list that was offered in the form.
        :return: A :class:`Confirmation` whose handler is :meth:`finish`.
        :raises XMPPError: ``bad-request`` if the submitted value is missing,
            is not an integer, or is not a valid index into ``groups``.
        """
        # The value comes from the client, so never index with it blindly:
        # a negative number would silently select from the end of the list.
        try:
            index = int(form_values["group"])  # type:ignore
        except (KeyError, TypeError, ValueError) as e:
            raise XMPPError("bad-request", "No valid group was selected") from e
        if not 0 <= index < len(groups):
            raise XMPPError("bad-request", "The selected group does not exist")

        group = groups[index]
        return Confirmation(
            prompt=f"Are you sure you want to leave the group '{group.name}'?",
            handler=self.finish,  # type:ignore
            handler_args=(group,),
        )

    @staticmethod
    async def finish(session: AnyBaseSession, _ifrom, group: LegacyMUC):
        """Confirmation handler: leave the group and remove its bookmark.

        :param session: Session of the requesting user.
        :param _ifrom: JID of the requester (unused).
        :param group: The group selected in :meth:`confirm`.
        """
        await session.on_leave_group(group.legacy_id)
        await session.bookmarks.remove(group, reason="You left this group via slidge.")