Coverage for slidge / core / dispatcher / presence.py: 84%

135 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-13 04:38 +0000

1import contextlib 

2import logging 

3import typing 

4 

5from slixmpp import JID, Presence 

6from slixmpp.exceptions import XMPPError 

7 

8from ...contact import LegacyContact 

9from ...contact.roster import ContactIsUser 

10from ...util.types import AnySession 

11from ...util.util import merge_resources 

12from .util import DispatcherMixin, exceptions_to_xmpp_errors 

13 

14if typing.TYPE_CHECKING: 

15 from slidge.util.types import AnyGateway 

16 

17 

18class _IsDirectedAtComponent(Exception): 

19 def __init__(self, session: AnySession) -> None: 

20 self.session = session 

21 

22 

23class PresenceHandlerMixin(DispatcherMixin): 

24 __slots__: list[str] = [] 

25 xmpp: "AnyGateway" 

26 

27 def __init__(self, xmpp: "AnyGateway") -> None: 

28 super().__init__(xmpp) 

29 

30 xmpp.add_event_handler("presence_subscribe", self._handle_subscribe) 

31 xmpp.add_event_handler("presence_subscribed", self._handle_subscribed) 

32 xmpp.add_event_handler("presence_unsubscribe", self._handle_unsubscribe) 

33 xmpp.add_event_handler("presence_unsubscribed", self._handle_unsubscribed) 

34 xmpp.add_event_handler("presence_probe", self._handle_probe) 

35 xmpp.add_event_handler("presence", self.on_presence) 

36 

37 async def __get_contact(self, pres: Presence) -> LegacyContact: 

38 sess = await self._get_session(pres) 

39 pto = pres.get_to() 

40 if pto == self.xmpp.boundjid.bare: 

41 raise _IsDirectedAtComponent(sess) 

42 await sess.contacts.ready 

43 return await sess.contacts.by_jid(pto) # type:ignore[no-any-return] 

44 

45 @exceptions_to_xmpp_errors 

46 async def _handle_subscribe(self, pres: Presence) -> None: 

47 try: 

48 contact = await self.__get_contact(pres) 

49 except _IsDirectedAtComponent: 

50 pres.reply().send() 

51 return 

52 

53 if contact.is_friend: 

54 pres.reply().send() 

55 else: 

56 await contact.on_friend_request(pres["status"]) 

57 

58 @exceptions_to_xmpp_errors 

59 async def _handle_unsubscribe(self, pres: Presence) -> None: 

60 pres.reply().send() 

61 

62 try: 

63 contact = await self.__get_contact(pres) 

64 except _IsDirectedAtComponent as e: 

65 e.session.send_gateway_message("Bye bye!") 

66 await e.session.kill_by_jid(e.session.user_jid) 

67 return 

68 

69 contact.is_friend = False 

70 await contact.on_friend_delete(pres["status"]) 

71 

72 @exceptions_to_xmpp_errors 

73 async def _handle_subscribed(self, pres: Presence) -> None: 

74 try: 

75 contact = await self.__get_contact(pres) 

76 except _IsDirectedAtComponent: 

77 return 

78 

79 await contact.on_friend_accept() 

80 contact.send_last_presence(force=True) 

81 

82 @exceptions_to_xmpp_errors 

83 async def _handle_unsubscribed(self, pres: Presence) -> None: 

84 try: 

85 contact = await self.__get_contact(pres) 

86 except _IsDirectedAtComponent: 

87 return 

88 

89 if contact.is_friend: 

90 contact.is_friend = False 

91 await contact.on_friend_delete(pres["status"]) 

92 

93 @exceptions_to_xmpp_errors 

94 async def _handle_probe(self, pres: Presence) -> None: 

95 try: 

96 contact = await self.__get_contact(pres) 

97 except _IsDirectedAtComponent: 

98 session = await self._get_session(pres) 

99 session.send_cached_presence(pres.get_from()) 

