aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjakobst1n <jakob@jakobstendahl.no>2025-12-01 20:45:09 +0100
committerjakobst1n <jakob@jakobstendahl.no>2025-12-01 20:45:09 +0100
commit8822f73d4eebb90a20bb54def27ceb47a9d693d6 (patch)
treee960bdbe9f22d086d771ac58b4ebe07a237b3698
downloadLINSC-8822f73d4eebb90a20bb54def27ceb47a9d693d6.tar.gz
LINSC-8822f73d4eebb90a20bb54def27ceb47a9d693d6.zip
Initial commit
-rw-r--r--README.md29
-rw-r--r--microscope.py333
-rw-r--r--webserver.py142
3 files changed, 504 insertions, 0 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8c2ad11
--- /dev/null
+++ b/README.md
@@ -0,0 +1,29 @@
+Demo pinout
+
+this example simply does use random pins without regarding pin types at all
+
++----+-----+--------+--------+-----+----+
+| | Pin | Name | Name | Pin | |
++----+-----+--------+--------+-----+----+
+| | 1 | 3.3v | 5v | 2 | |
+| x1 | 3 | SDA | 5v | 4 | |
+| x2 | 5 | SCL | GND | 6 | |
+| x3 | 7 | G4 | TXO | 8 | |
+| | 9 | GND | RXI | 10 | |
+| x4 | 11 | G17 | G18 | 12 | |
+| y1 | 13 | G27 | GND | 14 | |
+| y2 | 15 | G22 | G23 | 16 | |
+| | 17 | 3.3v | G24 | 18 | |
+| y3 | 19 | MOSI | GND | 20 | |
+| y4 | 21 | MISO | G25 | 22 | |
+| | 23 | CLK | CD0 | 24 | |
+| | 25 | GND | CE1 | 26 | |
+| | 27 | IDSD | IDSC | 28 | |
+| i1 | 29 | G05 | GND | 30 | |
+| i2 | 31 | G6 | G12 | 32 | |
+| i3 | 33 | G13 | GND | 34 | |
+| i4 | 35 | G19 | G16 | 36 | |
+| | 37 | G26 | G20 | 38 | s2 |
+| | 39 | GND | G21 | 40 | s1 |
++----+-----+--------+--------+-----+----+
+
diff --git a/microscope.py b/microscope.py
new file mode 100644
index 0000000..a251f90
--- /dev/null
+++ b/microscope.py
@@ -0,0 +1,333 @@
+import threading, queue, time
+from dataclasses import dataclass
+from enum import Enum
+
+try:
+ import RPi.GPIO as GPIO
+except ImportError:
+ print("WARNING: RPi.GPIO NOT AVAILABLE, USING Mock.GPIO")
+ import Mock.GPIO as GPIO
+
+halfstep_seq = [
+ [1,0,0,0], [1,1,0,0], [0,1,0,0], [0,1,1,0],
+ [0,0,1,0], [0,0,1,1], [0,0,0,1], [1,0,0,1]
+]
+wholestep_seq = [
+ [1,1,0,0], [0,1,1,0], [0,0,1,1], [1,0,0,1],
+]
+
+COMMANDS = {
+ "quit": 1,
+ "cancel": 2,
+ "home": 3,
+ "record": 4,
+ "replay": 5,
+ "checkpoint": 6,
+ "toggle_halfstep": 7,
+ "toggle_debugtiming": 8,
+ "center": 9,
+ "target": 10,
+}
+
+def dprint(msg):
+ print(f"[DEBUG:microscope] {msg}")
+
+@dataclass
+class InputSwitch:
+ pin: int
+ nc: bool = True
+ _state: bool = False
+
+ def state(self):
+ return (self._state) if self.nc else (not self._state)
+
+ def update(self):
+ self._state = GPIO.input(self.pin)
+
+ def __post_init__(self):
+ GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
+
+
+@dataclass
+class InputSignal:
+ """
+ Placeholder, used to simulate input signals,
+ _may_ be combined at some point to allow multiple types of input
+ """
+ _state: bool = False
+
+ def state(self):
+ return (self._state)
+
+@dataclass
+class Stepper:
+ pins: list[int]
+ upper_limit: InputSwitch
+ idx: int = 0
+ pos: int = 70_808
+ home: int = 70_808
+ lower: int = 0
+ upper: int = 141_000
+ #interval: float = 0.0008
+ interval: float = 0.002
+ #interval: float = 0.8 # Debug timing
+ half_step: bool = True
+ step_count: int = 4096
+ inverted_axis: bool = True
+
+ def __post_init__(self):
+ for pin in self.pins:
+ GPIO.setup(pin, GPIO.OUT)
+ GPIO.output(pin, GPIO.LOW)
+
+ def cleanup(self):
+ for pin in self.pins:
+ GPIO.output(pin, GPIO.LOW)
+
+
+@dataclass
+class StepperDegrees:
+ pins: list[int]
+ degrees: bool = False
+ idx: int = 0
+ pos: int = 0
+ interval: float = 0.0008
+ half_step: bool = False
+
+ def __post_init__(self):
+ for pin in self.pins:
+ #GPIO.setup(pin, GPIO.OUT)
+ #GPIO.output(pin, GPIO.LOW)
+ pass
+
+ def cleanup(self):
+ for pin in self.pins:
+ #GPIO.output(pin, GPIO.LOW)
+ pass
+
+
+class Operation(Enum):
+ MANUAL = 1
+ HOMING = 2
+ CENTER = 3
+ RECORD = 4
+ REPLAY = 5
+ TARGET = 6
+ CLEANUP = 7
+ DISMANTLE = 8
+
+
+@dataclass
+class Stage:
+ x: Stepper
+ y: Stepper
+ image: StepperDegrees
+ signal_home: InputSignal
+ signal_cancel: InputSignal
+ signal_x_inc: InputSignal
+ signal_x_dec: InputSignal
+ signal_y_inc: InputSignal
+ signal_y_dec: InputSignal
+ signal_img_1: InputSignal
+ signal_img_2: InputSignal
+ signal_img_3: InputSignal
+ signal_img_4: InputSignal
+ signal_img_5: InputSignal
+ signal_img_6: InputSignal
+ operation: Operation = Operation.MANUAL
+ route: list[tuple] | None = None
+ route_i: int = 0
+ time_remaining: float = 0
+
+
+def stepper_step(stepper: Stepper, steps, _direction):
+ if stepper.inverted_axis:
+ direction = _direction ^ 1
+ if stepper.upper_limit.state() and direction > 0:
+ return
+
+ if stepper.half_step:
+ stepper.idx = (stepper.idx + (1 if direction > 0 else 7)) & 7
+ for i in range(4):
+ GPIO.output(stepper.pins[i], halfstep_seq[stepper.idx][i])
+ stepper.pos += 1 if _direction > 0 else -1
+ else:
+ stepper.idx = (stepper.idx + (1 if direction > 0 else 3)) & 3
+ for i in range(4):
+ GPIO.output(stepper.pins[i], wholestep_seq[stepper.idx][i])
+ stepper.pos += (2 * _direction)
+
+def steppers_xy(x_stepper: Stepper, y_stepper: Stepper, x: int, y: int):
+ if x_stepper.pos != x:
+ stepper_step(x_stepper, 1, 1 if x > x_stepper.pos else 0)
+ if y_stepper.pos != y:
+ stepper_step(y_stepper, 1, 1 if y > y_stepper.pos else 0)
+ time.sleep(x_stepper.interval)
+
+def steppers_dismantle(x_stepper: Stepper, y_stepper: Stepper):
+ while x_stepper.pos > 0 and y_stepper.pos > 0:
+ if not x_stepper.upper_limit.state():
+ stepper_step(x_stepper, 1, 0)
+ if not y_stepper.upper_limit.state():
+ stepper_step(y_stepper, 1, 0)
+ time.sleep(x_stepper.interval)
+
+def steppers_home(stage: Stage):
+ """
+ This will move each stepper to limit switch, and not reverse the switches.
+ Remember to move to center again.
+ """
+ if not stage.x.upper_limit.state():
+ stepper_step(stage.x, 1, 0)
+ if not stage.y.upper_limit.state():
+ stepper_step(stage.y, 1, 0)
+ time.sleep(stage.x.interval)
+
+def microscope_init():
+ #GPIO.setmode(GPIO.BCM)
+ GPIO.setmode(GPIO.BOARD)
+ dprint("Setmode")
+ return Stage(
+ x=Stepper(pins=[3,5,7,11], upper_limit=InputSwitch(40), inverted_axis=True),
+ y=Stepper(pins=[13,15,19,21], upper_limit=InputSwitch(38), inverted_axis=True),
+ image=StepperDegrees(pins=[29,31,33,35]),
+ signal_home=InputSignal(),
+ signal_cancel=InputSignal(),
+ signal_x_inc=InputSignal(),
+ signal_x_dec=InputSignal(),
+ signal_y_inc=InputSignal(),
+ signal_y_dec=InputSignal(),
+ signal_img_1=InputSignal(),
+ signal_img_2=InputSignal(),
+ signal_img_3=InputSignal(),
+ signal_img_4=InputSignal(),
+ signal_img_5=InputSignal(),
+ signal_img_6=InputSignal(),
+ )
+
+
+def microscope_cleanup(stage: Stage):
+ dprint("Initiating cleanup")
+ stage.operation = Operation.CLEANUP
+ stage.x.cleanup()
+ stage.y.cleanup()
+ stage.image.cleanup()
+ GPIO.cleanup()
+ dprint("Cleanup done")
+
+
+def process_arrows(stage: Stage):
+ if stage.signal_x_inc.state():
+ stepper_step(stage.x, 1, 1)
+ if stage.signal_x_dec.state():
+ stepper_step(stage.x, 1, 0)
+ if stage.signal_y_inc.state():
+ stepper_step(stage.y, 1, 1)
+ if stage.signal_y_dec.state():
+ stepper_step(stage.y, 1, 0)
+
+
+def microscope_fsm(cmd_queue, state):
+ dprint("Beginning fsm")
+ while True:
+ try:
+ cmdline = cmd_queue.get(timeout=0.00001)
+ cmd = cmdline[0]
+ args = cmdline[1:]
+ dprint(f"received command {cmd} {args}")
+ except queue.Empty:
+ cmd = None
+
+ if cmd == COMMANDS['quit']:
+ break
+ if cmd == COMMANDS['cancel']:
+ state.operation = Operation.MANUAL
+ cmd_queue.queue.clear()
+ if cmd == COMMANDS['toggle_halfstep']:
+ state.x.half_step = not state.x.half_step
+ state.y.half_step = not state.y.half_step
+ if cmd == COMMANDS['toggle_debugtiming']:
+ if state.x.interval == 1:
+ state.x.interval = 0.0008
+ state.y.interval = 0.0008
+ else:
+ state.x.interval = 1
+ state.y.interval = 1
+
+ state.x.upper_limit.update()
+ state.y.upper_limit.update()
+
+ if state.operation == Operation.MANUAL:
+ if cmd == COMMANDS['home']:
+ state.operation = Operation.HOMING
+ continue
+ if cmd == COMMANDS['center']:
+ state.operation = Operation.CENTER
+ continue
+ if cmd == COMMANDS['record']:
+ state.operation = Operation.RECORD
+ state.route = []
+ continue
+ if cmd == COMMANDS['replay']:
+ state.operation = Operation.REPLAY
+ state.route_i = 0
+ continue
+ if cmd == COMMANDS['target']:
+ state.operation = Operation.TARGET
+ state.route_i = 0
+ state.route = [(args[0], args[1])]
+ continue
+ process_arrows(state)
+ time.sleep(state.x.interval)
+
+ elif state.operation == Operation.HOMING:
+ steppers_home(state)
+ if state.x.upper_limit.state() and state.y.upper_limit.state():
+ state.x.pos = state.x.lower
+ state.y.pos = state.y.lower
+ state.operation = Operation.CENTER
+
+ elif state.operation == Operation.TARGET:
+ x, y = state.route[0]
+ steppers_xy(state.x, state.y, x, y)
+ state.time_remaining = (
+ max(
+ abs(state.x.pos - x),
+ abs(state.y.pos - y)
+ )
+ * state.x.interval
+ )
+ if state.x.pos == x and state.y.pos == y:
+ state.operation = Operation.MANUAL
+
+ elif state.operation == Operation.CENTER:
+ steppers_xy(state.x, state.y, state.x.home, state.y.home)
+ state.time_remaining = (
+ max(
+ abs(state.x.pos - state.x.home),
+ abs(state.y.pos - state.y.home)
+ )
+ * state.x.interval
+ )
+ if (state.x.pos == state.x.home) and (state.y.pos == state.y.pos):
+ state.operation = Operation.MANUAL
+
+ elif state.operation == Operation.RECORD:
+ if cmd == COMMANDS['checkpoint']:
+ state.route.append((state.x.pos, state.y.pos))
+ process_arrows(state)
+ time.sleep(0.002)
+
+ elif state.operation == Operation.REPLAY:
+ if state.route_i >= len(state.route):
+ state.operation = Operation.MANUAL
+ state.route_i = 0
+ continue
+
+ pt = state.route[state.route_i]
+
+ for pt in state['route']:
+ state['x'], state['y'], state['rot'] = pt
+ time.sleep(0.5)
+ state['mode'] = 'idle'
+
diff --git a/webserver.py b/webserver.py
new file mode 100644
index 0000000..81c8475
--- /dev/null
+++ b/webserver.py
@@ -0,0 +1,142 @@
+import curses, threading, queue, time
+from microscope import microscope_fsm, microscope_init, Operation, microscope_cleanup, COMMANDS
+import http.server
+import socketserver
+import threading
+import json
+import sys
+import urllib.parse
+
+def dprint(msg):
+ print(f"[DEBUG:webserver] {msg}")
+
+class MyRequestHandler(http.server.BaseHTTPRequestHandler):
+ def do_GET(self):
+ parsed_path = urllib.parse.urlparse(self.path)
+ path = parsed_path.path
+ query_params = urllib.parse.parse_qs(parsed_path.query)
+
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.send_header('Access-Control-Allow-Origin', '*')
+ self.send_header('Access-Control-Allow-Methods', '*')
+ self.end_headers()
+
+ response_data = {}
+ if path == '/state':
+ response_data = {
+ "operation": self.server.stage.operation.name,
+ "x": {
+ "pos": self.server.stage.x.pos,
+ "home": self.server.stage.x.home,
+ "upper": self.server.stage.x.upper,
+ "lower": self.server.stage.x.lower,
+ "limit": self.server.stage.x.upper_limit.state(),
+ },
+ "y": {
+ "pos": self.server.stage.y.pos,
+ "home": self.server.stage.y.home,
+ "upper": self.server.stage.y.upper,
+ "lower": self.server.stage.y.lower,
+ "limit": self.server.stage.y.upper_limit.state(),
+ },
+ "image": {
+ "pos": self.server.stage.image.pos,
+ },
+ "time_remaining": self.server.stage.time_remaining,
+ }
+ else:
+ self.send_response(404)
+ self.end_headers()
+ self.wfile.write(json.dumps({'error': 'Not Found'}).encode())
+ return
+
+ self.wfile.write(json.dumps(response_data).encode())
+
+ def do_PUT(self):
+ parsed_path = urllib.parse.urlparse(self.path)
+ path = parsed_path.path
+ query_params = urllib.parse.parse_qs(parsed_path.query)
+ dprint(query_params)
+
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.send_header('Access-Control-Allow-Origin', '*')
+ self.send_header('Access-Control-Allow-Methods', '*')
+ self.end_headers()
+
+ if path == '/cancel':
+ self.server.cmd_queue.put((COMMANDS["cancel"],))
+ elif path == '/home':
+ self.server.cmd_queue.put((COMMANDS["home"],))
+ elif path == '/center':
+ self.server.cmd_queue.put((COMMANDS["center"],))
+ elif path == '/toggle_halfstep':
+ self.server.cmd_queue.put((COMMANDS["toggle_halfstep"],))
+ elif path == '/toggle_debugtiming':
+ self.server.cmd_queue.put((COMMANDS["toggle_debugtiming"],))
+ elif path == '/target':
+ self.server.cmd_queue.put((
+ COMMANDS["target"],
+ int(query_params["x"][0]),
+ int(query_params["y"][0])
+ ))
+ elif path == '/set_signal':
+ signal = query_params.get('signal', [''])[0]
+ signal_value = query_params.get('value', [''])[0]
+ dprint(f"set_signal {signal} {bool(int(signal_value))}")
+ if signal == "xInc": self.server.stage.signal_x_inc._state = bool(int(signal_value))
+ if signal == "xDec": self.server.stage.signal_x_dec._state = bool(int(signal_value))
+ if signal == "yInc": self.server.stage.signal_y_inc._state = bool(int(signal_value))
+ if signal == "yDec": self.server.stage.signal_y_dec._state = bool(int(signal_value))
+ else:
+ self.send_response(404)
+ self.end_headers()
+ self.wfile.write(json.dumps({'error': 'Not Found'}).encode())
+ return
+
+ def do_OPTIONS(self):
+ parsed_path = urllib.parse.urlparse(self.path)
+ path = parsed_path.path
+ query_params = urllib.parse.parse_qs(parsed_path.query)
+
+ self.send_response(200)
+ self.send_header('Access-Control-Allow-Origin', '*')
+ self.send_header('Access-Control-Allow-Methods', '*')
+ self.end_headers()
+
+def run_server(stage, cmd_queue):
+ PORT = 8000
+ httpd = socketserver.TCPServer(("", PORT), MyRequestHandler)
+ dprint(f"Serving at port {PORT}")
+ httpd.daemon_threads = True
+ httpd.stage = stage
+ httpd.cmd_queue = cmd_queue
+ return httpd
+
+if __name__ == "__main__":
+ stage = microscope_init()
+ cmd_queue = queue.Queue()
+
+ fsm = threading.Thread(target=microscope_fsm, args=(cmd_queue, stage))
+ fsm.daemon = True
+ fsm.start()
+
+ web_server = run_server(stage, cmd_queue)
+ try:
+ web_server.serve_forever()
+ except KeyboardInterrupt:
+ dprint("Keyboard Interrupt received, shutting down server gracefully...")
+ finally:
+ dprint("Shutting down webserver")
+ web_server.shutdown()
+ dprint("Closing webserver")
+ web_server.server_close()
+ dprint("Closing FSM")
+ cmd_queue.put((COMMANDS['quit'],))
+ dprint("Wait for FSM to exit")
+ fsm.join()
+ dprint("Cleanup GPIO")
+ microscope_cleanup(stage)
+ dprint("Exit")
+ sys.exit(0)