Coverage for slidge/util/util.py: 78%
170 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import logging
2import mimetypes
3import re
4import subprocess
5import warnings
6from abc import ABCMeta
7from functools import wraps
8from pathlib import Path
9from time import time
10from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Optional, Type, TypeVar
12try:
13 import emoji
14except ImportError:
15 EMOJI_LIB_AVAILABLE = False
16else:
17 EMOJI_LIB_AVAILABLE = True
19from .types import Mention, ResourceDict
21if TYPE_CHECKING:
22 from ..contact.contact import LegacyContact
24try:
25 import magic
26except ImportError as e:
27 magic = None # type:ignore
28 logging.warning(
29 (
30 "Libmagic is not available: %s. "
31 "It's OK if you don't use fix-filename-suffix-mime-type."
32 ),
33 e,
34 )
def fix_suffix(
    path: Path, mime_type: Optional[str], file_name: Optional[str]
) -> tuple[str, str]:
    """
    Pick a file name whose suffix agrees with the MIME type of the file content.

    The MIME type is obtained from ``magic.from_file(path, mime=True)``; the
    ``mime_type`` argument is only compared against it for logging purposes.

    :param path: Location of the file to inspect.
    :param mime_type: MIME type announced for the file, if any.
    :param file_name: Preferred file name; ``path.name`` is used when falsy.
    :return: ``(file name, MIME type reported by magic)``. The name keeps its
        suffix when ``mimetypes`` already lists it for that MIME type or when
        ``mimetypes`` knows no extension for it; otherwise the suffix is
        replaced with the one ``mimetypes.guess_extension`` returns.
    """
    detected = magic.from_file(path, mime=True)
    if detected != mime_type:
        log.debug("Magic (%s) and given MIME (%s) differ", detected, mime_type)
    else:
        log.debug("Magic and given MIME match")

    name = Path(file_name or path.name)

    # From here on only the detected type matters, whatever was announced.
    known_suffixes = mimetypes.guess_all_extensions(detected, strict=False)
    if name.suffix in known_suffixes:
        log.debug("Suffix %s is in %s", name.suffix, known_suffixes)
        return str(name), detected

    # Drop any ";parameter" part before asking for the preferred extension.
    preferred = mimetypes.guess_extension(detected.split(";")[0], strict=False)
    if preferred is not None:
        log.debug("Changing suffix of %s to %s", file_name or path.name, preferred)
        return str(name.with_suffix(preferred)), detected

    log.debug("No valid suffix found")
    return str(name), detected
class SubclassableOnce(type):
    """
    Metaclass that records, on each base class it manages, the class derived
    from that base, and refuses a second derivation.

    When a class is created, every base whose metaclass is exactly
    ``SubclassableOnce`` or ``ABCSubclassableOnceAtMost`` gets its
    ``_subclass`` attribute set to the new class.
    """

    # To allow importing everything, including plugins, during tests
    TEST_MODE: bool = False

    def __init__(
        cls,
        name: str,
        bases: tuple[Type[Any], ...],
        dct: dict[str, Any],
    ) -> None:
        """
        Register ``cls`` as the subclass of each managed base.

        :raises RuntimeError: if a managed base already exposes ``_subclass``
            and ``TEST_MODE`` is false.
        """
        for base in bases:
            if type(base) not in (SubclassableOnce, ABCSubclassableOnceAtMost):
                continue
            if hasattr(base, "_subclass") and not cls.TEST_MODE:
                raise RuntimeError(
                    "This class must be subclassed once at most!",
                    cls,
                    name,
                    bases,
                    dct,
                )
            log.debug("Setting %s as subclass for %s", cls, base)
            base._subclass = cls

        super().__init__(name, bases, dct)

    def get_self_or_unique_subclass(cls) -> "SubclassableOnce":
        """Return the registered subclass, or ``cls`` itself when there is none."""
        try:
            unique = cls.get_unique_subclass()
        except AttributeError:
            return cls
        return unique

    def get_unique_subclass(cls) -> "SubclassableOnce":
        """
        Return the registered subclass.

        :raises AttributeError: if ``_subclass`` is missing or ``None``.
        """
        subclass = getattr(cls, "_subclass", None)
        if subclass is not None:
            return subclass
        raise AttributeError("Could not find any subclass", cls)

    def reset_subclass(cls) -> None:
        """Forget the registered subclass; only logs when none was set on ``cls``."""
        try:
            log.debug("Resetting subclass of %s", cls)
            del cls._subclass
        except AttributeError:
            log.debug("No subclass were registered for %s", cls)
class ABCSubclassableOnceAtMost(ABCMeta, SubclassableOnce):
    """Metaclass combining ``ABCMeta`` with ``SubclassableOnce``; adds no members."""
def is_valid_phone_number(phone: Optional[str]):
    """
    Tell whether ``phone`` looks like an international phone number.

    The check is deliberately loose: a ``+``, then a digit, then any
    characters except a newline, covering the whole string.

    :param phone: Candidate string, or ``None``.
    :return: ``True`` if the whole string has that shape, ``False`` otherwise
        (including for ``None``).
    """
    if phone is None:
        return False
    return re.fullmatch(r"\+\d.*", phone) is not None
