Coverage for slidge / command / admin.py: 47%
114 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1# Commands only accessible for slidge admins
2import functools
3import importlib
4import logging
5from datetime import datetime
6from typing import TYPE_CHECKING, Any, ClassVar
8from slixmpp import JID
9from slixmpp.exceptions import XMPPError
11from ..core import config
12from ..db.models import GatewayUser
13from ..util.types import AnySession
14from .base import (
15 NODE_PREFIX,
16 Command,
17 CommandAccess,
18 Confirmation,
19 FormField,
20 FormSession,
21 FormValues,
22 TableResult,
23)
24from .categories import ADMINISTRATION
# Every command in this module builds its NODE from this prefix, i.e. the
# shared prefix imported from .base with "admin/" appended.
NODE_PREFIX += "admin/"
28if TYPE_CHECKING:
29 from slidge import BaseGateway
class AdminCommand(Command[AnySession]):
    """Common base for the commands of this module.

    Sets the ``ADMINISTRATION`` category and ``ADMIN_ONLY`` access; subclasses
    may override either attribute.
    """

    CATEGORY = ADMINISTRATION
    ACCESS = CommandAccess.ADMIN_ONLY
class ListUsers(AdminCommand):
    """Command that returns a table of every ``GatewayUser`` in the store."""

    NAME = "👤 List registered users"
    HELP = "List the users registered to this gateway"
    CHAT_COMMAND = "list_users"
    NODE = NODE_PREFIX + CHAT_COMMAND

    async def run(
        self, _session: AnySession | None, _ifrom: JID, *_: str
    ) -> TableResult:
        """Build the table of registered users.

        Each row holds the user's bare JID and the registration date in ISO
        format with seconds precision, or an empty string when no
        registration date is stored. The session, sender and extra arguments
        are ignored.
        """
        with self.xmpp.store.session() as orm:
            rows = [
                {
                    "jid": user.jid.bare,
                    "joined": (
                        ""
                        if (registered := user.registration_date) is None
                        else registered.isoformat(timespec="seconds")
                    ),
                }
                for user in orm.query(GatewayUser).all()
            ]

        columns = [FormField("jid", type="jid-single"), FormField("joined")]
        return TableResult(
            description="List of registered users",
            fields=columns,
            items=rows,  # type:ignore
        )
class SlidgeInfo(AdminCommand):
    """Command reporting the component name, software versions and uptime.

    This class overrides ``ACCESS`` with ``CommandAccess.ANY`` instead of
    keeping the ``ADMIN_ONLY`` value inherited from ``AdminCommand``.
    """

    NAME = "ℹ️ Server information"  # noqa:RUF001
    # The previous text was a copy of ListUsers.HELP and described the wrong
    # command; this one matches what run() actually returns.
    HELP = "Show the gateway name, software versions and uptime"
    CHAT_COMMAND = "info"
    NODE = NODE_PREFIX + CHAT_COMMAND
    ACCESS = CommandAccess.ANY

    async def run(self, _session: AnySession | None, _ifrom: JID, *_: str) -> str:
        """Return a two-line, human-readable summary of the running gateway.

        The first line contains the component name, the slidge core version
        and the legacy module name with its ``__version__`` (or "No version"
        when the module does not define one). The second line contains the
        start time and the elapsed time since then. The session, sender and
        extra arguments are ignored.
        """

        def plural(count: int, unit: str) -> str:
            # "1 hour" but "2 hours" / "0 seconds".
            return f"{count} {unit}" if count == 1 else f"{count} {unit}s"

        start = self.xmpp.datetime_started
        # NOTE(review): datetime.now() is naive, so this presumably expects
        # datetime_started to be naive local time as well - confirm.
        uptime = datetime.now() - start

        hours, remainder = divmod(uptime.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        # Only non-zero units are listed, largest first.
        parts = [
            plural(count, unit)
            for count, unit in (
                (uptime.days, "day"),
                (hours, "hour"),
                (minutes, "minute"),
            )
            if count
        ]
        # Seconds are only shown when the uptime is below one minute.
        if not parts:
            parts.append(plural(seconds, "second"))
        ago = ", ".join(parts)

        legacy_module = importlib.import_module(config.LEGACY_MODULE)
        version = getattr(legacy_module, "__version__", "No version")

        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import - TODO confirm).
        import slidge

        return (
            f"{self.xmpp.COMPONENT_NAME} (slidge core {slidge.__version__},"
            f" {config.LEGACY_MODULE} {version})\n"
            f"Up since {start:%Y-%m-%d %H:%M} ({ago} ago)"
        )
