Commit d1bb9a60 authored by RBD Lab's avatar RBD Lab

Merge branch '101-saucers-synth' into 'master'

August WIP

See merge request !1
parents df2e03d8 ddb9097d
......@@ -14,6 +14,12 @@ def get_first_rigid_body(optirx_packet):
return None
def get_all_rigid_bodies(optirx_packet):
if type(optirx_packet) is rx.FrameOfData and len(optirx_packet.rigid_bodies) > 0:
return optirx_packet.rigid_bodies
return None
def get_rigid_body_by_id(id, optirx_packet):
if type(optirx_packet) is rx.FrameOfData and len(optirx_packet.rigid_bodies) > 0:
for body in optirx_packet.rigid_bodies:
......
......@@ -5,7 +5,6 @@ This class is responsible for creating a PYO server
and manipulating the PYO commands that actually create the sound
"""
class PyoHandler:
def __init__(self):
# start the pyo server with the following parameters
......@@ -13,26 +12,89 @@ class PyoHandler:
self.s.start()
# each synth is a sin sound wave
#self.synth = pyo.SuperSaw()
self.synth = pyo.Sine()
self.synth2 = pyo.Sine(.2, mul=0.5, add=0.5).out(2)
self.synth3 = pyo.Sine(.2, mul=0.5, add=0.5).out(3)
#self.synth4 = pyo.Sine(.2, mul=0.5, add=0.5).out(4)
self.synth4 = pyo.SuperSaw()
self._next_out = 1
self.synths = []
#self.make_beat()
self._make_synths()
self._make_synths()
def _get_next_out(self):
o = self._next_out
self._next_out += 1
return o
def make_beat(self):
# Builds an amplitude envelope in a linear table
env = pyo.LinTable([(0, 0), (190, .8), (1000, .5), (4300, .1), (8191, 0)], size=8192)
env.graph()
# Metronome provided by Beat
met = pyo.Beat(time=.125, taps=16, w1=90, w2=50, w3=30).play()
# Reads the amp envelope for each trigger from Beat
amp = pyo.TrigEnv(met, table=env, dur=met['dur'], mul=met['amp'])
# Generates a midi note for each trigger from Beat in a pseudo-random distribution
fr = pyo.TrigXnoiseMidi(met, dist=12, x1=1, x2=.3, scale=0, mrange=(48, 85))
# Receives the midi note from XnoiseMidi and scale it into C harmonic minor (try others!)
frsnap = pyo.Snap(fr, choice=[0, 2, 3, 5, 7, 8, 11], scale=1)
# This instrument receives a frequency from Snap and molde it inside an envelop from TrigEnv
lfo = pyo.Sine(freq=.05, mul=.05, add=.08)
gen = pyo.SineLoop(freq=frsnap, feedback=lfo, mul=amp * .5).out(0)
# Output the same signal with some delay in the right speaker (try a 'real' counterpoint!)
rev = pyo.Delay(gen, delay=[.25, .5], feedback=.3, mul=.8).out(1)
self.s.gui(locals())
def _make_synths(self):
s = []
# #### 1 ####
# s.append(pyo.Sine().out(self._get_next_out()))
# s.append(pyo.Sine(.2, mul=0.5, add=0.5).out(self._get_next_out()))
# s.append(pyo.Sine(.2, mul=0.5, add=0.5).out(self._get_next_out()))
# s.append(pyo.Sine(.2, mul=0.5, add=0.5).out(self._get_next_out()))
# #s.append(pyo.SuperSaw().out(self._get_next_out()))
#### 2 ####
w = pyo.Sine(freq=7).out(self._get_next_out())
l = pyo.LFO(freq=w, type=2).out(self._get_next_out())
#sl = pyo.SineLoop(freq=w, feedback=l, mul=1).out(self._get_next_out())
s.append(w)
s.append(l)
#rev = pyo.Delay(sl, delay=[.25, .5]).out(self._get_next_out())
self.synths.append(s)
def change_pyo(self, idx, fr, bal, mul):
# #### 1 ####
# # change the pyo parameters:
# # fr = frequency
# # mul = volume
# fr = fr*fr/600
# # start the first sound wave
# self.synths[idx][0].setFreq(fr)
# self.synths[idx][0].setMul(mul)
# # adding more sin waves for the Y axis
# self.synths[idx][1].setFreq(fr * 7 / 3)
# self.synths[idx][2].setFreq(fr * 1.25)
# self.synths[idx][3].setFreq(fr * 7 / 1.5)
# # changing the amplitude of the waves according to position in Y axis
# self.synths[idx][1].setMul(mul*bal)
# self.synths[idx][2].setMul(mul*bal)
# self.synths[idx][3].setMul(mul*bal)
def change_pyo(self, fr, bal, mul):
#### 2 ####
# change the pyo parameters:
# fr = frequency
# mul = volume
fr = fr*fr/600
# start the first sound wave
self.synth.out(1)
self.synth.setFreq(fr)
self.synth.setMul(mul)
# adding more sin waves for the Y axis
self.synth2.setFreq(fr * 15 / 3)
self.synth3.setFreq(fr * 1.25)
self.synth4.setFreq(fr * 7 / 2)
# changing the amplitude of the waves according to position in Y axis
self.synth2.setMul(mul*bal)
self.synth3.setMul(mul*bal)
self.synth4.setMul(mul*bal)
\ No newline at end of file
self.synths[idx][0].setFreq(fr)
self.synths[idx][0].setMul(mul*bal)
#self.synths[idx][0].setPhase(bal)
\ No newline at end of file
File added
......@@ -2,6 +2,7 @@ import threading
import time
from rtscs_controllers.base_rtscs_controller import BaseRtscsController
from pyo_handler import PyoHandler
import optirx_utils
class SequentialRtscsController(BaseRtscsController):
......@@ -22,10 +23,16 @@ class SequentialRtscsController(BaseRtscsController):
a = PyoHandler()
while not self._is_shutdown_requested():
# get rtscs params form BaseRtscsParamReceiver
transposition, vel_shift, tempo_factor, hint, is_sustain = self._param_receiver.get_rtscs_params()
PyoHandler.change_pyo(a, transposition, vel_shift, tempo_factor)
packet = self._param_receiver._optirx_packet_receiver.get_last_packet()
bodies = optirx_utils.get_all_rigid_bodies(packet)
idx = 0
for body in bodies:
transposition, vel_shift, tempo_factor, hint, is_sustain = self._param_receiver.get_rtscs_params_body(body)
PyoHandler.change_pyo(a, idx, transposition, vel_shift, tempo_factor)
idx += 1
# sleep for duration of msg.time between msgs
time.sleep(0.05)
time.sleep(0.01)
......
import threading
#from eventlet.green import threading
import optirx as rx
DEFAULT_MULTICAST_ADDRESS = '239.255.42.99'
......@@ -18,9 +19,7 @@ class OptitrackPacketReceiver(threading.Thread):
super(OptitrackPacketReceiver, self).__init__()
self._natnetsdk_version = natnetsdk_version
self._last_packet = None
self._packet_lock = threading.Lock()
self._is_shutdown_request = False
self._shutdown_lock = threading.Lock()
# open UDP port to Optitrack server
self._dsock = rx.mkdatasock(ip_address=server_ip_addr, multicast_address=multicast_addr, port=data_port)
......@@ -28,10 +27,8 @@ class OptitrackPacketReceiver(threading.Thread):
while not self._is_shutdown_requested():
# receive UDP data packet from Optitrack server
data = self._dsock.recv(rx.MAX_PACKETSIZE)
self._packet_lock.acquire()
# parse packet and store it locally
self._last_packet = rx.unpack(data, self._natnetsdk_version)
self._packet_lock.release()
# update natnet sdk version to what Optitrack software says it is
if type(self._last_packet) is rx.SenderData:
self._natnetsdk_version = self._last_packet.natnet_version
......@@ -40,18 +37,12 @@ class OptitrackPacketReceiver(threading.Thread):
"""
Returns the last packet recieved from the OptiTrack system as a optirx object
"""
self._packet_lock.acquire()
last_packet = self._last_packet
self._packet_lock.release()
return last_packet
def _is_shutdown_requested(self):
self._shutdown_lock.acquire()
is_shutdown_request = self._is_shutdown_request
self._shutdown_lock.release()
return is_shutdown_request
def shutdown(self):
self._shutdown_lock.acquire()
self._is_shutdown_request = True
self._shutdown_lock.release()
......@@ -7,9 +7,18 @@ import optirx_utils
DEFAULT_NOTE_DURATION_PRECISION = 0.125
# rh roll implemented for future use
DEFAULT_OPTITRACK_RANGES_DICT = {'x': (-1.5, 1.3), 'y': (0.0, 2.0), 'z': (-1.5, 1.7), 'rh_roll': (-180, 180)}
DEFAULT_RTSCS_PARAM_RANGES_DICT = {'frequency': (0, 600), 'sins': (0, 0.5),
'amplitude': (0.1, 1), 'numerical_hint': (0, 1)}
DEFAULT_OPTITRACK_RANGES_DICT = {
'x': (-0.9, 1.5),
'y': (-2.1, 1.2),
'z': (-0.1, 2),
'rh_roll': (-180, 180)
}
DEFAULT_RTSCS_PARAM_RANGES_DICT = {
'frequency': (0, 700),
'sins': (0, 0.45),
'amplitude': (0.1, 1),
'numerical_hint': (0, 1)
}
class OptitrackParamReceiver(BaseRtscsParamReceiver):
......@@ -97,6 +106,29 @@ class OptitrackParamReceiver(BaseRtscsParamReceiver):
return transformed_params
def get_rtscs_params_body(self, rh):
"""
Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
"""
if rh is None or rh.mrk_mean_error == 0:
return 0, 0, 1, 0, False
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = list(rh.position) # map(lambda coordinate: -1 * coordinate, rh.position)
# Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
rh_position[0] = -rh_position[0]
# For convenience convert roll angle from radians to degrees
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
# print rh_position
# print rh_roll; return
transformed_params = self.transform_params(rh_position, rh_roll)
# print transformed_params
return transformed_params
def get_rtscs_params(self):
"""
Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
......
import optirx as rx
import optirx_utils
import eventlet
import socketio
eventlet.monkey_patch()
from utils import clip_value, remap_range, round_to_precision
import math
sio = socketio.Server(cors_allowed_origins=None)
app = socketio.WSGIApp(sio)
@sio.on('connect')
def connect(sid, environ):
print('connect ', sid)
#eventlet.spawn(deb, sio)
#sio.emit("rh", [1, 2, 3])
@sio.on('my message')
def message(sid, data):
print('message ', data)
@sio.on('disconnect')
def disconnect(sid):
print('disconnect ', sid)
def start_sio():
eventlet.wsgi.server(eventlet.listen(('', 5000)), app)
DEFAULT_NOTE_DURATION_PRECISION = 0.125
# rh roll implemented for future use
DEFAULT_OPTITRACK_RANGES_DICT = {
'x': (-0.7, 0.5),
'y': (-0.7, 0.55),
'z': (-0.4, 1.0),
'rh_roll': (-180, 180)
}
DEFAULT_RTSCS_PARAM_RANGES_DICT = {
#'frequency': (0, 700),
#'sins': (0, 0.45),
'frequency': (0, 7),
'sins': (0, 7),
'amplitude': (5, 200),
'numerical_hint': (0, 1)
}
def transform_params(rh_position, rh_roll, lh_position=None):
"""
Transforms OptiTrack params into rtscs params
:param rh_position: a (x, y, z) tuple holding the position of the right hand "rigid body"
:param rh_roll: roll angle in degrees of the right hand "rigid body"
:param lh_position: a (x, y, z) tuple holding the position of the left hand "rigid body"
:return: rtscs params tuple as defined in BaseRtscsParamReceiver.get_rtscs_params()
"""
is_sustain = False # TODO: derive is_sustain from distance between rh_position and lh_position
transformed_params = rh_position + [rh_roll, is_sustain]
# Remap rh position onto rtscs internal ranges
# x -> frequency
# y -> number of waves which define the complex wave.
# z -> amplitude.
for i, mapped_pair in enumerate((('x', 'frequency'), ('y', 'sins'),
('z', 'amplitude'), ('rh_roll', 'numerical_hint'))):
source_low = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][0]
source_high = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][1]
destination_low = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][0]
destination_high = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][1]
# clip Optritrack param to the limits of its defined valid range
transformed_params[i] = clip_value(transformed_params[i],
source_low, source_high)
# translate Optritrack param to rtscs param
transformed_params[i] = remap_range(transformed_params[i],
source_low, source_high,
destination_low, destination_high)
# convert frequency and number of sins to int
for i in range(1):
transformed_params[i] = int(round(transformed_params[i]))
# round amplitude factor to precision
transformed_params[2] = round_to_precision(transformed_params[2], DEFAULT_NOTE_DURATION_PRECISION)
return transformed_params
def get_rtscs_params_body(rh):
"""
Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
"""
if rh is None or rh.mrk_mean_error == 0:
return 0, 0, 1, 0, False
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = list(rh.position) # map(lambda coordinate: -1 * coordinate, rh.position)
# Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
rh_position[0] = -rh_position[0]
# For convenience convert roll angle from radians to degrees
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
return transform_params(rh_position, rh_roll)
def get_optirx_data():
dsock = rx.mkdatasock(ip_address="127.0.0.1", multicast_address='239.255.42.99', port=1511)
natnetsdk_version = (2, 8, 0, 0)
while True:
data = dsock.recv(rx.MAX_PACKETSIZE)
# parse packet and store it locally
packet = rx.unpack(data, natnetsdk_version)
if type(packet) is rx.SenderData:
natnetsdk_version = packet.natnet_version
rh = optirx_utils.get_first_rigid_body(packet)
if rh is not None:
params = get_rtscs_params_body(rh)
print(params)
sio.emit("rh", params)
eventlet.spawn(start_sio)
t_data = eventlet.spawn(get_optirx_data)
t_data.wait()
\ No newline at end of file
import threading
import eventlet
import socketio
import time
eventlet.monkey_patch()
from rtscs_param_receivers.optitrack_param_receiver.optitrack_packet_receiver import OptitrackPacketReceiver
import optirx_utils
from utils import clip_value, remap_range, round_to_precision
import math
#eventlet.monkey_patch()
DEFAULT_NOTE_DURATION_PRECISION = 0.125
# rh roll implemented for future use
DEFAULT_OPTITRACK_RANGES_DICT = {
'x': (-0.9, 1.5),
'y': (-2.1, 1.2),
'z': (-0.1, 2),
'rh_roll': (-180, 180)
}
DEFAULT_RTSCS_PARAM_RANGES_DICT = {
#'frequency': (0, 700),
#'sins': (0, 0.45),
'frequency': (0, 7),
'sins': (0, 7),
'amplitude': (0.1, 1),
'numerical_hint': (0, 1)
}
sio = socketio.Server(cors_allowed_origins=None)
app = socketio.WSGIApp(sio)
@sio.on('connect')
def connect(sid, environ):
print('connect ', sid)
#eventlet.spawn(deb, sio)
#sio.emit("rh", [1, 2, 3])
@sio.on('my message')
def message(sid, data):
print('message ', data)
@sio.on('disconnect')
def disconnect(sid):
print('disconnect ', sid)
def start_sio():
eventlet.wsgi.server(eventlet.listen(('', 5000)), app)
def transform_params(rh_position, rh_roll, lh_position=None):
"""
Transforms OptiTrack params into rtscs params
:param rh_position: a (x, y, z) tuple holding the position of the right hand "rigid body"
:param rh_roll: roll angle in degrees of the right hand "rigid body"
:param lh_position: a (x, y, z) tuple holding the position of the left hand "rigid body"
:return: rtscs params tuple as defined in BaseRtscsParamReceiver.get_rtscs_params()
"""
is_sustain = False # TODO: derive is_sustain from distance between rh_position and lh_position
transformed_params = rh_position + [rh_roll, is_sustain]
# Remap rh position onto rtscs internal ranges
# x -> frequency
# y -> number of waves which define the complex wave.
# z -> amplitude.
for i, mapped_pair in enumerate((('x', 'frequency'), ('y', 'sins'),
('z', 'amplitude'), ('rh_roll', 'numerical_hint'))):
source_low = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][0]
source_high = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][1]
destination_low = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][0]
destination_high = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][1]
# clip Optritrack param to the limits of its defined valid range
transformed_params[i] = clip_value(transformed_params[i],
source_low, source_high)
# translate Optritrack param to rtscs param
transformed_params[i] = remap_range(transformed_params[i],
source_low, source_high,
destination_low, destination_high)
# convert frequency and number of sins to int
for i in range(1):
transformed_params[i] = int(round(transformed_params[i]))
# round amplitude factor to precision
transformed_params[2] = round_to_precision(transformed_params[2], DEFAULT_NOTE_DURATION_PRECISION)
return transformed_params
def get_rtscs_params_body(rh):
"""
Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
"""
if rh is None or rh.mrk_mean_error == 0:
return 0, 0, 1, 0, False
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = list(rh.position) # map(lambda coordinate: -1 * coordinate, rh.position)
# Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
rh_position[0] = -rh_position[0]
# For convenience convert roll angle from radians to degrees
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
# print rh_position
# print rh_roll; return
return transform_params(rh_position, rh_roll)
optritrack_packet_recv = OptitrackPacketReceiver('127.0.0.1') # adjust OptiTrack server IP if needed
optritrack_packet_recv.start()
def motive_data():
while True:
packet = optritrack_packet_recv.get_last_packet()
rh = optirx_utils.get_first_rigid_body(packet)
if rh is not None:
p = get_rtscs_params_body(rh)
print(p)
sio.emit("rh", p)
eventlet.sleep(1)
def deb(s):
while True:
s.emit("rh", [1, 2])
eventlet.sleep(1)
print("ok")
#eventlet.spawn(motive_data)
#eventlet.wsgi.server(eventlet.listen(('', 5000)), app)
#motive_data()
while True:
packet = optritrack_packet_recv.get_last_packet()
rh = optirx_utils.get_first_rigid_body(packet)
print(rh)
\ No newline at end of file
import optirx as rx
import optirx_utils
import eventlet
import math
from oscpy.client import OSCClient
from utils import clip_value, remap_range, round_to_precision
import json
eventlet.monkey_patch()
UDP_IP = "192.168.43.186"
UDP_PORT = 5000
max_udp_client = OSCClient(UDP_IP, UDP_PORT)
def send_udp(message):
max_udp_client.send_message("/", message)
DEFAULT_NOTE_DURATION_PRECISION = 0.125
# rh roll implemented for future use
DEFAULT_OPTITRACK_RANGES_DICT = {
'x': (-0.88, 0.19),
'y': (-0.03, 0.49),
'z': (0.07, 1.3),
'rh_roll': (-180, 180)
}
DEFAULT_RTSCS_PARAM_RANGES_DICT = {
'tempo': (75, 170),
'senda': (0, 100),
'mastervol': (-55, 0),
'numerical_hint': (0, 1)
}
def transform_params(rh_position, rh_roll, lh_position=None):
"""
Transforms OptiTrack params into rtscs params
:param rh_position: a (x, y, z) tuple holding the position of the right hand "rigid body"
:param rh_roll: roll angle in degrees of the right hand "rigid body"
:param lh_position: a (x, y, z) tuple holding the position of the left hand "rigid body"
:return: rtscs params tuple as defined in BaseRtscsParamReceiver.get_rtscs_params()
"""
is_sustain = False # TODO: derive is_sustain from distance between rh_position and lh_position
transformed_params = rh_position + [rh_roll, is_sustain]
# Remap rh position onto rtscs internal ranges
# x -> frequency
# y -> number of waves which define the complex wave.
# z -> amplitude.
for i, mapped_pair in enumerate((('x', 'tempo'), ('y', 'senda'),
('z', 'mastervol'), ('rh_roll', 'numerical_hint'))):
source_low = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][0]
source_high = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][1]
destination_low = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][0]
destination_high = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][1]
# clip Optritrack param to the limits of its defined valid range
transformed_params[i] = clip_value(transformed_params[i],
source_low, source_high)
# translate Optritrack param to rtscs param
transformed_params[i] = remap_range(transformed_params[i],
source_low, source_high,
destination_low, destination_high)
# convert frequency and number of sins to int
for i in range(1):
transformed_params[i] = int(round(transformed_params[i]))
# round amplitude factor to precision
transformed_params[2] = round_to_precision(transformed_params[2], DEFAULT_NOTE_DURATION_PRECISION)
return transformed_params
def get_rtscs_params_body(rh):
"""
Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
"""
if rh is None or rh.mrk_mean_error == 0:
return 0, 0, 1, 0, False
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = list(rh.position) # map(lambda coordinate: -1 * coordinate, rh.position)
# Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
rh_position[0] = -rh_position[0]
# For convenience convert roll angle from radians to degrees
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
return transform_params(rh_position, rh_roll)
def get_optirx_data():
dsock = rx.mkdatasock(ip_address="127.0.0.1", multicast_address='239.255.42.99', port=1511)
natnetsdk_version = (2, 8, 0, 0)
while True:
data = dsock.recv(rx.MAX_PACKETSIZE)
# parse packet and store it locally
packet = rx.unpack(data, natnetsdk_version)
if type(packet) is rx.SenderData:
natnetsdk_version = packet.natnet_version
rh = optirx_utils.get_first_rigid_body(packet)
if rh is not None:
print(rh.position)
params = get_rtscs_params_body(rh)
if params != (0, 0, 1, 0, False):
print(params)
msg = [json.dumps({
'tempo': params[0],
'senda': params[1],
'mastervol': params[2],
})]
send_udp(msg)
print(msg)
eventlet.sleep(0.1)
t_data = eventlet.spawn(get_optirx_data)
t_data.wait()
\ No newline at end of file
File added
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment