Coverage for slidge / core / mixins / attachment.py: 85%

362 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1import base64 

2import functools 

3import logging 

4import os 

5import shutil 

6import stat 

7import tempfile 

8from collections.abc import Collection, Sequence 

9from datetime import datetime 

10from itertools import chain 

11from mimetypes import guess_extension, guess_type 

12from pathlib import Path 

13from typing import Any 

14from urllib.parse import quote as urlquote 

15from uuid import uuid4 

16from xml.etree import ElementTree as ET 

17 

18import aiohttp 

19import thumbhash 

20from PIL import Image, ImageOps 

21from slixmpp import JID, Iq, Message 

22from slixmpp.plugins.xep_0264.stanza import Thumbnail 

23from slixmpp.plugins.xep_0447.stanza import StatelessFileSharing 

24 

25from ...db.avatar import avatar_cache 

26from ...db.models import Attachment 

27from ...util.types import ( 

28 AnyMessageReference, 

29 LegacyAttachment, 

30 LegacyMessageType, 

31 LegacyThreadType, 

32) 

33from ...util.util import fix_suffix 

34from .. import config 

35from .message_text import TextMessageMixin 

36 

37 

38class AttachmentMixin(TextMessageMixin): 

39 PRIVILEGED_UPLOAD = False 

40 

41 @property 

42 def __is_component(self) -> bool: 

43 return self.session is NotImplemented 

44 

45 async def __upload( 

46 self, 

47 file_path: Path, 

48 file_name: str | None = None, 

49 content_type: str | None = None, 

50 ) -> str: 

51 assert config.UPLOAD_SERVICE 

52 

53 file_size = file_path.stat().st_size 

54 

55 content_type = content_type or self.xmpp.plugin["xep_0363"].default_content_type 

56 iq_slot = await self.__request_upload_slot( 

57 config.UPLOAD_SERVICE, 

58 file_name or file_path.name, 

59 file_size, 

60 content_type, 

61 ) 

62 slot = iq_slot["http_upload_slot"] 

63 headers = { 

64 "Content-Length": str(file_size), 

65 "Content-Type": content_type, 

66 **{header["name"]: header["value"] for header in slot["put"]["headers"]}, 

67 } 

68 

69 async with aiohttp.ClientSession() as http: 

70 with file_path.open("rb") as fp: 

71 async with http.put( 

72 slot["put"]["url"], data=fp, headers=headers 

73 ) as response: 

74 response.raise_for_status() 

75 return slot["get"]["url"] # type:ignore[no-any-return] 

76 

77 async def __request_upload_slot( 

78 self, 

79 upload_service: JID | str, 

80 filename: str, 

81 size: int, 

82 content_type: str, 

83 ) -> Iq: 

84 iq_request = self.xmpp.make_iq_get( 

85 ito=upload_service, ifrom=config.UPLOAD_REQUESTER or self.xmpp.boundjid 

86 ) 

87 request = iq_request["http_upload_request"] 

88 request["filename"] = filename 

89 request["size"] = str(size) 

90 request["content-type"] = content_type 

91 if self.__is_component or not self.PRIVILEGED_UPLOAD: 

92 return await iq_request.send() # type:ignore[no-any-return] 

93 

94 assert self.session is not NotImplemented 

95 iq_request.set_from(self.session.user_jid) 

96 return await self.xmpp["xep_0356"].send_privileged_iq(iq_request) # type:ignore[no-any-return] 

97 

98 @staticmethod 

99 async def __no_upload( 

100 file_path: Path, 

101 file_name: str | None = None, 

102 legacy_file_id: str | int | None = None, 

103 ) -> tuple[Path, str]: 

104 file_id = str(uuid4()) if legacy_file_id is None else str(legacy_file_id) 

105 assert config.NO_UPLOAD_PATH is not None 

106 assert config.NO_UPLOAD_URL_PREFIX is not None 

107 destination_dir = Path(config.NO_UPLOAD_PATH) / file_id 

108 

109 if destination_dir.exists(): 

110 log.debug("Dest dir exists: %s", destination_dir) 

111 files = list(f for f in destination_dir.glob("**/*") if f.is_file()) 

112 if len(files) == 1: 

113 log.debug( 

114 "Found the legacy attachment '%s' at '%s'", 

115 legacy_file_id, 

116 files[0], 

117 ) 

118 name = files[0].name 

119 uu = files[0].parent.name # anti-obvious url trick, see below 

