Skip to content

iSense

iSense

Bases: Thread

Source code in src\eConEXG\iSense\device.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
class iSense(Thread):
    class Dev(Enum):
        SIGNAL = 10  # self.Dev.SIGNAL transmision mode
        SIGNAL_START = 11
        IMPEDANCE = 20  # self.Dev.IMPEDANCE transmision mode
        IMPEDANCE_START = 21
        IDLE = 30  # self.Dev.IDLE mode
        IDLE_START = 31
        TERMINATE = 40  # Init state
        TERMINATE_START = 41

    def __init__(self, fs: int):
        from .data_parser import Parser
        from .dev_socket import iSenseUSB

        print("initing iSense")
        super().__init__(daemon=True)
        if fs not in self.get_available_frequency():
            raise ValueError(
                "Frequency is unsupported. Available frequencies: 250, 500, 1000, 2000, 4000, 8000, 16000"
            )
        self.fs = fs
        self.__socket_flag = Queue()
        self.__save_data = Queue()
        self.__batt = 0
        self.__status = self.Dev.TERMINATE
        try:
            self.__parser = Parser(fs=self.fs)
            self.__dev = iSenseUSB(self.fs, self.__parser.pkt_size)
            self.__dev.connect_socket()
            self.__dev.stop_recv()
            self.__socket_flag.put("Connected")
            self.__status = self.Dev.IDLE_START
            self.start()
        except Exception as e:
            traceback.print_exc()
            self.__socket_flag.put(f"Error: {e}")
            return

    @staticmethod
    def get_available_frequency() -> list:
        """Get available sample frequencies of iSense devices.
        Returns:
            Available sample frequencies in Hz.
        """
        return [250, 500, 1000, 2000, 4000, 8000]

    def start_acquisition_data(self) -> None:
        """
        Send data acquisition command to device, block until data acquisition started or failed.
        """
        if self.__status == self.Dev.TERMINATE:
            return  # TODO: add raise exception
        if self.__status == self.Dev.SIGNAL:
            return
        if self.__status == self.Dev.IMPEDANCE:
            self.stop_acquisition()
        self.__status = self.Dev.SIGNAL_START
        while self.__status not in [self.Dev.SIGNAL, self.Dev.TERMINATE]:
            time.sleep(0.01)

    def get_data(self, timeout: Optional[float] = 0.01) -> list[Optional[list]]:
        """
        Acquire amplifier data, make sure this function is called in a loop so that it can continuously read the data.

        Args:
            timeout: it blocks at most `timeout` seconds and return, otherwise it returns until new data is available.

        Returns:
            A list of frames, each frame is a list contains all wanted eeg channels and triggerbox channel,
                eeg channels can be updatd by `update_channels()`.

        Data Unit:
            - eeg: microvolts (µV)
            - triggerbox: int, from `0` to `255`

        Raises:
            Exception: if device not connected or in data acquisition mode.
        """
        # if self.__status != self.Dev.SIGNAL:
        #     raise Exception("Data acquisition not started, please start first.")
        try:
            data: list = self.__save_data.get(timeout=timeout)
        except queue.Empty:
            return []
        while not self.__save_data.empty():
            data.extend(self.__save_data.get())
        return data

    def stop_acquisition(self) -> None:
        """
        Stop data or self.Dev.IMPEDANCE acquisition, block until data acquisition stopped or failed.
        """
        if self.__status in [self.Dev.IDLE, self.Dev.TERMINATE]:
            return
        self.__status = self.Dev.IDLE_START
        while self.__status not in [self.Dev.IDLE, self.Dev.TERMINATE]:
            time.sleep(0.01)

    def start_acquisition_impedance(self) -> None:
        """
        Send self.Dev.IMPEDANCE acquisition command to device, block until data acquisition started or failed.
        """
        if self.__status == self.Dev.TERMINATE:
            return  # TODO: add raise exception
        if self.__status == self.Dev.IMPEDANCE:
            return None
        if self.__status == self.Dev.SIGNAL:
            self.stop_acquisition()
        self.__status = self.Dev.IMPEDANCE_START
        while self.__status not in [self.Dev.IMPEDANCE, self.Dev.TERMINATE]:
            time.sleep(0.01)
        if self.__status != self.Dev.IMPEDANCE:
            return  # TODO: add raise exception
        return None

    def get_impedance(self) -> Optional[list]:
        """
        Acquire channel impedances, return immediatly, self.Dev.IMPEDANCE update interval is about 2000ms.

        Returns:
            A list of channel self.Dev.IMPEDANCE ranging from `0` to `math.nan` if available, oterwise None.

        Data Unit:
            - self.Dev.IMPEDANCE: ohm (Ω)
        """
        return self.__parser.impedance

    def close_dev(self) -> None:
        """
        Close device connection and release resources.
        """
        if self.__status not in [self.Dev.TERMINATE]:
            # ensure socket is closed correctly
            self.__status = self.Dev.TERMINATE_START
            self.join()

    def get_battery_value(self) -> int:
        """
        Query battery level.

        Returns:
            battery level in percentage, range from `0` to `100`.
        """
        if (self.__parser.batt_val >= 0) and (self.__parser.batt_val <= 100):
            self.__batt = self.__parser.batt_val
        return self.__batt

    def open_lsl_stream(self, chs_info: dict[int, str]):
        """
        Open LSL stream, can be invoked after `start_acquisition_data()`,
            each frame is the same as described in `get_data()`.

        Args:
            chs_info: Label the information of channels in LSL Stream

        Raises:
            Exception: if data acquisition not started or LSL stream already opened.
            LSLException: if LSL stream creation failed.
            importError: if `pylsl` not installed or liblsl not installed on unix like system.
        """
        if self.__status != iSense.Dev.SIGNAL:
            raise Exception("Data acquisition not started, please start first.")
        if hasattr(self, "_lsl_stream"):
            raise Exception("LSL stream already opened.")
        from ..utils.lslWrapper import lslSender

        self.chs_index = [i for i in chs_info.keys()] + [136]
        self._lsl_stream = lslSender(
            chs_info,
            "iSense",
            "BioSignal",
            self.fs,
            with_trigger=True,
        )

    def close_lsl_stream(self):
        """
        Close LSL stream manually, invoked automatically after `stop_acquisition()` or `close_dev()`
        """
        if hasattr(self, "_lsl_stream"):
            del self._lsl_stream
            del self.chs_index

    def get_dev_flag(self) -> Optional[str]:
        """
        Query device status

        Returns:
            A list of strings.
            Possible results: Connected, Connected lost, Error, Initialization failed...
        """
        try:
            return self.__socket_flag.get_nowait()
        except queue.Empty:
            return

    def run(self):
        while self.__status not in [self.Dev.TERMINATE_START]:
            if self.__status == self.Dev.SIGNAL_START:
                self.__recv_data(imp_mode=False)
            elif self.__status == self.Dev.IMPEDANCE_START:
                self.__recv_data(imp_mode=True)
            elif self.__status in [self.Dev.IDLE_START]:
                self.__idle_state()
            else:
                print(f"Unknown status: {self.__status}")
                break
        try:
            self.__dev.close_socket()
        except Exception:
            pass
        self.__status = self.Dev.TERMINATE
        print("iSense disconnected")

    def __recv_data(self, imp_mode=True):
        self.__parser.imp_flag = imp_mode
        try:
            if self.__parser.imp_flag:
                self.__dev.start_impe()
                self.__status = self.Dev.IMPEDANCE
            else:
                self.__dev.start_data()
                self.__status = self.Dev.SIGNAL
        except Exception as e:
            self.__socket_flag.put(f"Data/IMPEDANCE initialization failed: {e}")
            self.__status = self.Dev.TERMINATE_START

        try:
            while self.__status in [self.Dev.SIGNAL, self.Dev.IMPEDANCE]:
                data = self.__dev.recv_socket()
                if not data:
                    raise Exception("Remote end closed.")
                ret = self.__parser.parse_data(data)
                if ret:
                    self.__save_data.put(ret)
                    if hasattr(self, "_lsl_stream"):
                        ret = np.array(ret)
                        self._lsl_stream.push_chunk(ret[:, self.chs_index].tolist())
        except Exception as e:
            traceback.print_exc()
            self.__socket_flag.put(f"Transmission error: {e}")
            self.__status = self.Dev.TERMINATE_START

        try:
            self.__dev.stop_recv()
        except Exception as e:
            if self.__status == self.Dev.IDLE_START:
                traceback.print_exc()
                self.__socket_flag.put(f"IDLE initialization failed: {e}")
            self.__status = self.Dev.TERMINATE_START

        self.__parser.clear_buffer()
        self.__save_data.put(None)
        while self.__save_data.get() is not None:
            continue
        print(f"iSense data thread closed. {datetime.now()}")

    def __idle_state(self):
        timestamp = time.time()
        self.__status = self.Dev.IDLE
        while self.__status in [self.Dev.IDLE]:
            if (time.time() - timestamp) < 10:
                time.sleep(0.2)  # to reduce cpu usage
                continue
            try:  # heartbeat to keep socket alive and update battery level
                self.__dev.stop_recv()
                timestamp = time.time()
                # print("Ah, ah, ah, ah\nStayin' alive, stayin' alive")
            except Exception:
                traceback.print_exc()
                self.__socket_flag.put("Connection Lost!")
                self.__status = self.Dev.TERMINATE_START

