import threading, queue, time from dataclasses import dataclass from enum import Enum try: import RPi.GPIO as GPIO except ImportError: print("WARNING: RPi.GPIO NOT AVAILABLE, USING Mock.GPIO") import Mock.GPIO as GPIO halfstep_seq = [ [1,0,0,0], [1,1,0,0], [0,1,0,0], [0,1,1,0], [0,0,1,0], [0,0,1,1], [0,0,0,1], [1,0,0,1] ] wholestep_seq = [ [1,1,0,0], [0,1,1,0], [0,0,1,1], [1,0,0,1], ] COMMANDS = { "quit": 1, "cancel": 2, "home": 3, "record": 4, "replay": 5, "checkpoint": 6, "toggle_halfstep": 7, "toggle_debugtiming": 8, "center": 9, "target": 10, } def dprint(msg): print(f"[DEBUG:microscope] {msg}") @dataclass class InputSwitch: pin: int nc: bool = True _state: bool = False def state(self): return (self._state) if self.nc else (not self._state) def update(self): self._state = GPIO.input(self.pin) def __post_init__(self): GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_UP) @dataclass class InputSignal: """ Placeholder, used to simulate input signals, _may_ be combined at some point to allow multiple types of input """ _state: bool = False def state(self): return (self._state) @dataclass class Stepper: pins: list[int] upper_limit: InputSwitch idx: int = 0 pos: int = 70_808 home: int = 70_808 lower: int = 0 upper: int = 141_000 #interval: float = 0.0008 interval: float = 0.002 #interval: float = 0.8 # Debug timing half_step: bool = True step_count: int = 4096 inverted_axis: bool = True def __post_init__(self): for pin in self.pins: GPIO.setup(pin, GPIO.OUT) GPIO.output(pin, GPIO.LOW) def cleanup(self): for pin in self.pins: GPIO.output(pin, GPIO.LOW) @dataclass class StepperDegrees: pins: list[int] degrees: bool = False idx: int = 0 pos: int = 0 interval: float = 0.0008 half_step: bool = False def __post_init__(self): for pin in self.pins: #GPIO.setup(pin, GPIO.OUT) #GPIO.output(pin, GPIO.LOW) pass def cleanup(self): for pin in self.pins: #GPIO.output(pin, GPIO.LOW) pass class Operation(Enum): MANUAL = 1 HOMING = 2 CENTER = 3 RECORD = 4 REPLAY = 5 TARGET = 6 CLEANUP = 7 DISMANTLE = 8 @dataclass class Stage: x: Stepper y: Stepper image: StepperDegrees signal_home: InputSignal signal_cancel: InputSignal signal_x_inc: InputSignal signal_x_dec: InputSignal signal_y_inc: InputSignal signal_y_dec: InputSignal signal_img_1: InputSignal signal_img_2: InputSignal signal_img_3: InputSignal signal_img_4: InputSignal signal_img_5: InputSignal signal_img_6: InputSignal operation: Operation = Operation.MANUAL route: list[tuple] | None = None route_i: int = 0 time_remaining: float = 0 def stepper_step(stepper: Stepper, steps, _direction): if stepper.inverted_axis: direction = _direction ^ 1 if stepper.upper_limit.state() and direction > 0: return if stepper.half_step: stepper.idx = (stepper.idx + (1 if direction > 0 else 7)) & 7 for i in range(4): GPIO.output(stepper.pins[i], halfstep_seq[stepper.idx][i]) stepper.pos += 1 if _direction > 0 else -1 else: stepper.idx = (stepper.idx + (1 if direction > 0 else 3)) & 3 for i in range(4): GPIO.output(stepper.pins[i], wholestep_seq[stepper.idx][i]) stepper.pos += (2 * _direction) def steppers_xy(x_stepper: Stepper, y_stepper: Stepper, x: int, y: int): if x_stepper.pos != x: stepper_step(x_stepper, 1, 1 if x > x_stepper.pos else 0) if y_stepper.pos != y: stepper_step(y_stepper, 1, 1 if y > y_stepper.pos else 0) time.sleep(x_stepper.interval) def steppers_dismantle(x_stepper: Stepper, y_stepper: Stepper): while x_stepper.pos > 0 and y_stepper.pos > 0: if not x_stepper.upper_limit.state(): stepper_step(x_stepper, 1, 0) if not y_stepper.upper_limit.state(): stepper_step(y_stepper, 1, 0) time.sleep(x_stepper.interval) def steppers_home(stage: Stage): """ This will move each stepper to limit switch, and not reverse the switches. Remember to move to center again. """ if not stage.x.upper_limit.state(): stepper_step(stage.x, 1, 0) if not stage.y.upper_limit.state(): stepper_step(stage.y, 1, 0) time.sleep(stage.x.interval) def microscope_init(): #GPIO.setmode(GPIO.BCM) GPIO.setmode(GPIO.BOARD) dprint("Setmode") return Stage( x=Stepper(pins=[3,5,7,11], upper_limit=InputSwitch(40), inverted_axis=True), y=Stepper(pins=[13,15,19,21], upper_limit=InputSwitch(38), inverted_axis=True), image=StepperDegrees(pins=[29,31,33,35]), signal_home=InputSignal(), signal_cancel=InputSignal(), signal_x_inc=InputSignal(), signal_x_dec=InputSignal(), signal_y_inc=InputSignal(), signal_y_dec=InputSignal(), signal_img_1=InputSignal(), signal_img_2=InputSignal(), signal_img_3=InputSignal(), signal_img_4=InputSignal(), signal_img_5=InputSignal(), signal_img_6=InputSignal(), ) def microscope_cleanup(stage: Stage): dprint("Initiating cleanup") stage.operation = Operation.CLEANUP stage.x.cleanup() stage.y.cleanup() stage.image.cleanup() GPIO.cleanup() dprint("Cleanup done") def process_arrows(stage: Stage): if stage.signal_x_inc.state(): stepper_step(stage.x, 1, 1) if stage.signal_x_dec.state(): stepper_step(stage.x, 1, 0) if stage.signal_y_inc.state(): stepper_step(stage.y, 1, 1) if stage.signal_y_dec.state(): stepper_step(stage.y, 1, 0) def microscope_fsm(cmd_queue, state): dprint("Beginning fsm") while True: try: cmdline = cmd_queue.get(timeout=0.00001) cmd = cmdline[0] args = cmdline[1:] dprint(f"received command {cmd} {args}") except queue.Empty: cmd = None if cmd == COMMANDS['quit']: break if cmd == COMMANDS['cancel']: state.operation = Operation.MANUAL cmd_queue.queue.clear() if cmd == COMMANDS['toggle_halfstep']: state.x.half_step = not state.x.half_step state.y.half_step = not state.y.half_step if cmd == COMMANDS['toggle_debugtiming']: if state.x.interval == 1: state.x.interval = 0.0008 state.y.interval = 0.0008 else: state.x.interval = 1 state.y.interval = 1 state.x.upper_limit.update() state.y.upper_limit.update() if state.operation == Operation.MANUAL: if cmd == COMMANDS['home']: state.operation = Operation.HOMING continue if cmd == COMMANDS['center']: state.operation = Operation.CENTER continue if cmd == COMMANDS['record']: state.operation = Operation.RECORD state.route = [] continue if cmd == COMMANDS['replay']: state.operation = Operation.REPLAY state.route_i = 0 continue if cmd == COMMANDS['target']: state.operation = Operation.TARGET state.route_i = 0 state.route = [(args[0], args[1])] continue process_arrows(state) time.sleep(state.x.interval) elif state.operation == Operation.HOMING: steppers_home(state) if state.x.upper_limit.state() and state.y.upper_limit.state(): state.x.pos = state.x.lower state.y.pos = state.y.lower state.operation = Operation.CENTER elif state.operation == Operation.TARGET: x, y = state.route[0] steppers_xy(state.x, state.y, x, y) state.time_remaining = ( max( abs(state.x.pos - x), abs(state.y.pos - y) ) * state.x.interval ) if state.x.pos == x and state.y.pos == y: state.operation = Operation.MANUAL elif state.operation == Operation.CENTER: steppers_xy(state.x, state.y, state.x.home, state.y.home) state.time_remaining = ( max( abs(state.x.pos - state.x.home), abs(state.y.pos - state.y.home) ) * state.x.interval ) if (state.x.pos == state.x.home) and (state.y.pos == state.y.pos): state.operation = Operation.MANUAL elif state.operation == Operation.RECORD: if cmd == COMMANDS['checkpoint']: state.route.append((state.x.pos, state.y.pos)) process_arrows(state) time.sleep(0.002) elif state.operation == Operation.REPLAY: if state.route_i >= len(state.route): state.operation = Operation.MANUAL state.route_i = 0 continue pt = state.route[state.route_i] for pt in state['route']: state['x'], state['y'], state['rot'] = pt time.sleep(0.5) state['mode'] = 'idle'