# Coverage for slidge/core/mixins/attachment.py: 84%
# 361 statements
# Report generated by coverage.py v7.13.0, created at 2026-01-06 15:18 +0000
import asyncio
import base64
import functools
import logging
import os
import shutil
import stat
import tempfile
from datetime import datetime
from itertools import chain
from mimetypes import guess_extension, guess_type
from pathlib import Path
from typing import Collection, Optional, Sequence, Union
from urllib.parse import quote as urlquote
from uuid import uuid4
from xml.etree import ElementTree as ET

import aiohttp
import thumbhash
from PIL import Image, ImageOps
from slixmpp import JID, Iq, Message
from slixmpp.plugins.xep_0264.stanza import Thumbnail
from slixmpp.plugins.xep_0447.stanza import StatelessFileSharing

from ...db.avatar import avatar_cache
from ...db.models import Attachment
from ...util.types import (
    LegacyAttachment,
    LegacyMessageType,
    LegacyThreadType,
    MessageReference,
)
from ...util.util import fix_suffix
from .. import config
from .message_text import TextMessageMixin


class AttachmentMixin(TextMessageMixin):
    """
    Mixin that lets an XMPP entity send files.

    A file is made available either by uploading it to
    ``config.UPLOAD_SERVICE`` (XEP-0363 slot request + HTTP PUT) or, when
    ``config.NO_UPLOAD_PATH`` is set, by placing it in that local directory and
    building its URL from ``config.NO_UPLOAD_URL_PREFIX``. The URL is then sent
    as the message body, along with OOB, SIMS (XEP-0385) and SFS (XEP-0447)
    metadata.
    """

    # When true (and the entity has a user session), upload slots are requested
    # on behalf of the user with a privileged IQ (XEP-0356) instead of a plain IQ.
    PRIVILEGED_UPLOAD = False

    @property
    def __is_component(self) -> bool:
        """Whether this entity has no user session (``session is NotImplemented``)."""
        return self.session is NotImplemented

    async def __upload(
        self,
        file_path: Path,
        file_name: Optional[str] = None,
        content_type: Optional[str] = None,
    ) -> str:
        """
        Upload a local file to ``config.UPLOAD_SERVICE`` and return its GET URL.

        :param file_path: Local file to upload.
        :param file_name: Name announced to the upload service, defaults to the
            name of ``file_path``.
        :param content_type: MIME type, defaults to the ``default_content_type``
            of the XEP-0363 plugin.
        :return: The URL the upload slot advertises for fetching the file.
        :raises aiohttp.ClientResponseError: if the HTTP PUT is answered with
            an error status.
        """
        assert config.UPLOAD_SERVICE

        file_size = file_path.stat().st_size

        content_type = content_type or self.xmpp.plugin["xep_0363"].default_content_type
        iq_slot = await self.__request_upload_slot(
            config.UPLOAD_SERVICE,
            file_name or file_path.name,
            file_size,
            content_type,
        )
        slot = iq_slot["http_upload_slot"]
        # Headers requested by the slot come last so that they take precedence
        # over the ones we compute ourselves.
        headers = {
            "Content-Length": str(file_size),
            "Content-Type": content_type,
            **{header["name"]: header["value"] for header in slot["put"]["headers"]},
        }

        async with aiohttp.ClientSession() as http:
            with file_path.open("rb") as fp:
                async with http.put(
                    slot["put"]["url"], data=fp, headers=headers
                ) as response:
                    response.raise_for_status()
        return slot["get"]["url"]

    async def __request_upload_slot(
        self,
        upload_service: JID | str,
        filename: str,
        size: int,
        content_type: str,
    ) -> Iq:
        """
        Request an HTTP upload slot from ``upload_service``.

        The request is sent as a regular IQ (from ``config.UPLOAD_REQUESTER`` or
        the component JID), unless ``PRIVILEGED_UPLOAD`` is set and this entity
        has a user session, in which case it is sent as a privileged IQ from the
        user's JID.

        :param upload_service: JID of the upload service.
        :param filename: File name to announce.
        :param size: File size, in bytes.
        :param content_type: MIME type to announce.
        :return: The IQ result containing the upload slot.
        """
        iq_request = self.xmpp.make_iq_get(
            ito=upload_service, ifrom=config.UPLOAD_REQUESTER or self.xmpp.boundjid
        )
        request = iq_request["http_upload_request"]
        request["filename"] = filename
        request["size"] = str(size)
        request["content-type"] = content_type
        if self.__is_component or not self.PRIVILEGED_UPLOAD:
            return await iq_request.send()

        assert self.session is not NotImplemented
        iq_request.set_from(self.session.user_jid)
        return await self.xmpp["xep_0356"].send_privileged_iq(iq_request)

    @staticmethod
    def __sanitize_file_name(name: str) -> str:
        """
        Reduce ``name`` to something safe to use as a single path component.

        File names originate from the legacy network and must not be able to
        point outside of the directory they are joined to.

        :param name: The file name, possibly containing path separators.
        :return: The final path component of ``name``, or a random UUID if
            nothing usable is left.
        """
        candidate = Path(name).name
        if candidate in ("", ".", ".."):
            return str(uuid4())
        return candidate

    @staticmethod
    async def __no_upload(
        file_path: Path,
        file_name: Optional[str] = None,
        legacy_file_id: Optional[Union[str, int]] = None,
    ) -> tuple[Path, str]:
        """
        Make a file available under ``config.NO_UPLOAD_PATH``.

        Files are laid out as ``<NO_UPLOAD_PATH>/<file id>/<random uuid>/<name>``.
        If a directory for ``legacy_file_id`` already exists and contains
        exactly one file, that file is reused; otherwise the directory is
        removed and recreated.

        :param file_path: The local file to publish.
        :param file_name: Name of the published file, defaults to the name of
            ``file_path``.
        :param legacy_file_id: Identifier used as the top-level directory name;
            a random UUID is used if it is ``None``.
        :return: The path of the published file and its URL path relative to
            ``config.NO_UPLOAD_URL_PREFIX`` (not URL-quoted).
        :raises RuntimeError: if ``config.NO_UPLOAD_METHOD`` is not recognized.
        """
        file_id = str(uuid4()) if legacy_file_id is None else str(legacy_file_id)
        assert config.NO_UPLOAD_PATH is not None
        assert config.NO_UPLOAD_URL_PREFIX is not None
        base_dir = Path(config.NO_UPLOAD_PATH)
        destination_dir = base_dir / file_id

        # An ID such as "", "." or ".." would resolve to the base directory (or
        # above it), and that directory may be passed to rmtree() below.
        if base_dir.resolve() not in destination_dir.resolve().parents:
            log.warning(
                "Legacy file ID %r does not map to a sub-directory of %s, "
                "using a random ID instead",
                legacy_file_id,
                base_dir,
            )
            file_id = str(uuid4())
            destination_dir = base_dir / file_id

        if destination_dir.exists():
            log.debug("Dest dir exists: %s", destination_dir)
            files = [f for f in destination_dir.glob("**/*") if f.is_file()]
            if len(files) == 1:
                log.debug(
                    "Found the legacy attachment '%s' at '%s'",
                    legacy_file_id,
                    files[0],
                )
                name = files[0].name
                uu = files[0].parent.name  # anti-obvious url trick, see below
                return files[0], "/".join([file_id, uu, name])
            else:
                log.warning(
                    (
                        "There are several or zero files in %s, "
                        "slidge doesn't know which one to pick among %s. "
                        "Removing the dir."
                    ),
                    destination_dir,
                    files,
                )
                shutil.rmtree(destination_dir)

        log.debug("Did not find a file in: %s", destination_dir)
        # let's use a UUID to avoid URLs being too obvious
        uu = str(uuid4())
        destination_dir = destination_dir / uu
        destination_dir.mkdir(parents=True)

        # The name is joined to a directory: keep it to a single component.
        name = AttachmentMixin.__sanitize_file_name(file_name or file_path.name)
        destination = destination_dir / name
        method = config.NO_UPLOAD_METHOD
        if method == "copy":
            shutil.copy2(file_path, destination)
        elif method == "hardlink":
            os.link(file_path, destination)
        elif method == "symlink":
            os.symlink(file_path, destination, target_is_directory=True)
        elif method == "move":
            shutil.move(file_path, destination)
        else:
            raise RuntimeError("No upload method not recognized", method)

        if config.NO_UPLOAD_FILE_READ_OTHERS:
            log.debug("Changing perms of %s", destination)
            destination.chmod(destination.stat().st_mode | stat.S_IROTH)
        uploaded_url = "/".join([file_id, uu, name])

        return destination, uploaded_url

    async def __valid_url(self, url: str) -> bool:
        """
        Check that ``url`` can still be fetched, using an HTTP HEAD request.

        :param url: The URL to check.
        :return: ``True`` if the response status is below 400; ``False`` if it
            is not, or if the request could not be completed at all.
        """
        try:
            async with self.session.http.head(url) as r:
                return r.status < 400
        except (aiohttp.ClientError, asyncio.TimeoutError, OSError) as e:
            # An unreachable URL is handled like an invalid one by the caller.
            log.debug("Could not check the validity of %s: %s", url, e)
            return False

    async def __get_stored(self, attachment: LegacyAttachment) -> Attachment:
        """
        Return the DB row describing ``attachment``, or a new unsaved one.

        An existing row is looked up by ``legacy_file_id`` for the current
        user. If its URL is no longer valid, the URL is reset to ``None``.

        :param attachment: The attachment about to be sent.
        :return: The matching stored attachment, or a fresh ``Attachment``
            instance that has not been added to the DB yet.
        """
        if attachment.legacy_file_id is not None and not self.__is_component:
            with self.xmpp.store.session() as orm:
                stored = (
                    orm.query(Attachment)
                    .filter_by(
                        legacy_file_id=str(attachment.legacy_file_id),
                        user_account_id=self.session.user_pk,
                    )
                    .one_or_none()
                )
                if stored is not None:
                    if not await self.__valid_url(stored.url):
                        stored.url = None  # type:ignore
                    return stored
        return Attachment(
            user_account_id=None if self.__is_component else self.session.user_pk,
            legacy_file_id=None
            if attachment.legacy_file_id is None
            else str(attachment.legacy_file_id),
            url=attachment.url if config.USE_ATTACHMENT_ORIGINAL_URLS else None,
        )

    async def __write_to_disk(
        self, attachment: LegacyAttachment, file_path: Path
    ) -> None:
        """
        Write the content of ``attachment`` to ``file_path``.

        The first available source is used, in this order: ``url``,
        ``stream``, ``aio_stream``, ``data``.

        :param attachment: The attachment to fetch.
        :param file_path: Where to write its content.
        :raises RuntimeError: if the stream yields no data, or if the
            attachment has no usable source.
        """
        if attachment.url:
            async with self.session.http.get(attachment.url) as r:
                r.raise_for_status()
                with file_path.open("wb") as f:
                    f.write(await r.read())
        elif attachment.stream is not None:
            data = attachment.stream.read()
            if data is None:
                raise RuntimeError("The attachment stream did not return any data")
            with file_path.open("wb") as f:
                f.write(data)
        elif attachment.aio_stream is not None:
            # TODO: patch slixmpp to allow this as data source for
            #       upload_file() so we don't even have to write anything
            #       to disk.
            with file_path.open("wb") as f:
                async for chunk in attachment.aio_stream:
                    f.write(chunk)
        elif attachment.data is not None:
            with file_path.open("wb") as f:
                f.write(attachment.data)
        else:
            raise RuntimeError("The attachment has no usable data source")

    async def __get_url(
        self, attachment: LegacyAttachment, stored: Attachment
    ) -> tuple[bool, Path | None, str]:
        """
        Make ``attachment`` available over HTTP and return where to find it.

        If the attachment has no local path, its content is first written to a
        temporary directory. The file is then published either through
        ``__no_upload`` (when ``config.NO_UPLOAD_PATH`` is set) or ``__upload``.
        ``attachment.name`` and ``attachment.content_type`` are updated when
        ``config.FIX_FILENAME_SUFFIX_MIME_TYPE`` is set, and ``stored.url`` is
        updated when ``stored`` has a legacy file ID.

        :param attachment: The attachment to publish.
        :param stored: The DB row associated with this attachment.
        :return: A tuple ``(is_temp, local_path, url)`` where ``is_temp`` tells
            the caller that ``local_path`` is a temporary file it must delete.
        :raises RuntimeError: if the resulting file is empty.
        """
        file_name = attachment.name
        content_type = attachment.content_type
        file_path = attachment.path
        temp_dir: Path | None = None

        if file_name and len(file_name) > config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH:
            log.debug("Trimming long filename: %s", file_name)
            base, ext = os.path.splitext(file_name)
            file_name = (
                base[: config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH - len(ext)] + ext
            )

        try:
            if file_path is None:
                if file_name is None:
                    file_name = str(uuid4())
                    if content_type is not None:
                        ext = guess_extension(content_type, strict=False)  # type:ignore
                        if ext is not None:
                            file_name += ext
                temp_dir = Path(tempfile.mkdtemp())
                # The name comes from the legacy network: never let it point
                # outside of the temporary directory.
                file_path = temp_dir / self.__sanitize_file_name(file_name)
                await self.__write_to_disk(attachment, file_path)
                is_temp = not bool(config.NO_UPLOAD_PATH)
            else:
                is_temp = False

            assert isinstance(file_path, Path)
            if config.FIX_FILENAME_SUFFIX_MIME_TYPE:
                file_name, content_type = fix_suffix(file_path, content_type, file_name)
                attachment.content_type = content_type
                attachment.name = file_name

            if config.NO_UPLOAD_PATH:
                local_path, new_url = await self.__no_upload(
                    file_path, file_name, stored.legacy_file_id
                )
                new_url = (config.NO_UPLOAD_URL_PREFIX or "") + "/" + urlquote(new_url)
            else:
                local_path = file_path
                new_url = await self.__upload(file_path, file_name, content_type)
            if stored.legacy_file_id and new_url is not None:
                stored.url = new_url

            if local_path is not None and local_path.stat().st_size == 0:
                raise RuntimeError("File size is 0")
        except BaseException:
            # The caller only cleans up on success: do not leave the temporary
            # directory behind when fetching or publishing failed.
            if temp_dir is not None:
                shutil.rmtree(temp_dir, ignore_errors=True)
            raise

        return is_temp, local_path, new_url

    async def __set_sims(
        self,
        msg: Message,
        uploaded_url: str,
        path: Optional[Path],
        attachment: LegacyAttachment,
        stored: Attachment,
    ) -> Thumbnail | None:
        """
        Attach SIMS (XEP-0385) metadata to ``msg``.

        Metadata already serialized in ``stored.sims`` is reused as is.
        Otherwise it is built from the local file, with a thumbhash thumbnail
        for images, and serialized into ``stored.sims``.

        :param msg: The message to enrich.
        :param uploaded_url: URL of the file.
        :param path: Local path of the file, if available.
        :param attachment: The attachment being sent.
        :param stored: The DB row associated with this attachment.
        :return: The thumbnail element if there is one, else ``None``.
        """
        if stored.sims is not None:
            ref = self.xmpp["xep_0372"].stanza.Reference(xml=ET.fromstring(stored.sims))
            msg.append(ref)
            if ref["sims"]["file"].get_plugin("thumbnail", check=True):
                return ref["sims"]["file"]["thumbnail"]
            else:
                return None

        if not path:
            return None

        ref = self.xmpp["xep_0385"].get_sims(
            path, [uploaded_url], attachment.content_type, attachment.caption
        )
        if attachment.name:
            ref["sims"]["file"]["name"] = attachment.name
        thumbnail = None
        if attachment.content_type is not None and attachment.content_type.startswith(
            "image"
        ):
            try:
                # Image decoding is blocking: run it in a thread pool.
                h, x, y = await self.xmpp.loop.run_in_executor(
                    avatar_cache._thread_pool, get_thumbhash, path
                )
            except Exception as e:
                # The thumbnail is optional: send the file without it.
                log.debug("Could not generate a thumbhash", exc_info=e)
            else:
                thumbnail = ref["sims"]["file"]["thumbnail"]
                thumbnail["width"] = x
                thumbnail["height"] = y
                thumbnail["media-type"] = "image/thumbhash"
                thumbnail["uri"] = "data:image/thumbhash;base64," + urlquote(h)

        stored.sims = str(ref)
        msg.append(ref)

        return thumbnail

    def __set_sfs(
        self,
        msg: Message,
        uploaded_url: str,
        path: Optional[Path],
        attachment: LegacyAttachment,
        stored: Attachment,
        thumbnail: Optional[Thumbnail] = None,
    ) -> None:
        """
        Attach SFS (XEP-0447) metadata to ``msg``.

        Metadata already serialized in ``stored.sfs`` is reused as is.
        Otherwise it is built from the local file and serialized into
        ``stored.sfs``.

        :param msg: The message to enrich.
        :param uploaded_url: URL of the file.
        :param path: Local path of the file, if available.
        :param attachment: The attachment being sent.
        :param stored: The DB row associated with this attachment.
        :param thumbnail: An optional thumbnail element to add to the file
            metadata.
        """
        if stored.sfs is not None:
            msg.append(StatelessFileSharing(xml=ET.fromstring(stored.sfs)))
            return

        if not path:
            return

        sfs = self.xmpp["xep_0447"].get_sfs(
            path, [uploaded_url], attachment.content_type, attachment.caption
        )
        if attachment.name:
            sfs["file"]["name"] = attachment.name
        if attachment.disposition:
            sfs["disposition"] = attachment.disposition
        else:
            del sfs["disposition"]
        if thumbnail is not None:
            sfs["file"].append(thumbnail)
        stored.sfs = str(sfs)
        msg.append(sfs)

    async def __set_sfs_and_sims_without_download(
        self, msg: Message, attachment: LegacyAttachment
    ) -> None:
        """
        Attach SIMS and SFS metadata pointing at the attachment's original URL.

        Only the metadata carried by ``attachment`` itself is used, the file is
        not fetched. Nothing is attached if the attachment has neither a
        content type, a name nor a disposition.

        :param msg: The message to enrich.
        :param attachment: The attachment being sent, its ``url`` must be set.
        """
        assert attachment.url is not None

        if not any(
            (
                attachment.content_type,
                attachment.name,
                attachment.disposition,
            )
        ):
            return

        sims = self.xmpp.plugin["xep_0385"].stanza.Sims()
        ref = self.xmpp["xep_0372"].stanza.Reference()

        ref["uri"] = attachment.url
        ref["type"] = "data"
        sims["sources"].append(ref)
        sims.enable("file")

        xep_0447_stanza = self.xmpp.plugin["xep_0447"].stanza
        sfs = xep_0447_stanza.StatelessFileSharing()
        url_data = xep_0447_stanza.UrlData()
        url_data["target"] = attachment.url
        sfs["sources"].append(url_data)
        sfs.enable("file")

        if attachment.content_type:
            sims["file"]["media-type"] = attachment.content_type
            sfs["file"]["media-type"] = attachment.content_type
        if attachment.caption:
            sims["file"]["desc"] = attachment.caption
            sfs["file"]["desc"] = attachment.caption
        if attachment.name:
            sims["file"]["name"] = attachment.name
            sfs["file"]["name"] = attachment.name
        if attachment.disposition:
            sfs["disposition"] = attachment.disposition

        msg.append(sims)
        msg.append(sfs)

    def __send_url(
        self,
        msg: Message,
        legacy_msg_id: LegacyMessageType,
        uploaded_url: str,
        caption: Optional[str] = None,
        carbon: bool = False,
        when: Optional[datetime] = None,
        correction: bool = False,
        **kwargs,
    ) -> list[Message]:
        """
        Send ``msg`` with the file URL as its body, then the caption if any.

        :param msg: The message, already carrying the file metadata.
        :param legacy_msg_id: Legacy ID of the message.
        :param uploaded_url: URL of the file, used as body and OOB URL.
        :param caption: Optional text, sent as a separate message after the
            file.
        :param carbon: Whether the messages are carbons.
        :param when: Timestamp forwarded to the caption message.
        :param correction: Whether this message replaces a previous one.
        :return: The message(s) that were sent.
        """
        msg["oob"]["url"] = uploaded_url
        msg["body"] = uploaded_url
        if msg.get_plugin("sfs", check=True):
            msg["fallback"].enable("body")
            msg["fallback"]["for"] = self.xmpp.plugin["xep_0447"].stanza.NAMESPACE

        if correction:
            msg["replace"]["id"] = self._replace_id(legacy_msg_id)
        else:
            self._set_msg_id(msg, legacy_msg_id)

        if not caption:
            # NOTE(review): unlike the caption branch below, ``correction`` is
            # not forwarded to _send() here -- confirm this is intended.
            return [self._send(msg, carbon=carbon, **kwargs)]

        m1 = self._send(msg, carbon=carbon, correction=correction, **kwargs)
        m2 = self.send_text(
            caption, legacy_msg_id=None, when=when, carbon=carbon, **kwargs
        )
        return [m1, m2] if m2 else [m1]

    def __get_base_message(
        self,
        legacy_msg_id: Optional[LegacyMessageType] = None,
        reply_to: Optional[MessageReference] = None,
        when: Optional[datetime] = None,
        thread: Optional[LegacyThreadType] = None,
        carbon: bool = False,
        correction: bool = False,
        mto: Optional[JID] = None,
    ) -> Message:
        """
        Build the message stanza that will carry the attachment.

        For corrections, every XMPP message previously associated with
        ``legacy_msg_id`` is retracted, except those bearing the first XMPP ID.

        :param legacy_msg_id: Legacy ID of the message.
        :param reply_to: The message being replied to, if any.
        :param when: Timestamp of the message.
        :param thread: Thread the message belongs to.
        :param carbon: Whether the message is a carbon.
        :param correction: Whether this message replaces a previous one.
        :param mto: Recipient of the message.
        :return: The new message stanza.
        """
        if correction:
            xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
            if xmpp_ids:
                original_xmpp_id = xmpp_ids[0]
                for xmpp_id in xmpp_ids:
                    if xmpp_id == original_xmpp_id:
                        continue
                    self.retract(xmpp_id, thread)

        if reply_to is not None and reply_to.body:
            # We cannot have a "quote fallback" for attachments since most (all?)
            # XMPP clients will only treat a message as an attachment if the
            # body is the URL and nothing else.
            reply_to_for_attachment: MessageReference | None = MessageReference(
                reply_to.legacy_id, reply_to.author
            )
        else:
            reply_to_for_attachment = reply_to

        return self._make_message(
            when=when,
            reply_to=reply_to_for_attachment,
            carbon=carbon,
            mto=mto,
            thread=thread,
        )

    async def send_file(
        self,
        attachment: LegacyAttachment | Path | str,
        legacy_msg_id: Optional[LegacyMessageType] = None,
        *,
        reply_to: Optional[MessageReference] = None,
        when: Optional[datetime] = None,
        thread: Optional[LegacyThreadType] = None,
        **kwargs,
    ) -> tuple[Optional[str], list[Message]]:
        """
        Send a single file from this :term:`XMPP Entity`.

        :param attachment: The file to send.
            Ideally, a :class:`.LegacyAttachment` with a unique ``legacy_file_id``
            attribute set, to optimise potential future reuses.
            It can also be:
            - a :class:`pathlib.Path` instance to point to a local file, or
            - a ``str``, representing a fetchable HTTP URL.
        :param legacy_msg_id: If you want to be able to transport read markers from the gateway
            user to the legacy network, specify this
        :param reply_to: Quote another message (:xep:`0461`)
        :param when: when the file was sent, for a "delay" tag (:xep:`0203`)
        :param thread:
        :return: The URL of the file (``None`` if it could not be made
            available) and the message(s) that were sent.
        """
        # The coroutine is only created when it is about to be awaited, so
        # that a failure to acquire the lock does not leave it un-awaited.
        send = functools.partial(
            self.__send_file,
            attachment,
            legacy_msg_id,
            reply_to=reply_to,
            when=when,
            thread=thread,
            **kwargs,
        )
        needs_lock = (
            not self.__is_component
            and isinstance(attachment, LegacyAttachment)
            and attachment.legacy_file_id is not None
        )
        if not needs_lock:
            return await send()

        # prevents race conditions where we download the same thing several time
        # and end up attempting to insert it twice in the DB, raising an
        # IntegrityError.
        async with self.session.lock(("attachment", attachment.legacy_file_id)):
            return await send()

    async def __send_file(
        self,
        attachment: LegacyAttachment | Path | str,
        legacy_msg_id: Optional[LegacyMessageType] = None,
        *,
        reply_to: Optional[MessageReference] = None,
        when: Optional[datetime] = None,
        thread: Optional[LegacyThreadType] = None,
        **kwargs,
    ) -> tuple[Optional[str], list[Message]]:
        """
        Implementation of :meth:`send_file`, without locking.

        Recognized extra keyword arguments are ``store_multi`` (default
        ``True``), ``carbon`` (default ``False``), ``mto`` and ``correction``;
        remaining ones are forwarded to the sending helpers.

        If the file cannot be made available, a message describing the failure
        is sent instead and the returned URL is ``None``.

        :return: The URL of the file and the message(s) that were sent.
        """
        store_multi = kwargs.pop("store_multi", True)
        carbon = kwargs.pop("carbon", False)
        mto = kwargs.pop("mto", None)
        correction = kwargs.get("correction", False)

        msg = self.__get_base_message(
            legacy_msg_id, reply_to, when, thread, carbon, correction, mto
        )

        if isinstance(attachment, str):
            attachment = LegacyAttachment(url=attachment)
        elif isinstance(attachment, Path):
            attachment = LegacyAttachment(path=attachment)

        stored = await self.__get_stored(attachment)

        if attachment.content_type is None and (
            name := (attachment.name or attachment.url or attachment.path)
        ):
            attachment.content_type, _ = guess_type(name)

        if stored.url:
            is_temp = False
            local_path = None
            new_url = stored.url
        else:
            try:
                is_temp, local_path, new_url = await self.__get_url(attachment, stored)
            except Exception as e:
                log.error("Error with attachment: %s: %s", attachment, e)
                log.debug("", exc_info=e)
                msg["body"] = (
                    f"/me tried to send a file ({attachment.format_for_user()}), "
                    f"but something went wrong: {e}. "
                )
                self._set_msg_id(msg, legacy_msg_id)
                # ``carbon`` was popped from kwargs above: forward it
                # explicitly so that the notice is a carbon when the file
                # would have been one.
                return None, [self._send(msg, carbon=carbon, **kwargs)]
            assert new_url is not None

        try:
            stored.url = new_url
            if config.USE_ATTACHMENT_ORIGINAL_URLS and attachment.url:
                await self.__set_sfs_and_sims_without_download(msg, attachment)
            else:
                thumbnail = await self.__set_sims(
                    msg, new_url, local_path, attachment, stored
                )
                self.__set_sfs(msg, new_url, local_path, attachment, stored, thumbnail)

            if not self.__is_component:
                with self.xmpp.store.session(expire_on_commit=False) as orm:
                    orm.add(stored)
                    orm.commit()
        finally:
            # The temporary copy is not needed once metadata has been
            # computed; remove it even if one of the steps above failed.
            if is_temp and isinstance(local_path, Path):
                local_path.unlink()
                local_path.parent.rmdir()

        msgs = self.__send_url(
            msg, legacy_msg_id, new_url, attachment.caption, carbon, when, **kwargs
        )
        if not self.__is_component:
            if store_multi:
                self.__store_multi(legacy_msg_id, msgs)
        return new_url, msgs

    def __send_body(
        self,
        body: Optional[str] = None,
        legacy_msg_id: Optional[LegacyMessageType] = None,
        reply_to: Optional[MessageReference] = None,
        when: Optional[datetime] = None,
        thread: Optional[LegacyThreadType] = None,
        **kwargs,
    ) -> Optional[Message]:
        """
        Send ``body`` as a text message, if it is not empty.

        :return: The result of ``send_text``, or ``None`` if there was no body
            to send.
        """
        if not body:
            return None
        return self.send_text(
            body,
            legacy_msg_id,
            reply_to=reply_to,
            when=when,
            thread=thread,
            **kwargs,
        )

    async def send_files(
        self,
        attachments: Collection[LegacyAttachment],
        legacy_msg_id: Optional[LegacyMessageType] = None,
        body: Optional[str] = None,
        *,
        reply_to: Optional[MessageReference] = None,
        when: Optional[datetime] = None,
        thread: Optional[LegacyThreadType] = None,
        body_first: bool = False,
        correction: bool = False,
        correction_event_id: Optional[LegacyMessageType] = None,
        **kwargs,
    ) -> None:
        """
        Send several files, optionally accompanied by a text body.

        Each attachment is sent as its own message. Nothing is sent if there
        are neither attachments nor a body.

        :param attachments: The files to send.
        :param legacy_msg_id: Legacy ID of the message; it is attached to a
            single one of the messages that are sent.
        :param body: Optional text to send along with the files.
        :param reply_to: The message being replied to, if any.
        :param when: Timestamp of the message.
        :param thread: Thread the message belongs to.
        :param body_first: Send the body before the files instead of after.
        :param correction: Whether the body replaces a previous message.
        :param correction_event_id: Forwarded to the body message.
        """
        # TODO: once the epic XEP-0385 vs XEP-0447 battle is over, pick
        #       one and stop sending several attachments this way
        # we attach the legacy_message ID to the last message we send, because
        # we don't want several messages with the same ID (especially for MUC MAM)
        # NOTE(review): the code below gives the ID to the body when it is sent
        # first or alone, and otherwise to the *first* attachment -- confirm
        # which of the comment or the code reflects the intent.
        if not attachments and not body:
            # ignoring empty message
            return
        body_msg_id = (
            legacy_msg_id if body_needs_msg_id(attachments, body, body_first) else None
        )
        send_body = functools.partial(
            self.__send_body,
            body=body,
            reply_to=reply_to,
            when=when,
            thread=thread,
            correction=correction,
            legacy_msg_id=body_msg_id,
            correction_event_id=correction_event_id,
            **kwargs,
        )
        all_msgs: list[Optional[Message]] = []
        if body_first:
            all_msgs.append(send_body())
        for i, attachment in enumerate(attachments):
            if i == 0 and body_msg_id is None:
                legacy = legacy_msg_id
            else:
                legacy = None
            _url, msgs = await self.send_file(
                attachment,
                legacy,
                reply_to=reply_to,
                when=when,
                thread=thread,
                # IDs of all messages are stored together, once, below.
                store_multi=False,
                **kwargs,
            )
            all_msgs.extend(msgs)
        if not body_first:
            all_msgs.append(send_body())
        self.__store_multi(legacy_msg_id, all_msgs)

    def __store_multi(
        self,
        legacy_msg_id: Optional[LegacyMessageType],
        all_msgs: Sequence[Optional[Message]],
    ) -> None:
        """
        Record which XMPP message IDs correspond to ``legacy_msg_id``.

        The stanza ID of each message is used when present, its message ID
        otherwise. Nothing is stored if ``legacy_msg_id`` is ``None``.

        :param legacy_msg_id: Legacy ID of the message.
        :param all_msgs: The messages that were sent; falsy entries are skipped.
        """
        if legacy_msg_id is None:
            return
        ids = []
        for msg in all_msgs:
            if not msg:
                continue
            if stanza_id := msg.get_plugin("stanza_id", check=True):
                ids.append(stanza_id["id"])
            else:
                ids.append(msg.get_id())
        with self.xmpp.store.session() as orm:
            self.xmpp.store.id_map.set_msg(
                orm, self._recipient_pk(), str(legacy_msg_id), ids, self.is_participant
            )
            orm.commit()