close_dev()

Close device connection and release resources.

Source code in src\eConEXG\iSense\device.py
140
141
142
143
144
145
146
147
def close_dev(self) -> None:
    """
    Close device connection and release resources.
    """
    if self.__status not in [self.Dev.TERMINATE]:
        # ensure socket is closed correctly
        self.__status = self.Dev.TERMINATE_START
        self.join()

close_lsl_stream()

Close LSL stream manually, invoked automatically after stop_acquisition() or close_dev()

Source code in src\eConEXG\iSense\device.py
188
189
190
191
192
193
194
def close_lsl_stream(self):
    """
    Close LSL stream manually, invoked automatically after `stop_acquisition()` or `close_dev()`
    """
    if hasattr(self, "_lsl_stream"):
        del self._lsl_stream
        del self.chs_index

get_available_frequency() staticmethod

Get available sample frequencies of iSense devices. Returns: Available sample frequencies in Hz.

Source code in src\eConEXG\iSense\device.py
51
52
53
54
55
56
57
@staticmethod
def get_available_frequency() -> list:
    """Get available sample frequencies of iSense devices.
    Returns:
        Available sample frequencies in Hz.
    """
    return [250, 500, 1000, 2000, 4000, 8000]

get_battery_value()

Query battery level.

Returns:

