Coverage for slidge/core/mixins/attachment.py: 85%
363 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-26 19:34 +0000
1import base64
2import functools
3import logging
4import os
5import re
6import shutil
7import stat
8import tempfile
9import warnings
10from datetime import datetime
11from itertools import chain
12from mimetypes import guess_extension, guess_type
13from pathlib import Path
14from typing import Collection, Optional, Sequence, Union
15from urllib.parse import quote as urlquote
16from uuid import uuid4
17from xml.etree import ElementTree as ET
19import aiohttp
20import thumbhash
21from PIL import Image, ImageOps
22from slixmpp import JID, Iq, Message
23from slixmpp.plugins.xep_0264.stanza import Thumbnail
24from slixmpp.plugins.xep_0447.stanza import StatelessFileSharing
26from ...db.avatar import avatar_cache
27from ...db.models import Attachment
28from ...util.types import (
29 LegacyAttachment,
30 LegacyMessageType,
31 LegacyThreadType,
32 MessageReference,
33)
34from ...util.util import fix_suffix
35from .. import config
36from .message_text import TextMessageMixin
class AttachmentMixin(TextMessageMixin):
    """
    Mixin giving an XMPP entity the ability to send files.

    A file is first made reachable through a URL, either by uploading it to
    ``config.UPLOAD_SERVICE`` or, when ``config.NO_UPLOAD_PATH`` is set, by
    placing it inside that local directory. The URL is then sent in a message
    together with XEP-0385 (SIMS) and XEP-0447 (SFS) metadata.
    """

    # When true, and when this entity is bound to a user session, upload slots
    # are requested in the name of the user through a privileged IQ.
    PRIVILEGED_UPLOAD = False

    @property
    def __is_component(self) -> bool:
        """Whether this entity has no user session attached."""
        return self.session is NotImplemented

    async def __upload(
        self,
        file_path: Path,
        file_name: Optional[str] = None,
        content_type: Optional[str] = None,
    ) -> str:
        """
        Upload a local file to ``config.UPLOAD_SERVICE`` with an HTTP PUT.

        :param file_path: The local file to upload.
        :param file_name: Name announced to the upload service; defaults to
            the name of ``file_path``.
        :param content_type: MIME type; defaults to the default content type
            of the ``xep_0363`` plugin.
        :return: The "get" URL of the upload slot.
        """
        assert config.UPLOAD_SERVICE

        file_size = file_path.stat().st_size

        content_type = content_type or self.xmpp.plugin["xep_0363"].default_content_type
        iq_slot = await self.__request_upload_slot(
            config.UPLOAD_SERVICE,
            file_name or file_path.name,
            file_size,
            content_type,
        )
        slot = iq_slot["http_upload_slot"]
        # Headers requested by the slot come last so they override ours.
        headers = {
            "Content-Length": str(file_size),
            "Content-Type": content_type,
            **{header["name"]: header["value"] for header in slot["put"]["headers"]},
        }

        async with aiohttp.ClientSession() as http:
            with file_path.open("rb") as fp:
                async with http.put(
                    slot["put"]["url"], data=fp, headers=headers
                ) as response:
                    response.raise_for_status()
        return slot["get"]["url"]

    async def __request_upload_slot(
        self,
        upload_service: JID | str,
        filename: str,
        size: int,
        content_type: str,
    ) -> Iq:
        """
        Ask ``upload_service`` for an HTTP upload slot.

        The request is sent from ``config.UPLOAD_REQUESTER`` (or the bound JID
        of the component), unless ``PRIVILEGED_UPLOAD`` is set and a user
        session exists, in which case it is sent as a privileged IQ from the
        user's JID.

        :return: The IQ result holding the slot.
        """
        iq_request = self.xmpp.make_iq_get(
            ito=upload_service, ifrom=config.UPLOAD_REQUESTER or self.xmpp.boundjid
        )
        request = iq_request["http_upload_request"]
        request["filename"] = filename
        request["size"] = str(size)
        request["content-type"] = content_type
        if self.__is_component or not self.PRIVILEGED_UPLOAD:
            return await iq_request.send()

        assert self.session is not NotImplemented
        iq_request.set_from(self.session.user_jid)
        return await self.xmpp["xep_0356"].send_privileged_iq(iq_request)

    @staticmethod
    def __disk_name(name: str) -> str:
        """
        Return a file name that is safe to join to a directory.

        Only the final path component of ``name`` is kept, so that a name such
        as ``../x`` or ``a/b`` cannot designate a location outside of the
        directory it is joined to. A random UUID is used when nothing usable
        is left.
        """
        base = Path(name).name
        if base in ("", ".", ".."):
            return str(uuid4())
        return base

    @staticmethod
    async def __no_upload(
        file_path: Path,
        file_name: Optional[str] = None,
        legacy_file_id: Optional[Union[str, int]] = None,
    ) -> tuple[Path, str]:
        """
        Make a file available under ``config.NO_UPLOAD_PATH``.

        The file ends up in ``NO_UPLOAD_PATH/<file id>/<random uuid>/<name>``.
        If the directory of this file id already holds exactly one file, that
        file is reused; otherwise the directory is wiped and repopulated using
        ``config.NO_UPLOAD_METHOD`` (copy, hardlink, symlink or move).

        :param file_path: The local file to publish.
        :param file_name: Name of the published file; defaults to the name of
            ``file_path``.
        :param legacy_file_id: Identifier used as directory name; a random
            UUID is used when it is ``None``.
        :return: The path of the published file and its URL path relative to
            ``NO_UPLOAD_PATH`` (not URL-quoted, without prefix).
        :raises RuntimeError: If ``config.NO_UPLOAD_METHOD`` is not recognized.
        """
        file_id = str(uuid4()) if legacy_file_id is None else str(legacy_file_id)
        assert config.NO_UPLOAD_PATH is not None
        assert config.NO_UPLOAD_URL_PREFIX is not None
        destination_dir = Path(config.NO_UPLOAD_PATH) / file_id

        if destination_dir.exists():
            log.debug("Dest dir exists: %s", destination_dir)
            files = [f for f in destination_dir.glob("**/*") if f.is_file()]
            if len(files) == 1:
                found = files[0]
                log.debug(
                    "Found the legacy attachment '%s' at '%s'",
                    legacy_file_id,
                    found,
                )
                # the parent dir is the anti-obvious url trick, see below
                return found, "/".join([file_id, found.parent.name, found.name])
            log.warning(
                (
                    "There are several or zero files in %s, "
                    "slidge doesn't know which one to pick among %s. "
                    "Removing the dir."
                ),
                destination_dir,
                files,
            )
            shutil.rmtree(destination_dir)

        log.debug("Did not find a file in: %s", destination_dir)
        # let's use a UUID to avoid URLs being too obvious
        uu = str(uuid4())
        destination_dir = destination_dir / uu
        destination_dir.mkdir(parents=True)

        # Keep only the last path component: the name must not be able to
        # point outside of destination_dir.
        name = AttachmentMixin.__disk_name(file_name or file_path.name)
        destination = destination_dir / name
        method = config.NO_UPLOAD_METHOD
        if method == "copy":
            shutil.copy2(file_path, destination)
        elif method == "hardlink":
            os.link(file_path, destination)
        elif method == "symlink":
            os.symlink(file_path, destination, target_is_directory=True)
        elif method == "move":
            shutil.move(file_path, destination)
        else:
            raise RuntimeError("No upload method not recognized", method)

        if config.NO_UPLOAD_FILE_READ_OTHERS:
            log.debug("Changing perms of %s", destination)
            destination.chmod(destination.stat().st_mode | stat.S_IROTH)

        return destination, "/".join([file_id, uu, name])

    async def __valid_url(self, url: str) -> bool:
        """Return whether an HTTP HEAD request on ``url`` has a status below 400."""
        async with self.session.http.head(url) as r:
            return r.status < 400

    async def __get_stored(self, attachment: LegacyAttachment) -> Attachment:
        """
        Fetch the DB row of this attachment, or build a new unsaved one.

        A row is only looked up when the attachment has a ``legacy_file_id``
        and this entity has a user session. If the URL of the row found does
        not pass ``__valid_url``, it is reset to ``None`` so that the caller
        produces a new URL.
        """
        if attachment.legacy_file_id is not None and not self.__is_component:
            with self.xmpp.store.session() as orm:
                stored = (
                    orm.query(Attachment)
                    .filter_by(
                        legacy_file_id=str(attachment.legacy_file_id),
                        user_account_id=self.session.user_pk,
                    )
                    .one_or_none()
                )
                if stored is not None:
                    if not await self.__valid_url(stored.url):
                        stored.url = None  # type:ignore
                    return stored

        if attachment.legacy_file_id is None:
            legacy_file_id = None
        else:
            legacy_file_id = str(attachment.legacy_file_id)
        return Attachment(
            user_account_id=None if self.__is_component else self.session.user_pk,
            legacy_file_id=legacy_file_id,
            url=attachment.url if config.USE_ATTACHMENT_ORIGINAL_URLS else None,
        )

    async def __write_payload(
        self, attachment: LegacyAttachment, file_path: Path
    ) -> None:
        """
        Write the content of ``attachment`` to ``file_path``.

        The first available source is used, in this order: ``url`` (fetched
        over HTTP), ``stream``, ``aio_stream``, ``data``. Nothing is written
        if none of them is set.

        :raises RuntimeError: If reading ``attachment.stream`` returns ``None``.
        """
        if attachment.url:
            async with self.session.http.get(attachment.url) as r:
                r.raise_for_status()
                with file_path.open("wb") as f:
                    f.write(await r.read())
        elif attachment.stream is not None:
            data = attachment.stream.read()
            if data is None:
                raise RuntimeError

            with file_path.open("wb") as f:
                f.write(data)
        elif attachment.aio_stream is not None:
            # TODO: patch slixmpp to allow this as data source for
            #       upload_file() so we don't even have to write anything
            #       to disk.
            with file_path.open("wb") as f:
                async for chunk in attachment.aio_stream:
                    f.write(chunk)
        elif attachment.data is not None:
            with file_path.open("wb") as f:
                f.write(attachment.data)

    async def __get_url(
        self, attachment: LegacyAttachment, stored: Attachment
    ) -> tuple[bool, Path | None, str]:
        """
        Make the attachment reachable through a URL.

        If the attachment has no local path, its content is first written to
        a new temporary directory. The file is then either published under
        ``config.NO_UPLOAD_PATH`` or uploaded to the upload service.

        :return: A tuple ``(is_temp, local_path, url)``. ``is_temp`` is true
            when ``local_path`` lives in a temporary directory created here,
            which the caller is expected to remove.
        :raises RuntimeError: If the resulting file is empty.
        """
        file_name = attachment.name
        content_type = attachment.content_type
        file_path = attachment.path

        if file_name and len(file_name) > config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH:
            log.debug("Trimming long filename: %s", file_name)
            base, ext = os.path.splitext(file_name)
            file_name = (
                base[: config.ATTACHMENT_MAXIMUM_FILE_NAME_LENGTH - len(ext)] + ext
            )

        temp_dir: Path | None = None
        try:
            if file_path is None:
                if file_name is None:
                    file_name = str(uuid4())
                    if content_type is not None:
                        ext = guess_extension(content_type, strict=False)  # type:ignore
                        if ext is not None:
                            file_name += ext
                temp_dir = Path(tempfile.mkdtemp())
                # The name may come from the legacy network: never let it
                # designate a location outside of temp_dir.
                file_path = temp_dir / self.__disk_name(file_name)
                await self.__write_payload(attachment, file_path)
                is_temp = not bool(config.NO_UPLOAD_PATH)
            else:
                is_temp = False

            assert isinstance(file_path, Path)
            if config.FIX_FILENAME_SUFFIX_MIME_TYPE:
                file_name, content_type = fix_suffix(file_path, content_type, file_name)
                attachment.content_type = content_type
                attachment.name = file_name

            if config.NO_UPLOAD_PATH:
                local_path, new_url = await self.__no_upload(
                    file_path, file_name, stored.legacy_file_id
                )
                new_url = (config.NO_UPLOAD_URL_PREFIX or "") + "/" + urlquote(new_url)
            else:
                local_path = file_path
                new_url = await self.__upload(file_path, file_name, content_type)
            if stored.legacy_file_id and new_url is not None:
                stored.url = new_url

            if local_path is not None and local_path.stat().st_size == 0:
                raise RuntimeError("File size is 0")

            return is_temp, local_path, new_url
        except BaseException:
            # mkdtemp() leaves deletion to its caller and, on failure, nobody
            # else knows about this directory: remove it before propagating.
            if temp_dir is not None:
                shutil.rmtree(temp_dir, ignore_errors=True)
            raise

    async def __set_sims(
        self,
        msg: Message,
        uploaded_url: str,
        path: Optional[Path],
        attachment: LegacyAttachment,
        stored: Attachment,
    ) -> Thumbnail | None:
        """
        Append a SIMS reference element to ``msg``.

        The serialized element of ``stored`` is reused when there is one.
        Otherwise, and only if ``path`` is set, a new one is built from the
        local file, saved in ``stored.sims``, and given a thumbhash thumbnail
        when the content type starts with ``image``.

        :return: The thumbnail element of the reference, if any.
        """
        if stored.sims is not None:
            ref = self.xmpp["xep_0372"].stanza.Reference(xml=ET.fromstring(stored.sims))
            msg.append(ref)
            if ref["sims"]["file"].get_plugin("thumbnail", check=True):
                return ref["sims"]["file"]["thumbnail"]
            return None

        if not path:
            return None

        ref = self.xmpp["xep_0385"].get_sims(
            path, [uploaded_url], attachment.content_type, attachment.caption
        )
        if attachment.name:
            ref["sims"]["file"]["name"] = attachment.name
        thumbnail = None
        if attachment.content_type is not None and attachment.content_type.startswith(
            "image"
        ):
            try:
                h, x, y = await self.xmpp.loop.run_in_executor(
                    avatar_cache._thread_pool, get_thumbhash, path
                )
            except Exception as e:
                # Best effort: the file is still sent, just without thumbnail.
                log.debug("Could not generate a thumbhash", exc_info=e)
            else:
                thumbnail = ref["sims"]["file"]["thumbnail"]
                thumbnail["width"] = x
                thumbnail["height"] = y
                thumbnail["media-type"] = "image/thumbhash"
                thumbnail["uri"] = "data:image/thumbhash;base64," + urlquote(h)

        stored.sims = str(ref)
        msg.append(ref)

        return thumbnail

    def __set_sfs(
        self,
        msg: Message,
        uploaded_url: str,
        path: Optional[Path],
        attachment: LegacyAttachment,
        stored: Attachment,
        thumbnail: Optional[Thumbnail] = None,
    ) -> None:
        """
        Append an SFS element to ``msg``.

        The serialized element of ``stored`` is reused when there is one.
        Otherwise, and only if ``path`` is set, a new one is built from the
        local file and saved in ``stored.sfs``.

        :param thumbnail: Optional thumbnail element added to the file
            metadata of a newly built element.
        """
        if stored.sfs is not None:
            msg.append(StatelessFileSharing(xml=ET.fromstring(stored.sfs)))
            return

        if not path:
            return

        sfs = self.xmpp["xep_0447"].get_sfs(
            path, [uploaded_url], attachment.content_type, attachment.caption
        )
        if attachment.name:
            sfs["file"]["name"] = attachment.name
        if attachment.disposition:
            sfs["disposition"] = attachment.disposition
        else:
            del sfs["disposition"]
        if thumbnail is not None:
            sfs["file"].append(thumbnail)
        stored.sfs = str(sfs)
        msg.append(sfs)

    async def __set_sfs_and_sims_without_download(
        self, msg: Message, attachment: LegacyAttachment
    ) -> None:
        """
        Append SIMS and SFS elements pointing to ``attachment.url``.

        The elements are built only from the metadata carried by
        ``attachment``. Nothing is appended if the attachment has no content
        type, no name and no disposition.
        """
        assert attachment.url is not None

        if not any(
            (
                attachment.content_type,
                attachment.name,
                attachment.disposition,
            )
        ):
            return

        sims = self.xmpp.plugin["xep_0385"].stanza.Sims()
        ref = self.xmpp["xep_0372"].stanza.Reference()

        ref["uri"] = attachment.url
        ref["type"] = "data"
        sims["sources"].append(ref)
        sims.enable("file")

        xep_0447_stanza = self.xmpp.plugin["xep_0447"].stanza
        sfs = xep_0447_stanza.StatelessFileSharing()
        url_data = xep_0447_stanza.UrlData()
        url_data["target"] = attachment.url
        sfs["sources"].append(url_data)
        sfs.enable("file")

        if attachment.content_type:
            sims["file"]["media-type"] = attachment.content_type
            sfs["file"]["media-type"] = attachment.content_type
        if attachment.caption:
            sims["file"]["desc"] = attachment.caption
            sfs["file"]["desc"] = attachment.caption
        if attachment.name:
            sims["file"]["name"] = attachment.name
            sfs["file"]["name"] = attachment.name
        if attachment.disposition:
            sfs["disposition"] = attachment.disposition

        msg.append(sims)
        msg.append(sfs)

    def __send_url(
        self,
        msg: Message,
        legacy_msg_id: LegacyMessageType,
        uploaded_url: str,
        caption: Optional[str] = None,
        carbon: bool = False,
        when: Optional[datetime] = None,
        correction: bool = False,
        **kwargs,
    ) -> list[Message]:
        """
        Send ``msg`` with ``uploaded_url`` as its body and out-of-band URL.

        If there is a caption, it is sent right after, as a separate text
        message without legacy message ID.

        :return: The message(s) sent: the URL message, followed by the caption
            message if ``send_text`` returned one.
        """
        msg["oob"]["url"] = uploaded_url
        msg["body"] = uploaded_url
        if msg.get_plugin("sfs", check=True):
            msg["fallback"].enable("body")
            msg["fallback"]["for"] = self.xmpp.plugin["xep_0447"].stanza.NAMESPACE

        if correction:
            msg["replace"]["id"] = self._replace_id(legacy_msg_id)
        else:
            self._set_msg_id(msg, legacy_msg_id)

        if not caption:
            # NOTE(review): unlike the caption case below, ``correction`` is
            # not forwarded to _send() here; kept as is, confirm it is wanted.
            return [self._send(msg, carbon=carbon, **kwargs)]

        m1 = self._send(msg, carbon=carbon, correction=correction, **kwargs)
        m2 = self.send_text(
            caption, legacy_msg_id=None, when=when, carbon=carbon, **kwargs
        )
        return [m1, m2] if m2 else [m1]

    def __get_base_message(
        self,
        legacy_msg_id: Optional[LegacyMessageType] = None,
        reply_to: Optional[MessageReference] = None,
        when: Optional[datetime] = None,
        thread: Optional[LegacyThreadType] = None,
        carbon: bool = False,
        correction: bool = False,
        mto: Optional[JID] = None,
    ) -> Message:
        """
        Build the message stanza that will carry the attachment.

        For a correction, every XMPP message mapped to ``legacy_msg_id``
        except the first one is retracted beforehand.
        """
        if correction:
            xmpp_ids = self._legacy_to_xmpp(legacy_msg_id)
            if xmpp_ids:
                original_xmpp_id = xmpp_ids[0]
                for xmpp_id in xmpp_ids:
                    if xmpp_id == original_xmpp_id:
                        continue
                    self.retract(xmpp_id, thread)

        if reply_to is not None and reply_to.body:
            # We cannot have a "quote fallback" for attachments since most (all?)
            # XMPP clients will only treat a message as an attachment if the
            # body is the URL and nothing else.
            reply_to_for_attachment: MessageReference | None = MessageReference(
                reply_to.legacy_id, reply_to.author
            )
        else:
            reply_to_for_attachment = reply_to

        return self._make_message(
            when=when,
            reply_to=reply_to_for_attachment,
            carbon=carbon,
            mto=mto,
            thread=thread,
        )

    async def send_file(
        self,
        attachment: LegacyAttachment | Path | str,
        legacy_msg_id: Optional[LegacyMessageType] = None,
        *,
        reply_to: Optional[MessageReference] = None,
        when: Optional[datetime] = None,
        thread: Optional[LegacyThreadType] = None,
        **kwargs,
    ) -> tuple[Optional[str], list[Message]]:
        """
        Send a single file from this :term:`XMPP Entity`.

        :param attachment: The file to send.
            Ideally, a :class:`.LegacyAttachment` with a unique ``legacy_file_id``
            attribute set, to optimise potential future reuses.
            It can also be:
            - a :class:`pathlib.Path` instance to point to a local file, or
            - a ``str``, representing a fetchable HTTP URL.
        :param legacy_msg_id: If you want to be able to transport read markers from the gateway
            user to the legacy network, specify this
        :param reply_to: Quote another message (:xep:`0461`)
        :param when: when the file was sent, for a "delay" tag (:xep:`0203`)
        :param thread:
        :return: The URL of the file (``None`` if making it available failed)
            and the list of messages that were sent.
        """
        send = functools.partial(
            self.__send_file,
            attachment,
            legacy_msg_id,
            reply_to=reply_to,
            when=when,
            thread=thread,
            **kwargs,
        )
        if (
            self.__is_component
            or not isinstance(attachment, LegacyAttachment)
            or attachment.legacy_file_id is None
        ):
            return await send()

        # prevents race conditions where we download the same thing several time
        # and end up attempting to insert it twice in the DB, raising an
        # IntegrityError.
        async with self.session.lock(("attachment", attachment.legacy_file_id)):
            return await send()

    async def __send_file(
        self,
        attachment: LegacyAttachment | Path | str,
        legacy_msg_id: Optional[LegacyMessageType] = None,
        *,
        reply_to: Optional[MessageReference] = None,
        when: Optional[datetime] = None,
        thread: Optional[LegacyThreadType] = None,
        **kwargs,
    ) -> tuple[Optional[str], list[Message]]:
        """
        Do the actual work of :meth:`send_file`, without any locking.

        ``store_multi``, ``carbon`` and ``mto`` are consumed from ``kwargs``;
        ``correction`` is read from it but left in place.

        If the file cannot be made available, a "/me tried to send a file"
        message describing the error is sent instead and the returned URL is
        ``None``.
        """
        store_multi = kwargs.pop("store_multi", True)
        carbon = kwargs.pop("carbon", False)
        mto = kwargs.pop("mto", None)
        correction = kwargs.get("correction", False)

        msg = self.__get_base_message(
            legacy_msg_id, reply_to, when, thread, carbon, correction, mto
        )

        if isinstance(attachment, str):
            attachment = LegacyAttachment(url=attachment)
        elif isinstance(attachment, Path):
            attachment = LegacyAttachment(path=attachment)

        stored = await self.__get_stored(attachment)

        if attachment.content_type is None and (
            name := (attachment.name or attachment.url or attachment.path)
        ):
            attachment.content_type, _ = guess_type(name)

        if stored.url:
            # A usable URL is already known: nothing to fetch or upload.
            is_temp = False
            local_path = None
            new_url = stored.url
        else:
            try:
                is_temp, local_path, new_url = await self.__get_url(attachment, stored)
            except Exception as e:
                log.error("Error with attachment: %s: %s", attachment, e)
                log.debug("", exc_info=e)
                msg["body"] = (
                    f"/me tried to send a file ({attachment.format_for_user()}), "
                    f"but something went wrong: {e}. "
                )
                self._set_msg_id(msg, legacy_msg_id)
                return None, [self._send(msg, **kwargs)]
            assert new_url is not None

        stored.url = new_url
        if config.USE_ATTACHMENT_ORIGINAL_URLS and attachment.url:
            await self.__set_sfs_and_sims_without_download(msg, attachment)
        else:
            thumbnail = await self.__set_sims(
                msg, new_url, local_path, attachment, stored
            )
            self.__set_sfs(msg, new_url, local_path, attachment, stored, thumbnail)

        if not self.__is_component:
            with self.xmpp.store.session(expire_on_commit=False) as orm:
                orm.add(stored)
                orm.commit()

        if is_temp and isinstance(local_path, Path):
            local_path.unlink()
            local_path.parent.rmdir()

        msgs = self.__send_url(
            msg, legacy_msg_id, new_url, attachment.caption, carbon, when, **kwargs
        )
        if not self.__is_component and store_multi:
            self.__store_multi(legacy_msg_id, msgs)
        return new_url, msgs

    def __send_body(
        self,
        body: Optional[str] = None,
        legacy_msg_id: Optional[LegacyMessageType] = None,
        reply_to: Optional[MessageReference] = None,
        when: Optional[datetime] = None,
        thread: Optional[LegacyThreadType] = None,
        **kwargs,
    ) -> Optional[Message]:
        """Send ``body`` as a text message; do nothing and return ``None`` if it is empty."""
        if not body:
            return None
        return self.send_text(
            body,
            legacy_msg_id,
            reply_to=reply_to,
            when=when,
            thread=thread,
            **kwargs,
        )

    async def send_files(
        self,
        attachments: Collection[LegacyAttachment],
        legacy_msg_id: Optional[LegacyMessageType] = None,
        body: Optional[str] = None,
        *,
        reply_to: Optional[MessageReference] = None,
        when: Optional[datetime] = None,
        thread: Optional[LegacyThreadType] = None,
        body_first: bool = False,
        correction: bool = False,
        correction_event_id: Optional[LegacyMessageType] = None,
        **kwargs,
    ) -> None:
        """
        Send several files, each in its own message, with an optional text.

        :param attachments: The files to send.
        :param legacy_msg_id: Legacy ID of the whole message. All the XMPP
            messages sent here are recorded under this ID.
        :param body: Optional text, sent as a separate message.
        :param reply_to: Passed on to every message sent.
        :param when: Passed on to every message sent.
        :param thread: Passed on to every message sent.
        :param body_first: Send the text before the files rather than after.
        :param correction: Passed on to the text message only.
        :param correction_event_id: Passed on to the text message only.
        """
        # TODO: once the epic XEP-0385 vs XEP-0447 battle is over, pick
        #       one and stop sending several attachments this way
        # Only one of the messages we send carries the legacy message ID,
        # because we don't want several messages with the same ID (especially
        # for MUC MAM): the body if it is sent first (or if there is no
        # attachment), the first attachment otherwise.
        if not attachments and not body:
            # ignoring empty message
            return
        body_msg_id = (
            legacy_msg_id if body_needs_msg_id(attachments, body, body_first) else None
        )
        send_body = functools.partial(
            self.__send_body,
            body=body,
            reply_to=reply_to,
            when=when,
            thread=thread,
            correction=correction,
            legacy_msg_id=body_msg_id,
            correction_event_id=correction_event_id,
            **kwargs,
        )
        all_msgs = []
        if body_first:
            all_msgs.append(send_body())
        for i, attachment in enumerate(attachments):
            legacy = legacy_msg_id if i == 0 and body_msg_id is None else None
            _url, msgs = await self.send_file(
                attachment,
                legacy,
                reply_to=reply_to,
                when=when,
                thread=thread,
                # The mapping is stored once, below, for all the messages.
                store_multi=False,
                **kwargs,
            )
            all_msgs.extend(msgs)
        if not body_first:
            all_msgs.append(send_body())
        self.__store_multi(legacy_msg_id, all_msgs)

    def __store_multi(
        self,
        legacy_msg_id: Optional[LegacyMessageType],
        all_msgs: Sequence[Optional[Message]],
    ) -> None:
        """
        Record in the store which XMPP messages belong to ``legacy_msg_id``.

        The stanza ID of a message is used when it has one, its message ID
        otherwise. Falsy entries of ``all_msgs`` are skipped, and nothing is
        stored if ``legacy_msg_id`` is ``None``.
        """
        if legacy_msg_id is None:
            return
        ids = []
        for msg in all_msgs:
            if not msg:
                continue
            if stanza_id := msg.get_plugin("stanza_id", check=True):
                ids.append(stanza_id["id"])
            else:
                ids.append(msg.get_id())
        with self.xmpp.store.session() as orm:
            self.xmpp.store.id_map.set_msg(
                orm, self._recipient_pk(), str(legacy_msg_id), ids, self.is_participant
            )
            orm.commit()
