Coverage for slidge / core / dispatcher / presence.py: 82%

135 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-06 05:07 +0000

1import logging 

2import typing 

3 

4from slixmpp import JID, Presence 

5from slixmpp.exceptions import XMPPError 

6 

7from ...contact.roster import ContactIsUser 

8from ...util.types import AnyContact, AnySession 

9from ...util.util import merge_resources 

10from .util import DispatcherMixin, exceptions_to_xmpp_errors 

11 

12if typing.TYPE_CHECKING: 

13 from slidge.core.gateway import BaseGateway 

14 

15 

16class _IsDirectedAtComponent(Exception): 

17 def __init__(self, session: AnySession) -> None: 

18 self.session = session 

19 

20 

21class PresenceHandlerMixin(DispatcherMixin): 

22 __slots__: list[str] = [] 

23 xmpp: "BaseGateway" 

24 

25 def __init__(self, xmpp: "BaseGateway") -> None: 

26 super().__init__(xmpp) 

27 

28 xmpp.add_event_handler("presence_subscribe", self._handle_subscribe) 

29 xmpp.add_event_handler("presence_subscribed", self._handle_subscribed) 

30 xmpp.add_event_handler("presence_unsubscribe", self._handle_unsubscribe) 

31 xmpp.add_event_handler("presence_unsubscribed", self._handle_unsubscribed) 

32 xmpp.add_event_handler("presence_probe", self._handle_probe) 

33 xmpp.add_event_handler("presence", self.on_presence) 

34 

35 async def __get_contact(self, pres: Presence) -> AnyContact: 

36 sess = await self._get_session(pres) 

37 pto = pres.get_to() 

38 if pto == self.xmpp.boundjid.bare: 

39 raise _IsDirectedAtComponent(sess) 

40 await sess.contacts.ready 

41 return await sess.contacts.by_jid(pto) # type:ignore[no-any-return] 

42 

43 @exceptions_to_xmpp_errors 

44 async def _handle_subscribe(self, pres: Presence) -> None: 

45 try: 

46 contact = await self.__get_contact(pres) 

47 except _IsDirectedAtComponent: 

48 pres.reply().send() 

49 return 

50 

51 if contact.is_friend: 

52 pres.reply().send() 

53 else: 

54 await contact.on_friend_request(pres["status"]) 

55 

56 @exceptions_to_xmpp_errors 

57 async def _handle_unsubscribe(self, pres: Presence) -> None: 

58 pres.reply().send() 

59 

60 try: 

61 contact = await self.__get_contact(pres) 

62 except _IsDirectedAtComponent as e: 

63 e.session.send_gateway_message("Bye bye!") 

64 await e.session.kill_by_jid(e.session.user_jid) 

65 return 

66 

67 contact.is_friend = False 

68 await contact.on_friend_delete(pres["status"]) 

69 

70 @exceptions_to_xmpp_errors 

71 async def _handle_subscribed(self, pres: Presence) -> None: 

72 try: 

73 contact = await self.__get_contact(pres) 

74 except _IsDirectedAtComponent: 

75 return 

76 

77 await contact.on_friend_accept() 

78 contact.send_last_presence(force=True) 

79 

80 @exceptions_to_xmpp_errors 

81 async def _handle_unsubscribed(self, pres: Presence) -> None: 

82 try: 

83 contact = await self.__get_contact(pres) 

84 except _IsDirectedAtComponent: 

85 return 

86 

87 if contact.is_friend: 

88 contact.is_friend = False 

89 await contact.on_friend_delete(pres["status"]) 

90 

91 @exceptions_to_xmpp_errors 

92 async def _handle_probe(self, pres: Presence) -> None: 

93 try: 

94 contact = await self.__get_contact(pres) 

95 except _IsDirectedAtComponent: 

96 session = await self._get_session(pres) 

97 session.send_cached_presence(pres.get_from()) 

98 return 

