diff options
| -rw-r--r-- | README.md | 29 | ||||
| -rw-r--r-- | microscope.py | 333 | ||||
| -rw-r--r-- | webserver.py | 142 |
3 files changed, 504 insertions, 0 deletions
diff --git a/README.md b/README.md new file mode 100644 index 0000000..8c2ad11 --- /dev/null +++ b/README.md @@ -0,0 +1,29 @@ +Demo pinout + +this example simply does use random pins without regarding pin types at all + ++----+-----+--------+--------+-----+----+ +| | Pin | Name | Name | Pin | | ++----+-----+--------+--------+-----+----+ +| | 1 | 3.3v | 5v | 2 | | +| x1 | 3 | SDA | 5v | 4 | | +| x2 | 5 | SCL | GND | 6 | | +| x3 | 7 | G4 | TXO | 8 | | +| | 9 | GND | RXI | 10 | | +| x4 | 11 | G17 | G18 | 12 | | +| y1 | 13 | G27 | GND | 14 | | +| y2 | 15 | G22 | G23 | 16 | | +| | 17 | 3.3v | G24 | 18 | | +| y3 | 19 | MOSI | GND | 20 | | +| y4 | 21 | MISO | G25 | 22 | | +| | 23 | CLK | CD0 | 24 | | +| | 25 | GND | CE1 | 26 | | +| | 27 | IDSD | IDSC | 28 | | +| i1 | 29 | G05 | GND | 30 | | +| i2 | 31 | G6 | G12 | 32 | | +| i3 | 33 | G13 | GND | 34 | | +| i4 | 35 | G19 | G16 | 36 | | +| | 37 | G26 | G20 | 38 | s2 | +| | 39 | GND | G21 | 40 | s1 | ++----+-----+--------+--------+-----+----+ + diff --git a/microscope.py b/microscope.py new file mode 100644 index 0000000..a251f90 --- /dev/null +++ b/microscope.py @@ -0,0 +1,333 @@ +import threading, queue, time +from dataclasses import dataclass +from enum import Enum + +try: + import RPi.GPIO as GPIO +except ImportError: + print("WARNING: RPi.GPIO NOT AVAILABLE, USING Mock.GPIO") + import Mock.GPIO as GPIO + +halfstep_seq = [ + [1,0,0,0], [1,1,0,0], [0,1,0,0], [0,1,1,0], + [0,0,1,0], [0,0,1,1], [0,0,0,1], [1,0,0,1] +] +wholestep_seq = [ + [1,1,0,0], [0,1,1,0], [0,0,1,1], [1,0,0,1], +] + +COMMANDS = { + "quit": 1, + "cancel": 2, + "home": 3, + "record": 4, + "replay": 5, + "checkpoint": 6, + "toggle_halfstep": 7, + "toggle_debugtiming": 8, + "center": 9, + "target": 10, +} + +def dprint(msg): + print(f"[DEBUG:microscope] {msg}") + +@dataclass +class InputSwitch: + pin: int + nc: bool = True + _state: bool = False + + def state(self): + return (self._state) if self.nc else (not self._state) + + def update(self): + self._state = GPIO.input(self.pin) + + def __post_init__(self): + GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_UP) + + +@dataclass +class InputSignal: + """ + Placeholder, used to simulate input signals, + _may_ be combined at some point to allow multiple types of input + """ + _state: bool = False + + def state(self): + return (self._state) + +@dataclass +class Stepper: + pins: list[int] + upper_limit: InputSwitch + idx: int = 0 + pos: int = 70_808 + home: int = 70_808 + lower: int = 0 + upper: int = 141_000 + #interval: float = 0.0008 + interval: float = 0.002 + #interval: float = 0.8 # Debug timing + half_step: bool = True + step_count: int = 4096 + inverted_axis: bool = True + + def __post_init__(self): + for pin in self.pins: + GPIO.setup(pin, GPIO.OUT) + GPIO.output(pin, GPIO.LOW) + + def cleanup(self): + for pin in self.pins: + GPIO.output(pin, GPIO.LOW) + + +@dataclass +class StepperDegrees: + pins: list[int] + degrees: bool = False + idx: int = 0 + pos: int = 0 + interval: float = 0.0008 + half_step: bool = False + + def __post_init__(self): + for pin in self.pins: + #GPIO.setup(pin, GPIO.OUT) + #GPIO.output(pin, GPIO.LOW) + pass + + def cleanup(self): + for pin in self.pins: + #GPIO.output(pin, GPIO.LOW) + pass + + +class Operation(Enum): + MANUAL = 1 + HOMING = 2 + CENTER = 3 + RECORD = 4 + REPLAY = 5 + TARGET = 6 + CLEANUP = 7 + DISMANTLE = 8 + + +@dataclass +class Stage: + x: Stepper + y: Stepper + image: StepperDegrees + signal_home: InputSignal + signal_cancel: InputSignal + signal_x_inc: InputSignal + signal_x_dec: InputSignal + signal_y_inc: InputSignal + signal_y_dec: InputSignal + signal_img_1: InputSignal + signal_img_2: InputSignal + signal_img_3: InputSignal + signal_img_4: InputSignal + signal_img_5: InputSignal + signal_img_6: InputSignal + operation: Operation = Operation.MANUAL + route: list[tuple] | None = None + route_i: int = 0 + time_remaining: float = 0 + + +def stepper_step(stepper: Stepper, steps, _direction): + if stepper.inverted_axis: + direction = _direction ^ 1 + if stepper.upper_limit.state() and direction > 0: + return + + if stepper.half_step: + stepper.idx = (stepper.idx + (1 if direction > 0 else 7)) & 7 + for i in range(4): + GPIO.output(stepper.pins[i], halfstep_seq[stepper.idx][i]) + stepper.pos += 1 if _direction > 0 else -1 + else: + stepper.idx = (stepper.idx + (1 if direction > 0 else 3)) & 3 + for i in range(4): + GPIO.output(stepper.pins[i], wholestep_seq[stepper.idx][i]) + stepper.pos += (2 * _direction) + +def steppers_xy(x_stepper: Stepper, y_stepper: Stepper, x: int, y: int): + if x_stepper.pos != x: + stepper_step(x_stepper, 1, 1 if x > x_stepper.pos else 0) + if y_stepper.pos != y: + stepper_step(y_stepper, 1, 1 if y > y_stepper.pos else 0) + time.sleep(x_stepper.interval) + +def steppers_dismantle(x_stepper: Stepper, y_stepper: Stepper): + while x_stepper.pos > 0 and y_stepper.pos > 0: + if not x_stepper.upper_limit.state(): + stepper_step(x_stepper, 1, 0) + if not y_stepper.upper_limit.state(): + stepper_step(y_stepper, 1, 0) + time.sleep(x_stepper.interval) + +def steppers_home(stage: Stage): + """ + This will move each stepper to limit switch, and not reverse the switches. + Remember to move to center again. + """ + if not stage.x.upper_limit.state(): + stepper_step(stage.x, 1, 0) + if not stage.y.upper_limit.state(): + stepper_step(stage.y, 1, 0) + time.sleep(stage.x.interval) + +def microscope_init(): + #GPIO.setmode(GPIO.BCM) + GPIO.setmode(GPIO.BOARD) + dprint("Setmode") + return Stage( + x=Stepper(pins=[3,5,7,11], upper_limit=InputSwitch(40), inverted_axis=True), + y=Stepper(pins=[13,15,19,21], upper_limit=InputSwitch(38), inverted_axis=True), + image=StepperDegrees(pins=[29,31,33,35]), + signal_home=InputSignal(), + signal_cancel=InputSignal(), + signal_x_inc=InputSignal(), + signal_x_dec=InputSignal(), + signal_y_inc=InputSignal(), + signal_y_dec=InputSignal(), + signal_img_1=InputSignal(), + signal_img_2=InputSignal(), + signal_img_3=InputSignal(), + signal_img_4=InputSignal(), + signal_img_5=InputSignal(), + signal_img_6=InputSignal(), + ) + + +def microscope_cleanup(stage: Stage): + dprint("Initiating cleanup") + stage.operation = Operation.CLEANUP + stage.x.cleanup() + stage.y.cleanup() + stage.image.cleanup() + GPIO.cleanup() + dprint("Cleanup done") + + +def process_arrows(stage: Stage): + if stage.signal_x_inc.state(): + stepper_step(stage.x, 1, 1) + if stage.signal_x_dec.state(): + stepper_step(stage.x, 1, 0) + if stage.signal_y_inc.state(): + stepper_step(stage.y, 1, 1) + if stage.signal_y_dec.state(): + stepper_step(stage.y, 1, 0) + + +def microscope_fsm(cmd_queue, state): + dprint("Beginning fsm") + while True: + try: + cmdline = cmd_queue.get(timeout=0.00001) + cmd = cmdline[0] + args = cmdline[1:] + dprint(f"received command {cmd} {args}") + except queue.Empty: + cmd = None + + if cmd == COMMANDS['quit']: + break + if cmd == COMMANDS['cancel']: + state.operation = Operation.MANUAL + cmd_queue.queue.clear() + if cmd == COMMANDS['toggle_halfstep']: + state.x.half_step = not state.x.half_step + state.y.half_step = not state.y.half_step + if cmd == COMMANDS['toggle_debugtiming']: + if state.x.interval == 1: + state.x.interval = 0.0008 + state.y.interval = 0.0008 + else: + state.x.interval = 1 + state.y.interval = 1 + + state.x.upper_limit.update() + state.y.upper_limit.update() + + if state.operation == Operation.MANUAL: + if cmd == COMMANDS['home']: + state.operation = Operation.HOMING + continue + if cmd == COMMANDS['center']: + state.operation = Operation.CENTER + continue + if cmd == COMMANDS['record']: + state.operation = Operation.RECORD + state.route = [] + continue + if cmd == COMMANDS['replay']: + state.operation = Operation.REPLAY + state.route_i = 0 + continue + if cmd == COMMANDS['target']: + state.operation = Operation.TARGET + state.route_i = 0 + state.route = [(args[0], args[1])] + continue + process_arrows(state) + time.sleep(state.x.interval) + + elif state.operation == Operation.HOMING: + steppers_home(state) + if state.x.upper_limit.state() and state.y.upper_limit.state(): + state.x.pos = state.x.lower + state.y.pos = state.y.lower + state.operation = Operation.CENTER + + elif state.operation == Operation.TARGET: + x, y = state.route[0] + steppers_xy(state.x, state.y, x, y) + state.time_remaining = ( + max( + abs(state.x.pos - x), + abs(state.y.pos - y) + ) + * state.x.interval + ) + if state.x.pos == x and state.y.pos == y: + state.operation = Operation.MANUAL + + elif state.operation == Operation.CENTER: + steppers_xy(state.x, state.y, state.x.home, state.y.home) + state.time_remaining = ( + max( + abs(state.x.pos - state.x.home), + abs(state.y.pos - state.y.home) + ) + * state.x.interval + ) + if (state.x.pos == state.x.home) and (state.y.pos == state.y.pos): + state.operation = Operation.MANUAL + + elif state.operation == Operation.RECORD: + if cmd == COMMANDS['checkpoint']: + state.route.append((state.x.pos, state.y.pos)) + process_arrows(state) + time.sleep(0.002) + + elif state.operation == Operation.REPLAY: + if state.route_i >= len(state.route): + state.operation = Operation.MANUAL + state.route_i = 0 + continue + + pt = state.route[state.route_i] + + for pt in state['route']: + state['x'], state['y'], state['rot'] = pt + time.sleep(0.5) + state['mode'] = 'idle' + diff --git a/webserver.py b/webserver.py new file mode 100644 index 0000000..81c8475 --- /dev/null +++ b/webserver.py @@ -0,0 +1,142 @@ +import curses, threading, queue, time +from microscope import microscope_fsm, microscope_init, Operation, microscope_cleanup, COMMANDS +import http.server +import socketserver +import threading +import json +import sys +import urllib.parse + +def dprint(msg): + print(f"[DEBUG:webserver] {msg}") + +class MyRequestHandler(http.server.BaseHTTPRequestHandler): + def do_GET(self): + parsed_path = urllib.parse.urlparse(self.path) + path = parsed_path.path + query_params = urllib.parse.parse_qs(parsed_path.query) + + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Access-Control-Allow-Methods', '*') + self.end_headers() + + response_data = {} + if path == '/state': + response_data = { + "operation": self.server.stage.operation.name, + "x": { + "pos": self.server.stage.x.pos, + "home": self.server.stage.x.home, + "upper": self.server.stage.x.upper, + "lower": self.server.stage.x.lower, + "limit": self.server.stage.x.upper_limit.state(), + }, + "y": { + "pos": self.server.stage.y.pos, + "home": self.server.stage.y.home, + "upper": self.server.stage.y.upper, + "lower": self.server.stage.y.lower, + "limit": self.server.stage.y.upper_limit.state(), + }, + "image": { + "pos": self.server.stage.image.pos, + }, + "time_remaining": self.server.stage.time_remaining, + } + else: + self.send_response(404) + self.end_headers() + self.wfile.write(json.dumps({'error': 'Not Found'}).encode()) + return + + self.wfile.write(json.dumps(response_data).encode()) + + def do_PUT(self): + parsed_path = urllib.parse.urlparse(self.path) + path = parsed_path.path + query_params = urllib.parse.parse_qs(parsed_path.query) + dprint(query_params) + + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Access-Control-Allow-Methods', '*') + self.end_headers() + + if path == '/cancel': + self.server.cmd_queue.put((COMMANDS["cancel"],)) + elif path == '/home': + self.server.cmd_queue.put((COMMANDS["home"],)) + elif path == '/center': + self.server.cmd_queue.put((COMMANDS["center"],)) + elif path == '/toggle_halfstep': + self.server.cmd_queue.put((COMMANDS["toggle_halfstep"],)) + elif path == '/toggle_debugtiming': + self.server.cmd_queue.put((COMMANDS["toggle_debugtiming"],)) + elif path == '/target': + self.server.cmd_queue.put(( + COMMANDS["target"], + int(query_params["x"][0]), + int(query_params["y"][0]) + )) + elif path == '/set_signal': + signal = query_params.get('signal', [''])[0] + signal_value = query_params.get('value', [''])[0] + dprint(f"set_signal {signal} {bool(int(signal_value))}") + if signal == "xInc": self.server.stage.signal_x_inc._state = bool(int(signal_value)) + if signal == "xDec": self.server.stage.signal_x_dec._state = bool(int(signal_value)) + if signal == "yInc": self.server.stage.signal_y_inc._state = bool(int(signal_value)) + if signal == "yDec": self.server.stage.signal_y_dec._state = bool(int(signal_value)) + else: + self.send_response(404) + self.end_headers() + self.wfile.write(json.dumps({'error': 'Not Found'}).encode()) + return + + def do_OPTIONS(self): + parsed_path = urllib.parse.urlparse(self.path) + path = parsed_path.path + query_params = urllib.parse.parse_qs(parsed_path.query) + + self.send_response(200) + self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Access-Control-Allow-Methods', '*') + self.end_headers() + +def run_server(stage, cmd_queue): + PORT = 8000 + httpd = socketserver.TCPServer(("", PORT), MyRequestHandler) + dprint(f"Serving at port {PORT}") + httpd.daemon_threads = True + httpd.stage = stage + httpd.cmd_queue = cmd_queue + return httpd + +if __name__ == "__main__": + stage = microscope_init() + cmd_queue = queue.Queue() + + fsm = threading.Thread(target=microscope_fsm, args=(cmd_queue, stage)) + fsm.daemon = True + fsm.start() + + web_server = run_server(stage, cmd_queue) + try: + web_server.serve_forever() + except KeyboardInterrupt: + dprint("Keyboard Interrupt received, shutting down server gracefully...") + finally: + dprint("Shutting down webserver") + web_server.shutdown() + dprint("Closing webserver") + web_server.server_close() + dprint("Closing FSM") + cmd_queue.put((COMMANDS['quit'],)) + dprint("Wait for FSM to exit") + fsm.join() + dprint("Cleanup GPIO") + microscope_cleanup(stage) + dprint("Exit") + sys.exit(0) |