120 return files[0], "/".join([file_id, uu, name]) 

121 else: 

122 log.warning( 

123 ( 

124 "There are several or zero files in %s, " 

125 "slidge doesn't know which one to pick among %s. " 

126 "Removing the dir." 

127 ), 

128 destination_dir, 

129 files, 

130 ) 

131 shutil.rmtree(destination_dir) 

132 

133 log.debug("Did not find a file in: %s", destination_dir) 

134 # let's use a UUID to avoid URLs being too obvious 

135 uu = str(uuid4()) 

136 destination_dir = destination_dir / uu 

137 destination_dir.mkdir(parents=True) 

138 

139 name = file_name or file_path.name 

140 destination = destination_dir / name 

141 method = config.NO_UPLOAD_METHOD 

142 if method == "copy": 

143 shutil.copy2(file_path, destination) 

144 elif method == "hardlink": 

145 os.link(file_path, destination) 

146 elif method == "symlink": 

147 os.symlink(file_path, destination, target_is_directory=True) 

148 elif method == "move": 

149 shutil.move(file_path, destination) 

150 else: 

151 raise RuntimeError("No upload method not recognized", method) 

152 

153 if config.NO_UPLOAD_FILE_READ_OTHERS: 

154 log.debug("Changing perms of %s", destination) 

155 destination.chmod(destination.stat().st_mode | stat.S_IROTH) 

156 uploaded_url = "/".join([file_id, uu, name]) 

157 

158 return destination, uploaded_url 

159 

160 async def __valid_url(self, url: str) -> bool: 

161 async with self.session.http.head(url) as r: 

162 return r.status < 400 

163 

164 async def __get_stored(self, attachment: LegacyAttachment) -> Attachment: 

165 if attachment.legacy_file_id is not None and not self.__is_component: 

166 with self.xmpp.store.session() as orm: 

167 stored = ( 

168 orm.query(Attachment) 

169 .filter_by( 

170 legacy_file_id=str(attachment.legacy_file_id), 

171 user_account_id=self.session.user_pk, 

172 ) 

173 .one_or_none() 

174 ) 

175 if stored is not None: 

176 if not await self.__valid_url(stored.url): 

177 stored.url = None # type:ignore 

178 return stored 

179 return Attachment( 

180 user_account_id=None if self.__is_component else self.session.user_pk, 

181 legacy_file_id=None 

182 if attachment.legacy_file_id is None 

183 else str(attachment.legacy_file_id), 

184 url=attachment.url if config.USE_ATTACHMENT_ORIGINAL_URLS else None, 

185 ) 

186 

187 async def __get_url( 

188 self, attachment: LegacyAttachment, stored: Attachment 

189 ) -> tuple[bool, Path | None, str]: 

190 file_name = attachment.name 

191 content_type = attachment.content_type 

192 file_path = attachment.path 

193 

194 if file_name and len(file_name) > config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH: 

195 log.debug("Trimming long filename: %s", file_name) 

196 base, ext = os.path.splitext(file_name) 

197 file_name = ( 

198 base[: config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH - len(ext)] + ext 

199 ) 

200 

201 if file_path is None: 

202 if file_name is None: 

203 file_name = str(uuid4()) 

204 if content_type is not None: 

205 ext = guess_extension(content_type, strict=False) # type:ignore 

206 if ext is not None: 

207 file_name += ext 

208 temp_dir = Path(tempfile.mkdtemp()) 

209 file_path = temp_dir / file_name 

210 if attachment.url: 

211 async with self.session.http.get(attachment.url) as r: 

212 r.raise_for_status() 

213 with file_path.open("wb") as f: 

214 f.write(await r.read()) 

215 

216 elif attachment.stream is not None: 

217 data = attachment.stream.read() 

218 if data is None: 

219 raise RuntimeError 

220 

221 with file_path.open("wb") as f: 

222 f.write(data) 

223 elif attachment.aio_stream is not None: 

224 # TODO: patch slixmpp to allow this as data source for 

225 # upload_file() so we don't even have to write anything 

226 # to disk. 

227 with file_path.open("wb") as f: 

228 async for chunk in attachment.aio_stream: 

229 f.write(chunk) 

230 elif attachment.data is not None: 

231 with file_path.open("wb") as f: 

232 f.write(attachment.data) 

233 

234 is_temp = not bool(config.NO_UPLOAD_PATH) 

235 else: 

236 is_temp = False 

237 

238 assert isinstance(file_path, Path) 

