Coverage for slidge/db/alembic/old_user_store.py: 0%
88 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-07 05:11 +0000
1"""
This module provides a backend for storing user data persistently and for
managing a pseudo-roster for the gateway component.
4"""
6import dataclasses
7import datetime
8import logging
9import os.path
10import shelve
11from io import BytesIO
12from os import PathLike
13from typing import Iterable, Optional, Union
15from pickle_secure import Pickler, Unpickler
16from slixmpp import JID, Iq, Message, Presence
19# noinspection PyUnresolvedReferences
class EncryptedShelf(shelve.DbfilenameShelf):
    """
    A file-backed shelf whose values are (un)pickled with a secret key.

    Keys are encoded with ``keyencoding`` exactly like a regular shelf; only
    the values are serialised through the keyed ``Pickler``/``Unpickler``
    imported from ``pickle_secure``.
    """

    # Attributes provided by the shelve base classes, re-declared here so
    # that type checkers and IDEs know about them.
    cache: dict
    dict: dict
    writeback: bool
    keyencoding: str
    _protocol: int

    def __init__(
        self, filename: PathLike, key: str, flag="c", protocol=None, writeback=False
    ):
        """
        Open (or create) the shelf stored at ``filename``.

        :param filename: Path of the shelf file, converted to ``str``
        :param key: Secret key handed to the pickler and unpickler
        :param flag: Open mode, forwarded to ``shelve.DbfilenameShelf``
        :param protocol: Pickle protocol, forwarded to the parent class
        :param writeback: Whether accessed entries are kept in ``cache``
        """
        super().__init__(
            str(filename), flag=flag, protocol=protocol, writeback=writeback
        )
        self.secret_key = key

    def __getitem__(self, key):
        """
        Return the value stored under ``key``.

        The in-memory cache is consulted first; on a miss the raw bytes are
        read from the underlying database and unpickled with the secret key.

        :raises KeyError: If ``key`` is neither cached nor in the database
        """
        try:
            return self.cache[key]
        except KeyError:
            pass
        raw = self.dict[key.encode(self.keyencoding)]
        loaded = Unpickler(BytesIO(raw), key=self.secret_key).load()  # type:ignore
        if self.writeback:
            # Remember the object so later mutations get written on sync().
            self.cache[key] = loaded
        return loaded

    def __setitem__(self, key, value):
        """
        Pickle ``value`` with the secret key and store it under ``key``.

        With ``writeback`` enabled the live object is cached as well.
        """
        if self.writeback:
            self.cache[key] = value
        buffer = BytesIO()
        Pickler(buffer, self._protocol, key=self.secret_key).dump(value)  # type:ignore
        self.dict[key.encode(self.keyencoding)] = buffer.getvalue()
@dataclasses.dataclass
class GatewayUser:
    """
    A gateway user, identified by its bare JID.
    """

    #: Bare JID of the user
    bare_jid: str
    #: Content of the registration form, as a dict
    registration_form: dict[str, Optional[str]]
    #: Optional free-form dict; not interpreted by this class
    plugin_data: Optional[dict] = None
    #: Filled in by ``__post_init__`` when left as ``None``
    registration_date: Optional[datetime.datetime] = None

    def __post_init__(self):
        # Keep an explicitly supplied date; otherwise stamp the instance with
        # the current (naive, local) time.
        if self.registration_date is not None:
            return
        self.registration_date = datetime.datetime.now()

    def __hash__(self):
        # Hash on the bare JID only; the other fields include dicts.
        return hash(self.bare_jid)

    def __repr__(self):
        return "<User {}>".format(self.bare_jid)

    @property
    def jid(self) -> JID:
        """
        The user's (bare) JID, wrapped in a ``JID`` object

        :return: A ``JID`` built from ``bare_jid``
        """
        return JID(self.bare_jid)

    def get(self, field: str, default: str = "") -> Optional[str]:
        # Look up a field of the registration form (the original notes say
        # this is required to comply with the slixmpp backend protocol).
        #
        # :param field: Name of the field
        # :param default: Value returned if the field is not present
        #
        # :return: Value of the field
        form = self.registration_form
        return form.get(field, default)
class UserStore:
    """
    Basic user store implementation using shelve from the python standard library

    Set_file must be called before it is usable
    """

    def __init__(self):
        # No shelf is open until set_file() is called.
        self._users: shelve.Shelf[GatewayUser] = None  # type: ignore

    def set_file(self, filename: PathLike, secret_key: Optional[str] = None):
        """
        Set the file to use to store user data

        :param filename: Path to the shelf file
        :param secret_key: Secret key to store files encrypted on disk
        :raises RuntimeError: If a shelf file has already been set
        """
        if self._users is not None:
            raise RuntimeError("Shelf file already set!")
        if os.path.exists(filename):
            log.info("Using existing slidge DB: %s", filename)
        else:
            log.info("Creating a new slidge DB: %s", filename)
        if secret_key:
            # A non-empty key selects the shelf that pickles with that key.
            self._users = EncryptedShelf(filename, key=secret_key)
        else:
            self._users = shelve.open(str(filename))
        log.info("Registered users in the DB: %s", list(self._users.keys()))

    def get_all(self) -> Iterable[GatewayUser]:
        """
        Get all users in the store

        :return: An iterable of GatewayUsers
        """
        return self._users.values()

    def commit(self, user: GatewayUser):
        """
        Store (or overwrite) a user under their bare JID and sync the shelf

        :param user: The user to persist
        """
        self._users[user.jid.bare] = user
        self._users.sync()

    def get(self, _gateway_jid, _node, ifrom: JID, iq) -> Optional[GatewayUser]:
        """
        Get a user from the store

        NB: there is no reason to call this, it is used by SliXMPP internal API

        :param _gateway_jid: Unused
        :param _node: Unused
        :param ifrom: JID of the requester; may be ``None`` (see below)
        :param iq: The stanza, used to recover the sender if ``ifrom`` is ``None``
        :return: The matching user, or ``None`` if there is none
        """
        if ifrom is None:  # bug in SliXMPP's XEP_0100 plugin
            ifrom = iq["from"]
        log.debug("Getting user %s", ifrom.bare)
        return self._users.get(ifrom.bare)

    def get_by_jid(self, jid: JID) -> Optional[GatewayUser]:
        """
        Convenience function to get a user from their JID.

        :param jid: JID of the gateway user
        :return: The matching user, or ``None`` if there is none
        """
        return self._users.get(jid.bare)

    def get_by_stanza(self, s: Union[Presence, Message, Iq]) -> Optional[GatewayUser]:
        """
        Convenience function to get a user from a stanza they sent.

        :param s: A stanza sent by the gateway user
        :return: The matching user, or ``None`` if there is none
        """
        return self.get_by_jid(s.get_from())

    def close(self):
        """
        Sync and close the underlying shelf.

        Does nothing if ``set_file`` was never called, so that shutdown code
        can call it unconditionally.
        """
        if self._users is None:
            return
        self._users.sync()
        self._users.close()
log = logging.getLogger(__name__)

user_store = UserStore()
"""
Module-wide persistent store for slidge users. It is not meant to be public;
it only appears in the docs because no way to hide it was found.
"""