Coverage for slidge / command / admin.py: 48%
112 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1# Commands only accessible for slidge admins
2import functools
3import importlib
4import logging
5from datetime import datetime
6from typing import TYPE_CHECKING, Any, ClassVar
8from slixmpp import JID
9from slixmpp.exceptions import XMPPError
11from ..core import config
12from ..db.models import GatewayUser
13from ..util.types import AnySession
14from .base import (
15 NODE_PREFIX,
16 Command,
17 CommandAccess,
18 Confirmation,
19 FormField,
20 FormSession,
21 FormValues,
22 TableResult,
23)
24from .categories import ADMINISTRATION
# Rebind the prefix imported from .base so that every NODE built below in this
# module lives under the "admin/" sub-namespace.
NODE_PREFIX += "admin/"
28if TYPE_CHECKING:
29 from slidge.util.types import AnyGateway
class AdminCommand(Command[AnySession]):
    """Common base for the commands defined in this module.

    Files the command under the administration category and restricts it to
    admins; a subclass may override either attribute.
    """

    CATEGORY = ADMINISTRATION
    ACCESS = CommandAccess.ADMIN_ONLY
class ListUsers(AdminCommand):
    """Admin command that shows the registered users as a table."""

    NAME = "👤 List registered users"
    HELP = "List the users registered to this gateway"
    CHAT_COMMAND = "list_users"
    NODE = NODE_PREFIX + CHAT_COMMAND

    async def run(
        self, _session: "AnySession | None", _ifrom: JID, *_: str
    ) -> TableResult:
        """Return one table row (bare JID, registration date) per stored user.

        The session, the requester's JID and any extra arguments are ignored.
        A user without a registration date gets an empty "joined" cell;
        otherwise the date is rendered in ISO format with seconds precision.
        """
        with self.xmpp.store.session() as orm:
            rows = [
                {
                    "jid": user.jid.bare,
                    "joined": (
                        ""
                        if user.registration_date is None
                        else user.registration_date.isoformat(timespec="seconds")
                    ),
                }
                for user in orm.query(GatewayUser).all()
            ]

        columns = [FormField("jid", type="jid-single"), FormField("joined")]
        return TableResult(
            description="List of registered users",
            fields=columns,
            items=rows,  # type:ignore
        )
class SlidgeInfo(AdminCommand):
    """Command that reports the gateway's versions and uptime.

    ``ACCESS`` is overridden to ``CommandAccess.ANY`` here, whereas the base
    class restricts commands to admins.
    """

    NAME = "ℹ️ Server information"  # noqa:RUF001
    # Was a copy-paste of ListUsers.HELP; describe what this command returns.
    HELP = "Show the version and uptime of this gateway"
    CHAT_COMMAND = "info"
    NODE = NODE_PREFIX + CHAT_COMMAND
    ACCESS = CommandAccess.ANY

    async def run(self, _session: "AnySession | None", _ifrom: JID, *_: str) -> str:
        """Return a two-line, human-readable summary of the running gateway.

        The first line holds the component name, the slidge core version and
        the legacy module's ``__version__`` ("No version" when the module does
        not define one). The second line holds the start time and the uptime.

        The uptime lists days, hours and minutes, skipping the units that are
        zero. Seconds are shown only when the uptime is below one minute.
        The session, the requester's JID and extra arguments are ignored.
        """
        start = self.xmpp.datetime_started
        # NOTE(review): datetime.now() is naive; this assumes datetime_started
        # is a naive local datetime as well -- confirm where it is set.
        uptime = datetime.now() - start

        # timedelta only exposes days and the remaining seconds of the last
        # day, so the smaller units are derived from the latter.
        hours, remainder = divmod(uptime.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        parts = [
            f"{count} {unit}{'' if count == 1 else 's'}"
            for count, unit in (
                (uptime.days, "day"),
                (hours, "hour"),
                (minutes, "minute"),
            )
            if count
        ]
        if not parts:
            # Less than a minute of uptime: fall back to seconds (may be 0).
            parts.append(f"{seconds} second{'' if seconds == 1 else 's'}")
        ago = ", ".join(parts)

        legacy_module = importlib.import_module(config.LEGACY_MODULE)
        version = getattr(legacy_module, "__version__", "No version")

        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import -- confirm before moving it).
        import slidge

        return (
            f"{self.xmpp.COMPONENT_NAME} (slidge core {slidge.__version__},"
            f" {config.LEGACY_MODULE} {version})\n"
            f"Up since {start:%Y-%m-%d %H:%M} ({ago} ago)"
        )
