Coverage for slidge / core / mixins / attachment.py: 85%
362 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-06 05:07 +0000
1import base64
2import functools
3import logging
4import os
5import shutil
6import stat
7import tempfile
8from collections.abc import Collection, Sequence
9from datetime import datetime
10from itertools import chain
11from mimetypes import guess_extension, guess_type
12from pathlib import Path
13from typing import Any
14from urllib.parse import quote as urlquote
15from uuid import uuid4
16from xml.etree import ElementTree as ET
18import aiohttp
19import thumbhash
20from PIL import Image, ImageOps
21from slixmpp import JID, Iq, Message
22from slixmpp.plugins.xep_0264.stanza import Thumbnail
23from slixmpp.plugins.xep_0447.stanza import StatelessFileSharing
25from ...db.avatar import avatar_cache
26from ...db.models import Attachment
27from ...util.types import (
28 AnyMessageReference,
29 LegacyAttachment,
30 LegacyMessageType,
31 LegacyThreadType,
32)
33from ...util.util import fix_suffix
34from .. import config
35from .message_text import TextMessageMixin
class AttachmentMixin(TextMessageMixin):
    """
    Mixin that lets an :term:`XMPP Entity` send files.

    A file is made reachable over HTTP in one of two ways:

    - by placing it under ``config.NO_UPLOAD_PATH``, served under
      ``config.NO_UPLOAD_URL_PREFIX``;
    - otherwise, by uploading it to ``config.UPLOAD_SERVICE`` (:xep:`0363`).

    The resulting message carries an OOB URL plus :xep:`0385` (SIMS) and
    :xep:`0447` (SFS) metadata. For non-component entities, the URL and the
    serialized metadata are cached in the :class:`Attachment` table. The cache
    is keyed on ``legacy_file_id``, so the same legacy file is not transferred
    twice.
    """

    # When True, and this entity is not the component itself, upload slots
    # are requested on behalf of the user's JID through a XEP-0356
    # privileged IQ.
    PRIVILEGED_UPLOAD = False

    @property
    def __is_component(self) -> bool:
        # The component itself has no session (NotImplemented sentinel).
        return self.session is NotImplemented

    async def __upload(
        self,
        file_path: Path,
        file_name: str | None = None,
        content_type: str | None = None,
    ) -> str:
        """
        Upload a local file to ``config.UPLOAD_SERVICE`` (:xep:`0363`).

        :param file_path: The local file to upload.
        :param file_name: The name announced in the slot request. Defaults to
            the name of ``file_path``.
        :param content_type: The MIME type. Defaults to the xep_0363 plugin's
            default content type.
        :return: The GET URL of the uploaded file.
        :raises aiohttp.ClientResponseError: If the PUT request fails.
        """
        assert config.UPLOAD_SERVICE

        file_size = file_path.stat().st_size

        content_type = content_type or self.xmpp.plugin["xep_0363"].default_content_type
        iq_slot = await self.__request_upload_slot(
            config.UPLOAD_SERVICE,
            file_name or file_path.name,
            file_size,
            content_type,
        )
        slot = iq_slot["http_upload_slot"]
        headers = {
            "Content-Length": str(file_size),
            "Content-Type": content_type,
            # extra headers mandated by the upload service in the slot
            **{header["name"]: header["value"] for header in slot["put"]["headers"]},
        }

        async with aiohttp.ClientSession() as http:
            with file_path.open("rb") as fp:
                async with http.put(
                    slot["put"]["url"], data=fp, headers=headers
                ) as response:
                    response.raise_for_status()
                    return slot["get"]["url"]  # type:ignore[no-any-return]

    async def __request_upload_slot(
        self,
        upload_service: JID | str,
        filename: str,
        size: int,
        content_type: str,
    ) -> Iq:
        """
        Request an HTTP upload slot (:xep:`0363`).

        The request is sent from ``config.UPLOAD_REQUESTER``, or from the
        component's bound JID when that is unset. If :attr:`PRIVILEGED_UPLOAD`
        is set on a non-component entity, the request is sent from the user's
        JID as a :xep:`0356` privileged IQ instead.

        :return: The IQ result, which contains ``http_upload_slot``.
        """
        iq_request = self.xmpp.make_iq_get(
            ito=upload_service, ifrom=config.UPLOAD_REQUESTER or self.xmpp.boundjid
        )
        request = iq_request["http_upload_request"]
        request["filename"] = filename
        request["size"] = str(size)
        request["content-type"] = content_type
        if self.__is_component or not self.PRIVILEGED_UPLOAD:
            return await iq_request.send()  # type:ignore[no-any-return]

        assert self.session is not NotImplemented
        iq_request.set_from(self.session.user_jid)
        return await self.xmpp["xep_0356"].send_privileged_iq(iq_request)  # type:ignore[no-any-return]

    @staticmethod
    async def __no_upload(
        file_path: Path,
        file_name: str | None = None,
        legacy_file_id: str | int | None = None,
    ) -> tuple[Path, str]:
        """
        Place a file under ``config.NO_UPLOAD_PATH`` instead of uploading it.

        The file ends up at ``<NO_UPLOAD_PATH>/<file_id>/<random uuid>/<name>``.
        ``file_id`` is the legacy file ID when given, otherwise a random UUID.
        When a directory for this legacy file ID already holds exactly one
        file, that file is reused as-is.

        :param file_path: The local file to place.
        :param file_name: The name to give the file. Defaults to the name of
            ``file_path``.
        :param legacy_file_id: The legacy identifier of the file, if any.
        :return: ``(destination path, URL path)``. The URL path is relative to
            ``config.NO_UPLOAD_URL_PREFIX`` and is not percent-encoded.
        :raises RuntimeError: If ``config.NO_UPLOAD_METHOD`` is unknown.
        """
        file_id = str(uuid4()) if legacy_file_id is None else str(legacy_file_id)
        assert config.NO_UPLOAD_PATH is not None
        assert config.NO_UPLOAD_URL_PREFIX is not None
        destination_dir = Path(config.NO_UPLOAD_PATH) / file_id

        if destination_dir.exists():
            log.debug("Dest dir exists: %s", destination_dir)
            files = list(f for f in destination_dir.glob("**/*") if f.is_file())
            if len(files) == 1:
                log.debug(
                    "Found the legacy attachment '%s' at '%s'",
                    legacy_file_id,
                    files[0],
                )
                name = files[0].name
                uu = files[0].parent.name  # anti-obvious url trick, see below
                return files[0], "/".join([file_id, uu, name])
            else:
                log.warning(
                    (
                        "There are several or zero files in %s, "
                        "slidge doesn't know which one to pick among %s. "
                        "Removing the dir."
                    ),
                    destination_dir,
                    files,
                )
                shutil.rmtree(destination_dir)

        log.debug("Did not find a file in: %s", destination_dir)
        # let's use a UUID to avoid URLs being too obvious
        uu = str(uuid4())
        destination_dir = destination_dir / uu
        destination_dir.mkdir(parents=True)

        name = file_name or file_path.name
        destination = destination_dir / name
        method = config.NO_UPLOAD_METHOD
        if method == "copy":
            shutil.copy2(file_path, destination)
        elif method == "hardlink":
            os.link(file_path, destination)
        elif method == "symlink":
            # The target is a regular file. target_is_directory=True would
            # create a directory link on Windows, which breaks for files.
            os.symlink(file_path, destination)
        elif method == "move":
            shutil.move(file_path, destination)
        else:
            raise RuntimeError("No upload method not recognized", method)

        if config.NO_UPLOAD_FILE_READ_OTHERS:
            log.debug("Changing perms of %s", destination)
            destination.chmod(destination.stat().st_mode | stat.S_IROTH)
        uploaded_url = "/".join([file_id, uu, name])

        return destination, uploaded_url

    async def __valid_url(self, url: str) -> bool:
        """
        Check with a HEAD request whether a cached URL still serves content.

        A network error counts as "not valid", so the file gets transferred
        again instead of the error aborting the whole send.
        """
        try:
            async with self.session.http.head(url) as r:
                return r.status < 400
        except (aiohttp.ClientError, TimeoutError) as e:
            log.debug("Could not check whether %s is still valid", url, exc_info=e)
            return False

    async def __get_stored(self, attachment: LegacyAttachment) -> Attachment:
        """
        Return the cached :class:`Attachment` row for this legacy file, or a new one.

        A cached row is only looked up for non-component entities and for
        attachments with a ``legacy_file_id``. If the cached URL no longer
        works, it is cleared so that the file is transferred again.

        A new row is not added to the DB here. Its ``url`` is pre-filled with
        the legacy URL when ``config.USE_ATTACHMENT_ORIGINAL_URLS`` is set.
        """
        if attachment.legacy_file_id is not None and not self.__is_component:
            with self.xmpp.store.session() as orm:
                stored = (
                    orm.query(Attachment)
                    .filter_by(
                        legacy_file_id=str(attachment.legacy_file_id),
                        user_account_id=self.session.user_pk,
                    )
                    .one_or_none()
                )
                if stored is not None:
                    if not await self.__valid_url(stored.url):
                        stored.url = None  # type:ignore
                    return stored
        return Attachment(
            user_account_id=None if self.__is_component else self.session.user_pk,
            legacy_file_id=None
            if attachment.legacy_file_id is None
            else str(attachment.legacy_file_id),
            url=attachment.url if config.USE_ATTACHMENT_ORIGINAL_URLS else None,
        )

    @staticmethod
    def __trim_file_name(file_name: str, max_length: int) -> str:
        """
        Shorten a file name to at most ``max_length`` characters.

        The extension is kept whenever it is shorter than the limit.
        """
        if len(file_name) <= max_length:
            return file_name
        log.debug("Trimming long filename: %s", file_name)
        base, ext = os.path.splitext(file_name)
        if len(ext) >= max_length:
            # The extension alone does not fit. base[: max_length - len(ext)]
            # would be a zero/negative slice, which keeps (almost) the whole
            # base name. Plain truncation is the best we can do.
            return file_name[:max_length]
        return base[: max_length - len(ext)] + ext

    @staticmethod
    def __ensure_not_empty(path: Path) -> None:
        """Raise RuntimeError if the file at ``path`` is empty."""
        if path.stat().st_size == 0:
            raise RuntimeError("File size is 0")

    async def __fetch_to_temp(
        self,
        attachment: LegacyAttachment,
        file_name: str | None,
        content_type: str | None,
    ) -> Path:
        """
        Write the attachment's content to a file in a new temporary directory.

        The content source is tried in this order: ``url`` (downloaded),
        ``stream``, ``aio_stream``, ``data``. If none of them is set, no file
        is written and the returned path does not exist.

        :param file_name: The name of the temporary file. When None, a random
            UUID is used, with an extension guessed from ``content_type``.
        :return: The path of the temporary file.

        The temporary directory is removed if writing fails.
        """
        if file_name is None:
            file_name = str(uuid4())
            if content_type is not None:
                ext = guess_extension(content_type, strict=False)  # type:ignore
                if ext is not None:
                    file_name += ext
        temp_dir = Path(tempfile.mkdtemp())
        file_path = temp_dir / file_name
        try:
            if attachment.url:
                async with self.session.http.get(attachment.url) as r:
                    r.raise_for_status()
                    with file_path.open("wb") as f:
                        # stream to disk instead of holding the whole file in memory
                        async for chunk in r.content.iter_chunked(64 * 1024):
                            f.write(chunk)
            elif attachment.stream is not None:
                data = attachment.stream.read()
                if data is None:
                    raise RuntimeError("Could not read from the attachment stream")
                with file_path.open("wb") as f:
                    f.write(data)
            elif attachment.aio_stream is not None:
                # TODO: patch slixmpp to allow this as data source for
                # upload_file() so we don't even have to write anything
                # to disk.
                with file_path.open("wb") as f:
                    async for chunk in attachment.aio_stream:
                        f.write(chunk)
            elif attachment.data is not None:
                with file_path.open("wb") as f:
                    f.write(attachment.data)
        except BaseException:
            # the caller never gets this path, so it cannot clean it up
            shutil.rmtree(temp_dir, ignore_errors=True)
            raise
        return file_path

    async def __get_url(
        self, attachment: LegacyAttachment, stored: Attachment
    ) -> tuple[bool, Path | None, str]:
        """
        Get a local copy of the attachment and make it reachable over HTTP.

        The content is first fetched into a temporary file, unless
        ``attachment.path`` is set. The file is then placed under
        ``config.NO_UPLOAD_PATH``, or uploaded to ``config.UPLOAD_SERVICE``.

        This method has side effects:

        - ``attachment.name`` and ``attachment.content_type`` may be rewritten
          when ``config.FIX_FILENAME_SUFFIX_MIME_TYPE`` is set;
        - ``stored.url`` is set after an HTTP upload, when the file has a
          legacy ID.

        :return: ``(is_temp, local_path, url)``. When ``is_temp`` is True,
            ``local_path`` lives in a temporary directory that the caller must
            remove.
        :raises RuntimeError: If the resulting file is empty.
        """
        file_name = attachment.name
        content_type = attachment.content_type
        file_path = attachment.path

        if file_name:
            file_name = self.__trim_file_name(
                file_name, config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH
            )

        if file_path is None:
            file_path = await self.__fetch_to_temp(attachment, file_name, content_type)
            # With NO_UPLOAD_PATH, the fetched file may still be needed
            # afterwards (e.g. as the target of the "symlink" method), so it
            # is not considered temporary.
            is_temp = not bool(config.NO_UPLOAD_PATH)
        else:
            is_temp = False

        assert isinstance(file_path, Path)
        try:
            if config.FIX_FILENAME_SUFFIX_MIME_TYPE:
                file_name, content_type = fix_suffix(file_path, content_type, file_name)
                attachment.content_type = content_type
                attachment.name = file_name

            if config.NO_UPLOAD_PATH:
                local_path, new_url = await self.__no_upload(
                    file_path, file_name, stored.legacy_file_id
                )
                new_url = (config.NO_UPLOAD_URL_PREFIX or "") + "/" + urlquote(new_url)
                self.__ensure_not_empty(local_path)
            else:
                local_path = file_path
                # checked before uploading, so that no empty file gets uploaded
                self.__ensure_not_empty(local_path)
                new_url = await self.__upload(file_path, file_name, content_type)
                if stored.legacy_file_id and new_url is not None:
                    stored.url = new_url
        except BaseException:
            if is_temp:
                # the caller only receives the path on success
                shutil.rmtree(file_path.parent, ignore_errors=True)
            raise

        return is_temp, local_path, new_url

    async def __set_sims(
        self,
        msg: Message,
        uploaded_url: str,
        path: Path | None,
        attachment: LegacyAttachment,
        stored: Attachment,
    ) -> Thumbnail | None:
        """
        Append a :xep:`0385` (SIMS) reference to ``msg``.

        If ``stored.sims`` is set, the cached reference is reused. Otherwise,
        a reference is built from the local file at ``path``, and nothing is
        appended when there is no local file. For ``image*`` content types, a
        thumbhash thumbnail is added when it can be computed. The serialized
        reference is then cached in ``stored.sims``.

        :return: The thumbnail element, if any, so that it can be reused in
            the SFS element.
        """
        if stored.sims is not None:
            ref = self.xmpp["xep_0372"].stanza.Reference(xml=ET.fromstring(stored.sims))
            msg.append(ref)
            if ref["sims"]["file"].get_plugin("thumbnail", check=True):
                return ref["sims"]["file"]["thumbnail"]  # type:ignore[no-any-return]
            else:
                return None

        if not path:
            return None

        ref = self.xmpp["xep_0385"].get_sims(
            path, [uploaded_url], attachment.content_type, attachment.caption
        )
        if attachment.name:
            ref["sims"]["file"]["name"] = attachment.name
        thumbnail = None
        if attachment.content_type is not None and attachment.content_type.startswith(
            "image"
        ):
            try:
                h, x, y = await self.xmpp.loop.run_in_executor(
                    avatar_cache._thread_pool, get_thumbhash, path
                )
            except Exception as e:
                # best effort: the attachment is still sent without thumbnail
                log.debug("Could not generate a thumbhash", exc_info=e)
            else:
                thumbnail = ref["sims"]["file"]["thumbnail"]
                thumbnail["width"] = x
                thumbnail["height"] = y
                thumbnail["media-type"] = "image/thumbhash"
                thumbnail["uri"] = "data:image/thumbhash;base64," + urlquote(h)

        stored.sims = str(ref)
        msg.append(ref)

        return thumbnail

    def __set_sfs(
        self,
        msg: Message,
        uploaded_url: str,
        path: Path | None,
        attachment: LegacyAttachment,
        stored: Attachment,
        thumbnail: Thumbnail | None = None,
    ) -> None:
        """
        Append a :xep:`0447` (stateless file sharing) element to ``msg``.

        If ``stored.sfs`` is set, the cached element is reused. Otherwise, the
        element is built from the local file at ``path``, and nothing is
        appended when there is no local file. The serialized element is then
        cached in ``stored.sfs``.

        :param thumbnail: An optional thumbnail, as returned by
            :meth:`__set_sims`, to include in the file metadata.
        """
        if stored.sfs is not None:
            msg.append(StatelessFileSharing(xml=ET.fromstring(stored.sfs)))
            return

        if not path:
            return

        sfs = self.xmpp["xep_0447"].get_sfs(
            path, [uploaded_url], attachment.content_type, attachment.caption
        )
        if attachment.name:
            sfs["file"]["name"] = attachment.name
        if attachment.disposition:
            sfs["disposition"] = attachment.disposition
        else:
            del sfs["disposition"]
        if thumbnail is not None:
            sfs["file"].append(thumbnail)
        stored.sfs = str(sfs)
        msg.append(sfs)

    async def __set_sfs_and_sims_without_download(
        self, msg: Message, attachment: LegacyAttachment
    ) -> None:
        """
        Append SIMS and SFS elements that point to the legacy URL itself.

        The file is not downloaded. Nothing is appended unless at least one
        of content type, name or disposition is known, because there would be
        no metadata to add.
        """
        assert attachment.url is not None

        if not any(
            (
                attachment.content_type,
                attachment.name,
                attachment.disposition,
            )
        ):
            return

        sims = self.xmpp.plugin["xep_0385"].stanza.Sims()
        ref = self.xmpp["xep_0372"].stanza.Reference()

        ref["uri"] = attachment.url
        ref["type"] = "data"
        sims["sources"].append(ref)
        sims.enable("file")

        xep_0447_stanza = self.xmpp.plugin["xep_0447"].stanza
        sfs = xep_0447_stanza.StatelessFileSharing()
        url_data = xep_0447_stanza.UrlData()
        url_data["target"] = attachment.url
        sfs["sources"].append(url_data)
        sfs.enable("file")

        if attachment.content_type:
            sims["file"]["media-type"] = attachment.content_type
            sfs["file"]["media-type"] = attachment.content_type
        if attachment.caption:
            sims["file"]["desc"] = attachment.caption
            sfs["file"]["desc"] = attachment.caption
        if attachment.name:
            sims["file"]["name"] = attachment.name
            sfs["file"]["name"] = attachment.name
        if attachment.disposition:
            sfs["disposition"] = attachment.disposition

        msg.append(sims)
        msg.append(sfs)

    def __send_url(
        self,
        msg: Message,
        legacy_msg_id: LegacyMessageType,
        uploaded_url: str,
        caption: str | None = None,
        carbon: bool = False,
        when: datetime | None = None,
        correction: bool = False,
        **kwargs: Any,  # noqa:ANN401
    ) -> list[Message]:
        """
        Send ``msg`` as an attachment message.

        The message body and OOB URL are both ``uploaded_url``. If ``msg``
        carries an SFS element, the body is marked as its fallback. A caption,
        if any, is sent as a separate text message after the attachment.

        :return: The sent messages. The caption message is omitted when
            ``send_text`` returns nothing.
        """
        msg["oob"]["url"] = uploaded_url
        msg["body"] = uploaded_url
        if msg.get_plugin("sfs", check=True):
            msg["fallback"].enable("body")
            msg["fallback"]["for"] = self.xmpp.plugin["xep_0447"].stanza.NAMESPACE
        if correction:
            msg["replace"]["id"] = self._replace_id(legacy_msg_id)
        else:
            self._set_msg_id(msg, legacy_msg_id)
        # `correction` is forwarded whether or not there is a caption, so
        # both cases behave the same way.
        attachment_msg = self._send(msg, carbon=carbon, correction=correction, **kwargs)
        if not caption:
            return [attachment_msg]
        caption_msg = self.send_text(
            caption, legacy_msg_id=None, when=when, carbon=carbon, **kwargs
        )
        return [attachment_msg, caption_msg] if caption_msg else [attachment_msg]

    def __get_base_message(
        self,
        legacy_msg_id: LegacyMessageType | None = None,
        reply_to: AnyMessageReference | None = None,
        when: datetime | None = None,
        thread: LegacyThreadType | None = None,
        carbon: bool = False,
        correction: bool = False,
        mto: JID | None = None,
    ) -> Message:
        """
        Build the message stanza that will carry an attachment.

        For a correction, every XMPP message mapped to ``legacy_msg_id`` is
        retracted, except the first one.
        """
        if correction:
            xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
            if xmpp_ids:
                original_xmpp_id = xmpp_ids[0]
                for xmpp_id in xmpp_ids:
                    if xmpp_id == original_xmpp_id:
                        continue
                    self.retract(xmpp_id, thread)

        if reply_to is not None and reply_to.body:
            # We cannot have a "quote fallback" for attachments since most (all?)
            # XMPP clients will only treat a message as an attachment if the
            # body is the URL and nothing else.
            reply_to_for_attachment: AnyMessageReference | None = AnyMessageReference(
                reply_to.legacy_id, reply_to.author
            )
        else:
            reply_to_for_attachment = reply_to

        return self._make_message(
            when=when,
            reply_to=reply_to_for_attachment,
            carbon=carbon,
            mto=mto,
            thread=thread,
        )

    async def send_file(
        self,
        attachment: LegacyAttachment | Path | str,
        legacy_msg_id: LegacyMessageType | None = None,
        *,
        reply_to: AnyMessageReference | None = None,
        when: datetime | None = None,
        thread: LegacyThreadType | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> tuple[str | None, list[Message]]:
        """
        Send a single file from this :term:`XMPP Entity`.

        :param attachment: The file to send.
            Ideally, a :class:`.LegacyAttachment` with a unique ``legacy_file_id``
            attribute set, to optimise potential future reuses.
            It can also be a :class:`pathlib.Path` pointing to a local file,
            or a ``str`` holding a fetchable HTTP URL.
        :param legacy_msg_id: If you want to be able to transport read markers from the gateway
            user to the legacy network, specify this
        :param reply_to: Quote another message (:xep:`0461`)
        :param when: when the file was sent, for a "delay" tag (:xep:`0203`)
        :param thread:
        :return: The URL of the file, or None if it could not be sent, and the
            list of sent messages.
        """
        coro = self.__send_file(
            attachment,
            legacy_msg_id,
            reply_to=reply_to,
            when=when,
            thread=thread,
            **kwargs,
        )
        if (
            self.__is_component
            or not isinstance(attachment, LegacyAttachment)
            or attachment.legacy_file_id is None
        ):
            return await coro
        # prevents race conditions where we download the same thing several time
        # and end up attempting to insert it twice in the DB, raising an
        # IntegrityError.
        async with self.session.lock(("attachment", attachment.legacy_file_id)):
            return await coro

    async def __send_file(
        self,
        attachment: LegacyAttachment | Path | str,
        legacy_msg_id: LegacyMessageType | None = None,
        *,
        reply_to: AnyMessageReference | None = None,
        when: datetime | None = None,
        thread: LegacyThreadType | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> tuple[str | None, list[Message]]:
        """
        Implement :meth:`send_file`, without the per-legacy-file lock.

        If the file cannot be made available, a "/me tried to send a file"
        message is sent instead and ``(None, [that message])`` is returned.
        """
        store_multi = kwargs.pop("store_multi", True)
        carbon = kwargs.pop("carbon", False)
        mto = kwargs.pop("mto", None)
        # Not popped: "correction" stays in kwargs and reaches
        # __send_url()'s `correction` parameter.
        correction = kwargs.get("correction", False)

        msg = self.__get_base_message(
            legacy_msg_id, reply_to, when, thread, carbon, correction, mto
        )

        if isinstance(attachment, str):
            attachment = LegacyAttachment(url=attachment)
        elif isinstance(attachment, Path):
            attachment = LegacyAttachment(path=attachment)

        stored = await self.__get_stored(attachment)

        if attachment.content_type is None and (
            name := (attachment.name or attachment.url or attachment.path)
        ):
            attachment.content_type, _ = guess_type(name)

        if stored.url:
            is_temp = False
            local_path = None
            new_url = stored.url
        else:
            try:
                is_temp, local_path, new_url = await self.__get_url(attachment, stored)
            except Exception as e:
                log.error("Error with attachment: %s: %s", attachment, e)
                log.debug("", exc_info=e)
                msg["body"] = (
                    f"/me tried to send a file ({attachment.format_for_user()}), "
                    f"but something went wrong: {e}. "
                )
                self._set_msg_id(msg, legacy_msg_id)
                # same carbon setting as the attachment message would have had
                return None, [self._send(msg, carbon=carbon, **kwargs)]
            assert new_url is not None

        try:
            stored.url = new_url
            if config.USE_ATTACHMENT_ORIGINAL_URLS and attachment.url:
                await self.__set_sfs_and_sims_without_download(msg, attachment)
            else:
                thumbnail = await self.__set_sims(
                    msg, new_url, local_path, attachment, stored
                )
                self.__set_sfs(msg, new_url, local_path, attachment, stored, thumbnail)

            if not self.__is_component:
                with self.xmpp.store.session(expire_on_commit=False) as orm:
                    orm.add(stored)
                    orm.commit()
        finally:
            # The temporary copy is no longer needed once the metadata has been
            # computed. Remove it even if building the metadata failed.
            if is_temp and isinstance(local_path, Path):
                shutil.rmtree(local_path.parent, ignore_errors=True)

        msgs = self.__send_url(
            msg, legacy_msg_id, new_url, attachment.caption, carbon, when, **kwargs
        )
        if store_multi:
            self.__store_multi(legacy_msg_id, msgs)
        return new_url, msgs

    def __send_body(
        self,
        body: str | None = None,
        legacy_msg_id: LegacyMessageType | None = None,
        reply_to: AnyMessageReference | None = None,
        when: datetime | None = None,
        thread: LegacyThreadType | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> Message | None:
        """Send ``body`` as a text message, or do nothing when it is empty."""
        if body:
            return self.send_text(
                body,
                legacy_msg_id,
                reply_to=reply_to,
                when=when,
                thread=thread,
                **kwargs,
            )
        else:
            return None

    async def send_files(
        self,
        attachments: Collection[LegacyAttachment],
        legacy_msg_id: LegacyMessageType | None = None,
        body: str | None = None,
        *,
        reply_to: AnyMessageReference | None = None,
        when: datetime | None = None,
        thread: LegacyThreadType | None = None,
        body_first: bool = False,
        correction: bool = False,
        correction_event_id: LegacyMessageType | None = None,
        **kwargs: Any,  # noqa:ANN401
    ) -> None:
        """
        Send several files, plus an optional text body, as separate messages.

        The legacy message ID is put on exactly one of the sent messages. It
        goes on the body when there are no attachments, or when ``body_first``
        is set and the body is not empty. Otherwise it goes on the first
        attachment. All sent messages are then mapped to ``legacy_msg_id``.

        :param attachments: The files to send, in order.
        :param legacy_msg_id: The legacy ID of the whole message.
        :param body: Optional text, sent before or after the attachments.
        :param body_first: Send ``body`` before the attachments instead of
            after them.
        :param correction: Only applied to the body message.
        :param correction_event_id: Only applied to the body message.
        """
        # TODO: once the epic XEP-0385 vs XEP-0447 battle is over, pick
        # one and stop sending several attachments this way
        # we attach the legacy_message ID to the last message we send, because
        # we don't want several messages with the same ID (especially for MUC MAM)
        # NOTE(review): attachments are not sent as corrections even when
        # `correction` is True. Confirm that this is intended.
        if not attachments and not body:
            # ignoring empty message
            return
        body_msg_id = (
            legacy_msg_id if body_needs_msg_id(attachments, body, body_first) else None
        )
        send_body = functools.partial(
            self.__send_body,
            body=body,
            reply_to=reply_to,
            when=when,
            thread=thread,
            correction=correction,
            legacy_msg_id=body_msg_id,
            correction_event_id=correction_event_id,
            **kwargs,
        )
        all_msgs = []
        if body_first:
            all_msgs.append(send_body())
        for i, attachment in enumerate(attachments):
            if i == 0 and body_msg_id is None:
                legacy = legacy_msg_id
            else:
                legacy = None
            _url, msgs = await self.send_file(
                attachment,
                legacy,
                reply_to=reply_to,
                when=when,
                thread=thread,
                store_multi=False,
                **kwargs,
            )
            all_msgs.extend(msgs)
        if not body_first:
            all_msgs.append(send_body())
        self.__store_multi(legacy_msg_id, all_msgs)

    def __store_multi(
        self,
        legacy_msg_id: LegacyMessageType | None,
        all_msgs: Sequence[Message | None],
    ) -> None:
        """
        Map one legacy message ID to the XMPP IDs of all messages sent for it.

        Each message is recorded under its stanza-id when it has one, and
        under its ``id`` attribute otherwise. ``None`` entries are skipped.
        Nothing is stored when there is no legacy ID, or for the component
        itself. This matches the component check :meth:`send_file` always had.
        """
        if legacy_msg_id is None or self.__is_component:
            return
        ids = []
        for msg in all_msgs:
            if not msg:
                continue
            if stanza_id := msg.get_plugin("stanza_id", check=True):
                ids.append(stanza_id["id"])
            else:
                ids.append(msg.get_id())
        with self.xmpp.store.session() as orm:
            self.xmpp.store.id_map.set_msg(
                orm, self._recipient_pk(), str(legacy_msg_id), ids, self.is_participant
            )
            orm.commit()
668 orm.commit()
def body_needs_msg_id(
    attachments: Collection[LegacyAttachment], body: str | None, body_first: bool
) -> bool:
    """
    Decide whether the text body should carry the legacy message ID.

    When there is no attachment, the body is the only message, so it always
    gets the ID. When there are attachments, the body only gets the ID if it
    is non-empty and sent before them.
    """
    if not attachments:
        return True
    return bool(body) and bool(body_first)
def get_thumbhash(path: Path) -> tuple[str, int, int]:
    """
    Compute the thumbhash of an image file.

    The EXIF orientation is applied first, so the hash and the returned
    dimensions both match the image as it is meant to be displayed.

    :param path: An image file that Pillow can open.
    :return: ``(base64-encoded thumbhash, width, height)``. Width and height
        are those of the full-size, orientation-corrected image.
    """
    with path.open("rb") as fp:
        img = Image.open(fp)
        # Apply the orientation before measuring. Otherwise width and height
        # are swapped for images whose EXIF orientation includes a 90° turn.
        img = ImageOps.exif_transpose(img)
        width, height = img.size
        img = img.convert("RGBA")
        if width > 100 or height > 100:
            # thumbhash works on small images, at most 100x100
            img.thumbnail((100, 100))
        # Raw RGBA bytes already are the flat [r, g, b, a, r, g, b, a, ...]
        # sequence that thumbhash expects.
        rgba = list(img.tobytes())
    ints = thumbhash.rgba_to_thumb_hash(img.width, img.height, rgba)
    return base64.b64encode(bytes(ints)).decode(), width, height
# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)