import asyncio
import io
import logging
import pprint
import threading
from pathlib import Path
from threading import Lock, Thread
from typing import Any, Optional, Union

import skpy
from requests.exceptions import ConnectionError
# NOTE(review): the name list of this import was truncated in the reviewed
# source; these are the slidge names this module references -- confirm.
from slidge import (
    BaseGateway,
    BaseSession,
    FormField,
    LegacyContact,
    LegacyMUC,
    LegacyRoster,
    global_config,
)
from slidge.util.sql import CachedPresence
from slidge.util.util import get_version  # noqa: F401
from slixmpp import JID
from slixmpp.exceptions import XMPPError

class Gateway(BaseGateway):
    """Gateway component settings and registration check for Skype."""

    REGISTRATION_INSTRUCTIONS = "Enter skype credentials"
    REGISTRATION_FIELDS = [
        FormField(var="username", label="Username", required=True),
        FormField(var="password", label="Password", required=True, private=True),
    ]

    ROSTER_GROUP = "Skype"

    COMPONENT_NAME = "Skype (slidge)"
    COMPONENT_TYPE = "skype"

    async def validate(
        self, user_jid: JID, registration_form: dict[str, Optional[str]]
    ):
        """Check the submitted credentials by attempting a Skype login.

        ``skpy.Skype`` is constructed in a worker thread (via
        ``asyncio.to_thread``) with the username, the password and a
        per-user path under ``global_config.HOME_DIR``.

        Args:
            user_jid: JID of the registering user; its bare form names the
                per-user path.
            registration_form: Submitted form; must contain the "username"
                and "password" keys.

        Raises:
            XMPPError: "bad-request" when skpy raises ``SkypeApiException``,
                "forbidden" when it raises ``SkypeAuthException``.
        """
        username = registration_form["username"]
        password = registration_form["password"]
        user_path = str(global_config.HOME_DIR / user_jid.bare)
        try:
            await asyncio.to_thread(skpy.Skype, username, password, user_path)
        except skpy.SkypeApiException:
            raise XMPPError("bad-request")
        except skpy.SkypeAuthException:
            raise XMPPError("forbidden")
class Contact(LegacyContact[str]):
    """A Skype contact as exposed to the XMPP user.

    NOTE(review): several expressions of this class were missing in the
    reviewed source. They were restored from context and are marked inline
    with ``NOTE(review)``; confirm each against the intended code.
    """

    session: "Session"

    def update_presence(self, status: skpy.SkypeUtils.Status):
        """Translate a Skype status into the matching presence call.

        Unknown statuses are logged as a warning and otherwise ignored.
        """
        statuses = skpy.SkypeUtils.Status
        if status == statuses.Offline:
            self.offline()
        elif status == statuses.Busy:
            self.busy()
        elif status == statuses.Away:
            self.away("Away")
        elif status == statuses.Idle:
            self.away("Idle")
        elif status == statuses.Online:
            # NOTE(review): this branch had no body in the reviewed source;
            # self.online() is inferred from the sibling branches -- confirm.
            self.online()
        else:
            log.warning("Unknown contact status: %s", status)

    def update_mood(self, mood: skpy.SkypeContact.Mood):
        """Publish the contact's mood text as its presence status message.

        If a presence is already cached, it is re-sent with the mood as its
        status text; otherwise the contact is announced online with it.
        An empty or missing mood leaves the presence untouched.
        """
        # Guard against a missing mood: str(None) would be the truthy string
        # "None" and end up published as the status message.
        # NOTE(review): assumes skpy may report "no mood" as None -- confirm.
        mood_str = "" if mood is None else str(mood)
        if not mood_str:
            return
        if p := self._get_last_presence():
            self._store_last_presence(
                CachedPresence(p.last_seen, p.ptype, mood_str, p.pshow)
            )
            self.send_last_presence()
        else:
            # NOTE(review): this branch had no body in the reviewed source;
            # going online with the mood as status is inferred -- confirm.
            self.online(mood_str)

    async def update_info(self, contact: Optional[skpy.SkypeContact] = None):
        """Refresh name, avatar, vCard and mood from Skype.

        Args:
            contact: Already-fetched skpy contact. When omitted, it is looked
                up from this contact's legacy id.

        Raises:
            XMPPError: "item-not-found" when the lookup yields nothing.
        """
        if contact is None:
            # NOTE(review): the lookup expression was missing in the reviewed
            # source; restored from how the session's skpy client is used
            # elsewhere -- confirm.
            contact = self.session.sk.contacts.contact(self.legacy_id)
        if contact is None:
            raise XMPPError("item-not-found")

        # NOTE(review): both right-hand sides and the ``self.name`` targets
        # below were missing in the reviewed source -- confirm.
        first = contact.name.first
        last = contact.name.last
        known_parts = [part for part in (first, last) if part is not None]
        if known_parts:
            # "first last", or whichever single part is available.
            self.name = " ".join(known_parts)

        if contact.avatar is not None:
            await self.set_avatar(contact.avatar)

        self.set_vcard(
            given=first,
            surname=last,
            # NOTE(review): this keyword argument was missing in the reviewed
            # source (only a stray comma remained) -- confirm.
            full_name=self.name,
            locality=str(contact.location),
            phones=[p for p in contact.phones if p.number],
            birthday=contact.birthday,
        )
        self.update_mood(contact.mood)
        self.is_friend = True
