# Copyright (C) 2012-2024 C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""General-purpose utilities for C-PAC."""
import collections.abc
from copy import deepcopy
import fnmatch
import gzip
from itertools import repeat
import json
import numbers
import os
import pickle
from typing import Any, Literal, Optional, overload
import numpy as np
from voluptuous.error import Invalid
import yaml
from CPAC.utils.configuration import Configuration
from CPAC.utils.docs import deprecated
from CPAC.utils.interfaces.function import Function
from CPAC.utils.monitoring import FMLOGGER, WFLOGGER
CONFIGS_DIR = os.path.abspath(
os.path.join(__file__, *repeat(os.path.pardir, 2), "resources/configs/")
)
with open(
os.path.join(CONFIGS_DIR, "1.7-1.8-nesting-mappings.yml"), "r", encoding="utf-8"
) as _f:
NESTED_CONFIG_MAPPING = yaml.safe_load(_f)
with open(
os.path.join(CONFIGS_DIR, "1.7-1.8-deprecations.yml"), "r", encoding="utf-8"
) as _f:
NESTED_CONFIG_DEPRECATIONS = yaml.safe_load(_f)
PE_DIRECTION = Literal["i", "i-", "j", "j-", "k", "k-", ""]
VALID_PATTERNS = [
"alt+z",
"altplus",
"alt+z2",
"alt-z",
"altminus",
"alt-z2",
"seq+z",
"seqplus",
"seq-z",
"seqminus",
]
YAML_BOOLS = {
True: ("on", "t", "true", "y", "yes"),
False: ("f", "false", "n", "no", "off"),
}
def get_last_prov_entry(prov):
"""Get the last provenance entry."""
while not isinstance(prov[-1], str):
prov = prov[-1]
return prov[-1]
def _get_flag(in_flag):
    """Return ``in_flag`` unchanged (identity function for use in a Function node)."""
    return in_flag
def get_flag_wf(wf_name="get_flag"):
"""Create a workflow to get a flag."""
import nipype.interfaces.utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
wf = pe.Workflow(name=wf_name)
input_node = pe.Node(util.IdentityInterface(fields=["in_flag"]), name="inputspec")
get_flag = pe.Node(
Function(input_names=["in_flag"], function=_get_flag), name="get_flag"
)
    wf.connect(input_node, "in_flag", get_flag, "in_flag")
    return wf
def read_json(json_file):
"""Read a JSON file and return the contents as a dictionary."""
try:
with open(json_file, "r") as f:
json_dct = json.load(f)
except json.decoder.JSONDecodeError as err:
msg = f"\n\n{err}\n\nJSON file: {json_file}\n"
raise Exception(msg)
return json_dct
def create_id_string(
cfg,
unique_id,
resource,
scan_id=None,
template_desc=None,
atlas_id=None,
fwhm=None,
subdir=None,
extension=None,
):
"""Create the unique key-value identifier string for BIDS-Derivatives file names.
This is used in the file renaming performed during the Datasink
connections.
Example
-------
>>> from CPAC.utils.configuration import Configuration
>>> create_id_string(Configuration(), 'sub-1_ses-1',
... 'res-derivative_desc-Mean-1_timeseries',
... scan_id='rest', atlas_id='Yeo_desc-7')
'sub-1_ses-1_task-rest_atlas-Yeo7_res-3mm_desc-Mean1_timeseries'
"""
import re
from CPAC.utils.bids_utils import combine_multiple_entity_instances, res_in_filename
if atlas_id:
if "_desc-" in atlas_id:
atlas, desc = atlas_id.split("_desc-")
if not re.match(r".*[0-9]$", atlas) and re.match(r"[a-z].*", desc):
atlas_id = f"{atlas}{desc[0].upper()}{desc[1:]}"
else:
atlas_id = atlas_id.replace("_desc-", "")
resource = f"atlas-{atlas_id}_{resource}"
part_id = unique_id.split("_")[0]
ses_id = unique_id.split("_")[1]
if "sub-" not in part_id:
part_id = f"sub-{part_id}"
if "ses-" not in ses_id:
ses_id = f"ses-{ses_id}"
if scan_id:
out_filename = f"{part_id}_{ses_id}_task-{scan_id}_{resource}"
else:
out_filename = f"{part_id}_{ses_id}_{resource}"
template_tag = template_desc.split(" -")[0] if template_desc else "*"
for prefix in ["space-", "from-", "to-"]:
for bidstag in out_filename.split("_"):
if prefix in bidstag and "template" in bidstag:
out_filename = out_filename.replace(bidstag, f"{prefix}{template_tag}")
if fwhm:
for tag in resource.split("_"):
if "desc-" in tag and "-sm" in tag:
newtag = tag.replace("-sm", f"-sm{fwhm}")
out_filename = out_filename.replace(tag, newtag)
break
else:
msg = "\n[!] FWHM provided but no desc-sm?\n"
raise Exception(msg)
if extension is not None:
out_filename = out_filename + "." + str(extension)
    # drop space- entities from native-space filenames
if subdir == "anat":
out_filename = out_filename.replace("_space-T1w_", "_")
if subdir == "func":
out_filename = out_filename.replace("_space-bold_", "_")
return combine_multiple_entity_instances(res_in_filename(cfg, out_filename))
def write_output_json(json_data, filename, indent=3, basedir=None):
"""Write a dictionary to a JSON file."""
if not basedir:
basedir = os.getcwd()
if ".gii" in filename:
filename = os.path.splitext(filename)[0]
filename = f"{filename}.json"
if ".json" not in filename:
filename = f"{filename}.json"
json_file = os.path.join(basedir, filename)
json_data = json.dumps(json_data, indent=indent, sort_keys=True)
with open(json_file, "wt") as f:
f.write(json_data)
return json_file
def get_zscore(map_node=False, wf_name="z_score"):
"""
Workflow to calculate z-scores.
    Parameters
    ----------
    map_node : bool
        whether to use MapNodes (to iterate over multiple inputs) instead of Nodes
    wf_name : string
        name of the workflow
Returns
-------
wf : workflow object
Notes
-----
`Source <https://github.com/FCP-INDI/C-PAC/blob/main/CPAC/network_centrality/z_score.py>`_
Workflow Inputs::
inputspec.input_file : string
path to input functional derivative file for which z score has to be calculated
inputspec.mask_file : string
path to whole brain functional mask file required to calculate zscore
Workflow Outputs::
outputspec.z_score_img : string
path to image containing Normalized Input Image Z scores across full brain.
.. exec::
from CPAC.utils import get_zscore
wf = get_zscore('mean')
wf.write_graph(
graph2use='orig',
dotfilename='./images/generated/zscore.dot'
)
High Level Workflow Graph:
.. image:: ../../images/generated/zscore.png
:width: 500
Detailed Workflow Graph:
.. image:: ../../images/generated/zscore_detailed.png
:width: 500
Example
-------
>>> wf = get_zscore('example_input')
>>> wf.inputs.inputspec.input_file = '/home/data/graph_working_dir/calculate_centrality/degree_centrality_binarize.nii.gz'
>>> wf.inputs.inputspec.mask_file = '/home/data/graphs/GraphGeneration/new_mask_3m.nii.gz'
>>> wf.run() # doctest: +SKIP
""" # pylint: disable=line-too-long
# pylint: disable=import-outside-toplevel,redefined-outer-name,reimported
from nipype.interfaces import fsl
import nipype.interfaces.utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
wflow = pe.Workflow(name=wf_name)
inputNode = pe.Node(
util.IdentityInterface(fields=["input_file", "mask_file"]), name="inputspec"
)
outputNode = pe.Node(
util.IdentityInterface(fields=["z_score_img"]), name="outputspec"
)
if map_node:
mean = pe.MapNode(
interface=fsl.ImageStats(), name="mean", iterfield=["in_file"]
)
standard_deviation = pe.MapNode(
interface=fsl.ImageStats(), name="standard_deviation", iterfield=["in_file"]
)
op_string = pe.MapNode(
Function(
input_names=["mean", "std_dev"],
output_names=["op_string"],
function=get_operand_string,
),
name="op_string",
iterfield=["mean", "std_dev"],
)
z_score = pe.MapNode(
interface=fsl.MultiImageMaths(),
name="z_score",
iterfield=["in_file", "op_string"],
)
else:
mean = pe.Node(interface=fsl.ImageStats(), name="mean")
standard_deviation = pe.Node(
interface=fsl.ImageStats(), name="standard_deviation"
)
op_string = pe.Node(
Function(
input_names=["mean", "std_dev"],
output_names=["op_string"],
function=get_operand_string,
),
name="op_string",
)
z_score = pe.Node(interface=fsl.MultiImageMaths(), name="z_score")
# calculate the mean
mean.inputs.op_string = "-k %s -m"
wflow.connect(inputNode, "input_file", mean, "in_file")
wflow.connect(inputNode, "mask_file", mean, "mask_file")
# calculate the standard deviation
standard_deviation.inputs.op_string = "-k %s -s"
wflow.connect(inputNode, "input_file", standard_deviation, "in_file")
wflow.connect(inputNode, "mask_file", standard_deviation, "mask_file")
# calculate the z-score
wflow.connect(mean, "out_stat", op_string, "mean")
wflow.connect(standard_deviation, "out_stat", op_string, "std_dev")
# z_score.inputs.out_file = input_name + '_zstd.nii.gz'
wflow.connect(op_string, "op_string", z_score, "op_string")
wflow.connect(inputNode, "input_file", z_score, "in_file")
wflow.connect(inputNode, "mask_file", z_score, "operand_files")
wflow.connect(z_score, "out_file", outputNode, "z_score_img")
return wflow
def get_fisher_zscore(input_name, map_node=False, wf_name="fisher_z_score"):
"""Run the compute_fisher_z_score function as part of a one-node workflow."""
import nipype.interfaces.utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
wflow = pe.Workflow(name=wf_name)
inputNode = pe.Node(
util.IdentityInterface(fields=["correlation_file", "timeseries_one_d"]),
name="inputspec",
)
outputNode = pe.Node(
util.IdentityInterface(fields=["fisher_z_score_img"]), name="outputspec"
)
if map_node:
        # MapNode to iterate over multiple correlation files
fisher_z_score = pe.MapNode(
Function(
input_names=["correlation_file", "timeseries_one_d", "input_name"],
output_names=["out_file"],
function=compute_fisher_z_score,
),
name="fisher_z_score",
iterfield=["correlation_file"],
)
else:
fisher_z_score = pe.Node(
Function(
input_names=["correlation_file", "timeseries_one_d", "input_name"],
output_names=["out_file"],
function=compute_fisher_z_score,
),
name="fisher_z_score",
)
fisher_z_score.inputs.input_name = input_name
wflow.connect(inputNode, "correlation_file", fisher_z_score, "correlation_file")
wflow.connect(inputNode, "timeseries_one_d", fisher_z_score, "timeseries_one_d")
wflow.connect(fisher_z_score, "out_file", outputNode, "fisher_z_score_img")
return wflow
def compute_fisher_z_score(correlation_file, timeseries_one_d, input_name):
"""Compute the fisher z transform of the input correlation map.
If the correlation map contains data for multiple ROIs then
return z score for each ROI as a seperate NIfTI file.
Parameters
----------
correlation_file : string
Input correlations file
Returns
-------
    out_file : string
        path to the NIfTI file of Fisher z-scores
"""
import os
import numpy as np
import nibabel as nib
# get the specific roi number
filename = correlation_file.split("/")[-1]
filename = filename.replace(".nii", "")
if ".gz" in filename:
filename = filename.replace(".gz", "")
corr_img = nib.load(correlation_file)
corr_data = corr_img.get_fdata()
hdr = corr_img.header
# calculate the Fisher r-to-z transformation
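    # z = arctanh(r) = 0.5 * ln((1 + r) / (1 - r)); correlations of exactly
    # +/-1 map to +/-inf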
corr_data = np.log((1 + corr_data) / (1 - corr_data)) / 2.0
z_score_img = nib.Nifti1Image(corr_data, header=hdr, affine=corr_img.affine)
out_file = os.path.join(os.getcwd(), filename + "_fisher_zstd.nii.gz")
z_score_img.to_filename(out_file)
return out_file
class ScanParameters:
"""A dictionary of scan parameters and access methods."""
def __init__(self, scan_parameters: str | dict, subject_id: str, scan: str):
"""Initialize ScanParameters dict and metadata."""
self.subject = subject_id
self.scan = scan
if ".json" in scan_parameters:
if not os.path.exists(scan_parameters):
err = (
"\n[!] WARNING: Scan parameters JSON file listed in your data"
f" configuration file does not exist:\n{scan_parameters}"
)
raise FileNotFoundError(err)
with open(scan_parameters, "r") as f:
self.params: dict = json.load(f)
elif isinstance(scan_parameters, dict):
self.params = scan_parameters
else:
err = (
"\n\n[!] Could not read the format of the scan parameters "
"information included in the data configuration file for "
f"the participant {self.subject}.\n\n"
)
raise OSError(err)
def check(self, val_to_check: str, throw_exception: bool):
"""Check that a value is populated for a given key in a parameters dictionary."""
if val_to_check not in self.params:
if throw_exception:
msg = f"Missing Value for {val_to_check} for participant {self.subject}"
raise ValueError(msg)
return None
if isinstance(self.params[val_to_check], dict):
ret_val = self.params[val_to_check][self.scan]
else:
ret_val = self.params[val_to_check]
if ret_val == "None":
if throw_exception:
msg = (
f"'None' parameter value for {val_to_check} for"
f" participant {self.subject}."
)
raise ValueError(msg)
ret_val = None
if ret_val == "" and throw_exception:
msg = f"Missing value for {val_to_check} for participant {self.subject}."
raise ValueError(msg)
return ret_val
@overload
def fetch(
self,
keys: Optional[list[str]] = None,
*,
match_case: Literal[False],
throw_exception: bool,
) -> Any: ...
@overload
def fetch(
self,
keys: Optional[list[str]] = None,
*,
match_case: Literal[True],
throw_exception: bool,
) -> tuple[Any, tuple[str, str]]: ...
def fetch(self, keys, *, match_case=False, throw_exception=True):
"""Fetch the first found parameter from a scan params dictionary.
Returns
-------
value
The value of the parameter.
keys, optional
The matched keys (only if ``match_case is True``)
throw_exception
Raise an exception if value is ``""`` or ``None``?
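        Examples
        --------
        A minimal illustration with an in-memory parameters dict:
        >>> params = ScanParameters({"TR": 2.5}, "sub-1", "rest")
        >>> params.fetch(["RepetitionTime", "TR"])
        2.5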
"""
if match_case:
keys = {key.lower(): key for key in keys}
scan_param_keys = {key.lower(): key for key in self.params.keys()}
scan_parameters = {key.lower(): value for key, value in self.params.items()}
else:
scan_parameters = self.params
for key in keys:
if key in scan_parameters:
if match_case:
return self.check(key, throw_exception), (
keys[key],
scan_param_keys[key],
)
return self.check(key, throw_exception)
msg = f"None of {keys} found in {list(scan_parameters.keys())}."
raise KeyError(msg)
def fetch_and_convert(
self,
keys: list[str],
convert_to: Optional[type] = None,
fallback: Optional[Any] = None,
warn_typeerror: bool = True,
throw_exception: bool = False,
) -> Any:
"""Fetch a parameter from a scan params dictionary and convert it to a given type.
Catch TypeError exceptions and return a fallback value in those cases.
Parameters
----------
keys
if multiple keys provided, the value corresponding to the first found will be
returned
convert_to
the type to return if possible
fallback
a value to return if the keys are not found in ``scan_parameters``
warn_typeerror
log a warning if value cannot be converted to ``convert_to`` type?
throw_exception
raise an error for empty string or NoneTypes?
Returns
-------
value
The gathered parameter coerced to the specified type, if possible.
``fallback`` otherwise.
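        Examples
        --------
        A minimal sketch with an in-memory parameters dict ("NotAKey" is a
        deliberately absent key; that call logs a warning, so it is skipped at
        doctest time):
        >>> params = ScanParameters({"TR": "2.5"}, "sub-1", "rest")
        >>> params.fetch_and_convert(["RepetitionTime", "TR"], float)
        2.5
        >>> params.fetch_and_convert(["NotAKey"], float, fallback=3.0)  # doctest: +SKIP
        3.0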
"""
value: Any = fallback
fallback_message = f"Falling back to {fallback} ({type(fallback)})."
try:
raw_value = self.fetch(keys, throw_exception=throw_exception)
except KeyError:
try:
raw_value, matched_keys = self.fetch(
keys, match_case=True, throw_exception=throw_exception
)
except KeyError:
WFLOGGER.warning(
f"None of {keys} found in {list(self.params.keys())}. "
f"{fallback_message}"
)
return fallback
        WFLOGGER.warning(
            f"No exact match found. Using case-insensitive match: '{matched_keys[0]}'"
            f" ≅ '{matched_keys[1]}'."
        )
if convert_to:
try:
value = convert_to(raw_value)
except (TypeError, ValueError):
if warn_typeerror:
WFLOGGER.warning(
f"Could not convert {value} to {convert_to}. {fallback_message}"
)
return value
def get_operand_string(mean, std_dev):
"""Get operand string for fslmaths.
Parameters
----------
mean : string
path to img containing mean
std_dev : string
path to img containing standard deviation
Returns
-------
op_string : string
operand string
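    Examples
    --------
    >>> get_operand_string(2.0, 4.0)
    '-sub 2.000000 -div 4.000000 -mas %s'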
"""
str1 = "-sub %f -div %f" % (float(mean), float(std_dev))
return str1 + " -mas %s"
def safe_shape(*vol_data):
"""Check if the volume of multiple ndarrays are the same shape.
The volume is encoded in the first three dimensions of the ndarray.
Parameters
----------
vol_data0, vol_data1, ..., vol_datan : ndarray
Volumes to check
Returns
-------
same_volume : bool
True only if all volumes have the same shape.
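    Examples
    --------
    >>> safe_shape(np.zeros((2, 3, 4)), np.zeros((2, 3, 4, 10)))
    True
    >>> safe_shape(np.zeros((2, 3, 4)), np.zeros((3, 3, 4)))
    False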
"""
same_volume = True
first_vol_shape = vol_data[0].shape[:3]
for vol in vol_data[1:]:
same_volume &= first_vol_shape == vol.shape[:3]
return same_volume
def zscore(data, axis):
"""Calculate the z-score of a dataset along a given axis."""
data = data.copy()
data -= data.mean(axis=axis, keepdims=True)
data /= data.std(axis=axis, keepdims=True)
np.copyto(data, 0.0, where=np.isnan(data))
return data
def correlation(matrix1, matrix2, match_rows=False, z_scored=False, symmetric=False):
"""Calcluate the correlation between two matrices."""
d1 = matrix1.shape[-1]
d2 = matrix2.shape[-1]
assert d1 == d2
assert matrix1.ndim <= 2 # noqa: PLR2004
assert matrix2.ndim <= 2 # noqa: PLR2004
if match_rows:
assert matrix1.shape == matrix2.shape
var = np.sqrt(d1 * d2)
if not z_scored:
matrix1 = zscore(matrix1, matrix1.ndim - 1)
matrix2 = zscore(matrix2, matrix2.ndim - 1)
if match_rows:
return np.einsum("...i,...i", matrix1, matrix2) / var
if matrix1.ndim >= matrix2.ndim:
r = np.dot(matrix1, matrix2.T) / var
else:
r = np.dot(matrix2, matrix1.T) / var
r = np.clip(r, -1.0, 1.0)
if symmetric:
return (r + r.T) / 2
return r
def check_random_state(seed):
"""
Turn seed into a np.random.RandomState instance.
Code from scikit-learn (https://github.com/scikit-learn/scikit-learn)
Parameters
----------
seed : None | int | instance of RandomState
If seed is None, return the RandomState singleton used by np.random.
If seed is an int, return a new RandomState instance seeded with seed.
If seed is already a RandomState instance, return it.
Otherwise raise ValueError.
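    Examples
    --------
    >>> isinstance(check_random_state(0), np.random.RandomState)
    True
    >>> check_random_state(None) is check_random_state(None)
    True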
"""
if seed is None or seed is np.random:
return np.random.mtrand._rand
if isinstance(seed, (numbers.Integral, np.integer)):
return np.random.RandomState(seed)
if isinstance(seed, np.random.RandomState):
return seed
raise ValueError(
"%r cannot be used to seed a numpy.random.RandomState" " instance" % seed
)
@Function.sig_imports(
[
"import json",
"import os",
"from typing import Literal, Optional",
"from CPAC.utils.utils import ScanParameters, PE_DIRECTION, VALID_PATTERNS",
]
)
def get_scan_params(
subject_id: str,
scan: str,
pipeconfig_start_indx: Optional[int | str],
pipeconfig_stop_indx: Optional[int | str],
data_config_scan_params: Optional[dict | str] = None,
) -> tuple[
Optional[str],
Optional[str],
    Optional[str],
Optional[int],
Optional[int],
Optional[int],
PE_DIRECTION,
Optional[float],
]:
"""Extract slice timing correction parameters and scan parameters.
Parameters
----------
subject_id
subject id
scan
scan id
pipeconfig_start_indx
starting volume index as provided in the pipeline config yaml file
pipeconfig_stop_indx
ending volume index as provided in the pipeline config yaml file
    data_config_scan_params
        file path to scan parameter JSON file listed in data config YAML file,
        or an already-loaded parameter dictionary
    Returns
    -------
    tr
        TR value
    tpattern
        slice acquisition pattern string or file path
    template
        template name, if one was provided in the scan parameters
    ref_slice
        index of the reference slice used to align all other slices
    first_tr
        index of starting TR or starting volume index
    last_tr
        index of ending TR or ending volume index
pe_direction
https://bids-specification.readthedocs.io/en/stable/glossary.html#phaseencodingdirection-metadata
effective_echo_spacing
https://bids-specification.readthedocs.io/en/stable/glossary.html#effectiveechospacing-metadata
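    Examples
    --------
    An illustrative call with an in-memory BIDS-style parameter dict; skipped
    at doctest time because it writes ``tpattern.txt`` to the current working
    directory and logs warnings for absent optional keys:
    >>> get_scan_params(
    ...     "sub-1", "rest", 0, None,
    ...     {"RepetitionTime": 2.0, "SliceTiming": [0.0, 0.5, 1.0],
    ...      "PhaseEncodingDirection": "j-"})  # doctest: +SKIP
    ('2.0s', '.../tpattern.txt', None, None, 0, None, 'j-', None)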
"""
unit: Literal["ms", "s"] = "s"
if isinstance(pipeconfig_stop_indx, str):
if "End" in pipeconfig_stop_indx or "end" in pipeconfig_stop_indx:
pipeconfig_stop_indx = None
params = ScanParameters(data_config_scan_params, subject_id, scan)
# TODO: better handling of errant key values!!!
# TODO: use schema validator to deal with it
# get details from the configuration
tr: float | Literal[""] = params.fetch_and_convert(
["RepetitionTime", "TR"], float, ""
)
template: Optional[str] = params.fetch_and_convert(["Template", "template"], str)
pattern: Optional[str] = params.fetch_and_convert(
["acquisition", "SliceTiming", "SliceAcquisitionOrder"],
str,
None,
)
ref_slice: Optional[int | str] = params.fetch_and_convert(["reference"], int, None)
first_tr: Optional[int | str] = params.fetch_and_convert(
["first_TR"], int, pipeconfig_start_indx, False
)
last_tr: Optional[int | str] = params.fetch_and_convert(
["last_TR"], int, pipeconfig_stop_indx, False
)
pe_direction: PE_DIRECTION = params.fetch_and_convert(
["PhaseEncodingDirection"], str, ""
)
effective_echo_spacing: Optional[float] = params.fetch_and_convert(
["EffectiveEchoSpacing"], float
)
"""
if not pattern:
if pipeconfig_tpattern:
if "Use NIFTI Header" in pipeconfig_tpattern:
pattern = ''
else:
pattern = pipeconfig_tpattern
"""
    # pattern can be one of a few keywords, a filename, or blank, which
    # indicates that the image header's information should be used
tpattern_file = None
if pattern and pattern != "" and pattern not in VALID_PATTERNS:
if isinstance(pattern, list) or (
"[" in pattern and "]" in pattern and "," in pattern
):
# if we got the slice timing as a list, from a BIDS-format scan
# parameters JSON file
if not isinstance(pattern, list):
pattern = pattern.replace("[", "").replace("]", "").split(",")
slice_timings = [float(x) for x in pattern]
# write out a tpattern file for AFNI 3dTShift
tpattern_file = os.path.join(os.getcwd(), "tpattern.txt")
try:
with open(tpattern_file, "wt") as f:
for time in slice_timings:
f.write(f"{time}\n".replace(" ", ""))
except (OSError, TypeError) as e:
err = (
"\n[!] Could not write the slice timing file meant as "
"an input for AFNI 3dTshift (slice timing correction):"
f"\n{tpattern_file}\n\n"
)
raise OSError(err) from e
elif ".txt" in pattern and not os.path.exists(pattern):
# if the user provided an acquisition pattern text file for
# 3dTshift
        msg = (
            f"Invalid pattern file path {pattern}. Please provide "
            "the correct path."
        )
raise Exception(msg)
elif ".txt" in pattern:
with open(pattern, "r") as f:
lines = f.readlines()
if len(lines) < 2: # noqa: PLR2004
            msg = (
                "Invalid slice timing file format. The file should contain only"
                " one value per row, using a newline character as the delimiter."
            )
raise Exception(msg)
tpattern_file = pattern
slice_timings = [float(l.rstrip("\r\n")) for l in lines] # noqa: E741
else:
        # this only happens if a non-path string is set in the data
        # config dictionary for the acquisition pattern (like "alt+z"), but
        # the pattern is not one of the supported keywords
err = (
"\n[!] The slice timing acquisition pattern provided is "
"not supported by AFNI 3dTshift:\n"
f"{pattern!s}\n"
)
raise Exception(err)
pattern = tpattern_file
slice_timings.sort()
max_slice_offset = slice_timings[-1]
# checking if the unit of tr and slice timing match or not
# if slice timing in ms convert tr to ms as well
if tr and max_slice_offset > tr:
WFLOGGER.warning(
"TR is in seconds and slice timings are in "
"milliseconds. Converting TR into milliseconds"
)
tr = tr * 1000
WFLOGGER.info("New tr value %s ms", tr)
unit = "ms"
elif tr and tr > 10: # noqa: PLR2004
# check to see, if TR is in milliseconds, convert it into seconds
WFLOGGER.warning("TR is in milliseconds, Converting it into seconds")
tr = tr / 1000.0
WFLOGGER.info("New TR value %s s", tr)
unit = "s"
# swap back in
tr = f"{tr!s}{unit}" if tr else ""
tpattern = pattern
start_indx = first_tr
stop_indx = last_tr
return (
tr if tr else None,
tpattern if tpattern else None,
template if template else None,
ref_slice,
start_indx,
stop_indx,
pe_direction,
effective_echo_spacing,
)
def add_afni_prefix(tpattern):
"""Add '@' prefix to tpattern.txt filename."""
if ".txt" in tpattern:
tpattern = f"@{tpattern}"
return tpattern
def write_to_log(workflow, log_dir, index, inputs, scan_id):
"""Write into log file the status of the workflow run."""
import datetime
import os
import time
from CPAC import __version__
version = __version__
subject_id = os.path.basename(log_dir)
if scan_id is None:
scan_id = "scan_anat"
strategy = ""
ts = time.time()
stamp = datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
try:
if workflow != "DONE":
wf_path = os.path.dirname((os.getcwd()).split(workflow)[1]).strip("/")
if wf_path and wf_path != "":
if "/" in wf_path:
scan_id, strategy = wf_path.split("/", 1)
scan_id = scan_id.strip("_")
strategy = strategy.replace("/", "")
else:
scan_id = wf_path.strip("_")
file_path = os.path.join(log_dir, scan_id, workflow)
try:
os.makedirs(file_path)
except Exception:
FMLOGGER.error(
"filepath already exist, filepath- %s, curr_dir - %s",
file_path,
os.getcwd(),
)
else:
file_path = os.path.join(log_dir, scan_id)
except Exception as e:
msg = "ERROR in write log"
raise OSError(msg) from e
try:
os.makedirs(file_path)
except Exception:
FMLOGGER.error(
"filepath already exist, filepath: %s, curr_dir: %s", file_path, os.getcwd()
)
out_file = os.path.join(file_path, f"log_{strategy}.yml")
WFLOGGER.info("CPAC custom log:")
if isinstance(inputs, list):
inputs = inputs[0]
if os.path.exists(inputs):
status_msg = "wf_status: DONE"
WFLOGGER.info(
"version: %s, "
"timestamp: %s, "
"subject_id: %s, "
"scan_id: %s, "
"strategy: %s, "
"workflow: %s, "
"status: COMPLETED",
version,
stamp,
subject_id,
scan_id,
strategy,
workflow,
)
else:
status_msg = "wf_status: ERROR"
WFLOGGER.error(
"version: %s, "
"timestamp: %s, "
"subject_id: %s, "
"scan_id: %s, "
"strategy: %s, "
"workflow: %s, "
"status: ERROR",
version,
stamp,
subject_id,
scan_id,
strategy,
workflow,
)
with open(out_file, "w") as f:
f.write(f"version: {version!s}\n")
f.write(f"timestamp: {stamp!s}\n")
f.write(f"pipeline_index: {index}\n")
f.write(f"subject_id: {subject_id}\n")
f.write(f"scan_id: {scan_id}\n")
f.write(f"strategy: {strategy}\n")
f.write(f"workflow_name: {workflow}\n")
f.write(status_msg)
return out_file
def create_log(wf_name="log", scan_id=None):
"""Workflow to create log."""
import nipype.interfaces.utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.utils.interfaces import function
wf = pe.Workflow(name=wf_name)
input_node = pe.Node(
util.IdentityInterface(fields=["workflow", "log_dir", "index", "inputs"]),
name="inputspec",
)
output_node = pe.Node(
util.IdentityInterface(fields=["out_file"]), name="outputspec"
)
write_log = pe.Node(
function.Function(
input_names=["workflow", "log_dir", "index", "inputs", "scan_id"],
output_names=["out_file"],
function=write_to_log,
as_module=True,
),
name="write_log",
)
write_log.inputs.scan_id = scan_id
wf.connect(
[
(
input_node,
write_log,
[
("workflow", "workflow"),
("log_dir", "log_dir"),
("index", "index"),
("inputs", "inputs"),
],
),
(write_log, output_node, [("out_file", "out_file")]),
]
)
return wf
def find_files(directory, pattern):
"""Find files in directory."""
for root, dirs, files in os.walk(directory):
for basename in files:
if fnmatch.fnmatch(basename, pattern):
filename = os.path.join(root, basename)
yield filename
def check_command_path(path):
"""Chek if command path exists."""
import os
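    # os.system returns the shell's encoded wait status; 32512 == 127 << 8,
    # i.e. exit status 127 ("command not found"), so any other status suggests
    # the command was found on the PATH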
return os.system("%s >/dev/null 2>&1" % path) != 32512 # noqa: PLR2004
def check_system_deps(
check_ants=False,
check_ica_aroma=False,
check_centrality_degree=False,
check_centrality_lfcd=False,
):
"""Check system for neuroimaging tools AFNI, C3D, FSL and ANTs."""
missing_install = []
# Check AFNI
if not check_command_path("3dcalc"):
missing_install.append("AFNI")
# Check FSL
if not check_command_path("fslmaths"):
missing_install.append("FSL")
# Check ANTs/C3D
if check_ants:
if not check_command_path("c3d_affine_tool"):
missing_install.append("C3D")
if not check_command_path("antsRegistration"):
missing_install.append("ANTS")
if check_centrality_degree:
if not check_command_path("3dDegreeCentrality"):
missing_install.append("3dDegreeCentrality")
if check_centrality_lfcd:
if not check_command_path("3dLFCD"):
missing_install.append("3dLFCD")
# Check ICA-AROMA
if check_ica_aroma:
if not check_command_path("ICA_AROMA.py"):
missing_install.append("ICA-AROMA")
# If we're missing deps, raise Exception
if len(missing_install) > 0:
missing_string = ""
for string in missing_install:
missing_string = missing_string + string + "\n"
err = (
"\n\n[!] CPAC says: It appears the following software "
"packages are not installed or configured properly:\n\n%s\n"
% missing_string
)
raise Exception(err)
def check_config_resources(
c: Configuration | dict,
) -> tuple[float | int, int, int, int]:
"""Check pipeline config againts computer resources."""
# Import packages
from multiprocessing import cpu_count
import psutil
# Init variables
sys_virt_mem = psutil.virtual_memory()
num_cores = cpu_count()
# Check for pipeline memory for subject
if c.pipeline_setup["system_config"]["maximum_memory_per_participant"] is None:
# Get system memory and numSubsAtOnce
sys_mem_gb = sys_virt_mem.total / (1024.0**3)
sub_mem_gb = (
sys_mem_gb / c.pipeline_setup["system_config"]["num_participants_at_once"]
)
else:
sub_mem_gb = c.pipeline_setup["system_config"]["maximum_memory_per_participant"]
    # If centrality is enabled, check that subject memory >= centrality memory allocation
if c.network_centrality["run"]:
if sub_mem_gb < c.network_centrality["memory_allocation"]:
err_msg = (
"Memory allocated for subject: %d needs to be greater "
"than the memory allocated for centrality: %d. Fix "
"and try again."
% (
c.pipeline_setup["system_config"]["maximum_memory_per_participant"],
c.network_centrality["memory_allocation"],
)
)
raise Exception(err_msg)
# Check for pipeline threads
# Check if user specified cores
if c.pipeline_setup["system_config"]["max_cores_per_participant"]:
total_user_cores = (
c.pipeline_setup["system_config"]["num_participants_at_once"]
* c.pipeline_setup["system_config"]["max_cores_per_participant"]
)
if total_user_cores > num_cores:
raise SystemError(
"Configuration specifies more threads running "
"in parallel (%d) than number of threads "
"available (%d). Change this and try again"
% (total_user_cores, num_cores)
)
num_cores_per_sub = c.pipeline_setup["system_config"][
"max_cores_per_participant"
]
else:
num_cores_per_sub = (
num_cores / c.pipeline_setup["system_config"]["num_participants_at_once"]
)
# Now check ANTS
if (
"ANTS"
in c.registration_workflows["anatomical_registration"]["registration"]["using"]
):
if c.pipeline_setup["system_config"]["num_ants_threads"] is None:
num_ants_cores = num_cores_per_sub
elif (
c.pipeline_setup["system_config"]["num_ants_threads"]
> c.pipeline_setup["system_config"]["max_cores_per_participant"]
):
err_msg = (
"Number of threads for ANTS: %d is greater than the "
"number of threads per subject: %d. Change this and "
"try again."
% (
c.pipeline_setup["system_config"]["num_ants_threads"],
c.pipeline_setup["system_config"]["max_cores_per_participant"],
)
)
raise Exception(err_msg)
else:
num_ants_cores = c.pipeline_setup["system_config"]["num_ants_threads"]
else:
num_ants_cores = 1
# Now check OMP
if c.pipeline_setup["system_config"]["num_OMP_threads"] is None:
num_omp_cores = 1
elif (
c.pipeline_setup["system_config"]["num_OMP_threads"]
> c.pipeline_setup["system_config"]["max_cores_per_participant"]
):
err_msg = (
"Number of threads for OMP: %d is greater than the "
"number of threads per subject: %d. Change this and "
"try again."
% (
c.pipeline_setup["system_config"]["num_OMP_threads"],
c.pipeline_setup["system_config"]["max_cores_per_participant"],
)
)
raise Exception(err_msg)
else:
num_omp_cores = c.pipeline_setup["system_config"]["num_OMP_threads"]
# Return memory and cores
return sub_mem_gb, num_cores_per_sub, num_ants_cores, num_omp_cores
def _check_nested_types(d, keys):
"""Check types for *_nested_value functions."""
if not isinstance(d, dict):
msg = f"Expected dict, got {type(d).__name__}: {d!s}"
raise TypeError(msg)
if not isinstance(keys, list) and not isinstance(keys, tuple):
msg = f"Expected list, got {type(keys).__name__}: {keys!s}"
raise TypeError(msg)
def delete_nested_value(d, keys):
"""Delete nested values.
Parameters
----------
d: dict
keys: list or tuple
Returns
-------
dict
updated
Examples
--------
>>> delete_nested_value(
... {'nested': {'key1': 'value', 'key2': 'value'}},
... ['nested', 'key1'])
{'nested': {'key2': 'value'}}
"""
_check_nested_types(d, keys)
if len(keys) == 1:
del d[keys[0]]
return d
if not len(keys): # pylint: disable=len-as-condition
return d
d[keys[0]] = delete_nested_value(d.get(keys[0], {}), keys[1:])
return d
@deprecated(
"1.8.8",
"Python 2's end of life was over 4 years prior to this release. A user jumping from a C-PAC version that used Python 2 can use this function in any C-PAC version from 1.6.2 up until its removal in an upcoming version.",
)
def repickle(directory): # noqa: T20
"""
Recursively check a directory; convert Python 2 pickles to Python 3 pickles.
Parameters
----------
directory : str
Returns
-------
None
"""
for root, _, files in os.walk(directory, followlinks=True):
for fn in files:
p = os.path.join(root, fn)
if fn.endswith(".pkl"):
if _pickle2(p):
try:
with open(p, "rb") as fp:
f = pickle.load(fp, encoding="latin1")
with open(p, "wb") as fp:
pickle.dump(f, fp)
print(
f"Converted pickle {fn} from a Python 2 pickle to "
"a Python 3 pickle."
)
except Exception as e:
print(f"Could not convert Python 2 pickle {p} because {e}\n")
else:
print(f"Pickle {fn} is a Python 3 pickle.")
elif fn.endswith(".pklz"):
if _pickle2(p, True):
try:
with gzip.open(p, "rb") as fp:
f = pickle.load(fp, encoding="latin1")
with gzip.open(p, "wb") as fp:
pickle.dump(f, fp)
print(
f"Converted pickle {fn} from a Python 2 pickle to "
"a Python 3 pickle."
)
except Exception as e:
print(f"Could not convert Python 2 pickle {p} because {e}\n")
else:
print(f"Pickle {fn} is a Python 3 pickle.")
@deprecated(
"1.8.8",
"Python 2's end of life was over 4 years prior to this release. A user jumping from a C-PAC version that used Python 2 can use this function in any C-PAC version from 1.6.2 up until its removal in an upcoming version.",
)
def _pickle2(p, z=False): # noqa: T20
"""
Check if a pickle is a Python 2 pickle.
Also print other exceptions raised by trying to load the file at p.
Parameters
----------
p : str
path to pickle
z : bool
if pickle is gzipped
Returns
-------
pickle2 : bool
True if p is a Python 2 pickle
"""
if z:
with gzip.open(p, "rb") as fp:
try:
pickle.load(fp)
except UnicodeDecodeError:
return True
except Exception as e:
print(f"Pickle {p} may be a Python 3 pickle, but raised exception {e}")
else:
with open(p, "rb") as fp:
try:
pickle.load(fp)
except UnicodeDecodeError:
return True
except Exception as e:
print(f"Pickle {p} may be a Python 3 pickle, but raised exception {e}")
return False
def _changes_1_8_0_to_1_8_1(config_dict: dict) -> dict:
"""Automatically update a configuration dictionary from 1.8.0 to 1.8.1.
Examples
--------
Starting with 1.8.0
>>> zero = {'anatomical_preproc': {
... 'non_local_means_filtering': True,
... 'n4_bias_field_correction': True
... }, 'functional_preproc': {
... 'motion_estimates_and_correction': {
... 'calculate_motion_first': False
... }
... }, 'segmentation': {
... 'tissue_segmentation': {
... 'ANTs_Prior_Based': {
... 'CSF_label': 0,
... 'left_GM_label': 1,
... 'right_GM_label': 2,
... 'left_WM_label': 3,
... 'right_WM_label': 4}}}}
>>> updated_apb = _changes_1_8_0_to_1_8_1(zero)[
... 'segmentation']['tissue_segmentation']['ANTs_Prior_Based']
>>> updated_apb['CSF_label']
[0]
>>> updated_apb['GM_label']
[1, 2]
>>> updated_apb['WM_label']
[3, 4]
Starting with 1.8.1
>>> one = {'anatomical_preproc': {
... 'non_local_means_filtering': True,
... 'n4_bias_field_correction': True
... }, 'functional_preproc': {
... 'motion_estimates_and_correction': {
... 'calculate_motion_first': False
... }
... }, 'segmentation': {
... 'tissue_segmentation': {
... 'ANTs_Prior_Based': {
... 'CSF_label': [0],
... 'GM_label': [1, 2],
... 'WM_label': [3, 4]}}}}
>>> updated_apb = _changes_1_8_0_to_1_8_1(one)[
... 'segmentation']['tissue_segmentation']['ANTs_Prior_Based']
>>> updated_apb['CSF_label']
[0]
>>> updated_apb['GM_label']
[1, 2]
>>> updated_apb['WM_label']
[3, 4]
"""
for key_sequence in (
("anatomical_preproc", "non_local_means_filtering"),
("anatomical_preproc", "n4_bias_field_correction"),
):
config_dict = _now_runswitch(config_dict, key_sequence)
for combiners in (
(
(("segmentation", "tissue_segmentation", "ANTs_Prior_Based", "CSF_label"),),
("segmentation", "tissue_segmentation", "ANTs_Prior_Based", "CSF_label"),
),
(
(
(
"segmentation",
"tissue_segmentation",
"ANTs_Prior_Based",
"left_GM_label",
),
(
"segmentation",
"tissue_segmentation",
"ANTs_Prior_Based",
"right_GM_label",
),
),
("segmentation", "tissue_segmentation", "ANTs_Prior_Based", "GM_label"),
),
(
(
(
"segmentation",
"tissue_segmentation",
"ANTs_Prior_Based",
"left_WM_label",
),
(
"segmentation",
"tissue_segmentation",
"ANTs_Prior_Based",
"right_WM_label",
),
),
("segmentation", "tissue_segmentation", "ANTs_Prior_Based", "WM_label"),
),
):
config_dict = _combine_labels(config_dict, *combiners)
try:
calculate_motion_first = lookup_nested_value(
config_dict,
[
"functional_preproc",
"motion_estimates_and_correction",
"calculate_motion_first",
],
)
except KeyError:
calculate_motion_first = None
if calculate_motion_first is not None:
del config_dict["functional_preproc"]["motion_estimates_and_correction"][
"calculate_motion_first"
]
config_dict = set_nested_value(
config_dict,
[
"functional_preproc",
"motion_estimates_and_correction",
"motion_estimates",
"calculate_motion_first",
],
calculate_motion_first,
)
return config_dict
def _combine_labels(config_dict, list_to_combine, new_key):
"""Combine formerly separate keys into a combined key.
Parameters
----------
config_dict: dict
    list_to_combine: iterable of lists or tuples
new_key: list or tuple
Returns
-------
updated_config_dict: dict
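    Examples
    --------
    A sketch with generic placeholder keys:
    >>> _combine_labels({'a': {'left': 1, 'right': 2}},
    ...                 [('a', 'left'), ('a', 'right')], ('a', 'both'))
    {'a': {'both': [1, 2]}}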
"""
new_value = []
any_old_values = False
for _to_combine in list_to_combine:
try:
old_value = lookup_nested_value(config_dict, _to_combine)
except KeyError:
old_value = None
if old_value is not None:
any_old_values = True
if isinstance(old_value, (list, set, tuple)):
for value in old_value:
new_value.append(value)
else:
new_value.append(old_value)
config_dict = delete_nested_value(config_dict, _to_combine)
if any_old_values:
return set_nested_value(config_dict, new_key, new_value)
return config_dict
def concat_list(in_list1=None, in_list2=None):
"""Concatenate a pair of lists.
Parameters
----------
in_list1 : list or str
file path or a list of file paths
in_list2 : list or str
file path or a list of file paths
Returns
-------
out_list : list
a list of file paths
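    Examples
    --------
    >>> concat_list('one.nii.gz', ['two.nii.gz', 'three.nii.gz'])
    ['one.nii.gz', 'two.nii.gz', 'three.nii.gz']
    >>> concat_list(None, 'two.nii.gz')
    ['two.nii.gz']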
"""
if in_list1 is not None:
if not isinstance(in_list1, list):
in_list1 = [in_list1]
else:
in_list1 = []
if in_list2 is not None:
if not isinstance(in_list2, list):
in_list2 = [in_list2]
else:
in_list2 = []
return in_list1 + in_list2
def list_item_replace(
l, # noqa: E741 # pylint: disable=invalid-name
old,
new,
):
"""Replace an item in a list.
Parameters
----------
l : list or string
old : any
item to replace
new : any
new item
Returns
-------
l : list or string
updated
Examples
--------
>>> list_item_replace(['AFNI', 'FSL'], 'AFNI', '3dSkullStrip')
['3dSkullStrip', 'FSL']
>>> list_item_replace(['AFNI', 'FSL'], 'FSL', 'BET')
['AFNI', 'BET']
"""
if isinstance(l, list) and old in l:
l[l.index(old)] = new
elif isinstance(l, str):
l = l.replace(old, new) # noqa: E741
return l
def lookup_nested_value(d, keys):
"""Look up nested values.
Parameters
----------
d: dict
keys: list or tuple
Returns
-------
yaml: str or dict
Examples
--------
>>> lookup_nested_value({'nested': {'True': True}}, ['nested', 'True'])
True
>>> lookup_nested_value({'nested': {'None': None}}, ['nested', 'None'])
''
"""
if not isinstance(d, dict):
return d
if len(keys) == 1:
value = d[keys[0]]
if value is None:
return ""
return value
try:
return lookup_nested_value(d[keys[0]], keys[1:])
except KeyError as e:
e.args = (keys,)
raise
def _now_runswitch(config_dict, key_sequence):
"""Convert a formerly forkable value to a runswitch.
Parameters
----------
config_dict: dict
key_sequence: list or tuple
Returns
-------
updated_config_dict: dict
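    Examples
    --------
    A sketch with generic placeholder keys:
    >>> _now_runswitch({'a': {'b': [True, False]}}, ['a', 'b'])
    {'a': {'b': {'run': [True, False]}}}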
"""
try:
old_forkable = lookup_nested_value(config_dict, key_sequence)
except KeyError:
return config_dict
if isinstance(old_forkable, (bool, list)):
return set_nested_value(config_dict, key_sequence, {"run": old_forkable})
return config_dict
def _remove_somethings(value, things_to_remove):
"""Remove instances of any in a given set of values from a list.
Parameters
----------
value : list
things_to_remove : set
Returns
-------
list
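    Examples
    --------
    >>> _remove_somethings([1, None, 2, 'None', 3], {None, 'None'})
    [1, 2, 3]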
"""
if isinstance(value, list):
for thing in things_to_remove:
while thing in value:
value.remove(thing)
return value
def remove_False(d, k):
"""Remove "Off" and False from a list at a given nested key.
Parameters
----------
d : dict
k : list
Returns
-------
d : dict
updated
Examples
--------
>>> remove_False({'a': {'b': [1, False, 2, "Off", 3]}}, ['a', 'b'])
{'a': {'b': [1, 2, 3]}}
"""
value = _remove_somethings(lookup_nested_value(d, k), {False, "Off"})
return set_nested_value(d, k, value)
def remove_None(d, k):
"""Remove "None" and None from a list at a given nested key.
Parameters
----------
d : dict
k : list
Returns
-------
d : dict
updated
Examples
--------
>>> remove_None({'a': {'b': [1, None, 2, "None", 3]}}, ['a', 'b'])
{'a': {'b': [1, 2, 3]}}
"""
value = _remove_somethings(lookup_nested_value(d, k), {None, "None"})
return set_nested_value(d, k, value)
def replace_in_strings(d, replacements=None):
"""Recursively replace substrings.
Parameters
----------
d : any
replacements : list of 2-tuples
0 : str
substring to replace
1 : str
replacement substring
Returns
-------
d : any
same as input, but updated
Examples
--------
>>> replace_in_strings({'key': 'test${resolution_for_func_preproc}'})
{'key': 'test${func_resolution}'}
"""
if replacements is None:
replacements = [(r"${resolution_for_func_preproc}", r"${func_resolution}")]
if isinstance(d, dict):
return {k: replace_in_strings(d[k], replacements) for k in d}
if isinstance(d, list):
return [replace_in_strings(i, replacements) for i in d]
if isinstance(d, str):
for replacement in replacements:
d = d.replace(replacement[0], replacement[1])
return d
def set_nested_value(d, keys, value):
"""Set nested values.
Parameters
----------
d: dict
keys: list or tuple
value: any
Returns
-------
dict
updated
Examples
--------
>>> set_nested_value({}, ['nested', 'keys'], 'value')
{'nested': {'keys': 'value'}}
"""
_check_nested_types(d, keys)
if len(keys) == 1:
d.update({keys[0]: value})
return d
if not len(keys): # pylint: disable=len-as-condition
return d
new_d = {keys[0]: set_nested_value(d.get(keys[0], {}), keys[1:], value)}
return update_nested_dict(d, new_d)
def update_config_dict(old_dict):
"""Convert an old config dict to a new config dict.
Parameters
----------
old_dict : dict
Returns
-------
new_dict : dict
1.8 nested config dictionary
old_dict : dict
remaining undefined mappings
combined_dict : dict
1.8 nested config dictionary plus remaining undefined mappings
Examples
--------
>>> a, b, c = update_config_dict({
... 'pipelineName': 'example-pipeline', '2': None})
>>> a
{'pipeline_setup': {'pipeline_name': 'example-pipeline'}}
>>> b
{'2': None}
>>> c
{'pipeline_setup': {'pipeline_name': 'example-pipeline'}, '2': None}
"""
def _append_to_list(current_value, new_value):
"""Add new_value to the current_value list, creating list if it does not exist.
Skips falsy elements in new_value.
Parameters
----------
current_value : list
new_value : list, bool, None, or str
Returns
-------
list
Examples
--------
>>> _append_to_list([1], [2])
[1, 2]
>>> _append_to_list([1, 2], [2])
[1, 2]
>>> _append_to_list(None, [2])
[2]
>>> _append_to_list([1], [1, 2])
[1, 2]
>>> _append_to_list([1], [None, 2])
[1, 2]
"""
if not isinstance(current_value, list):
if current_value is not None:
current_value = [current_value]
else:
current_value = []
else:
current_value = [v for v in current_value if v is not None]
if isinstance(new_value, list):
for i in new_value:
if i and i not in current_value and i != "Off":
current_value.append(i)
elif new_value and new_value not in current_value and new_value != "Off":
current_value.append(new_value)
return current_value
def _bool_to_str(old_value, value_if_true):
"""Convert a True or a list containing a True to a given string.
Parameters
----------
old_value : list, bool, None, or str
value_if_true : str
Returns
-------
str or None
Examples
--------
>>> _bool_to_str([0], 'test_str')
>>> _bool_to_str([1], 'test_str')
'test_str'
>>> _bool_to_str(0, 'test_str')
>>> _bool_to_str(1, 'test_str')
'test_str'
>>> _bool_to_str([True, False], 'test_str')
'test_str'
>>> _bool_to_str(None, 'test_str')
>>> _bool_to_str([0, None, False], 'test_str')
>>> _bool_to_str([0, None, False, 1], 'test_str')
'test_str'
"""
if isinstance(old_value, list):
if any(bool(i) for i in old_value):
return value_if_true
elif bool(old_value):
return value_if_true
return None
def _get_old_values(old_dict, new_dict, key):
"""Get old and current values of a special key being updated.
Parameters
----------
old_dict : dict
new_dict : dict
key : str
Returns
-------
old_dict : dict
new_dict : dict
old_value : any
current_value : any
"""
old_value = old_dict.pop(key)
current_value = lookup_nested_value(new_dict, NESTED_CONFIG_MAPPING[key])
return old_dict, new_dict, old_value, current_value
new_dict = {}
for key in old_dict.copy():
if key in NESTED_CONFIG_MAPPING:
# handle special cases
special_cases = {
"acpc_run_preprocessing",
"acpc_template_brain",
"ANTs_prior_based_segmentation",
"func_reg_input",
"runRegisterFuncToTemplate",
"runRegisterFuncToEPI",
"fsl_linear_reg_only",
"functional_registration",
"template_for_resample",
"fnirtConfig",
"run_smoothing",
"runZScoring",
"run_longitudinal",
}
if key in special_cases:
try:
(old_dict, new_dict, old_value, current_value) = _get_old_values(
old_dict, new_dict, key
)
except KeyError:
continue
# longitudinal_template_generation.run
if key == "run_longitudinal":
if "anat" in old_value or "func" in old_value:
current_value = True
else:
current_value = False
# anatomical_preproc.acpc_alignment.run_before_preproc
if key == "acpc_run_preprocessing":
current_value = (
True
if old_value.lower() == "before"
else False
if old_value.lower() == "after"
else None
)
# anatomical_preproc.acpc_alignment.acpc_target
if key == "acpc_template_brain":
if current_value in {"None", None, ""}:
new_dict = set_nested_value(
new_dict,
["anatomical_preproc", "acpc_alignment", "acpc_target"],
"whole-head",
)
# segmentation.tissue_segmentation.using
elif key == "ANTs_prior_based_segmentation":
new_value = _bool_to_str(old_value, "ANTs_Prior_Based")
if new_value == "ANTs_Prior_Based":
new_dict = set_nested_value(
new_dict,
NESTED_CONFIG_MAPPING[key][:-1] + [new_value, "run"],
old_value,
)
# registration_workflows.functional_registration.
# coregistration.func_input_prep.input
elif key == "func_reg_input":
new_value = _replace_in_value_list(old_value, (" ", "_"))
current_value = _replace_in_value_list(current_value, (" ", "_"))
# registration_workflows.functional_registration.
# func_registration_to_template.target_template.using
elif key in {"runRegisterFuncToTemplate", "runRegisterFuncToEPI"}:
current_value = _replace_in_value_list(current_value, (" ", "_"))
if key == "runRegisterFuncToTemplate":
current_value = [
v for v in current_value if v not in {"Off", "False", False}
]
new_value = []
new_dict = set_nested_value(
new_dict,
[
"registration_workflows",
"functional_registration",
"func_registration_to_template",
"run",
],
bool(current_value),
)
if key == "runRegisterFuncToEPI":
new_value = _bool_to_str(old_value, "EPI_template")
# registration_workflows.anatomical_registration.registration.
# using
elif key == "fsl_linear_reg_only":
new_value = _bool_to_str(old_value, "FSL-linear")
# registration_workflows.functional_registration.
# func_registration_to_template.target_template.
# EPI_template.EPI_template_for_resample
elif key == "template_for_resample":
new_dict = set_nested_value(
new_dict,
[
"registration_workflows",
"functional_registration",
"func_registration_to_template",
"target_template",
"EPI_template",
"EPI_template_for_resample",
],
current_value,
)
# registration_workflows.functional_registration.
# EPI_registration.FSL-FNIRT.fnirt_config
elif key == "fnirtConfig":
current_value = old_value
new_dict = set_nested_value(
new_dict,
[
"registration_workflows",
"functional_registration",
"EPI_registration",
"FSL-FNIRT",
"fnirt_config",
],
current_value,
)
# post_processing.spatial_smoothing.output
elif key == "run_smoothing":
new_value = [_bool_to_str(old_value, "smoothed")]
if any(not bool(value) for value in old_value):
new_value.append("nonsmoothed")
current_value = new_value
# post_processing.z-scoring.output
elif key == "runZScoring":
new_value = [_bool_to_str(old_value, "z-scored")]
if any(not bool(value) for value in old_value):
new_value.append("raw")
current_value = new_value
# make sure list values are cast as lists
if key not in { # if key not in non-list-valued keys
"acpc_run_preprocessing",
"acpc_template_brain",
"functional_registration",
"template_for_resample",
"fnirtConfig",
"run_longitudinal",
}:
current_value = _append_to_list(current_value, new_value)
# update remaining keys
else:
current_value = old_dict.pop(key)
if current_value == "None":
current_value = None
new_dict = set_nested_value(
new_dict, NESTED_CONFIG_MAPPING[key], current_value
)
elif key in NESTED_CONFIG_DEPRECATIONS:
old_dict.pop(key)
return new_dict, old_dict, update_nested_dict(new_dict.copy(), old_dict)
def update_nested_dict(d_base, d_update, fully_specified=False):
"""Update dictionary of varying depth.
Parameters
----------
d_base : dict
original dictionary
d_update : dict
dictionary with updates
fully_specified : bool
if True, overwrite instead of update
Returns
-------
d_base : dict
original dictionary with updates
Examples
--------
>>> d_base = {'pipeline_name': 'cpac-default-pipeline',
... 'output_directory': {'path': '/output',
... 'write_func_outputs': False,
... 'write_debugging_outputs': False,
... 'output_tree': 'default',
... 'quality_control': {
... 'generate_quality_control_images': True,
... 'generate_xcpqc_files': True}},
... 'working_directory': {'path': '/tmp', 'remove_working_dir': True},
... 'log_directory': {'run_logging': True, 'path': '/logs'},
... 'system_config': {'maximum_memory_per_participant': 1,
... 'max_cores_per_participant': 1,
... 'num_ants_threads': 4,
... 'num_participants_at_once': 1},
... 'Amazon-AWS': {'aws_output_bucket_credentials': None,
... 's3_encryption': False}}
>>> d_update = {'pipeline_name': 'cpac_fmriprep-options',
... 'system_config': {'num_ants_threads': 1},
... 'Amazon-AWS': {'s3_encryption': True}}
>>> str(update_nested_dict(d_base, d_update)) == str({
... 'pipeline_name': 'cpac_fmriprep-options', 'output_directory': {
... 'path': '/output', 'write_func_outputs': False,
... 'write_debugging_outputs': False, 'output_tree': 'default',
... 'quality_control': {
... 'generate_quality_control_images': True,
... 'generate_xcpqc_files': True}
... }, 'working_directory': {
... 'path': '/tmp', 'remove_working_dir': True
... }, 'log_directory': {'run_logging': True, 'path': '/logs'},
... 'system_config': {
... 'maximum_memory_per_participant': 1,
... 'max_cores_per_participant': 1,
... 'num_ants_threads': 1, 'num_participants_at_once': 1
... }, 'Amazon-AWS': {
... 'aws_output_bucket_credentials': None, 's3_encryption': True}})
True
>>> tse_base = {'timeseries_extraction': {'run': True, 'tse_roi_paths': {
... '/cpac_templates/CC400.nii.gz': 'Avg',
... '/cpac_templates/aal_mask_pad.nii.gz': 'Avg'
... }, 'realignment': 'ROI_to_func'}}
>>> str(update_nested_dict(tse_base, {})) == str({
... 'timeseries_extraction': {'run': True, 'tse_roi_paths': {
... '/cpac_templates/CC400.nii.gz': 'Avg',
... '/cpac_templates/aal_mask_pad.nii.gz': 'Avg'
... }, 'realignment': 'ROI_to_func'}})
True
>>> str(update_nested_dict(tse_base, {'timeseries_extraction': {
... 'tse_roi_paths': {'/cpac_templates/rois_3mm.nii.gz': 'Voxel'}
... }})) == str({'timeseries_extraction': {'run': True, 'tse_roi_paths': {
... '/cpac_templates/rois_3mm.nii.gz': 'Voxel'
... }, 'realignment': 'ROI_to_func'}})
True
>>> str(update_nested_dict(tse_base, {'timeseries_extraction': {
... 'roi_paths_fully_specified': False,
... 'tse_roi_paths': {'/cpac_templates/rois_3mm.nii.gz': 'Voxel'}
... }})) == str({'timeseries_extraction': {'run': True, 'tse_roi_paths': {
... '/cpac_templates/CC400.nii.gz': 'Avg',
... '/cpac_templates/aal_mask_pad.nii.gz': 'Avg',
... '/cpac_templates/rois_3mm.nii.gz': 'Voxel'
... }, 'realignment': 'ROI_to_func'}})
True
>>> str(update_nested_dict(tse_base, {'timeseries_extraction': {
... 'roi_paths_fully_specified': False,
... 'tse_roi_paths': {'/cpac_templates/aal_mask_pad.nii.gz': 'Voxel'}
... }})) == str({'timeseries_extraction': {'run': True,
... 'tse_roi_paths': {
... '/cpac_templates/CC400.nii.gz': 'Avg',
... '/cpac_templates/aal_mask_pad.nii.gz': 'Voxel'
... }, 'realignment': 'ROI_to_func'}})
True
>>> str(update_nested_dict(tse_base, {'timeseries_extraction': {
... 'tse_roi_paths': {'/cpac_templates/aal_mask_pad.nii.gz': 'Voxel'}
... }})) == str({'timeseries_extraction': {'run': True, 'tse_roi_paths': {
... '/cpac_templates/aal_mask_pad.nii.gz': 'Voxel'
... }, 'realignment': 'ROI_to_func'}})
True
""" # pylint: disable=line-too-long
# short-circuit if d_update has `*_roi_paths` and
# `roi_paths_fully_specified` children
if fully_specified:
return d_update
d_new = {} if d_base is None else deepcopy(d_base)
for k, v in d_update.items():
if k.endswith("_roi_paths"):
fully_specified = d_update.get("roi_paths_fully_specified", True)
else:
fully_specified = False
if k != "roi_paths_fully_specified":
if isinstance(v, collections.abc.Mapping):
d_new[k] = update_nested_dict(d_new.get(k, {}), v, fully_specified)
else:
d_new[k] = v
return d_new
def update_pipeline_values_1_8(d_old):
"""Update pipeline config values that changed from C-PAC 1.7 to 1.8.
Parameters
----------
d_old : dict
Returns
-------
d : dict
updated
Examples
--------
>>> update_pipeline_values_1_8({'segmentation': {'tissue_segmentation': {
... 'using': ['FSL-FAST Thresholding', 'Customized Thresholding']}}})
{'segmentation': {'tissue_segmentation': {'using': ['FSL-FAST'], 'FSL-FAST': {'thresholding': {'use': 'Custom'}}}}}
>>> update_pipeline_values_1_8({'segmentation': {'tissue_segmentation': {
... 'using': ['FSL-FAST Thresholding']}}})
{'segmentation': {'tissue_segmentation': {'using': ['FSL-FAST'], 'FSL-FAST': {'thresholding': {'use': 'Auto'}}}}}
""" # pylint: disable=line-too-long
from CPAC.pipeline.schema import valid_options
# pylint: disable=import-outside-toplevel
d = replace_in_strings(d_old.copy())
d = _replace_changed_values(
d,
["anatomical_preproc", "brain_extraction", "using"],
[("AFNI", "3dSkullStrip"), ("FSL", "BET"), ("unet", "UNet")],
)
d = _replace_changed_values(
d,
["functional_preproc", "func_masking", "using"],
[("3dAutoMask", "AFNI"), ("BET", "FSL")],
)
try:
seg_use_threshold = lookup_nested_value(
d, ["segmentation", "tissue_segmentation", "using"]
)
except KeyError:
seg_use_threshold = []
if not isinstance(seg_use_threshold, list):
seg_use_threshold = [seg_use_threshold]
if "FSL-FAST Thresholding" in seg_use_threshold:
if "using" in d["segmentation"].get("tissue_segmentation", {}):
d["segmentation"]["tissue_segmentation"]["using"].append("FSL-FAST")
else:
d = set_nested_value(
d, ["segmentation", "tissue_segmentation", "using"], ["FSL-FAST"]
)
seg_use_threshold.remove("FSL-FAST Thresholding")
if "Customized Thresholding" in seg_use_threshold:
seg_use_threshold.remove("Customized Thresholding")
d = set_nested_value(
d,
["segmentation", "tissue_segmentation", "FSL-FAST", "thresholding", "use"],
"Custom",
)
else:
d = set_nested_value(
d,
["segmentation", "tissue_segmentation", "FSL-FAST", "thresholding", "use"],
"Auto",
)
for centr in [
"degree_centrality",
"eigenvector_centrality",
"local_functional_connectivity_density",
]:
centr_keys = ["network_centrality", centr, "weight_options"]
try:
centr_value = lookup_nested_value(d, centr_keys)
if any(isinstance(v, bool) for v in centr_value):
for i in range(2):
if centr_value[i]:
centr_value[i] = valid_options["centrality"]["weight_options"][
i
]
while False in centr_value:
centr_value.remove(False)
d = set_nested_value(d, centr_keys, centr_value)
except KeyError:
continue
seg_template_key = [
"segmentation",
"tissue_segmentation",
"Template_Based",
"template_for_segmentation",
]
try:
seg_template = lookup_nested_value(d, seg_template_key)
for replacement in [
("EPI_template", valid_options["segmentation"]["template"][0]),
("T1_template", valid_options["segmentation"]["template"][1]),
]:
seg_template = list_item_replace(seg_template, *replacement)
while "Off" in seg_template:
seg_template.remove("Off")
while False in seg_template:
seg_template.remove(False)
d = set_nested_value(d, seg_template_key, seg_template)
d = remove_None(d, seg_template_key)
except KeyError:
pass
distcor_key = ["functional_preproc", "distortion_correction", "using"]
try:
lookup_nested_value(d, distcor_key)
d = remove_None(d, distcor_key)
except KeyError:
pass
if "functional_registration" in d and isinstance(
d["functional_registration"], dict
):
if "1-coregistration" in d["functional_registration"]:
coreg = d["functional_registration"].pop("1-coregistration")
d = set_nested_value(
d,
["registration_workflows", "functional_registration", "coregistration"],
coreg,
)
if not (bool(d["functional_registration"])):
d.pop("functional_registration")
return update_values_from_list(d)
def update_values_from_list(d_old, last_exception=None):
"""Convert 1-length lists of an expected type to single items of that type...
...or to convert singletons of an expected list of a type into lists thereof.
Also handles some type conversions against the schema.
Parameters
----------
d_old : dict
last_exception: Exception or None
if the same exception recurs, raise it.
Returns
-------
d : dict
updated
Examples
--------
>>> update_values_from_list({'pipeline_setup': {
... 'pipeline_name': ['one_string']}})
{'pipeline_setup': {'pipeline_name': 'one_string'}}
>>> update_values_from_list({'nuisance_corrections': {
... '1-ICA-AROMA': {'run': [False]}}})
{'nuisance_corrections': {'1-ICA-AROMA': {'run': [False]}}}
"""
from CPAC.pipeline.schema import schema
d = d_old.copy()
try:
schema(d)
except Invalid as e:
if (
last_exception
and last_exception.path == e.path
and last_exception.msg == e.msg
):
raise e
observed = lookup_nested_value(d, e.path)
if observed == "None":
return update_values_from_list(set_nested_value(d, e.path, None), e)
expected = (
e.msg.split("expected")[-1].strip() if "expected" in e.msg else "unknown"
)
if expected != "bool" and isinstance(observed, list) and len(observed) == 1:
try:
return update_values_from_list(
set_nested_value(d, e.path, observed[0]), e
)
except TypeError:
raise e
if expected == "bool":
if isinstance(observed, int): # pylint: disable=no-else-return
return update_values_from_list(
set_nested_value(d, e.path, bool(observed)), e
)
if isinstance(observed, list):
if len(observed) == 0: # pylint: disable=no-else-return
return update_values_from_list(
set_nested_value(d, e.path, False), e
)
# maintain a list if list expected
list_expected = e.path[-1] == 0
e_path = e.path[:-1] if list_expected else e.path
if len(observed) == 1: # pylint: disable=no-else-return
                    if isinstance(observed[0], int):
                        value = bool(observed[0])
                    elif observed[0].lower() in YAML_BOOLS[True]:
                        value = True
                    elif observed[0].lower() in YAML_BOOLS[False]:
                        value = False
                    else:
                        # fall back to truthiness so ``value`` is always bound
                        value = bool(observed[0])
return update_values_from_list(
set_nested_value(
d, e_path, [value] if list_expected else value
),
e,
)
return update_values_from_list(
set_nested_value(d, e_path, [bool(value) for value in observed]),
e,
)
if observed.lower() in YAML_BOOLS[True]:
return update_values_from_list(set_nested_value(d, e.path, True), e)
if observed.lower() in YAML_BOOLS[False]:
return update_values_from_list(set_nested_value(d, e.path, False), e)
return update_values_from_list(set_nested_value(d, e_path, observed[0]), e)
if expected == "a list":
return update_values_from_list(set_nested_value(d, e.path, [observed]), e)
raise e
return d
def _replace_changed_values(d, nested_key, replacement_list):
"""Replace values changed from C-PAC 1.7 to C-PAC 1.8.
Parameters
----------
d : dict
nested_key : list of strings
replacement_list : list of tuples
0 : any
value to replace
1 : any
replacement value
Returns
-------
d : dict
Examples
--------
>>> d = {'test': {'this': ['function']}}
>>> _replace_changed_values(d, ['test', 'this'], [('function', 'success')])
{'test': {'this': ['success']}}
"""
try:
current_value = lookup_nested_value(d, nested_key)
except KeyError:
return d
if isinstance(current_value, list):
current_value = _replace_in_value_list(current_value, replacement_list)
else:
for replacement in replacement_list:
current_value = list_item_replace(current_value, *replacement)
return set_nested_value(d, nested_key, current_value)
def _replace_in_value_list(current_value, replacement_tuple):
"""Make character replacements and drop falsy values.
Parameters
----------
current_value : list
replacement_tuple : tuple or list of tuples
0 : str
character value to replace
1 : str
replacement character value
Returns
-------
current_value : list
Examples
--------
>>> current_value = ['EPI Template', 'T1_Template', 'None']
>>> _replace_in_value_list(current_value, (' ', '_'))
['EPI_Template', 'T1_Template']
>>> current_value = ['AFNI', 'FSL', 'None', None, False]
>>> _replace_in_value_list(current_value, [
... ('AFNI', '3dSkullStrip'), ('FSL', 'BET')])
['3dSkullStrip', 'BET']
"""
if isinstance(replacement_tuple, list):
for rt in replacement_tuple:
current_value = _replace_in_value_list(current_value, rt)
return current_value
if not isinstance(current_value, list):
current_value = [current_value]
return [
v.replace(*replacement_tuple)
for v in current_value
if bool(v) and v not in {"None", "Off", ""}
]