Coverage for slidge/core/mixins/attachment.py: 85%

363 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-26 19:34 +0000

1import base64 

2import functools 

3import logging 

4import os 

5import re 

6import shutil 

7import stat 

8import tempfile 

9import warnings 

10from datetime import datetime 

11from itertools import chain 

12from mimetypes import guess_extension, guess_type 

13from pathlib import Path 

14from typing import Collection, Optional, Sequence, Union 

15from urllib.parse import quote as urlquote 

16from uuid import uuid4 

17from xml.etree import ElementTree as ET 

18 

19import aiohttp 

20import thumbhash 

21from PIL import Image, ImageOps 

22from slixmpp import JID, Iq, Message 

23from slixmpp.plugins.xep_0264.stanza import Thumbnail 

24from slixmpp.plugins.xep_0447.stanza import StatelessFileSharing 

25 

26from ...db.avatar import avatar_cache 

27from ...db.models import Attachment 

28from ...util.types import ( 

29 LegacyAttachment, 

30 LegacyMessageType, 

31 LegacyThreadType, 

32 MessageReference, 

33) 

34from ...util.util import fix_suffix 

35from .. import config 

36from .message_text import TextMessageMixin 

37 

38 

39class AttachmentMixin(TextMessageMixin): 

40 PRIVILEGED_UPLOAD = False 

41 

@property
def __is_component(self) -> bool:
    """
    Tell whether ``self.session`` is the ``NotImplemented`` placeholder.

    NOTE(review): presumably this means the entity is the gateway component
    itself rather than something bound to a user session -- confirm.
    """
    has_no_session = self.session is NotImplemented
    return has_no_session

45 

async def __upload(
    self,
    file_path: Path,
    file_name: Optional[str] = None,
    content_type: Optional[str] = None,
) -> str:
    """
    PUT a local file to a slot obtained from ``config.UPLOAD_SERVICE``.

    :param file_path: Local file whose content is sent.
    :param file_name: Name announced in the slot request; falls back to
        ``file_path.name``.
    :param content_type: MIME type; falls back to the ``default_content_type``
        of the xep_0363 plugin.
    :return: The "get" URL of the slot.
    """
    assert config.UPLOAD_SERVICE

    size = file_path.stat().st_size

    if not content_type:
        content_type = self.xmpp.plugin["xep_0363"].default_content_type
    announced_name = file_name or file_path.name
    slot_iq = await self.__request_upload_slot(
        config.UPLOAD_SERVICE, announced_name, size, content_type
    )
    slot = slot_iq["http_upload_slot"]

    # Headers dictated by the slot take precedence over our own defaults.
    headers = {"Content-Length": str(size), "Content-Type": content_type}
    for header in slot["put"]["headers"]:
        headers[header["name"]] = header["value"]

    async with aiohttp.ClientSession() as http:
        with file_path.open("rb") as fp:
            put_url = slot["put"]["url"]
            async with http.put(put_url, data=fp, headers=headers) as response:
                response.raise_for_status()
    return slot["get"]["url"]

77 

async def __request_upload_slot(
    self,
    upload_service: JID | str,
    filename: str,
    size: int,
    content_type: str,
) -> Iq:
    """
    Ask ``upload_service`` for an HTTP upload slot.

    The request is sent directly, unless ``PRIVILEGED_UPLOAD`` is set and a
    session is attached: in that case the IQ is re-addressed from the user's
    JID and sent through the xep_0356 plugin.

    :param upload_service: Recipient of the slot request.
    :param filename: File name put in the request.
    :param size: File size, serialised as a string in the request.
    :param content_type: MIME type put in the request.
    :return: The result of sending the IQ.
    """
    sender = config.UPLOAD_REQUESTER or self.xmpp.boundjid
    iq_request = self.xmpp.make_iq_get(ito=upload_service, ifrom=sender)

    request = iq_request["http_upload_request"]
    request["filename"] = filename
    request["size"] = str(size)
    request["content-type"] = content_type

    use_privilege = not self.__is_component and self.PRIVILEGED_UPLOAD
    if use_privilege:
        assert self.session is not NotImplemented
        iq_request.set_from(self.session.user_jid)
        return await self.xmpp["xep_0356"].send_privileged_iq(iq_request)

    return await iq_request.send()