239 if config.FIX_FILENAME_SUFFIX_MIME_TYPE: 

240 file_name, content_type = fix_suffix(file_path, content_type, file_name) 

241 attachment.content_type = content_type 

242 attachment.name = file_name 

243 

244 if config.NO_UPLOAD_PATH: 

245 local_path, new_url = await self.__no_upload( 

246 file_path, file_name, stored.legacy_file_id 

247 ) 

248 new_url = (config.NO_UPLOAD_URL_PREFIX or "") + "/" + urlquote(new_url) 

249 else: 

250 local_path = file_path 

251 new_url = await self.__upload(file_path, file_name, content_type) 

252 if stored.legacy_file_id and new_url is not None: 

253 stored.url = new_url 

254 

255 if local_path is not None and local_path.stat().st_size == 0: 

256 raise RuntimeError("File size is 0") 

257 

258 return is_temp, local_path, new_url 

259 

260 async def __set_sims( 

261 self, 

262 msg: Message, 

263 uploaded_url: str, 

264 path: Path | None, 

265 attachment: LegacyAttachment, 

266 stored: Attachment, 

267 ) -> Thumbnail | None: 

268 if stored.sims is not None: 

269 ref = self.xmpp["xep_0372"].stanza.Reference(xml=ET.fromstring(stored.sims)) 

270 msg.append(ref) 

271 if ref["sims"]["file"].get_plugin("thumbnail", check=True): 

272 return ref["sims"]["file"]["thumbnail"] # type:ignore[no-any-return] 

273 else: 

274 return None 

275 

276 if not path: 

277 return None 

278 

279 ref = self.xmpp["xep_0385"].get_sims( 

280 path, [uploaded_url], attachment.content_type, attachment.caption 

281 ) 

282 if attachment.name: 

283 ref["sims"]["file"]["name"] = attachment.name 

284 thumbnail = None 

285 if attachment.content_type is not None and attachment.content_type.startswith( 

286 "image" 

287 ): 

288 try: 

289 h, x, y = await self.xmpp.loop.run_in_executor( 

290 avatar_cache._thread_pool, get_thumbhash, path 

291 ) 

292 except Exception as e: 

293 log.debug("Could not generate a thumbhash", exc_info=e) 

294 else: 

295 thumbnail = ref["sims"]["file"]["thumbnail"] 

296 thumbnail["width"] = x 

297 thumbnail["height"] = y 

298 thumbnail["media-type"] = "image/thumbhash" 

299 thumbnail["uri"] = "data:image/thumbhash;base64," + urlquote(h) 

300 

301 stored.sims = str(ref) 

302 msg.append(ref) 

303 

304 return thumbnail 

305 

306 def __set_sfs( 

307 self, 

308 msg: Message, 

309 uploaded_url: str, 

310 path: Path | None, 

311 attachment: LegacyAttachment, 

312 stored: Attachment, 

313 thumbnail: Thumbnail | None = None, 

314 ) -> None: 

315 if stored.sfs is not None: 

316 msg.append(StatelessFileSharing(xml=ET.fromstring(stored.sfs))) 

317 return 

318 

319 if not path: 

320 return 

321 

322 sfs = self.xmpp["xep_0447"].get_sfs( 

323 path, [uploaded_url], attachment.content_type, attachment.caption 

324 ) 

325 if attachment.name: 

326 sfs["file"]["name"] = attachment.name 

327 if attachment.disposition: 

328 sfs["disposition"] = attachment.disposition 

329 else: 

330 del sfs["disposition"] 

331 if thumbnail is not None: 

332 sfs["file"].append(thumbnail) 

333 stored.sfs = str(sfs) 

334 msg.append(sfs) 

335 

336 async def __set_sfs_and_sims_without_download( 

337 self, msg: Message, attachment: LegacyAttachment 

338 ) -> None: 

339 assert attachment.url is not None 

340 

341 if not any( 

342 ( 

343 attachment.content_type, 

344 attachment.name, 

345 attachment.disposition, 

346 ) 

347 ): 

348 return 

349 

350 sims = self.xmpp.plugin["xep_0385"].stanza.Sims() 

351 ref = self.xmpp["xep_0372"].stanza.Reference() 

352 

353 ref["uri"] = attachment.url 

354 ref["type"] = "data" 

355 sims["sources"].append(ref) 

356 sims.enable("file") 

357 

358 xep_0447_stanza = self.xmpp.plugin["xep_0447"].stanza 

