Files
ksysguard-sensor-mqtt/ksysguard-sensor-mqtt.py

149 lines
4.3 KiB
Python
Raw Permalink Normal View History

2019-12-09 23:19:30 +01:00
#!/usr/bin/python3
import os
import sys
import yaml
import paho.mqtt.client as mqtt
from random import random
# Default location of the configuration file: ksysguard-sensor-mqtt.yaml,
# sitting in the same directory as this script.
_script_dir = os.path.dirname(os.path.abspath(__file__))
config_file_path = "/".join((_script_dir, "ksysguard-sensor-mqtt.yaml"))
# Command line handling: when run as a script, an optional -c/--config
# argument replaces the default configuration file location.
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-c", "--config",
        required=False,
        default=None,
        help="path to the configuration file",
    )
    args = vars(parser.parse_args())
    requested_path = args["config"]
    if requested_path:
        config_file_path = requested_path
# Read configuration.
# A configuration that cannot be parsed is fatal: the code that follows
# indexes into `config`, so continuing would only fail later with a less
# helpful error. Diagnostics are written to stderr because stdout carries the
# ksysguardd protocol.
with open(config_file_path, "r") as config_file:
    try:
        config = yaml.safe_load(config_file)
    except yaml.YAMLError as err:
        print("Could not load configuration file", file=sys.stderr)
        print(err, file=sys.stderr)
        sys.exit(1)

# An empty file (or one holding a scalar or list) parses without error but
# cannot be used as the settings mapping.
if not isinstance(config, dict):
    print("Could not load configuration file", file=sys.stderr)
    print("top-level YAML document is not a mapping", file=sys.stderr)
    sys.exit(1)
# Cache of the latest payload received on each subscribed topic, keyed by
# topic name, so that a value can be handed to the ksysguard client whenever
# it asks. Every configured topic starts out as None (nothing received yet).
values = {
    subscription['topic']: None
    for subscription in config['mqtt']['subscriptions']
}
# Given a monitor name, returns the first configured subscription whose
# 'monitor' entry equals that name. If no subscription matches, returns None.
def have_monitor(monitor):
    matching = (
        subscription
        for subscription in config['mqtt']['subscriptions']
        if subscription['monitor'] == monitor
    )
    # The generator is consumed lazily, so the scan stops at the first match
    # instead of building a list of every match.
    return next(matching, None)
# Looks up the cached value for the named monitor. A name that matches no
# configured monitor yields the string UNKNOWN COMMAND; otherwise the entry
# stored in `values` under the monitor's topic is returned as-is.
def monitor_value(monitor):
    subscription = have_monitor(monitor)
    if not subscription:
        return "UNKNOWN COMMAND"
    return values[subscription['topic']]
# Given a monitor name, returns its info string: the description's name, min,
# max and units joined by tabs. 'min' and 'max' are optional and default to
# an empty field. If the monitor does not exist, returns UNKNOWN COMMAND.
def monitor_info(monitor):
    subscription = have_monitor(monitor)
    if not subscription:
        return "UNKNOWN COMMAND"
    descr = subscription['description']  # Shorter to type
    fields = (descr['name'], descr.get('min'), descr.get('max'), descr['units'])
    # Description entries come from the YAML configuration, where a value
    # such as `min: 0` is loaded as a number rather than a string; convert
    # every field explicitly so that building the line cannot raise
    # TypeError. A missing or null entry becomes an empty field.
    return "\t".join("" if field is None else str(field) for field in fields)
# MQTT logic
# Connection handler: reports the result code, subscribes to every topic
# listed in the configuration and then prints the ksysguard prompt. Once the
# prompt is out, the client can start requesting data (although none may be
# available yet, depending on the relative refresh rates of the MQTT
# publishers and the ksysguard client).
def on_connect(client, userdata, flags, rc):
    print("# Connected with result code " + str(rc))
    topics = (entry['topic'] for entry in config['mqtt']['subscriptions'])
    for topic in topics:
        print("# Subscribing to " + topic)
        client.subscribe(topic)
    # No trailing newline; flush so the prompt is emitted immediately.
    print("ksysguardd> ", end="", flush=True)
# The callback for when a PUBLISH message is received from the server.
# It decodes the payload as UTF-8, strips surrounding whitespace and stores
# the result under the message's topic in the values dictionary, from where
# it will be retrieved when requested by the ksysguard client.
# A payload that is not valid UTF-8 is reported on stderr and dropped, leaving
# the previously cached value in place, so that one malformed message cannot
# raise out of the callback.
def on_message(client, userdata, msg):
    try:
        text = msg.payload.decode("utf8")
    except UnicodeDecodeError:
        # stderr, because stdout carries the ksysguardd protocol.
        print("# Ignoring non-UTF-8 payload on " + str(msg.topic), file=sys.stderr)
        return
    values[msg.topic] = text.strip()
# Create the MQTT client and assign event handlers.
# The client id gets a random numeric suffix (the digits after "0." of a
# random float).
mqtt_client_id = "ksysguardd-"+str(random())[2:]
if hasattr(mqtt, "CallbackAPIVersion"):
    # paho-mqtt 2.x requires the callback API version to be stated. VERSION1
    # keeps the (client, userdata, flags, rc) on_connect signature used in
    # this file.
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, client_id=mqtt_client_id)
else:
    # paho-mqtt 1.x has no such argument.
    client = mqtt.Client(client_id=mqtt_client_id)
client.on_connect = on_connect
client.on_message = on_message

mqtt_settings = config['mqtt']
# Credentials are only applied when both username and password are configured.
if 'username' in mqtt_settings and 'password' in mqtt_settings:
    client.username_pw_set(mqtt_settings['username'], mqtt_settings['password'])
# Port falls back to 1883 when not configured; 60 is the keepalive argument.
client.connect_async(mqtt_settings['host'], mqtt_settings.get('port', 1883), 60)
# Ksysguard logic
# Start by introducing ourselves. The protocol has been mostly deduced
# from this link https://techbase.kde.org/Development/Tutorials/Sensors
# and from the ksysguardd source code.
print("ksysguardd 4")
client.loop_start()  # Asynchronously start the MQTT client

# Process lines of input from the ksysguard client. Each handled line is
# followed by a fresh prompt.
for line in sys.stdin:
    line = line.strip()
    if line == "monitors":  # List available sensors
        for subscription in config['mqtt']['subscriptions']:
            print(subscription['monitor']+"\t"+subscription['type'])
    elif line == "quit":  # Shut down this process
        print("# Quitting")
        client.loop_stop()
        # sys.exit rather than the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        sys.exit()
    elif line.endswith("?"):  # This is a query for the monitor's information
        print(monitor_info(line[:-1]))
    else:  # Assume that this is a query for the monitor's value
        # Look the value up once, so that the None test and the printed
        # value refer to the same reading.
        value = monitor_value(line)
        print("NaN" if value is None else value)
    print("ksysguardd> ", end="", flush=True)