98 

@staticmethod
async def __no_upload(
    file_path: Path,
    file_name: Optional[str] = None,
    legacy_file_id: Optional[Union[str, int]] = None,
) -> tuple[Path, str]:
    """
    Expose a local file under ``config.NO_UPLOAD_PATH`` instead of uploading it.

    The file ends up at ``NO_UPLOAD_PATH/<file id>/<random uuid>/<name>``.
    If the ``<file id>`` directory already holds exactly one file, that file
    is reused; otherwise the directory is wiped and populated again.

    :param file_path: Source file.
    :param file_name: Name to give to the exposed file; defaults to
        ``file_path.name``.
    :param legacy_file_id: Identifier used as the first directory level;
        a random UUID is used when it is ``None``.
    :return: The path of the exposed file and its relative URL
        (``<file id>/<uuid>/<name>``, not URL-quoted, without prefix).
    :raises RuntimeError: if ``config.NO_UPLOAD_METHOD`` is not one of
        ``copy``, ``hardlink``, ``symlink`` or ``move``.
    """
    file_id = str(uuid4()) if legacy_file_id is None else str(legacy_file_id)
    assert config.NO_UPLOAD_PATH is not None
    assert config.NO_UPLOAD_URL_PREFIX is not None
    destination_dir = Path(config.NO_UPLOAD_PATH) / file_id

    if destination_dir.exists():
        log.debug("Dest dir exists: %s", destination_dir)
        files = [f for f in destination_dir.glob("**/*") if f.is_file()]
        if len(files) == 1:
            log.debug(
                "Found the legacy attachment '%s' at '%s'",
                legacy_file_id,
                files[0],
            )
            name = files[0].name
            uu = files[0].parent.name  # anti-obvious url trick, see below
            return files[0], "/".join([file_id, uu, name])
        else:
            log.warning(
                (
                    "There are several or zero files in %s, "
                    "slidge doesn't know which one to pick among %s. "
                    "Removing the dir."
                ),
                destination_dir,
                files,
            )
            shutil.rmtree(destination_dir)

    log.debug("Did not find a file in: %s", destination_dir)
    # let's use a UUID to avoid URLs being too obvious
    uu = str(uuid4())
    destination_dir = destination_dir / uu
    destination_dir.mkdir(parents=True)

    name = file_name or file_path.name
    destination = destination_dir / name
    method = config.NO_UPLOAD_METHOD
    if method == "copy":
        shutil.copy2(file_path, destination)
    elif method == "hardlink":
        os.link(file_path, destination)
    elif method == "symlink":
        # The link target is a regular file: do not flag it as a directory,
        # which would produce an unusable link on Windows (the flag is
        # ignored on other platforms).
        os.symlink(file_path, destination)
    elif method == "move":
        shutil.move(file_path, destination)
    else:
        raise RuntimeError("No upload method not recognized", method)

    if config.NO_UPLOAD_FILE_READ_OTHERS:
        log.debug("Changing perms of %s", destination)
        # Add the "readable by others" bit on top of the existing mode.
        destination.chmod(destination.stat().st_mode | stat.S_IROTH)
    uploaded_url = "/".join([file_id, uu, name])

    return destination, uploaded_url

160 

async def __valid_url(self, url: str) -> bool:
    """
    Issue a HEAD request on ``url`` with the session's HTTP client.

    :param url: The URL to probe.
    :return: ``True`` if the response status is below 400.
    """
    async with self.session.http.head(url) as response:
        status = response.status
        return status < 400

164 