def strip_illegal_chars(s: str):
    """Return ``s`` with every character matched by ``ILLEGAL_XML_CHARS_RE`` removed."""
    cleaned = re.sub(ILLEGAL_XML_CHARS_RE, "", s)
    return cleaned
# from https://stackoverflow.com/a/64570125/5902284 and Link Mauve
# Inclusive (low, high) code point ranges matched by ILLEGAL_XML_CHARS_RE.
ILLEGAL = [
    (0x00, 0x08),
    (0x0B, 0x0C),
    (0x0E, 0x1F),
    (0x7F, 0x84),
    (0x86, 0x9F),
    (0xFDD0, 0xFDDF),
    # The last two code points (xFFFE, xFFFF) of each of the 17 planes,
    # from (0xFFFE, 0xFFFF) up to (0x10FFFE, 0x10FFFF).
    *(((plane << 16) | 0xFFFE, (plane << 16) | 0xFFFF) for plane in range(0x11)),
]

# One "low-high" character-class range per entry, with the raw characters.
ILLEGAL_RANGES = [chr(first) + "-" + chr(last) for first, last in ILLEGAL]
XML_ILLEGAL_CHARACTER_REGEX = "[%s]" % "".join(ILLEGAL_RANGES)
ILLEGAL_XML_CHARS_RE = re.compile(XML_ILLEGAL_CHARACTER_REGEX)
# from https://stackoverflow.com/a/35804945/5902284
def addLoggingLevel(
    levelName: str = "TRACE",
    levelNum: int = logging.DEBUG - 5,
    methodName: Optional[str] = None,
) -> None:
    """
    Comprehensively adds a new logging level to the `logging` module and the
    currently configured logging class.

    `levelName` becomes an attribute of the `logging` module with the value
    `levelNum`. `methodName` becomes a convenience method for both `logging`
    itself and the class returned by `logging.getLoggerClass()` (usually just
    `logging.Logger`). If `methodName` is not specified, `levelName.lower()` is
    used.

    Existing attributes are never clobbered and no exception is raised: if the
    level name is already an attribute of the `logging` module, or if the
    method name is already present on the `logging` module or on the logger
    class, a debug message is logged and nothing is changed.

    Example
    -------
    >>> addLoggingLevel('TRACE', logging.DEBUG - 5)
    >>> logging.getLogger(__name__).setLevel("TRACE")
    >>> logging.getLogger(__name__).trace('that worked')
    >>> logging.trace('so did this')
    >>> logging.TRACE
    5

    """
    if not methodName:
        methodName = levelName.lower()

    if hasattr(logging, levelName):
        log.debug("%s already defined in logging module", levelName)
        return
    if hasattr(logging, methodName):
        log.debug("%s already defined in logging module", methodName)
        return
    if hasattr(logging.getLoggerClass(), methodName):
        log.debug("%s already defined in logger class", methodName)
        return

    # This method was inspired by the answers to Stack Overflow post
    # http://stackoverflow.com/q/2183233/2988730, especially
    # http://stackoverflow.com/a/13638084/2988730
    def logForLevel(self, message, *args, **kwargs) -> None:
        # Same shape as the built-in Logger.debug()/info() helpers.
        if self.isEnabledFor(levelNum):
            self._log(levelNum, message, args, **kwargs)

    def logToRoot(message, *args, **kwargs) -> None:
        logging.log(levelNum, message, *args, **kwargs)

    logging.addLevelName(levelNum, levelName)
    setattr(logging, levelName, levelNum)
    setattr(logging.getLoggerClass(), methodName, logForLevel)
    setattr(logging, methodName, logToRoot)
class SlidgeLogger(logging.Logger):
    """
    ``logging.Logger`` subclass that declares a ``trace`` method.

    NOTE(review): presumably a typing aid for the ``trace`` method that
    ``addLoggingLevel`` installs at runtime; confirm against callers.
    """

    def trace(self) -> None:
        """Do nothing and return ``None``."""
        return None
# Module-level logger, named after this module; the helpers in this file log through it.
log = logging.getLogger(__name__)
def merge_resources(
    resources: "dict[str, ResourceDict]",
) -> "Optional[ResourceDict]":
    """
    Collapse the presence of several resources into a single presence.

    :param resources: Mapping of resource identifier to its ``show``,
        ``status`` and ``priority`` values.
    :return: ``None`` when there is no resource; the sole entry itself (not a
        copy) when there is exactly one; otherwise a new dict with a merged
        ``show`` and ``status`` and a ``priority`` of 0.
    :raises RuntimeError: if no resource has ``show == ""`` and none has a
        truthy ``show`` either.
    """
    if not resources:
        return None

    if len(resources) == 1:
        return next(iter(resources.values()))

    by_priority = sorted(resources.values(), key=lambda r: r["priority"], reverse=True)

    if any(r["show"] == "" for r in by_priority):
        # if a client is "available", we're "available"
        show = ""
    else:
        show = next((r["show"] for r in by_priority if r["show"]), None)
        if show is None:
            raise RuntimeError(
                "Cannot merge resources: none of them has a usable 'show' value",
                resources,
            )

    # if there are different statuses, we use the highest priority one,
    # but we ignore resources without status, even with high priority
    status = next((r["status"] for r in by_priority if r["status"]), "")

    return {
        "show": show,  # type:ignore
        "status": status,
        "priority": 0,
    }