class ListenThread(Thread):
    """Daemon thread that polls Skype events for one session.

    Every event returned by the session's skpy client is handed to
    ``session.on_skype_event`` on the XMPP event loop.
    """

    def __init__(self, session: "Session", *a, **kw):
        """Create the (not yet started) listener for *session*.

        Extra positional and keyword arguments are forwarded to ``Thread``;
        ``daemon=True`` is always set.
        """
        super().__init__(*a, **kw, daemon=True)
        # NOTE(review): the assignment target was missing in the reviewed
        # source; ``self.name`` (the thread name) is inferred -- confirm.
        self.name = f"listen-{session.user.bare_jid}"
        self.session = session
        # Thread.run() calls self._target, so this makes start() execute
        # skype_blocking() without requiring a ``target=`` argument.
        self._target = self.skype_blocking
        self.stop_event = threading.Event()

    def skype_blocking(self):
        """Poll Skype until stop() is called (runs in this thread).

        The stop flag is only checked between polls, so a stop request takes
        effect once the current ``getEvents()`` call has returned.
        """
        session = self.session
        # NOTE(review): the right-hand side was missing in the reviewed
        # source; the session's skpy client is inferred -- confirm.
        sk = session.sk
        loop = session.xmpp.loop
        while not self.stop_event.is_set():
            for event in sk.getEvents():
                # no need to sleep since getEvents blocks for 30 seconds already
                asyncio.run_coroutine_threadsafe(session.on_skype_event(event), loop)

    def stop(self):
        """Ask the polling loop to exit after the current poll."""
        self.stop_event.set()
class Roster(LegacyRoster):
    """Roster of the user's Skype contacts."""

    session: "Session"

    async def fill(self):
        """Add every Skype contact of the session to the XMPP roster."""
        # NOTE(review): the iterable and the by_legacy_id() argument were
        # missing in the reviewed source; the skpy contact list and the
        # contact's id are inferred from context -- confirm.
        for contact in self.session.sk.contacts:
            c = await self.by_legacy_id(contact.id)
            await c.add_to_roster()