async def __get_stored(self, attachment: LegacyAttachment) -> Attachment:
    """
    Fetch the DB row describing ``attachment``, or build a fresh one.

    A lookup only happens when the attachment has a ``legacy_file_id`` and a
    session is attached. When a row is found but its URL does not pass
    :meth:`__valid_url`, its ``url`` is reset to ``None``.

    :param attachment: The attachment about to be sent.
    :return: The stored row, or a new unsaved :class:`Attachment`.
    """
    file_id = attachment.legacy_file_id

    if file_id is not None and not self.__is_component:
        with self.xmpp.store.session() as orm:
            query = orm.query(Attachment).filter_by(
                legacy_file_id=str(file_id),
                user_account_id=self.session.user_pk,
            )
            stored = query.one_or_none()
            if stored is not None:
                still_reachable = await self.__valid_url(stored.url)
                if not still_reachable:
                    stored.url = None  # type:ignore
                return stored

    owner = None if self.__is_component else self.session.user_pk
    original_url = attachment.url if config.USE_ATTACHMENT_ORIGINAL_URLS else None
    return Attachment(
        user_account_id=owner,
        legacy_file_id=None if file_id is None else str(file_id),
        url=original_url,
    )

187 

async def __get_url(
    self, attachment: LegacyAttachment, stored: Attachment
) -> tuple[bool, Path | None, str]:
    """
    Make ``attachment`` available at a URL, fetching its content if needed.

    When the attachment has no local ``path``, its content is written to a
    new temporary directory, taken from (in this order) its ``url``,
    ``stream``, ``aio_stream`` or ``data``. The file is then either exposed
    through :meth:`__no_upload` (if ``config.NO_UPLOAD_PATH`` is set) or sent
    with :meth:`__upload`.

    If anything fails after the temporary directory was created, that
    directory is removed before the exception propagates, so that failed
    transfers do not accumulate files on disk.

    :param attachment: The attachment to process. Its ``name`` and
        ``content_type`` are overwritten when
        ``config.FIX_FILENAME_SUFFIX_MIME_TYPE`` is set.
    :param stored: DB row for this attachment; its ``url`` is updated when
        it has a ``legacy_file_id``.
    :return: ``(is_temp, local_path, new_url)`` where ``is_temp`` is true
        when ``local_path`` lives in a temporary directory created here and
        no "no upload" path is configured.
    :raises RuntimeError: if ``attachment.stream.read()`` returns ``None``
        or if the resulting file is empty.
    """
    file_name = attachment.name
    content_type = attachment.content_type
    file_path = attachment.path

    if file_name and len(file_name) > config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH:
        log.debug("Trimming long filename: %s", file_name)
        base, ext = os.path.splitext(file_name)
        # Trim the base name only, so that the extension survives.
        file_name = (
            base[: config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH - len(ext)] + ext
        )

    # Set as soon as we create a temporary directory, for cleanup on failure.
    temp_dir: Path | None = None
    try:
        if file_path is None:
            if file_name is None:
                file_name = str(uuid4())
                if content_type is not None:
                    ext = guess_extension(content_type, strict=False)  # type:ignore
                    if ext is not None:
                        file_name += ext
            temp_dir = Path(tempfile.mkdtemp())
            file_path = temp_dir / file_name
            if attachment.url:
                async with self.session.http.get(attachment.url) as r:
                    r.raise_for_status()
                    with file_path.open("wb") as f:
                        f.write(await r.read())

            elif attachment.stream is not None:
                data = attachment.stream.read()
                if data is None:
                    raise RuntimeError

                with file_path.open("wb") as f:
                    f.write(data)
            elif attachment.aio_stream is not None:
                # TODO: patch slixmpp to allow this as data source for
                #       upload_file() so we don't even have to write anything
                #       to disk.
                with file_path.open("wb") as f:
                    async for chunk in attachment.aio_stream:
                        f.write(chunk)
            elif attachment.data is not None:
                with file_path.open("wb") as f:
                    f.write(attachment.data)

            is_temp = not bool(config.NO_UPLOAD_PATH)
        else:
            is_temp = False

        assert isinstance(file_path, Path)
        if config.FIX_FILENAME_SUFFIX_MIME_TYPE:
            file_name, content_type = fix_suffix(file_path, content_type, file_name)
            attachment.content_type = content_type
            attachment.name = file_name

        if config.NO_UPLOAD_PATH:
            local_path, new_url = await self.__no_upload(
                file_path, file_name, stored.legacy_file_id
            )
            new_url = (config.NO_UPLOAD_URL_PREFIX or "") + "/" + urlquote(new_url)
        else:
            local_path = file_path
            new_url = await self.__upload(file_path, file_name, content_type)
        if stored.legacy_file_id and new_url is not None:
            stored.url = new_url

        if local_path is not None and local_path.stat().st_size == 0:
            raise RuntimeError("File size is 0")
    except BaseException:
        # Covers cancellation too; the caller never learns about temp_dir
        # when we raise, so nobody else could clean it up.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    return is_temp, local_path, new_url

