Commit 6a04ad67 authored by Kamal Nasser's avatar Kamal Nasser

initial commit

parents
venv/
.idea
\ No newline at end of file
import optirx as rx
from nibabel import eulerangles
def get_first_unlabeled_marker(optirx_packet):
if type(optirx_packet) is rx.FrameOfData and len(optirx_packet.other_markers) > 0:
return optirx_packet.other_markers[0]
return None
def get_first_rigid_body(optirx_packet):
if type(optirx_packet) is rx.FrameOfData and len(optirx_packet.rigid_bodies) > 0:
return optirx_packet.rigid_bodies[0]
return None
def get_rigid_body_by_id(id, optirx_packet):
if type(optirx_packet) is rx.FrameOfData and len(optirx_packet.rigid_bodies) > 0:
for body in optirx_packet.rigid_bodies:
if body.id == id:
return body
return None
def orientation2radians(orientation):
"""
Convert from OptiTracks internal quaternion angle representation of an object's orientation to
to euler angle representation in radians
:param orientation: An OptiTrack object object orientation in quaternion angle representation (qx, qy, qz, qw)
:return: orientation in euler angle representation in radians (roll, yaw, pitch)
"""
reordered_orientation = (orientation[-1],) + orientation[:-1] # (qx, qy, qz, qw) -> (qw, qx, qy, qz)
return eulerangles.quat2euler(reordered_orientation)
import pyo
"""
This class is responsible for creating a PYO server
and manipulating the PYO commands that actually create the sound
"""
class PyoHandler:
def __init__(self):
# start the pyo server with the following parameters
self.s = pyo.Server(sr=48000, nchnls=2, buffersize=512, duplex=0, winhost="asio").boot()
self.s.start()
# each synth is a sin sound wave
self.synth = pyo.Sine(.4, mul=0.5, add=0.5)
self.synth2 = pyo.Sine(.4, mul=0.5, add=0.5).out(2)
self.synth3 = pyo.Sine(.4, mul=0.5, add=0.5).out(3)
self.synth4 = pyo.Sine(.4, mul=0.5, add=0.5).out(4)
def change_pyo(self, fr, bal, mul):
# change the pyo parameters:
# fr = frequency
# mul = volume
fr = fr*fr/600
# start the first sound wave
self.synth.out(1)
self.synth.setFreq(fr)
self.synth.setMul(mul)
# adding more sin waves for the Y axis
self.synth2.setFreq(fr * 5 / 3)
self.synth3.setFreq(fr * 1.25)
self.synth4.setFreq(fr * 7 / 2)
# changing the amplitude of the waves according to position in Y axis
self.synth2.setMul(mul*bal)
self.synth3.setMul(mul*bal)
self.synth4.setMul(mul*bal)
\ No newline at end of file
import mido
import abc
class BaseRtscsController(object):
"""
A base class for the system's controller - the heart of the system, which coordinates the activities of
all of the system's components.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, param_receiver, midi_adjuster, midi_out_handler):
self._param_receiver = param_receiver
self._midi_adjuster = midi_adjuster
self._midi_out_handler = midi_out_handler
@abc.abstractmethod
def start_system(self):
"""
The definition of the system's flow goes here.
"""
return
def shutdown_system(self):
"""
if overridden super must to be called at the end
"""
self._param_receiver.shutdown_receiver()
import threading
import time
from rtscs_controllers.base_rtscs_controller import BaseRtscsController
from pyo_handler import PyoHandler
class SequentialRtscsController(BaseRtscsController):
"""
A concrete class derived form BaseRtscsController.
Implements the general system flow suggested in BaseRtscsController in an uninterpretable manner,
meaning it will always complete the flow, and will never break it in the middle.
"""
def __init__(self, param_receiver,):
super(SequentialRtscsController, self).__init__(param_receiver,
None, None)
self._last_sustain_status = False
self._is_shutdown_request = False
self._shutdown_lock = threading.Lock()
def start_system(self):
# get an instance of PyoHandler which controls the sounds created by the system
a = PyoHandler()
while not self._is_shutdown_requested():
# get rtscs params form BaseRtscsParamReceiver
transposition, vel_shift, tempo_factor, hint, is_sustain = self._param_receiver.get_rtscs_params()
PyoHandler.change_pyo(a, transposition, vel_shift, tempo_factor)
# sleep for duration of msg.time between msgs
time.sleep(0.05)
def shutdown_system(self):
self._shutdown_lock.acquire()
self._is_shutdown_request = True
self._shutdown_lock.release()
super(SequentialRtscsController, self).shutdown_system()
def _is_shutdown_requested(self):
self._shutdown_lock.acquire()
is_shutdown_requested = self._is_shutdown_request
self._shutdown_lock.release()
return is_shutdown_requested
from rtscs_param_receivers.optitrack_param_receiver.optitrack_param_receiver import OptitrackParamReceiver
from rtscs_controllers.sequential_rtscs_controller import SequentialRtscsController
from rtscs_thread import RtscsThread
from utils import display_shutdown_prompt
def main():
# Init all system components, and start the system.
# Receive system params from Optitrack
# This Should be the client ip address
pyo_parameters = OptitrackParamReceiver('127.0.0.1')
#send the parameters to the controller and change the pyoHandler
pyo_controller = SequentialRtscsController(pyo_parameters)
#send the parameters from the pyoHandler to a new thread
pyo_main_thread = RtscsThread(pyo_controller)
#start the thread
pyo_main_thread.start()
display_shutdown_prompt()
#stop the thread
pyo_main_thread.shutdown()
if __name__ == '__main__':
main()
import abc
class BaseRtscsParamReceiver(object):
"""
A base class which defines an interface for getting the system's internal params
from a human interface device (be it hardware or software).
Driving classes should be able to take the input from a human interface device, interpret it as they wish
and convert it to the defined internal system params as defined below in get_rtscs_params().
The type of the human interface device doesn't matter as long as input data can be collected from it,
it could be a camera, mouse, joystick or a GUI, or anything else.
Depending on the scenario, it might be advisable that the derived class would utilize a separate thread/process
to continuously collect the input data.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def get_rtscs_params(self):
"""
Reads the input from the human interface device, converts it to internal rtscs_params and returns it.
The internal rtscs_params are as follows:
* frequency
* amplitude
* number of sine waves
:return: (frequency, amplitude, number of sine waves)
"""
return
@abc.abstractmethod
def shutdown_receiver(self):
"""
Free all threads / other resources used by param receiver.
If no such resources are used can be implemented as an empty method.
"""
return
import threading
import optirx as rx
DEFAULT_MULTICAST_ADDRESS = '239.255.42.99'
DEFAULT_DATA_PORT = 1511
DEFAULT_NATNETSDK_VERSION = (2, 8, 0, 0)
class OptitrackPacketReceiver(threading.Thread):
"""
Responsible for constantly receiving UDP packets form the OptiTrack system, parsing them to optirx objects,
and storing them locally to make them available when needed.
The OptitrackPacketReceiver is a separate thread, so it continues receiving UDP packets regardless of the
activity of the rest of the system.
"""
def __init__(self, server_ip_addr, natnetsdk_version=DEFAULT_NATNETSDK_VERSION,
multicast_addr=DEFAULT_MULTICAST_ADDRESS, data_port=DEFAULT_DATA_PORT):
super(OptitrackPacketReceiver, self).__init__()
self._natnetsdk_version = natnetsdk_version
self._last_packet = None
self._packet_lock = threading.Lock()
self._is_shutdown_request = False
self._shutdown_lock = threading.Lock()
# open UDP port to Optitrack server
self._dsock = rx.mkdatasock(ip_address=server_ip_addr, multicast_address=multicast_addr, port=data_port)
def run(self):
while not self._is_shutdown_requested():
# receive UDP data packet from Optitrack server
data = self._dsock.recv(rx.MAX_PACKETSIZE)
self._packet_lock.acquire()
# parse packet and store it locally
self._last_packet = rx.unpack(data, self._natnetsdk_version)
self._packet_lock.release()
# update natnet sdk version to what Optitrack software says it is
if type(self._last_packet) is rx.SenderData:
self._natnetsdk_version = self._last_packet.natnet_version
def get_last_packet(self):
"""
Returns the last packet recieved from the OptiTrack system as a optirx object
"""
self._packet_lock.acquire()
last_packet = self._last_packet
self._packet_lock.release()
return last_packet
def _is_shutdown_requested(self):
self._shutdown_lock.acquire()
is_shutdown_request = self._is_shutdown_request
self._shutdown_lock.release()
return is_shutdown_request
def shutdown(self):
self._shutdown_lock.acquire()
self._is_shutdown_request = True
self._shutdown_lock.release()
import time
import math
from rtscs_param_receivers.base_rtscs_param_receiver import BaseRtscsParamReceiver
from optitrack_packet_receiver import OptitrackPacketReceiver
from utils import clip_value, remap_range, round_to_precision, dicts_have_same_keys
import optirx_utils
DEFAULT_NOTE_DURATION_PRECISION = 0.125
# rh roll implemented for future use
DEFAULT_OPTITRACK_RANGES_DICT = {'x': (-3, 3), 'y': (1, 1.9), 'z': (-1, 1.7), 'rh_roll': (-80, 100)}
DEFAULT_RTSCS_PARAM_RANGES_DICT = {'frequency': (0, 1000), 'sins': (0, 0.5),
'amplitude': (0, 1), 'numerical_hint': (0, 1)}
class OptitrackParamReceiver(BaseRtscsParamReceiver):
"""
A concrete class derived form BaseRtscsParamReceiver.
When requested, takes the last packet received from the OptiTrack server and transforms the
data it carries regrading objects in space into rtscs params.
Please consult the RTSMCS setup guide to get a better understanding of how to use OptitrackParamReceiver.
For the right hand "rigid body", the transformation is as follows:
- X coordinate is remapped to frequency
- Y coordinate is remapped to Number of sine waves
- Z coordinate is remapped to Amplitude
"""
def __init__(self, server_ip_address,
optitrx_valid_ranges_dict=DEFAULT_OPTITRACK_RANGES_DICT,
rtscs_valid_ranges_dict=DEFAULT_RTSCS_PARAM_RANGES_DICT,
note_duration_precision=DEFAULT_NOTE_DURATION_PRECISION):
"""
:param server_ip_address: IP address of the OptiTrack server
:param optitrx_valid_ranges_dict: A dict with ranges of the bounding box of the space in which
the system operator will be working, and the extreme angles of rotation that
the operator's hand can be held and at which it is still
detected by the cameras.
The dict is to be structured as DEFAULT_OPTITRACK_RANGES_DICT.
Bounds can be measured using "measurement_util.py" in setup_utils
:param rtscs_valid_ranges_dict: A dict with desired ranges of rtscs params.
The dict is to be structured as DEFAULT_RTSCS_PARAM_RANGES_DICT.
:param note_duration_precision: duration_factor precision to be rounded to
"""
self._optirx_valid_ranges_dict = optitrx_valid_ranges_dict
self._rtscs_valid_ranges_dict = rtscs_valid_ranges_dict
self._validate_user_dict_keys()
self._note_duration_precision = note_duration_precision
# Thread that continuously collects data from OptiTrack streaming engine
self._optirx_packet_receiver = OptitrackPacketReceiver(server_ip_address)
self._optirx_packet_receiver.start()
def _validate_user_dict_keys(self):
"""
Check that the users ranges dict are structured correctly.
"""
if not dicts_have_same_keys(self._optirx_valid_ranges_dict, DEFAULT_OPTITRACK_RANGES_DICT):
raise ValueError('The keys of optirx_valid_ranges_dict are invalid')
if not dicts_have_same_keys(self._rtscs_valid_ranges_dict, DEFAULT_RTSCS_PARAM_RANGES_DICT):
raise ValueError('The keys of _rtscs_valid_ranges_dict are invalid')
def transform_params(self, rh_position, rh_roll, lh_position=None):
"""
Transforms OptiTrack params into rtscs params
:param rh_position: a (x, y, z) tuple holding the position of the right hand "rigid body"
:param rh_roll: roll angle in degrees of the right hand "rigid body"
:param lh_position: a (x, y, z) tuple holding the position of the left hand "rigid body"
:return: rtscs params tuple as defined in BaseRtscsParamReceiver.get_rtscs_params()
"""
is_sustain = False # TODO: derive is_sustain from distance between rh_position and lh_position
transformed_params = rh_position + [rh_roll, is_sustain]
# Remap rh position onto rtscs internal ranges
# x -> frequency
# y -> number of waves which define the complex wave.
# z -> amplitude.
for i, mapped_pair in enumerate((('x', 'frequency'), ('y', 'sins'),
('z', 'amplitude'), ('rh_roll', 'numerical_hint'))):
source_low = self._optirx_valid_ranges_dict[mapped_pair[0]][0]
source_high = self._optirx_valid_ranges_dict[mapped_pair[0]][1]
destination_low = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][0]
destination_high = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][1]
# clip Optritrack param to the limits of its defined valid range
transformed_params[i] = clip_value(transformed_params[i],
source_low, source_high)
# translate Optritrack param to rtscs param
transformed_params[i] = remap_range(transformed_params[i],
source_low, source_high,
destination_low, destination_high)
# convert frequency and number of sins to int
for i in range(1):
transformed_params[i] = int(round(transformed_params[i]))
# round amplitude factor to precision
transformed_params[2] = round_to_precision(transformed_params[2], DEFAULT_NOTE_DURATION_PRECISION)
return transformed_params
def get_rtscs_params(self):
"""
Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
"""
packet = self._optirx_packet_receiver.get_last_packet()
rh = optirx_utils.get_first_rigid_body(packet)
if rh is None or rh.mrk_mean_error == 0:
return 0, 0, 1, 0, False
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = list(rh.position) # map(lambda coordinate: -1 * coordinate, rh.position)
# Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
rh_position[0] = -rh_position[0]
# For convenience convert roll angle from radians to degrees
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
# print rh_position
# print rh_roll; return
transformed_params = self.transform_params(rh_position, rh_roll)
# print transformed_params
return transformed_params
def shutdown_receiver(self):
self._optirx_packet_receiver.shutdown()
if __name__ == '__main__':
optirxrec = OptitrackParamReceiver('127.0.0.1')
while True:
time.sleep(0.2)
optirxrec.get_rtscs_params()
\ No newline at end of file
import threading
class RtscsThread(threading.Thread):
def __init__(self, rtscs_controller):
super(RtscsThread, self).__init__()
self._rtscs_controller = rtscs_controller
def run(self):
#start the system
self._rtscs_controller.start_system()
def shutdown(self):
#stop the system
self._rtscs_controller.shutdown_system()
"""
Run this utility to see which midi ports are available
"""
import mido
if __name__ == '__main__':
backend_name = 'pygame' # change if you're using another backend
mido.set_backend('mido.backends.' + backend_name)
mido.backend
print mido.get_output_names()
"""
If you have pip installed or you're using Python 2.7.9 or higher (which has pip pre-installed),
you can run this script to install of the prerequisites for RTSMCS.
"""
import os
PREREQUISITE_NAME_LIST = ['mido', 'optirx', 'nibabel', 'pygame']
if __name__ == '__main__':
for prereq_name in PREREQUISITE_NAME_LIST:
os.system('pip install ' + prereq_name)
\ No newline at end of file
"""
A utility to help determine the desired bounding box of the space in which the system operator will be working,
and the extreme angles of rotation that the operator hand can be held and at which it is still detected by the cameras.
"""
import time
import math
from rtscs_param_receivers.optitrack_param_receiver.optitrack_packet_receiver import OptitrackPacketReceiver
import optirx_utils
import sys
minint = -sys.maxint - 1
maxint = sys.maxint
if __name__ == '__main__':
optritrack_packet_recv = OptitrackPacketReceiver('127.0.0.1') # adjust OptriTrack server IP if needed
optritrack_packet_recv.start()
while True:
packet = optritrack_packet_recv.get_last_packet()
#print packet
rh = optirx_utils.get_first_rigid_body(packet)
totalNumbers = {
"min": [minint, minint, minint, minint],
"max": [maxint, maxint, maxint, maxint],
}
if rh is not None:
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = rh.position
#rh_position = map(lambda coordinate: -1 * coordinate, rh.position)
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
currentNumbers = rh_position + [rh_roll]
for method in ["min", "max"]:
for idx, value in enumerate(currentNumbers, start=1):
totalNumbers[method][idx] = globals()[method](value, currentNumbers[idx])
print currentNumbers
print totalNumbers
time.sleep(0.05)
def clip_value(value, lower, upper):
return max(lower, min(upper, value))
def remap_range(value, range1_from, range1_to, range2_from=0, range2_to=1):
if range1_from >= range1_to or range2_from >= range2_to:
raise ValueError('One or more of the given ranges is either an empty range or out of order')
if value < range1_from or value > range1_to:
raise ValueError('Value {} is not in given range ({}, {})'.format(value, range1_from, range1_to))
return float(value - range1_from) / (range1_from - range1_to) * (range2_from - range2_to) + range2_from
def round_to_precision(number, precision):
return round(number * 1.0 / precision) / (1.0 / precision)
def dicts_have_same_keys(dict1, dict2):
return (set(dict1.keys()) - set(dict2.keys())) == set([])
def display_shutdown_prompt(shutdown_token='q'):
shutdown_msg = 'Enter "{}" to shutdown the system and end the session:'.format(shutdown_token)
print shutdown_msg
while raw_input().lower() != shutdown_token:
print 'Invalid input. ' + shutdown_msg
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment