aboutsummaryrefslogtreecommitdiff
path: root/NeoRuntime/Runtime/neo_runtime.py
diff options
context:
space:
mode:
authorJakob Stendahl <jakob.stendahl@outlook.com>2021-09-19 19:43:11 +0200
committerJakob Stendahl <jakob.stendahl@outlook.com>2021-09-19 19:43:11 +0200
commit7bdce37fd3f18e2712e18c4e2c64cac69af0aca1 (patch)
treeb7ad3f1cca92e2dfd2664ae9e65652bd03ff58b2 /NeoRuntime/Runtime/neo_runtime.py
parente6880cd8ccf82d993f222cb14b4860581654acb8 (diff)
downloadLuxcena-Neo-7bdce37fd3f18e2712e18c4e2c64cac69af0aca1.tar.gz
Luxcena-Neo-7bdce37fd3f18e2712e18c4e2c64cac69af0aca1.zip
:boom: Introduce new UI based on svelte, and rewrite a lot of the node app and the NeoRuntime
Diffstat (limited to 'NeoRuntime/Runtime/neo_runtime.py')
-rw-r--r--NeoRuntime/Runtime/neo_runtime.py177
1 files changed, 177 insertions, 0 deletions
diff --git a/NeoRuntime/Runtime/neo_runtime.py b/NeoRuntime/Runtime/neo_runtime.py
new file mode 100644
index 0000000..e5941e2
--- /dev/null
+++ b/NeoRuntime/Runtime/neo_runtime.py
@@ -0,0 +1,177 @@
+# This is the entry-point for all Luxcena-Neo python-scripts
+# The script should be in the same folder as this, and be named "script.py"
+# In the future you could possibly have more files and stuff alongside the "script.py"-file as well
+import sys
+import json
+import importlib
+import datetime
+import argparse
+import configparser
+import time
+import threading
+import select
+import traceback
+from os import path
+
+from luxcena_neo.strip import Strip
+
+def init_strip(strip_config_file):
+ """ Initialize a strip object with a config file path. """
+ print("> Loading pixel-configuration...")
+ strip_config_obj = configparser.ConfigParser()
+ strip_config_obj.read(args.strip_config)
+ strip_config = dict(strip_config_obj.items("DEFAULT"))
+ strip_config["matrix"] = json.loads(strip_config["matrix"].replace('"', ""))
+ strip_config["segments"] = [int(x) for x in strip_config["segments"].split(" ")]
+ strip_config["led_channel"] = int(strip_config["led_channel"])
+ strip_config["led_dma"] = int(strip_config["led_dma"])
+ strip_config["led_freq_hz"] = int(strip_config["led_freq_hz"])
+ strip_config["led_invert"] = (strip_config["led_invert"] == "false")
+ strip_config["led_pin"] = int(strip_config["led_pin"])
+ strip = Strip(strip_config)
+ return strip
+
+def init_package(package_path, entry_module, strip):
+ """ Initialize the package we are going to run. """
+ print ("> Initializing package (mode)...")
+ sys.path.append(package_path)
+ module = importlib.import_module(entry_module)
+ module_entry_instance = module.Main(package_path)
+
+ # Make the strip instance available in our modules
+ setattr(module, "strip", strip)
+
+ module_entry_instance.declare_variables()
+ return module_entry_instance
+
+def exec_module(module_executor_loop_func):
+ """ Create and start a new thread to run the package loop. """
+ th = threading.Thread(target=module_executor_loop_func, daemon=True)
+ th.start()
+ return th
+
+class NeoRuntime:
+
+
+ def __init__(self, package_path, entry_module, strip_config_file):
+ self.__strip = init_strip(strip_config_file)
+ self.__module_entry_instance = init_package(package_path, entry_module, self.__strip)
+ self.__module_th = None
+
+
+ def start(self):
+ # The mode is starting in it's own thread
+ print("> Running the mode...")
+ self.__module_th = exec_module(self.__module_loop)
+
+ # This will run in this thread.
+ print("> Starting to listen on stdin")
+ try:
+ self.__command_listener_loop()
+ except KeyboardInterrupt:
+ print("Exiting...")
+ except Exception as e:
+ traceback.print_exc()
+
+
+ def __command_listener_loop(self):
+ last_send = time.perf_counter()
+ while True:
+ if not self.__module_th.is_alive(): break
+ while sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
+ line = sys.stdin.readline()
+ if line:
+ line = line.replace("\n", "")
+ if (line[0:10] == ":::setvar:"):
+ name, value = (line.split(" ", 1)[1]).replace("\"", "").split(":", 1)
+ if name in self.__module_entry_instance.vars:
+ self.__module_entry_instance.vars[name] = value
+ elif (line[0:11] == ":::setglob:"):
+ name, value = (line.split(" ", 1)[1]).replace("\"", "").split(":", 1)
+ if name == "brightness":
+ self.__strip.brightness = int(value)
+ elif name == "power_on":
+ self.__strip.power_on = value == "true"
+ else:
+ print(f"Unknown globvar \"{name}\"")
+ else:
+ if (time.perf_counter() - last_send) > 0.5:
+ _vars = "{"
+ for name, var in self.__module_entry_instance.vars:
+ _vars += f" \"{name}\" : {{ \"value\": \"{var.value}\", \"var_type\": \"{var.var_type}\" }}, "
+ if len(_vars) > 2:
+ _vars = _vars[0:-2]
+ _vars += "}"
+
+ globvars = "{ \"power_on\": " + str(self.__strip.power_on).lower() + ", "
+ globvars += " \"brightness\":" + str(self.__strip.brightness) + " }"
+ print(f"{{ \":::data:\": {{ \"variables\": {_vars}, \"globvars\": {globvars} }} }}")
+ last_send = time.perf_counter()
+
+
+
+ def __module_loop(self):
+ self.__module_entry_instance.on_start()
+
+ self.__module_last_tick = time.perf_counter()
+ self.__module_last_second = time.perf_counter()
+ self.__module_last_minute = self.__module_last_second
+ self.__module_last_hour = self.__module_last_second
+ self.__module_last_day = self.__module_last_second
+
+ while True:
+ c_time = time.perf_counter()
+ try:
+ self.__module_tick(c_time, c_time - self.__module_last_tick)
+ except Exception as e:
+ traceback.print_exc()
+ self.__module_last_tick = time.perf_counter()
+
+
+ def __module_tick(self, runningtime, deltatime):
+ self.__module_entry_instance.each_tick()
+
+ if (runningtime - self.__module_last_second > 1):
+ self.__module_entry_instance.each_second()
+ self.__module_last_second = time.perf_counter()
+
+ if (((runningtime - self.__module_last_minute) % 60) > 1):
+ self.__module_entry_instance.each_minute()
+ self.__module_last_minute = time.perf_counter()
+
+ if (((runningtime - self.__module_last_hour) % 3600) > 1):
+ self.__module_entry_instance.each_minute()
+ self.__module_last_hour = time.perf_counter()
+
+ if (((runningtime - self.__module_last_day) % 86400) > 1):
+ self.__module_entry_instance.each_minute()
+ self.__module_last_day = time.perf_counter()
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--strip-config', help='Path to the strip config file.')
+ parser.add_argument('--mode-path', help='Path of the folder the mode is in.')
+ parser.add_argument('--mode-entry', help='Path of the module that is the entry-point of the module.')
+ args = parser.parse_args()
+
+ args.strip_config = args.strip_config.replace("\"", "")
+ args.mode_path = args.mode_path.replace("\"", "")
+ args.mode_entry = args.mode_entry.replace("\"", "")
+ if not path.exists(args.strip_config):
+ print(f"Strip config not found ({args.strip_config}).")
+ sys.exit(1)
+ if not path.exists(args.mode_path):
+ print(f"Mode path not found ({args.mode_path}).")
+ sys.exit(1)
+ if not path.exists(f"{args.mode_path}/{args.mode_entry}.py"):
+ print(f"Mode entry not found in mode path ({args.mode_path}/{args.mode_entry}).")
+ sys.exit(1)
+
+ print(f"StripConfig: {args.strip_config}")
+ print(f"Module : {args.mode_path}/{args.mode_entry}")
+
+ print(f"> Starting \"{args.mode_path}\" in NeoRuntime.")
+ runtime = NeoRuntime(args.mode_path, args.mode_entry, args.strip_config)
+ runtime.start()
+ print ("> NeoRuntime exited...")