260 

async def __set_sims(
    self,
    msg: Message,
    uploaded_url: str,
    path: Optional[Path],
    attachment: LegacyAttachment,
    stored: Attachment,
) -> Thumbnail | None:
    """
    Append a SIMS reference element to ``msg``.

    The element is rebuilt from ``stored.sims`` when available. Otherwise it
    is generated from the local file with the xep_0385 plugin and serialised
    into ``stored.sims``; for content types starting with "image", a
    thumbhash thumbnail is added when it can be computed.

    :param msg: Message receiving the element.
    :param uploaded_url: URL the file can be fetched from.
    :param path: Local copy of the file; nothing is done without it, unless
        ``stored.sims`` is set.
    :param attachment: Source of the content type, caption and name.
    :param stored: DB row used as cache for the serialised element.
    :return: The thumbnail element, if any.
    """
    cached = stored.sims
    if cached is not None:
        ref = self.xmpp["xep_0372"].stanza.Reference(xml=ET.fromstring(cached))
        msg.append(ref)
        if not ref["sims"]["file"].get_plugin("thumbnail", check=True):
            return None
        return ref["sims"]["file"]["thumbnail"]

    if not path:
        return None

    ref = self.xmpp["xep_0385"].get_sims(
        path, [uploaded_url], attachment.content_type, attachment.caption
    )
    if attachment.name:
        ref["sims"]["file"]["name"] = attachment.name

    thumbnail = None
    mime = attachment.content_type
    if mime is not None and mime.startswith("image"):
        try:
            # Image decoding is delegated to the avatar cache's thread pool.
            encoded, width, height = await self.xmpp.loop.run_in_executor(
                avatar_cache._thread_pool, get_thumbhash, path
            )
        except Exception as e:
            log.debug("Could not generate a thumbhash", exc_info=e)
        else:
            thumbnail = ref["sims"]["file"]["thumbnail"]
            thumbnail["width"] = width
            thumbnail["height"] = height
            thumbnail["media-type"] = "image/thumbhash"
            thumbnail["uri"] = "data:image/thumbhash;base64," + urlquote(encoded)

    stored.sims = str(ref)
    msg.append(ref)

    return thumbnail

306 

def __set_sfs(
    self,
    msg: Message,
    uploaded_url: str,
    path: Optional[Path],
    attachment: LegacyAttachment,
    stored: Attachment,
    thumbnail: Optional[Thumbnail] = None,
) -> None:
    """
    Append a stateless file sharing element to ``msg``.

    The element is rebuilt from ``stored.sfs`` when available; otherwise it is
    generated from the local file with the xep_0447 plugin and serialised
    into ``stored.sfs``.

    :param msg: Message receiving the element.
    :param uploaded_url: URL the file can be fetched from.
    :param path: Local copy of the file; nothing is done without it, unless
        ``stored.sfs`` is set.
    :param attachment: Source of the content type, caption, name and
        disposition.
    :param stored: DB row used as cache for the serialised element.
    :param thumbnail: Optional thumbnail element appended to the file
        metadata.
    """
    cached = stored.sfs
    if cached is not None:
        msg.append(StatelessFileSharing(xml=ET.fromstring(cached)))
        return

    if not path:
        return

    sfs = self.xmpp["xep_0447"].get_sfs(
        path, [uploaded_url], attachment.content_type, attachment.caption
    )
    if attachment.name:
        sfs["file"]["name"] = attachment.name

    disposition = attachment.disposition
    if not disposition:
        del sfs["disposition"]
    else:
        sfs["disposition"] = disposition

    if thumbnail is not None:
        sfs["file"].append(thumbnail)

    stored.sfs = str(sfs)
    msg.append(sfs)

