"""JSON-over-HTTP control interface for the microscope stage.

When run as a script, this module starts ``microscope_fsm`` on a background
thread and serves a small HTTP API that reports the stage state (GET) and
queues commands for the state machine (PUT).
"""

import curses
import http.server
import json
import queue
import socketserver
import sys
import threading
import time
import urllib.parse

from microscope import (
    COMMANDS,
    Operation,
    microscope_cleanup,
    microscope_fsm,
    microscope_init,
)

# PUT paths that enqueue a single argument-less command.  The COMMANDS key is
# the path without its leading slash.
_SIMPLE_COMMAND_PATHS = (
    '/cancel',
    '/home',
    '/center',
    '/toggle_halfstep',
    '/toggle_debugtiming',
)

# Maps the ``signal`` query value of PUT /set_signal to the stage attribute
# whose ``_state`` is overwritten.
_SIGNAL_ATTRIBUTES = {
    "xInc": "signal_x_inc",
    "xDec": "signal_x_dec",
    "yInc": "signal_y_inc",
    "yDec": "signal_y_dec",
}


def dprint(msg):
    """Print *msg* to stdout with the webserver debug prefix."""
    print(f"[DEBUG:webserver] {msg}")


def _axis_state(axis):
    """Return the JSON-serialisable snapshot reported for one stage axis."""
    return {
        "pos": axis.pos,
        "home": axis.home,
        "upper": axis.upper,
        "lower": axis.lower,
        "limit": axis.upper_limit.state(),
    }


class MyRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler exposing the stage over HTTP.

    The handler expects the server object to carry two extra attributes,
    ``stage`` and ``cmd_queue``; ``run_server`` sets both.
    """

    def _send_cors_headers(self):
        """Emit the permissive CORS headers attached to every response."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', '*')

    def _send_json(self, status, payload=None):
        """Send a complete response with *status* and an optional JSON body.

        The status line is written exactly once, after routing has decided
        the outcome, so error responses are real 4xx responses rather than
        text appended to an already-started 200 response.

        Args:
            status: HTTP status code to send.
            payload: JSON-serialisable object, or ``None`` for an empty body.
        """
        body = b"" if payload is None else json.dumps(payload).encode()
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        self._send_cors_headers()
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Serve ``GET /state``; any other path yields a JSON 404."""
        path = urllib.parse.urlparse(self.path).path
        if path != '/state':
            self._send_json(404, {'error': 'Not Found'})
            return

        stage = self.server.stage
        # Build the payload before any header is written so a failure while
        # reading the stage cannot leave a half-sent 200 response behind.
        response_data = {
            "operation": stage.operation.name,
            "x": _axis_state(stage.x),
            "y": _axis_state(stage.y),
            "image": {
                "pos": stage.image.pos,
            },
            "time_remaining": stage.time_remaining,
        }
        self._send_json(200, response_data)

    def do_PUT(self):
        """Queue a command or override a signal, selected by the URL path.

        Supported paths:
            * ``/cancel``, ``/home``, ``/center``, ``/toggle_halfstep``,
              ``/toggle_debugtiming``: enqueue the matching command.
            * ``/target?x=<int>&y=<int>``: enqueue a target command.
            * ``/set_signal?signal=<name>&value=<int>``: set ``_state`` on
              one of the stage's ``signal_*`` objects.

        Responds 200 with an empty body on success, 400 for missing or
        malformed parameters, and 404 for an unknown path.
        """
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path
        query_params = urllib.parse.parse_qs(parsed_path.query)
        dprint(query_params)

        if path in _SIMPLE_COMMAND_PATHS:
            self.server.cmd_queue.put((COMMANDS[path[1:]],))
        elif path == '/target':
            try:
                target_x = int(query_params["x"][0])
                target_y = int(query_params["y"][0])
            except (KeyError, ValueError):
                self._send_json(
                    400, {'error': 'target requires integer x and y'})
                return
            self.server.cmd_queue.put((COMMANDS["target"], target_x, target_y))
        elif path == '/set_signal':
            signal = query_params.get('signal', [''])[0]
            signal_value = query_params.get('value', [''])[0]
            try:
                state = bool(int(signal_value))
            except ValueError:
                self._send_json(
                    400, {'error': 'set_signal requires an integer value'})
                return
            dprint(f"set_signal {signal} {state}")
            attribute = _SIGNAL_ATTRIBUTES.get(signal)
            if attribute is None:
                self._send_json(400, {'error': 'Unknown signal'})
                return
            # Writes the signal object's private ``_state`` directly, as the
            # original handler did.
            getattr(self.server.stage, attribute)._state = state
        else:
            self._send_json(404, {'error': 'Not Found'})
            return

        self._send_json(200)

    def do_OPTIONS(self):
        """Answer CORS preflight requests with 200 and the CORS headers."""
        self.send_response(200)
        self._send_cors_headers()
        self.end_headers()


class _ReusableTCPServer(socketserver.TCPServer):
    """TCPServer that sets SO_REUSEADDR so a quick restart can rebind."""

    allow_reuse_address = True


def run_server(stage, cmd_queue, port=8000):
    """Create (but do not start) the HTTP server bound to all interfaces.

    Args:
        stage: Stage object the handlers read from and write signals to.
        cmd_queue: Queue that handlers put command tuples on.
        port: TCP port to bind; ``0`` lets the OS choose a free port.

    Returns:
        The bound server with ``stage`` and ``cmd_queue`` attached.  The
        caller is responsible for ``serve_forever()`` and ``server_close()``.

    Raises:
        OSError: If the address cannot be bound.
    """
    httpd = _ReusableTCPServer(("", port), MyRequestHandler)
    dprint(f"Serving at port {httpd.server_address[1]}")
    # Kept for compatibility with code that reads the attribute; it only has
    # an effect on ThreadingMixIn servers, and this server is single-threaded.
    httpd.daemon_threads = True
    httpd.stage = stage
    httpd.cmd_queue = cmd_queue
    return httpd


def main():
    """Run the state machine thread and the web server until interrupted.

    Returns:
        Process exit status: ``0`` after a clean shutdown.  Unexpected
        exceptions propagate after cleanup instead of being turned into a
        silent exit status of 0.
    """
    stage = microscope_init()
    cmd_queue = queue.Queue()

    fsm = threading.Thread(
        target=microscope_fsm, args=(cmd_queue, stage), daemon=True)
    fsm.start()

    web_server = None
    try:
        # Created inside the try block so a bind failure still stops the FSM
        # thread and runs microscope_cleanup.
        web_server = run_server(stage, cmd_queue)
        web_server.serve_forever()
    except KeyboardInterrupt:
        dprint("Keyboard Interrupt received, shutting down server gracefully...")
    finally:
        dprint("Shutting down webserver")
        # serve_forever() has already returned here.  shutdown() is documented
        # for use from a different thread while the loop is running, so only
        # the listening socket needs closing.
        if web_server is not None:
            dprint("Closing webserver")
            web_server.server_close()

        dprint("Closing FSM")
        cmd_queue.put((COMMANDS['quit'],))

        dprint("Wait for FSM to exit")
        fsm.join()

        dprint("Cleanup GPIO")
        microscope_cleanup(stage)

    dprint("Exit")
    return 0


if __name__ == "__main__":
    sys.exit(main())