Source code for CPAC.utils.create_flame_model_files

# Copyright (C) 2016-2024  C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
from CPAC.utils.monitoring import WFLOGGER


[docs] def create_dummy_string(length): ppstring = "" for i in range(0, length): ppstring += "\t" + "%1.5e" % (1.0) ppstring += "\n" return ppstring
[docs] def write_mat_file( design_matrix, output_dir, model_name, depatsified_EV_names, current_output=None ): import os import numpy as np dimx = None dimy = None if len(design_matrix.shape) == 1: dimy = 1 dimx = design_matrix.shape[0] else: dimx, dimy = design_matrix.shape ppstring = "/PPheights" for i in range(0, dimy): ppstring += "\t" + "%1.5e" % (1.0) ppstring += "\n" filename = model_name + ".mat" out_file = os.path.join(output_dir, filename) if not os.path.exists(output_dir): os.makedirs(output_dir) with open(out_file, "wt") as f: print("/NumWaves\t%d" % dimy, file=f) print("/NumPoints\t%d" % dimx, file=f) print(ppstring, file=f) # print labels for the columns - mainly for double-checking your model col_string = "\n" for col in depatsified_EV_names: col_string = col_string + col + "\t" print(col_string, "\n", file=f) print("/Matrix", file=f) np.savetxt(f, design_matrix, fmt="%1.5e", delimiter="\t") return out_file
[docs] def create_grp_file(design_matrix, grp_file_vector, output_dir, model_name): import os import numpy as np dimx = None dimy = None if len(design_matrix.shape) == 1: dimy = 1 dimx = design_matrix.shape[0] else: dimx, dimy = design_matrix.shape grp_file_vector = [int(x) for x in grp_file_vector] out_file = os.path.join(output_dir, model_name + ".grp") with open(out_file, "wt") as f: print("/NumWaves\t1", file=f) print("/NumPoints\t%d\n" % dimx, file=f) print("/Matrix", file=f) np.savetxt(f, grp_file_vector, fmt="%d", delimiter="\t") return out_file
[docs] def create_con_file( con_vecs, con_names, col_names, model_name, current_output, out_dir ): import os out_file = os.path.join(out_dir, model_name) + ".con" with open(out_file, "w+") as f: # write header num = 1 for key in con_names: f.write("/ContrastName%s\t%s\n" % (num, key)) num += 1 f.write("/NumWaves\t%d\n" % len(col_names)) f.write("/NumContrasts\t%d\n" % len(con_names)) f.write("/PPheights%s" % create_dummy_string(len(con_vecs))) f.write("/RequiredEffect%s" % create_dummy_string(len(con_vecs))) f.write("\n\n") # print labels for the columns - mainly for double-checking your # model col_string = "\n" for col in col_names: col_string = col_string + col + "\t" print(col_string, "\n", file=f) # write data f.write("/Matrix\n") for vector in con_vecs: for v in vector: f.write("%1.5e\t" % v) f.write("\n") return out_file
[docs] def create_fts_file(ftest_list, con_names, model_name, current_output, out_dir): import os import numpy as np WFLOGGER.info("\nFound f-tests in your model, writing f-tests file (.fts)..\n") try: out_file = os.path.join(out_dir, model_name + ".fts") with open(out_file, "w") as f: print("/NumWaves\t", len(con_names), file=f) print("/NumContrasts\t", len(ftest_list), file=f) # process each f-test ftst = [] for ftest_string in ftest_list: ftest_vector = [] cons_in_ftest = ftest_string.split(",") for con in con_names: if con in cons_in_ftest: ftest_vector.append(1) else: ftest_vector.append(0) ftst.append(ftest_vector) fts_n = np.array(ftst) # print labels for the columns - mainly for double-checking your # model col_string = "\n" for con in con_names: col_string = col_string + con + "\t" print(col_string, "\n", file=f) print("/Matrix", file=f) for i in range(fts_n.shape[0]): print(" ".join(fts_n[i].astype("str")), file=f) except Exception as e: filepath = os.path.join( out_dir, "model_files", current_output, model_name + ".fts" ) errmsg = ( "\n\n[!] CPAC says: Could not create .fts file for " "FLAMEO or write it to disk.\nAttempted filepath: %s\n" "Error details: %s\n\n" % (filepath, e) ) raise Exception(errmsg) return out_file
[docs] def create_con_ftst_file( con_file, model_name, current_output, output_dir, column_names, coding_scheme, group_sep, ): """Create the contrasts and fts file.""" import os import numpy as np column_names = [x for x in list(column_names) if "participant_id" not in x] # Read the header of the contrasts file, which should contain the columns # of the design matrix and f-tests (if any) with open(con_file, "r") as f: evs = f.readline() evs = evs.rstrip("\r\n").split(",") if evs[0].strip().lower() != "contrasts": msg = "first cell in contrasts file should contain 'Contrasts'" raise ValueError(msg) # remove "Contrasts" label and replace it with "Intercept" # evs[0] = "Intercept" # Count the number of f tests defined count_ftests = len([ev for ev in evs if "f_test" in ev]) # Whether any f tests are defined fTest = count_ftests > 0 # Now read the actual contrasts try: contrasts_data = np.genfromtxt(con_file, names=True, delimiter=",", dtype=None) except: msg = f"Could not successfully read in contrast file: {con_file}" raise OSError(msg) lst = contrasts_data.tolist() # lst = list of rows of the contrast matrix (each row represents a # contrast, i.e. a name of the contrast, and then coefficients for each of # the design matrix columns, and finally coefficients for each of the # f tests specifying whether this contrast is part of that particular # f test). if isinstance(lst, tuple): lst = [lst] fts_columns = [] contrasts = [] contrast_names = [] length = len(list(lst[0])) for contr in lst: # tp = tuple in the format (contrast_name, 0, 0, 0, 0, ...) # with the zeroes being the vector of contrasts for that contrast # extract the name of the contrast contrast_names.append(contr[0]) # create a list of integers that is the vector for the contrast # ex. [0, 1, 1, 0, ..] con_vector = list(contr)[1 : (length - count_ftests)] # fts_vector tells us which f-tests this contrast is a part of. fts_vector = list(contr)[(length - count_ftests) : length] fts_columns.append(fts_vector) # add Intercept column if not group_sep: if False: # The following insertion gives an error further down the # line, because this suggests that there will be an intercept # column in the design matrix but such an intercept is never # actually added. if coding_scheme == "Treatment": con_vector.insert(0, 0) elif coding_scheme == "Sum": con_vector.insert(0, 1) contrasts.append(con_vector) # contrast_names = list of the names of the contrasts (not regressors) # contrasts = list of lists with the contrast vectors num_EVs_in_con_file = len(contrasts[0]) contrasts = np.array(contrasts, dtype=np.float16) fts_columns = np.array(fts_columns) # if there are f-tests, create the array for them if fTest: ## TODO: Probably it would be more accurate to check that each ## f test itself contains enough contrasts, rather than whether ## there are in principle enough contrasts to form f tests. if len(contrast_names) < 2: errmsg = ( "\n\n[!] CPAC says: Not enough contrasts for running " "f-tests.\nTip: Do you have only one contrast in your " "contrasts file? f-tests require more than one contrast.\n" "Either remove the f-tests or include more contrasts.\n\n" ) raise Exception(errmsg) fts_n = fts_columns.T if len(column_names) != (num_EVs_in_con_file): WFLOGGER.error( "\n\n[!] CPAC says: The number of EVs in your model design matrix (found" " in the %s.mat file) does not match the number of EVs (columns) in your" " custom contrasts matrix CSV file.\n\nCustom contrasts matrix file:" " %s\n\nNumber of EVs in design matrix: %d\nNumber of EVs in contrasts" " file: %d\n\nThe column labels in the design matrix should match those in" "your contrasts .CSV file.\nColumn labels in design matrix:\n%s", model_name, con_file, len(column_names), num_EVs_in_con_file, str(column_names), ) return None, None for design_mat_col, con_csv_col in zip(column_names, evs[1:]): if con_csv_col not in design_mat_col: WFLOGGER.error( "\n\n[!] CPAC says: The names of the EVs in your custom contrasts .csv" " file do not match the names or order of the EVs in the design" " matrix. Please make sure these are consistent.\nDesign matrix EV" " columns: %s\nYour contrasts matrix columns: %s\n\n", column_names, evs[1:], ) return None, None out_file = os.path.join(output_dir, model_name + ".con") with open(out_file, "wt") as f: idx = 1 pp_str = "/PPheights" re_str = "/RequiredEffect" for name in contrast_names: print("/ContrastName%d" % idx, "\t", name, file=f) pp_str += "\t%1.5e" % (1) re_str += "\t%1.5e" % (1) idx += 1 print("/NumWaves\t", (contrasts.shape)[1], file=f) print("/NumContrasts\t", (contrasts.shape)[0], file=f) print(pp_str, file=f) print(re_str + "\n", file=f) # print labels for the columns - mainly for double-checking your model col_string = "\n" for ev in evs: if "contrast" not in ev and "Contrast" not in ev: col_string = col_string + ev + "\t" print(col_string, "\n", file=f) print("/Matrix", file=f) np.savetxt(f, contrasts, fmt="%1.5e", delimiter="\t") ftest_out_file = None if fTest: WFLOGGER.info("\nFound f-tests in your model, writing f-tests file (.fts)..\n") ftest_out_file = os.path.join(output_dir, model_name + ".fts") with open(ftest_out_file, "wt") as f: print("/NumWaves\t", (contrasts.shape)[0], file=f) print("/NumContrasts\t", count_ftests, file=f) # print labels for the columns - mainly for double-checking your # model col_string = "\n" for con in contrast_names: col_string = col_string + con + "\t" print(col_string, "\n", file=f) print("/Matrix", file=f) for i in range(fts_n.shape[0]): print(" ".join(fts_n[i].astype("str")), file=f) return out_file, ftest_out_file
[docs] def create_flame_model_files( design_matrix, col_names, contrasts_vectors, contrast_names, custom_contrasts_csv, ftest_list, group_sep, grouping_vector, coding_scheme, model_name, output_measure, output_dir, ): if contrasts_vectors: con_file = create_con_file( contrasts_vectors, contrast_names, col_names, model_name, output_measure, output_dir, ) if len(ftest_list) > 0: fts_file = create_fts_file( ftest_list, contrast_names, model_name, output_measure, output_dir ) else: fts_file = None elif custom_contrasts_csv: con_file, fts_file = create_con_ftst_file( custom_contrasts_csv, model_name, output_measure, output_dir, col_names, coding_scheme, group_sep, ) if not con_file: # don't write out the rest of the files if this didn't work out return None, None, None, None mat_file = write_mat_file( design_matrix, output_dir, model_name, col_names, output_measure ) grp_file = create_grp_file(design_matrix, grouping_vector, output_dir, model_name) return mat_file, grp_file, con_file, fts_file