336 

async def __set_sfs_and_sims_without_download(
    self, msg: Message, attachment: LegacyAttachment
) -> None:
    """
    Append SIMS and SFS elements pointing at ``attachment.url`` to ``msg``.

    Only the metadata carried by ``attachment`` is used; the file itself is
    never read. Nothing is appended when the attachment has neither a content
    type, a name nor a disposition.

    :param msg: Message receiving the elements.
    :param attachment: Attachment with a non-``None`` ``url``.
    """
    assert attachment.url is not None

    has_metadata = (
        attachment.content_type or attachment.name or attachment.disposition
    )
    if not has_metadata:
        return

    sims = self.xmpp.plugin["xep_0385"].stanza.Sims()
    ref = self.xmpp["xep_0372"].stanza.Reference()

    ref["uri"] = attachment.url
    ref["type"] = "data"
    sims["sources"].append(ref)
    sims.enable("file")

    xep_0447_stanza = self.xmpp.plugin["xep_0447"].stanza
    sfs = xep_0447_stanza.StatelessFileSharing()
    url_data = xep_0447_stanza.UrlData()
    url_data["target"] = attachment.url
    sfs["sources"].append(url_data)
    sfs.enable("file")

    # Same file metadata on both elements, skipping whatever is unset.
    shared_fields = (
        ("media-type", attachment.content_type),
        ("desc", attachment.caption),
        ("name", attachment.name),
    )
    for field, value in shared_fields:
        if value:
            sims["file"][field] = value
            sfs["file"][field] = value

    if attachment.disposition:
        sfs["disposition"] = attachment.disposition

    msg.append(sims)
    msg.append(sfs)

380 

def __send_url(
    self,
    msg: Message,
    legacy_msg_id: LegacyMessageType,
    uploaded_url: str,
    caption: Optional[str] = None,
    carbon: bool = False,
    when: Optional[datetime] = None,
    correction: bool = False,
    **kwargs,
) -> list[Message]:
    """
    Send ``msg`` with ``uploaded_url`` as body and out-of-band URL.

    When ``msg`` carries an "sfs" element, the body is flagged as a fallback
    for the xep_0447 namespace. A non-empty ``caption`` is sent as a separate
    text message, without a legacy message ID.

    :param msg: The message to complete and send.
    :param legacy_msg_id: ID to attach to ``msg``, or ID of the message being
        replaced when ``correction`` is true.
    :param uploaded_url: URL of the file.
    :param caption: Optional text sent right after the file.
    :param carbon: Forwarded to the send calls.
    :param when: Forwarded to the caption message.
    :param correction: Whether ``msg`` replaces a previous message.
    :return: The message(s) that were sent.
    """
    msg["oob"]["url"] = uploaded_url
    msg["body"] = uploaded_url
    if msg.get_plugin("sfs", check=True):
        msg["fallback"].enable("body")
        msg["fallback"]["for"] = self.xmpp.plugin["xep_0447"].stanza.NAMESPACE

    # Identical in both the caption and no-caption cases.
    if correction:
        msg["replace"]["id"] = self._replace_id(legacy_msg_id)
    else:
        self._set_msg_id(msg, legacy_msg_id)

    if not caption:
        return [self._send(msg, carbon=carbon, **kwargs)]

    file_msg = self._send(msg, carbon=carbon, correction=correction, **kwargs)
    caption_msg = self.send_text(
        caption, legacy_msg_id=None, when=when, carbon=carbon, **kwargs
    )
    return [file_msg, caption_msg] if caption_msg else [file_msg]

413 

