Coverage for slidge/command/admin.py: 48%

110 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1# Commands only accessible for slidge admins 

2import functools 

3import importlib 

4import logging 

5from datetime import datetime 

6from typing import Any, Optional 

7 

8from slixmpp import JID 

9from slixmpp.exceptions import XMPPError 

10 

11from ..core import config 

12from ..util.types import AnyBaseSession 

13from .base import ( 

14 NODE_PREFIX, 

15 Command, 

16 CommandAccess, 

17 Confirmation, 

18 Form, 

19 FormField, 

20 FormValues, 

21 TableResult, 

22) 

23from .categories import ADMINISTRATION 

24 

25NODE_PREFIX = NODE_PREFIX + "admin/" 

26 

27 

28class AdminCommand(Command): 

29 ACCESS = CommandAccess.ADMIN_ONLY 

30 CATEGORY = ADMINISTRATION 

31 

32 

33class ListUsers(AdminCommand): 

34 NAME = "👤 List registered users" 

35 HELP = "List the users registered to this gateway" 

36 CHAT_COMMAND = "list_users" 

37 NODE = NODE_PREFIX + CHAT_COMMAND 

38 

39 async def run(self, _session, _ifrom, *_): 

40 items = [] 

41 for u in self.xmpp.store.users.get_all(): 

42 d = u.registration_date 

43 if d is None: 

44 joined = "" 

45 else: 

46 joined = d.isoformat(timespec="seconds") 

47 items.append({"jid": u.jid.bare, "joined": joined}) 

48 return TableResult( 

49 description="List of registered users", 

50 fields=[FormField("jid", type="jid-single"), FormField("joined")], 

51 items=items, # type:ignore 

52 ) 

53 

54 

55class SlidgeInfo(AdminCommand): 

56 NAME = "ℹ️ Server information" 

57 HELP = "List the users registered to this gateway" 

58 CHAT_COMMAND = "info" 

59 NODE = NODE_PREFIX + CHAT_COMMAND 

60 ACCESS = CommandAccess.ANY 

61 

62 async def run(self, _session, _ifrom, *_): 

63 from slidge.__version__ import __version__ 

64 

65 start = self.xmpp.datetime_started # type:ignore 

66 uptime = datetime.now() - start 

67 

68 if uptime.days: 

69 days_ago = f"{uptime.days} day{'s' if uptime.days != 1 else ''}" 

70 else: 

71 days_ago = None 

72 hours, seconds = divmod(uptime.seconds, 3600) 

73 

74 if hours: 

75 hours_ago = f"{hours} hour" 

76 if hours != 1: 

77 hours_ago += "s" 

78 else: 

79 hours_ago = None 

80 

81 minutes, seconds = divmod(seconds, 60) 

82 if minutes: 

83 minutes_ago = f"{minutes} minute" 

84 if minutes_ago != 1: 

85 minutes_ago += "s" 

86 else: 

87 minutes_ago = None 

88 

89 if any((days_ago, hours_ago, minutes_ago)): 

90 seconds_ago = None 

91 else: 

92 seconds_ago = f"{seconds} second" 

93 if seconds != 1: 

94 seconds_ago += "s" 

95 

96 ago = ", ".join( 

97 [a for a in (days_ago, hours_ago, minutes_ago, seconds_ago) if a] 

98 ) 

99 

100 legacy_module = importlib.import_module(config.LEGACY_MODULE) 

101 version = getattr(legacy_module, "__version__", "No version") 

102 

103 return ( 

104 f"{self.xmpp.COMPONENT_NAME} (slidge core {__version__}," 

105 f" {config.LEGACY_MODULE} {version})\n" 

106 f"Up since {start:%Y-%m-%d %H:%M} ({ago} ago)" 

107 ) 

108 

109 

110class DeleteUser(AdminCommand): 

111 NAME = "❌ Delete a user" 

112 HELP = "Unregister a user from the gateway" 

113 CHAT_COMMAND = "delete_user" 

114 NODE = NODE_PREFIX + CHAT_COMMAND 

115 

116 async def run(self, _session, _ifrom, *_): 

117 return Form( 

118 title="Remove a slidge user", 

119 instructions="Enter the bare JID of the user you want to delete", 

120 fields=[FormField("jid", type="jid-single", label="JID", required=True)], 

121 handler=self.delete, 

122 ) 

123 

124 async def delete( 

125 self, form_values: FormValues, _session: AnyBaseSession, _ifrom: JID 

126 ) -> Confirmation: 

127 jid: JID = form_values.get("jid") # type:ignore 

128 user = self.xmpp.store.users.get(jid) 

129 if user is None: 

130 raise XMPPError("item-not-found", text=f"There is no user '{jid}'") 

131 

132 return Confirmation( 

133 prompt=f"Are you sure you want to unregister '{jid}' from slidge?", 

134 success=f"User {jid} has been deleted", 

135 handler=functools.partial(self.finish, jid=jid), 

136 ) 

137 

138 async def finish( 

139 self, _session: Optional[AnyBaseSession], _ifrom: JID, jid: JID 

140 ) -> None: 

141 user = self.xmpp.store.users.get(jid) 

142 if user is None: 

143 raise XMPPError("bad-request", f"{jid} has no account here!") 

144 await self.xmpp.unregister_user(user) 

145 

146 

147class ChangeLoglevel(AdminCommand): 

148 NAME = "📋 Change the verbosity of the logs" 

149 HELP = "Set the logging level" 

150 CHAT_COMMAND = "loglevel" 

151 NODE = NODE_PREFIX + CHAT_COMMAND 

152 

153 async def run(self, _session, _ifrom, *_): 

154 return Form( 

155 title=self.NAME, 

156 instructions=self.HELP, 

157 fields=[ 

158 FormField( 

159 "level", 

160 label="Log level", 

161 required=True, 

162 type="list-single", 

163 options=[ 

164 {"label": "WARNING (quiet)", "value": str(logging.WARNING)}, 

165 {"label": "INFO (normal)", "value": str(logging.INFO)}, 

166 {"label": "DEBUG (verbose)", "value": str(logging.DEBUG)}, 

167 ], 

168 ) 

169 ], 

170 handler=self.finish, 

171 ) 

172 

173 @staticmethod 

174 async def finish( 

175 form_values: FormValues, _session: AnyBaseSession, _ifrom: JID 

176 ) -> None: 

177 logging.getLogger().setLevel(int(form_values["level"])) # type:ignore 

178 

179 

180class Exec(AdminCommand): 

181 NAME = HELP = "Exec arbitrary python code. SHOULD NEVER BE AVAILABLE IN PROD." 

182 CHAT_COMMAND = "!" 

183 NODE = "exec" 

184 ACCESS = CommandAccess.ADMIN_ONLY 

185 

186 prev_snapshot = None 

187 

188 context = dict[str, Any]() 

189 

190 def __init__(self, xmpp): 

191 super().__init__(xmpp) 

192 

193 async def run(self, session, ifrom: JID, *args): 

194 from contextlib import redirect_stdout 

195 from io import StringIO 

196 

197 f = StringIO() 

198 with redirect_stdout(f): 

199 exec(" ".join(args), self.context) 

200 

201 out = f.getvalue() 

202 if out: 

203 return f"```\n{out}\n```" 

204 else: 

205 return "No output"