Sine Dancer - Minimalistic Audio-Based Funscript Generation via Python

The script itself is short and includes all pertinent information, so I am just going to quote it here:

#!/usr/bin/env python3

# Description
#-----------------------------
'''
Sine Dancer generates a Funscript by modulating a waveform
according to changes in audio properties. It can operate as
either an imported Python module or an independent script.

Consider it a light-weight alternative to the following:
    funscript_dancer    (@ncdxncdx - Github)
    python_dancer       (@NodudeWasTaken - Github)

Run this to view command line functionality:
    python sine_dancer.py -h

See below for
    - An overview of the methodology
    - Example usage as a python module
    - Source code (heavily commented for ease of comprehension)

Author Note:
    I do not have direct experience writing funscripts and expect there is
    much that could be improved upon here. Hence: 
        “To the extent possible under law, the author of this work hereby waives
        all copyright and related rights worldwide. This work is placed in the
        public domain. Anyone may copy, modify, distribute, perform, and display
        the work, without asking for permission.”
    All I ask is that you consider sharing your creations
    with others just as I have done for you.
'''

# Methodology
#-----------------------------
'''
Utilizes the librosa library to extract the following audio properties:
    1) Beat Based Onset Envelope (via PLP Estimation)
    2) Signal RMS
    3) Pitch Frequency
A waveform is then computed whereby:
    Amplitude   => Beat Based Onset Envelope
    Frequency   => Signal RMS
    Offset      => Pitch Frequency
This waveform data is then directly compiled into a funscript.

Audio properties are computed every frame where a frame is defined as:
    frame = sample_rate*2048//22050 [number of samples]
with consecutive frames advanced by a hop of sample_rate*512//22050 samples,
so the output funscript is sampled at roughly 43 points per second.
Depending on the use case this may be excessive, so a toggle called reduction
is provided which subsamples the output to only its local minima & maxima.
Additional configuration options and toggles are also provided to allow the user
to fine tune the output.
'''

# Example Usage
#-----------------------------
'''
import os
from sine_dancer import sine_dancer

audio_file = 'files/test.mp3'
output_file = 'funscripts/test.funscript'

# Result is saved directly to the filesystem when out is specified
sine_dancer(audio_file,out=output_file) #'funscripts/test.funscript' now created/updated

# Result is also returned directly as a Funscript object when out is specified
result = sine_dancer(audio_file,out=output_file) #'funscripts/test.funscript' is still created/updated
print(f'time_ms: {result.x*1000}')
print(f'pos_percent: {result.y*100}')
os.remove(output_file) # this can be used to clean up an unused output file

# Modified config settings can be provided via the config function argument
config = {
    'chunk_size_s': 10, # number of seconds in stream/chunk window
    'f_min': 0.2,       # minimum movement speed [cycles/s]
    'f_max': 5,         # maximum movement speed [cycles/s]
    'A_bias': 0.2,      # envelope bias factor [0,10] (0:1=greater avg amplitude, 1.0=nominal, 1:10=lower avg amplitude)
    'A_min': 0,         # envelope minimum amplitude [0,1]
    'A_max': 1,         # envelope maximum amplitude [0,1]
    'C_min': 0.3,       # minimum waveform offset [0,1]
    'C_max': 0.7,       # maximum waveform offset [0,1]
    'reduction': True,  # flag to reduce funscript to local extrema [bool]
    'debug': False      # flag to also output extracted audio properties [bool]
}
sine_dancer(audio_file,out=output_file,config=config) # Note: any missing entries will revert to defaults

# A function can be provided to track/report execution progress 
def print_progress(x):
    print(f'Current Progress: {x}%')
sine_dancer(audio_file,out=output_file,progress_callback=print_progress)

# When out is absent or None a generator is returned that gives results per chunk of audio
config2 = {'chunk_size_s': 2.0}
data_stream = sine_dancer(audio_file,config=config2)
for data in data_stream:
    print(f'time_s: {data[0]}')
    print(f'pos_norm: {data[1]}')
    # This could be leveraged for realtime applications like rotor/servo control,
    # particularly if you set config['chunk_size_s'] to a small value
    period_length = data[0][-1]-data[0][0]
    avg_amplitude = data[1].mean()
    vibrate_rotor(duration=period_length,strength=avg_amplitude)
    # However the total compute delay for 1h of audio is <2 minutes
    # so in most cases it may be better to just precompute

# When debug=True in config then beat,rms,pitch are also returned, either as independent
# funscripts when out is specified or as data[2:5] when using the generator method
'''

# Source Code
#-----------------------------
import argparse,pathlib
import librosa
import numpy as np
from funscript import Funscript # credit (@diglet48 - Github)

def sine_dancer(target,out=None,config={},progress_callback=None):
    '''
    target = input audio file (mp3,wav) [string or pathlib.Path]
    out    = output funscript file [string or pathlib.Path]
    config = funscript waveform settings (see Load Config) [dict]
    progress_callback = function to handle progress reporting [function]
    
    returns:
        Funscript (if out != None) -> also saves result to filesystem based on out's value
        generator (if out == None)
            # generator computes the next chunk's worth of output data each time it is called
            data = generator()
            data[0] # funscript x data in seconds [numpy array]
            data[1] # normalized funscript y data [numpy array]
            data[2] # raw beat envelope data    (if debug = true) [numpy array]
            data[3] # raw rms data              (if debug = true) [numpy array]
            data[4] # raw pitch frequency data  (if debug = true) [numpy array]
    '''
    # Set Progress to Zero Initially
    if progress_callback:
        progress_callback(0.0)
    # Load Config
    chunk_size_s = config.get('chunk_size_s',10)    # number of seconds in stream/chunk window
    f_min = config.get('f_min',0.2)                 # minimum movement speed [cycles/s]
    f_max = config.get('f_max',5.0)                 # maximum movement speed [cycles/s]
    A_bias = config.get('A_bias',0.2)               # envelope bias factor [0,10] (0:1=greater avg amplitude, 1.0=nominal, 1:10=lower avg amplitude)
    A_min = config.get('A_min',0.0)                 # envelope minimum amplitude [0,1]
    A_max = config.get('A_max',1.0)                 # envelope maximum amplitude [0,1]
    C_min = config.get('C_min',0.3)                 # minimum waveform offset [0,1]
    C_max = config.get('C_max',0.7)                 # maximum waveform offset [0,1]
    reduction = config.get('reduction',False)       # flag to reduce funscript to local extrema [bool]
    debug = config.get('debug',False)               # flag to output auxiliary funscripts alongside input media [bool]
    # Initialize Audio Data Stream
    sr = librosa.get_samplerate(pathlib.Path(target))
    duration = librosa.get_duration(path=pathlib.Path(target))
    frame_length = (2048 * sr) // 22050
    hop_length = (512 * sr) // 22050
    block_length = int(chunk_size_s*sr//hop_length)
    stream = librosa.stream(
                    pathlib.Path(target),
                    mono=False,
                    block_length=block_length,
                    frame_length=frame_length,
                    hop_length=hop_length,
                    fill_value = 0
                )
    # Create Function to Iterate on Audio in Chunks Subdivided into Frames
    def chunk_operate():
        for i,y in enumerate(stream):
            # Gather Chunk Information
            n_chan = 1 if y.ndim == 1 else y.shape[0]
            # Compute Time [s] for each Frame
            start_t = librosa.blocks_to_time(i,block_length=block_length,hop_length=hop_length,sr=sr)
            frame_t = start_t+librosa.frames_to_time(np.arange(0,block_length),sr=sr,hop_length=hop_length)
            frame_freq = 1/(frame_t[1]-frame_t[0])
            # Magnitude Spectrum
            D = librosa.stft(y=y,n_fft=frame_length,hop_length=hop_length,center=False)
            # Onset Envelope
            onset_env = librosa.onset.onset_strength(S=np.abs(D),sr=sr,hop_length=hop_length,center=False)
            # PLP Beat Estimation
            beat_f = librosa.beat.plp(onset_envelope=onset_env,sr=sr,hop_length=hop_length)
            # RMS Detection
            rms_f = librosa.feature.rms(y=y,frame_length=frame_length,hop_length=hop_length,center=False)[..., 0, :]
            # Pitch Estimation
            pitch_f = librosa.yin(y=y,fmin=100,fmax=500,sr=sr,frame_length=frame_length,hop_length=hop_length,center=False)
            pitch_f = pitch_f - pitch_f.mean()
            # Perform Multi Channel Mixing (Averaging)
            if n_chan > 1:
                for j in range(1,n_chan):
                    beat_f[0] += beat_f[j]/n_chan
                    rms_f[0] += rms_f[j]/n_chan
                    pitch_f[0] += pitch_f[j]/n_chan
                beat_f = beat_f[0]
                rms_f = rms_f[0]
                pitch_f = pitch_f[0]
            # RMS & Pitch Filtering (moving average, window=20*hop_length)
            kernel = np.ones(20, dtype=float) / 20
            pitch_f = np.convolve(pitch_f,kernel,mode='same')
            rms_f = np.convolve(rms_f,kernel,mode='same')
            # Create Funscript Waveform
            A = np.interp(beat_f,(beat_f.min(),beat_f.max()),(A_min,A_max))**(A_bias)   # beat based envelope
            C = np.interp(pitch_f,(pitch_f.min(),pitch_f.max()),(C_min,C_max))          # pitch based offset
            f = np.interp(rms_f,(rms_f.min(),rms_f.max()),(f_min,f_max))                # rms based frequency
            f = np.mod(np.cumsum(2*np.pi*f/frame_freq),2*np.pi)                         # wrap phase vector
            amplitude = 0.5*A*np.sin(f) + C                                             # assemble waveform
            amplitude = np.clip(amplitude,0,1)                                          # normalize via clipping
            # Perform Reduction
            if reduction:
                sign = np.sign(np.diff(amplitude, prepend=amplitude[0]))            # sign of every point diff
                dir_changed = np.where(sign != np.append(sign[0], sign[:-1]))[0]    # indexes where sign changed
                amplitude = amplitude[dir_changed]
                frame_t = frame_t[dir_changed]
                if debug:
                    beat_f = beat_f[dir_changed]
                    rms_f = rms_f[dir_changed]
                    pitch_f = pitch_f[dir_changed]
            # Update Progress
            if progress_callback:
                progress_callback(start_t/duration*100)
            # Provide Chunk Output Data
            if debug:
                yield frame_t,amplitude,beat_f,rms_f,pitch_f
            else:
                yield frame_t,amplitude
    # Automatically Perform Full Calculation When Output File is Specified
    if out:
        # Initialize Funscripts
        funscripts = {
            ''     : Funscript([],[]),
            'beat' : Funscript([],[]) if debug else None,
            'rms'  : Funscript([],[]) if debug else None,
            'pitch': Funscript([],[]) if debug else None
        }
        # Perform Chunk Calculations and Collect Funscript Data
        for chunk_data in chunk_operate():
            for i,(key,funscript) in enumerate(funscripts.items()):
                if funscript is not None:
                    funscript.x = np.concatenate((funscript.x,chunk_data[0]))   # time [s]
                    funscript.y = np.concatenate((funscript.y,chunk_data[i+1])) # pos [0,1]
        # Save Funscripts (also scale them to [0,1], mostly used for debug scripts as out is already clipped)
        for key,funscript in funscripts.items():
            if funscript is not None:
                funscript.y = np.interp(funscript.y,(funscript.y.min(),funscript.y.max()),(0,1))
                if key == '':
                    funscript.save_to_path(pathlib.Path(out))
                else:
                    funscript.save_to_path(pathlib.Path(out).with_suffix(f'.{key}.funscript'))
        # Set Progress to Completed
        if progress_callback:
            progress_callback(100.0)
        # Also Provide Resulting Funscript to Calling Function
        return funscripts['']
    # Provide Internal Chunk Generator to Calling Script
    else:
        return chunk_operate()
    
# Command Line Interface
#-----------------------------
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
            prog='sine dancer',
            description='generates a Funscript by modulating a sinewave according to changes in audio properties'
            )
    parser.add_argument('target',type=pathlib.Path,help='input audio source (mp3,wav,etc)')
    parser.add_argument('-o','--output',type=pathlib.Path,default=None,help='output funscript destination (default is alongside input)')
    parser.add_argument('-s','--chunk_size',type=float,default=argparse.SUPPRESS,dest='chunk_size_s',help='number of seconds in stream/chunk window')
    parser.add_argument('-r','--reduction',action='store_true',help='reduce funscript to local extrema')
    parser.add_argument('-d','--debug',action='store_true',help='output auxiliary audio funscripts alongside output')
    parser.add_argument('--f_min' ,type=float,default=argparse.SUPPRESS,help='minimum movement speed [cycles/s]')
    parser.add_argument('--f_max' ,type=float,default=argparse.SUPPRESS,help='maximum movement speed [cycles/s]')
    parser.add_argument('--a_bias',type=float,default=argparse.SUPPRESS,dest='A_bias',help='envelope bias factor [0,10]')
    parser.add_argument('--a_min' ,type=float,default=argparse.SUPPRESS,dest='A_min' ,help='envelope minimum amplitude [0,1]')
    parser.add_argument('--a_max' ,type=float,default=argparse.SUPPRESS,dest='A_max' ,help='envelope maximum amplitude [0,1]')
    parser.add_argument('--c_min' ,type=float,default=argparse.SUPPRESS,dest='C_min' ,help='minimum waveform offset [0,1]')
    parser.add_argument('--c_max' ,type=float,default=argparse.SUPPRESS,dest='C_max' ,help='maximum waveform offset [0,1]')
    args = parser.parse_args()
    if not args.output:
        args.output = args.target.with_suffix('.funscript')
    # Progress Reporting Function (Overwrites Its Own Line)
    def progress(prog):
        print(f'\r    Progress: {prog:.2f}%'+'\033[K',end='',flush=True)
    # Call the Main Process
    sine_dancer(args.target,out=args.output,config=vars(args),progress_callback=progress)
    print('\ndone')
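To show the core technique in isolation, here is a minimal sketch of the waveform assembly and extrema reduction steps from `chunk_operate`, driven by made-up feature arrays (`A`, `f`, `C`) rather than real librosa output:

```python
import numpy as np

frame_freq = 43.0  # assumed feature sample rate [frames/s]

# Made-up, pre-normalized feature tracks standing in for beat/RMS/pitch
A = np.array([1.0, 1.0, 0.5, 0.5, 1.0, 1.0])  # envelope amplitude [0,1]
f = np.array([1.0, 1.0, 2.0, 2.0, 1.0, 1.0])  # movement speed [cycles/s]
C = np.full(6, 0.5)                           # waveform offset [0,1]

# Integrate instantaneous frequency into a phase vector wrapped to [0,2*pi)
phase = np.mod(np.cumsum(2 * np.pi * f / frame_freq), 2 * np.pi)
# Assemble the sine waveform and clip into the valid position range
pos = np.clip(0.5 * A * np.sin(phase) + C, 0, 1)

# Reduction: keep only the points where the direction of travel changes
sign = np.sign(np.diff(pos, prepend=pos[0]))
turns = np.where(sign != np.append(sign[0], sign[:-1]))[0]
print(turns, pos[turns])
```

Note how the amplitude dip in `A` produces a direction change even though the phase keeps advancing; that is how beat intensity shapes the motion independently of speed.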

Here are some examples generated using the default settings:
Queen - We Will Rock You.funscript (282.8 KB)
Celine Dion - My Heart Will Go On.funscript (591.7 KB)
And here is a visual showcasing the resulting funscript alongside its audio constituents:

The quality is obviously inferior; however, it has served me well as a preprocessor for other tools, particularly https://github.com/diglet48/restim and https://github.com/edger477/funscript-tools

Speaking of @edger477: if you find this useful, feel free to incorporate it into your toolkit. No credit or permission needed.


How does this compare to funscript dancer?

Personally, I think Funscript Dancer produces better funscripts, but I also find it hard to integrate with other projects, mainly due to its dependence on external programs. The python dancer port is better in that regard, but its code is difficult to comprehend because it performs a lot of extra optimization steps and output shaping. I also found this overhead to be error-prone, sometimes generating funscripts with exceptionally low/high center positions which then required me to manually intervene. Don't get me wrong, the bells and whistles are nice, but ultimately, I found them unnecessary for my purpose.

I produce funscripts purely to feed into Restim for use in E-stim devices. These devices do not have the same limitations as traditional linear actuators and the sensations produced are also different. In my opinion, poorly optimized or more sporadic funscripts are better for E-stim, and so I wrote this script to fulfill that role.

Sine Dancer is more efficient, easier to understand, and is specifically designed to be integrated into other applications. The trade-off here being that the signals it produces are rougher (mostly because I choose to use a minimal amount of inline filtering).
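If the output proves too rough for a particular device, a generic post-smoothing pass over the normalized position series is easy to bolt on. This moving-average sketch is not part of Sine Dancer itself, and the window size is arbitrary:

```python
import numpy as np

def smooth_positions(pos, window=5):
    """Apply a moving-average filter to a normalized position series [0,1]."""
    kernel = np.ones(window) / window
    # mode='same' preserves the series length; edge values are damped toward zero
    return np.convolve(pos, kernel, mode='same')

# Made-up square-ish position series to smooth
pos = np.array([0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0])
print(smooth_positions(pos, window=3))
```

Larger windows trade responsiveness for smoothness, which is exactly the filtering Sine Dancer deliberately keeps to a minimum.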


Will this work for beat-based JOI videos? I'm a Coyote user via the Howl app and I'm looking for a method to generate funscripts based on audio for my e-stim device, especially for H JOI vids.

You will need to extract the audio from said video and then run the script on that. Any sound will be detected and the output funscript will produce periodic movement proportional to said sound’s intensity and extent. Meaning it will trigger on beats, voices, sound effects, and even background noise.
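For the extraction step, a tool like ffmpeg can pull the audio track out first; the file names below are placeholders:

```shell
# Extract the audio track from a video into a wav file (-vn disables video)
ffmpeg -i input_video.mp4 -vn -acodec pcm_s16le extracted_audio.wav
# Then generate the funscript from the extracted audio
python sine_dancer.py extracted_audio.wav -o output.funscript
```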

So yes, it will work, and if the beats are consistent then you will have a consistent output. However, I am not familiar with the Howl app's algorithm, so please be mindful of the following:

  • generated funscripts are detailed and you may need to subsample for input to Howl
  • the argument "f_max" does not guarantee the output funscript will not exceed that value. Abrupt changes can also be created via beat detection, so if your audio is abrupt the funscript may go from zero to 100 rapidly.
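Regarding subsampling: a .funscript is just JSON with an `actions` list of `{at, pos}` entries (time in ms, position in percent), so thinning one down to a minimum time gap is straightforward. The gap value and the generated sample data below are arbitrary:

```python
import json

def thin_actions(actions, min_gap_ms=100):
    """Keep only actions at least min_gap_ms apart from the last kept point."""
    kept = []
    for action in actions:
        if not kept or action['at'] - kept[-1]['at'] >= min_gap_ms:
            kept.append(action)
    return kept

# Made-up densely sampled script (one point every 23 ms)
script = {'actions': [{'at': t, 'pos': (t // 10) % 100} for t in range(0, 500, 23)]}
script['actions'] = thin_actions(script['actions'], min_gap_ms=100)
print(json.dumps(script['actions']))
```

Timestamps survive unchanged, so the thinned script still lines up with the audio.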

If you do end up giving it a try let me know how it goes. I would be happy to assist with adjustments/improvements.