99 if contact.is_friend: 

100 contact.send_last_presence(force=True) 

101 else: 

102 reply = pres.reply() 

103 reply["type"] = "unsubscribed" 

104 reply.send() 

105 

106 @exceptions_to_xmpp_errors 

107 async def on_presence(self, p: Presence) -> None: 

108 if p.get_plugin("muc_join", check=True): 

109 # handled in on_groupchat_join 

110 # without this early return, since we switch from and to in this 

111 # presence stanza, on_groupchat_join ends up trying to instantiate 

112 # a MUC with the user's JID, which in turn leads to slidge sending 

113 # a (error) presence from=the user's JID, which terminates the 

114 # XML stream. 

115 return 

116 

117 session = await self._get_session(p) 

118 

119 pto = p.get_to() 

120 if pto == self.xmpp.boundjid.bare: 

121 await self._on_presence_to_component(session, p) 

122 return 

123 

124 if p.get_type() == "available": 

125 try: 

126 contact = await session.contacts.by_jid(pto) 

127 except XMPPError: 

128 contact = None 

129 except ContactIsUser: 

130 raise XMPPError( 

131 "bad-request", "Actions with yourself are not supported." 

132 ) 

133 if contact is not None: 

134 await self.xmpp.pubsub.on_presence_available(p, contact) 

135 return 

136 

137 muc = session.bookmarks.by_jid_only_if_exists(JID(pto.bare)) 

138 

139 if muc is not None and p.get_type() == "unavailable": 

140 muc.on_presence_unavailable(p) 

141 return 

142 

143 if muc is None or p.get_from().resource not in muc.get_user_resources(): 

144 return 

145 

146 if pto.resource == muc.user_nick: 

147 # Ignore presence stanzas with the valid nick. 

148 # even if joined to the group, we might receive those from clients, 

149 # when setting a status message, or going away, etc. 

150 return 

151 

152 # We can't use XMPPError here because XMPPError does not have a way to 

153 # add the <x xmlns="http://jabber.org/protocol/muc" /> element 

154 

155 error_stanza = p.error() 

156 error_stanza.set_to(p.get_from()) 

157 error_stanza.set_from(pto) 

158 error_stanza.enable("muc_join") # <x xmlns="http://jabber.org/protocol/muc" /> 

159 error_stanza.enable("error") 

160 error_stanza["error"]["type"] = "cancel" 

161 error_stanza["error"]["by"] = muc.jid 

162 error_stanza["error"]["condition"] = "not-acceptable" 

163 error_stanza["error"]["text"] = ( 

164 "Slidge does not let you change your nickname in groups." 

165 ) 

166 error_stanza.send() 

167 

168 async def _on_presence_to_component(self, session: AnySession, p: Presence) -> None: 

169 session.log.debug("Received a presence from %s", p.get_from()) 

170 if (ptype := p.get_type()) not in _USEFUL_PRESENCES: 

171 return 

172 if not session.user.preferences.get("sync_presence", False): 

173 session.log.debug("User does not want to sync their presence") 

174 return 

175 # NB: get_type() returns either a proper presence type or 

176 # a presence show if available. Weird, weird, weird slix. 

177 resources = self.xmpp.roster[self.xmpp.boundjid.bare][p.get_from()].resources 

178 try: 

179 await session.on_presence( 

180 p.get_from().resource, 

181 ptype, # type: ignore 

182 p["status"], 

183 resources, 

184 merge_resources(resources), 

185 ) 

186 except NotImplementedError: 

187 pass 

188 if p.get_type() == "available": 

189 await self.xmpp.pubsub.on_presence_available(p, None) 

190 for contact in session.contacts: 

191 await self.xmpp.pubsub.on_presence_available(p, contact) 

192 

193 

194_USEFUL_PRESENCES = {"available", "unavailable", "away", "chat", "dnd", "xa"} 

195 

196log = logging.getLogger(__name__)