"""
Just a stab at steam chat.
Unfortunately the library underneath uses gevent and there is probably some work to do
to make it play nice with asyncio.
Right now, listing friends + send them messages works BUT in a blocking way.
Asyncsteampy https://github.com/somespecialone/asyncsteampy
might be interesting as it uses python's asyncio BUT the
login process seem a little too exotic for my taste.
"""
import asyncio
from collections import defaultdict
from functools import partial
from typing import Any, Callable, Optional, Union
import steam.enums
from slixmpp import JID
from steam.client import SteamClient
from steam.client.user import SteamUser
from steam.core.msg import MsgProto
from steam.enums.common import EPersonaState, EResult
from steam.enums.emsg import EMsg
from steam.protobufs.steammessages_friendmessages_pb2 import (
k_EMessageReactionType_Emoticon,
)
from steam.steamid import SteamID
from slidge import (
BaseGateway,
BaseSession,
FormField,
GatewayUser,
LegacyContact,
LegacyMUC,
LegacyRoster,
XMPPError,
global_config,
)
from slidge.core.command.register import RegistrationType, TwoFactorNotRequired
from slidge.util import BiDict
[docs]class Gateway(BaseGateway):
[docs] REGISTRATION_INSTRUCTIONS = "Enter steam credentials"
[docs] REGISTRATION_FIELDS = [
FormField(var="username", label="Steam username", required=True),
FormField(var="password", label="Password", private=True, required=True),
]
[docs] REGISTRATION_TYPE = RegistrationType.TWO_FACTOR_CODE
[docs] COMPONENT_NAME = "Steam (slidge)"
[docs] COMPONENT_TYPE = "steam"
[docs] COMPONENT_AVATAR = "https://logos-download.com/wp-content/uploads/2016/05/Steam_icon_logo_logotype.png"
def __init__(self):
super().__init__()
self._pending_registrations = dict[str, tuple[SteamClient, EResult]]()
# we store logged clients on registration to get it
self.steam_clients = dict[str, SteamClient]()
[docs] async def validate(
self, user_jid: JID, registration_form: dict[str, Optional[str]]
):
username = registration_form["username"]
password = registration_form["password"]
store_dir = global_config.HOME_DIR / user_jid.bare
store_dir.mkdir(exist_ok=True)
client = SteamClient()
client.set_credential_location(store_dir)
login_result = client.login(username, password)
if login_result == EResult.InvalidPassword:
raise ValueError("Invalid password")
elif login_result == EResult.OK:
self.steam_clients[user_jid.bare] = client
raise TwoFactorNotRequired
elif login_result in (
EResult.AccountLogonDenied,
EResult.AccountLoginDeniedNeedTwoFactor,
):
self._pending_registrations[user_jid.bare] = client, login_result
else:
raise ValueError(f"Login problem: {login_result}")
[docs] async def validate_two_factor_code(self, user: GatewayUser, code: str):
username = user.registration_form["username"]
password = user.registration_form["password"]
client, login_result = self._pending_registrations.pop(user.bare_jid)
if login_result == EResult.AccountLogonDenied:
# 2FA by mail (?)
login_result = client.login(username, password, auth_code=code)
elif login_result == EResult.AccountLoginDeniedNeedTwoFactor:
# steam guard (?)
login_result = client.login(username, password, two_factor_code=code)
if login_result != EResult.OK:
raise XMPPError("forbidden", text=f"Could not login: {login_result}")
# store the client, so it's picked up on Sessions.login(), without re-auth
self.steam_clients[user.bare_jid] = client
[docs]class Roster(LegacyRoster[int, Contact]):
[docs] async def jid_username_to_legacy_id(self, jid_username: str) -> int:
try:
return int(jid_username)
except ValueError:
raise XMPPError("bad-request")
[docs] def by_steam_user(self, steam_user: SteamUser) -> asyncio.Task[Contact]:
return self.by_steam_id(steam_user.steam_id)
[docs] def by_steam_id(self, steam_id: SteamID) -> asyncio.Task[Contact]:
return self.session.xmpp.loop.create_task(self.by_legacy_id(steam_id.id))
[docs] def by_steam_user_apply(self, steam_user: SteamUser, method: Callable):
task = self.by_steam_user(steam_user)
task.add_done_callback(lambda f: method(f.result()))
[docs] def by_steam_id_apply(self, steam_id: SteamID, method: Callable):
task = self.by_steam_id(steam_id)
task.add_done_callback(lambda f: method(f.result()))
[docs] async def fill(self):
for f in self.session.steam.friends:
self.session.log.debug("Friend: %s - %s - %s", f, f.name, f.steam_id.id)
c = await self.by_legacy_id(f.steam_id.id)
c.is_friend = True
await c.add_to_roster()
[docs]Recipient = Union[Contact, LegacyMUC]
[docs]class Session(BaseSession[int, Recipient]):
def __init__(self, user):
super().__init__(user)
store_dir = global_config.HOME_DIR / self.user.bare_jid
store_dir.mkdir(exist_ok=True)
self.job_futures = dict[str, asyncio.Future[Any]]()
client = self.xmpp.steam_clients.pop(user.bare_jid, None)
if client is None:
self.steam = SteamClient()
self.log.debug("Creating steam client, %s", store_dir)
self.steam.set_credential_location(store_dir)
else:
# in case the session is created just after successful registration
self.log.debug("Using found client: %s - %s", client, client.logged_on)
self.steam = client
self.steam.on(EMsg.ClientPersonaState, self.on_persona_state)
self.steam.on("FriendMessagesClient.IncomingMessage#1", self.on_friend_message)
self.steam.on("FriendMessagesClient.MessageReaction#1", self.on_friend_reaction)
self.steam.on(EMsg.ServiceMethodResponse, self.on_service_method_response)
self.steam.on("disconnected", self.on_steam_disconnected)
@staticmethod
[docs] def xmpp_msg_id_to_legacy_msg_id(xmpp_msg_id: str):
return int(xmpp_msg_id)
[docs] def on_steam_disconnected(self):
self.logged = False
self.send_gateway_status("Disconnected from steam", show="busy")
self.send_gateway_message(
"You have been disconnected from steam. "
"You can try to re-login via the dedicated adhoc command, but you "
"might need to unregister and re-register to the gateway component."
)
[docs] async def login(self):
if not self.steam.logged_on:
# if just after registration, we're already logged on
self.log.debug("Client is not logged on")
login_result = self.steam.login(
self.user.registration_form["username"],
self.user.registration_form["password"],
)
self.log.debug("Re-login result: %s", login_result)
if login_result == EResult.OK:
self.log.debug("Login success")
else:
raise RuntimeError("Could not connect to steam")
asyncio.create_task(self.idle())
return "Connected as " + self.user.registration_form["username"] # type: ignore
[docs] async def idle(self):
while True:
self.steam.idle()
await asyncio.sleep(0.1)
[docs] def on_service_method_response(self, msg):
self.log.debug("New service method response : %s", msg)
try:
fut = self.job_futures.pop(f"job_{msg.header.jobid_target}")
except KeyError:
self.log.debug(
"Ignoring: %s vs %s", msg.header.jobid_target, self.job_futures
)
else:
fut.set_result(msg.body)
[docs] def on_friend_message(self, msg):
self.log.debug("New friend message : %s", msg)
steam_user = self.steam.get_user(msg.body.steamid_friend)
if (type_ := msg.body.chat_entry_type) == steam.enums.EChatEntryType.Typing:
self.contacts.by_steam_user_apply(steam_user, Contact.composing)
elif type_ == steam.enums.EChatEntryType.ChatMsg:
self.contacts.by_steam_user_apply(
steam_user,
partial(
Contact.send_text,
body=msg.body.message,
legacy_msg_id=msg.body.rtime32_server_timestamp,
carbon=msg.body.local_echo,
),
)
[docs] def on_friend_reaction(self, msg):
self.log.debug("New friend reaction : %s", msg)
body = msg.body
timestamp = body.server_timestamp
emoji = emoji_translate.get(body.reaction) or "❓"
if body.reactor == self.steam.steam_id:
if body.reaction_type == k_EMessageReactionType_Emoticon:
contact_task = self.contacts.by_steam_id(
SteamID(msg.body.steamid_friend)
)
def callback(task: asyncio.Task[Contact]):
c = task.result()
if body.is_add:
c.user_reactions[timestamp].add(emoji)
else:
try:
c.user_reactions[timestamp].remove(emoji)
except KeyError:
self.log.warning(
"User removed a reaction we didn't know about"
)
c.react(timestamp, c.user_reactions[timestamp], carbon=True)
contact_task.add_done_callback(callback)
else:
if body.reaction_type == k_EMessageReactionType_Emoticon:
contact_task = self.contacts.by_steam_id(SteamID(msg.body.reactor))
def callback(task: asyncio.Task[Contact]):
c = task.result()
if body.is_add:
c.contact_reactions[timestamp].add(emoji)
else:
try:
c.contact_reactions[timestamp].remove(emoji)
except KeyError:
self.log.warning(
"Contact removed a reaction we didn't know about"
)
c.update_reactions(timestamp)
contact_task.add_done_callback(callback)
[docs] def on_persona_state(self, msg: MsgProto):
persona_state = msg.body
self.log.debug("New state event: %s", persona_state)
for f in persona_state.friends:
if f.friendid == self.steam.steam_id:
self.log.debug("This is me %s", self.steam.steam_id)
return
self.contacts.by_steam_id_apply(
SteamID(f.friendid),
partial(Contact.update_status, persona_state=f.persona_state),
)
[docs] async def logout(self):
pass
[docs] async def send_text(self, chat: Recipient, text: str, **k):
if not text:
return
job_id = self.steam.send_um(
"FriendMessages.SendMessage#1",
{
"steamid": SteamID(chat.legacy_id),
"chat_entry_type": steam.enums.EChatEntryType.ChatMsg,
"message": text,
},
)
f = self.job_futures[job_id] = self.xmpp.loop.create_future()
return (await f).server_timestamp
[docs] async def send_file(self, chat: Recipient, url: str, *a, **k):
return await self.send_text(chat, url)
[docs] async def active(self, c: Recipient, thread=None):
pass
[docs] async def inactive(self, c: Recipient, thread=None):
pass
[docs] async def composing(self, c: Recipient, thread=None):
self.steam.send_um(
"FriendMessages.SendMessage#1",
{
"steamid": SteamID(c.legacy_id),
"chat_entry_type": steam.enums.EChatEntryType.Typing,
},
)
[docs] async def paused(self, c: Recipient, thread=None):
pass
[docs] async def displayed(self, c: Recipient, legacy_msg_id: Any, thread=None):
pass
[docs] async def correct(self, c: Recipient, text: str, legacy_msg_id: Any, thread=None):
pass
[docs] async def search(self, form_values: dict[str, str]):
pass
[docs] async def react(
self, c: Recipient, legacy_msg_id: Any, emojis: list[str], thread=None
):
old = c.user_reactions[legacy_msg_id] # type: ignore
new = set[str]()
for emoji in emojis:
if emoji_translate.inverse.get(emoji) is None:
# should not happen anymore, slidge core should take care of that never happening
self.send_gateway_message(
"On steam, you can only react with"
f" {' '.join(emoji_translate.values())}"
)
else:
new.add(emoji)
for emoji_char in old - new:
self.steam.send_um(
"FriendMessages.UpdateMessageReaction#1",
{
"steamid": SteamID(c.legacy_id).as_64,
"server_timestamp": legacy_msg_id,
"reaction_type": k_EMessageReactionType_Emoticon,
"reaction": emoji_translate.inverse.get(emoji_char),
"is_add": False,
},
)
for emoji_char in new - old:
self.steam.send_um(
"FriendMessages.UpdateMessageReaction#1",
{
"steamid": SteamID(c.legacy_id).as_64,
"server_timestamp": legacy_msg_id,
"reaction_type": k_EMessageReactionType_Emoticon,
"reaction": emoji_translate.inverse.get(emoji_char),
"is_add": True,
},
)
c.user_reactions[legacy_msg_id] = new # type: ignore
c.react(legacy_msg_id, new, carbon=True) # type: ignore
[docs] async def retract(self, c: Recipient, legacy_msg_id: Any, thread=None):
pass
[docs]emoji_translate = BiDict[str, str](
[
(":steamthumbsup:", "👍"),
(":steamthumbsdown:", "👎"),
(":steambored:", "🥱"),
(":steamfacepalm:", "🤦"),
(":steamhappy:", "😄"),
(":steammocking:", "😝"),
(":steamsalty:", "🧂"),
(":steamsad:", "😔"),
(":steamthis:", "⬆"),
]
)