Coverage for slidge/util/util.py: 78%

170 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import logging 

2import mimetypes 

3import re 

4import subprocess 

5import warnings 

6from abc import ABCMeta 

7from functools import wraps 

8from pathlib import Path 

9from time import time 

10from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Optional, Type, TypeVar 

11 

12try: 

13 import emoji 

14except ImportError: 

15 EMOJI_LIB_AVAILABLE = False 

16else: 

17 EMOJI_LIB_AVAILABLE = True 

18 

19from .types import Mention, ResourceDict 

20 

21if TYPE_CHECKING: 

22 from ..contact.contact import LegacyContact 

23 

24try: 

25 import magic 

26except ImportError as e: 

27 magic = None # type:ignore 

28 logging.warning( 

29 ( 

30 "Libmagic is not available: %s. " 

31 "It's OK if you don't use fix-filename-suffix-mime-type." 

32 ), 

33 e, 

34 ) 

35 

36 

37def fix_suffix( 

38 path: Path, mime_type: Optional[str], file_name: Optional[str] 

39) -> tuple[str, str]: 

40 guessed = magic.from_file(path, mime=True) 

41 if guessed == mime_type: 

42 log.debug("Magic and given MIME match") 

43 else: 

44 log.debug("Magic (%s) and given MIME (%s) differ", guessed, mime_type) 

45 mime_type = guessed 

46 

47 valid_suffix_list = mimetypes.guess_all_extensions(mime_type, strict=False) 

48 

49 if file_name: 

50 name = Path(file_name) 

51 else: 

52 name = Path(path.name) 

53 

54 suffix = name.suffix 

55 

56 if suffix in valid_suffix_list: 

57 log.debug("Suffix %s is in %s", suffix, valid_suffix_list) 

58 return str(name), guessed 

59 

60 valid_suffix = mimetypes.guess_extension(mime_type.split(";")[0], strict=False) 

61 if valid_suffix is None: 

62 log.debug("No valid suffix found") 

63 return str(name), guessed 

64 

65 log.debug("Changing suffix of %s to %s", file_name or path.name, valid_suffix) 

66 return str(name.with_suffix(valid_suffix)), guessed 

67 

68 

69class SubclassableOnce(type): 

70 # To allow importing everything, including plugins, during tests 

71 TEST_MODE: bool = False 

72 

73 def __init__( 

74 cls, 

75 name: str, 

76 bases: tuple[Type[Any], ...], 

77 dct: dict[str, Any], 

78 ) -> None: 

79 for b in bases: 

80 if type(b) in (SubclassableOnce, ABCSubclassableOnceAtMost): 

81 if hasattr(b, "_subclass") and not cls.TEST_MODE: 

82 raise RuntimeError( 

83 "This class must be subclassed once at most!", 

84 cls, 

85 name, 

86 bases, 

87 dct, 

88 ) 

89 else: 

90 log.debug("Setting %s as subclass for %s", cls, b) 

91 b._subclass = cls 

92 

93 super().__init__(name, bases, dct) 

94 

95 def get_self_or_unique_subclass(cls) -> "SubclassableOnce": 

96 try: 

97 return cls.get_unique_subclass() 

98 except AttributeError: 

99 return cls 

100 

101 def get_unique_subclass(cls) -> "SubclassableOnce": 

102 r = getattr(cls, "_subclass", None) 

103 if r is None: 

104 raise AttributeError("Could not find any subclass", cls) 

105 return r 

106 

107 def reset_subclass(cls) -> None: 

108 try: 

109 log.debug("Resetting subclass of %s", cls) 

110 delattr(cls, "_subclass") 

111 except AttributeError: 

112 log.debug("No subclass were registered for %s", cls) 

113 

114 

115class ABCSubclassableOnceAtMost(ABCMeta, SubclassableOnce): 

116 pass 

117 

118 

119def is_valid_phone_number(phone: Optional[str]): 

120 if phone is None: 

121 return False 

122 match = re.match(r"\+\d.*", phone) 

123 if match is None: 

124 return False 

125 return match[0] == phone 

