Coverage for slidge / core / mixins / attachment.py: 84%
365 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-13 04:38 +0000
1import base64
2import functools
3import logging
4import os
5import shutil
6import stat
7import tempfile
8from collections.abc import Collection, Sequence
9from datetime import datetime
10from itertools import chain
11from mimetypes import guess_extension, guess_type
12from pathlib import Path
13from typing import Any
14from urllib.parse import quote as urlquote
15from uuid import uuid4
16from xml.etree import ElementTree as ET
18import aiohttp
19import thumbhash
20from PIL import Image, ImageOps
21from slixmpp import JID, Iq, Message
22from slixmpp.plugins.xep_0264.stanza import Thumbnail
23from slixmpp.plugins.xep_0447.stanza import StatelessFileSharing
25from ...db.avatar import avatar_cache
26from ...db.models import Attachment
27from ...util.types import LegacyAttachment, MessageReference
28from ...util.util import fix_suffix
29from .. import config
30from .message_text import TextMessageMixin
class AttachmentMixin(TextMessageMixin):
    """
    Mixin adding file-sending capabilities to an XMPP entity.

    A file is first made reachable through an HTTP URL, either by uploading
    it to ``config.UPLOAD_SERVICE`` or by placing it under
    ``config.NO_UPLOAD_PATH``. That URL is then sent as the body and
    out-of-band URL of a message, together with SIMS and SFS file metadata.
    """

    # When True, and this entity is attached to a user session, upload slots
    # are requested from the user's JID through a privileged IQ.
    PRIVILEGED_UPLOAD = False

    @property
    def __is_component(self) -> bool:
        """Whether this entity has no user session (``session is NotImplemented``)."""
        return self.session is NotImplemented

    async def __upload(
        self,
        file_path: Path,
        file_name: str | None = None,
        content_type: str | None = None,
    ) -> str:
        """
        Upload a local file to ``config.UPLOAD_SERVICE`` with an HTTP PUT.

        :param file_path: The local file to upload.
        :param file_name: Name to announce to the upload service; defaults to
            the name of ``file_path``.
        :param content_type: MIME type to announce; defaults to the
            ``default_content_type`` of the ``xep_0363`` plugin.
        :return: The "get" URL of the slot granted by the upload service.
        :raises aiohttp.ClientResponseError: if the PUT request is rejected.
        """
        assert config.UPLOAD_SERVICE

        file_size = file_path.stat().st_size

        content_type = content_type or self.xmpp.plugin["xep_0363"].default_content_type
        iq_slot = await self.__request_upload_slot(
            config.UPLOAD_SERVICE,
            file_name or file_path.name,
            file_size,
            content_type,
        )
        slot = iq_slot["http_upload_slot"]
        # Headers requested by the upload service take precedence over ours.
        headers = {
            "Content-Length": str(file_size),
            "Content-Type": content_type,
            **{header["name"]: header["value"] for header in slot["put"]["headers"]},
        }

        async with aiohttp.ClientSession() as http:
            with file_path.open("rb") as fp:
                async with http.put(
                    slot["put"]["url"], data=fp, headers=headers
                ) as response:
                    response.raise_for_status()
        return slot["get"]["url"]  # type:ignore[no-any-return]

    async def __request_upload_slot(
        self,
        upload_service: JID | str,
        filename: str,
        size: int,
        content_type: str,
    ) -> Iq:
        """
        Request an HTTP upload slot from ``upload_service``.

        The request is sent from ``config.UPLOAD_REQUESTER`` (or the bound
        JID of the component), unless ``PRIVILEGED_UPLOAD`` is set and this
        entity has a user session, in which case it is sent from the user's
        JID as a privileged IQ.

        :return: The IQ response containing the slot.
        """
        iq_request = self.xmpp.make_iq_get(
            ito=upload_service, ifrom=config.UPLOAD_REQUESTER or self.xmpp.boundjid
        )
        request = iq_request["http_upload_request"]
        request["filename"] = filename
        request["size"] = str(size)
        request["content-type"] = content_type
        if self.__is_component or not self.PRIVILEGED_UPLOAD:
            return await iq_request.send()  # type:ignore[no-any-return]

        assert self.session is not NotImplemented
        iq_request.set_from(self.session.user_jid)
        return await self.xmpp["xep_0356"].send_privileged_iq(iq_request)  # type:ignore[no-any-return]

    @staticmethod
    async def __no_upload(
        file_path: Path,
        file_name: str | None = None,
        legacy_file_id: str | int | None = None,
    ) -> tuple[Path, str]:
        """
        Make a file available under ``config.NO_UPLOAD_PATH``.

        The file ends up at ``<NO_UPLOAD_PATH>/<file id>/<uuid>/<name>``. If
        exactly one file already exists under ``<NO_UPLOAD_PATH>/<file id>``,
        it is reused; otherwise that directory is wiped and repopulated
        according to ``config.NO_UPLOAD_METHOD``.

        :param file_path: The local file to expose.
        :param file_name: Name of the exposed file; defaults to the name of
            ``file_path``.
        :param legacy_file_id: Identifier used as the first path component;
            a random UUID is used when it is ``None``.
        :return: The path of the exposed file and its (unquoted) path
            relative to ``config.NO_UPLOAD_PATH``.
        :raises RuntimeError: if ``config.NO_UPLOAD_METHOD`` is not one of
            "copy", "hardlink", "symlink" or "move".
        """
        file_id = str(uuid4()) if legacy_file_id is None else str(legacy_file_id)
        assert config.NO_UPLOAD_PATH is not None
        assert config.NO_UPLOAD_URL_PREFIX is not None
        destination_dir = Path(config.NO_UPLOAD_PATH) / file_id

        if destination_dir.exists():
            log.debug("Dest dir exists: %s", destination_dir)
            files = [f for f in destination_dir.glob("**/*") if f.is_file()]
            if len(files) == 1:
                log.debug(
                    "Found the legacy attachment '%s' at '%s'",
                    legacy_file_id,
                    files[0],
                )
                name = files[0].name
                uu = files[0].parent.name  # anti-obvious url trick, see below
                return files[0], "/".join([file_id, uu, name])
            else:
                log.warning(
                    (
                        "There are several or zero files in %s, "
                        "slidge doesn't know which one to pick among %s. "
                        "Removing the dir."
                    ),
                    destination_dir,
                    files,
                )
                shutil.rmtree(destination_dir)

        log.debug("Did not find a file in: %s", destination_dir)
        # let's use a UUID to avoid URLs being too obvious
        uu = str(uuid4())
        destination_dir = destination_dir / uu
        destination_dir.mkdir(parents=True)

        name = file_name or file_path.name
        destination = destination_dir / name
        method = config.NO_UPLOAD_METHOD
        if method == "copy":
            shutil.copy2(file_path, destination)
        elif method == "hardlink":
            os.link(file_path, destination)
        elif method == "symlink":
            # The target is a regular file, so it must not be declared as a
            # directory (that flag is only honoured on Windows, where it
            # would create an unusable directory link).
            os.symlink(file_path, destination)
        elif method == "move":
            shutil.move(file_path, destination)
        else:
            raise RuntimeError("No upload method not recognized", method)

        if config.NO_UPLOAD_FILE_READ_OTHERS:
            log.debug("Changing perms of %s", destination)
            destination.chmod(destination.stat().st_mode | stat.S_IROTH)
        uploaded_url = "/".join([file_id, uu, name])

        return destination, uploaded_url

    async def __valid_url(self, url: str) -> bool:
        """Return True if a HEAD request on ``url`` yields a status below 400."""
        async with self.session.http.head(url) as r:
            return r.status < 400

    async def __get_stored(self, attachment: LegacyAttachment) -> Attachment:
        """
        Fetch the stored record of this attachment, or build a new one.

        A record is only looked up when the attachment has a
        ``legacy_file_id`` and this entity has a user session. If the URL of
        a record that was found does not answer a HEAD request successfully,
        its ``url`` is reset to ``None`` so that the file gets fetched again.

        :return: The stored record, or a new, not yet persisted one.
        """
        if attachment.legacy_file_id is not None and not self.__is_component:
            with self.xmpp.store.session() as orm:
                stored = (
                    orm.query(Attachment)
                    .filter_by(
                        legacy_file_id=str(attachment.legacy_file_id),
                        user_account_id=self.session.user_pk,
                    )
                    .one_or_none()
                )
                if stored is not None:
                    if not await self.__valid_url(stored.url):
                        stored.url = None  # type:ignore
                    return stored
        return Attachment(
            user_account_id=None if self.__is_component else self.session.user_pk,
            legacy_file_id=None
            if attachment.legacy_file_id is None
            else str(attachment.legacy_file_id),
            url=attachment.url if config.USE_ATTACHMENT_ORIGINAL_URLS else None,
        )

    async def __write_attachment(
        self, attachment: LegacyAttachment, file_path: Path
    ) -> None:
        """
        Write the content of ``attachment`` to ``file_path``.

        The first available source is used, in this order: ``url``,
        ``stream``, ``aio_stream``, ``data``.

        :raises RuntimeError: if ``stream.read()`` returns ``None``, or if the
            attachment has no content source at all.
        """
        if attachment.url:
            async with self.session.http.get(attachment.url) as r:
                r.raise_for_status()
                with file_path.open("wb") as f:
                    f.write(await r.read())

        elif attachment.stream is not None:
            data = attachment.stream.read()
            if data is None:
                raise RuntimeError

            with file_path.open("wb") as f:
                f.write(data)
        elif attachment.aio_stream is not None:
            # TODO: patch slixmpp to allow this as data source for
            #       upload_file() so we don't even have to write anything
            #       to disk.
            with file_path.open("wb") as f:
                async for chunk in attachment.aio_stream:
                    f.write(chunk)
        elif attachment.data is not None:
            with file_path.open("wb") as f:
                f.write(attachment.data)
        else:
            # Fail here with an explicit message rather than later on a
            # missing file.
            raise RuntimeError("Attachment has no path, URL, stream or data")

    async def __get_url(
        self, attachment: LegacyAttachment, stored: Attachment
    ) -> tuple[bool, Path | None, str]:
        """
        Make ``attachment`` reachable through an HTTP URL.

        If the attachment has no local ``path``, its content is first written
        to a file in a fresh temporary directory. The file is then either
        exposed through ``config.NO_UPLOAD_PATH`` or uploaded to the upload
        service. ``attachment.name`` and ``attachment.content_type`` may be
        updated when ``config.FIX_FILENAME_SUFFIX_MIME_TYPE`` is set, and
        ``stored.url`` is updated when the record has a ``legacy_file_id``.

        :return: A tuple of: whether the local file is a temporary file the
            caller must delete, the local path of the file, and its URL.
        :raises RuntimeError: if the resulting file is empty or the
            attachment has no usable content source.
        """
        file_name = attachment.name
        content_type = attachment.content_type
        file_path = attachment.path

        if file_name and len(file_name) > config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH:
            log.debug("Trimming long filename: %s", file_name)
            base, ext = os.path.splitext(file_name)
            file_name = (
                base[: config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH - len(ext)] + ext
            )

        temp_dir: Path | None = None
        if file_path is None:
            if file_name is None:
                file_name = str(uuid4())
                if content_type is not None:
                    ext = guess_extension(content_type, strict=False)  # type:ignore
                    if ext is not None:
                        file_name += ext
            temp_dir = Path(tempfile.mkdtemp())
            file_path = temp_dir / file_name
            # NOTE(review): when NO_UPLOAD_PATH is set the temporary file is
            # never reported as temporary, so it is kept on disk on success
            # (required for the "symlink" method) -- confirm this is wanted
            # for the other methods.
            is_temp = not bool(config.NO_UPLOAD_PATH)
        else:
            is_temp = False

        try:
            if temp_dir is not None:
                await self.__write_attachment(attachment, file_path)

            assert isinstance(file_path, Path)
            if config.FIX_FILENAME_SUFFIX_MIME_TYPE:
                file_name, content_type = fix_suffix(file_path, content_type, file_name)
                attachment.content_type = content_type
                attachment.name = file_name

            if config.NO_UPLOAD_PATH:
                local_path, new_url = await self.__no_upload(
                    file_path, file_name, stored.legacy_file_id
                )
                new_url = (config.NO_UPLOAD_URL_PREFIX or "") + "/" + urlquote(new_url)
            else:
                local_path = file_path
                new_url = await self.__upload(file_path, file_name, content_type)
            if stored.legacy_file_id and new_url is not None:
                stored.url = new_url

            if local_path is not None and local_path.stat().st_size == 0:
                raise RuntimeError("File size is 0")
        except BaseException:
            # Nobody else knows about the temporary directory at this point:
            # remove it so that failed attachments do not pile up on disk.
            if temp_dir is not None:
                shutil.rmtree(temp_dir, ignore_errors=True)
            raise

        return is_temp, local_path, new_url

    async def __set_sims(
        self,
        msg: Message,
        uploaded_url: str,
        path: Path | None,
        attachment: LegacyAttachment,
        stored: Attachment,
    ) -> Thumbnail | None:
        """
        Append a SIMS reference describing the file to ``msg``.

        The serialized reference of ``stored`` is reused when there is one.
        Otherwise it is built from the local file, with a thumbhash
        thumbnail for images when it can be generated, and saved in
        ``stored.sims``. Nothing is appended if there is neither a stored
        reference nor a local file.

        :return: The thumbnail element of the reference, if any.
        """
        if stored.sims is not None:
            ref = self.xmpp["xep_0372"].stanza.Reference(xml=ET.fromstring(stored.sims))
            msg.append(ref)
            if "thumbnail" in ref["sims"]["file"]:
                return ref["sims"]["file"]["thumbnail"]  # type:ignore[no-any-return]
            else:
                return None

        if not path:
            return None

        ref = self.xmpp["xep_0385"].get_sims(
            path, [uploaded_url], attachment.content_type, attachment.caption
        )
        if attachment.name:
            ref["sims"]["file"]["name"] = attachment.name
        thumbnail = None
        if attachment.content_type is not None and attachment.content_type.startswith(
            "image"
        ):
            try:
                h, x, y = await self.xmpp.loop.run_in_executor(
                    avatar_cache._thread_pool, get_thumbhash, path
                )
            except Exception as e:
                # The thumbnail is optional: failing to build it must not
                # prevent the file from being sent.
                log.debug("Could not generate a thumbhash", exc_info=e)
            else:
                thumbnail = ref["sims"]["file"]["thumbnail"]
                thumbnail["width"] = x
                thumbnail["height"] = y
                thumbnail["media-type"] = "image/thumbhash"
                thumbnail["uri"] = "data:image/thumbhash;base64," + urlquote(h)

        stored.sims = str(ref)
        msg.append(ref)

        return thumbnail

    def __set_sfs(
        self,
        msg: Message,
        uploaded_url: str,
        path: Path | None,
        attachment: LegacyAttachment,
        stored: Attachment,
        thumbnail: Thumbnail | None = None,
    ) -> None:
        """
        Append a stateless file sharing (SFS) element to ``msg``.

        The serialized element of ``stored`` is reused when there is one.
        Otherwise it is built from the local file and saved in
        ``stored.sfs``. Nothing is appended if there is neither a stored
        element nor a local file.
        """
        if stored.sfs is not None:
            msg.append(StatelessFileSharing(xml=ET.fromstring(stored.sfs)))
            return

        if not path:
            return

        sfs = self.xmpp["xep_0447"].get_sfs(
            path, [uploaded_url], attachment.content_type, attachment.caption
        )
        if attachment.name:
            sfs["file"]["name"] = attachment.name
        if attachment.disposition:
            sfs["disposition"] = attachment.disposition
        else:
            del sfs["disposition"]
        if thumbnail is not None:
            sfs["file"].append(thumbnail)
        stored.sfs = str(sfs)
        msg.append(sfs)

    async def __set_sfs_and_sims_without_download(
        self, msg: Message, attachment: LegacyAttachment
    ) -> None:
        """
        Append SIMS and SFS elements built from attachment metadata only.

        The elements point to ``attachment.url`` and nothing is downloaded.
        Nothing is appended when the attachment has no content type, no name
        and no disposition.
        """
        assert attachment.url is not None

        if not any(
            (
                attachment.content_type,
                attachment.name,
                attachment.disposition,
            )
        ):
            return

        sims = self.xmpp.plugin["xep_0385"].stanza.Sims()
        ref = self.xmpp["xep_0372"].stanza.Reference()

        ref["uri"] = attachment.url
        ref["type"] = "data"
        sims["sources"].append(ref)
        sims.enable("file")

        xep_0447_stanza = self.xmpp.plugin["xep_0447"].stanza
        sfs = xep_0447_stanza.StatelessFileSharing()
        url_data = xep_0447_stanza.UrlData()
        url_data["target"] = attachment.url
        sfs["sources"].append(url_data)
        sfs.enable("file")

        if attachment.content_type:
            sims["file"]["media-type"] = attachment.content_type
            sfs["file"]["media-type"] = attachment.content_type
        if attachment.caption:
            sims["file"]["desc"] = attachment.caption
            sfs["file"]["desc"] = attachment.caption
        if attachment.name:
            sims["file"]["name"] = attachment.name
            sfs["file"]["name"] = attachment.name
        if attachment.disposition:
            sfs["disposition"] = attachment.disposition

        msg.append(sims)
        msg.append(sfs)

    def __send_url(
        self,
        msg: Message,
        legacy_msg_id: str | None,
        uploaded_url: str,
        caption: str | None = None,
        carbon: bool = False,
        when: datetime | None = None,
        correction: bool = False,
        **kwargs: Any,  # noqa:ANN401
    ) -> list[Message]:
        """
        Send ``msg`` with ``uploaded_url`` as its body and out-of-band URL.

        If there is a caption, it is sent as a separate text message right
        after the file message.

        :return: The sent message(s): the file message, then the caption
            message if one was sent.
        :raises TypeError: if ``correction`` is set without ``legacy_msg_id``.
        """
        msg["oob"]["url"] = uploaded_url
        msg["body"] = uploaded_url
        if "sfs" in msg:
            msg["fallback"].enable("body")
            msg["fallback"]["for"] = self.xmpp.plugin["xep_0447"].stanza.NAMESPACE

        # The file message either replaces a previous one or carries the
        # legacy message ID; this does not depend on the caption.
        if correction:
            if not legacy_msg_id:
                raise TypeError
            msg["replace"]["id"] = self._replace_id(legacy_msg_id)
        elif legacy_msg_id:
            self._set_msg_id(msg, legacy_msg_id)

        if not caption:
            # NOTE(review): unlike the caption case below, ``correction`` is
            # not forwarded to ``_send`` here -- confirm this is intended.
            return [self._send(msg, carbon=carbon, **kwargs)]

        m1 = self._send(msg, carbon=carbon, correction=correction, **kwargs)
        m2 = self.send_text(
            caption, legacy_msg_id=None, when=when, carbon=carbon, **kwargs
        )
        return [m1, m2] if m2 else [m1]

    def __get_base_message(
        self,
        legacy_msg_id: str | None = None,
        reply_to: MessageReference | None = None,
        when: datetime | None = None,
        thread: str | None = None,
        carbon: bool = False,
        correction: bool = False,
        mto: JID | None = None,
    ) -> Message:
        """
        Build the message stanza that will carry the attachment.

        For a correction, every XMPP message mapped to ``legacy_msg_id``
        whose ID differs from the first mapped ID is retracted.

        :raises TypeError: if ``correction`` is set without ``legacy_msg_id``.
        """
        if correction:
            if not legacy_msg_id:
                raise TypeError
            xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
            if xmpp_ids:
                original_xmpp_id = xmpp_ids[0]
                for xmpp_id in xmpp_ids:
                    if xmpp_id == original_xmpp_id:
                        continue
                    self.retract(xmpp_id, thread)

        if reply_to is not None and reply_to.body:
            # We cannot have a "quote fallback" for attachments since most (all?)
            # XMPP clients will only treat a message as an attachment if the
            # body is the URL and nothing else.
            reply_to_for_attachment: MessageReference | None = MessageReference(
                reply_to.legacy_id, reply_to.author
            )
        else:
            reply_to_for_attachment = reply_to

        return self._make_message(
            when=when,
            reply_to=reply_to_for_attachment,
            carbon=carbon,
            mto=mto,
            thread=thread,
        )

    async def send_file(
        self,
        attachment: LegacyAttachment | Path | str,
        legacy_msg_id: str | None = None,
        *,
        reply_to: MessageReference | None = None,
        when: datetime | None = None,
        thread: str | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> tuple[str | None, list[Message]]:
        """
        Send a single file from this :term:`XMPP Entity`.

        :param attachment: The file to send.
            Ideally, a :class:`.LegacyAttachment` with a unique ``legacy_file_id``
            attribute set, to optimise potential future reuses.
            It can also be:
            - a :class:`pathlib.Path` instance to point to a local file, or
            - a ``str``, representing a fetchable HTTP URL.
        :param legacy_msg_id: If you want to be able to transport read markers from the gateway
            user to the legacy network, specify this
        :param reply_to: Quote another message (:xep:`0461`)
        :param when: when the file was sent, for a "delay" tag (:xep:`0203`)
        :param thread:
        :return: The URL of the file (``None`` if it could not be made
            available) and the message(s) that were sent.
        """
        # The coroutine is only created right before it is awaited, so that
        # it cannot be left un-awaited if acquiring the lock fails.
        send = functools.partial(
            self.__send_file,
            attachment,
            legacy_msg_id,
            reply_to=reply_to,
            when=when,
            thread=thread,
            **kwargs,
        )
        if (
            self.__is_component
            or not isinstance(attachment, LegacyAttachment)
            or attachment.legacy_file_id is None
        ):
            return await send()

        # prevents race conditions where we download the same thing several times
        # and end up attempting to insert it twice in the DB, raising an
        # IntegrityError.
        async with self.session.lock(("attachment", attachment.legacy_file_id)):
            return await send()

    async def __send_file(
        self,
        attachment: LegacyAttachment | Path | str,
        legacy_msg_id: str | None = None,
        *,
        reply_to: MessageReference | None = None,
        when: datetime | None = None,
        thread: str | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> tuple[str | None, list[Message]]:
        """
        Implementation of :meth:`send_file`, without any locking.

        Recognised extra keyword arguments: ``store_multi`` (default True),
        ``carbon`` (default False), ``mto`` (default None) and ``correction``
        (default False). The first three are consumed here; ``correction``
        is also forwarded with the remaining keyword arguments.

        If the file cannot be made available, a text message reporting the
        failure is sent instead and the returned URL is ``None``.
        """
        store_multi = kwargs.pop("store_multi", True)
        carbon = kwargs.pop("carbon", False)
        mto = kwargs.pop("mto", None)
        correction = kwargs.get("correction", False)

        msg = self.__get_base_message(
            legacy_msg_id, reply_to, when, thread, carbon, correction, mto
        )

        if isinstance(attachment, str):
            attachment = LegacyAttachment(url=attachment)
        elif isinstance(attachment, Path):
            attachment = LegacyAttachment(path=attachment)

        if attachment.is_sticker:
            msg.enable("sticker")

        stored = await self.__get_stored(attachment)

        if attachment.content_type is None and (
            name := (attachment.name or attachment.url or attachment.path)
        ):
            attachment.content_type, _ = guess_type(name)

        if stored.url:
            is_temp = False
            local_path = None
            new_url = stored.url
        else:
            try:
                is_temp, local_path, new_url = await self.__get_url(attachment, stored)
            except Exception as e:
                log.error("Error with attachment: %s: %s", attachment, e)
                log.debug("", exc_info=e)
                msg["body"] = (
                    f"/me tried to send a file ({attachment.format_for_user()}), "
                    f"but something went wrong: {e}. "
                )
                self._set_msg_id(msg, legacy_msg_id)
                return None, [self._send(msg, **kwargs)]
            assert new_url is not None

        stored.url = new_url
        try:
            if config.USE_ATTACHMENT_ORIGINAL_URLS and attachment.url:
                await self.__set_sfs_and_sims_without_download(msg, attachment)
            else:
                thumbnail = await self.__set_sims(
                    msg, new_url, local_path, attachment, stored
                )
                self.__set_sfs(msg, new_url, local_path, attachment, stored, thumbnail)

            if not self.__is_component:
                with self.xmpp.store.session(expire_on_commit=False) as orm:
                    orm.add(stored)
                    orm.commit()
        finally:
            # The temporary file is not needed past this point; remove it
            # even if building the metadata or storing the record failed.
            if is_temp and isinstance(local_path, Path):
                local_path.unlink()
                local_path.parent.rmdir()

        msgs = self.__send_url(
            msg, legacy_msg_id, new_url, attachment.caption, carbon, when, **kwargs
        )
        if not self.__is_component and store_multi and legacy_msg_id:
            self.__store_multi(legacy_msg_id, msgs)
        return new_url, msgs

    def __send_body(
        self,
        body: str | None = None,
        legacy_msg_id: str | None = None,
        reply_to: MessageReference | None = None,
        when: datetime | None = None,
        thread: str | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> Message | None:
        """Send ``body`` as a text message, or do nothing if it is empty."""
        if body:
            return self.send_text(
                body,
                legacy_msg_id,
                reply_to=reply_to,
                when=when,
                thread=thread,
                **kwargs,
            )
        else:
            return None

    async def send_files(
        self,
        attachments: Collection[LegacyAttachment],
        legacy_msg_id: str | None = None,
        body: str | None = None,
        *,
        reply_to: MessageReference | None = None,
        when: datetime | None = None,
        thread: str | None = None,
        body_first: bool = False,
        correction: bool = False,
        correction_event_id: str | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> None:
        """
        Send several files, each in its own message, with an optional body.

        :param attachments: The files to send.
        :param legacy_msg_id: The legacy ID of the message; it is attached
            to a single one of the sent messages, and all the sent messages
            are mapped to it.
        :param body: Optional text, sent as a separate message.
        :param reply_to: Quote another message.
        :param when: When the message was sent.
        :param thread:
        :param body_first: Send the body before the files instead of after.
        :param correction: Whether the body is a correction.
        :param correction_event_id: Passed through to the body message.
        """
        # TODO: once the epic XEP-0385 vs XEP-0447 battle is over, pick
        #       one and stop sending several attachments this way
        # we attach the legacy_message ID to the last message we send, because
        # we don't want several messages with the same ID (especially for MUC MAM)
        if not attachments and not body:
            # ignoring empty message
            return
        body_msg_id = (
            legacy_msg_id if body_needs_msg_id(attachments, body, body_first) else None
        )
        send_body = functools.partial(
            self.__send_body,
            body=body,
            reply_to=reply_to,
            when=when,
            thread=thread,
            correction=correction,
            legacy_msg_id=body_msg_id,
            correction_event_id=correction_event_id,
            **kwargs,
        )
        all_msgs = []
        if body_first:
            all_msgs.append(send_body())
        for i, attachment in enumerate(attachments):
            # When the body does not carry the ID, the first file does.
            legacy = legacy_msg_id if i == 0 and body_msg_id is None else None
            _url, msgs = await self.send_file(
                attachment,
                legacy,
                reply_to=reply_to,
                when=when,
                thread=thread,
                store_multi=False,
                **kwargs,
            )
            all_msgs.extend(msgs)
        if not body_first:
            all_msgs.append(send_body())
        self.__store_multi(legacy_msg_id, all_msgs)

    def __store_multi(
        self,
        legacy_msg_id: str | None,
        all_msgs: Sequence[Message | None],
    ) -> None:
        """
        Map ``legacy_msg_id`` to the IDs of all the messages sent for it.

        The stanza ID of a message is used when it has one, its message ID
        otherwise. ``None`` entries are skipped, and nothing is stored when
        ``legacy_msg_id`` is ``None``.
        """
        if legacy_msg_id is None:
            return
        ids = []
        for msg in all_msgs:
            if not msg:
                continue
            if stanza_id := msg.get_plugin("stanza_id", check=True):
                ids.append(stanza_id["id"])
            else:
                ids.append(msg.get_id())
        with self.xmpp.store.session() as orm:
            self.xmpp.store.id_map.set_msg(
                orm, self._recipient_pk(), str(legacy_msg_id), ids, self.is_participant
            )
            orm.commit()
def body_needs_msg_id(
    attachments: Collection[LegacyAttachment], body: str | None, body_first: bool
) -> bool:
    """
    Tell whether the body message should carry the legacy message ID.

    Without attachments, the body is the only message, so it always does.
    With attachments, it only does when there is a non-empty body that is
    sent before the files.
    """
    if not attachments:
        return True
    return bool(body and body_first)
def get_thumbhash(path: Path) -> tuple[str, int, int]:
    """
    Compute the thumbhash of the image stored at ``path``.

    The image is converted to RGBA, shrunk to fit in 100x100 pixels when it
    is larger than that, and transposed according to its EXIF orientation
    before being hashed.

    :return: The base64-encoded thumbhash, then the width and the height of
        the image as read right after opening it.
    """
    with path.open("rb") as image_file:
        picture = Image.open(image_file)
        # NOTE(review): these dimensions are read before the EXIF
        # transposition below, so they presumably do not reflect the
        # displayed orientation of rotated pictures -- confirm.
        original_width, original_height = picture.size
        picture = picture.convert("RGBA")
        if original_width > 100 or original_height > 100:
            picture.thumbnail((100, 100))
        picture = ImageOps.exif_transpose(picture)
        # Flatten the sequence of (r, g, b, a) pixels into one list of ints.
        flat_rgba = list(chain.from_iterable(list(picture.getdata())))
        hash_ints = thumbhash.rgba_to_thumb_hash(
            picture.width, picture.height, flat_rgba
        )
        encoded = base64.b64encode(bytes(hash_ints)).decode()
    return encoded, original_width, original_height
# Module-level logger. It is bound at the end of the module, which is fine
# because the code above only looks it up when it runs, not when it is defined.
log = logging.getLogger(__name__)