First commit

This commit is contained in:
Nicolas Duhamel 2021-01-13 19:26:50 +01:00
commit 0a732b478a
6 changed files with 184 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# Source for the following rules: https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
*.egg-info/

4
pyproject.toml Normal file
View File

@ -0,0 +1,4 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

15
setup.cfg Normal file
View File

@ -0,0 +1,15 @@
[metadata]
name = citadel.devices
version = 0.0.1
[options]
package_dir =
=src
packages = find_namespace:
install_requires =
typer==0.3.2
citadel.mqtt==0.0.1
[options.packages.find]
where = src

2
setup.py Normal file
View File

@ -0,0 +1,2 @@
import setuptools
setuptools.setup()

View File

@ -0,0 +1,103 @@
import typing
from citadel.mqtt import Client
class Light:
def __init__(self, _id: str, client: Client, **kwargs):
self._state = None
self._change_callback = None
self._id = _id
self.STATE_TOPIC = kwargs.get('STATE_TOPIC', 'stat/{id}/POWER1')
self.STATE_ON_PAYLOAD = kwargs.get('STATE_ON_PAYLOAD', 'ON')
self.STATE_OFF_PAYLOAD = kwargs.get('STATE_OFF_PAYLOAD', 'OFF')
self.CMND_TOPIC = kwargs.get('CMND_TOPIC', 'cmnd/{id}/POWER1')
self.CMND_ON_PAYLOAD = kwargs.get('CMND_ON_PAYLOAD', 'ON')
self.CMND_OFF_PAYLOAD = kwargs.get('CMND_OFF_PAYLOAD', 'OFF')
self._client = client
self._client.subscribe_callback(self.STATE_TOPIC.format(id=self.id), self.onStateChange)
def setChangeCallback(self, callback: typing.Callable):
self._change_callback = callback
def onStateChange(self, client, userdata, msg):
state = msg.payload.decode()
if state == self.STATE_ON_PAYLOAD:
self._state = True
elif state == self.STATE_OFF_PAYLOAD:
self._state = False
else:
raise Exception("Unexpected value %s for stat topic" % state)
if self._change_callback:
self._change_callback(self)
def on(self):
self._client.publish(self.CMND_TOPIC.format(id=self.id), self.CMND_ON_PAYLOAD)
def off(self):
self._client.publish(self.CMND_TOPIC.format(id=self.id), self.CMND_OFF_PAYLOAD)
@property
def state(self):
return self._state
@property
def id(self):
return self._id
class LightsGroup:
def __init__(self, _id: str, client: Client, lights: [Light]):
""" Create a virtual mqtt device that handle multiple light """
self._id = _id
self._lights = lights
for l in self._lights:
l.setChangeCallback(self.onLightStateChange)
self._client = client
self._client.subscribe_callback('cmnd/%s/POWER' % self._id, self.cmndPower)
self._previous_state = all([l.state for l in self._lights])
def cmndPower(self, client, userdata, msg):
value = msg.payload.decode()
if value == 'ON':
self.on()
elif value == 'OFF':
self.off()
elif value == '':
self._client.publish('stat/%s/POWER' % self.id, 'ON' if self.state else 'OFF' )
else:
raise Exception("Unexpected value %s for stat topic" % value)
def onLightStateChange(self, light):
state = all([l.state for l in self._lights])
if state != self._previous_state:
self._client.publish('stat/%s/POWER' % self.id, 'ON' if state else 'OFF')
self._previous_state = state
def on(self):
for l in self._lights:
l.on()
def off(self):
for l in self._lights:
l.off()
@property
def state(self):
return self._previous_state
@property
def id(self):
return self._id

View File

@ -0,0 +1,52 @@
import logging
import typer
from citadel.mqtt import Client
from . import Light, LightsGroup
def main(\
mqtt_user: str = typer.Option(... , envvar="DEVICES_MQTT_USER"),\
mqtt_pwd: str = typer.Option(... , envvar="DEVICES_MQTT_PWD"),\
mqtt_host: str = typer.Option(... , envvar="DEVICES_MQTT_HOST"),\
mqtt_port: int = typer.Option(... , envvar="DEVICES_MQTT_PORT")\
):
client = Client()
client.setup(mqtt_host, mqtt_port, mqtt_user, mqtt_pwd)
lights1 = LightsGroup('KitchenLed',
client, [Light('KitchenCeiling1', client),
Light('KitchenCeiling2', client)])
lights2 = LightsGroup('KitchenCelling',
client, [
Light('KitchenCeiling1', client,
STATE_TOPIC='stat/{id}/POWER2',
CMND_TOPIC='cmnd/{id}/POWER2'),
Light('KitchenCeiling2', client,
STATE_TOPIC='stat/{id}/POWER2',
CMND_TOPIC='cmnd/{id}/POWER2')])
client.loop_forever()
if __name__ == "__main__":
# create logger
logger = logging.getLogger('')
logger.setLevel(logging.DEBUG)
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)
typer.run(main)