Coverage for slidge / core / mixins / attachment.py: 85%
357 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-20 19:56 +0000
1import base64
2import functools
3import logging
4import os
5import shutil
6import stat
7import tempfile
8from collections.abc import Collection, Sequence
9from datetime import datetime
10from itertools import chain
11from mimetypes import guess_extension, guess_type
12from pathlib import Path
13from typing import Any
14from urllib.parse import quote as urlquote
15from uuid import uuid4
16from xml.etree import ElementTree as ET
18import aiohttp
19import thumbhash
20from PIL import Image, ImageOps
21from slixmpp import JID, Iq, Message
22from slixmpp.plugins.xep_0264.stanza import Thumbnail
23from slixmpp.plugins.xep_0447.stanza import StatelessFileSharing
25from ...db.avatar import avatar_cache
26from ...db.models import Attachment
27from ...util.types import (
28 AnyMessageReference,
29 LegacyAttachment,
30 LegacyMessageType,
31 LegacyThreadType,
32)
33from ...util.util import fix_suffix
34from .. import config
35from .message_text import TextMessageMixin
class AttachmentMixin(TextMessageMixin):
    """
    Mixin that lets an XMPP entity send files (attachments).

    A file's content is made available at an URL either by uploading it to an
    HTTP upload service (:xep:`0363`) or by placing it under
    ``NO_UPLOAD_PATH`` (exposed under ``NO_UPLOAD_URL_PREFIX``). It is then
    announced with OOB, SIMS (:xep:`0385`) and SFS (:xep:`0447`) elements.
    """

    # When true, and this entity is bound to a user session, upload slots are
    # requested on behalf of the user through a privileged IQ (XEP-0356).
    PRIVILEGED_UPLOAD = False

    @property
    def __is_component(self) -> bool:
        # ``session`` is NotImplemented when this entity is not tied to a
        # user session (the gateway component itself).
        return self.session is NotImplemented

    async def __upload(
        self,
        file_path: Path,
        file_name: str | None = None,
        content_type: str | None = None,
    ) -> str:
        """
        Upload a local file through an HTTP upload slot (:xep:`0363`).

        :param file_path: Local file to upload.
        :param file_name: Name sent in the slot request; defaults to
            ``file_path.name``.
        :param content_type: MIME type; defaults to the xep_0363 plugin's
            ``default_content_type``.
        :return: The GET URL of the uploaded file.
        :raises aiohttp.ClientResponseError: if the PUT request fails.
        """
        assert config.UPLOAD_SERVICE

        file_size = file_path.stat().st_size

        content_type = content_type or self.xmpp.plugin["xep_0363"].default_content_type
        iq_slot = await self.__request_upload_slot(
            config.UPLOAD_SERVICE,
            file_name or file_path.name,
            file_size,
            content_type,
        )
        slot = iq_slot["http_upload_slot"]
        headers = {
            "Content-Length": str(file_size),
            "Content-Type": content_type,
            # extra headers the upload service requires for this slot
            **{header["name"]: header["value"] for header in slot["put"]["headers"]},
        }

        async with aiohttp.ClientSession() as http:
            with file_path.open("rb") as fp:
                async with http.put(
                    slot["put"]["url"], data=fp, headers=headers
                ) as response:
                    response.raise_for_status()
        return slot["get"]["url"]  # type:ignore[no-any-return]

    async def __request_upload_slot(
        self,
        upload_service: JID | str,
        filename: str,
        size: int,
        content_type: str,
    ) -> Iq:
        """
        Request an HTTP upload slot (:xep:`0363`) from ``upload_service``.

        The request is sent from ``UPLOAD_REQUESTER`` (or the bound JID). For
        a user-bound entity with ``PRIVILEGED_UPLOAD`` set, it is instead sent
        from the user's JID as a privileged IQ (:xep:`0356`).

        :return: The IQ result carrying the slot.
        """
        iq_request = self.xmpp.make_iq_get(
            ito=upload_service, ifrom=config.UPLOAD_REQUESTER or self.xmpp.boundjid
        )
        request = iq_request["http_upload_request"]
        request["filename"] = filename
        request["size"] = str(size)
        request["content-type"] = content_type
        if self.__is_component or not self.PRIVILEGED_UPLOAD:
            return await iq_request.send()  # type:ignore[no-any-return]

        assert self.session is not NotImplemented
        iq_request.set_from(self.session.user_jid)
        return await self.xmpp["xep_0356"].send_privileged_iq(iq_request)  # type:ignore[no-any-return]

    @staticmethod
    async def __no_upload(
        file_path: Path,
        file_name: str | None = None,
        legacy_file_id: str | int | None = None,
    ) -> tuple[Path, str]:
        """
        Place a file under ``NO_UPLOAD_PATH`` instead of uploading it.

        Layout: ``<NO_UPLOAD_PATH>/<file_id>/<random uuid>/<name>``, where
        ``file_id`` is the legacy file ID, or a fresh UUID when there is none.
        If the ``<file_id>`` directory already holds exactly one file, that
        file is reused. Otherwise the directory is removed and ``file_path``
        is placed there according to ``NO_UPLOAD_METHOD``.

        :return: ``(local path, URL path to append to NO_UPLOAD_URL_PREFIX)``
        :raises RuntimeError: if ``legacy_file_id`` would resolve outside of
            ``NO_UPLOAD_PATH``, or if ``NO_UPLOAD_METHOD`` is not recognized.
        """
        file_id = str(uuid4()) if legacy_file_id is None else str(legacy_file_id)
        assert config.NO_UPLOAD_PATH is not None
        assert config.NO_UPLOAD_URL_PREFIX is not None
        root = Path(config.NO_UPLOAD_PATH)
        destination_dir = root / file_id
        # The legacy file ID comes from the legacy network. Refuse IDs such as
        # "", ".", ".." or absolute paths: the directory may be rmtree'd below.
        if root.resolve() not in destination_dir.resolve().parents:
            raise RuntimeError("Invalid legacy file ID", legacy_file_id)

        if destination_dir.exists():
            log.debug("Dest dir exists: %s", destination_dir)
            files = [f for f in destination_dir.glob("**/*") if f.is_file()]
            if len(files) == 1:
                log.debug(
                    "Found the legacy attachment '%s' at '%s'",
                    legacy_file_id,
                    files[0],
                )
                name = files[0].name
                uu = files[0].parent.name  # anti-obvious url trick, see below
                return files[0], "/".join([file_id, uu, name])
            else:
                log.warning(
                    (
                        "There are several or zero files in %s, "
                        "slidge doesn't know which one to pick among %s. "
                        "Removing the dir."
                    ),
                    destination_dir,
                    files,
                )
                shutil.rmtree(destination_dir)

        log.debug("Did not find a file in: %s", destination_dir)
        # let's use a UUID to avoid URLs being too obvious
        uu = str(uuid4())
        destination_dir = destination_dir / uu
        destination_dir.mkdir(parents=True)

        name = file_name or file_path.name
        destination = destination_dir / name
        method = config.NO_UPLOAD_METHOD
        if method == "copy":
            shutil.copy2(file_path, destination)
        elif method == "hardlink":
            os.link(file_path, destination)
        elif method == "symlink":
            # The target is a regular file (target_is_directory=True would make
            # a broken directory link on Windows), and a relative target would
            # be resolved against the link's own directory, so make it absolute.
            os.symlink(file_path.absolute(), destination)
        elif method == "move":
            shutil.move(file_path, destination)
        else:
            raise RuntimeError("No upload method not recognized", method)

        if config.NO_UPLOAD_FILE_READ_OTHERS:
            log.debug("Changing perms of %s", destination)
            destination.chmod(destination.stat().st_mode | stat.S_IROTH)
        uploaded_url = "/".join([file_id, uu, name])

        return destination, uploaded_url

    async def __valid_url(self, url: str) -> bool:
        """
        Tell whether ``url`` still answers a HEAD request with a status < 400.

        Network errors, timeouts and malformed URLs count as "not valid", so
        the caller re-provides the file instead of failing altogether.
        """
        import asyncio

        try:
            async with self.session.http.head(url) as r:
                return r.status < 400
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            log.debug("Could not check stored attachment URL %s", url, exc_info=e)
            return False

    async def __get_stored(self, attachment: LegacyAttachment) -> Attachment:
        """
        Return the stored DB row for this attachment, or a new unsaved one.

        A stored row is looked up only for user-bound entities and attachments
        with a ``legacy_file_id``. Its URL is cleared when it no longer
        answers, so that the file gets provided again. A new row gets the
        original URL only when ``USE_ATTACHMENT_ORIGINAL_URLS`` is set.
        """
        if attachment.legacy_file_id is not None and not self.__is_component:
            with self.xmpp.store.session() as orm:
                stored = (
                    orm.query(Attachment)
                    .filter_by(
                        legacy_file_id=str(attachment.legacy_file_id),
                        user_account_id=self.session.user_pk,
                    )
                    .one_or_none()
                )
            if stored is not None:
                if not await self.__valid_url(stored.url):
                    stored.url = None  # type:ignore
                return stored
        return Attachment(
            user_account_id=None if self.__is_component else self.session.user_pk,
            legacy_file_id=None
            if attachment.legacy_file_id is None
            else str(attachment.legacy_file_id),
            url=attachment.url if config.USE_ATTACHMENT_ORIGINAL_URLS else None,
        )

    @staticmethod
    def __sanitize_file_name(file_name: str | None) -> str | None:
        """
        Strip any directory component from a legacy-provided file name.

        File names come from the legacy network and are joined to local
        directories, so "../x" or "/etc/x" must not escape them. Returns
        ``None`` when nothing usable is left, so callers fall back to their
        default name.
        """
        if file_name is None:
            return None
        base = os.path.basename(file_name)
        if base in ("", ".", ".."):
            return None
        return base

    async def __write_content(
        self, attachment: LegacyAttachment, file_path: Path
    ) -> None:
        """
        Write the attachment's content to ``file_path``.

        Sources are tried in order: ``url``, ``stream``, ``aio_stream``,
        ``data``. If none is set, nothing is written.

        :raises RuntimeError: if ``stream.read()`` returns ``None``.
        """
        if attachment.url:
            async with self.session.http.get(attachment.url) as r:
                r.raise_for_status()
                with file_path.open("wb") as f:
                    # stream to disk instead of buffering the whole file in RAM
                    async for chunk in r.content.iter_chunked(64 * 1024):
                        f.write(chunk)
        elif attachment.stream is not None:
            data = attachment.stream.read()
            if data is None:
                raise RuntimeError("Attachment stream returned no data")
            with file_path.open("wb") as f:
                f.write(data)
        elif attachment.aio_stream is not None:
            # TODO: patch slixmpp to allow this as data source for
            # upload_file() so we don't even have to write anything
            # to disk.
            with file_path.open("wb") as f:
                async for chunk in attachment.aio_stream:
                    f.write(chunk)
        elif attachment.data is not None:
            with file_path.open("wb") as f:
                f.write(attachment.data)

    async def __get_url(
        self, attachment: LegacyAttachment, stored: Attachment
    ) -> tuple[bool, Path | None, str]:
        """
        Make the attachment's content available at an URL.

        Content comes from ``attachment.path`` or, when there is none, is
        written to a fresh temporary directory. It is then placed under
        ``NO_UPLOAD_PATH``, or uploaded via HTTP upload when that is unset.

        :return: ``(is_temp, local_path, url)``. When ``is_temp`` is true,
            ``local_path`` is a temporary file that the caller must delete,
            together with its parent directory.
        :raises RuntimeError: if the resulting file is empty. HTTP and
            filesystem errors propagate. The temporary directory, if any, is
            removed before re-raising.
        """
        file_name = self.__sanitize_file_name(attachment.name)
        content_type = attachment.content_type
        file_path = attachment.path

        max_length = config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH
        if file_name and len(file_name) > max_length:
            log.debug("Trimming long filename: %s", file_name)
            base, ext = os.path.splitext(file_name)
            file_name = base[: max_length - len(ext)] + ext

        temp_dir: Path | None = None
        try:
            if file_path is None:
                if file_name is None:
                    file_name = str(uuid4())
                    if content_type is not None:
                        ext = guess_extension(content_type, strict=False)  # type:ignore
                        if ext is not None:
                            file_name += ext
                temp_dir = Path(tempfile.mkdtemp())
                file_path = temp_dir / file_name
                await self.__write_content(attachment, file_path)
                # with NO_UPLOAD_PATH, the returned local path is the final
                # destination, which must not be deleted by the caller
                is_temp = not bool(config.NO_UPLOAD_PATH)
            else:
                is_temp = False

            assert isinstance(file_path, Path)
            if config.FIX_FILENAME_SUFFIX_MIME_TYPE:
                file_name, content_type = fix_suffix(file_path, content_type, file_name)
                attachment.content_type = content_type
                attachment.name = file_name

            if config.NO_UPLOAD_PATH:
                local_path, new_url = await self.__no_upload(
                    file_path, file_name, stored.legacy_file_id
                )
                new_url = (config.NO_UPLOAD_URL_PREFIX or "") + "/" + urlquote(new_url)
            else:
                local_path = file_path
                new_url = await self.__upload(file_path, file_name, content_type)
            if stored.legacy_file_id and new_url is not None:
                stored.url = new_url

            if local_path is not None and local_path.stat().st_size == 0:
                raise RuntimeError("File size is 0")
        except BaseException:
            # the caller only cleans up temporary files on success
            if temp_dir is not None:
                shutil.rmtree(temp_dir, ignore_errors=True)
            raise

        if (
            temp_dir is not None
            and not is_temp
            and config.NO_UPLOAD_METHOD != "symlink"
        ):
            # The content now lives under NO_UPLOAD_PATH (copied, hard-linked,
            # moved, or an existing file was reused), so the download directory
            # is no longer needed. A symlink still points into it: keep it then.
            shutil.rmtree(temp_dir, ignore_errors=True)

        return is_temp, local_path, new_url

    async def __set_sims(
        self,
        msg: Message,
        uploaded_url: str,
        path: Path | None,
        attachment: LegacyAttachment,
        stored: Attachment,
    ) -> Thumbnail | None:
        """
        Append a SIMS reference (:xep:`0385`) to ``msg``.

        A previously stored SIMS element is reused verbatim. Otherwise one is
        built from the local file at ``path`` (nothing is added when there is
        none), with a thumbhash thumbnail for ``image*`` content types, and
        saved into ``stored.sims``.

        :return: The thumbnail element, if any, so SFS can reuse it.
        """
        if stored.sims is not None:
            ref = self.xmpp["xep_0372"].stanza.Reference(xml=ET.fromstring(stored.sims))
            msg.append(ref)
            if "thumbnail" in ref["sims"]["file"]:
                return ref["sims"]["file"]["thumbnail"]  # type:ignore[no-any-return]
            else:
                return None

        if not path:
            return None

        ref = self.xmpp["xep_0385"].get_sims(
            path, [uploaded_url], attachment.content_type, attachment.caption
        )
        if attachment.name:
            ref["sims"]["file"]["name"] = attachment.name
        thumbnail = None
        if attachment.content_type is not None and attachment.content_type.startswith(
            "image"
        ):
            try:
                # thumbhash computation is CPU work: keep it off the event loop
                h, x, y = await self.xmpp.loop.run_in_executor(
                    avatar_cache._thread_pool, get_thumbhash, path
                )
            except Exception as e:
                log.debug("Could not generate a thumbhash", exc_info=e)
            else:
                thumbnail = ref["sims"]["file"]["thumbnail"]
                thumbnail["width"] = x
                thumbnail["height"] = y
                thumbnail["media-type"] = "image/thumbhash"
                thumbnail["uri"] = "data:image/thumbhash;base64," + urlquote(h)

        stored.sims = str(ref)
        msg.append(ref)

        return thumbnail

    def __set_sfs(
        self,
        msg: Message,
        uploaded_url: str,
        path: Path | None,
        attachment: LegacyAttachment,
        stored: Attachment,
        thumbnail: Thumbnail | None = None,
    ) -> None:
        """
        Append a Stateless File Sharing element (:xep:`0447`) to ``msg``.

        A previously stored SFS element is reused verbatim. Otherwise one is
        built from the local file at ``path`` (nothing is added when there is
        none), optionally embedding ``thumbnail``, and saved into
        ``stored.sfs``.
        """
        if stored.sfs is not None:
            msg.append(StatelessFileSharing(xml=ET.fromstring(stored.sfs)))
            return

        if not path:
            return

        sfs = self.xmpp["xep_0447"].get_sfs(
            path, [uploaded_url], attachment.content_type, attachment.caption
        )
        if attachment.name:
            sfs["file"]["name"] = attachment.name
        if attachment.disposition:
            sfs["disposition"] = attachment.disposition
        else:
            del sfs["disposition"]
        if thumbnail is not None:
            sfs["file"].append(thumbnail)
        stored.sfs = str(sfs)
        msg.append(sfs)

    async def __set_sfs_and_sims_without_download(
        self, msg: Message, attachment: LegacyAttachment
    ) -> None:
        """
        Append SIMS and SFS elements pointing at the attachment's original URL,
        built only from known metadata (the file is not downloaded).

        Nothing is added when none of content type, name or disposition is
        known.
        """
        assert attachment.url is not None

        if not any(
            (
                attachment.content_type,
                attachment.name,
                attachment.disposition,
            )
        ):
            return

        sims = self.xmpp.plugin["xep_0385"].stanza.Sims()
        ref = self.xmpp["xep_0372"].stanza.Reference()

        ref["uri"] = attachment.url
        ref["type"] = "data"
        sims["sources"].append(ref)
        sims.enable("file")

        xep_0447_stanza = self.xmpp.plugin["xep_0447"].stanza
        sfs = xep_0447_stanza.StatelessFileSharing()
        url_data = xep_0447_stanza.UrlData()
        url_data["target"] = attachment.url
        sfs["sources"].append(url_data)
        sfs.enable("file")

        if attachment.content_type:
            sims["file"]["media-type"] = attachment.content_type
            sfs["file"]["media-type"] = attachment.content_type
        if attachment.caption:
            sims["file"]["desc"] = attachment.caption
            sfs["file"]["desc"] = attachment.caption
        if attachment.name:
            sims["file"]["name"] = attachment.name
            sfs["file"]["name"] = attachment.name
        if attachment.disposition:
            sfs["disposition"] = attachment.disposition

        msg.append(sims)
        msg.append(sfs)

    def __send_url(
        self,
        msg: Message,
        legacy_msg_id: LegacyMessageType,
        uploaded_url: str,
        caption: str | None = None,
        carbon: bool = False,
        when: datetime | None = None,
        correction: bool = False,
        **kwargs: Any,  # noqa:ANN401
    ) -> list[Message]:
        """
        Put ``uploaded_url`` in the OOB element and body of ``msg`` and send it.

        The body is marked as a fallback for SFS when ``msg`` carries one. A
        caption is sent as a separate text message right after the file.
        For corrections the message replaces ``legacy_msg_id`` instead of
        carrying it as its own ID.

        :return: The sent message(s).
        """
        msg["oob"]["url"] = uploaded_url
        msg["body"] = uploaded_url
        if "sfs" in msg:
            msg["fallback"].enable("body")
            msg["fallback"]["for"] = self.xmpp.plugin["xep_0447"].stanza.NAMESPACE
        if caption:
            if correction:
                msg["replace"]["id"] = self._replace_id(legacy_msg_id)
            else:
                self._set_msg_id(msg, legacy_msg_id)
            m1 = self._send(msg, carbon=carbon, correction=correction, **kwargs)
            m2 = self.send_text(
                caption, legacy_msg_id=None, when=when, carbon=carbon, **kwargs
            )
            return [m1, m2] if m2 else [m1]
        else:
            if correction:
                msg["replace"]["id"] = self._replace_id(legacy_msg_id)
            else:
                self._set_msg_id(msg, legacy_msg_id)
            # NOTE(review): unlike the caption branch above, ``correction`` is
            # not forwarded to _send here; confirm whether that is intended.
            return [self._send(msg, carbon=carbon, **kwargs)]

    def __get_base_message(
        self,
        legacy_msg_id: LegacyMessageType | None = None,
        reply_to: AnyMessageReference | None = None,
        when: datetime | None = None,
        thread: LegacyThreadType | None = None,
        carbon: bool = False,
        correction: bool = False,
        mto: JID | None = None,
    ) -> Message:
        """
        Build the stanza that will carry the attachment.

        For corrections, every XMPP message mapped to ``legacy_msg_id`` other
        than the first one is retracted first.
        """
        if correction:
            xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
            if xmpp_ids:
                original_xmpp_id = xmpp_ids[0]
                for xmpp_id in xmpp_ids:
                    if xmpp_id == original_xmpp_id:
                        continue
                    self.retract(xmpp_id, thread)

        if reply_to is not None and reply_to.body:
            # We cannot have a "quote fallback" for attachments since most (all?)
            # XMPP clients will only treat a message as an attachment if the
            # body is the URL and nothing else.
            reply_to_for_attachment: AnyMessageReference | None = AnyMessageReference(
                reply_to.legacy_id, reply_to.author
            )
        else:
            reply_to_for_attachment = reply_to

        return self._make_message(
            when=when,
            reply_to=reply_to_for_attachment,
            carbon=carbon,
            mto=mto,
            thread=thread,
        )

    async def send_file(
        self,
        attachment: LegacyAttachment | Path | str,
        legacy_msg_id: LegacyMessageType | None = None,
        *,
        reply_to: AnyMessageReference | None = None,
        when: datetime | None = None,
        thread: LegacyThreadType | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> tuple[str | None, list[Message]]:
        """
        Send a single file from this :term:`XMPP Entity`.

        :param attachment: The file to send.
            Ideally, a :class:`.LegacyAttachment` with a unique ``legacy_file_id``
            attribute set, to optimise potential future reuses.
            It can also be:
            - a :class:`pathlib.Path` instance to point to a local file, or
            - a ``str``, representing a fetchable HTTP URL.
        :param legacy_msg_id: If you want to be able to transport read markers from the gateway
            user to the legacy network, specify this
        :param reply_to: Quote another message (:xep:`0461`)
        :param when: when the file was sent, for a "delay" tag (:xep:`0203`)
        :param thread: Legacy thread the file message belongs to.
        :return: ``(url, sent messages)``; the URL is ``None`` when the file
            could not be provided and an error message was sent instead.
        """
        coro = self.__send_file(
            attachment,
            legacy_msg_id,
            reply_to=reply_to,
            when=when,
            thread=thread,
            **kwargs,
        )
        if (
            self.__is_component
            or not isinstance(attachment, LegacyAttachment)
            or attachment.legacy_file_id is None
        ):
            return await coro
        else:
            # prevents race conditions where we download the same thing several time
            # and end up attempting to insert it twice in the DB, raising an
            # IntegrityError.
            async with self.session.lock(("attachment", attachment.legacy_file_id)):
                return await coro

    async def __send_file(
        self,
        attachment: LegacyAttachment | Path | str,
        legacy_msg_id: LegacyMessageType | None = None,
        *,
        reply_to: AnyMessageReference | None = None,
        when: datetime | None = None,
        thread: LegacyThreadType | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> tuple[str | None, list[Message]]:
        """
        Implementation of :meth:`send_file`, without the per-file lock.

        On failure to provide the file, a "/me tried to send a file" message
        is sent instead and ``(None, [that message])`` is returned.
        """
        store_multi = kwargs.pop("store_multi", True)
        carbon = kwargs.pop("carbon", False)
        mto = kwargs.pop("mto", None)
        correction = kwargs.get("correction", False)

        msg = self.__get_base_message(
            legacy_msg_id, reply_to, when, thread, carbon, correction, mto
        )

        if isinstance(attachment, str):
            attachment = LegacyAttachment(url=attachment)
        elif isinstance(attachment, Path):
            attachment = LegacyAttachment(path=attachment)

        if attachment.is_sticker:
            msg.enable("sticker")

        stored = await self.__get_stored(attachment)

        if attachment.content_type is None and (
            name := (attachment.name or attachment.url or attachment.path)
        ):
            attachment.content_type, _ = guess_type(name)

        if stored.url:
            is_temp = False
            local_path = None
            new_url = stored.url
        else:
            try:
                is_temp, local_path, new_url = await self.__get_url(attachment, stored)
            except Exception as e:
                log.error("Error with attachment: %s: %s", attachment, e)
                log.debug("", exc_info=e)
                msg["body"] = (
                    f"/me tried to send a file ({attachment.format_for_user()}), "
                    f"but something went wrong: {e}. "
                )
                self._set_msg_id(msg, legacy_msg_id)
                # ``carbon`` was popped from kwargs above: pass it explicitly so
                # the error message is sent the same way as the file would be
                return None, [self._send(msg, carbon=carbon, **kwargs)]
            assert new_url is not None

        stored.url = new_url
        if config.USE_ATTACHMENT_ORIGINAL_URLS and attachment.url:
            await self.__set_sfs_and_sims_without_download(msg, attachment)
        else:
            thumbnail = await self.__set_sims(
                msg, new_url, local_path, attachment, stored
            )
            self.__set_sfs(msg, new_url, local_path, attachment, stored, thumbnail)

        if not self.__is_component:
            with self.xmpp.store.session(expire_on_commit=False) as orm:
                orm.add(stored)
                orm.commit()

        if is_temp and isinstance(local_path, Path):
            local_path.unlink()
            local_path.parent.rmdir()

        msgs = self.__send_url(
            msg, legacy_msg_id, new_url, attachment.caption, carbon, when, **kwargs
        )
        if not self.__is_component and store_multi:
            self.__store_multi(legacy_msg_id, msgs)
        return new_url, msgs

    def __send_body(
        self,
        body: str | None = None,
        legacy_msg_id: LegacyMessageType | None = None,
        reply_to: AnyMessageReference | None = None,
        when: datetime | None = None,
        thread: LegacyThreadType | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> Message | None:
        """Send ``body`` as a text message, or nothing (``None``) if it is empty."""
        if body:
            return self.send_text(
                body,
                legacy_msg_id,
                reply_to=reply_to,
                when=when,
                thread=thread,
                **kwargs,
            )
        else:
            return None

    async def send_files(
        self,
        attachments: Collection[LegacyAttachment],
        legacy_msg_id: LegacyMessageType | None = None,
        body: str | None = None,
        *,
        reply_to: AnyMessageReference | None = None,
        when: datetime | None = None,
        thread: LegacyThreadType | None = None,
        body_first: bool = False,
        correction: bool = False,
        correction_event_id: LegacyMessageType | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> None:
        """
        Send several files, plus an optional text body, as separate messages.

        The body is sent before the files when ``body_first`` is true, after
        them otherwise. ``correction`` and ``correction_event_id`` are only
        forwarded to the body message. All sent messages are recorded as the
        XMPP counterparts of ``legacy_msg_id``.
        """
        # TODO: once the epic XEP-0385 vs XEP-0447 battle is over, pick
        # one and stop sending several attachments this way
        # we attach the legacy_message ID to the first message we send only
        # (the body if it goes first or there are no attachments, else the
        # first attachment), because we don't want several messages with the
        # same ID (especially for MUC MAM)
        if not attachments and not body:
            # ignoring empty message
            return
        body_msg_id = (
            legacy_msg_id if body_needs_msg_id(attachments, body, body_first) else None
        )
        send_body = functools.partial(
            self.__send_body,
            body=body,
            reply_to=reply_to,
            when=when,
            thread=thread,
            correction=correction,
            legacy_msg_id=body_msg_id,
            correction_event_id=correction_event_id,
            **kwargs,
        )
        all_msgs = []
        if body_first:
            all_msgs.append(send_body())
        for i, attachment in enumerate(attachments):
            legacy = legacy_msg_id if i == 0 and body_msg_id is None else None
            _url, msgs = await self.send_file(
                attachment,
                legacy,
                reply_to=reply_to,
                when=when,
                thread=thread,
                store_multi=False,
                **kwargs,
            )
            all_msgs.extend(msgs)
        if not body_first:
            all_msgs.append(send_body())
        self.__store_multi(legacy_msg_id, all_msgs)

    def __store_multi(
        self,
        legacy_msg_id: LegacyMessageType | None,
        all_msgs: Sequence[Message | None],
    ) -> None:
        """
        Map ``legacy_msg_id`` to the IDs of all the given messages.

        The stanza-id is used when a message has one, else its message ID.
        ``None`` entries are skipped; nothing is stored without a legacy ID.
        """
        if legacy_msg_id is None:
            return
        ids = []
        for msg in all_msgs:
            if not msg:
                continue
            if stanza_id := msg.get_plugin("stanza_id", check=True):
                ids.append(stanza_id["id"])
            else:
                ids.append(msg.get_id())
        with self.xmpp.store.session() as orm:
            self.xmpp.store.id_map.set_msg(
                orm, self._recipient_pk(), str(legacy_msg_id), ids, self.is_participant
            )
            orm.commit()
def body_needs_msg_id(
    attachments: Collection[LegacyAttachment], body: str | None, body_first: bool
) -> bool:
    """
    Tell whether the text body (rather than an attachment) should carry the
    legacy message ID when sending files together with a body.

    :return: ``True`` when there are no attachments, or when a non-empty body
        is sent before them; ``False`` otherwise.
    """
    if not attachments:
        return True
    return bool(body) and bool(body_first)
def get_thumbhash(path: Path) -> tuple[str, int, int]:
    """
    Compute a base64-encoded ThumbHash of an image file.

    EXIF orientation is applied before anything else, so that both the hash
    and the returned dimensions match the image as it is meant to be shown.

    :param path: Image file readable by Pillow.
    :return: ``(base64 thumbhash, width, height)``, where width and height
        are those of the full-size image after EXIF orientation.
    """
    with path.open("rb") as fp:
        img = Image.open(fp)
        # Transpose *before* measuring: for rotated photos the raw size has
        # width and height swapped relative to the displayed (and hashed) image.
        img = ImageOps.exif_transpose(img)
        width, height = img.size
        img = img.convert("RGBA")
        if width > 100 or height > 100:
            img.thumbnail((100, 100))
        # flatten [(r, g, b, a), ...] into [r, g, b, a, ...]
        rgba = list(chain.from_iterable(img.getdata()))
        ints = thumbhash.rgba_to_thumb_hash(img.width, img.height, rgba)
    return base64.b64encode(bytes(ints)).decode(), width, height
# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)