@TheSilver100
Pretty sure it should, yeah…
My best guess is that you didn’t set the correct port. At least that’s how I can get it to crash, even with all dependencies installed.
I had ChatGPT modify the code so that it stays open and prints an error message instead of instantly closing. If it works correctly, running this should tell you what went wrong:
import serial
import time
import asyncio
import websockets
import json
import threading
import sys
# Function to check for missing dependencies
def check_dependencies():
    """Verify that the third-party libraries this script relies on are importable.

    Attempts to import ``serial``, ``websockets`` and ``pyautogui``. If any
    import fails, reports the missing library, waits for Enter so the console
    window stays open, and exits the process with status 1.
    """
    try:
        import serial  # noqa: F401
        import websockets  # noqa: F401
        import pyautogui  # noqa: F401
    except ImportError as e:
        # ImportError.name holds the module name directly. Parsing the message
        # text fails on messages that contain no quotes, so fall back to the
        # raw message only when the name attribute is not set.
        missing_lib = e.name or str(e)
        print(f"Error: The library '{missing_lib}' is missing. Please install it using pip.")
        if missing_lib == "serial":
            # The importable module and the pip package have different names.
            print("Note: the pip package that provides 'serial' is named 'pyserial' (pip install pyserial).")
        input("Press Enter to exit...")
        sys.exit(1)
# Stop early, with a readable message, if a required library is missing
check_dependencies()

# Serial (COM) port configuration
try:
    ser = serial.Serial()
    ser.baudrate = 115200
    ser.port = 'COM6'  # Update with your correct port if necessary
    ser.open()
except serial.SerialException as e:
    # Keep the console open so the user can read why the port failed to open
    print(f"Error: Could not connect to {ser.port} at baudrate {ser.baudrate}.")
    print("Details:", e)
    input("Press Enter to exit...")
    sys.exit(1)

# Shared state, written by the websocket handler and read by the speed thread
speed = None
upper = None
lower = None
whileloop = None

# Port the websocket server listens on
websocket_port = 8765
# Function for sending serial message
def send_message(messtype, pos, decSpeed, mod):
    """Build a command string and write it to the serial port ``ser``.

    Args:
        messtype: Prefix placed at the start of the message (callers in this
            file pass "L0", "R1" or "R2").
        pos: Position as a string of 1 to 3 characters. One character is sent
            as "0<pos>0", two characters as "<pos>0", and any three-character
            value is sent as "999".
        decSpeed: Speed as a string; only used when ``mod`` is "speedS", where
            it is appended after an "S".
        mod: One of "positionLR", "positionS", "positionBF" or "speedS".

    Returns:
        None. Invalid input and write failures are reported on stdout and the
        message is not sent; no exception is raised for them.
    """
    position_formats = {  # Dictionary lookup for what type of format to use
        1: "0" + pos + "0",
        2: pos + "0",
        3: "999"
    }
    pos_formated = position_formats.get(len(pos))
    if pos_formated is None:
        # Without this guard the string concatenation below raises TypeError.
        print(f"Error - Unsupported position '{pos}', expected 1 to 3 characters. Message not sent.")
        return

    terminator = '\r\n'
    if mod in ("positionLR", "positionS", "positionBF"):
        mess = messtype + pos_formated + terminator
    elif mod == "speedS":
        mess = messtype + pos_formated + "S" + decSpeed + terminator
    else:
        # Previously this left the message unbound and was misreported as a
        # serial connection problem.
        print(f"Error - Unknown mode '{mod}'. Message not sent.")
        return

    try:
        ser.write(mess.encode())
    except Exception as e:
        # Best effort: a failed write must not take down the caller, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        print("Error - Unable to send message, check serial connection.")
        print("Details:", e)
def calculate_speed(calc_upper, calc_lower):
    """Work out stroke timing from the global ``speed`` and a stroke range.

    Args:
        calc_upper: Upper end of the stroke range.
        calc_lower: Lower end of the stroke range.

    Returns:
        A tuple ``(timedelay, msspeed, calc_speed)`` where ``calc_speed`` is
        ``speed / 10``, ``msspeed`` is ``int(calc_speed * 1000 / 2)`` and
        ``timedelay`` is the pause between actions after clamping.
    """
    rate = speed / 10
    half_ms = int((rate * 1000) / 2)

    # Pause between actions; a zero rate falls back to a fixed pause of 1
    if rate == 0:
        pause = 1
    else:
        pause = (((calc_upper - calc_lower) * 0.1) / rate) / 6

    # Short pauses are snapped to fixed values; exactly 0.16 is left as is
    if 0.16 < pause < 0.25:
        pause = 0.25
    elif pause < 0.16:
        pause = 0.15

    return pause, half_ms, rate
