import re
from datetime import datetime, timezone
from typing import TYPE_CHECKING, AsyncIterator, Optional
from slidge.group import LegacyBookmarks, LegacyMUC, LegacyParticipant, MucType
from slidge.util.archive_msg import HistoryMessage
from slidge.util.types import Hat, HoleBound, Mention, MucAffiliation
from slixmpp.exceptions import XMPPError
from .generated import go, whatsapp
if TYPE_CHECKING:
from .contact import Contact
from .session import Session
[docs]class Participant(LegacyParticipant):
[docs]class MUC(LegacyMUC[str, str, Participant, str]):
[docs] HAS_DESCRIPTION = False
[docs] REACTIONS_SINGLE_EMOJI = True
[docs] _ALL_INFO_FILLED_ON_STARTUP = True
[docs] async def update_info(self):
try:
avatar = self.session.whatsapp.GetAvatar(self.legacy_id, self.avatar or "")
if avatar.URL and self.avatar != avatar.ID:
await self.set_avatar(avatar.URL, avatar.ID)
elif avatar.URL == "":
await self.set_avatar(None)
except RuntimeError as err:
self.session.log.error(
"Failed getting avatar for group %s: %s", self.legacy_id, err
)
[docs] async def backfill(
self,
after: HoleBound | None = None,
before: HoleBound | None = None,
):
"""
Request history for messages older than the oldest message given by ID and date.
"""
if before is None:
return
# WhatsApp requires a full reference to the last seen message in performing on-demand sync.
assert isinstance(before.id, str)
oldest_message = whatsapp.Message(
ID=before.id,
IsCarbon=self.session.message_is_carbon(self, before.id),
Timestamp=int(before.timestamp.timestamp()),
)
self.session.whatsapp.RequestMessageHistory(self.legacy_id, oldest_message)
[docs] def get_message_sender(self, legacy_msg_id: str):
assert self.pk is not None
stored = self.xmpp.store.mam.get_by_legacy_id(self.pk, legacy_msg_id)
if stored is None:
raise XMPPError("internal-server-error", "Unable to find message sender")
msg = HistoryMessage(stored.stanza)
occupant_id = msg.stanza["occupant-id"]["id"]
if occupant_id == "slidge-user":
return self.session.contacts.user_legacy_id
if "@" in occupant_id:
jid_username = occupant_id.split("@")[0]
return jid_username.removeprefix("+") + "@" + whatsapp.DefaultUserServer
raise XMPPError("internal-server-error", "Unable to find message sender")
[docs] async def update_whatsapp_info(self, info: whatsapp.Group):
"""
Set MUC information based on WhatsApp group information, which may or may not be partial in
case of updates to existing MUCs.
"""
if info.Nickname:
self.user_nick = info.Nickname
if info.Name:
self.name = info.Name
if info.Subject.Subject:
self.subject = info.Subject.Subject
if info.Subject.SetAt:
set_at = datetime.fromtimestamp(info.Subject.SetAt, tz=timezone.utc)
self.subject_date = set_at
if info.Subject.SetByJID:
participant = await self.get_participant_by_legacy_id(
info.Subject.SetByJID
)
if name := participant.nickname:
self.subject_setter = name
self.session.whatsapp_participants[self.legacy_id] = info.Participants
[docs] async def fill_participants(self) -> AsyncIterator[Participant]:
await self.session.bookmarks.ready
try:
participants = self.session.whatsapp_participants.pop(self.legacy_id)
except KeyError:
self.log.warning("No participants!")
return
for data in participants:
participant = await self.get_participant_by_legacy_id(data.JID)
if data.Action == whatsapp.GroupParticipantActionRemove:
self.remove_participant(participant)
else:
if data.Affiliation == whatsapp.GroupAffiliationAdmin:
# Only owners can change the group name according to
# XEP-0045, so we make all "WA admins" "XMPP owners"
participant.affiliation = "owner"
participant.role = "moderator"
elif data.Affiliation == whatsapp.GroupAffiliationOwner:
# The WA owner is in fact the person who created the room
participant.set_hats(
[Hat("https://slidge.im/hats/slidge-whatsapp/owner", "Owner")]
)
participant.affiliation = "owner"
participant.role = "moderator"
else:
participant.affiliation = "member"
participant.role = "participant"
yield participant
[docs] async def replace_mentions(self, t: str):
return replace_whatsapp_mentions(
t,
participants=(
{
p.contact.jid_username: p.nickname
async for p in self.get_participants()
if p.contact is not None # should not happen
}
| {self.session.user_phone: self.user_nick}
if self.session.user_phone # user_phone *should* be set at this point,
else {} # but better safe than sorry
),
)
[docs] async def on_avatar(self, data: Optional[bytes], mime: Optional[str]) -> None:
return self.session.whatsapp.SetAvatar(
self.legacy_id,
go.Slice_byte.from_bytes(data) if data else go.Slice_byte(),
)
[docs] async def on_set_config(
self,
name: Optional[str],
description: Optional[str],
):
# there are no group descriptions in WA, but topics=subjects
if self.name != name:
self.session.whatsapp.SetGroupName(self.legacy_id, name)
[docs] async def on_set_subject(self, subject: str):
if self.subject != subject:
self.session.whatsapp.SetGroupTopic(self.legacy_id, subject)
[docs] async def on_set_affiliation(
self,
contact: "Contact", # type:ignore
affiliation: MucAffiliation,
reason: Optional[str],
nickname: Optional[str],
):
assert contact.contact_pk is not None
assert self.pk is not None
if affiliation == "member":
if (
self.xmpp.store.participants.get_by_contact(self.pk, contact.contact_pk)
is not None
):
change = "demote"
else:
change = "add"
elif affiliation == "admin":
change = "promote"
elif affiliation == "outcast" or affiliation == "none":
change = "remove"
else:
raise XMPPError(
"bad-request",
f"You can't make a participant '{affiliation}' in whatsapp",
)
self.session.whatsapp.SetAffiliation(self.legacy_id, contact.legacy_id, change)
[docs]class Bookmarks(LegacyBookmarks[str, MUC]):
def __init__(self, session: "Session"):
super().__init__(session)
[docs] async def fill(self):
groups = self.session.whatsapp.GetGroups()
for group in groups:
await self.add_whatsapp_group(group)
self.__filled = True
[docs] async def add_whatsapp_group(self, data: whatsapp.Group):
muc = await self.by_legacy_id(data.JID)
await muc.update_whatsapp_info(data)
await muc.add_to_bookmarks()
[docs] async def legacy_id_to_jid_local_part(self, legacy_id: str):
return "#" + legacy_id[: legacy_id.find("@")]
[docs] async def jid_local_part_to_legacy_id(self, local_part: str):
if not local_part.startswith("#"):
raise XMPPError("bad-request", "Invalid group ID, expected '#' prefix")
if not self.__filled:
raise XMPPError(
"recipient-unavailable", "Still fetching group info, please retry later"
)
whatsapp_group_id = (
local_part.removeprefix("#") + "@" + whatsapp.DefaultGroupServer
)
if (
self.xmpp.store.rooms.get_by_legacy_id(
self.session.user_pk, whatsapp_group_id
)
is None
):
raise XMPPError("item-not-found", f"No group found for {whatsapp_group_id}")
return whatsapp_group_id
[docs]def replace_xmpp_mentions(text: str, mentions: list[Mention]):
offset: int = 0
result: str = ""
for m in mentions:
legacy_id = "@" + m.contact.legacy_id[: m.contact.legacy_id.find("@")]
result = result + text[offset : m.start] + legacy_id
offset = m.end
return result + text[offset:] if offset > 0 else text
[docs]def replace_whatsapp_mentions(text: str, participants: dict[str, str]):
def match(m: re.Match):
group = m.group(0)
return participants.get(group.replace("@", "+"), group)
return re.sub(r"@\d+", match, text)