Type Description
int

battery level in percentage, range from 0 to 100.

Source code in src\eConEXG\iSense\device.py
149
150
151
152
153
154
155
156
157
158
def get_battery_value(self) -> int:
    """
    Query battery level.

    Returns:
        battery level in percentage, range from `0` to `100`.
    """
    if (self.__parser.batt_val >= 0) and (self.__parser.batt_val <= 100):
        self.__batt = self.__parser.batt_val
    return self.__batt

get_data(timeout=0.01)

Acquire amplifier data, make sure this function is called in a loop so that it can continuously read the data.

Parameters:

Name Type Description Default
timeout Optional[float]

it blocks at most timeout seconds and return, otherwise it returns until new data is available.

0.01

Returns:

Type Description
list[Optional[list]]

A list of frames, each frame is a list contains all wanted eeg channels and triggerbox channel, eeg channels can be updatd by update_channels().

Data Unit
  • eeg: microvolts (µV)
  • triggerbox: int, from 0 to 255

Raises:

Type Description
Exception

if device not connected or in data acquisition mode.

Source code in src\eConEXG\iSense\device.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def get_data(self, timeout: Optional[float] = 0.01) -> list[Optional[list]]:
    """
    Acquire amplifier data, make sure this function is called in a loop so that it can continuously read the data.

    Args:
        timeout: it blocks at most `timeout` seconds and return, otherwise it returns until new data is available.

    Returns:
        A list of frames, each frame is a list contains all wanted eeg channels and triggerbox channel,
            eeg channels can be updatd by `update_channels()`.

    Data Unit:
        - eeg: microvolts (µV)
        - triggerbox: int, from `0` to `255`

    Raises:
        Exception: if device not connected or in data acquisition mode.
    """
    # if self.__status != self.Dev.SIGNAL:
    #     raise Exception("Data acquisition not started, please start first.")
    try:
        data: list = self.__save_data.get(timeout=timeout)
    except queue.Empty:
        return []
    while not self.__save_data.empty():
        data.extend(self.__save_data.get())
    return data

get_dev_flag()

Query device status

Returns:

Type Description
Optional[str]

A list of strings.

Optional[str]

Possible results: Connected, Connected lost, Error, Initialization failed...

Source code in src\eConEXG\iSense\device.py
196
197
198
199
200
201
202
203
204
205
206
207
def get_dev_flag(self) -> Optional[str]:
    """
    Query device status

    Returns:
        A list of strings.
        Possible results: Connected, Connected lost, Error, Initialization failed...
    """
    try:
        return self.__socket_flag.get_nowait()
    except queue.Empty:
        return

get_impedance()