def __get_base_message(
    self,
    legacy_msg_id: Optional[LegacyMessageType] = None,
    reply_to: Optional[MessageReference] = None,
    when: Optional[datetime] = None,
    thread: Optional[LegacyThreadType] = None,
    carbon: bool = False,
    correction: bool = False,
    mto: Optional[JID] = None,
) -> Message:
    """
    Build the message that will carry an attachment.

    For a correction, every XMPP message mapped to ``legacy_msg_id`` whose ID
    differs from the first one is retracted beforehand.

    :param legacy_msg_id: Legacy ID, only used to look up XMPP IDs when
        ``correction`` is true.
    :param reply_to: Message being replied to; its body is not carried over.
    :param when: Forwarded to ``_make_message``.
    :param thread: Forwarded to ``_make_message`` and to ``retract``.
    :param carbon: Forwarded to ``_make_message``.
    :param correction: Whether this message corrects a previous one.
    :param mto: Forwarded to ``_make_message``.
    :return: The message built by ``_make_message``.
    """
    if correction:
        xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
        if xmpp_ids:
            kept_id = xmpp_ids[0]
            for other_id in xmpp_ids:
                if other_id != kept_id:
                    self.retract(other_id, thread)

    reply_to_for_attachment: MessageReference | None = reply_to
    if reply_to is not None and reply_to.body:
        # We cannot have a "quote fallback" for attachments since most (all?)
        # XMPP clients will only treat a message as an attachment if the
        # body is the URL and nothing else.
        reply_to_for_attachment = MessageReference(
            reply_to.legacy_id, reply_to.author
        )

    return self._make_message(
        when=when,
        reply_to=reply_to_for_attachment,
        carbon=carbon,
        mto=mto,
        thread=thread,
    )

450 

async def send_file(
    self,
    attachment: LegacyAttachment | Path | str,
    legacy_msg_id: Optional[LegacyMessageType] = None,
    *,
    reply_to: Optional[MessageReference] = None,
    when: Optional[datetime] = None,
    thread: Optional[LegacyThreadType] = None,
    **kwargs,
) -> tuple[Optional[str], list[Message]]:
    """
    Send a single file from this :term:`XMPP Entity`.

    :param attachment: The file to send.
        Ideally, a :class:`.LegacyAttachment` with a unique ``legacy_file_id``
        attribute set, to optimise potential future reuses.
        It can also be:
        - a :class:`pathlib.Path` instance to point to a local file, or
        - a ``str``, representing a fetchable HTTP URL.
    :param legacy_msg_id: If you want to be able to transport read markers from the gateway
        user to the legacy network, specify this
    :param reply_to: Quote another message (:xep:`0461`)
    :param when: when the file was sent, for a "delay" tag (:xep:`0203`)
    :param thread:
    :return: The URL of the file (``None`` if it could not be made
        available) and the message(s) that were sent.
    """
    coro = self.__send_file(
        attachment,
        legacy_msg_id,
        reply_to=reply_to,
        when=when,
        thread=thread,
        **kwargs,
    )
    needs_lock = (
        not self.__is_component
        and isinstance(attachment, LegacyAttachment)
        and attachment.legacy_file_id is not None
    )
    if not needs_lock:
        return await coro

    # prevents race conditions where we download the same thing several time
    # and end up attempting to insert it twice in the DB, raising an
    # IntegrityError.
    async with self.session.lock(("attachment", attachment.legacy_file_id)):
        return await coro

496 

