Coverage for slidge/util/util.py: 77%
163 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-04 08:17 +0000
1import logging
2import mimetypes
3import re
4import subprocess
5import warnings
6from abc import ABCMeta
7from functools import wraps
8from pathlib import Path
9from time import time
10from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Optional, Type, TypeVar
12try:
13 import emoji
14except ImportError:
15 EMOJI_LIB_AVAILABLE = False
16else:
17 EMOJI_LIB_AVAILABLE = True
19from .types import Mention, ResourceDict
21if TYPE_CHECKING:
22 from ..contact.contact import LegacyContact
24try:
25 import magic
26except ImportError as e:
27 magic = None # type:ignore
28 logging.warning(
29 (
30 "Libmagic is not available: %s. "
31 "It's OK if you don't use fix-filename-suffix-mime-type."
32 ),
33 e,
34 )
37def fix_suffix(path: Path, mime_type: Optional[str], file_name: Optional[str]) -> Path:
38 guessed = magic.from_file(path, mime=True)
39 if guessed == mime_type:
40 log.debug("Magic and given MIME match")
41 else:
42 log.debug("Magic (%s) and given MIME (%s) differ", guessed, mime_type)
43 mime_type = guessed
45 valid_suffix_list = mimetypes.guess_all_extensions(mime_type, strict=False)
47 if file_name:
48 name = Path(file_name)
49 else:
50 name = Path(path.name)
52 suffix = name.suffix
54 if suffix in valid_suffix_list:
55 log.debug("Suffix %s is in %s", suffix, valid_suffix_list)
56 return name
58 valid_suffix = mimetypes.guess_extension(mime_type.split(";")[0], strict=False)
59 if valid_suffix is None:
60 log.debug("No valid suffix found")
61 return name
63 log.debug("Changing suffix of %s to %s", file_name or path.name, valid_suffix)
64 return name.with_suffix(valid_suffix)
67class SubclassableOnce(type):
68 # To allow importing everything, including plugins, during tests
69 TEST_MODE: bool = False
71 def __init__(
72 cls,
73 name: str,
74 bases: tuple[Type[Any], ...],
75 dct: dict[str, Any],
76 ) -> None:
77 for b in bases:
78 if type(b) in (SubclassableOnce, ABCSubclassableOnceAtMost):
79 if hasattr(b, "_subclass") and not cls.TEST_MODE:
80 raise RuntimeError(
81 "This class must be subclassed once at most!",
82 cls,
83 name,
84 bases,
85 dct,
86 )
87 else:
88 log.debug("Setting %s as subclass for %s", cls, b)
89 b._subclass = cls
91 super().__init__(name, bases, dct)
93 def get_self_or_unique_subclass(cls) -> "SubclassableOnce":
94 try:
95 return cls.get_unique_subclass()
96 except AttributeError:
97 return cls
99 def get_unique_subclass(cls) -> "SubclassableOnce":
100 r = getattr(cls, "_subclass", None)
101 if r is None:
102 raise AttributeError("Could not find any subclass", cls)
103 return r
105 def reset_subclass(cls) -> None:
106 try:
107 log.debug("Resetting subclass of %s", cls)
108 delattr(cls, "_subclass")
109 except AttributeError:
110 log.debug("No subclass were registered for %s", cls)
113class ABCSubclassableOnceAtMost(ABCMeta, SubclassableOnce):
114 pass
117def is_valid_phone_number(phone: Optional[str]):
118 if phone is None:
119 return False
120 match = re.match(r"\+\d.*", phone)
121 if match is None:
122 return False
123 return match[0] == phone
126def strip_illegal_chars(s: str):
127 return ILLEGAL_XML_CHARS_RE.sub("", s)
130# from https://stackoverflow.com/a/64570125/5902284 and Link Mauve
131ILLEGAL = [
132 (0x00, 0x08),
133 (0x0B, 0x0C),
134 (0x0E, 0x1F),
135 (0x7F, 0x84),
136 (0x86, 0x9F),
137 (0xFDD0, 0xFDDF),
138 (0xFFFE, 0xFFFF),
139 (0x1FFFE, 0x1FFFF),
140 (0x2FFFE, 0x2FFFF),
141 (0x3FFFE, 0x3FFFF),
142 (0x4FFFE, 0x4FFFF),
143 (0x5FFFE, 0x5FFFF),
144 (0x6FFFE, 0x6FFFF),
145 (0x7FFFE, 0x7FFFF),
146 (0x8FFFE, 0x8FFFF),
147 (0x9FFFE, 0x9FFFF),
148 (0xAFFFE, 0xAFFFF),
149 (0xBFFFE, 0xBFFFF),
150 (0xCFFFE, 0xCFFFF),
151 (0xDFFFE, 0xDFFFF),
152 (0xEFFFE, 0xEFFFF),
153 (0xFFFFE, 0xFFFFF),
154 (0x10FFFE, 0x10FFFF),
155]
157ILLEGAL_RANGES = [rf"{chr(low)}-{chr(high)}" for (low, high) in ILLEGAL]
158XML_ILLEGAL_CHARACTER_REGEX = "[" + "".join(ILLEGAL_RANGES) + "]"
159ILLEGAL_XML_CHARS_RE = re.compile(XML_ILLEGAL_CHARACTER_REGEX)
162# from https://stackoverflow.com/a/35804945/5902284
163def addLoggingLevel(
164 levelName: str = "TRACE", levelNum: int = logging.DEBUG - 5, methodName=None
165) -> None:
166 """
167 Comprehensively adds a new logging level to the `logging` module and the
168 currently configured logging class.
170 `levelName` becomes an attribute of the `logging` module with the value
171 `levelNum`. `methodName` becomes a convenience method for both `logging`
172 itself and the class returned by `logging.getLoggerClass()` (usually just
173 `logging.Logger`). If `methodName` is not specified, `levelName.lower()` is
174 used.
176 To avoid accidental clobberings of existing attributes, this method will
177 raise an `AttributeError` if the level name is already an attribute of the
178 `logging` module or if the method name is already present
180 Example
181 -------
182 >>> addLoggingLevel('TRACE', logging.DEBUG - 5)
183 >>> logging.getLogger(__name__).setLevel("TRACE")
184 >>> logging.getLogger(__name__).trace('that worked')
185 >>> logging.trace('so did this')
186 >>> logging.TRACE
187 5
189 """
190 if not methodName:
191 methodName = levelName.lower()
193 if hasattr(logging, levelName):
194 log.debug("{} already defined in logging module".format(levelName))
195 return
196 if hasattr(logging, methodName):
197 log.debug("{} already defined in logging module".format(methodName))
198 return
199 if hasattr(logging.getLoggerClass(), methodName):
200 log.debug("{} already defined in logger class".format(methodName))
201 return
203 # This method was inspired by the answers to Stack Overflow post
204 # http://stackoverflow.com/q/2183233/2988730, especially
205 # http://stackoverflow.com/a/13638084/2988730
206 def logForLevel(self, message, *args, **kwargs) -> None:
207 if self.isEnabledFor(levelNum):
208 self._log(levelNum, message, args, **kwargs)
210 def logToRoot(message, *args, **kwargs) -> None:
211 logging.log(levelNum, message, *args, **kwargs)
213 logging.addLevelName(levelNum, levelName)
214 setattr(logging, levelName, levelNum)
215 setattr(logging.getLoggerClass(), methodName, logForLevel)
216 setattr(logging, methodName, logToRoot)
219class SlidgeLogger(logging.Logger):
220 def trace(self) -> None:
221 pass
224log = logging.getLogger(__name__)
227def merge_resources(resources: dict[str, ResourceDict]) -> Optional[ResourceDict]:
228 if len(resources) == 0:
229 return None
231 if len(resources) == 1:
232 return next(iter(resources.values()))
234 by_priority = sorted(resources.values(), key=lambda r: r["priority"], reverse=True)
236 if any(r["show"] == "" for r in resources.values()):
237 # if a client is "available", we're "available"
238 show = ""
239 else:
240 for r in by_priority:
241 if r["show"]:
242 show = r["show"]
243 break
244 else:
245 raise RuntimeError()
247 # if there are different statuses, we use the highest priority one,
248 # but we ignore resources without status, even with high priority
249 status = ""
250 for r in by_priority:
251 if r["status"]:
252 status = r["status"]
253 break
255 return {
256 "show": show, # type:ignore
257 "status": status,
258 "priority": 0,
259 }
262def remove_emoji_variation_selector_16(emoji: str):
263 # this is required for compatibility with dino, and maybe other future clients?
264 return bytes(emoji, encoding="utf-8").replace(b"\xef\xb8\x8f", b"").decode()
267def deprecated(name: str, new: Callable):
268 # @functools.wraps
269 def wrapped(*args, **kwargs):
270 warnings.warn(
271 f"{name} is deprecated. Use {new.__name__} instead",
272 category=DeprecationWarning,
273 )
274 return new(*args, **kwargs)
276 return wrapped
279T = TypeVar("T", bound=NamedTuple)
282def dict_to_named_tuple(data: dict, cls: Type[T]) -> T:
283 return cls(*(data.get(f) for f in cls._fields)) # type:ignore
286def replace_mentions(
287 text: str,
288 mentions: Optional[list[Mention]],
289 mapping: Callable[["LegacyContact"], str],
290):
291 if not mentions:
292 return text
294 cursor = 0
295 pieces = []
296 for mention in mentions:
297 pieces.extend([text[cursor : mention.start], mapping(mention.contact)])
298 cursor = mention.end
299 pieces.append(text[cursor:])
300 return "".join(pieces)
303def timeit(func):
304 @wraps(func)
305 async def wrapped(self, *args, **kwargs):
306 start = time()
307 r = await func(self, *args, **kwargs)
308 self.log.debug("%s took %s ms", func.__name__, round((time() - start) * 1000))
309 return r
311 return wrapped
314def strip_leading_emoji(text: str) -> str:
315 if not EMOJI_LIB_AVAILABLE:
316 return text
317 words = text.split(" ")
318 # is_emoji returns False for 🛷️ for obscure reasons,
319 # purely_emoji seems better
320 if len(words) > 1 and emoji.purely_emoji(words[0]):
321 return " ".join(words[1:])
322 return text
325async def noop_coro() -> None:
326 pass