-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
115 lines (95 loc) · 4.93 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import mne
import argparse
import time
import logging
import random
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui, QtCore
import brainflow
from brainflow.board_shim import BoardShim, BrainFlowInputParams, BoardIds, BrainFlowError
from brainflow.data_filter import DataFilter, FilterTypes, AggOperations, WindowFunctions, DetrendOperations
class Graph:
def __init__(self, board_shim):
self.board_id = board_shim.get_board_id()
self.board_shim = board_shim
self.exg_channels = BoardShim.get_exg_channels(self.board_id)
self.sampling_rate = BoardShim.get_sampling_rate(self.board_id)
self.window_size = 4
self.num_points = self.window_size * self.sampling_rate
self.update_speed_ms = 50
self.app = QtGui.QApplication([])
timer = QtCore.QTimer()
timer.timeout.connect(self.update)
timer.start(self.update_speed_ms)
QtGui.QApplication.instance().exec_()
params = BrainFlowInputParams()
board = BoardShim(BoardIds.CROWN_BOARD.value, params)
data = board.get_board_data()
eeg_channels = BoardShim.get_eeg_channels(BoardIds.CROWN_BOARD.value)
eeg_data = data[eeg_channels, :]
eeg_data = eeg_data / 1000000 # BrainFlow returns uV, convert to V for MNE
# Creating MNE objects from brainflow data arrays
ch_types = ['eeg'] * len(eeg_channels)
ch_names = BoardShim.get_eeg_names(BoardIds.CROWN_BOARD.value)
sfreq = BoardShim.get_sampling_rate(BoardIds.CROWN_BOARD.value)
info = mne.create_info(ch_names=ch_names, sfreq=sfreq, ch_types=ch_types)
raw = mne.io.RawArray(eeg_data, info)
def update(self):
data = self.board_shim.get_current_board_data(self.num_points)
avg_bands = [0, 0, 0, 0, 0]
for count, channel in enumerate(self.exg_channels):
# plot timeseries
DataFilter.detrend(data[channel], DetrendOperations.CONSTANT.value)
DataFilter.perform_bandpass(data[channel], self.sampling_rate, 51.0, 100.0, 2,
FilterTypes.BUTTERWORTH.value, 0)
DataFilter.perform_bandpass(data[channel], self.sampling_rate, 51.0, 100.0, 2,
FilterTypes.BUTTERWORTH.value, 0)
DataFilter.perform_bandstop(data[channel], self.sampling_rate, 50.0, 4.0, 2,
FilterTypes.BUTTERWORTH.value, 0)
DataFilter.perform_bandstop(data[channel], self.sampling_rate, 60.0, 4.0, 2,
FilterTypes.BUTTERWORTH.value, 0)
self.app.processEvents()
def main():
BoardShim.enable_dev_board_logger()
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser()
# use docs to check which parameters are required for specific board, e.g. for Cyton - set serial port
parser.add_argument('--timeout', type=int, help='timeout for device discovery or connection', required=False,
default=0)
parser.add_argument('--ip-port', type=int, help='ip port', required=False, default=0)
parser.add_argument('--ip-protocol', type=int, help='ip protocol, check IpProtocolType enum', required=False,
default=0)
parser.add_argument('--ip-address', type=str, help='ip address', required=False, default='')
parser.add_argument('--serial-port', type=str, help='serial port', required=False, default='')
parser.add_argument('--mac-address', type=str, help='mac address', required=False, default='')
parser.add_argument('--other-info', type=str, help='other info', required=False, default='')
parser.add_argument('--streamer-params', type=str, help='streamer params', required=False, default='')
parser.add_argument('--serial-number', type=str, help='serial number', required=False, default='')
parser.add_argument('--board-id', type=int, help='board id, check docs to get a list of supported boards',
required=False, default=BoardIds.CROWN_BOARD)
parser.add_argument('--file', type=str, help='file', required=False, default='')
args = parser.parse_args()
params = BrainFlowInputParams()
params.ip_port = args.ip_port
params.serial_port = args.serial_port
params.mac_address = args.mac_address
params.other_info = args.other_info
params.serial_number = args.serial_number
params.ip_address = args.ip_address
params.ip_protocol = args.ip_protocol
params.timeout = args.timeout
params.file = args.file
try:
board_shim = BoardShim(args.board_id, params)
board_shim.prepare_session()
board_shim.start_stream()
g = Graph(board_shim)
except BaseException as e:
logging.warning('Exception', exc_info=True)
finally:
logging.info('End')
if board_shim.is_prepared():
logging.info('Releasing session')
board_shim.release_session()
if __name__ == '__main__':
main()