# Type of the chat targets handled by Session: a Contact or a LegacyMUC.
Recipient = Union[Contact, LegacyMUC]
class Session(BaseSession[int, Recipient]):
    """Per-user bridge between an XMPP account and a Skype account.

    Skype events are received by a ListenThread and processed by
    on_skype_event(); the ``on_*`` methods forward XMPP-side actions to
    Skype. skpy calls are pushed to worker threads with
    ``asyncio.to_thread``.

    NOTE(review): many ``self.sk...`` expressions were missing in the
    reviewed source. They were restored from context and are marked inline
    with ``NOTE(review)``; confirm each against the intended code.
    """

    skype_token_path: Path
    sk: skpy.Skype

    def __init__(self, user):
        super().__init__(user)
        self.skype_token_path = global_config.HOME_DIR / self.user.bare_jid
        self.thread: Optional[ListenThread] = None
        # Futures awaited by on_text(), keyed by Skype clientId; resolved by
        # on_skype_event() when Skype echoes the sent message back.
        self.sent_by_user_to_ack: dict[int, asyncio.Future] = {}
        # Incoming text messages, keyed by clientId, kept until on_displayed()
        # pops them to send a read marker.
        self.unread_by_user: dict[int, skpy.SkypeMsg] = {}
        # Held by on_text() from before the message is sent until its future
        # is registered, so the echo event is never processed too early.
        self.send_lock = Lock()

    def shutdown(self):
        """Shut the session down and ask the listener thread to stop."""
        super().shutdown()
        log.debug("Shutting down user threads")
        if thread := self.thread:
            thread.stop()

    async def login(self):
        """Log in to Skype and start the listener thread.

        Returns:
            A status line naming the connected Skype user; it mentions when
            the presence subscription failed.
        """
        f = self.user.registration_form
        # NOTE(review): the assignment target was missing in the reviewed
        # source; ``self.sk`` is inferred from the class annotation -- confirm.
        self.sk = await asyncio.to_thread(
            skpy.Skype,
            f["username"],
            f["password"],
            str(self.skype_token_path),
        )

        try:
            # NOTE(review): the try body was missing in the reviewed source;
            # the call is inferred from the warning logged below -- confirm.
            self.sk.subscribePresence()
        except skpy.core.SkypeApiException:
            self.log.warning("Could not subscribe to presences")
            extra = " (presences not working) "
        else:
            extra = ""
        # TODO: Creating 1 thread per user is probably very not optimal.
        #       We should contribute to skpy to make it aiohttp compatible…
        self.thread = thread = ListenThread(self)
        thread.start()
        # NOTE(review): the placeholder was empty in the reviewed source; the
        # Skype user id is inferred -- confirm.
        return f"Connected{extra} as '{self.sk.userId}'"

    def _chat_for(self, recipient: Recipient):
        """Return the skpy chat object of *recipient*."""
        # NOTE(review): the ``self.sk.contacts`` prefix was missing at every
        # call site in the reviewed source; inferred -- confirm.
        return self.sk.contacts[recipient.legacy_id].chat

    async def on_skype_event(self, event: skpy.SkypeEvent):
        """Relay one Skype event to XMPP, then acknowledge it to Skype."""
        log.debug("Skype event: %s", event)
        if isinstance(event, skpy.SkypeNewMessageEvent):
            await self._skype_new_message(event)
        elif isinstance(event, skpy.SkypeTypingEvent):
            contact = await self.contacts.by_legacy_id(event.userId)
            # NOTE(review): the condition was missing in the reviewed source;
            # the event's ``active`` flag is inferred -- confirm.
            if event.active:
                contact.composing()
            else:
                contact.paused()
        elif isinstance(event, skpy.SkypeEditMessageEvent):
            await self._skype_edited_message(event)
        elif isinstance(event, skpy.SkypeChatUpdateEvent):
            if log.isEnabledFor(logging.DEBUG):
                log.debug("chat update: %s", pprint.pformat(vars(event)))
        elif isinstance(event, skpy.SkypePresenceEvent):
            # NOTE(review): right-hand side restored -- confirm.
            if event.userId != self.sk.userId:
                contact = await self.contacts.by_legacy_id(event.userId)
                contact.update_presence(event.status)

        # No 'contact has read' event :(
        await asyncio.to_thread(event.ack)

    async def _skype_new_message(self, event: skpy.SkypeNewMessageEvent):
        """Handle a new message in a one-to-one chat; other chats are ignored."""
        # Wait until on_text() has registered the future of a message that is
        # currently being sent, otherwise its echo could not be matched.
        while self.send_lock.locked():
            await asyncio.sleep(0.1)
        msg = event.msg
        # NOTE(review): right-hand side restored -- confirm.
        chat = msg.chat
        if not isinstance(chat, skpy.SkypeSingleChat):
            return
        peer_id = chat.userIds[0]
        log.debug("this is a single chat with user: %s", peer_id)
        contact = await self.contacts.by_legacy_id(peer_id)
        # NOTE(review): right-hand side restored -- confirm.
        if msg.userId == self.sk.userId:
            # Message authored by the user: either the echo of something sent
            # through on_text(), or something sent from another Skype client.
            try:
                fut = self.sent_by_user_to_ack.pop(msg.clientId)
            except KeyError:
                if log.isEnabledFor(logging.DEBUG):
                    log.debug(
                        "Slidge did not send this message: %s",
                        pprint.pformat(vars(event)),
                    )
                contact.send_text(msg.plain, carbon=True)
            else:
                fut.set_result(msg)
        elif isinstance(msg, skpy.SkypeTextMsg):
            contact.send_text(msg.plain, legacy_msg_id=msg.clientId)
            self.unread_by_user[msg.clientId] = msg
        elif isinstance(msg, skpy.SkypeFileMsg):
            # non-blocking download / lambda because fileContent = property
            data = await asyncio.to_thread(lambda: msg.fileContent)
            # NOTE(review): the first argument was missing in the reviewed
            # source; passing the Skype file name is inferred -- confirm.
            await contact.send_file(file_name=msg.file.name, data=data)

    async def _skype_edited_message(self, event: skpy.SkypeEditMessageEvent):
        """Relay a contact's edit (or removal) of a one-to-one message."""
        msg = event.msg
        # NOTE(review): right-hand side restored -- confirm.
        chat = msg.chat
        if not isinstance(chat, skpy.SkypeSingleChat):
            return
        user_id = msg.userId
        # NOTE(review): right-hand side restored -- confirm.
        if user_id == self.sk.userId:
            # Edits made by the user are not relayed.
            return
        if log.isEnabledFor(logging.DEBUG):
            log.debug("edit msg event: %s", pprint.pformat(vars(event)))
        contact = await self.contacts.by_legacy_id(user_id)
        msg_id = msg.clientId
        log.debug("edited msg id: %s", msg_id)
        if text := msg.plain:
            contact.correct(msg_id, text)
        elif msg_id:
            # An edit with no text left is relayed as a retraction.
            contact.retract(msg_id)
        else:
            contact.send_text(
                "/me tried to remove a message, but slidge got in"
                " trouble"
            )

    async def on_text(self, chat: Recipient, text: str, **k):
        """Send *text* to *chat* and return the clientId echoed back by Skype."""
        skype_chat = self._chat_for(chat)
        # Poll rather than call the blocking acquire(): blocking here would
        # freeze the event loop, and with it the coroutine holding the lock.
        while not self.send_lock.acquire(blocking=False):
            await asyncio.sleep(0.1)
        try:
            msg = await asyncio.to_thread(skype_chat.sendMsg, text)
            if log.isEnabledFor(logging.DEBUG):
                log.debug("Sent msg: %s", pprint.pformat(vars(msg)))
            future = asyncio.Future[skpy.SkypeMsg]()
            self.sent_by_user_to_ack[msg.clientId] = future
        finally:
            # Always release, or a failed send would stall on_skype_event().
            self.send_lock.release()
        skype_msg = await future
        return skype_msg.clientId

    async def logout(self):
        """Stop the listener thread and wait for it to finish."""
        listener = self.thread
        if listener is not None:
            listener.stop()
            # The thread only exits once its current poll returns; wait in a
            # worker thread so the event loop stays responsive meanwhile.
            await asyncio.to_thread(listener.join)

    async def on_file(self, chat: Recipient, url: str, http_response, **kwargs):
        """Upload the attachment behind *http_response* to the Skype chat.

        The file name is the last path segment of *url*; the last argument to
        ``sendFile`` is True when the content type starts with "image".
        """
        fname = url.split("/")[-1]
        # NOTE(review): the body read was missing in the reviewed source;
        # ``http_response.read()`` is inferred -- confirm.
        payload = io.BytesIO(await http_response.read())
        await asyncio.to_thread(
            self._chat_for(chat).sendFile,
            payload,
            fname,
            http_response.content_type.startswith("image"),
        )

    async def on_composing(self, c: Recipient, thread=None):
        """Tell Skype the user started typing."""
        await asyncio.to_thread(self._chat_for(c).setTyping, True)

    async def on_paused(self, c: Recipient, thread=None):
        """Tell Skype the user stopped typing."""
        await asyncio.to_thread(self._chat_for(c).setTyping, False)

    async def on_displayed(self, c: Recipient, legacy_msg_id: int, thread=None):
        """Send a Skype read marker for a message relayed by this session."""
        try:
            skype_msg = self.unread_by_user.pop(legacy_msg_id)
        except KeyError:
            log.debug(
                "We did not transmit: %s (%s)", legacy_msg_id, self.unread_by_user
            )
            return
        log.debug("Calling read on %s", skype_msg)
        try:
            # NOTE(review): the callable was missing in the reviewed source;
            # the message's read() method is inferred -- confirm.
            await asyncio.to_thread(skype_msg.read)
        except skpy.SkypeApiException as e:
            # FIXME: this raises HTTP 400 and does not mark the message as read
            self.log.debug("Skype read marker failed: %r", e)

    async def on_correct(
        self,
        c: Recipient,
        text: str,
        legacy_msg_id: Any,
        thread=None,
        link_previews=(),
        mentions=None,
    ):
        """Replace the text of a previously sent Skype message.

        Raises:
            XMPPError: "item-not-found" when the message cannot be found.
        """
        # The lookup calls getMsgs(); run it off the event loop like the
        # other skpy calls.
        m = await asyncio.to_thread(self.get_msg, legacy_msg_id, c)
        await asyncio.to_thread(m.edit, text)

    async def on_retract(self, c: Recipient, legacy_msg_id: Any, thread=None):
        """Delete a previously sent Skype message.

        Raises:
            XMPPError: "item-not-found" when the message cannot be found.
        """
        m = await asyncio.to_thread(self.get_msg, legacy_msg_id, c)
        log.debug("Deleting %s", m)
        await asyncio.to_thread(m.delete)

    def get_msg(self, legacy_msg_id: int, contact: Recipient) -> skpy.SkypeTextMsg:
        """Find a message by clientId in the chat with *contact*.

        Only the messages returned by a single ``getMsgs()`` call are searched.

        Raises:
            XMPPError: "item-not-found" when no message matches.
        """
        for m in self._chat_for(contact).getMsgs():
            log.debug("Message %r vs %r : %s", legacy_msg_id, m.clientId, m)
            if m.clientId == legacy_msg_id:
                return m
        raise XMPPError(
            "item-not-found", text=f"Could not find message '{legacy_msg_id}'"
        )
