Coverage for slidge/util/lottie.py: 0%

69 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1""" 

2This module implements vector animated stickers in the lottie format to webp images. 

3 

4 

5""" 

6 

7import asyncio 

8import logging 

9import warnings 

10from pathlib import Path 

11 

12import aiohttp 

13 

14try: 

15 import rlottie_python # type: ignore 

16except ImportError: 

17 rlottie_python = None # type: ignore 

18 

19from ..core import config 

20from .types import LegacyAttachment 

21 

22 

23async def from_url( 

24 url: str, sticker_id: str, http: aiohttp.ClientSession 

25) -> LegacyAttachment: 

26 """ 

27 Get a webp attachment from a URL. 

28 

29 :param url: URL where the lottie sticker can be downloaded. 

30 :param sticker_id: A unique identifier for this sticker. 

31 :param http: The aiohttp.ClientSession used to download the sticker. 

32 

33 :return: A `LegacyAttachment` with the sticker in the webp format if 

34 `config.CONVERT_STICKERS == True`, in the original lottie format 

35 otherwise. 

36 """ 

37 if not config.CONVERT_STICKERS: 

38 return _attachment(sticker_id, url=url) 

39 lottie_path = sticker_path(sticker_id).with_suffix(".json") 

40 await _download(lottie_path, url, http) 

41 webp_path = sticker_path(sticker_id).with_suffix(".webp") 

42 return await convert(lottie_path, webp_path, sticker_id) 

43 

44 

45async def from_path(path: Path, sticker_id: str) -> LegacyAttachment: 

46 """ 

47 Get a webp attachment from a path. 

48 

49 :param path: path to the lottie sticker file. 

50 :param sticker_id: A unique identifier for this sticker. 

51 

52 :return: A `LegacyAttachment` with the sticker in the webp format if 

53 `config.CONVERT_STICKERS == True`, in the original lottie format 

54 otherwise. 

55 """ 

56 if not config.CONVERT_STICKERS: 

57 return _attachment(sticker_id, path=path) 

58 out_path = sticker_path(sticker_id).with_suffix(".webp") 

59 return await convert(path, out_path, sticker_id) 

60 

61 

62def sticker_path(sticker_id="") -> Path: 

63 """ 

64 Get the path where a sticker is meant be stored 

65 

66 :param sticker_id: A unique ID for this sticker 

67 :return: A `path`. It has no suffix, so you might want to use `Path.with_suffix()` 

68 before writing into it. 

69 """ 

70 root = config.HOME_DIR / "lottie" 

71 root.mkdir(exist_ok=True) 

72 return root / sticker_id 

73 

74 

75def _attachment( 

76 sticker_id: str, path: Path | None = None, url: str | None = None 

77) -> LegacyAttachment: 

78 if path is not None and path.suffix == ".webp": 

79 content_type = "image/webp" 

80 else: 

81 content_type = None 

82 return LegacyAttachment( 

83 url=url, 

84 path=path, 

85 legacy_file_id="lottie-" + sticker_id, 

86 disposition="inline", 

87 content_type=content_type, 

88 ) 

89 

90 

91async def _download(path: Path, url: str, http: aiohttp.ClientSession) -> None: 

92 async with _sticker_download_lock: 

93 if path.exists(): 

94 return 

95 with path.open("wb") as fp: 

96 try: 

97 resp = await http.get(url, chunked=True) 

98 async for chunk in resp.content: 

99 fp.write(chunk) 

100 except ValueError as e: 

101 if e.args[0] != "Chunk too big": 

102 raise 

103 # not sure why this happens but it does sometimes 

104 resp = await http.get(url) 

105 fp.write(await resp.content.read()) 

106 

107 

108async def convert( 

109 input_path: Path, 

110 output_path: Path, 

111 sticker_id: str, 

112 width: int = 256, 

113 height: int = 256, 

114) -> LegacyAttachment: 

115 async with _sticker_conversion_lock: 

116 if not output_path.exists(): 

117 if rlottie_python is None: 

118 warnings.warn( 

119 "Cannot convert stickers, rlottie-python is not available." 

120 ) 

121 return _attachment(sticker_id, path=input_path) 

122 

123 log.debug("Converting sticker %s to video", output_path.stem) 

124 if input_path.suffix == ".json": 

125 animation = rlottie_python.LottieAnimation.from_file(str(input_path)) 

126 else: 

127 animation = rlottie_python.LottieAnimation.from_tgs(str(input_path)) 

128 animation.save_animation(str(output_path), width=width, height=height) 

129 

130 return _attachment(sticker_id, path=output_path) 

131 

132 

133async def _main(): 

134 # small entrypoint to easily test that it works 

135 import sys 

136 

137 source, destination = sys.argv[1:] 

138 config.CONVERT_STICKERS = True 

139 await convert(Path(source), Path(destination), "") 

140 

141 

142_sticker_conversion_lock: asyncio.Lock = asyncio.Lock() 

143_sticker_download_lock: asyncio.Lock = asyncio.Lock() 

144 

145 

146log = logging.getLogger(__name__) 

147 

148 

149__all__ = ("from_url",) 

150 

151if __name__ == "__main__": 

152 asyncio.run(_main())