# Function loop for speed position
def speed_background():
    """Drive the speed-mode stroke loop; never returns.

    Each pass snapshots the global ``upper``/``lower`` range, sends a move to
    the upper position, waits, then sends a move to the lower position and
    waits again. Nothing is sent while the calculated speed is zero or while
    the global ``whileloop`` is "wait" or "stop".
    """
    paused_states = ("wait", "stop")
    while True:
        top, bottom = upper, lower  # range used for this whole up/down cycle

        # Up
        delay, ms_speed, rate = calculate_speed(top, bottom)
        if rate > 0 and whileloop not in paused_states:
            send_message("L0", str(top), str(ms_speed), "speedS")

        # Idle in one-second naps while a disconnect or mode switch has paused us
        while whileloop in paused_states:
            time.sleep(1)
        time.sleep(delay)

        # Down
        delay, ms_speed, rate = calculate_speed(top, bottom)
        if rate > 0 and whileloop not in paused_states:
            send_message("L0", str(bottom), str(ms_speed), "speedS")
        time.sleep(delay)
# Async function for handling incoming websocket messages
async def write(websocket):
    """Handle one websocket connection, turning JSON messages into serial commands.

    Every message must be a JSON object with "mode" and "id" keys; the two are
    concatenated to select the action:

    * "positionS"  - needs "position"; pauses the speed loop and sends an "L0" move.
    * "positionLR" - needs "position"; sends an "R1" move.
    * "positionBF" - needs "position"; sends an "R2" move.
    * "speedS"     - needs "speed", "upper" and "lower"; updates the shared
      state and starts the speed thread if it is not already running.

    Any other combination prints an error. An exception while processing a
    message is printed and ends the handler for this connection. When the
    connection ends, for any reason, and the last message addressed the "S"
    id, the speed loop is paused.

    Args:
        websocket: The connection object; iterated asynchronously for messages.
    """
    global speed, upper, lower, whileloop
    # id of the most recently parsed message. Initialised here so the
    # disconnect handling below is safe even if no message was ever parsed.
    axis_id = None
    try:
        async for message in websocket:
            parsed = json.loads(message)
            mode = parsed["mode"] + parsed["id"]
            axis_id = parsed["id"]
            if mode == "positionS":
                speed = 0
                pos = str(parsed["position"])
                whileloop = "wait"  # hold the speed loop while in position mode
                send_message("L0", pos, "NA", mode)
            elif mode == "positionLR":
                pos = str(parsed["position"])
                send_message("R1", pos, "NA", mode)
            elif mode == "positionBF":
                pos = str(parsed["position"])
                send_message("R2", pos, "NA", mode)
            elif mode == "speedS":
                speed = int(parsed["speed"])
                upper = parsed["upper"]
                lower = parsed["lower"]
                whileloop = "ok"
                if not speedthread.is_alive():
                    speedthread.start()
            else:
                print("Error - Speed mode is not supported on this axis, please change mode to Position. Only Up/down axis supports speed mode.")
    except Exception as e:
        print(f"Error: Disconnect or exception in processing message. Details: {e}")
    # Iteration ends without raising on a clean close, so do this outside the
    # except block: the speed loop must pause on every kind of disconnect.
    if axis_id == "S":
        whileloop = "stop"
# Main websocket server function
async def main():
    """Serve the websocket endpoint on localhost until the process is stopped.

    Connections are handled by ``write``. If the server cannot be started or
    fails, the error is printed, the user is asked to press Enter, and the
    process exits with status 1.
    """
    try:
        server = websockets.serve(write, "localhost", websocket_port)
        async with server:
            print("Websocket at port", websocket_port, "is ready to receive connections!")
            never_done = asyncio.Future()  # never resolved, so this waits forever
            await never_done
    except Exception as err:
        print(f"Error: Failed to start websocket server. Details: {err}")
        input("Press Enter to exit...")
        sys.exit(1)
# Background thread for the speed loop; started by the websocket handler on
# the first speed-mode message. speed_background never returns, so the thread
# is a daemon: a non-daemon thread would keep the process alive after the
# websocket server has stopped or the user pressed Ctrl+C.
speedthread = threading.Thread(name='speed_background', target=speed_background, daemon=True)
# Run the main asyncio function
try:
    asyncio.run(main())
except Exception as e:
    # Keep the console open so the error can be read before the window closes
    print(f"Error: Unhandled exception. Details: {e}")
    input("Press Enter to exit...")
    sys.exit(1)
I believe it’ll have messages for missing dependencies, a wrong port, or issues with the websocket. (Though my guess is it’s one of the first two.)
One mistake I made when I originally tried this out was installing “serial” instead of “pyserial”. If you did too, and it’s saying there’s an issue with it, I’d suggest running both `pip uninstall pyserial` and `pip uninstall serial` before installing only “pyserial” again.
Oh, and don’t forget to set the port in this one too. Again, it’s the line that reads `ser.port = 'COM6'`; replace `COM6` with whatever port you have your device on.