359 sfs = xep_0447_stanza.StatelessFileSharing() 

360 url_data = xep_0447_stanza.UrlData() 

361 url_data["target"] = attachment.url 

362 sfs["sources"].append(url_data) 

363 sfs.enable("file") 

364 

365 if attachment.content_type: 

366 sims["file"]["media-type"] = attachment.content_type 

367 sfs["file"]["media-type"] = attachment.content_type 

368 if attachment.caption: 

369 sims["file"]["desc"] = attachment.caption 

370 sfs["file"]["desc"] = attachment.caption 

371 if attachment.name: 

372 sims["file"]["name"] = attachment.name 

373 sfs["file"]["name"] = attachment.name 

374 if attachment.disposition: 

375 sfs["disposition"] = attachment.disposition 

376 

377 msg.append(sims) 

378 msg.append(sfs) 

379 

380 def __send_url( 

381 self, 

382 msg: Message, 

383 legacy_msg_id: LegacyMessageType, 

384 uploaded_url: str, 

385 caption: str | None = None, 

386 carbon: bool = False, 

387 when: datetime | None = None, 

388 correction: bool = False, 

389 **kwargs: Any, # noqa:ANN401 

390 ) -> list[Message]: 

391 msg["oob"]["url"] = uploaded_url 

392 msg["body"] = uploaded_url 

393 if msg.get_plugin("sfs", check=True): 

394 msg["fallback"].enable("body") 

395 msg["fallback"]["for"] = self.xmpp.plugin["xep_0447"].stanza.NAMESPACE 

396 if caption: 

397 if correction: 

398 msg["replace"]["id"] = self._replace_id(legacy_msg_id) 

399 else: 

400 self._set_msg_id(msg, legacy_msg_id) 

401 m1 = self._send(msg, carbon=carbon, correction=correction, **kwargs) 

402 m2 = self.send_text( 

403 caption, legacy_msg_id=None, when=when, carbon=carbon, **kwargs 

404 ) 

405 return [m1, m2] if m2 else [m1] 

406 else: 

407 if correction: 

408 msg["replace"]["id"] = self._replace_id(legacy_msg_id) 

409 else: 

410 self._set_msg_id(msg, legacy_msg_id) 

411 return [self._send(msg, carbon=carbon, **kwargs)] 

412 

413 def __get_base_message( 

414 self, 

415 legacy_msg_id: LegacyMessageType | None = None, 

416 reply_to: AnyMessageReference | None = None, 

417 when: datetime | None = None, 

418 thread: LegacyThreadType | None = None, 

419 carbon: bool = False, 

420 correction: bool = False, 

421 mto: JID | None = None, 

422 ) -> Message: 

423 if correction: 

424 xmpp_ids = self._legacy_to_xmpp(legacy_msg_id) 

425 if xmpp_ids: 

426 original_xmpp_id = xmpp_ids[0] 

427 for xmpp_id in xmpp_ids: 

428 if xmpp_id == original_xmpp_id: 

429 continue 

430 self.retract(xmpp_id, thread) 

431 

432 if reply_to is not None and reply_to.body: 

433 # We cannot have a "quote fallback" for attachments since most (all?) 

434 # XMPP clients will only treat a message as an attachment if the 

435 # body is the URL and nothing else. 

436 reply_to_for_attachment: AnyMessageReference | None = AnyMessageReference( 

437 reply_to.legacy_id, reply_to.author 

438 ) 

439 else: 

440 reply_to_for_attachment = reply_to 

441 

442 return self._make_message( 

443 when=when, 

444 reply_to=reply_to_for_attachment, 

445 carbon=carbon, 

446 mto=mto, 

447 thread=thread, 

448 ) 

449 

450 async def send_file( 

451 self, 

452 attachment: LegacyAttachment | Path | str, 

453 legacy_msg_id: LegacyMessageType | None = None, 

454 *, 

455 reply_to: AnyMessageReference | None = None, 

456 when: datetime | None = None, 

457 thread: LegacyThreadType | None = None, 

458 **kwargs: Any, # noqa:ANN401 

459 ) -> tuple[str | None, list[Message]]: 

460 """ 

461 Send a single file from this :term:`XMPP Entity`. 

462 

463 :param attachment: The file to send. 

464 Ideally, a :class:`.LegacyAttachment` with a unique ``legacy_file_id`` 

465 attribute set, to optimise potential future reuses. 

466 It can also be: 

467 - a :class:`pathlib.Path` instance to point to a local file, or 

468 - a ``str``, representing a fetchable HTTP URL. 

469 :param legacy_msg_id: If you want to be able to transport read markers from the gateway 

470 user to the legacy network, specify this 

471 :param reply_to: Quote another message (:xep:`0461`) 

472 :param when: when the file was sent, for a "delay" tag (:xep:`0203`) 

473 :param thread: 

474 """ 