670def body_needs_msg_id(
671 attachments: Collection[LegacyAttachment], body: str | None, body_first: bool
672) -> bool:
673 if attachments:
674 return bool(body and body_first)
675 else:
676 return True
def get_thumbhash(path: Path) -> tuple[str, int, int]:
    """
    Compute the thumbhash of an image file.

    The image is converted to RGBA and shrunk to fit in 100x100 pixels before
    being hashed.

    :param path: Path of the image file.
    :return: A tuple ``(hash, width, height)`` where ``hash`` is the
        base64-encoded thumbhash and ``width``/``height`` are the dimensions of
        the image as stored in the file.
    """
    with path.open("rb") as fp:
        img = Image.open(fp)
        # NOTE(review): the size is read before the EXIF transposition below,
        # so it is not swapped for images stored rotated -- confirm this is
        # what consumers of the thumbnail dimensions expect.
        width, height = img.size
        img = img.convert("RGBA")
        if width > 100 or height > 100:
            img.thumbnail((100, 100))
    img = ImageOps.exif_transpose(img)
    # For an RGBA image, the raw bytes are the flat R, G, B, A sequence the
    # hash function expects; no need to build one tuple per pixel first.
    rgba = list(img.tobytes())
    ints = thumbhash.rgba_to_thumb_hash(img.width, img.height, rgba)
    return base64.b64encode(bytes(ints)).decode(), width, height


# Module-level logger. It is defined at the bottom of the module, which is fine
# because the functions and methods above only look it up when they are called.
log = logging.getLogger(__name__)