Simple integration Python script for XToys (OSR2 only)

First of all, this is my first post and I haven’t done HTML, so it’s not going to be pretty, but that’s not the point.
Having said that, here is a very simple Python script that allows the OSR2 to work with XToys.

!!(Big disclaimer: this was only tested on the ESP32 version of the OSR2)!!

import serial
import asyncio
import websockets
import json

# Serial connection to the OSR2; change the port to match your system
ser = serial.Serial()
ser.port = 'COM4'
ser.baudrate = 115200
ser.open()

header = '\r\n'
max_pos = 75  # upper stroke limit (percent)
min_pos = 20  # lower stroke limit (percent)

async def write(websocket):
    async for message in websocket:
        parsed = json.loads(message)
        pos = parsed["position"]
        # Clamp the requested position to the configured limits
        if pos > max_pos:
            pos = max_pos
        if pos < min_pos:
            pos = min_pos
        pos = str(pos)
        # Build a three-digit magnitude for the L0 axis
        if len(pos) == 1:
            mess = "L00" + pos + "0" + header
        elif len(pos) == 2:
            mess = "L0" + pos + "0" + header
        else:
            mess = "L0999" + header
        ser.write(mess.encode())
        print(mess)

async def main():
    async with websockets.serve(write, "localhost", 8765):
        await asyncio.Future()  # run forever

asyncio.run(main())

To set this up in XToys, go to the custom toys section and create a new speed + position stroker toy with websockets. Then, in its settings, name it whatever you want and change the websocket info to ws://localhost:8765, or whatever port you set in the script.
I wrote this for Virtual Succubus, and in this configuration it works quite well.
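
Side note for anyone adapting the script above: the string-length branches can be collapsed into one small function. This is only a sketch, assuming XToys sends positions from 0 to 100 and that the firmware reads the three digits after L0 as a fraction of full travel (so 50 becomes 500). The name position_to_tcode and its parameters are made up for illustration.

```python
def position_to_tcode(position, low=20, high=75, axis="L0"):
    """Clamp a 0-100 position and format it as a T-code command string."""
    clamped = max(low, min(high, int(position)))
    # Assumption: three-digit magnitude, so 50 % becomes "500";
    # 100 % would be "1000", which is capped at "999".
    magnitude = min(clamped * 10, 999)
    return f"{axis}{magnitude:03d}\r\n"


print(repr(position_to_tcode(50)))  # 'L0500\r\n'
```

In the websocket handler this would replace the if/elif chain: ser.write(position_to_tcode(parsed["position"]).encode()).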

I also wrote this script, which follows your mouse movement when you right-click and stops when you right-click again. I wrote this one for Slave Matrix, but it should work with any game that has similar mechanics.

import win32api
import time
import serial
import pyautogui

VK_RBUTTON = 0x02  # virtual-key code of the right mouse button
state_right = win32api.GetKeyState(VK_RBUTTON)
header = '\r\n'

ser = serial.Serial()
ser.port = 'COM4'
ser.baudrate = 115200
ser.open()

y_max = 750  # largest mouse Y value (pixels) that is sent
y_min = 60   # smallest mouse Y value (pixels) that is sent

while True:
    b = win32api.GetKeyState(VK_RBUTTON)

    if b != state_right:
        state_right = b
        if b < 0:  # negative while the button is held down
            print('Right Button Pressed')
            while True:
                b = win32api.GetKeyState(VK_RBUTTON)

                if b != state_right:
                    state_right = b
                    if b > 0:  # button is up and its toggle bit is set
                        print('Right Button Released')
                        break

                x, y = pyautogui.position()
                if y > y_max:
                    y = y_max
                if y < y_min:
                    y = y_min
                y = str(y)
                print(y)
                # Send the pixel row as a zero-padded three-digit magnitude
                mess = "L0" + y.zfill(3) + header
                ser.write(mess.encode())
    time.sleep(0.001)
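
One thing to keep in mind with the mouse script above: it sends the raw pixel row as the position, so the usable range depends on the screen resolution. A rough sketch of rescaling the row to the full 0 to 999 range instead is below. The function name and the default screen_height of 1080 are assumptions, not part of the original script.

```python
def mouse_y_to_tcode(y, screen_height=1080, axis="L0"):
    """Scale a mouse Y coordinate to a three-digit T-code magnitude."""
    # Keep the coordinate on screen before scaling.
    y = max(0, min(screen_height - 1, int(y)))
    # Same direction as the original script: top of screen = lowest value.
    magnitude = round(y / (screen_height - 1) * 999)
    return f"{axis}{magnitude:03d}\r\n"


print(repr(mouse_y_to_tcode(0)))     # 'L0000\r\n'
print(repr(mouse_y_to_tcode(1079)))  # 'L0999\r\n'
```

The real screen height can be read with pyautogui.size(), which returns the width and height in pixels.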

For both scripts you have to set the appropriate serial port in the script. For this to work you need Python 3 and the required libraries: pyserial (imported as serial), websockets and pyautogui, plus pywin32 for the win32api import in the mouse script. If your ESP32 is not showing up, you need to install these drivers: CP210x USB to UART Bridge VCP Drivers - Silicon Labs.
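
If you are not sure which port to put in the script, pyserial can list the ports it sees. A small sketch, assuming pyserial is installed; the helper name available_ports is made up, and it returns an empty list when pyserial is missing.

```python
def available_ports():
    """Return (device, description) pairs for the serial ports pyserial can see."""
    try:
        from serial.tools import list_ports  # part of the pyserial package
    except ImportError:
        # pyserial is not installed (or a different "serial" package is).
        return []
    return [(port.device, port.description) for port in list_ports.comports()]


for device, description in available_ports():
    print(device, "-", description)
```

On Windows the device names look like COM3 or COM4; on Linux they look like /dev/ttyACM0 or /dev/ttyUSB0.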

Here is a Google Drive link with these files so that you don’t have to copy and paste them into a file, since the formatting may be a bit screwed up: OSR2 - Google Drive

Looks like the formatting on your code got messed up. It won’t run when copy-pasted.
I tried to fix it myself (with no knowledge of how Python works), but whenever I try to connect with XToys it throws the following error:

connection handler failed
Traceback (most recent call last):
  File "D:\Users\X\AppData\Local\Programs\Python\Python311\Lib\site-packages\websockets\legacy\server.py", line 236, in handler
    await self.ws_handler(self)
  File "<stdin>", line 4, in write
KeyError: 'position'

And here’s how I formatted the code:

import serial
import time
import asyncio
import websockets
import json

ser = serial.Serial()
ser.port = 'COM3'
ser.baudrate = 115200
ser.open()

header = '\r\n'
max = 75
min = 20

async def write(websocket):
    async for message in websocket:
        parsed = json.loads(message)
        pos = parsed["position"]
        if pos > max:
            pos = max
        if pos < min:
            pos = min
        pos = str(pos)
        if len(pos) == 1:
            mess = str("L000" + pos + header)
            ser.write(mess.encode())
        elif len(pos) == 2:
            mess = "L0" + pos + "0" + header
            ser.write(mess.encode())
        else:
            mess == "L999" + header
            ser.write(mess.encode())
            print(mess)

async def main():
    async with websockets.serve(write, "localhost", 8765):
        await asyncio.Future() # run forever

asyncio.run(main())

Did I mess it up, or might the error come from the fact I’m using a Romeo BLE mini instead of an ESP32? (or something else…)

Thanks! awesome work!

Tested it on OSR2 with Romeo BLE mini

Also @Rag3Quid, it seems this script only works in “position” mode and not speed. You can change the mode at the bottom, then connect and see :)

Edit: realized I had made an indentation error in the code I posted, oops.

Out of curiosity, how well did this work with the BLE mini? I only tested on the ESP32.

Like magsoul said, you are probably trying to connect in speed + position mode when this only works in position mode. Try changing it in the XToys dashboard and reconnect. If that doesn’t work, download the files from the Google Drive link and try again. And if that still doesn’t work, post the error you are getting.
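
For what it is worth, the KeyError can also be avoided in the script itself by ignoring messages that carry no position. This is a sketch only: the message keys ("mode", "position") are taken from the scripts in this thread, and the helper name extract_position is made up.

```python
import json


def extract_position(message):
    """Return the position from an XToys message, or None if it has none."""
    parsed = json.loads(message)
    # Assumption: messages without a "mode" key are position messages.
    if parsed.get("mode", "position") != "position":
        return None
    return parsed.get("position")


print(extract_position('{"mode": "position", "position": 42}'))  # 42
print(extract_position('{"mode": "speed", "speed": 50}'))        # None
```

The handler can then skip the serial write whenever the result is None instead of crashing the connection.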

Thanks for the help!
Switching to position mode did get rid of that error, but I still can’t get it to work…

Opening the XToys.py you provided directly makes Python show up for a split second and close immediately.
Running the module through IDLE errors out with

Traceback (most recent call last):
  File "E:\Filepath\OSR2 Python script\Xtoys.py", line 8, in <module>
    ser = serial.Serial()
AttributeError: module 'serial' has no attribute 'Serial'

(I do have the serial library installed, so that’s weird…)
And finally, copying the code and pasting it manually in the Python console fails with this message, either when connecting or changing the position in XToys:

connection handler failed
Traceback (most recent call last):
  File "D:\Users\X\AppData\Local\Programs\Python\Python311\Lib\site-packages\websockets\legacy\server.py", line 236, in handler
    await self.ws_handler(self)
  File "<stdin>", line 15, in write
NameError: name 'ser' is not defined

Both errors imply that the serial library is not installed. I always launched the script by opening PowerShell and running it with py.exe, so try that. Also launch it only with Python 3, as that is the version it was written and tested in. If that doesn’t work, make sure the libraries are installed with pip3 and not pip.

Hm… I must be doing something wrong…
So I reinstalled all required modules with pip3. When I run “pip3 list” in PowerShell, they show up:

Package     Version
----------- -------
future      0.18.2
iso8601     1.1.0
MouseInfo   0.1.3
pip         22.3.1
PyAutoGUI   0.9.53
PyGetWindow 0.0.9
PyMsgBox    1.0.9
pyperclip   1.8.2
PyRect      0.2.0
PyScreeze   0.1.28
pytweening  1.0.4
PyYAML      6.0
serial      0.0.97
setuptools  65.5.0
websockets  10.4

But then when I run “py Xtoys.py” or “python Xtoys.py” (executed in the folder the script is saved in), the same error pops up as before.

Traceback (most recent call last):
  File "E:\Filepath\Xtoys.py", line 8, in <module>
    ser = serial.Serial()
          ^^^^^^^^^^^^^
AttributeError: module 'serial' has no attribute 'Serial'

Running PowerShell with admin rights doesn’t help.
I’m running Python 3.11.0 64-bit on Win10.
Any ideas? :/

I only use Linux, but usually you can do pip install Serial
or is it pip install pyserial

Something like that. Should get it going for ya :)

Tested this script with Virtual Succubus but couldn’t get it to work that well.

Not sure if I set up my layout correctly, though.

It’s pyserial!!
I only did “pip install serial” originally. With pyserial installed it works now. Thank you!
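
For anyone else hitting the same AttributeError, a quick way to tell which package is behind "import serial" is sketched below. The function name and the three status strings are made up for illustration; the check itself only relies on pyserial exposing a Serial class.

```python
import importlib


def serial_module_status():
    """Report whether 'import serial' resolves to pyserial."""
    try:
        module = importlib.import_module("serial")
    except ImportError:
        return "missing"
    # pyserial exposes serial.Serial; the unrelated "serial" package does not.
    return "pyserial" if hasattr(module, "Serial") else "wrong package"


print(serial_module_status())
```

If this prints "wrong package", running pip uninstall serial followed by pip install pyserial should sort it out.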

So I took the liberty of using this script to learn Python (so I’m an amateur),
but I expanded it so that “Speed” mode works.

Two things that are not perfect:
The time calculation is not exact, but it works.
The sub-thread that runs the speed code doesn’t stop unless you switch mode, set the speed to 0, or of course exit the script.
I tried to write something that would stop the while loop when you disconnect from XToys but have not figured that out yet.

Tested it with Virtual Succubus, and that game now works much better with the OSR2 (tested on Linux with the Romeo BLE mini chip on my OSR2). I have also tested the script on Windows to make sure it works.

import serial
import time
import asyncio
import websockets
import json
import threading

#Com Port Settings
ser = serial.Serial()
ser.port = 'COM3'
ser.baudrate = 115200
ser.open()

#Declare global variables
speed = None
upper = None
lower = None
mode = None
header = '\r\n'

def speed_background():
    mess = None
    while True:
        upperloc = upper
        lowerloc = lower
        speedloc = speed
        msspeed = int((speedloc*1000)/2)

        if speed != 0:
            timedelay = (((upperloc - lowerloc) * 0.1 ) / speedloc) / 6
        else:
            timedelay = 1

        if timedelay > 0.16 and timedelay < 0.25:
            timedelay = 0.25
        elif timedelay < 0.16:
            timedelay = 0.15

        if upperloc == 100:
            upperloc = 999

        #Up
        if len(str(upperloc)) == 2 and speedloc > 0:
            mess = "L0" + str(upperloc) + "0S" + str(msspeed) + header
            print("message: " + mess)
            ser.write(mess.encode())
        elif len(str(upperloc)) == 3 and speedloc > 0:
            mess = "L0" + str(upperloc) + "S" + str(msspeed) + header
            print("message: " + mess)
            ser.write(mess.encode())

        time.sleep(timedelay)
        
        #Down
        if len(str(lowerloc)) == 1 and speedloc > 0:
            # Pad single digits the same way as position mode (5 -> L0050)
            mess = "L00" + str(lowerloc) + "0S" + str(msspeed) + header
            print("message: " + mess)
            ser.write(mess.encode())
        elif len(str(lowerloc)) == 2 and speedloc > 0:
            mess = "L0" + str(lowerloc) + "0S" + str(msspeed) + header
            print("message: " + mess)
            ser.write(mess.encode())

        time.sleep(timedelay)

        while mode == "position":
            time.sleep(1)

async def write(websocket):
    async for message in websocket:
        parsed = json.loads(message)
        global mode, speed, upper, lower
        mode = parsed["mode"]
        threadrunning = b.is_alive()
        if mode == "position":
            pos = parsed["position"]
            pos = str(pos)
            if len(pos) == 1:
                mess = str("L00" + pos + "0" + header)
                ser.write(mess.encode())
            elif len(pos) == 2:
                mess = "L0" + pos + "0" + header
                ser.write(mess.encode())
            elif len(pos) == 3:
                mess = "L0999" + header
                ser.write(mess.encode())
                print(mess)
        elif mode == "speed":

            speed = int(parsed["speed"])
            upper = parsed["upper"]
            lower = parsed["lower"]

            speed = speed / 10
            
            if threadrunning == False:
                b.start()         

async def main():
    async with websockets.serve(write, "localhost", 8765):
        await asyncio.Future() # run forever       


b = threading.Thread(name='speed_background', target=speed_background)

asyncio.run(main())
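
On the open problem of stopping the loop when XToys disconnects, one possible approach is a threading.Event that the background loop checks, with the websocket handler setting it in a finally: block. The sketch below is not the script above, just the pattern on its own; StrokeLoop and its send callback are hypothetical names, and the serial write is replaced by a plain list so it runs without hardware.

```python
import threading
import time


class StrokeLoop:
    """Background up/down loop that can be stopped from another thread."""

    def __init__(self, send, delay=0.25):
        self._send = send  # callable that takes one command string
        self._delay = delay
        self._stop = threading.Event()
        self._thread = None

    def start(self, upper, lower):
        self.stop()  # make sure only one loop runs at a time
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, args=(upper, lower), daemon=True
        )
        self._thread.start()

    def stop(self):
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None

    def _run(self, upper, lower):
        while not self._stop.is_set():
            for target in (upper, lower):
                self._send(target)
                # Event.wait works as a sleep that ends early on stop().
                if self._stop.wait(self._delay):
                    return


sent = []
stroker = StrokeLoop(sent.append, delay=0.01)
stroker.start("L0999", "L0000")
time.sleep(0.05)
stroker.stop()
print(len(sent) > 0)
```

In the websocket handler, wrapping the async for loop in try: ... finally: stroker.stop() would stop the motion whether the connection closes cleanly or with an error. Note that stop() waits for the thread, so it can block for up to one delay interval.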

Awesome, happy to see some much needed improvements

I know I’m rather late to this topic, but thanks a ton for all the work you guys have put in. Now because of all of this I have a basic grasp on how to use Python. fun lil language

Sorry to bother, not sure if this thread is still very active. But whenever I try to set limits in the #Declare global variables section, they get ignored and it maxes out on its positions anyway. Am I missing something simple?

Hey, sorry for the late reply here,
but the variables under #Declare aren’t meant to be modified.

What you do is set the limits in XToys:
[image: limit settings in XToys]

Just to clarify, there are no variables (except the COM port) that you need to edit in the Python script. Just run the script and set up a custom toy in XToys with “Stroke or Thrust Toy (speed + position)”.

I made further changes to the script (it has been a while since I touched it, but it works). I basically added multi-axis support.

import serial
import time
import asyncio
import websockets
import json
import threading
import sys

#Com Port Settings
try:
    ser = serial.Serial()
    ser.port = '/dev/ttyACM0'
    ser.baudrate = 115200
    ser.open()
except:
    print("Error: Could not connect to", ser.port, "at baudrate", ser.baudrate)
    sys.exit(1)

#Declare global variables
speed = upper = lower = whileloop = None
websocket_port = 8765

#function for sending serial message, messtype=L0/R1/R2, pos= Position coordinate, mod= mode if its speed or position
def send_message(messtype, pos, decSpeed, mod):
    position_formats = { #Dictionary lookup for what type of format to use
        1: "0" + pos + "0",
        2: pos + "0",
        3: "999"
    }
    pos_formated = position_formats.get(len(pos))
    header = '\r\n'

    if mod in ("positionLR", "positionS", "positionBF"):
        mess = messtype + pos_formated + header
    elif mod == "speedS": 
        mess = messtype + pos_formated + "S" + decSpeed + header
    try:
        ser.write(mess.encode())
    except:
        print("Error - Unable to send message, check serial connection")

def calculate_speed(calc_upper, calc_lower):
    calc_speed = speed / 10
    msspeed = int((calc_speed*1000)/2)

    if calc_speed != 0: #Calculate time delay between action
        timedelay = (((calc_upper - calc_lower) * 0.1 ) / calc_speed) / 6
    else:
        timedelay = 1

    if timedelay > 0.16 and timedelay < 0.25:
        timedelay = 0.25
    elif timedelay < 0.16:
        timedelay = 0.15
    return timedelay, msspeed, calc_speed

#function loop for speed position
def speed_background():
    while True:
        mode = "speedS"
        upperloc = upper
        lowerloc = lower

        timedelay = calculate_speed(upperloc, lowerloc)

        if timedelay[2] > 0 and whileloop not in ("wait", "stop"): #Up
            send_message("L0", str(upperloc), str(timedelay[1]), str(mode))

        while whileloop in ("wait", "stop"): #set thread to sleep if websocket disconnect or if we switch mode
            time.sleep(1)

        time.sleep(timedelay[0])

        timedelay = calculate_speed(upperloc, lowerloc)

        if timedelay[2] > 0 and whileloop not in ("wait", "stop"): #Down
            send_message("L0", str(lowerloc), str(timedelay[1]), str(mode))

        time.sleep(timedelay[0])

async def write(websocket):
    try:
        async for message in websocket:
            parsed = json.loads(message)
            global speed, upper, lower, whileloop
            mode = parsed["mode"] + parsed["id"]
            id = parsed["id"]
            threadrunning = speedthread.is_alive()
            if mode == "positionS":
                speed = 0
                pos = str(parsed["position"])
                whileloop = "wait"
                send_message("L0", pos, "NA", mode)
            elif mode == "positionLR":
                pos = str(parsed["position"])
                send_message("R1", pos, "NA", mode)
            elif mode == "positionBF":
                pos = str(parsed["position"])
                send_message("R2", pos, "NA", mode)
            elif mode == "speedS":
                speed = int(parsed["speed"])
                upper = parsed["upper"]
                lower = parsed["lower"]
                whileloop = "ok"

                if threadrunning == False:
                    speedthread.start()
            else:
                print("Error - Speed mode is not supported on this axis, please change mode to Position. Only Up/down axis supports speed mode.")
    except:
        print("Disconnect:", id, "Stopping actions")
        if id == "S":
            whileloop = "stop"

async def main():

    async with websockets.serve(write, "localhost", websocket_port):
        print("Websocket at port", websocket_port, "is ready to receive connections!")
        await asyncio.Future() # run forever    

speedthread = threading.Thread(name='speed_background', target=speed_background)

asyncio.run(main())

With this script you set up 3 custom toys in XToys with the following settings:

Name: OSR2-Up/down
Identifier: S
Type: “Stroke or Thrust toy (Speed + Position)”
Intensity Steps: 100
Websocket info: ws://localhost:8765

Name: OSR2-Left/Right
Identifier: LR
Type: “Stroke or Thrust toy (Speed + Position)”
Intensity Steps: 100
Websocket info: ws://localhost:8765

Name: OSR2-Back/forward
Identifier: BF
Type: “Stroke or Thrust toy (Speed + Position)”
Intensity Steps: 100
Websocket info: ws://localhost:8765

With this script, back/forward and left/right only support “Position” mode.
Add all 3 toys to your layout; it should look like this:
[image: XToys layout with the three toys]

I have tested it on Linux and Windows with my OSR2+ with the Romeo BLE.
Not sure how useful the multi-axis support will be, but at least it’s there. With some creativity you can get it working nicely with whatever script you try (or make) :D
The script could probably be improved further, but one servo on my OSR2+ broke (waiting for a replacement).
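
For reference, the way the identifiers map to axes in the script above can be summarised as a lookup table. This is a condensed sketch of the position path only, not a replacement for the script; message_to_tcode is a made-up name, and the message keys ("id", "mode", "position") are the ones the script reads.

```python
# Toy identifier in XToys -> T-code axis, as used by the multi-axis script.
AXIS_BY_ID = {"S": "L0", "LR": "R1", "BF": "R2"}


def message_to_tcode(parsed):
    """Turn a parsed position message into a T-code string, or None."""
    axis = AXIS_BY_ID.get(parsed.get("id"))
    if axis is None or parsed.get("mode") != "position":
        return None
    # 0-100 from XToys becomes a three-digit magnitude, capped at 999.
    magnitude = min(int(parsed["position"]) * 10, 999)
    return f"{axis}{magnitude:03d}\r\n"


print(repr(message_to_tcode({"id": "LR", "mode": "position", "position": 50})))
# 'R1500\r\n'
```

Adding another axis would then only need one more entry in AXIS_BY_ID and a matching custom toy in XToys.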

Wow, I didn’t actually expect a reply from this thread, I guess I forgot to delete my question. The Multi axis aspect isn’t something I ever expected to see on Xtoys, but could add confusion for randoms on there. AYVA remote has pretty much replaced Xtoys for OSR2 users, much safer and can’t be messed up. Thank you for the contribution. Hopefully this shows Mumble that there is a community for serial based T-code on Xtoys!

I guess I need to check out AYVA. I have heard people talk about it but have not tested it myself and don’t know much about it.

Check it out! I’m sure Soritesparadox would enjoy the publicity