126 

127 

128def strip_illegal_chars(s: str): 

129 return ILLEGAL_XML_CHARS_RE.sub("", s) 

130 

131 

132# from https://stackoverflow.com/a/64570125/5902284 and Link Mauve 

133ILLEGAL = [ 

134 (0x00, 0x08), 

135 (0x0B, 0x0C), 

136 (0x0E, 0x1F), 

137 (0x7F, 0x84), 

138 (0x86, 0x9F), 

139 (0xFDD0, 0xFDDF), 

140 (0xFFFE, 0xFFFF), 

141 (0x1FFFE, 0x1FFFF), 

142 (0x2FFFE, 0x2FFFF), 

143 (0x3FFFE, 0x3FFFF), 

144 (0x4FFFE, 0x4FFFF), 

145 (0x5FFFE, 0x5FFFF), 

146 (0x6FFFE, 0x6FFFF), 

147 (0x7FFFE, 0x7FFFF), 

148 (0x8FFFE, 0x8FFFF), 

149 (0x9FFFE, 0x9FFFF), 

150 (0xAFFFE, 0xAFFFF), 

151 (0xBFFFE, 0xBFFFF), 

152 (0xCFFFE, 0xCFFFF), 

153 (0xDFFFE, 0xDFFFF), 

154 (0xEFFFE, 0xEFFFF), 

155 (0xFFFFE, 0xFFFFF), 

156 (0x10FFFE, 0x10FFFF), 

157] 

158 

159ILLEGAL_RANGES = [rf"{chr(low)}-{chr(high)}" for (low, high) in ILLEGAL] 

160XML_ILLEGAL_CHARACTER_REGEX = "[" + "".join(ILLEGAL_RANGES) + "]" 

161ILLEGAL_XML_CHARS_RE = re.compile(XML_ILLEGAL_CHARACTER_REGEX) 

162 

163 

164# from https://stackoverflow.com/a/35804945/5902284 

165def addLoggingLevel( 

166 levelName: str = "TRACE", levelNum: int = logging.DEBUG - 5, methodName=None 

167) -> None: 

168 """ 

169 Comprehensively adds a new logging level to the `logging` module and the 

170 currently configured logging class. 

171 

172 `levelName` becomes an attribute of the `logging` module with the value 

173 `levelNum`. `methodName` becomes a convenience method for both `logging` 

174 itself and the class returned by `logging.getLoggerClass()` (usually just 

175 `logging.Logger`). If `methodName` is not specified, `levelName.lower()` is 

176 used. 

177 

178 To avoid accidental clobberings of existing attributes, this method will 

179 raise an `AttributeError` if the level name is already an attribute of the 

180 `logging` module or if the method name is already present 

181 

182 Example 

183 ------- 

184 >>> addLoggingLevel('TRACE', logging.DEBUG - 5) 

185 >>> logging.getLogger(__name__).setLevel("TRACE") 

186 >>> logging.getLogger(__name__).trace('that worked') 

187 >>> logging.trace('so did this') 

188 >>> logging.TRACE 

189 5 

190 

191 """ 

192 if not methodName: 

193 methodName = levelName.lower() 

194 

195 if hasattr(logging, levelName): 

196 log.debug("{} already defined in logging module".format(levelName)) 

197 return 

198 if hasattr(logging, methodName): 

199 log.debug("{} already defined in logging module".format(methodName)) 

200 return 

201 if hasattr(logging.getLoggerClass(), methodName): 

202 log.debug("{} already defined in logger class".format(methodName)) 

203 return 

204 

205 # This method was inspired by the answers to Stack Overflow post 

206 # http://stackoverflow.com/q/2183233/2988730, especially 

207 # http://stackoverflow.com/a/13638084/2988730 

208 def logForLevel(self, message, *args, **kwargs) -> None: 

209 if self.isEnabledFor(levelNum): 

210 self._log(levelNum, message, args, **kwargs) 

211 

212 def logToRoot(message, *args, **kwargs) -> None: 

213 logging.log(levelNum, message, *args, **kwargs) 

214 