100 return 

101 if contact.is_friend: 

102 contact.send_last_presence(force=True) 

103 else: 

104 reply = pres.reply() 

105 reply["type"] = "unsubscribed" 

106 reply.send() 

107 

108 @exceptions_to_xmpp_errors 

109 async def on_presence(self, p: Presence) -> None: 

110 if "muc_join" in p: 

111 # handled in on_groupchat_join 

112 # without this early return, since we switch from and to in this 

113 # presence stanza, on_groupchat_join ends up trying to instantiate 

114 # a MUC with the user's JID, which in turn leads to slidge sending 

115 # a (error) presence from=the user's JID, which terminates the 

116 # XML stream. 

117 return 

118 

119 session = await self._get_session(p) 

120 

121 pto = p.get_to() 

122 if pto == self.xmpp.boundjid.bare: 

123 await self._on_presence_to_component(session, p) 

124 return 

125 

126 if p.get_type() == "available": 

127 try: 

128 contact = await session.contacts.by_jid(pto) 

129 except XMPPError: 

130 contact = None 

131 except ContactIsUser: 

132 raise XMPPError( 

133 "bad-request", "Actions with yourself are not supported." 

134 ) 

135 if contact is not None: 

136 await self.xmpp.pubsub.on_presence_available(p, contact) 

137 return 

138 

139 muc = session.bookmarks.by_jid_only_if_exists(JID(pto.bare)) 

140 

141 if muc is not None and p.get_type() == "unavailable": 

142 muc.on_presence_unavailable(p) 

143 return 

144 

145 if muc is None or p.get_from().resource not in muc.get_user_resources(): 

146 return 

147 

148 if pto.resource == muc.user_nick: 

149 # Ignore presence stanzas with the valid nick. 

150 # even if joined to the group, we might receive those from clients, 

151 # when setting a status message, or going away, etc. 

152 return 

153 

154 # We can't use XMPPError here because XMPPError does not have a way to 

155 # add the <x xmlns="http://jabber.org/protocol/muc" /> element 

156 

157 error_stanza = p.error() 

158 error_stanza.set_to(p.get_from()) 

159 error_stanza.set_from(pto) 

160 error_stanza.enable("muc_join") # <x xmlns="http://jabber.org/protocol/muc" /> 

161 error_stanza.enable("error") 

162 error_stanza["error"]["type"] = "cancel" 

163 error_stanza["error"]["by"] = muc.jid 

164 error_stanza["error"]["condition"] = "not-acceptable" 

165 error_stanza["error"]["text"] = ( 

166 "Slidge does not let you change your nickname in groups." 

167 ) 

168 error_stanza.send() 

169 

170 async def _on_presence_to_component(self, session: AnySession, p: Presence) -> None: 

171 session.log.debug("Received a presence from %s", p.get_from()) 

172 if (ptype := p.get_type()) not in _USEFUL_PRESENCES: 

173 return 

174 if not session.user.preferences.get("sync_presence", False): 

175 session.log.debug("User does not want to sync their presence") 

176 return 

177 # NB: get_type() returns either a proper presence type or 

178 # a presence show if available. Weird, weird, weird slix. 

179 resources = self.xmpp.roster[self.xmpp.boundjid.bare][p.get_from()].resources 

180 with contextlib.suppress(NotImplementedError): 

181 await session.on_presence( 

182 p.get_from().resource, 

183 ptype, # type: ignore 

184 p["status"], 

185 resources, 

186 merge_resources(resources), 

187 ) 

188 if p.get_type() == "available": 

189 await self.xmpp.pubsub.on_presence_available(p, None) 

190 for contact in session.contacts: 

191 await self.xmpp.pubsub.on_presence_available(p, contact) 

192 

193 

194_USEFUL_PRESENCES = {"available", "unavailable", "away", "chat", "dnd", "xa"} 

195 

196log = logging.getLogger(__name__)