Coverage for slidge/db/alembic/old_user_store.py: 0%

88 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-07 05:11 +0000

1""" 

2This module covers a backend for storing user data persistently and managing a 

3pseudo-roster for the gateway component. 

4""" 

5 

6import dataclasses 

7import datetime 

8import logging 

9import os.path 

10import shelve 

11from io import BytesIO 

12from os import PathLike 

13from typing import Iterable, Optional, Union 

14 

15from pickle_secure import Pickler, Unpickler 

16from slixmpp import JID, Iq, Message, Presence 

17 

18 

19# noinspection PyUnresolvedReferences 

20class EncryptedShelf(shelve.DbfilenameShelf): 

21 cache: dict 

22 dict: dict 

23 writeback: bool 

24 keyencoding: str 

25 _protocol: int 

26 

27 def __init__( 

28 self, filename: PathLike, key: str, flag="c", protocol=None, writeback=False 

29 ): 

30 super().__init__(str(filename), flag, protocol, writeback) 

31 self.secret_key = key 

32 

33 def __getitem__(self, key): 

34 try: 

35 value = self.cache[key] 

36 except KeyError: 

37 f = BytesIO(self.dict[key.encode(self.keyencoding)]) 

38 value = Unpickler(f, key=self.secret_key).load() # type:ignore 

39 if self.writeback: 

40 self.cache[key] = value 

41 return value 

42 

43 def __setitem__(self, key, value): 

44 if self.writeback: 

45 self.cache[key] = value 

46 f = BytesIO() 

47 p = Pickler(f, self._protocol, key=self.secret_key) # type:ignore 

48 p.dump(value) 

49 self.dict[key.encode(self.keyencoding)] = f.getvalue() 

50 

51 

52@dataclasses.dataclass 

53class GatewayUser: 

54 """ 

55 A gateway user 

56 """ 

57 

58 bare_jid: str 

59 """Bare JID of the user""" 

60 registration_form: dict[str, Optional[str]] 

61 """Content of the registration form, as a dict""" 

62 plugin_data: Optional[dict] = None 

63 registration_date: Optional[datetime.datetime] = None 

64 

65 def __hash__(self): 

66 return hash(self.bare_jid) 

67 

68 def __repr__(self): 

69 return f"<User {self.bare_jid}>" 

70 

71 def __post_init__(self): 

72 if self.registration_date is None: 

73 self.registration_date = datetime.datetime.now() 

74 

75 @property 

76 def jid(self) -> JID: 

77 """ 

78 The user's (bare) JID 

79 

80 :return: 

81 """ 

82 return JID(self.bare_jid) 

83 

84 def get(self, field: str, default: str = "") -> Optional[str]: 

85 # """ 

86 # Get fields from the registration form (required to comply with slixmpp backend protocol) 

87 # 

88 # :param field: Name of the field 

89 # :param default: Default value to return if the field is not present 

90 # 

91 # :return: Value of the field 

92 # """ 

93 return self.registration_form.get(field, default) 

94 

95 

96class UserStore: 

97 """ 

98 Basic user store implementation using shelve from the python standard library 

99 

100 Set_file must be called before it is usable 

101 """ 

102 

103 def __init__(self): 

104 self._users: shelve.Shelf[GatewayUser] = None # type: ignore 

105 

106 def set_file(self, filename: PathLike, secret_key: Optional[str] = None): 

107 """ 

108 Set the file to use to store user data 

109 

110 :param filename: Path to the shelf file 

111 :param secret_key: Secret key to store files encrypted on disk 

112 """ 

113 if self._users is not None: 

114 raise RuntimeError("Shelf file already set!") 

115 if os.path.exists(filename): 

116 log.info("Using existing slidge DB: %s", filename) 

117 else: 

118 log.info("Creating a new slidge DB: %s", filename) 

119 if secret_key: 

120 self._users = EncryptedShelf(filename, key=secret_key) 

121 else: 

122 self._users = shelve.open(str(filename)) 

123 log.info("Registered users in the DB: %s", list(self._users.keys())) 

124 

125 def get_all(self) -> Iterable[GatewayUser]: 

126 """ 

127 Get all users in the store 

128 

129 :return: An iterable of GatewayUsers 

130 """ 

131 return self._users.values() 

132 

133 def commit(self, user: GatewayUser): 

134 self._users[user.jid.bare] = user 

135 self._users.sync() 

136 

137 def get(self, _gateway_jid, _node, ifrom: JID, iq) -> Optional[GatewayUser]: 

138 """ 

139 Get a user from the store 

140 

141 NB: there is no reason to call this, it is used by SliXMPP internal API 

142 

143 :param _gateway_jid: 

144 :param _node: 

145 :param ifrom: 

146 :param iq: 

147 :return: 

148 """ 

149 if ifrom is None: # bug in SliXMPP's XEP_0100 plugin 

150 ifrom = iq["from"] 

151 log.debug("Getting user %s", ifrom.bare) 

152 return self._users.get(ifrom.bare) 

153 

154 def get_by_jid(self, jid: JID) -> Optional[GatewayUser]: 

155 """ 

156 Convenience function to get a user from their JID. 

157 

158 :param jid: JID of the gateway user 

159 :return: 

160 """ 

161 return self._users.get(jid.bare) 

162 

163 def get_by_stanza(self, s: Union[Presence, Message, Iq]) -> Optional[GatewayUser]: 

164 """ 

165 Convenience function to get a user from a stanza they sent. 

166 

167 :param s: A stanza sent by the gateway user 

168 :return: 

169 """ 

170 return self.get_by_jid(s.get_from()) 

171 

172 def close(self): 

173 self._users.sync() 

174 self._users.close() 

175 

176 

177user_store = UserStore() 

178""" 

179A persistent store for slidge users. Not public, but I didn't find how to hide 

180it from the docs! 

181""" 

182 

183log = logging.getLogger(__name__)