class DeleteUser(AdminCommand):
    """Three-step command: ask for a JID, ask for confirmation, unregister."""

    NAME = "❌ Delete a user"
    HELP = "Unregister a user from the gateway"
    CHAT_COMMAND = "delete_user"
    NODE = NODE_PREFIX + CHAT_COMMAND

    def _find_user(self, jid: JID) -> GatewayUser | None:
        """Return the ``GatewayUser`` whose JID equals *jid*, or ``None``."""
        with self.xmpp.store.session() as orm:
            return orm.query(GatewayUser).where(GatewayUser.jid == jid).one_or_none()

    async def run(
        self, _session: AnySession | None, _ifrom: JID, *_: str
    ) -> FormSession[AnySession]:
        """Return the form asking for the JID of the user to delete.

        The submitted form is handled by :meth:`delete`.
        """
        jid_field = FormField("jid", type="jid-single", label="JID", required=True)
        return FormSession(
            title="Remove a slidge user",
            instructions="Enter the bare JID of the user you want to delete",
            fields=[jid_field],
            handler=self.delete,
        )

    async def delete(
        self,
        form_values: FormValues,
        _session: AnySession | None,
        _ifrom: JID,
    ) -> Confirmation:
        """Check that the submitted JID is registered and ask for confirmation.

        :raises XMPPError: ``item-not-found`` when no user has this JID.
        """
        jid: JID = form_values.get("jid")  # type:ignore
        if self._find_user(jid) is None:
            raise XMPPError("item-not-found", text=f"There is no user '{jid}'")

        # The JID is bound now so that finish() receives it once the admin
        # has confirmed.
        on_confirm = functools.partial(self.finish, jid=jid)
        return Confirmation(
            prompt=f"Are you sure you want to unregister '{jid}' from slidge?",
            success=f"User {jid} has been deleted",
            handler=on_confirm,
        )

    async def finish(self, _session: AnySession | None, _ifrom: JID, jid: JID) -> None:
        """Unregister the user with the given JID.

        The user is looked up again at this point.

        :raises XMPPError: ``bad-request`` when no user has this JID.
        """
        target = self._find_user(jid)
        if target is None:
            raise XMPPError("bad-request", f"{jid} has no account here!")
        await self.xmpp.unregister_user(
            target, "You have been unregistered by an admin"
        )
class ChangeLoglevel(AdminCommand):
    """Command that sets the level of the root logger from a form choice."""

    NAME = "📋 Change the verbosity of the logs"
    HELP = "Set the logging level"
    CHAT_COMMAND = "loglevel"
    NODE = NODE_PREFIX + CHAT_COMMAND

    async def run(
        self, _session: AnySession | None, _ifrom: JID, *_: str
    ) -> FormSession[AnySession]:
        """Return a form offering the WARNING, INFO and DEBUG levels.

        Option values are the numeric ``logging`` constants rendered as
        strings; the submitted form is handled by :meth:`finish`.
        """
        choices = (
            ("WARNING (quiet)", logging.WARNING),
            ("INFO (normal)", logging.INFO),
            ("DEBUG (verbose)", logging.DEBUG),
        )
        level_field = FormField(
            "level",
            label="Log level",
            required=True,
            type="list-single",
            options=[
                {"label": label, "value": str(level)} for label, level in choices
            ],
        )
        return FormSession(
            title=self.NAME,
            instructions=self.HELP,
            fields=[level_field],
            handler=self.finish,
        )

    @staticmethod
    async def finish(
        form_values: FormValues,
        _session: AnySession | None,
        _ifrom: JID,
    ) -> None:
        """Apply the submitted ``level`` value to the root logger."""
        root_logger = logging.getLogger()
        root_logger.setLevel(int(form_values["level"]))  # type:ignore
class Exec(AdminCommand):
    """Command that runs its arguments as Python code and returns the output.

    SECURITY: ``exec`` runs whatever code the caller sends. The command is
    declared ``ADMIN_ONLY`` and, as its own NAME says, should never be
    available in production.
    """

    NAME = HELP = "Exec arbitrary python code. SHOULD NEVER BE AVAILABLE IN PROD."
    CHAT_COMMAND = "!"
    NODE = "exec"
    ACCESS = CommandAccess.ADMIN_ONLY

    prev_snapshot = None

    # Globals dict handed to exec(). It lives on the class, so names defined
    # by one invocation are still there for the next one.
    context: ClassVar[dict[str, Any]] = {}

    def __init__(self, xmpp: "BaseGateway") -> None:
        super().__init__(xmpp)

    async def run(self, session: AnySession | None, ifrom: JID, *args: str) -> str:
        """Execute the space-joined *args* and return what they printed.

        Whatever the code writes to stdout is returned inside a fenced code
        block; when nothing was written, "No output" is returned instead.
        Exceptions raised by the executed code are not caught here.
        """
        import contextlib
        import io

        source = " ".join(args)
        captured = io.StringIO()
        with contextlib.redirect_stdout(captured):
            exec(source, self.context)

        out = captured.getvalue()
        return f"```\n{out}\n```" if out else "No output"