server_udp.py 4.37 KB
Newer Older
Kamal Nasser's avatar
Kamal Nasser committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import optirx as rx
import optirx_utils
import eventlet
import math
from oscpy.client import OSCClient
from utils import clip_value, remap_range, round_to_precision
import json

eventlet.monkey_patch()

UDP_IP = "192.168.43.186"
UDP_PORT = 5000

max_udp_client = OSCClient(UDP_IP, UDP_PORT)
def send_udp(message):
    max_udp_client.send_message("/", message)

DEFAULT_NOTE_DURATION_PRECISION = 0.125
# rh roll implemented for future use
DEFAULT_OPTITRACK_RANGES_DICT = {
    'x': (-0.88, 0.19),
    'y': (-0.03, 0.49),
    'z': (0.07, 1.3),
    'rh_roll': (-180, 180)
}
DEFAULT_RTSCS_PARAM_RANGES_DICT = {
    'tempo': (75, 170),
    'senda': (0, 100),
    'mastervol': (-55, 0),
    'numerical_hint': (0, 1)
}

def transform_params(rh_position, rh_roll, lh_position=None):
    """
    Transforms OptiTrack params into rtscs params
    :param rh_position: a (x, y, z) tuple holding the position of the right hand "rigid body"
    :param rh_roll: roll angle in degrees of the right hand "rigid body"
    :param lh_position: a (x, y, z) tuple holding the position of the left hand "rigid body"
    :return: rtscs params tuple as defined in BaseRtscsParamReceiver.get_rtscs_params()
    """
    is_sustain = False  # TODO: derive is_sustain from distance between rh_position and lh_position
    transformed_params = rh_position + [rh_roll, is_sustain]

    # Remap rh position onto rtscs internal ranges
    # x -> frequency
    # y -> number of waves which define the complex wave.
    # z -> amplitude.

    for i, mapped_pair in enumerate((('x', 'tempo'), ('y', 'senda'),
                                     ('z', 'mastervol'), ('rh_roll', 'numerical_hint'))):
        source_low = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][0]
        source_high = DEFAULT_OPTITRACK_RANGES_DICT[mapped_pair[0]][1]
        destination_low = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][0]
        destination_high = DEFAULT_RTSCS_PARAM_RANGES_DICT[mapped_pair[1]][1]

        # clip Optritrack param to the limits of its defined valid range
        transformed_params[i] = clip_value(transformed_params[i],
                                           source_low, source_high)
        # translate Optritrack param to rtscs param
        transformed_params[i] = remap_range(transformed_params[i],
                                            source_low, source_high,
                                            destination_low, destination_high)

    # convert frequency and number of sins to int
    for i in range(1):
        transformed_params[i] = int(round(transformed_params[i]))

    # round amplitude factor to precision
    transformed_params[2] = round_to_precision(transformed_params[2], DEFAULT_NOTE_DURATION_PRECISION)

    return transformed_params

def get_rtscs_params_body(rh):
    """
    Get input from the OptiTrack system streaming engine, transform the data to rtscs_params and return it.
    """
    if rh is None or rh.mrk_mean_error == 0:
        return 0, 0, 1, 0, False

    # mul by -1 to fix flipped coordinates if not fully compatible Motive calibration square
    rh_position = list(rh.position)  # map(lambda coordinate: -1 * coordinate, rh.position)
    # Convert right hand convention to left hand convention for Motive 1.73+ used with CS-200
    rh_position[0] = -rh_position[0]
    # For convenience convert roll angle from radians to degrees
    rh_roll = math.degrees(optirx_utils.orientation2radians(rh.orientation)[0])

    return transform_params(rh_position, rh_roll)

def get_optirx_data():
    dsock = rx.mkdatasock(ip_address="127.0.0.1", multicast_address='239.255.42.99', port=1511)
    natnetsdk_version = (2, 8, 0, 0)
    while True:
        data = dsock.recv(rx.MAX_PACKETSIZE)
        # parse packet and store it locally
        packet = rx.unpack(data, natnetsdk_version)
        if type(packet) is rx.SenderData:
            natnetsdk_version = packet.natnet_version
        rh = optirx_utils.get_first_rigid_body(packet)

        if rh is not None:
            print(rh.position)
            params = get_rtscs_params_body(rh)
            if params != (0, 0, 1, 0, False):
                print(params)
                msg = [json.dumps({
                    'tempo': params[0],
                    'senda': params[1],
                    'mastervol': params[2],
                })]
                send_udp(msg)
                print(msg)
                eventlet.sleep(0.1)

t_data = eventlet.spawn(get_optirx_data)
t_data.wait()