class DeleteUser(AdminCommand):
    """Admin command that unregisters a user, after a confirmation step.

    The flow is: ``run`` (ask for a JID) -> ``delete`` (check that the user
    exists and ask for confirmation) -> ``finish`` (actually unregister).
    """

    NAME = "❌ Delete a user"
    HELP = "Unregister a user from the gateway"
    CHAT_COMMAND = "delete_user"
    NODE = NODE_PREFIX + CHAT_COMMAND

    async def run(
        self, _session: "AnySession | None", _ifrom: JID, *_: str
    ) -> FormSession[AnySession]:
        """Return the form asking for the JID of the user to delete."""
        jid_field = FormField("jid", type="jid-single", label="JID", required=True)
        return FormSession(
            title="Remove a slidge user",
            instructions="Enter the bare JID of the user you want to delete",
            fields=[jid_field],
            handler=self.delete,
        )

    async def delete(
        self,
        form_values: FormValues,
        _session: "AnySession | None",
        _ifrom: JID,
    ) -> Confirmation:
        """Check that the submitted JID is registered and ask for confirmation.

        :raises XMPPError: "item-not-found" when no user has this JID.
        """
        jid: JID = form_values.get("jid")  # type:ignore
        with self.xmpp.store.session() as orm:
            found = orm.query(GatewayUser).where(GatewayUser.jid == jid).one_or_none()
        if found is None:
            raise XMPPError("item-not-found", text=f"There is no user '{jid}'")

        # The confirmation handler receives the JID through the partial.
        on_confirm = functools.partial(self.finish, jid=jid)
        return Confirmation(
            prompt=f"Are you sure you want to unregister '{jid}' from slidge?",
            success=f"User {jid} has been deleted",
            handler=on_confirm,
        )

    async def finish(
        self, _session: "AnySession | None", _ifrom: JID, jid: JID
    ) -> None:
        """Unregister the user with the given JID.

        The user is looked up again, since it may have disappeared between
        the confirmation prompt and its answer.

        :raises XMPPError: "bad-request" when no user has this JID anymore.
        """
        with self.xmpp.store.session() as orm:
            target = orm.query(GatewayUser).where(GatewayUser.jid == jid).one_or_none()
        if target is None:
            raise XMPPError("bad-request", f"{jid} has no account here!")
        await self.xmpp.unregister_user(
            target, "You have been unregistered by an admin"
        )
class ChangeLoglevel(AdminCommand):
    """Admin command that changes the level of the root logger at runtime."""

    NAME = "📋 Change the verbosity of the logs"
    HELP = "Set the logging level"
    CHAT_COMMAND = "loglevel"
    NODE = NODE_PREFIX + CHAT_COMMAND

    async def run(
        self, _session: "AnySession | None", _ifrom: JID, *_: str
    ) -> FormSession[AnySession]:
        """Return a form with a single choice between three log levels.

        Each option carries the numeric level as a string; ``finish`` turns it
        back into an int.
        """
        choices = (
            ("WARNING (quiet)", logging.WARNING),
            ("INFO (normal)", logging.INFO),
            ("DEBUG (verbose)", logging.DEBUG),
        )
        level_field = FormField(
            "level",
            label="Log level",
            required=True,
            type="list-single",
            options=[
                {"label": label, "value": str(level)} for label, level in choices
            ],
        )
        return FormSession(
            title=self.NAME,
            instructions=self.HELP,
            fields=[level_field],
            handler=self.finish,
        )

    @staticmethod
    async def finish(
        form_values: FormValues,
        _session: "AnySession | None",
        _ifrom: JID,
    ) -> None:
        """Apply the level chosen in the form to the root logger."""
        chosen = int(form_values["level"])  # type:ignore
        logging.getLogger().setLevel(chosen)
class Exec(AdminCommand):
    """Debugging command that runs python code sent by an admin.

    As its name says, it should never be available in production.
    """

    NAME = HELP = "Exec arbitrary python code. SHOULD NEVER BE AVAILABLE IN PROD."
    CHAT_COMMAND = "!"
    NODE = "exec"
    ACCESS = CommandAccess.ADMIN_ONLY

    # Not read or written anywhere in this class.
    prev_snapshot = None

    # Class-level dict handed to exec() as its globals, so names defined by one
    # invocation are still there for the next one.
    context: ClassVar[dict[str, Any]] = {}

    def __init__(self, xmpp: "AnyGateway") -> None:
        super().__init__(xmpp)

    async def run(self, session: "AnySession | None", ifrom: JID, *args: str) -> str:
        """Execute the space-joined arguments as python code.

        :return: whatever the code printed to stdout, wrapped in a markdown
            code fence, or "No output" if it printed nothing.
        """
        from contextlib import redirect_stdout
        from io import StringIO

        code = " ".join(args)
        captured = StringIO()
        with redirect_stdout(captured):
            # SECURITY: deliberate arbitrary code execution; the only guard is
            # the ADMIN_ONLY access level declared on this class.
            exec(code, self.context)

        printed = captured.getvalue()
        return f"```\n{printed}\n```" if printed else "No output"