async def __send_file(
    self,
    attachment: LegacyAttachment | Path | str,
    legacy_msg_id: Optional[LegacyMessageType] = None,
    *,
    reply_to: Optional[MessageReference] = None,
    when: Optional[datetime] = None,
    thread: Optional[LegacyThreadType] = None,
    **kwargs,
) -> tuple[Optional[str], list[Message]]:
    """
    Do the actual work of :meth:`send_file`, without any locking.

    ``store_multi``, ``carbon`` and ``mto`` are consumed from ``kwargs``;
    ``correction`` is read from it but left in place.

    If the attachment cannot be turned into a URL, a "/me tried to send a
    file" message describing the error is sent instead.

    :return: The URL of the file (``None`` on failure) and the message(s)
        that were sent.
    """
    store_multi = kwargs.pop("store_multi", True)
    carbon = kwargs.pop("carbon", False)
    mto = kwargs.pop("mto", None)
    correction = kwargs.get("correction", False)

    msg = self.__get_base_message(
        legacy_msg_id, reply_to, when, thread, carbon, correction, mto
    )

    # Normalise the shortcuts accepted by the public API.
    if isinstance(attachment, str):
        attachment = LegacyAttachment(url=attachment)
    elif isinstance(attachment, Path):
        attachment = LegacyAttachment(path=attachment)

    stored = await self.__get_stored(attachment)

    if attachment.content_type is None and (
        hint := (attachment.name or attachment.url or attachment.path)
    ):
        attachment.content_type = guess_type(hint)[0]

    if stored.url:
        # Already available somewhere: nothing to fetch or upload.
        is_temp, local_path, new_url = False, None, stored.url
    else:
        try:
            is_temp, local_path, new_url = await self.__get_url(attachment, stored)
        except Exception as e:
            log.error("Error with attachment: %s: %s", attachment, e)
            log.debug("", exc_info=e)
            msg["body"] = (
                f"/me tried to send a file ({attachment.format_for_user()}), "
                f"but something went wrong: {e}. "
            )
            self._set_msg_id(msg, legacy_msg_id)
            return None, [self._send(msg, **kwargs)]
        assert new_url is not None

    stored.url = new_url
    if config.USE_ATTACHMENT_ORIGINAL_URLS and attachment.url:
        await self.__set_sfs_and_sims_without_download(msg, attachment)
    else:
        thumbnail = await self.__set_sims(
            msg, new_url, local_path, attachment, stored
        )
        self.__set_sfs(msg, new_url, local_path, attachment, stored, thumbnail)

    if not self.__is_component:
        with self.xmpp.store.session(expire_on_commit=False) as orm:
            orm.add(stored)
            orm.commit()

    if is_temp and isinstance(local_path, Path):
        # Remove the temporary file, then the directory that held it.
        local_path.unlink()
        local_path.parent.rmdir()

    msgs = self.__send_url(
        msg, legacy_msg_id, new_url, attachment.caption, carbon, when, **kwargs
    )
    if not self.__is_component and store_multi:
        self.__store_multi(legacy_msg_id, msgs)
    return new_url, msgs

571 

def __send_body(
    self,
    body: Optional[str] = None,
    legacy_msg_id: Optional[LegacyMessageType] = None,
    reply_to: Optional[MessageReference] = None,
    when: Optional[datetime] = None,
    thread: Optional[LegacyThreadType] = None,
    **kwargs,
) -> Optional[Message]:
    """
    Send ``body`` with :meth:`send_text`, unless it is empty.

    :return: Whatever :meth:`send_text` returns, or ``None`` when there is
        no body to send.
    """
    if not body:
        return None
    return self.send_text(
        body,
        legacy_msg_id,
        reply_to=reply_to,
        when=when,
        thread=thread,
        **kwargs,
    )

592 