def remove_emoji_variation_selector_16(emoji: str) -> str:
    """
    Return ``emoji`` with every U+FE0F (variation selector-16) removed.

    Works directly on the string, so no encode/decode round trip is needed
    and text that cannot be encoded as UTF-8 does not raise.
    """
    # this is required for compatibility with dino, and maybe other future clients?
    return emoji.replace("\ufe0f", "")
def deprecated(name: str, new: Callable):
    """
    Build a stand-in for a deprecated callable that forwards to ``new``.

    :param name: Name of the deprecated callable, used in the warning text.
    :param new: Replacement callable that actually does the work.
    :return: A function that emits a ``DeprecationWarning`` on each call, then
        returns ``new(*args, **kwargs)``.
    """

    # NOTE(review): functools.wraps(new) is not applied, so the wrapper keeps
    # its own name and docstring; confirm this is intended.
    def wrapped(*args, **kwargs):
        warnings.warn(
            f"{name} is deprecated. Use {new.__name__} instead",
            category=DeprecationWarning,
            # Attribute the warning to the caller of the deprecated name
            # rather than to this wrapper.
            stacklevel=2,
        )
        return new(*args, **kwargs)

    return wrapped
T = TypeVar("T", bound=NamedTuple)


def dict_to_named_tuple(data: dict, cls: Type[T]) -> T:
    """
    Build an instance of the named tuple class ``cls`` from ``data``.

    Fields are taken in ``cls._fields`` order; a field missing from ``data``
    is filled with ``None`` and keys that are not fields are ignored.
    """
    values = [data.get(field) for field in cls._fields]
    return cls(*values)  # type:ignore
def replace_mentions(
    text: str,
    mentions: "Optional[list[Mention]]",
    mapping: "Callable[[Mention], str]",
) -> str:
    """
    Replace each mentioned span of ``text`` with the output of ``mapping``.

    :param text: Original text.
    :param mentions: Objects with ``start`` and ``end`` offsets into ``text``
        (and a ``contact`` attribute for the compatibility fallback). They may
        be given in any order; they are assumed not to overlap.
    :param mapping: Called with each mention, returns its replacement text.
    :return: ``text`` unchanged when ``mentions`` is empty or ``None``,
        otherwise the text with every ``[start:end]`` span substituted.
    """
    if not mentions:
        return text

    cursor = 0
    pieces = []
    # Walk the mentions from left to right: the cursor logic below only
    # produces the right slices when spans are visited in text order.
    for mention in sorted(mentions, key=lambda m: m.start):
        try:
            new_text = mapping(mention)
        except Exception as exc:
            log.debug("Attempting slidge <= 0.3.3 compatibility: %s", exc)
            new_text = mapping(mention.contact)  # type:ignore
        pieces.extend([text[cursor : mention.start], new_text])
        cursor = mention.end
    pieces.append(text[cursor:])
    return "".join(pieces)
def timeit(func):
    """
    Decorate an async method so that its duration is logged at debug level.

    The wrapped coroutine awaits ``func``, then logs the elapsed time in
    milliseconds through ``self.log`` and returns the result of ``func``.
    Nothing is logged when ``func`` raises.
    """
    # A monotonic clock: a wall-clock adjustment during the call cannot
    # distort the measured duration.
    from time import perf_counter

    @wraps(func)
    async def wrapped(self, *args, **kwargs):
        start = perf_counter()
        r = await func(self, *args, **kwargs)
        self.log.debug(
            "%s took %s ms", func.__name__, round((perf_counter() - start) * 1000)
        )
        return r

    return wrapped
def strip_leading_emoji(text: str) -> str:
    """
    Drop the first space-separated word of ``text`` when it consists of emoji.

    ``text`` is returned unchanged when the emoji library is not available,
    when it contains no space, or when its first word is not purely emoji.
    """
    if EMOJI_LIB_AVAILABLE:
        first_word, space, remainder = text.partition(" ")
        # is_emoji returns False for 🛷️ for obscure reasons,
        # purely_emoji seems better
        if space and emoji.purely_emoji(first_word):
            return remainder
    return text
async def noop_coro() -> None:
    """Coroutine that does nothing and returns ``None``."""
    return None
def add_quote_prefix(text: str):
    """
    Return multi-line text with leading quote marks (i.e. the ">" character).

    Each line is prefixed with "> " and loses its trailing whitespace, so an
    empty line becomes a bare ">".
    """
    quoted = []
    for line in text.split("\n"):
        # Every piece starts with ">", so only trailing whitespace can go.
        quoted.append(f"> {line}".rstrip())
    return "\n".join(quoted)