Coverage for slidge / command / admin.py: 48%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-20 19:56 +0000

1# Commands only accessible for slidge admins 

2import functools 

3import importlib 

4import logging 

5from datetime import datetime 

6from typing import TYPE_CHECKING, Any, ClassVar 

7 

8from slixmpp import JID 

9from slixmpp.exceptions import XMPPError 

10 

11from ..core import config 

12from ..db.models import GatewayUser 

13from ..util.types import AnySession 

14from .base import ( 

15 NODE_PREFIX, 

16 Command, 

17 CommandAccess, 

18 Confirmation, 

19 FormField, 

20 FormSession, 

21 FormValues, 

22 TableResult, 

23) 

24from .categories import ADMINISTRATION 

25 

26NODE_PREFIX = NODE_PREFIX + "admin/" 

27 

28if TYPE_CHECKING: 

29 from slidge.util.types import AnyGateway 

30 

31 

class AdminCommand(Command[AnySession]):
    """Common base for the commands defined in this module.

    Restricts access to ``CommandAccess.ADMIN_ONLY`` and files the command
    under the ``ADMINISTRATION`` category. Subclasses can override either
    attribute.
    """

    CATEGORY = ADMINISTRATION
    ACCESS = CommandAccess.ADMIN_ONLY

35 

36 

class ListUsers(AdminCommand):
    """Admin command that returns a table of every registered user."""

    NAME = "👤 List registered users"
    HELP = "List the users registered to this gateway"
    CHAT_COMMAND = "list_users"
    NODE = NODE_PREFIX + CHAT_COMMAND

    async def run(
        self, _session: AnySession | None, _ifrom: JID, *_: str
    ) -> TableResult:
        """Return a two-column table: bare JID and registration date.

        The session, the sender JID and any extra arguments are ignored.
        A user without a registration date gets an empty "joined" cell.
        """
        with self.xmpp.store.session() as orm:
            rows = [
                {
                    "jid": user.jid.bare,
                    "joined": (
                        user.registration_date.isoformat(timespec="seconds")
                        if user.registration_date is not None
                        else ""
                    ),
                }
                for user in orm.query(GatewayUser).all()
            ]

        columns = [FormField("jid", type="jid-single"), FormField("joined")]
        return TableResult(
            description="List of registered users",
            fields=columns,
            items=rows,  # type:ignore
        )

57 

58 

class SlidgeInfo(AdminCommand):
    """Report the component name, the slidge/legacy module versions and uptime.

    Unlike the other commands in this module, ``ACCESS`` is overridden to
    ``CommandAccess.ANY``.
    """

    NAME = "ℹ️ Server information"  # noqa:RUF001
    HELP = "Show the gateway versions and uptime"
    CHAT_COMMAND = "info"
    NODE = NODE_PREFIX + CHAT_COMMAND
    ACCESS = CommandAccess.ANY

    @staticmethod
    def _format_uptime(days: int, seconds: int) -> str:
        """Render an uptime as text such as ``"1 day, 2 hours, 5 minutes"``.

        :param days: The ``days`` component of the elapsed ``timedelta``.
        :param seconds: The ``seconds`` component of the elapsed ``timedelta``
            (the part of the duration that is not whole days).
        :return: The non-zero day/hour/minute components, comma separated.
            Seconds are shown only when all three of those are zero.
        """
        hours, remainder = divmod(seconds, 3600)
        minutes, secs = divmod(remainder, 60)

        def plural(count: int, unit: str) -> str:
            return f"{count} {unit}{'' if count == 1 else 's'}"

        parts = [
            plural(count, unit)
            for count, unit in ((days, "day"), (hours, "hour"), (minutes, "minute"))
            if count
        ]
        if not parts:
            # Less than a minute of uptime: fall back to seconds (possibly 0).
            parts.append(plural(secs, "second"))
        return ", ".join(parts)

    async def run(self, _session: AnySession | None, _ifrom: JID, *_: str) -> str:
        """Return a two-line, human-readable summary of the running gateway.

        The session, the sender JID and any extra arguments are ignored.

        :return: First line: component name with the slidge core version and
            the legacy module name and version. Second line: start time and
            how long ago that was.
        """
        started = self.xmpp.datetime_started
        # NOTE(review): datetime.now() is naive, so this assumes
        # datetime_started is a naive local datetime as well; confirm.
        elapsed = datetime.now() - started
        ago = self._format_uptime(elapsed.days, elapsed.seconds)

        legacy_module = importlib.import_module(config.LEGACY_MODULE)
        version = getattr(legacy_module, "__version__", "No version")

        # Imported here rather than at module level, as in the original;
        # presumably to avoid a circular import; confirm before moving.
        import slidge

        return (
            f"{self.xmpp.COMPONENT_NAME} (slidge core {slidge.__version__},"
            f" {config.LEGACY_MODULE} {version})\n"
            f"Up since {started:%Y-%m-%d %H:%M} ({ago} ago)"
        )

112 

113 