async def send_files(
    self,
    attachments: Collection[LegacyAttachment],
    legacy_msg_id: Optional[LegacyMessageType] = None,
    body: Optional[str] = None,
    *,
    reply_to: Optional[MessageReference] = None,
    when: Optional[datetime] = None,
    thread: Optional[LegacyThreadType] = None,
    body_first: bool = False,
    correction: bool = False,
    correction_event_id: Optional[LegacyMessageType] = None,
    **kwargs,
) -> None:
    """
    Send several attachments and an optional text body as separate messages.

    ``legacy_msg_id`` is attached to a single message: the body when
    :func:`body_needs_msg_id` says so, the first attachment otherwise. All the
    messages sent are then recorded together for that legacy ID.

    :param attachments: The files to send, one message each.
    :param legacy_msg_id: Legacy ID of the whole message.
    :param body: Optional text.
    :param reply_to: Forwarded to every message sent.
    :param when: Forwarded to every message sent.
    :param thread: Forwarded to every message sent.
    :param body_first: Send the body before the attachments instead of after.
    :param correction: Forwarded to the body message.
    :param correction_event_id: Forwarded to the body message.
    """
    # TODO: once the epic XEP-0385 vs XEP-0447 battle is over, pick
    #       one and stop sending several attachments this way
    # we attach the legacy_message ID to the last message we send, because
    # we don't want several messages with the same ID (especially for MUC MAM)
    if not (attachments or body):
        # ignoring empty message
        return

    body_msg_id = None
    if body_needs_msg_id(attachments, body, body_first):
        body_msg_id = legacy_msg_id

    send_body = functools.partial(
        self.__send_body,
        body=body,
        reply_to=reply_to,
        when=when,
        thread=thread,
        correction=correction,
        legacy_msg_id=body_msg_id,
        correction_event_id=correction_event_id,
        **kwargs,
    )

    all_msgs = []
    if body_first:
        all_msgs.append(send_body())

    for index, attachment in enumerate(attachments):
        # Only the first attachment may carry the ID, and only if the body
        # does not.
        carries_id = index == 0 and body_msg_id is None
        _url, msgs = await self.send_file(
            attachment,
            legacy_msg_id if carries_id else None,
            reply_to=reply_to,
            when=when,
            thread=thread,
            store_multi=False,
            **kwargs,
        )
        all_msgs.extend(msgs)

    if not body_first:
        all_msgs.append(send_body())

    self.__store_multi(legacy_msg_id, all_msgs)

649 

def __store_multi(
    self,
    legacy_msg_id: Optional[LegacyMessageType],
    all_msgs: Sequence[Optional[Message]],
) -> None:
    """
    Record which XMPP message IDs correspond to ``legacy_msg_id``.

    For each message, the ID of its "stanza_id" element is used when there is
    one, and the message's own ID otherwise. Falsy entries of ``all_msgs`` are
    skipped. Nothing happens when ``legacy_msg_id`` is ``None``.

    :param legacy_msg_id: The legacy message ID.
    :param all_msgs: The messages that were sent for it.
    """
    if legacy_msg_id is None:
        return

    ids = [
        stanza_id["id"]
        if (stanza_id := sent.get_plugin("stanza_id", check=True))
        else sent.get_id()
        for sent in all_msgs
        if sent
    ]

    with self.xmpp.store.session() as orm:
        self.xmpp.store.id_map.set_msg(
            orm, self._recipient_pk(), str(legacy_msg_id), ids, self.is_participant
        )
        orm.commit()

670 

671 

def body_needs_msg_id(
    attachments: Collection[LegacyAttachment], body: str | None, body_first: bool
) -> bool:
    """
    Decide whether the text body should carry the legacy message ID.

    :param attachments: The attachments sent along with the body.
    :param body: The text body, if any.
    :param body_first: Whether the body is sent before the attachments.
    :return: ``True`` when there is no attachment at all, or when there is a
        non-empty body that is sent first; ``False`` otherwise.
    """
    if not attachments:
        return True
    return bool(body and body_first)

679 

680 

def get_thumbhash(path: Path) -> tuple[str, int, int]:
    """
    Compute the thumbhash of the image stored at ``path``.

    The image is converted to RGBA, shrunk to fit in 100x100 when it is
    larger, and passed through ``ImageOps.exif_transpose`` before hashing.

    :param path: Location of the image file.
    :return: The base64-encoded thumbhash, followed by the width and height
        of the image as read right after opening it (before any resizing).
    """
    with path.open("rb") as fp:
        img = Image.open(fp)
        width, height = img.size
        img = img.convert("RGBA")
    if max(width, height) > 100:
        img.thumbnail((100, 100))
    img = ImageOps.exif_transpose(img)
    pixels = list(img.getdata())
    # Flatten the (r, g, b, a) tuples into a single list of channel values.
    flat_rgba = list(chain.from_iterable(pixels))
    hash_bytes = bytes(thumbhash.rgba_to_thumb_hash(img.width, img.height, flat_rgba))
    return base64.b64encode(hash_bytes).decode(), width, height

693 

694 

# Module-level logger. It is assigned at the very end of the module, which is
# fine because the functions above only look the name up when they are called.
log = logging.getLogger(__name__)