home-assistant-configuration/config/custom_components/localtuya/discovery.py

91 lines
2.6 KiB
Python

"""Discovery module for Tuya devices.
Entirely based on tuya-convert.py from tuya-convert:
https://github.com/ct-Open-Source/tuya-convert/blob/master/scripts/tuya-discovery.py
"""
import asyncio
import json
import logging
from hashlib import md5
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
_LOGGER = logging.getLogger(__name__)
UDP_KEY = md5(b"yGAdlopoPVldABfn").digest()
DEFAULT_TIMEOUT = 6.0
def decrypt_udp(message):
"""Decrypt encrypted UDP broadcasts."""
def _unpad(data):
return data[: -ord(data[len(data) - 1 :])]
cipher = Cipher(algorithms.AES(UDP_KEY), modes.ECB(), default_backend())
decryptor = cipher.decryptor()
return _unpad(decryptor.update(message) + decryptor.finalize()).decode()
class TuyaDiscovery(asyncio.DatagramProtocol):
"""Datagram handler listening for Tuya broadcast messages."""
def __init__(self, callback=None):
"""Initialize a new BaseDiscovery."""
self.devices = {}
self._listeners = []
self._callback = callback
async def start(self):
"""Start discovery by listening to broadcasts."""
loop = asyncio.get_running_loop()
listener = loop.create_datagram_endpoint(
lambda: self, local_addr=("0.0.0.0", 6666)
)
encrypted_listener = loop.create_datagram_endpoint(
lambda: self, local_addr=("0.0.0.0", 6667)
)
self._listeners = await asyncio.gather(listener, encrypted_listener)
_LOGGER.debug("Listening to broadcasts on UDP port 6666 and 6667")
def close(self):
"""Stop discovery."""
self._callback = None
for transport, _ in self._listeners:
transport.close()
def datagram_received(self, data, addr):
"""Handle received broadcast message."""
data = data[20:-8]
try:
data = decrypt_udp(data)
except Exception: # pylint: disable=broad-except
data = data.decode()
decoded = json.loads(data)
self.device_found(decoded)
def device_found(self, device):
"""Discover a new device."""
if device.get("ip") not in self.devices:
self.devices[device.get("gwId")] = device
_LOGGER.debug("Discovered device: %s", device)
if self._callback:
self._callback(device)
async def discover():
"""Discover and return devices on local network."""
discovery = TuyaDiscovery()
try:
await discovery.start()
await asyncio.sleep(DEFAULT_TIMEOUT)
finally:
discovery.close()
return discovery.devices