class DeleteUser(AdminCommand):
    """Admin command that unregisters a user: ask for a JID, confirm, delete."""

    NAME = "❌ Delete a user"
    HELP = "Unregister a user from the gateway"
    CHAT_COMMAND = "delete_user"
    NODE = NODE_PREFIX + CHAT_COMMAND

    async def run(
        self, _session: AnySession | None, _ifrom: JID, *_: str
    ) -> FormSession[AnySession]:
        """Return the form asking which JID to delete; ``delete`` handles it."""
        jid_field = FormField("jid", type="jid-single", label="JID", required=True)
        return FormSession(
            title="Remove a slidge user",
            instructions="Enter the bare JID of the user you want to delete",
            fields=[jid_field],
            handler=self.delete,
        )

    async def delete(
        self,
        form_values: FormValues,
        _session: AnySession | None,
        _ifrom: JID,
    ) -> Confirmation:
        """Check that the submitted JID is registered and ask for confirmation.

        :raises XMPPError: ``item-not-found`` if no user has this JID.
        """
        jid: JID = form_values.get("jid")  # type:ignore
        with self.xmpp.store.session() as orm:
            lookup = orm.query(GatewayUser).where(GatewayUser.jid == jid)
            if lookup.one_or_none() is None:
                raise XMPPError("item-not-found", text=f"There is no user '{jid}'")

        # Bind the JID now so that ``finish`` receives it once confirmed.
        on_confirm = functools.partial(self.finish, jid=jid)
        return Confirmation(
            prompt=f"Are you sure you want to unregister '{jid}' from slidge?",
            success=f"User {jid} has been deleted",
            handler=on_confirm,
        )

    async def finish(self, _session: AnySession | None, _ifrom: JID, jid: JID) -> None:
        """Unregister the user identified by ``jid``.

        The user is looked up again because it may have disappeared since the
        confirmation prompt was shown.

        :raises XMPPError: ``bad-request`` if no user has this JID.
        """
        with self.xmpp.store.session() as orm:
            target = orm.query(GatewayUser).where(GatewayUser.jid == jid).one_or_none()
            if target is None:
                raise XMPPError("bad-request", f"{jid} has no account here!")
            # Awaited while the store session is still open, as in the original.
            await self.xmpp.unregister_user(
                target, "You have been unregistered by an admin"
            )

154 

155 

class ChangeLoglevel(AdminCommand):
    """Admin command that changes the level of the root logger at runtime."""

    NAME = "📋 Change the verbosity of the logs"
    HELP = "Set the logging level"
    CHAT_COMMAND = "loglevel"
    NODE = NODE_PREFIX + CHAT_COMMAND

    async def run(
        self, _session: AnySession | None, _ifrom: JID, *_: str
    ) -> FormSession[AnySession]:
        """Return a single-choice form offering WARNING, INFO and DEBUG."""
        choices = (
            ("WARNING (quiet)", logging.WARNING),
            ("INFO (normal)", logging.INFO),
            ("DEBUG (verbose)", logging.DEBUG),
        )
        level_field = FormField(
            "level",
            label="Log level",
            required=True,
            type="list-single",
            # Option values are the numeric levels as strings; ``finish``
            # converts them back with ``int``.
            options=[
                {"label": label, "value": str(level)} for label, level in choices
            ],
        )
        return FormSession(
            title=self.NAME,
            instructions=self.HELP,
            fields=[level_field],
            handler=self.finish,
        )

    @staticmethod
    async def finish(
        form_values: FormValues,
        _session: AnySession | None,
        _ifrom: JID,
    ) -> None:
        """Apply the level chosen in the form to the root logger."""
        chosen = int(form_values["level"])  # type:ignore
        logging.getLogger().setLevel(chosen)

191 

192 

class Exec(AdminCommand):
    """Admin-only command that runs its arguments as Python code.

    The code is executed with the class-level ``context`` dict as its
    globals, so names defined by one invocation remain visible to later ones.
    """

    NAME = HELP = "Exec arbitrary python code. SHOULD NEVER BE AVAILABLE IN PROD."
    CHAT_COMMAND = "!"
    NODE = "exec"
    ACCESS = CommandAccess.ADMIN_ONLY

    prev_snapshot = None

    # Shared by all instances: the globals namespace handed to ``exec``.
    context: ClassVar[dict[str, Any]] = {}

    def __init__(self, xmpp: "AnyGateway") -> None:
        super().__init__(xmpp)

    async def run(self, session: AnySession | None, ifrom: JID, *args: str) -> str:
        """Execute the space-joined ``args`` and return whatever was printed.

        :return: The captured standard output wrapped in a code fence, or
            ``"No output"`` if nothing was printed.
        """
        import contextlib
        import io

        source = " ".join(args)
        captured = io.StringIO()
        with contextlib.redirect_stdout(captured):
            # SECURITY: runs arbitrary admin-supplied code inside the gateway
            # process. Deliberate for this debugging command, but see NAME.
            exec(source, self.context)

        printed = captured.getvalue()
        return f"```\n{printed}\n```" if printed else "No output"

217 else: 

218 return "No output"