test_deeprf_empirical_batch.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: Jordy Thielen ([email protected])
Script to test DeepRF on empirical data using batches and a GPU.
"""
import os
import numpy as np
from scipy.stats import pearsonr
import time
import stimulus as stim
import data_synthetic as ds
import data_empirical as de
import deeprf
#os.environ['KMP_DUPLICATE_LIB_OK']='True'
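# (The commented-out KMP_DUPLICATE_LIB_OK setting above is a common workaround
# for "duplicate OpenMP runtime" crashes, e.g. when running locally on macOS.)
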
def run(root="/project/2420084.01", subject=0, cat_dim="time", n_layers=50):
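    """Apply a trained DeepRF model to one subject's empirical data.

    Args:
        root: Project root containing "resources" and "derivatives".
        subject: Subject index passed to the Benson dataset.
        cat_dim: How runs are concatenated ("time" or "chan").
        n_layers: ResNet depth (18, 34, or 50).
    """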
    # Load stimuli
    stimuli = stim.BensonStimulus(os.path.join(root, "resources"))
    stimuli.resample_stimulus(0.5)

    # Initialize data generators
    train_dataset = de.BensonDataset(stimuli, root, subject, "train", cat_dim)
    valid_dataset = de.BensonDataset(stimuli, root, subject, "test", cat_dim)
    n_voxels = len(train_dataset)
    train_generator = ds.SyntheticDataGenerator(stimuli, n_voxels, (1, 30, 30), "train", cat_dim)
    valid_generator = ds.SyntheticDataGenerator(stimuli, n_voxels, (1, 40, 40), "test", cat_dim)

    # Set up model
    if cat_dim == "time":
        num_in_channels = 1
    else:
        num_in_channels = 6
    if n_layers == 50:
        model = deeprf.get_resnet50(num_in_channels=num_in_channels)
    elif n_layers == 34:
        model = deeprf.get_resnet34(num_in_channels=num_in_channels)
    elif n_layers == 18:
        model = deeprf.get_resnet18(num_in_channels=num_in_channels)
    else:
        raise ValueError(f"Unknown n_layers: {n_layers}")

    # Set up tester
    tester = deeprf.Tester(model, path=os.path.join(root, "derivatives"), suffix=cat_dim)
    tester.load_model()

    # Set bounds
    h_bound = (-2, 2)
    s_bound = (1 / stimuli.pixperdeg, stimuli.radius_deg)
    x_bound = (-stimuli.radius_deg, stimuli.radius_deg)
    y_bound = (-stimuli.radius_deg, stimuli.radius_deg)

    # Pre-allocate memory for results
    y_train = np.zeros((n_voxels, 4), dtype="float32")  # predicted parameters (train; also used for valid)
    r_train = np.zeros((n_voxels,), dtype="float32")  # squared Pearson correlation between predicted and observed train data
    r_valid = np.zeros((n_voxels,), dtype="float32")  # squared Pearson correlation between predicted and observed valid data
    ctime = np.zeros((n_voxels,), dtype="float32")  # time it took to apply the model

    # Get train data
    X = train_dataset.data.transpose((2, 0, 1))

    # Apply the model
    start_time = time.time()
    y_train = tester.test_model_batch(X)
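    # The single batched forward pass is timed once and amortized equally over
    # voxels, so the per-voxel ctime values are identical by construction.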
    ctime[:] = (time.time() - start_time) / n_voxels

    # Loop individual voxels
    for i in range(n_voxels):
        # Post-process outputs: clip predicted parameters to their bounds
        y_train[i, 0] = np.clip(y_train[i, 0], *h_bound)
        y_train[i, 1] = np.clip(y_train[i, 1], *s_bound)
        y_train[i, 2] = np.clip(y_train[i, 2], *x_bound)
        y_train[i, 3] = np.clip(y_train[i, 3], *y_bound)

        # Generate prediction of train data
        y = train_generator.generate_prediction(*y_train[i, :])

        # Compute explained variance of train data (might be overfitted)
        try:
            r_train[i] = pearsonr(X[i, :, :].flatten(), y.flatten())[0]**2
        except Exception:
            print("warning: r could not be estimated with parameters:", y_train[i, :])
            r_train[i] = 0

        # Get valid data
        x = valid_dataset.get_item(i)

        # Generate prediction of the valid data using the estimated parameters
        y = valid_generator.generate_prediction(*y_train[i, :])

        # Compute explained variance of valid data
        try:
            r_valid[i] = pearsonr(x.flatten(), y.flatten())[0]**2
        except Exception:
            print("warning: r could not be estimated with parameters:", y_train[i, :])
            r_valid[i] = 0

        # Print some statistics
        print("subject: {} ({:d}) voxel {:5d}/{} ctime: {:6.3f} ({:6.3f}) train: {:.2f} ({:.2f}) valid: {:.2f} ({:.2f})".format(
            train_dataset.subject, subject, 1 + i, n_voxels, ctime[i], np.mean(ctime[:1 + i]),
            r_train[i], np.mean(r_train[:1 + i]), r_valid[i], np.mean(r_valid[:1 + i])))

    # Make output folder if it does not already exist
    out_dir = os.path.join(root, "derivatives", model.name + "_" + cat_dim)
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    # Save results
    np.savez(os.path.join(out_dir, "results_empirical_{}_batch.npz".format(train_dataset.subject)),
             y_train=y_train, r_train=r_train, r_valid=r_valid, ctime=ctime)
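
# To inspect a saved results file later (a minimal sketch; the keys match the
# np.savez call above, and the file path shown here is purely illustrative):
#
#   data = np.load("derivatives/<model>_<cat_dim>/results_empirical_<subject>_batch.npz")
#   print(data["r_valid"].mean())
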
if __name__ == "__main__":
    root = "/project/2420084.01"
    #root = "/Users/jordythielen/2420084.01"
    n_subjects = 29
    for i_subject in range(n_subjects):
        run(root=root, subject=i_subject, cat_dim="chan", n_layers=50)