diff options
author | Jakob Stendahl <jakob.stendahl@outlook.com> | 2021-09-19 19:43:11 +0200 |
---|---|---|
committer | Jakob Stendahl <jakob.stendahl@outlook.com> | 2021-09-19 19:43:11 +0200 |
commit | 7bdce37fd3f18e2712e18c4e2c64cac69af0aca1 (patch) | |
tree | b7ad3f1cca92e2dfd2664ae9e65652bd03ff58b2 /NeoRuntime/Runtime | |
parent | e6880cd8ccf82d993f222cb14b4860581654acb8 (diff) | |
download | Luxcena-Neo-7bdce37fd3f18e2712e18c4e2c64cac69af0aca1.tar.gz Luxcena-Neo-7bdce37fd3f18e2712e18c4e2c64cac69af0aca1.zip |
:boom: Introduce new UI based on svelte, and rewrite a lot of the node app and the NeoRuntime
Diffstat (limited to 'NeoRuntime/Runtime')
-rw-r--r-- | NeoRuntime/Runtime/luxcena_neo/__init__.py | 2 | ||||
-rw-r--r-- | NeoRuntime/Runtime/luxcena_neo/color_utils.py | 21 | ||||
-rw-r--r-- | NeoRuntime/Runtime/luxcena_neo/matrix.py | 73 | ||||
-rw-r--r-- | NeoRuntime/Runtime/luxcena_neo/neo_behaviour.py | 149 | ||||
-rw-r--r-- | NeoRuntime/Runtime/luxcena_neo/power_calc.py | 6 | ||||
-rw-r--r-- | NeoRuntime/Runtime/luxcena_neo/strip.py | 184 | ||||
-rw-r--r-- | NeoRuntime/Runtime/neo_runtime.py | 177 | ||||
-rw-r--r-- | NeoRuntime/Runtime/requirements.txt | 1 |
8 files changed, 613 insertions, 0 deletions
diff --git a/NeoRuntime/Runtime/luxcena_neo/__init__.py b/NeoRuntime/Runtime/luxcena_neo/__init__.py new file mode 100644 index 0000000..dfec639 --- /dev/null +++ b/NeoRuntime/Runtime/luxcena_neo/__init__.py @@ -0,0 +1,2 @@ +from .neo_behaviour import NeoBehaviour, VariableType +import luxcena_neo.color_utils as utils
\ No newline at end of file diff --git a/NeoRuntime/Runtime/luxcena_neo/color_utils.py b/NeoRuntime/Runtime/luxcena_neo/color_utils.py new file mode 100644 index 0000000..a17d8e4 --- /dev/null +++ b/NeoRuntime/Runtime/luxcena_neo/color_utils.py @@ -0,0 +1,21 @@ +def hex_to_rgb(value: str) -> tuple: + """ Convert hex color to rgb tuple. """ + value = value.lstrip("#") + lv = len(value) + return tuple(int(value[i:i + lv // 3], 16) for i in range(0, lv, lv // 3)) + +def rgb_to_hex(rgb: tuple) -> str: + """ Convert rgb color to hex string. """ + return '#%02x%02x%02x' % rgb + +def rgb_from_24bit(color: int) -> tuple: + """ Convert 24-bit color value into a tuple representing the rgb values. """ + # w = (color & 0xFF000000) >> 24 + r = (color & 0x00FF0000) >> 16 + g = (color & 0x0000FF00) >> 8 + b = (color & 0x000000FF) + return (r, g, b) + +def hex_from_24bit(color: int) -> str: + """ Convert 24-bit color value into hex str. """ + rgb_to_hex(rgb_from_24bit(color)) diff --git a/NeoRuntime/Runtime/luxcena_neo/matrix.py b/NeoRuntime/Runtime/luxcena_neo/matrix.py new file mode 100644 index 0000000..b842789 --- /dev/null +++ b/NeoRuntime/Runtime/luxcena_neo/matrix.py @@ -0,0 +1,73 @@ +def get_segment_range(segments, n): + """ Return a list of all the actual led-numbers in a segment """ + # Sum all the segments prior to the one we are looking for + i = 0 + start = 0 + while True: + if i >= n: break + start += segments[i] # Add number of leds in this segment to the start-index + i += 1 + + # Add all numbers in the segment we are looking for to a list + i = start + breakPoint = i + segments[n] + range = [] + while True: + range.append(i) + i += 1 + if i >= breakPoint: break + return range + + +class Matrix: + + def __init__(self, segments, matrix): + self.matrix = [] # Holds the matrix + self.x_len = 0 # The width of the matrix + self.y_len = len(matrix) # The heigth of the matrix + + for y_pos in range(len(matrix)): + y_val = matrix[y_pos] + + this_y = [] + for x_pos in range(len(y_val)): + # This gets the range of segment n + segment_range = get_segment_range(segments, matrix[y_pos][x_pos][0]) + + # This adds the range to the current row's list + # if in the config [<segment_num>, <reversed>] + # reversed == true, revese the list before adding it + this_y += reversed(segment_range) if matrix[y_pos][x_pos][1] else segment_range + + # This just finds the longest row in the matrix + if (len(this_y) > self.x_len): + self.x_len = len(this_y) + + self.matrix.append(this_y) + + def get(self, x, y): + """ Return the value of a place in the matrix given x and y coordinates """ + return self.matrix[y][x] + + def dump(self): + n_spacers = (self.x_len*6) // 2 - 6 + print( ("=" * n_spacers) + "Matrix dump" + ("=" * n_spacers) ) + + for y in self.matrix: + this_y_line = "" + for x in y: + this_y_line += ( ' ' * (5 - len(str(x))) ) + str(x) + ' ' + print(this_y_line) + + print("=" * (self.x_len*6)) + + +if __name__ == "__main__": + test_matrix = Matrix( + [2, 2, 2, 2, 2, 2, 2, 2, 2], + [ + [[0, False], [1, True], [2, False]], + [[3, True], [4, False], [5, True]], + [[6, False], [7, True], [8, False]] + ] + ) diff --git a/NeoRuntime/Runtime/luxcena_neo/neo_behaviour.py b/NeoRuntime/Runtime/luxcena_neo/neo_behaviour.py new file mode 100644 index 0000000..9920ca4 --- /dev/null +++ b/NeoRuntime/Runtime/luxcena_neo/neo_behaviour.py @@ -0,0 +1,149 @@ +import json +from os import path +from enum import Enum + +class NeoBehaviour: + """ + This is the base-class "main" should inherit from! + All methods are blocking :) This means that you could potentially loose a "tick" + For example, if "eachSecond" is taking up the thread, and the clock goes from 11:58 to 12:02, "eachHour", will not be called. + """ + + def __init__(self, package_path): + """ + THIS METHOD SHOULD NOT BE OVERIDDEN! Use onStart if you want a setup-method!!! + Contains basic setup + """ + self.vars = Variables(package_path) + + def declare_variables(self): + """ This should be overridden, and ALL variables should be declared here. """ + return + + def on_start(self): + """ This method will be run right after __init__ """ + return + + def each_tick(self): + """ This method will be run every tick, that means every time the program has time """ + return + + def each_second(self): + """ This method is called every second (on the clock), given that the thread is open """ + return + + def each_minute(self): + """ This method is called every mintue (on the clock), given that the thread is open """ + return + + def each_hour(self): + """ This method is called every whole hour (on the clock), given that the thread is open """ + return + + def each_day(self): + """ This method is called every day at noon, given that the thread is open """ + return + +class VariableType(Enum): + TEXT = 1 + INT = 2 + RANGE = 3 + COLOR = 4 + +class Variables: + + def __init__(self, package_path): + self.__vars = {} + self.__vars_save_file = f"{package_path}/variables.json" + self.__saved_variables = {} + self.read_saved_variables() + + def __getattr__(self, name): + if name in ["_Variables__vars", "_Variables__vars_save_file", "_Variables__saved_variables"]: + return super(Variables, self).__getattr__(name) + return self.__vars[name].value + + def __setattr__(self, name, value): + if name in ["_Variables__vars", "_Variables__vars_save_file", "_Variables__saved_variables"]: + super(Variables, self).__setattr__(name, value) + elif type(value) == Variable: + self.__vars[name] = value + else: + self.__vars[name].value = value + + def __delattr__(self, name): + if name in ["_Variables__vars", "_Variables__vars_save_file", "_Variables__saved_variables"]: + super(Variables, self).__delattr__(name) + else: + del self.__vars[name] + + def __getitem__(self, name): + return self.__vars[name] + + def __setitem__(self, name, value): + self.__vars[name].value = value + + def __contains__(self, name): + return name in self.__vars + + def __repr__(self): + return json.dumps({name: var.value for name, var in self.__vars.items()}) + + def __str__(self): + return repr(self) + + def __dir__(self): + return super(Variables, self).__dir__() + [name for name in self.__vars.keys()] + + def __len__(self): + return len(self.__vars) + + def __iter__(self): + return iter(self.__vars.items()) + + def declare(self, name: str, default: any, var_type: VariableType, on_change = None, **kwargs): + """ Declare a new variable. """ + if name in self.__saved_variables: + default = self.__saved_variables[name] + var = Variable(self.save_variables, name, default, var_type, on_change, **kwargs) + self.__setattr__(name, var) + + def read_saved_variables(self): + """ Read saved variable values from file. """ + if not path.exists(self.__vars_save_file): return + try: + with open(self.__vars_save_file, "r") as f: + self.__saved_variables = json.load(f) + except: + print("Could not load saved variables") + + def save_variables(self): + """ Save variable values to file. """ + self.__saved_variables = {name: var.value for name, var in self.__vars.items()} + with open(self.__vars_save_file, "w") as f: + f.write(json.dumps(self.__saved_variables)) + +class Variable: + + def __init__(self, save_func, key, default, var_type, on_change): + self.__save_func = save_func + self.__key = key + self.__value = default + self.__var_type = var_type + self.__on_change = on_change + + @property + def key(self): return self.__key + + @property + def value(self): return self.__value + + @value.setter + def value(self, value): + self.__value = value + self.__save_func() + if self.__on_change is not None: + self.__on_change(self.value) + + @property + def var_type(self): return self.__var_type.name diff --git a/NeoRuntime/Runtime/luxcena_neo/power_calc.py b/NeoRuntime/Runtime/luxcena_neo/power_calc.py new file mode 100644 index 0000000..4f6a6d6 --- /dev/null +++ b/NeoRuntime/Runtime/luxcena_neo/power_calc.py @@ -0,0 +1,6 @@ + + +def calcCurrent(pixels): + current = 0 + for pixel in pixels: + break diff --git a/NeoRuntime/Runtime/luxcena_neo/strip.py b/NeoRuntime/Runtime/luxcena_neo/strip.py new file mode 100644 index 0000000..a65b3f0 --- /dev/null +++ b/NeoRuntime/Runtime/luxcena_neo/strip.py @@ -0,0 +1,184 @@ +import json +from os import path +from .neopixel import * +from .matrix import Matrix, get_segment_range +from .power_calc import calcCurrent + + +class Strip: + + def __init__(self, strip_conf): + self.SEGMENTS = strip_conf["segments"] + + self.LED_FREQ_HZ = strip_conf["led_freq_hz"] # LED signal frequency in hertz (usually 800khz) + self.LED_CHANNEL = strip_conf["led_channel"] # Set to '1' for GPIOs 13, 19, 41, 45, 53 + self.LED_INVERT = strip_conf["led_invert"] # True to invert the signal, (when using NPN transistor level shift) + self.LED_PIN = strip_conf["led_pin"] # 18 uses PWM, 10 uses SPI /dev/spidev0.0 + self.LED_DMA = strip_conf["led_dma"] # DMA channel for generating the signal, on the newer ones, try 10 + self.LED_COUNT = sum(self.SEGMENTS) # Number of LEDs in strip + + if ("color_calibration" in strip_conf) and (strip_conf["color_calibration"] != ""): + self.COLOR_CALIBRATION = strip_conf["led_calibration"] + else: + self.COLOR_CALIBRATION = [(1,1,1) for x in range(self.LED_COUNT)] + + self.TMPCOLORSTATE = [0 for x in range(self.LED_COUNT)] + self.COLORSTATE = [0 for x in range(self.LED_COUNT)] + + self.LED_BRIGHTNESS = 255 + + self.strip = Adafruit_NeoPixel( + self.LED_COUNT, + self.LED_PIN, + self.LED_FREQ_HZ, + self.LED_DMA, + self.LED_INVERT, + self.LED_BRIGHTNESS, + self.LED_CHANNEL + ) + + self.strip.begin() + + # Blank out all the LEDs + i = 0 + while True: + self.strip.setPixelColor(i, 0) + i += 1 + if (i > self.LED_COUNT): break + self.strip.show() + + # Setup matrix + print(" * Generating matrix") + # try: + self.pixelMatrix = Matrix(self.SEGMENTS, strip_conf["matrix"]) + self.pixelMatrix.dump() + # except: + # print("Something went wrong while setting up your self-defined matrix.") + + self.__power_on = True + self.__brightness = 255 + self.__actual_brightness = self.__brightness + + self.__globvars_path = path.join(path.split(path.dirname(path.abspath(__file__)))[0], "globvars.json") + if path.exists(self.__globvars_path): + try: + with open(self.__globvars_path, "r") as f: + globvars = json.load(f) + self.power_on = globvars["power_on"] + self.brightness = globvars["brightness"] + except: + print("Could not load saved globvars...") + + + def save_globvars(self): + with open(self.__globvars_path, "w") as f: + f.write(json.dumps({ + "power_on": self.__power_on, + "brightness": self.__brightness + })) + + @property + def power_on(self): + return self.__power_on + + @power_on.setter + def power_on(self, value: bool): + self.__power_on = value + if (self.power_on): + self.__actual_brightness = self.__brightness + # self.strip.setBrightness(self.__brightness) + else: + self.__actual_brightness = 0 + # self.strip.setBrightness(0) + self.save_globvars() + + @property + def brightness(self): + # return self.strip.getBrightness() + return self.__actual_brightness + + @brightness.setter + def brightness(self, value: int): + if 0 <= value <= 255: + self.__brightness = value + if (self.power_on): + self.__actual_brightness = value + # self.strip.setBrightness(value) + self.save_globvars() + else: + raise Exception(f"Value ({value}) outside allowed range (0-255)") + + def show(self): + """Update the display with the data from the LED buffer.""" + self.COLORSTATE = self.TMPCOLORSTATE + self.strip.show() + + def set_pixel_color(self, n, *color): + """Set LED at position n to the provided 24-bit color value (in RGB order). + """ + c = detect_format_convert_color(*color) + self.TMPCOLORSTATE[n] = c + # self.strip.setPixelColor(n, ) + + def set_pixel_color_XY(self, x, y, *color): + """Set LED at position n to the provided 24-bit color value (in RGB order). + """ + self.set_pixel_color(self.pixelMatrix.get(x, y), *color) + + def set_segment_color(self, segment, *color): + """Set a whole segment to the provided red, green and blue color. + Each color component should be a value from 0 to 255 (where 0 is the + lowest intensity and 255 is the highest intensity).""" + for n in get_segment_range(self.SEGMENTS, segment): + self.set_pixel_color(n, *color) + + def get_pixels(self): + """Return an object which allows access to the LED display data as if + it were a sequence of 24-bit RGB values. + """ + return self.strip.getPixels() + + def num_pixels(self): + """Return the number of pixels in the display.""" + return self.LED_COUNT + + def get_pixel_color(self, n): + """Get the 24-bit RGB color value for the LED at position n.""" + return self.strip.getPixelColor(n) + + +def color_from_rgb(red, green, blue, white=0): + """ + Convert the provided red, green, blue color to a 24-bit color value. + Each color component should be a value 0-255 + where 0 is the lowest intensity and 255 is the highest intensity. + """ + return (white << 24) | (red << 16) | (green << 8) | blue + + +def color_from_hex(hex_color: str): + """ Convert the provided hex code to a 24-bit color value. """ + value = hex_color.lstrip('#') + lv = len(value) + rgb = tuple(int(value[i:i+lv//3], 16) for i in range(0, lv, lv//3)) + return color_from_rgb(red=rgb[1], green=rgb[0], blue=rgb[2]) + + +def detect_format_convert_color(*color) -> int: + """ + Detect format of a color and return its 24-bit color value. + + If parameter is only a str, it will be treated as a hex value. + If parameter is a tuple, the first three items in that tuple will be treated as a rgb value. + If parameter is a int, it will be treated as a 24-bit color value. + If there are 3 parameters, these will be treated as a rgb value. + """ + if (len(color) == 1) and (isinstance(color[0], str)): + return color_from_hex(color[0]) + if (len(color) == 1) and (isinstance(color[0], tuple)): + return color_from_rgb(*(color[0])) + if (len(color) == 1) and (isinstance(color[0], int)): + return color[0] + if (len(color) == 3): + return color_from_rgb(*color) + raise ValueError("Invalid parameters provided, check documentation.")
\ No newline at end of file diff --git a/NeoRuntime/Runtime/neo_runtime.py b/NeoRuntime/Runtime/neo_runtime.py new file mode 100644 index 0000000..e5941e2 --- /dev/null +++ b/NeoRuntime/Runtime/neo_runtime.py @@ -0,0 +1,177 @@ +# This is the entry-point for all Luxcena-Neo python-scripts +# The script should be in the same folder as this, and be named "script.py" +# In the future you could possibly have more files and stuff alongside the "script.py"-file as well +import sys +import json +import importlib +import datetime +import argparse +import configparser +import time +import threading +import select +import traceback +from os import path + +from luxcena_neo.strip import Strip + +def init_strip(strip_config_file): + """ Initialize a strip object with a config file path. """ + print("> Loading pixel-configuration...") + strip_config_obj = configparser.ConfigParser() + strip_config_obj.read(args.strip_config) + strip_config = dict(strip_config_obj.items("DEFAULT")) + strip_config["matrix"] = json.loads(strip_config["matrix"].replace('"', "")) + strip_config["segments"] = [int(x) for x in strip_config["segments"].split(" ")] + strip_config["led_channel"] = int(strip_config["led_channel"]) + strip_config["led_dma"] = int(strip_config["led_dma"]) + strip_config["led_freq_hz"] = int(strip_config["led_freq_hz"]) + strip_config["led_invert"] = (strip_config["led_invert"] == "false") + strip_config["led_pin"] = int(strip_config["led_pin"]) + strip = Strip(strip_config) + return strip + +def init_package(package_path, entry_module, strip): + """ Initialize the package we are going to run. """ + print ("> Initializing package (mode)...") + sys.path.append(package_path) + module = importlib.import_module(entry_module) + module_entry_instance = module.Main(package_path) + + # Make the strip instance available in our modules + setattr(module, "strip", strip) + + module_entry_instance.declare_variables() + return module_entry_instance + +def exec_module(module_executor_loop_func): + """ Create and start a new thread to run the package loop. """ + th = threading.Thread(target=module_executor_loop_func, daemon=True) + th.start() + return th + +class NeoRuntime: + + + def __init__(self, package_path, entry_module, strip_config_file): + self.__strip = init_strip(strip_config_file) + self.__module_entry_instance = init_package(package_path, entry_module, self.__strip) + self.__module_th = None + + + def start(self): + # The mode is starting in it's own thread + print("> Running the mode...") + self.__module_th = exec_module(self.__module_loop) + + # This will run in this thread. + print("> Starting to listen on stdin") + try: + self.__command_listener_loop() + except KeyboardInterrupt: + print("Exiting...") + except Exception as e: + traceback.print_exc() + + + def __command_listener_loop(self): + last_send = time.perf_counter() + while True: + if not self.__module_th.is_alive(): break + while sys.stdin in select.select([sys.stdin], [], [], 0)[0]: + line = sys.stdin.readline() + if line: + line = line.replace("\n", "") + if (line[0:10] == ":::setvar:"): + name, value = (line.split(" ", 1)[1]).replace("\"", "").split(":", 1) + if name in self.__module_entry_instance.vars: + self.__module_entry_instance.vars[name] = value + elif (line[0:11] == ":::setglob:"): + name, value = (line.split(" ", 1)[1]).replace("\"", "").split(":", 1) + if name == "brightness": + self.__strip.brightness = int(value) + elif name == "power_on": + self.__strip.power_on = value == "true" + else: + print(f"Unknown globvar \"{name}\"") + else: + if (time.perf_counter() - last_send) > 0.5: + _vars = "{" + for name, var in self.__module_entry_instance.vars: + _vars += f" \"{name}\" : {{ \"value\": \"{var.value}\", \"var_type\": \"{var.var_type}\" }}, " + if len(_vars) > 2: + _vars = _vars[0:-2] + _vars += "}" + + globvars = "{ \"power_on\": " + str(self.__strip.power_on).lower() + ", " + globvars += " \"brightness\":" + str(self.__strip.brightness) + " }" + print(f"{{ \":::data:\": {{ \"variables\": {_vars}, \"globvars\": {globvars} }} }}") + last_send = time.perf_counter() + + + + def __module_loop(self): + self.__module_entry_instance.on_start() + + self.__module_last_tick = time.perf_counter() + self.__module_last_second = time.perf_counter() + self.__module_last_minute = self.__module_last_second + self.__module_last_hour = self.__module_last_second + self.__module_last_day = self.__module_last_second + + while True: + c_time = time.perf_counter() + try: + self.__module_tick(c_time, c_time - self.__module_last_tick) + except Exception as e: + traceback.print_exc() + self.__module_last_tick = time.perf_counter() + + + def __module_tick(self, runningtime, deltatime): + self.__module_entry_instance.each_tick() + + if (runningtime - self.__module_last_second > 1): + self.__module_entry_instance.each_second() + self.__module_last_second = time.perf_counter() + + if (((runningtime - self.__module_last_minute) % 60) > 1): + self.__module_entry_instance.each_minute() + self.__module_last_minute = time.perf_counter() + + if (((runningtime - self.__module_last_hour) % 3600) > 1): + self.__module_entry_instance.each_minute() + self.__module_last_hour = time.perf_counter() + + if (((runningtime - self.__module_last_day) % 86400) > 1): + self.__module_entry_instance.each_minute() + self.__module_last_day = time.perf_counter() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('--strip-config', help='Path to the strip config file.') + parser.add_argument('--mode-path', help='Path of the folder the mode is in.') + parser.add_argument('--mode-entry', help='Path of the module that is the entry-point of the module.') + args = parser.parse_args() + + args.strip_config = args.strip_config.replace("\"", "") + args.mode_path = args.mode_path.replace("\"", "") + args.mode_entry = args.mode_entry.replace("\"", "") + if not path.exists(args.strip_config): + print(f"Strip config not found ({args.strip_config}).") + sys.exit(1) + if not path.exists(args.mode_path): + print(f"Mode path not found ({args.mode_path}).") + sys.exit(1) + if not path.exists(f"{args.mode_path}/{args.mode_entry}.py"): + print(f"Mode entry not found in mode path ({args.mode_path}/{args.mode_entry}).") + sys.exit(1) + + print(f"StripConfig: {args.strip_config}") + print(f"Module : {args.mode_path}/{args.mode_entry}") + + print(f"> Starting \"{args.mode_path}\" in NeoRuntime.") + runtime = NeoRuntime(args.mode_path, args.mode_entry, args.strip_config) + runtime.start() + print ("> NeoRuntime exited...") diff --git a/NeoRuntime/Runtime/requirements.txt b/NeoRuntime/Runtime/requirements.txt new file mode 100644 index 0000000..7a38911 --- /dev/null +++ b/NeoRuntime/Runtime/requirements.txt @@ -0,0 +1 @@ +websockets
\ No newline at end of file |