...
 
Commits (2)
venv/
.idea
*.pyc
.vscode
# Byte-compiled / optimized / DLL files
__pycache__/
......
"""
A utility to help determine the desired bounding box of the space in which the system operator will be working,
and the extreme angles of rotation that the operator hand can be held and at which it is still detected by the cameras.
This script is part of the calibration flow.
While it runs, it maps the room and calculates the boundaries of the 3D space that the Motive system can track.
It outputs the value ranges of each axis (mininum, maximum) in a format that can be directly copied
to config.py.
"""
from tabulate import tabulate
#from tabulate import tabulate
import time
import math
from rtscs_param_receivers.optitrack_param_receiver.optitrack_packet_receiver import OptitrackPacketReceiver
from utils import make_motive_sock, read_motive_packet
import optirx_utils
import sys
......@@ -22,10 +24,9 @@ totalNumbers = {
}
if __name__ == '__main__':
optritrack_packet_recv = OptitrackPacketReceiver('127.0.0.1') # adjust OptriTrack server IP if needed
optritrack_packet_recv.start()
sock = make_motive_sock()
while True:
packet = optritrack_packet_recv.get_last_packet()
packet = read_motive_packet(sock)
# print packet
rh = optirx_utils.get_first_rigid_body(packet)
......@@ -34,15 +35,18 @@ if __name__ == '__main__':
rh_position = list(rh.position)
rh_position[0] = -rh_position[0]
# rh_position = map(lambda coordinate: -1 * coordinate, rh.position)
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
rh_roll = math.degrees(
optirx_utils.orientation2radians(rh.orientation)[0])
currentNumbers = rh_position + [rh_roll]
for method in ["min", "max"]:
for idx, value in enumerate(currentNumbers, start=0):
totalNumbers[method][idx] = fns[method](totalNumbers[method][idx], value)
totalNumbers[method][idx] = fns[method](
totalNumbers[method][idx], value)
print '\n' * 20
print tabulate([totalNumbers["min"], totalNumbers["max"], currentNumbers], headers=["x", "y", "z", "roll"])
# print tabulate([totalNumbers["min"], totalNumbers["max"],
# currentNumbers], headers=["x", "y", "z", "roll"])
print """
AXIS_RANGES = {
'x': (%0.3f, %0.3f),
......@@ -50,6 +54,6 @@ AXIS_RANGES = {
'z': (%0.3f, %0.3f),
'roll': (-180, 180)
}""" % (totalNumbers["min"][0], totalNumbers["max"][0],
totalNumbers["min"][1], totalNumbers["max"][1],
totalNumbers["min"][2], totalNumbers["max"][2])
totalNumbers["min"][1], totalNumbers["max"][1],
totalNumbers["min"][2], totalNumbers["max"][2])
time.sleep(0.05)
motiveConfig = {
'ip': "127.0.0.1",
'multicast_address': '239.255.42.99',
'port': 1511,
'natnetsdk_version': (2, 9, 0, 0),
}
abletonMaxConfig = {
'ip': "127.0.0.1",
'port': 5000,
}
AXIS_RANGES = {
'x': (-0.941, 0.972),
'y': (-1.312, 0.768),
'z': (-1.356, 0.779),
'roll': (-180, 180)
}
import abc
class BaseRtscsParamReceiver(object):
"""
A base class which defines an interface for getting the system's internal params
from a human interface device (be it hardware or software).
Driving classes should be able to take the input from a human interface device, interpret it as they wish
and convert it to the defined internal system params as defined below in get_rtscs_params().
The type of the human interface device doesn't matter as long as input data can be collected from it,
it could be a camera, mouse, joystick or a GUI, or anything else.
Depending on the scenario, it might be advisable that the derived class would utilize a separate thread/process
to continuously collect the input data.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def get_rtscs_params(self):
"""
Reads the input from the human interface device, converts it to internal rtscs_params and returns it.
The internal rtscs_params are as follows:
* frequency
* amplitude
* number of sine waves
:return: (frequency, amplitude, number of sine waves)
"""
return
@abc.abstractmethod
def shutdown_receiver(self):
"""
Free all threads / other resources used by param receiver.
If no such resources are used can be implemented as an empty method.
"""
return
import threading
#from eventlet.green import threading
import optirx as rx
DEFAULT_MULTICAST_ADDRESS = '239.255.42.99'
DEFAULT_DATA_PORT = 1511
DEFAULT_NATNETSDK_VERSION = (2, 9, 0, 0)
class OptitrackPacketReceiver(threading.Thread):
"""
Responsible for constantly receiving UDP packets form the OptiTrack system, parsing them to optirx objects,
and storing them locally to make them available when needed.
The OptitrackPacketReceiver is a separate thread, so it continues receiving UDP packets regardless of the
activity of the rest of the system.
"""
def __init__(self, server_ip_addr, natnetsdk_version=DEFAULT_NATNETSDK_VERSION,
multicast_addr=DEFAULT_MULTICAST_ADDRESS, data_port=DEFAULT_DATA_PORT):
super(OptitrackPacketReceiver, self).__init__()
self._natnetsdk_version = natnetsdk_version
self._last_packet = None
self._is_shutdown_request = False
# open UDP port to Optitrack server
self._dsock = rx.mkdatasock(ip_address=server_ip_addr, multicast_address=multicast_addr, port=data_port)
def run(self):
while not self._is_shutdown_requested():
# receive UDP data packet from Optitrack server
data = self._dsock.recv(rx.MAX_PACKETSIZE)
# parse packet and store it locally
self._last_packet = rx.unpack(data, self._natnetsdk_version)
# update natnet sdk version to what Optitrack software says it is
if type(self._last_packet) is rx.SenderData:
self._natnetsdk_version = self._last_packet.natnet_version
def get_last_packet(self):
"""
Returns the last packet recieved from the OptiTrack system as a optirx object
"""
last_packet = self._last_packet
return last_packet
def _is_shutdown_requested(self):
is_shutdown_request = self._is_shutdown_request
return is_shutdown_request
def shutdown(self):
self._is_shutdown_request = True
import time
import math
from rtscs_param_receivers.base_rtscs_param_receiver import BaseRtscsParamReceiver
from optitrack_packet_receiver import OptitrackPacketReceiver
from utils import clip_value, remap_range, round_to_precision, dicts_have_same_keys
import optirx_utils
DEFAULT_NOTE_DURATION_PRECISION = 0.125
# rh roll implemented for future use
DEFAULT_OPTITRACK_RANGES_DICT = {
'x': (-0.9, 1.5),
'y': (-2.1, 1.2),
'z': (-0.1, 2),
'rh_roll': (-180, 180)
}
DEFAULT_RTSCS_PARAM_RANGES_DICT = {
'frequency': (0, 700),
'sins': (0, 0.45),
'amplitude': (0.1, 1),
'numerical_hint': (0, 1)
}
class OptitrackParamReceiver(BaseRtscsParamReceiver):
"""
A concrete class derived form BaseRtscsParamReceiver.
When requested, takes the last packet received from the OptiTrack server and transforms the
data it carries regrading objects in space into rtscs params.
Please consult the RTSMCS setup guide to get a better understanding of how to use OptitrackParamReceiver.
For the right hand "rigid body", the transformation is as follows:
- X coordinate is remapped to frequency
- Y coordinate is remapped to Number of sine waves
- Z coordinate is remapped to Amplitude
"""
def __init__(self, server_ip_address,
optitrx_valid_ranges_dict=DEFAULT_OPTITRACK_RANGES_DICT,
rtscs_valid_ranges_dict=DEFAULT_RTSCS_PARAM_RANGES_DICT,
note_duration_precision=DEFAULT_NOTE_DURATION_PRECISION):
"""
:param server_ip_address: IP address of the OptiTrack server
:param optitrx_valid_ranges_dict: A dict with ranges of the bounding box of the space in which
the system operator will be working, and the extreme angles of rotation that
the operator's hand can be held and at which it is still
detected by the cameras.
The dict is to be structured as AXIS_RANGES.
Bounds can be measured using "measurement_util.py" in setup_utils
:param rtscs_valid_ranges_dict: A dict with desired ranges of rtscs params.
The dict is to be structured as MAP_RANGES.
:param note_duration_precision: duration_factor precision to be rounded to
"""
self._optirx_valid_ranges_dict = optitrx_valid_ranges_dict
self._rtscs_valid_ranges_dict = rtscs_valid_ranges_dict
self._validate_user_dict_keys()
self._note_duration_precision = note_duration_precision
# Thread that continuously collects data from OptiTrack streaming engine
self._optirx_packet_receiver = OptitrackPacketReceiver(server_ip_address)
self._optirx_packet_receiver.start()
def _validate_user_dict_keys(self):
"""
Check that the users ranges dict are structured correctly.
"""
if not dicts_have_same_keys(self._optirx_valid_ranges_dict, DEFAULT_OPTITRACK_RANGES_DICT):
raise ValueError('The keys of optirx_valid_ranges_dict are invalid')
if not dicts_have_same_keys(self._rtscs_valid_ranges_dict, DEFAULT_RTSCS_PARAM_RANGES_DICT):
raise ValueError('The keys of _rtscs_valid_ranges_dict are invalid')
def transform_params(self, rh_position, rh_roll, lh_position=None):
"""
Transforms OptiTrack params into rtscs params
:param rh_position: a (x, y, z) tuple holding the position of the right hand "rigid body"
:param rh_roll: roll angle in degrees of the right hand "rigid body"
:param lh_position: a (x, y, z) tuple holding the position of the left hand "rigid body"
:return: rtscs params tuple as defined in BaseRtscsParamReceiver.get_rtscs_params()
"""
is_sustain = False # TODO: derive is_sustain from distance between rh_position and lh_position
transformed_params = rh_position + [rh_roll, is_sustain]
# Remap rh position onto rtscs internal ranges
# x -> frequency
# y -> number of waves which define the complex wave.
# z -> amplitude.
for i, mapped_pair in enumerate((('x', 'frequency'), ('y', 'sins'),
('z', 'amplitude'), ('rh_roll', 'numerical_hint'))):
source_low = self._optirx_valid_ranges_dict[mapped_pair[0]][0]
source_high = self._optirx_valid_ranges_dict[mapped_pair[0]][1]
destination_low = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][0]
destination_high = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][1]
# clip Optritrack param to the limits of its defined valid range
transformed_params[i] = clip_value(transformed_params[i],
source_low, source_high)
# translate Optritrack param to rtscs param
transformed_params[i] = remap_range(transformed_params[i],
source_low, source_high,
destination_low, destination_high)
# convert frequency and number of sins to int
for i in range(1):
transformed_params[i] = int(round(transformed_params[i]))
# round amplitude factor to precision
transformed_params[2] = round_to_precision(transformed_params[2], DEFAULT_NOTE_DURATION_PRECISION)
return transformed_params
def get_rtscs_params_body(self, rh):
"""
Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
"""
if rh is None or rh.mrk_mean_error == 0:
return 0, 0, 1, 0, False
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = list(rh.position) # map(lambda coordinate: -1 * coordinate, rh.position)
# Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
rh_position[0] = -rh_position[0]
# For convenience convert roll angle from radians to degrees
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
# print rh_position
# print rh_roll; return
transformed_params = self.transform_params(rh_position, rh_roll)
# print transformed_params
return transformed_params
def get_rtscs_params(self):
"""
Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
"""
packet = self._optirx_packet_receiver.get_last_packet()
rh = optirx_utils.get_first_rigid_body(packet)
if rh is None or rh.mrk_mean_error == 0:
return 0, 0, 1, 0, False
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = list(rh.position) # map(lambda coordinate: -1 * coordinate, rh.position)
# Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
rh_position[0] = -rh_position[0]
# For convenience convert roll angle from radians to degrees
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
# print rh_position
# print rh_roll; return
transformed_params = self.transform_params(rh_position, rh_roll)
# print transformed_params
return transformed_params
def shutdown_receiver(self):
self._optirx_packet_receiver.shutdown()
if __name__ == '__main__':
optirxrec = OptitrackParamReceiver('127.0.0.1')
while True:
time.sleep(0.2)
optirxrec.get_rtscs_params()
\ No newline at end of file
......@@ -3,29 +3,19 @@ import optirx_utils
import eventlet
import math
from oscpy.client import OSCClient
from utils import clip_value, remap_range
from utils import clip_value, remap_range, make_motive_sock, read_motive_packet
from config import motiveConfig, abletonMaxConfig, AXIS_RANGES
import json
eventlet.monkey_patch(all=True)
UDP_IP = "127.0.0.1"
UDP_PORT = 5000
max_udp_client = OSCClient(UDP_IP, UDP_PORT)
max_udp_client = OSCClient(abletonMaxConfig['ip'], abletonMaxConfig['port'])
def send_udp(message):
max_udp_client.send_message("/", message)
AXIS_RANGES = {
'x': (-0.941, 0.972),
'y': (-1.312, 0.768),
'z': (-1.356, 0.779),
'roll': (-180, 180)
}
def axis_length(axis):
values = AXIS_RANGES[axis]
return abs(values[1] - values[0])
......@@ -66,8 +56,8 @@ def remap_and_clip(source_low, source_high, destination_low, destination_high, v
value = clip_value(value, source_low, source_high)
# translate Optritrack param to rtscs param
value = remap_range(value,
source_low, source_high,
destination_low, destination_high)
source_low, source_high,
destination_low, destination_high)
return value
......@@ -95,7 +85,8 @@ def get_rtscs_params_body(rh):
return 0, 0, 0, 0
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = list(rh.position) # map(lambda coordinate: -1 * coordinate, rh.position)
# map(lambda coordinate: -1 * coordinate, rh.position)
rh_position = list(rh.position)
# Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
rh_position[0] = -rh_position[0]
# For convenience convert roll angle from radians to degrees
......@@ -107,21 +98,14 @@ def get_rtscs_params_body(rh):
udp_msg_to_send = None
xAxisLength = axis_length("x")
zAxisMidpoint = axis_midpoint("z") + axis_length("z")/4
def get_optirx_data():
dsock = rx.mkdatasock(ip_address="127.0.0.1", multicast_address='239.255.42.99', port=1511)
natnetsdk_version = (2, 9, 0, 0)
sock = make_motive_sock()
while True:
data = dsock.recv(rx.MAX_PACKETSIZE)
# parse packet and store it locally
packet = rx.unpack(data, natnetsdk_version)
if type(packet) is rx.SenderData:
natnetsdk_version = packet.natnet_version
packet = read_motive_packet(sock)
bodies = optirx_utils.get_all_rigid_bodies(packet)
if len(bodies) == 0:
continue
......@@ -139,7 +123,7 @@ def get_optirx_data():
'drumVolume': remap_value_of_axis_range("y", 0.0, 1.0, params1[1]),
'groupCVolume': remap_value_of_axis_range("y", 0.0, 1.0, params2[1]),
'tempo': int(remap_and_clip(-xAxisLength, xAxisLength, 60, 132, xDistance)),
'berlin': 0 if berlinPos < zAxisMidpoint else int(remap_value_of_axis_range("z", 0, 50, params1[2]))
'berlin': 0 if berlinPos < zAxisMidpoint else int(remap_and_clip(zAxisMidpoint, AXIS_RANGES['z'][1], 0, 50, params1[2]))
})]
global udp_msg_to_send
......@@ -161,4 +145,4 @@ eventlet.spawn(send_udp_interval)
t_data = eventlet.spawn(get_optirx_data)
t_data.wait()
\ No newline at end of file
t_data.wait()
import optirx as rx
import optirx_utils
import eventlet
import socketio
eventlet.monkey_patch()
from utils import clip_value, remap_range, round_to_precision
import math
sio = socketio.Server(cors_allowed_origins=None)
app = socketio.WSGIApp(sio)
@sio.on('connect')
def connect(sid, environ):
print('connect ', sid)
#eventlet.spawn(deb, sio)
#sio.emit("rh", [1, 2, 3])
@sio.on('my message')
def message(sid, data):
print('message ', data)
@sio.on('disconnect')
def disconnect(sid):
print('disconnect ', sid)
def start_sio():
eventlet.wsgi.server(eventlet.listen(('', 5000)), app)
DEFAULT_NOTE_DURATION_PRECISION = 0.125
# rh roll implemented for future use
DEFAULT_OPTITRACK_RANGES_DICT = {
'x': (-0.7, 0.5),
'y': (-0.7, 0.55),
'z': (-0.4, 1.0),
'rh_roll': (-180, 180)
}
DEFAULT_RTSCS_PARAM_RANGES_DICT = {
#'frequency': (0, 700),
#'sins': (0, 0.45),
'frequency': (0, 7),
'sins': (0, 7),
'amplitude': (5, 200),
'numerical_hint': (0, 1)
}
def transform_params(rh_position, rh_roll, lh_position=None):
"""
Transforms OptiTrack params into rtscs params
:param rh_position: a (x, y, z) tuple holding the position of the right hand "rigid body"
:param rh_roll: roll angle in degrees of the right hand "rigid body"
:param lh_position: a (x, y, z) tuple holding the position of the left hand "rigid body"
:return: rtscs params tuple as defined in BaseRtscsParamReceiver.get_rtscs_params()
"""
is_sustain = False # TODO: derive is_sustain from distance between rh_position and lh_position
transformed_params = rh_position + [rh_roll, is_sustain]
# Remap rh position onto rtscs internal ranges
# x -> frequency
# y -> number of waves which define the complex wave.
# z -> amplitude.
for i, mapped_pair in enumerate((('x', 'frequency'), ('y', 'sins'),
('z', 'amplitude'), ('rh_roll', 'numerical_hint'))):
source_low = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][0]
source_high = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][1]
destination_low = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][0]
destination_high = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][1]
# clip Optritrack param to the limits of its defined valid range
transformed_params[i] = clip_value(transformed_params[i],
source_low, source_high)
# translate Optritrack param to rtscs param
transformed_params[i] = remap_range(transformed_params[i],
source_low, source_high,
destination_low, destination_high)
# convert frequency and number of sins to int
for i in range(1):
transformed_params[i] = int(round(transformed_params[i]))
# round amplitude factor to precision
transformed_params[2] = round_to_precision(transformed_params[2], DEFAULT_NOTE_DURATION_PRECISION)
return transformed_params
def get_rtscs_params_body(rh):
"""
Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
"""
if rh is None or rh.mrk_mean_error == 0:
return 0, 0, 1, 0, False
# mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
rh_position = list(rh.position) # map(lambda coordinate: -1 * coordinate, rh.position)
# Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
rh_position[0] = -rh_position[0]
# For convenience convert roll angle from radians to degrees
rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])
return transform_params(rh_position, rh_roll)
def get_optirx_data():
dsock = rx.mkdatasock(ip_address="127.0.0.1", multicast_address='239.255.42.99', port=1511)
natnetsdk_version = (2, 8, 0, 0)
while True:
data = dsock.recv(rx.MAX_PACKETSIZE)
# parse packet and store it locally
packet = rx.unpack(data, natnetsdk_version)
if type(packet) is rx.SenderData:
natnetsdk_version = packet.natnet_version
rh = optirx_utils.get_first_rigid_body(packet)
if rh is not None:
params = get_rtscs_params_body(rh)
print(params)
sio.emit("rh", params)
eventlet.spawn(start_sio)
t_data = eventlet.spawn(get_optirx_data)
t_data.wait()
\ No newline at end of file
from config import motiveConfig
import optirx as rx
def make_motive_sock():
return rx.mkdatasock(ip_address=motiveConfig['ip'],
multicast_address=motiveConfig['multicast_address'], port=motiveConfig['port'])
def read_motive_packet(socket):
data = socket.recv(rx.MAX_PACKETSIZE)
# parse packet and store it locally
packet = rx.unpack(data, motiveConfig['natnetsdk_version'])
if type(packet) is rx.SenderData:
motiveConfig['natnetsdk_version'] = packet.natnet_version
return packet
def clip_value(value, lower, upper):
return max(lower, min(upper, value))
def remap_range(value, range1_from, range1_to, range2_from=0, range2_to=1):
if range1_from >= range1_to or range2_from >= range2_to:
raise ValueError('One or more of the given ranges is either an empty range or out of order')
raise ValueError(
'One or more of the given ranges is either an empty range or out of order')
if value < range1_from or value > range1_to:
raise ValueError('Value {} is not in given range ({}, {})'.format(value, range1_from, range1_to))
raise ValueError('Value {} is not in given range ({}, {})'.format(
value, range1_from, range1_to))
return float(value - range1_from) / (range1_from - range1_to) * (range2_from - range2_to) + range2_from
......@@ -21,7 +42,8 @@ def dicts_have_same_keys(dict1, dict2):
def display_shutdown_prompt(shutdown_token='q'):
shutdown_msg = 'Enter "{}" to shutdown the system and end the session:'.format(shutdown_token)
shutdown_msg = 'Enter "{}" to shutdown the system and end the session:'.format(
shutdown_token)
print shutdown_msg
while raw_input().lower() != shutdown_token:
print 'Invalid input. ' + shutdown_msg