def handle_thread_exception(args: threading.ExceptHookArgs):
    """Handle an exception that escaped a thread.

    A ``ConnectionError`` escaping a ListenThread marks the session as not
    logged, stops that thread and triggers a re-login. Only that exact
    exception type matches, not its subclasses. Anything else is logged as
    an error.
    """
    thread = args.thread
    if isinstance(thread, ListenThread) and args.exc_type is ConnectionError:
        session = thread.session
        # NOTE(review): the logging call was missing in the reviewed source
        # (only its arguments remained); ``log.info`` is inferred -- confirm.
        log.info("Connection error, attempting re-login for %s", session.user)
        session.logged = False
        thread.stop()
        session.re_login()
    else:
        # Attach the traceback: replacing threading.excepthook suppresses the
        # default hook, which would otherwise have printed it.
        log.error(
            "Exception in thread: %s",
            args,
            exc_info=(args.exc_type, args.exc_value, args.exc_traceback),
        )
# Route exceptions that escape any thread through handle_thread_exception.
threading.excepthook = handle_thread_exception

# Module-level logger. The classes and functions above look ``log`` up when
# they are called, so binding it at the bottom of the module is fine.
log = logging.getLogger(__name__)

__version__ = "0.0.0-dev+20240517_gitf7e22c8abf"