Simple integration Python script for XToys (OSR2 only)

So first, you’ll need to open CMD (press the Windows key, type cmd, press Enter) and run three commands, one after another, to install the dependencies:
pip install pyserial
pip install websockets
and finally
pip install pyautogui
(Python should come with pip, but if it says that “pip” isn’t a recognized command, I’d recommend reinstalling Python)
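
If you want to double check that the three installs actually worked before running the main script, something like this should do it. It's just a rough sketch: the REQUIRED table and the function name are my own, and it only checks that Python can find each module, not that it's the right version.

```python
import importlib.util

# Import name -> pip package name. Note that pyserial is imported as "serial".
REQUIRED = {"serial": "pyserial", "websockets": "websockets", "pyautogui": "pyautogui"}

def missing_packages(required=REQUIRED):
    """Return the pip names of packages whose import name cannot be found."""
    return [pip_name for module, pip_name in required.items()
            if importlib.util.find_spec(module) is None]

if __name__ == "__main__":
    missing = missing_packages()
    if missing:
        print("Missing:", ", ".join(missing))
        print("Install with: pip install " + " ".join(missing))
    else:
        print("All dependencies found.")
```

Save it as a .py file and run it from CMD with python followed by the file name, so the output stays on screen.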

Once that’s done (you can close that window), you make a text file in which you paste the code.
There’s a specific line that you might need to modify (at least I had to) so the script finds your OSR2. Depending on which script on here you’re going for, that would be:

ser.port = '/dev/ttyACM0'

or

ser.port = 'COM3'

For that, you need to know what USB port your OSR2 is connected to. I found mine by running Multi Fun Player, which usually auto detects and shows the port.
At the moment, I’m using COM6 for example, so I had to change the line to:

ser.port = 'COM6'
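
If you don't have Multi Fun Player around, pyserial can list the ports itself through serial.tools.list_ports. Here's a small sketch of that. The helper names are made up by me, and I'm assuming the device shows up as a normal USB serial port, so treat it as a starting point.

```python
def describe_ports(ports):
    """Format port entries (objects with .device and .description) as text lines."""
    return [f"{p.device}: {p.description}" for p in ports]

def list_serial_ports():
    # Imported here so describe_ports() can be used even without pyserial installed.
    from serial.tools import list_ports
    return describe_ports(list_ports.comports())

if __name__ == "__main__":
    try:
        lines = list_serial_ports()
    except ImportError:
        lines = ["pyserial is not installed (pip install pyserial)"]
    for line in lines or ["No serial ports found."]:
        print(line)
```

Run it once with the device unplugged and once with it plugged in. The entry that only appears the second time should be the port to put into ser.port.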

Finally, you save the file with a .py extension. (“script.py” for example)

And then, in theory, double clicking the file should open a CMD window, which stays open. The latest script on here should also read: Websocket at port 8765 is ready to receive connections!

I haven’t touched XToys in a good while, so I can’t really help you with setting up a custom toy atm, but the script at least should be working like that.

Thank you for the reply. It is in fact not recognizing pip as a command, so I’m reinstalling, but I had another question. As this is all entirely new to me, it might be a dumb question: rather than the OSR2, will this also work with the FUNSR1? It shows as FUNOSR in my devices.

That, I’ve got no clue on.
I don’t know nearly enough about coding to know if this is sending semi-universal commands or something specific to the OSR2, nor do I have an SSR1/FunSR1 to check myself.
All I can say is that, since it all accepts funscript and the software was made by the same guy, I think there’s a decent chance it would work. (Maybe…?)

If anyone more knowledgeable would like to chime in, please feel free ^^

I don’t have an SSR1 in front of me to test, but after spending a minute in the Arduino code, it looks like the SSR1 does in fact respond to TCode commands on L0, so yes, this OUGHT to just work out of the box. @TheSilver100
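
For anyone wondering what a "TCode command on L0" looks like: as far as I understand it, it's the axis name followed by three digits that are read as a fraction of full travel, then a line ending. Below is a minimal sketch that mirrors how the script in this thread formats things (0 to 100 in, 100 capped at 999, optional S suffix for speed). The function name tcode_position is mine, not part of any library.

```python
def tcode_position(axis, percent, speed=None):
    """Build a TCode command string for a 0-100 position on the given axis."""
    if not 0 <= percent <= 100:
        raise ValueError("percent must be between 0 and 100")
    # Three digits are read as a fraction of full travel: 500 -> 0.500.
    # 100 is capped at 999, the same way the script in this thread does it.
    digits = min(int(percent) * 10, 999)
    command = f"{axis}{digits:03d}"
    if speed is not None:
        # Optional speed suffix, as used by the script's speed mode.
        command += f"S{int(speed)}"
    return command + "\r\n"

if __name__ == "__main__":
    print(repr(tcode_position("L0", 50)))
```

Since both devices are driven through L0 here, the same strings would go out to either one. Whether the SSR1 firmware treats the speed suffix the same way is something I haven't verified.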

Thanks for the help so far. I’ve gotten as far as getting Python working and setting up the script, but when I open it, it just closes a split second later. I’m unsure where I went wrong. The script should stay open, right?

@TheSilver100
Pretty sure it should, yeah…
My best guess is that you didn’t set the correct port. At least that’s how I can get it to crash, even with all dependencies installed.

I had ChatGPT modify the code to stay open and print an error message instead of instantly closing. If it works correctly, running this should tell you what went wrong:

import sys
import time
import asyncio
import json
import threading

# Import the third-party libraries inside try/except, so a missing one is
# reported instead of the window closing before any check can run.
try:
    import serial
    import websockets
    import pyautogui  # Not used below; checked because the install steps list it
except ImportError as e:
    # e.name holds the name of the module that failed to import
    print(f"Error: The library '{e.name}' is missing. Please install it using pip.")
    print("(If the missing library is 'serial', the package to install is 'pyserial'.)")
    input("Press Enter to exit...")
    sys.exit(1)

# Com Port Settings
try:
    ser = serial.Serial()
    ser.port = 'COM6'  # Update with your correct port if necessary
    ser.baudrate = 115200
    ser.open()
except serial.SerialException as e:
    print(f"Error: Could not connect to {ser.port} at baudrate {ser.baudrate}.")
    print("Details:", e)
    input("Press Enter to exit...")
    sys.exit(1)

# Declare global variables
speed = upper = lower = whileloop = None
websocket_port = 8765

# Function for sending serial message
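def send_message(messtype, pos, decSpeed, mod):
    position_formats = {  # Dictionary lookup for what type of format to use
        1: "0" + pos + "0",
        2: pos + "0",
        3: "999"
    }
    pos_formated = position_formats.get(len(pos))
    if pos_formated is None:  # Empty or longer than 3 characters
        print(f"Error - Unexpected position value '{pos}', message skipped.")
        return
    header = '\r\n'  # Line ending that terminates the command

    if mod in ("positionLR", "positionS", "positionBF"):
        mess = messtype + pos_formated + header
    elif mod == "speedS":
        mess = messtype + pos_formated + "S" + decSpeed + header
    else:  # Unknown mode: nothing to send
        print(f"Error - Unknown mode '{mod}', message skipped.")
        return
    try:
        ser.write(mess.encode())
    except Exception as e:
        print("Error - Unable to send message, check serial connection. Details:", e)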

def calculate_speed(calc_upper, calc_lower):
    calc_speed = speed / 10
    msspeed = int((calc_speed*1000)/2)

    if calc_speed != 0:  # Calculate time delay between action
        timedelay = (((calc_upper - calc_lower) * 0.1) / calc_speed) / 6
    else:
        timedelay = 1

    if timedelay > 0.16 and timedelay < 0.25:
        timedelay = 0.25
    elif timedelay < 0.16:
        timedelay = 0.15
    return timedelay, msspeed, calc_speed

# Function loop for speed position
def speed_background():
    while True:
        mode = "speedS"
        upperloc = upper
        lowerloc = lower

        timedelay = calculate_speed(upperloc, lowerloc)

        if timedelay[2] > 0 and whileloop not in ("wait", "stop"):  # Up
            send_message("L0", str(upperloc), str(timedelay[1]), str(mode))

        while whileloop in ("wait", "stop"):  # Set thread to sleep if websocket disconnect or mode switch
            time.sleep(1)

        time.sleep(timedelay[0])

        timedelay = calculate_speed(upperloc, lowerloc)

        if timedelay[2] > 0 and whileloop not in ("wait", "stop"):  # Down
            send_message("L0", str(lowerloc), str(timedelay[1]), str(mode))

        time.sleep(timedelay[0])

# Async function for handling incoming websocket messages
async def write(websocket):
    id = None  # Set up front so the except block below can read it safely
    try:
        async for message in websocket:
            parsed = json.loads(message)
            global speed, upper, lower, whileloop
            mode = parsed["mode"] + parsed["id"]
            id = parsed["id"]
            threadrunning = speedthread.is_alive()
            if mode == "positionS":
                speed = 0
                pos = str(parsed["position"])
                whileloop = "wait"
                send_message("L0", pos, "NA", mode)
            elif mode == "positionLR":
                pos = str(parsed["position"])
                send_message("R1", pos, "NA", mode)
            elif mode == "positionBF":
                pos = str(parsed["position"])
                send_message("R2", pos, "NA", mode)
            elif mode == "speedS":
                speed = int(parsed["speed"])
                upper = parsed["upper"]
                lower = parsed["lower"]
                whileloop = "ok"

                if threadrunning == False:
                    speedthread.start()
            else:
                print("Error - Speed mode is not supported on this axis, please change mode to Position. Only Up/down axis supports speed mode.")
    except Exception as e:
        print(f"Error: Disconnect or exception in processing message. Details: {e}")
        if id == "S":
            whileloop = "stop"

# Main websocket server function
async def main():
    try:
        async with websockets.serve(write, "localhost", websocket_port):
            print("Websocket at port", websocket_port, "is ready to receive connections!")
            await asyncio.Future()  # Run forever
    except Exception as e:
        print(f"Error: Failed to start websocket server. Details: {e}")
        input("Press Enter to exit...")
        sys.exit(1)

# daemon=True so the background loop does not keep the process alive on exit
speedthread = threading.Thread(name='speed_background', target=speed_background, daemon=True)

# Run the main asyncio function
try:
    asyncio.run(main())
except Exception as e:
    print(f"Error: Unhandled exception. Details: {e}")
    input("Press Enter to exit...")
    sys.exit(1)

I believe it’ll have messages for missing dependencies, a wrong port, or issues with the websocket. (Though my guess is it’s one of the first two.)
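
If you want to rule out the websocket side without touching XToys, you could poke the script with a tiny test client. This is only a sketch based on the fields the script above reads ("mode", "id", "position", "speed", "upper", "lower"). The helper names are mine and I haven't run this against real hardware.

```python
import asyncio
import json

def position_message(position, axis_id="S"):
    """JSON for a position update; the script combines mode + id into e.g. 'positionS'."""
    return json.dumps({"mode": "position", "id": axis_id, "position": position})

def speed_message(speed, upper, lower):
    """JSON for speed mode, which the script only accepts on the 'S' axis."""
    return json.dumps({"mode": "speed", "id": "S",
                       "speed": speed, "upper": upper, "lower": lower})

async def send_test(messages, uri="ws://localhost:8765"):
    # Imported here so the message helpers work without websockets installed.
    import websockets
    async with websockets.connect(uri) as ws:
        for message in messages:
            await ws.send(message)
            await asyncio.sleep(1)  # Give the device time to move

# With the main script running in another window:
# asyncio.run(send_test([position_message(0), position_message(100), position_message(50)]))
```

Positions are whole numbers from 0 to 100 here, since the script picks its format by counting the characters in the value.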

One mistake I made when I originally tried this out was installing “serial” instead of “pyserial”. If you did too, and it’s saying there’s an issue with it, I’d suggest running both pip uninstall pyserial and pip uninstall serial, before installing only “pyserial” again.
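
A quick way to tell which of the two you ended up with: pyserial's module has Serial and SerialException on it, and as far as I know the unrelated "serial" package doesn't. A rough sketch of that check is below. The function names are mine, and the assumption about the other package is just that, an assumption.

```python
import importlib

def looks_like_pyserial(module):
    """pyserial provides Serial and SerialException at the top level."""
    return hasattr(module, "Serial") and hasattr(module, "SerialException")

def check_serial_install():
    try:
        module = importlib.import_module("serial")
    except ImportError:
        return "No 'serial' module found. Run: pip install pyserial"
    if looks_like_pyserial(module):
        return "pyserial looks correctly installed."
    return ("A different 'serial' package seems to be installed. "
            "Uninstall both serial and pyserial, then install pyserial again.")

if __name__ == "__main__":
    print(check_serial_install())
```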

Oh, and don’t forget to set the port in this one too. Again, it’s the line that reads ser.port = 'COM6'; replace the COM6 with whatever port you have your device on.

Thanks for all the help so far, but after fighting with this thing and having to jump through so many hoops to get it working, I’ve decided I’m just gonna bite the bullet and buy the more expensive Handy while it’s on sale. It might be a bit less powerful motor-wise, but it’s plug and play for the most part, no pulling my hair out and googling things to get it to work.