475 coro = self.__send_file( 

476 attachment, 

477 legacy_msg_id, 

478 reply_to=reply_to, 

479 when=when, 

480 thread=thread, 

481 **kwargs, 

482 ) 

483 if self.__is_component: 

484 return await coro 

485 elif not isinstance(attachment, LegacyAttachment): 

486 return await coro 

487 elif attachment.legacy_file_id is None: 

488 return await coro 

489 else: 

490 # prevents race conditions where we download the same thing several time 

491 # and end up attempting to insert it twice in the DB, raising an 

492 # IntegrityError. 

493 async with self.session.lock(("attachment", attachment.legacy_file_id)): 

494 return await coro 

495 

496 async def __send_file( 

497 self, 

498 attachment: LegacyAttachment | Path | str, 

499 legacy_msg_id: LegacyMessageType | None = None, 

500 *, 

501 reply_to: AnyMessageReference | None = None, 

502 when: datetime | None = None, 

503 thread: LegacyThreadType | None = None, 

504 **kwargs: Any, # noqa:ANN401 

505 ) -> tuple[str | None, list[Message]]: 

506 store_multi = kwargs.pop("store_multi", True) 

507 carbon = kwargs.pop("carbon", False) 

508 mto = kwargs.pop("mto", None) 

509 correction = kwargs.get("correction", False) 

510 

511 msg = self.__get_base_message( 

512 legacy_msg_id, reply_to, when, thread, carbon, correction, mto 

513 ) 

514 

515 if isinstance(attachment, str): 

516 attachment = LegacyAttachment(url=attachment) 

517 elif isinstance(attachment, Path): 

518 attachment = LegacyAttachment(path=attachment) 

519 

520 stored = await self.__get_stored(attachment) 

521 

522 if attachment.content_type is None and ( 

523 name := (attachment.name or attachment.url or attachment.path) 

524 ): 

525 attachment.content_type, _ = guess_type(name) 

526 

527 if stored.url: 

528 is_temp = False 

529 local_path = None 

530 new_url = stored.url 

531 else: 

532 try: 

533 is_temp, local_path, new_url = await self.__get_url(attachment, stored) 

534 except Exception as e: 

535 log.error("Error with attachment: %s: %s", attachment, e) 

536 log.debug("", exc_info=e) 

537 msg["body"] = ( 

538 f"/me tried to send a file ({attachment.format_for_user()}), " 

539 f"but something went wrong: {e}. " 

540 ) 

541 self._set_msg_id(msg, legacy_msg_id) 

542 return None, [self._send(msg, **kwargs)] 

543 assert new_url is not None 

544 

545 stored.url = new_url 

546 if config.USE_ATTACHMENT_ORIGINAL_URLS and attachment.url: 

547 await self.__set_sfs_and_sims_without_download(msg, attachment) 

548 else: 

549 thumbnail = await self.__set_sims( 

550 msg, new_url, local_path, attachment, stored 

551 ) 

552 self.__set_sfs(msg, new_url, local_path, attachment, stored, thumbnail) 

553 

554 if not self.__is_component: 

555 with self.xmpp.store.session(expire_on_commit=False) as orm: 

556 orm.add(stored) 

557 orm.commit() 

558 

559 if is_temp and isinstance(local_path, Path): 

560 local_path.unlink() 

561 local_path.parent.rmdir() 

562 

563 msgs = self.__send_url( 

564 msg, legacy_msg_id, new_url, attachment.caption, carbon, when, **kwargs 

565 ) 

566 if not self.__is_component: 

567 if store_multi: 

568 self.__store_multi(legacy_msg_id, msgs) 

569 return new_url, msgs 

570 

571 def __send_body( 

572 self, 

573 body: str | None = None, 

574 legacy_msg_id: LegacyMessageType | None = None, 

575 reply_to: AnyMessageReference | None = None, 

576 when: datetime | None = None, 

577 thread: LegacyThreadType | None = None, 

578 **kwargs: Any, # noqa:ANN401 

579 ) -> Message | None: 

580 if body: 