def body_needs_msg_id(
    attachments: Collection[LegacyAttachment], body: str | None, body_first: bool
) -> bool:
    """
    Tell whether the text part of a message should carry the legacy message ID.

    Without attachments, the body always carries it. With attachments, it only
    does so when there is a non-empty body that is sent before the files.
    """
    return not attachments or bool(body and body_first)
def get_thumbhash(path: Path) -> tuple[str, int, int]:
    """
    Compute the thumbhash of the image stored at ``path``.

    The hash is computed on an RGBA version of the image, shrunk to fit in
    100x100 pixels when it is larger and transposed with
    ``ImageOps.exif_transpose``.

    :return: The base64-encoded thumbhash, followed by the width and height of
        the image as read right after opening it (before any resizing).
    """
    with path.open("rb") as image_file:
        picture = Image.open(image_file)
        # Remember the size reported by the file before any resizing.
        width, height = picture.size
        picture = picture.convert("RGBA")
        if width > 100 or height > 100:
            picture.thumbnail((100, 100))
        picture = ImageOps.exif_transpose(picture)
        # Flatten the (r, g, b, a) tuples into one list of channel values.
        channels = list(chain.from_iterable(list(picture.getdata())))
        hash_ints = thumbhash.rgba_to_thumb_hash(
            picture.width, picture.height, channels
        )
        encoded = base64.b64encode(bytes(hash_ints)).decode()
        return encoded, width, height
# Module-level logger, named after this module. It is bound after the
# definitions that use it, which works because they only look it up when called.
log: logging.Logger = logging.getLogger(__name__)