Acquire channel impedances, return immediatly, self.Dev.IMPEDANCE update interval is about 2000ms.

Returns:

Type Description
Optional[list]

A list of channel self.Dev.IMPEDANCE ranging from 0 to math.nan if available, oterwise None.

Data Unit
  • self.Dev.IMPEDANCE: ohm (Ω)
Source code in src\eConEXG\iSense\device.py
128
129
130
131
132
133
134
135
136
137
138
def get_impedance(self) -> Optional[list]:
    """
    Acquire channel impedances, return immediatly, self.Dev.IMPEDANCE update interval is about 2000ms.

    Returns:
        A list of channel self.Dev.IMPEDANCE ranging from `0` to `math.nan` if available, oterwise None.

    Data Unit:
        - self.Dev.IMPEDANCE: ohm (Ω)
    """
    return self.__parser.impedance

open_lsl_stream(chs_info)

Open LSL stream, can be invoked after start_acquisition_data(), each frame is the same as described in get_data().

Parameters:

Name Type Description Default
chs_info dict[int, str]

Label the information of channels in LSL Stream

required

Raises:

Type Description
Exception

if data acquisition not started or LSL stream already opened.

LSLException

if LSL stream creation failed.

importError

if pylsl not installed or liblsl not installed on unix like system.

Source code in src\eConEXG\iSense\device.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def open_lsl_stream(self, chs_info: dict[int, str]):
    """
    Open LSL stream, can be invoked after `start_acquisition_data()`,
        each frame is the same as described in `get_data()`.

    Args:
        chs_info: Label the information of channels in LSL Stream

    Raises:
        Exception: if data acquisition not started or LSL stream already opened.
        LSLException: if LSL stream creation failed.
        importError: if `pylsl` not installed or liblsl not installed on unix like system.
    """
    if self.__status != iSense.Dev.SIGNAL:
        raise Exception("Data acquisition not started, please start first.")
    if hasattr(self, "_lsl_stream"):
        raise Exception("LSL stream already opened.")
    from ..utils.lslWrapper import lslSender

    self.chs_index = [i for i in chs_info.keys()] + [136]
    self._lsl_stream = lslSender(
        chs_info,
        "iSense",
        "BioSignal",
        self.fs,
        with_trigger=True,
    )

start_acquisition_data()

Send data acquisition command to device, block until data acquisition started or failed.

Source code in src\eConEXG\iSense\device.py
59
60
61
62
63
64
65
66
67
68
69
70
71
def start_acquisition_data(self) -> None:
    """
    Send data acquisition command to device, block until data acquisition started or failed.
    """
    if self.__status == self.Dev.TERMINATE:
        return  # TODO: add raise exception
    if self.__status == self.Dev.SIGNAL:
        return
    if self.__status == self.Dev.IMPEDANCE:
        self.stop_acquisition()
    self.__status = self.Dev.SIGNAL_START
    while self.__status not in [self.Dev.SIGNAL, self.Dev.TERMINATE]:
        time.sleep(0.01)

start_acquisition_impedance()

Send self.Dev.IMPEDANCE acquisition command to device, block until data acquisition started or failed.

Source code in src\eConEXG\iSense\device.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def start_acquisition_impedance(self) -> None:
    """
    Send self.Dev.IMPEDANCE acquisition command to device, block until data acquisition started or failed.
    """
    if self.__status == self.Dev.TERMINATE:
        return  # TODO: add raise exception
    if self.__status == self.Dev.IMPEDANCE:
        return None
    if self.__status == self.Dev.SIGNAL:
        self.stop_acquisition()
    self.__status = self.Dev.IMPEDANCE_START
    while self.__status not in [self.Dev.IMPEDANCE, self.Dev.TERMINATE]:
        time.sleep(0.01)
    if self.__status != self.Dev.IMPEDANCE:
        return  # TODO: add raise exception
    return None

stop_acquisition()

Stop data or self.Dev.IMPEDANCE acquisition, block until data acquisition stopped or failed.

Source code in src\eConEXG\iSense\device.py
101
102
103
104
105
106
107
108
109
def stop_acquisition(self) -> None:
    """
    Stop data or self.Dev.IMPEDANCE acquisition, block until data acquisition stopped or failed.
    """
    if self.__status in [self.Dev.IDLE, self.Dev.TERMINATE]:
        return
    self.__status = self.Dev.IDLE_START
    while self.__status not in [self.Dev.IDLE, self.Dev.TERMINATE]:
        time.sleep(0.01)