581 return self.send_text( 

582 body, 

583 legacy_msg_id, 

584 reply_to=reply_to, 

585 when=when, 

586 thread=thread, 

587 **kwargs, 

588 ) 

589 else: 

590 return None 

591 

592 async def send_files( 

593 self, 

594 attachments: Collection[LegacyAttachment], 

595 legacy_msg_id: LegacyMessageType | None = None, 

596 body: str | None = None, 

597 *, 

598 reply_to: AnyMessageReference | None = None, 

599 when: datetime | None = None, 

600 thread: LegacyThreadType | None = None, 

601 body_first: bool = False, 

602 correction: bool = False, 

603 correction_event_id: LegacyMessageType | None = None, 

604 **kwargs: Any, # noqa:ANN401 

605 ) -> None: 

606 # TODO: once the epic XEP-0385 vs XEP-0447 battle is over, pick 

607 # one and stop sending several attachments this way 

608 # we attach the legacy_message ID to the last message we send, because 

609 # we don't want several messages with the same ID (especially for MUC MAM) 

610 if not attachments and not body: 

611 # ignoring empty message 

612 return 

613 body_msg_id = ( 

614 legacy_msg_id if body_needs_msg_id(attachments, body, body_first) else None 

615 ) 

616 send_body = functools.partial( 

617 self.__send_body, 

618 body=body, 

619 reply_to=reply_to, 

620 when=when, 

621 thread=thread, 

622 correction=correction, 

623 legacy_msg_id=body_msg_id, 

624 correction_event_id=correction_event_id, 

625 **kwargs, 

626 ) 

627 all_msgs = [] 

628 if body_first: 

629 all_msgs.append(send_body()) 

630 for i, attachment in enumerate(attachments): 

631 if i == 0 and body_msg_id is None: 

632 legacy = legacy_msg_id 

633 else: 

634 legacy = None 

635 _url, msgs = await self.send_file( 

636 attachment, 

637 legacy, 

638 reply_to=reply_to, 

639 when=when, 

640 thread=thread, 

641 store_multi=False, 

642 **kwargs, 

643 ) 

644 all_msgs.extend(msgs) 

645 if not body_first: 

646 all_msgs.append(send_body()) 

647 self.__store_multi(legacy_msg_id, all_msgs) 

648 

649 def __store_multi( 

650 self, 

651 legacy_msg_id: LegacyMessageType | None, 

652 all_msgs: Sequence[Message | None], 

653 ) -> None: 

654 if legacy_msg_id is None: 

655 return 

656 ids = [] 

657 for msg in all_msgs: 

658 if not msg: 

659 continue 

660 if stanza_id := msg.get_plugin("stanza_id", check=True): 

661 ids.append(stanza_id["id"]) 

662 else: 

663 ids.append(msg.get_id()) 

664 with self.xmpp.store.session() as orm: 

665 self.xmpp.store.id_map.set_msg( 

666 orm, self._recipient_pk(), str(legacy_msg_id), ids, self.is_participant 

667 ) 

668 orm.commit() 

669 

670 

def body_needs_msg_id(
    attachments: Collection[LegacyAttachment], body: str | None, body_first: bool
) -> bool:
    """
    Tell whether the text body should carry the legacy message ID.

    It does when there are no attachments, or when there is a non-empty
    body that is sent before the attachments.
    """
    return not attachments or bool(body and body_first)

678 

679 

def get_thumbhash(path: Path) -> tuple[str, int, int]:
    """
    Compute the thumbhash of an image file.

    The image is converted to RGBA, shrunk in place to fit in 100x100 when
    larger, and EXIF-transposed before hashing.

    :param path: Path of the image file.
    :return: ``(base64-encoded thumbhash, width, height)`` where width and
        height are those of the image as opened, before any resizing or
        transposition.
    """
    with path.open("rb") as fp:
        source = Image.open(fp)
        width, height = source.size

        reduced = source.convert("RGBA")
        if max(width, height) > 100:
            reduced.thumbnail((100, 100))
        reduced = ImageOps.exif_transpose(reduced)

        # Flatten the (r, g, b, a) pixel tuples into one list of channel values.
        channels = list(chain.from_iterable(reduced.getdata()))
        hash_ints = thumbhash.rgba_to_thumb_hash(reduced.width, reduced.height, channels)

    encoded = base64.b64encode(bytes(hash_ints)).decode()
    return encoded, width, height

692 

693 

# Module-level logger used by the functions and methods of this module.
log: logging.Logger = logging.getLogger(__name__)