215 logging.addLevelName(levelNum, levelName) 

216 setattr(logging, levelName, levelNum) 

217 setattr(logging.getLoggerClass(), methodName, logForLevel) 

218 setattr(logging, methodName, logToRoot) 

219 

220 

221class SlidgeLogger(logging.Logger): 

222 def trace(self) -> None: 

223 pass 

224 

225 

226log = logging.getLogger(__name__) 

227 

228 

229def merge_resources(resources: dict[str, ResourceDict]) -> Optional[ResourceDict]: 

230 if len(resources) == 0: 

231 return None 

232 

233 if len(resources) == 1: 

234 return next(iter(resources.values())) 

235 

236 by_priority = sorted(resources.values(), key=lambda r: r["priority"], reverse=True) 

237 

238 if any(r["show"] == "" for r in resources.values()): 

239 # if a client is "available", we're "available" 

240 show = "" 

241 else: 

242 for r in by_priority: 

243 if r["show"]: 

244 show = r["show"] 

245 break 

246 else: 

247 raise RuntimeError() 

248 

249 # if there are different statuses, we use the highest priority one, 

250 # but we ignore resources without status, even with high priority 

251 status = "" 

252 for r in by_priority: 

253 if r["status"]: 

254 status = r["status"] 

255 break 

256 

257 return { 

258 "show": show, # type:ignore 

259 "status": status, 

260 "priority": 0, 

261 } 

262 

263 

264def remove_emoji_variation_selector_16(emoji: str): 

265 # this is required for compatibility with dino, and maybe other future clients? 

266 return bytes(emoji, encoding="utf-8").replace(b"\xef\xb8\x8f", b"").decode() 

267 

268 

269def deprecated(name: str, new: Callable): 

270 # @functools.wraps 

271 def wrapped(*args, **kwargs): 

272 warnings.warn( 

273 f"{name} is deprecated. Use {new.__name__} instead", 

274 category=DeprecationWarning, 

275 ) 

276 return new(*args, **kwargs) 

277 

278 return wrapped 

279 

280 

281T = TypeVar("T", bound=NamedTuple) 

282 

283 

284def dict_to_named_tuple(data: dict, cls: Type[T]) -> T: 

285 return cls(*(data.get(f) for f in cls._fields)) # type:ignore 

286 

287 

288def replace_mentions( 

289 text: str, 

290 mentions: Optional[list[Mention]], 

291 mapping: Callable[[Mention], str], 

292): 

293 if not mentions: 

294 return text 

295 

296 cursor = 0 

297 pieces = [] 

298 for mention in mentions: 

299 try: 

300 new_text = mapping(mention) 

301 except Exception as exc: 

302 log.debug("Attempting slidge <= 0.3.3 compatibility: %s", exc) 

303 new_text = mapping(mention.contact) # type:ignore 

304 pieces.extend([text[cursor : mention.start], new_text]) 

305 cursor = mention.end 

306 pieces.append(text[cursor:]) 

307 return "".join(pieces) 

308 

309 

310def timeit(func): 

311 @wraps(func) 

312 async def wrapped(self, *args, **kwargs): 

313 start = time() 

314 r = await func(self, *args, **kwargs) 

315 self.log.debug("%s took %s ms", func.__name__, round((time() - start) * 1000)) 

316 return r 

317 

318 return wrapped 

319 

320 

321def strip_leading_emoji(text: str) -> str: 

322 if not EMOJI_LIB_AVAILABLE: 

323 return text 

324 words = text.split(" ") 

325 # is_emoji returns False for 🛷️ for obscure reasons, 

326 # purely_emoji seems better 

327 if len(words) > 1 and emoji.purely_emoji(words[0]): 

328 return " ".join(words[1:]) 

329 return text 

330 

331 

332async def noop_coro() -> None: 

333 pass 

334 

335 

336def add_quote_prefix(text: str): 

337 """ 

338 Return multi-line text with leading quote marks (i.e. the ">" character). 

339 """ 

340 return "\n".join(("> " + x).strip() for x in text.split("\n")).strip()