Commit 43feef20 authored by Kamal Nasser's avatar Kamal Nasser

Clean up

parent 19ff62de
import pyo
"""
This class is responsible for creating a PYO server
and manipulating the PYO commands that actually create the sound
"""
class PyoHandler:
def __init__(self):
# start the pyo server with the following parameters
self.s = pyo.Server(sr=48000, nchnls=2, buffersize=512, duplex=0, winhost="asio").boot()
self.s.start()
# each synth is a sin sound wave
#self.synth = pyo.SuperSaw()
self._next_out = 1
self.synths = []
#self.make_beat()
self._make_synths()
self._make_synths()
def _get_next_out(self):
o = self._next_out
self._next_out += 1
return o
def make_beat(self):
# Builds an amplitude envelope in a linear table
env = pyo.LinTable([(0, 0), (190, .8), (1000, .5), (4300, .1), (8191, 0)], size=8192)
env.graph()
# Metronome provided by Beat
met = pyo.Beat(time=.125, taps=16, w1=90, w2=50, w3=30).play()
# Reads the amp envelope for each trigger from Beat
amp = pyo.TrigEnv(met, table=env, dur=met['dur'], mul=met['amp'])
# Generates a midi note for each trigger from Beat in a pseudo-random distribution
fr = pyo.TrigXnoiseMidi(met, dist=12, x1=1, x2=.3, scale=0, mrange=(48, 85))
# Receives the midi note from XnoiseMidi and scale it into C harmonic minor (try others!)
frsnap = pyo.Snap(fr, choice=[0, 2, 3, 5, 7, 8, 11], scale=1)
# This instrument receives a frequency from Snap and molde it inside an envelop from TrigEnv
lfo = pyo.Sine(freq=.05, mul=.05, add=.08)
gen = pyo.SineLoop(freq=frsnap, feedback=lfo, mul=amp * .5).out(0)
# Output the same signal with some delay in the right speaker (try a 'real' counterpoint!)
rev = pyo.Delay(gen, delay=[.25, .5], feedback=.3, mul=.8).out(1)
self.s.gui(locals())
def _make_synths(self):
s = []
# #### 1 ####
# s.append(pyo.Sine().out(self._get_next_out()))
# s.append(pyo.Sine(.2, mul=0.5, add=0.5).out(self._get_next_out()))
# s.append(pyo.Sine(.2, mul=0.5, add=0.5).out(self._get_next_out()))
# s.append(pyo.Sine(.2, mul=0.5, add=0.5).out(self._get_next_out()))
# #s.append(pyo.SuperSaw().out(self._get_next_out()))
#### 2 ####
w = pyo.Sine(freq=7).out(self._get_next_out())
l = pyo.LFO(freq=w, type=2).out(self._get_next_out())
#sl = pyo.SineLoop(freq=w, feedback=l, mul=1).out(self._get_next_out())
s.append(w)
s.append(l)
#rev = pyo.Delay(sl, delay=[.25, .5]).out(self._get_next_out())
self.synths.append(s)
def change_pyo(self, idx, fr, bal, mul):
# #### 1 ####
# # change the pyo parameters:
# # fr = frequency
# # mul = volume
# fr = fr*fr/600
# # start the first sound wave
# self.synths[idx][0].setFreq(fr)
# self.synths[idx][0].setMul(mul)
# # adding more sin waves for the Y axis
# self.synths[idx][1].setFreq(fr * 7 / 3)
# self.synths[idx][2].setFreq(fr * 1.25)
# self.synths[idx][3].setFreq(fr * 7 / 1.5)
# # changing the amplitude of the waves according to position in Y axis
# self.synths[idx][1].setMul(mul*bal)
# self.synths[idx][2].setMul(mul*bal)
# self.synths[idx][3].setMul(mul*bal)
#### 2 ####
# change the pyo parameters:
# fr = frequency
# mul = volume
fr = fr*fr/600
# start the first sound wave
self.synths[idx][0].setFreq(fr)
self.synths[idx][0].setMul(mul*bal)
#self.synths[idx][0].setPhase(bal)
\ No newline at end of file
import mido
import abc
class BaseRtscsController(object):
"""
A base class for the system's controller - the heart of the system, which coordinates the activities of
all of the system's components.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, param_receiver, midi_adjuster, midi_out_handler):
self._param_receiver = param_receiver
self._midi_adjuster = midi_adjuster
self._midi_out_handler = midi_out_handler
@abc.abstractmethod
def start_system(self):
"""
The definition of the system's flow goes here.
"""
return
def shutdown_system(self):
"""
if overridden super must to be called at the end
"""
self._param_receiver.shutdown_receiver()
import threading
import time
from rtscs_controllers.base_rtscs_controller import BaseRtscsController
from pyo_handler import PyoHandler
import optirx_utils
class SequentialRtscsController(BaseRtscsController):
"""
A concrete class derived form BaseRtscsController.
Implements the general system flow suggested in BaseRtscsController in an uninterpretable manner,
meaning it will always complete the flow, and will never break it in the middle.
"""
def __init__(self, param_receiver,):
super(SequentialRtscsController, self).__init__(param_receiver,
None, None)
self._last_sustain_status = False
self._is_shutdown_request = False
self._shutdown_lock = threading.Lock()
def start_system(self):
# get an instance of PyoHandler which controls the sounds created by the system
a = PyoHandler()
while not self._is_shutdown_requested():
# get rtscs params form BaseRtscsParamReceiver
packet = self._param_receiver._optirx_packet_receiver.get_last_packet()
bodies = optirx_utils.get_all_rigid_bodies(packet)
idx = 0
for body in bodies:
transposition, vel_shift, tempo_factor, hint, is_sustain = self._param_receiver.get_rtscs_params_body(body)
PyoHandler.change_pyo(a, idx, transposition, vel_shift, tempo_factor)
idx += 1
# sleep for duration of msg.time between msgs
time.sleep(0.01)
def shutdown_system(self):
self._shutdown_lock.acquire()
self._is_shutdown_request = True
self._shutdown_lock.release()
super(SequentialRtscsController, self).shutdown_system()
def _is_shutdown_requested(self):
self._shutdown_lock.acquire()
is_shutdown_requested = self._is_shutdown_request
self._shutdown_lock.release()
return is_shutdown_requested
from rtscs_param_receivers.optitrack_param_receiver.optitrack_param_receiver import OptitrackParamReceiver
from rtscs_controllers.sequential_rtscs_controller import SequentialRtscsController
from rtscs_thread import RtscsThread
from utils import display_shutdown_prompt
def main():
# Init all system components, and start the system.
# Receive system params from Optitrack
# This Should be the client ip address
pyo_parameters = OptitrackParamReceiver('127.0.0.1')
#send the parameters to the controller and change the pyoHandler
pyo_controller = SequentialRtscsController(pyo_parameters)
#send the parameters from the pyoHandler to a new thread
pyo_main_thread = RtscsThread(pyo_controller)
#start the thread
pyo_main_thread.start()
display_shutdown_prompt()
#stop the thread
pyo_main_thread.shutdown()
if __name__ == '__main__':
main()
import threading
class RtscsThread(threading.Thread):
def __init__(self, rtscs_controller):
super(RtscsThread, self).__init__()
self._rtscs_controller = rtscs_controller
def run(self):
#start the system
self._rtscs_controller.start_system()
def shutdown(self):
#stop the system
self._rtscs_controller.shutdown_system()
"""
Run this utility to see which midi ports are available
"""
import mido
if __name__ == '__main__':
backend_name = 'pygame' # change if you're using another backend
mido.set_backend('mido.backends.' + backend_name)
mido.backend
print mido.get_output_names()
"""
If you have pip installed or you're using Python 2.7.9 or higher (which has pip pre-installed),
you can run this script to install of the prerequisites for RTSMCS.
"""
import os
PREREQUISITE_NAME_LIST = ['mido', 'optirx', 'nibabel', 'pygame']
if __name__ == '__main__':
for prereq_name in PREREQUISITE_NAME_LIST:
os.system('pip install ' + prereq_name)
\ No newline at end of file
import threading
import eventlet
import socketio
import time
eventlet.monkey_patch()
from rtscs_param_receivers.optitrack_param_receiver.optitrack_packet_receiver import OptitrackPacketReceiver
import optirx_utils
from utils import clip_value, remap_range, round_to_precision
import math
#eventlet.monkey_patch()
DEFAULT_NOTE_DURATION_PRECISION = 0.125
# rh roll implemented for future use
DEFAULT_OPTITRACK_RANGES_DICT = {
'x': (-0.9, 1.5),
'y': (-2.1, 1.2),
'z': (-0.1, 2),
'rh_roll': (-180, 180)
}
DEFAULT_RTSCS_PARAM_RANGES_DICT = {
#'frequency': (0, 700),
#'sins': (0, 0.45),
'frequency': (0, 7),
'sins': (0, 7),
'amplitude': (0.1, 1),
'numerical_hint': (0, 1)
}
sio = socketio.Server(cors_allowed_origins=None)
app = socketio.WSGIApp(sio)
@sio.on('connect')
def connect(sid, environ):
print('connect ', sid)
#eventlet.spawn(deb, sio)
#sio.emit("rh", [1, 2, 3])
@sio.on('my message')
def message(sid, data):
print('message ', data)
@sio.on('disconnect')
def disconnect(sid):
print('disconnect ', sid)
def start_sio():
eventlet.wsgi.server(eventlet.listen(('', 5000)), app)
def transform_params(rh_position, rh_roll, lh_position=None):
"""
Transforms OptiTrack params into rtscs params
:param rh_position: a (x, y, z) tuple holding the position of the right hand "rigid body"
:param rh_roll: roll angle in degrees of the right hand "rigid body"
:param lh_position: a (x, y, z) tuple holding the position of the left hand "rigid body"
:return: rtscs params tuple as defined in BaseRtscsParamReceiver.get_rtscs_params()
"""
is_sustain = False # TODO: derive is_sustain from distance between rh_position and lh_position
transformed_params = rh_position + [rh_roll, is_sustain]
# Remap rh position onto rtscs internal ranges
# x -> frequency
# y -> number of waves which define the complex wave.
# z -> amplitude.
for i, mapped_pair in enumerate((('x', 'frequency'), ('y', 'sins'),
('z', 'amplitude'), ('rh_roll', 'numerical_hint'))):
source_low = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][0]
source_high = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][1]
destination_low = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][0]
destination_high = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][1]
# clip Optritrack param to the limits of its defined valid range
transformed_params[i] = clip_value(transformed_params[i],
source_low, source_high)
# translate Optritrack param to rtscs param
transformed_params[i] = remap_range(transformed_params[i],
source_low, source_high,
destination_low, destination_high)
# convert frequency and number of sins to int
for i in range(1):
transformed_params[i] = int(round(transformed_params[i]))
# round amplitude factor to precision
transformed_params[2] = round_to_precision(transformed_params[2], DEFAULT_NOTE_DURATION_PRECISION)
return transformed_params
def get_rtscs_params_body(rh):
"""
Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
"""
if rh is None or rh.mrk_mean_error == 0:
return 0, 0, 1, 0, False
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = list(rh.position) # map(lambda coordinate: -1 * coordinate, rh.position)
# Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
rh_position[0] = -rh_position[0]
# For convenience convert roll angle from radians to degrees
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
# print rh_position
# print rh_roll; return
return transform_params(rh_position, rh_roll)
optritrack_packet_recv = OptitrackPacketReceiver('127.0.0.1') # adjust OptiTrack server IP if needed
optritrack_packet_recv.start()
def motive_data():
while True:
packet = optritrack_packet_recv.get_last_packet()
rh = optirx_utils.get_first_rigid_body(packet)
if rh is not None:
p = get_rtscs_params_body(rh)
print(p)
sio.emit("rh", p)
eventlet.sleep(1)
def deb(s):
while True:
s.emit("rh", [1, 2])
eventlet.sleep(1)
print("ok")
#eventlet.spawn(motive_data)
#eventlet.wsgi.server(eventlet.listen(('', 5000)), app)
#motive_data()
while True:
packet = optritrack_packet_recv.get_last_packet()
rh = optirx_utils.get_first_rigid_body(packet)
print(rh)
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment