Coverage for slidge/command/admin.py: 47%

114 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-04 08:17 +0000

1# Commands only accessible for slidge admins 

2import functools 

3import importlib 

4import logging 

5from datetime import datetime 

6from typing import Any, Optional 

7 

8from slixmpp import JID 

9from slixmpp.exceptions import XMPPError 

10 

11from ..core import config 

12from ..db.models import GatewayUser 

13from ..util.types import AnyBaseSession 

14from .base import ( 

15 NODE_PREFIX, 

16 Command, 

17 CommandAccess, 

18 Confirmation, 

19 Form, 

20 FormField, 

21 FormValues, 

22 TableResult, 

23) 

24from .categories import ADMINISTRATION 

25 

# Rebind the prefix imported from .base so that every NODE defined below in
# this module is namespaced under "admin/".
NODE_PREFIX = f"{NODE_PREFIX}admin/"

27 

28 

class AdminCommand(Command):
    """Common base for the commands of this module.

    Sets the administration category and, by default, admin-only access;
    subclasses may override either attribute.
    """

    CATEGORY = ADMINISTRATION
    ACCESS = CommandAccess.ADMIN_ONLY

32 

33 

class ListUsers(AdminCommand):
    """Command returning a table of all ``GatewayUser`` rows in the store."""

    NAME = "👤 List registered users"
    HELP = "List the users registered to this gateway"
    CHAT_COMMAND = "list_users"
    NODE = NODE_PREFIX + CHAT_COMMAND

    async def run(self, _session, _ifrom, *_):
        """Return a ``TableResult`` with one row per registered user.

        Each row holds the user's bare JID and a "joined" cell: the
        registration date in ISO 8601 format with second precision, or an
        empty string when no registration date is recorded.
        """
        with self.xmpp.store.session() as orm:
            rows = [
                {
                    "jid": user.jid.bare,
                    "joined": (
                        ""
                        if user.registration_date is None
                        else user.registration_date.isoformat(timespec="seconds")
                    ),
                }
                for user in orm.query(GatewayUser).all()
            ]

        return TableResult(
            description="List of registered users",
            fields=[FormField("jid", type="jid-single"), FormField("joined")],
            items=rows,  # type:ignore
        )

55 

56 

class SlidgeInfo(AdminCommand):
    """Command reporting the component name, software versions and uptime.

    It overrides ``ACCESS`` with ``CommandAccess.ANY``, so it is not
    restricted to admins even though it derives from ``AdminCommand``.
    """

    NAME = "ℹ️ Server information"
    HELP = "Show the gateway's version and uptime"
    CHAT_COMMAND = "info"
    NODE = NODE_PREFIX + CHAT_COMMAND
    ACCESS = CommandAccess.ANY

    @staticmethod
    def _quantity(count: int, unit: str) -> Optional[str]:
        """Return ``"<count> <unit>"`` (pluralised with "s"), or None if count is 0."""
        if not count:
            return None
        return f"{count} {unit}" if count == 1 else f"{count} {unit}s"

    async def run(self, _session, _ifrom, *_) -> str:
        """Return a two-line, human-readable summary of the running gateway.

        The first line names the component, the slidge core version and the
        legacy module with its ``__version__`` ("No version" if it has none).
        The second line gives the start time and the elapsed uptime. Seconds
        are only shown when the uptime is shorter than one minute.
        """
        start = self.xmpp.datetime_started  # type:ignore
        # NOTE(review): naive "now"; assumes datetime_started is also a naive
        # local datetime -- confirm where it is set.
        uptime = datetime.now() - start

        hours, remainder = divmod(uptime.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        parts = [
            part
            for part in (
                self._quantity(uptime.days, "day"),
                self._quantity(hours, "hour"),
                # Pluralise on the numeric count, not on the formatted string:
                # comparing the string to 1 always yielded "1 minutes".
                self._quantity(minutes, "minute"),
            )
            if part
        ]
        if not parts:
            # Below one minute: always show seconds, including "0 seconds".
            parts.append(f"{seconds} second{'' if seconds == 1 else 's'}")
        ago = ", ".join(parts)

        legacy_module = importlib.import_module(config.LEGACY_MODULE)
        version = getattr(legacy_module, "__version__", "No version")

        # Imported here rather than at module level, as in the original code.
        import slidge

        return (
            f"{self.xmpp.COMPONENT_NAME} (slidge core {slidge.__version__},"
            f" {config.LEGACY_MODULE} {version})\n"
            f"Up since {start:%Y-%m-%d %H:%M} ({ago} ago)"
        )

110 

111 

class DeleteUser(AdminCommand):
    """Admin command that unregisters a user, after asking for confirmation.

    Flow: ``run`` asks for a JID, ``delete`` checks that the user exists and
    asks for confirmation, ``finish`` performs the unregistration.
    """

    NAME = "❌ Delete a user"
    HELP = "Unregister a user from the gateway"
    CHAT_COMMAND = "delete_user"
    NODE = NODE_PREFIX + CHAT_COMMAND

    @staticmethod
    def _find_user(orm, jid: JID):
        """Return the ``GatewayUser`` whose JID matches ``jid``, or None.

        The lookup is restricted to the bare JID. Without this filter the
        query matched whichever user existed (or raised when there were
        several), regardless of the JID that was entered.
        """
        # NOTE(review): passes a bare JID object, consistent with ``u.jid.bare``
        # being read from this column elsewhere in the file -- confirm the
        # column type accepts a JID as bind value.
        return orm.query(GatewayUser).filter_by(jid=JID(jid.bare)).one_or_none()

    async def run(self, _session, _ifrom, *_):
        """Return the form asking for the JID of the user to delete."""
        return Form(
            title="Remove a slidge user",
            instructions="Enter the bare JID of the user you want to delete",
            fields=[FormField("jid", type="jid-single", label="JID", required=True)],
            handler=self.delete,
        )

    async def delete(
        self, form_values: FormValues, _session: AnyBaseSession, _ifrom: JID
    ) -> Confirmation:
        """Check that the submitted JID is registered and ask for confirmation.

        :raises XMPPError: ``item-not-found`` if no user has this JID.
        """
        jid: JID = form_values.get("jid")  # type:ignore
        with self.xmpp.store.session() as orm:
            user = self._find_user(orm, jid)
        if user is None:
            raise XMPPError("item-not-found", text=f"There is no user '{jid}'")

        return Confirmation(
            prompt=f"Are you sure you want to unregister '{jid}' from slidge?",
            success=f"User {jid} has been deleted",
            handler=functools.partial(self.finish, jid=jid),
        )

    async def finish(
        self, _session: Optional[AnyBaseSession], _ifrom: JID, jid: JID
    ) -> None:
        """Unregister the user identified by ``jid``.

        The user is looked up again because it may have disappeared between
        the confirmation prompt and its acceptance.

        :raises XMPPError: ``bad-request`` if no user has this JID anymore.
        """
        with self.xmpp.store.session() as orm:
            user = self._find_user(orm, jid)
            if user is None:
                raise XMPPError("bad-request", f"{jid} has no account here!")
            await self.xmpp.unregister_user(user)

149 

150 

class ChangeLoglevel(AdminCommand):
    """Admin command that changes the level of the root logger at runtime."""

    NAME = "📋 Change the verbosity of the logs"
    HELP = "Set the logging level"
    CHAT_COMMAND = "loglevel"
    NODE = NODE_PREFIX + CHAT_COMMAND

    async def run(self, _session, _ifrom, *_):
        """Return a form with a single required choice among three log levels."""
        # (label, numeric level) pairs; the numeric level is sent as a string
        # and converted back to an int in ``finish``.
        choices = (
            ("WARNING (quiet)", logging.WARNING),
            ("INFO (normal)", logging.INFO),
            ("DEBUG (verbose)", logging.DEBUG),
        )
        level_field = FormField(
            "level",
            label="Log level",
            required=True,
            type="list-single",
            options=[
                {"label": label, "value": str(level)} for label, level in choices
            ],
        )
        return Form(
            title=self.NAME,
            instructions=self.HELP,
            fields=[level_field],
            handler=self.finish,
        )

    @staticmethod
    async def finish(
        form_values: FormValues, _session: AnyBaseSession, _ifrom: JID
    ) -> None:
        """Apply the level chosen in the form to the root logger."""
        chosen_level = int(form_values["level"])  # type:ignore
        root_logger = logging.getLogger()
        root_logger.setLevel(chosen_level)

182 

183 

class Exec(AdminCommand):
    """Debugging command that runs the text it receives as Python code.

    NOTE(review): this calls ``exec`` on the command arguments. It is
    admin-only and, as its own help text says, should never be made
    available in production.
    """

    NAME = HELP = "Exec arbitrary python code. SHOULD NEVER BE AVAILABLE IN PROD."
    CHAT_COMMAND = "!"
    NODE = "exec"
    ACCESS = CommandAccess.ADMIN_ONLY

    prev_snapshot = None

    # Class-level globals dict handed to exec(): names defined by one
    # invocation stay available to the following ones.
    context = dict[str, Any]()

    def __init__(self, xmpp) -> None:
        super().__init__(xmpp)

    async def run(self, session, ifrom: JID, *args) -> str:
        """Execute the space-joined ``args`` and return what was printed.

        The captured standard output is wrapped in a fenced block; if nothing
        was printed, "No output" is returned instead. Exceptions raised by the
        executed code are not caught here.
        """
        from contextlib import redirect_stdout
        from io import StringIO

        source = " ".join(args)
        captured = StringIO()
        with redirect_stdout(captured):
            exec(source, self.context)

        printed = captured.getvalue()
        if not printed:
            return "No output"
        return f"```\n{printed}\n```"