Source code for selfeeg.utils.utils

from __future__ import annotations

import copy
import os
import pickle
import random
from typing import Optional, Sequence, Union

import numpy as np
import pandas as pd
import torch
from numpy.typing import ArrayLike

__all__ = [
    "check_models",
    "count_parameters",
    "create_dataset",
    "get_subarray_closest_sum",
    "RangeScaler",
    "scale_range_soft_clip",
    "torch_pchip",
    "torch_zscore",
    "ZscoreScaler",
]


def subarray_closest_sum(arr: ArrayLike, n: int, k: float) -> tuple(ArrayLike, float, float, float):
    """
    returns a subarray whose element sum is closest to k.

    This function is inspired from [link1]_

    It is important to note that this function returns a subarray and not a
    subset of the array. A subset is a collection of elements in the array taken
    from any index, a subarray here is a slice of the array (arr[start:end]).
    If you are looking for the exact subset with closest sum, which is more
    accurate but more computationally and memory demanding, use another function.

    Parameters
    ----------
    arr: ArrayLike
        The array to search.
    n: int
        The length of the array.
    k: float
        The target value.

    Returns
    -------
    best_arr: ArrayLike
        The subarray whose element sum is closest to k.
    best_start: float
        The starting index of the subarray.
    best_end: float
        The ending index of the subarray.
    min_diff: float
        Absolute difference between the target value and the sum of the subarray's values.

    References
    ----------
    .. [link1] https://www.geeksforgeeks.org/subarray-whose-sum-is-closest-to-k/

    """
    # Initialize start and end pointers, current sum, minimum difference
    # and best start and end pointers
    start = 0
    end = 0
    best_start = 0
    best_end = 0
    curr_sum = arr[0]

    # Initialize the minimum difference between the subarray sum and K
    min_diff = abs(curr_sum - k)

    # Traverse through the array
    while end < n - 1:

        # If the current sum is less than k, move the end pointer to the right
        if curr_sum < k:
            end += 1
            curr_sum += arr[end]
        # Otherwise, move the start pointer to the right
        else:
            curr_sum -= arr[start]
            start += 1

        # Update the minimum difference and store best subarray pointers
        if abs(curr_sum - k) < min_diff:
            min_diff = abs(curr_sum - k)
            best_start = start
            best_end = end
            # if minimum difference is zero, return the optimal subarray
            if min_diff == 0:
                return arr[best_start : best_end + 1], best_start, best_end, min_diff

    return arr[best_start : best_end + 1], best_start, best_end, min_diff


[docs] def get_subarray_closest_sum( arr: ArrayLike, target: float, tolerance: float = 0.01, perseverance: int = 1000, return_subarray: bool = True, ) -> tuple[list, Optional[list]]: """ find the subarray whose values sum is closer to a target. The solution found is the first inside a specified tolerance (if possible) and return the index of the selected values in the original array. To find the subarray, get_subarray_closest_sum calls multiple times the ``subarray_closest_sum`` function until the subarray has the sum within [target*(1-tolerance), target*(1+tolerance)]. At each try the array is shuffled in order to get a different solution. Keep in mind that the solution is not always the optimal, but rather the first which satisfies the requirements given. Parameters ---------- arr: ArrayLike The array to search. target: float The target sum. tolerance: float, optional The tolerance to apply to the sum in percentage, in range [0,1]. Default = 0.01 perseverance: int, optional The maximum number of tries before stopping searching the subarray with closest sum. Default = 1000 return_subarray: bool, optional whether to also return the subarray or not. Default = True Returns ------- final_idx: list A list with the index of the identified subarray. best_sub_arr: list, optional The subarray. Example ------- >>> import random >>> import selfeeg.utils >>> random.seed(1235) >>> arr = [i for i in range (1,100)] >>> final_idx, best_sub_arr = utils.get_subarray_closest_sum( ... arr, 3251, perseverance=10000) >>> print( sum(best_sub_arr)) #should print 3251 """ if tolerance < 0 or tolerance > 1: raise ValueError("tolerance must be in [0,1]") else: upper_bound = target * tolerance if not isinstance(perseverance, int): perseverance = int(perseverance) arr_original = arr idx = range(len(arr)) N = len(arr) subarr_diff = 0 best_idx = [] best_start = 0 best_end = 0 best_subarr_diff = float("inf") starti = 0 endi = 0 # c = np.array([arr,idx]).T for _ in range(perseverance): c = list(zip(arr, idx)) random.shuffle(c) arr, idx = zip(*c) # np.random.shuffle(c) # _, starti, endi, subarr_diff = subarray_closest_sum(c[:,0], N, target) _, starti, endi, subarr_diff = subarray_closest_sum(arr, N, target) if subarr_diff < best_subarr_diff: best_subarr_diff = subarr_diff best_idx = idx best_start = starti best_end = endi if best_subarr_diff < upper_bound: break # get final list final_idx = list(best_idx[best_start : best_end + 1]) final_idx.sort() if return_subarray: best_subarr = list(map(arr_original.__getitem__, final_idx)) return final_idx, best_subarr else: return final_idx
[docs] def scale_range_soft_clip( x: ArrayLike, Range: float = 200, asintote: float = 1.2, scale: str = "mV", exact: bool = True ) -> ArrayLike: """ soft version of the range scaler. The function will rescale the data in the following way: 1. values in Range will be rescaled in the range [-1,1] linearly 2. values outside the range will be either clipped or soft clipped with an exponential saturating curve with first derivative in -1 and 1 preserved and horizontal asintote (the saturating point) given by the user. To provide faster computation, this function can also approximate its behaviour with a sigmoid function which scales the given input using the specified range and asintote. To check the difference in those functions see the geogebra file provided in the extra folder of the github repository. Parameters ---------- x: ArrayLike The array or tensor to rescale. Rescaling can be perfomed along the last dimension. Tensors can also be placed in a GPU. Computation in this case is much faster Range: float, optional The range of values to rescale given in microVolt. It rescale linearly the values in [-range, range] to [-1, 1]. Must be a positive value. The list [-range, range] is created internally. Default = 200 asintote: float, optional The horizontal asintote of the soft clipping part. Must be a value bigger than 1. Default = 1.2 scale: str, optional The scale of the EEG Samples. It can be: - 'mV' for milliVolt - 'uV' for microVolt - 'nV' for nanoVolt Default = 'mV' exact: bool, optional Whether to approximate the composed function (linear + exponential function) with a sigmoid. It will make the rescaling much faster but will not preserve the linearity in the range [-1, 1]. Returns ------- x_scaled: ArrayLike The rescaled array. Example ------- >>> import selfeeg.utils >>> import torch >>> x = torch.zeros(16,32,1024) + torch.sin(torch.linspace(0, 8*torch.pi,1024))*500 >>> x_scaled = utils.scale_range_soft_clip(x, 200, 2.5, 'uV' ) >>> print( x.max()<=2.5 and x.min()>=-2.5) # should return False >>> print( x_scaled.max()<=2.5 and x_scaled.min()>=-2.5) # should return True """ if Range < 0: raise ValueError("Range argument cannot be lower than 0") if asintote is None: asintote = 1.0 elif asintote < 1: raise ValueError("asintote must be a value bigger than 1") scale = scale.lower() Range = Range / 1000 if scale not in ["mv", "uv", "nv"]: raise ValueError("scale must be any of 'mV', 'uV', 'nV'") else: if scale == "uv": x = x / 1.0e3 elif scale == "nv": x = x / 1.0e6 x_scaled = torch.clone(x) if isinstance(x, torch.Tensor) else np.copy(x) # CASE 1: HARD clipping if asintote == 1.0: mask1 = x > Range mask2 = x < -Range x_scaled = x / Range if isinstance(x, torch.Tensor): x_scaled = torch.clamp(x_scaled, min=-1, max=1) else: x_scaled = np.clip(x_scaled, -1, 1) return x_scaled # CASE 2: SOFT CLIPPING if exact: mask1 = x > (Range) mask2 = x < (-Range) x_scaled = x / Range if isinstance(x, torch.Tensor): x_scaled[mask2] = (asintote - 1) * torch.exp( (x[mask2] + Range) / (Range * (asintote - 1)) ) - asintote x_scaled[mask1] = -( (asintote - 1) * torch.exp((-x[mask1] + Range) / (Range * (asintote - 1))) - asintote ) else: x_scaled[mask2] = (asintote - 1) * np.exp( (x[mask2] + Range) / (Range * (asintote - 1)) ) - asintote x_scaled[mask1] = -( (asintote - 1) * np.exp((-x[mask1] + Range) / (Range * (asintote - 1))) - asintote ) else: # trating c as -coeff c = (np.log((2 * asintote) / (1 + asintote) - 1)) / Range if isinstance(x, torch.Tensor): x_scaled = ((2 * asintote) / (1 + torch.exp(c * x))) - asintote else: x_scaled = ((2 * asintote) / (1 + np.exp(c * x))) - asintote return x_scaled
[docs] class RangeScaler: """ class adaptation of the ``scale_range_with_soft_clip`` function. Upon call, RangeScaler rescales the given EEG data in the following way: 1. values in Range will be linearly rescaled in the range [-1,1]. 2. values outside the range will be either clipped or soft clipped with an exponential saturating curve with first derivative in -1 and 1 preserved and horizontal asintote (the saturating point) given by the user. To provide faster computation, this function can also approximate its behaviour with a sigmoid function which scales the given input using the specified range and asintote. To check the difference in those functions see the geogebra file provided in the extra folder of the github repository. Parameters ---------- x: ArrayLike The array or tensor to rescale. Rescaling can be perfomed along the last dimension. Tensors can also be placed in a GPU. Computation in this case is faster. Range: float, optional The range of values to rescale given in microVolt. It rescale linearly the values in [-range, range] to [-1, 1]. Must be a positive value. Default = 200 asintote: float, optional The horizontal asintote of the soft clipping part. Must be a value bigger than 1. Default = 1.2 scale: str, optional The scale of the EEG Samples. It can be: - 'mV' for milliVolt - 'uV' for microVolt - 'nV' for nanoVolt Default = 'mV' exact: bool, optional Whether to approximate the composed function (linear + exponential function) with a sigmoid. It will make the rescaling much faster but will not preserve the linearity in the range [-1, 1]. Example ------- >>> import selfeeg.utils >>> import torch >>> x = torch.zeros(16,32,1024) + torch.sin(torch.linspace(0, 8*torch.pi,1024))*500 >>> x_scaled = utils.RangeScaler(200, 2.5, 'uV' )(x) >>> print( x.max()<=2.5 and x.min()>=-2.5) # should return False >>> print( x_scaled.max()<=2.5 and x_scaled.min()>=-2.5) # should return True """ def __init__( self, Range: float = 200, asintote: float = 1.2, scale: str = "mV", exact: bool = True ): if Range < 0: raise ValueError("Range cannot be lower than 0") if asintote is None: asintote = 1.0 elif asintote < 1: raise ValueError("asintote must be a value bigger than 1") scale = scale.lower() if scale not in ["mv", "uv", "nv"]: raise ValueError("scale must be any of 'mV', 'uV', 'nV'") self.Range = Range self.asintote = asintote self.scale = scale self.exact = exact def __call__(self, x): """ :meta private: """ return scale_range_soft_clip(x, self.Range, self.asintote, self.scale, self.exact)
[docs] def torch_zscore( x: torch.Tensor, axis: int = -2, correction: int = 1, ) -> torch.Tensor: """ zscore operator for torch tensors. It is heavily based on scipy's zscore in order to provide identical results when using numpy arrays. The analogous command in scipy is: x_zscore = scipy.stats.zscore(x, axis=axis, ddof=correction) Parameters ---------- x: torch.Tensor The tensor to standardize. axis: int The axis along which to operate. By default, it assumes that the EEG channel dimension is the second to last. If the tensor has only one dimension, default value is changed to 0. Default = -2 correction: int difference between the sample size and sample degrees of freedom. It is applied during the calculation of the standard deviation. It is equivalent to the Scipy's zscore `ddof` argument. Default is Bessel's correction as used in Pytorch's std function. Default = 1 Returns ------- xz: torch.Tensor The tensor standardized along the given dimension. """ dims = len(x.shape) if dims == 0: raise ValueError("Got a tensor with 0 length") elif dims == 1: axis = 0 # get mean and standard deviation mn = x.mean(axis, keepdim=True) sd = x.std(axis, correction=correction, keepdim=True) # a solid solution implemented in scipy's zscore # to avoid 0 division or too large values x0 = x.min(axis=axis, keepdims=True)[0] iszero = torch.eq(x, x0).all(axis=axis, keepdims=True) # torch doesn't throw zero division warnings sd[iszero] = 1.0 xz = (x - mn) / sd # Put nans xz[torch.broadcast_to(iszero, x.shape)] = torch.nan return xz
[docs] class ZscoreScaler: """ zscore operator callable objects. It can accept both torch Tensors and numpy arrays. In case of torch Tensors are passed during call, ``torch_zscore`` is called. Parameters ---------- x: ArrayLike The ArrayLike object to standardize. axis: int The axis along which to operate. By default, it assumes that the EEG channel dimension is the second to last. If the tensor has only one dimension, default value is changed to 0. Default = -2 correction: int difference between the sample size and sample degrees of freedom. It is applied during the calculation of the standard deviation. It is equivalent to the Scipy's zscore `ddof` argument. Default is Bessel's correction as used in Pytorch's std function. Default = 1 """ def __init__(self, axis: int = -2, correction: int = 1): self.axis = axis self.correction = correction def __call__(self, x): if isinstance(x, torch.Tensor): return torch_zscore(x, self.axis, self.correction) else: return zscore(x, axis=self.axis, ddof=self.correction)
[docs] def torch_pchip( x: "1D Tensor", y: "ND Tensor", xv: "1D Tensor", save_memory: bool = True, new_y_max_numel: int = 4194304, ) -> torch.Tensor: """ performs the pchip interpolation on the last dimension of the input tensor. This function is a pytorch adaptation of the scipy's pchip_interpolate [pchip]_ . It performs sp-pchip interpolation (Shape Preserving Piecewise Cubic Hermite Interpolating Polynomial) on the last dimension of the y tensor. x is the original time grid and xv new virtual grid. So, the new values of y at time xv are given by the polynomials evaluated at the time grid x. This function is compatible with GPU devices. Parameters ---------- x: 1D Tensor Tensor with the original time grid. Must be the same length as the last dimension of y. y: ND Tensor Tensor to interpolate. The last dimension must be the time dimension of the signals to interpolate. xv: 1D Tensor Tensor with the new virtual grid, i.e. the time points where to interpolate save_memory: bool, optional Whether to perform the interpolation on subsets of the y tensor by recursive function calls or not. Does not apply if y is a 1-D tensor. If set to False memory usage can greatly increase (for example with a 128 MB tensor, the memory usage of the function is 1.2 GB), but it can speed up the process. However, this is not the case for all devices and performance may also decrease. Default = True new_y_max_numel: int, optional The number of elements which the tensor needs to surpass in order to make the function start doing recursive calls. It can be considered as an indicator of the maximum allowed memory usage since the lower the number, the lower the memory used. Default = 256*1024*16 (approximately 16s of recording of a 256 Channel EEG sampled at 1024 Hz). Returns ------- new_y: torch.Tensor The pchip interpolated tensor. Note ---- Some technical information and difference with other interpolation can be found here: https://blogs.mathworks.com/cleve/2012/07/16/splines-and-pchips/ Note ---- have a look also at the Scipy's documentation: https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.PchipInterpolator.html Some parts of the code are inspired from: https://github.com/scipy/scipy/blob/v1.10.1/scipy/interpolate/_cubic.py#L157-L302 References ---------- .. [pchip] https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.pchip_interpolate.html Example ------- >>> from scipy.interpolate import pchip_interpolate >>> import numpy as np >>> import selfeeg.utils >>> import torch >>> x = torch.zeros(16,32,1024) + torch.sin(torch.linspace(0, 8*torch.pi,1024))*500 >>> xnp = x.numpy() >>> x_pchip = utils.torch_pchip(torch.arange(1024), x, torch.linspace(0,1023,475)).numpy() >>> xnp_pchip = pchip_interpolate(np.arange(1024),xnp, np.linspace(0,1023,475), axis=-1) >>> print( ... np.isclose(x_pchip, xnp_pchip, rtol=1e-3,atol=0.5*1e-3).sum()==16*32*475 ... ) # Should return True """ if len(x.shape) != 1: raise ValueError( ["Expected 1D Tensor for x but received a ", str(len(x.shape)), "-D Tensor"] ) if len(xv.shape) != 1: raise ValueError( ["Expected 1D Tensor for xv but received a ", str(len(xv.shape)), "-D Tensor"] ) if x.shape[0] != y.shape[-1]: raise ValueError("x must have the same length than the last dimension of y") # Initialize the new interpolated tensor Ndim = len(y.shape) new_y = torch.empty((*y.shape[: (Ndim - 1)], xv.shape[0]), device=y.device) # If save_memory and the new Tensor size is huge, call recursively for # each element in the first dimension if save_memory: if Ndim > 1: if ((torch.numel(y) / y.shape[-1]) * xv.shape[0]) > new_y_max_numel: for i in range(new_y.shape[0]): new_y[i] = torch_pchip(x, y[i], xv) return new_y # This is a common part for every channel if x.device.type == "mps" or xv.device.type == "mps": # torch bucketize is not already implemented in mps unfortunately # need to pass in cpu and return to mps. Note that this is very slow # like 500 times slower. But at least it doesn't throw an error bucket = torch.bucketize(xv.to(device="cpu"), x.to(device="cpu")) - 1 bucket = bucket.to(device=x.device) else: bucket = torch.bucketize(xv, x) - 1 bucket = torch.clamp(bucket, 0, x.shape[0] - 2) tv_minus = (xv - x[bucket]).unsqueeze(1) infer_tv = torch.cat( (tv_minus**3, tv_minus**2, tv_minus, torch.ones(tv_minus.shape, device=tv_minus.device)), 1 ) h = x[1:] - x[:-1] Delta = (y[..., 1:] - y[..., :-1]) / h k = torch.sign(Delta[..., :-1] * Delta[..., 1:]) > 0 w1 = 2 * h[1:] + h[:-1] w2 = h[1:] + 2 * h[:-1] whmean = (w1 / Delta[..., :-1] + w2 / Delta[..., 1:]) / (w1 + w2) slope = torch.zeros(y.shape, device=y.device) slope[..., 1:-1][k] = whmean[k].reciprocal() slope[..., 0] = ((2 * h[0] + h[1]) * Delta[..., 0] - h[0] * Delta[..., 1]) / (h[0] + h[1]) slope_cond = torch.sign(slope[..., 0]) != torch.sign(Delta[..., 0]) slope[..., 0][slope_cond] = 0 slope_cond = torch.logical_and( torch.sign(Delta[..., 0]) != torch.sign(Delta[..., 1]), torch.abs(slope[..., 0]) > torch.abs(3 * Delta[..., 0]), ) slope[..., 0][slope_cond] = 3 * Delta[..., 0][slope_cond] slope[..., -1] = ((2 * h[-1] + h[-2]) * Delta[..., -1] - h[-1] * Delta[..., -2]) / ( h[-1] + h[-2] ) slope_cond = torch.sign(slope[..., -1]) != torch.sign(Delta[..., -1]) slope[..., -1][slope_cond] = 0 slope_cond = torch.logical_and( torch.sign(Delta[..., -1]) != torch.sign(Delta[..., -1]), torch.abs(slope[..., -1]) > torch.abs(3 * Delta[..., 1]), ) slope[..., -1][slope_cond] = 3 * Delta[..., -1][slope_cond] t = (slope[..., :-1] + slope[..., 1:] - Delta - Delta) / h a = (t) / h b = (Delta - slope[..., :-1]) / h - t py_coef = torch.stack((a, b, slope[..., :-1], y[..., :-1]), -1) new_y = (py_coef[..., bucket, :] * infer_tv).sum(axis=-1) return new_y
[docs] def create_dataset( folder_name: str = "Simulated_EEG", Sample_range: list = [512, 1025], Chans: int = 8, p: list = 0.8, return_labels: bool = False, seed: int = 1234, ) -> Optional[np.ndarray]: """ creates a simulated EEG dataset for normal abnormal binary classification. Samples have random length within a given range. Once called, the function will generate 1000 files in a new directory. Samples will have name 'A_B_C_D.pickle' with: 1. A = dataset ID 2. B = subject ID 3. C = session ID 4. D = trial ID. In total, ``create_dataset`` will generate files associated to: 1. 5 datasets (200 files per dataset) 2. 40 subjects per dataset 3. 5 sessions per subject 4. 1 trial per session. All files will store a dictionary with two keys: 1. 'data' = the array with random length and given channels (channels in column dimension) 2. 'label' = an integer with a random binary label (0=normal, 1=abnormal). EEG files have values in uV, with range at most in [-550,550] uV. Parameters ---------- folder_name: str, optional A string with the optional name of the subdirectory to store the generated files. Default = 'Simulated_EEG' Sample_range: list, optional A length 2 list with the possible minimum and maximum length of the generated EEGs. Default = [512, 1025] Chans: int, optional An integer defining the number of channels each EEG must have. Default = 8 p: float, optional A scalar in range [0, 1] with the probability of a sample being normal. Default = 0.8 seed: int, optional A seed to set for reproducibility. Default = 1234 Returns ------- classes: ArrayLike An array with the generated label. Index association is based on the file sorted by names. Example ------- >>> import selfeeg.utils >>> import glob >>> utils.create_dataset() >>> print(len(glob.glob('Simulated_EEG/*'))==1000) #shoud return True """ # Various checks if not (isinstance(Sample_range, list)): raise ValueError("Sample_range must be a list") else: if len(Sample_range) != 2: raise ValueError("Sample_range must have length 2") if Chans < 1: raise ValueError("Chans must be bigger than 1") if (p < 0) or (p > 1): raise ValueError("p must be in range [0, 1]") # create new sub-folder if that does not exist if not (os.path.isdir(folder_name)): os.mkdir(folder_name) # prepare elements for file generation Sample_range.sort() N = 1000 np.random.seed(seed=seed) classes = np.zeros(N) for i in range(N): # get random length and class label Sample = np.random.randint(Sample_range[0], Sample_range[1]) y = np.random.choice([0, 1], p=[p, 1 - p]) classes[i] = y # generate sample while being sure that values will not have # strange ranges x = 600 while np.max(x) > 550 or np.min(x) < -550: if y == 1: stderr = np.sqrt(122.35423) F1 = np.random.normal(0.932649, 0.040448) F0 = np.random.normal(2.1159355, 2.3523977) else: stderr = np.sqrt(454.232666) F1 = np.random.normal(0.9619603, 0.0301687) F0 = np.random.normal(-0.1810323, 3.4712047) x = np.zeros((Chans, Sample)) x[:, 0] = np.random.normal(0, stderr, Chans) for k in range(1, Sample): x[:, k] = F0 + F1 * x[:, k - 1] + np.random.normal(0, stderr, Chans) # store files sample = {"data": x, "label": y} A = int(i // 200) + 1 B = int((i - 200 * int(i // 200))) // 5 + 1 C = i % 5 + 1 file_name = "Simulated_EEG/" + str(A) + "_" + str(B) + "_" + str(C) + "_1.pickle" with open(file_name, "wb") as f: pickle.dump(sample, f) if return_labels: return classes
[docs] def check_models(model1: torch.nn.Module, model2: torch.nn.Module) -> bool: """ checks that two nn.Modules are equal. Parameters ---------- model1: nn.Module The first model to compare. model2: nn.Module The second model to compare. Returns ------- equals: bool A boolean stating if the models are equal or not. Example ------- >>> import selfeeg.models >>> model1 = models.EEGNet(4,8,512) >>> model2 = models.EEGNet(4,8,512) >>> print( utils.utils.check_models(model1,model2)) # Should return False >>> model2.load_state_dict(model1.state_dict()) >>> utils.check_models(model1,model2) # Should return False """ for p1, p2 in zip(model1.parameters(), model2.parameters()): if p1.data.ne(p2.data).sum() > 0: return False return True
[docs] def count_parameters( model: torch.nn.Module, return_table: bool = False, print_table: bool = False, add_not_trainable=False, ) -> [int, Optional[pd.DataFrame]]: """ counts the number of **trainable parameters** of a Pytorch's nn.Module. It can additionally create a two column dataframe with module's name and number of trainable parameters. Not trainable parameters can be also added to the table if specified. The implementation is an enriched implementation inspired from [stacko1]_ and [stacko2]_ . Parameters ---------- model: nn.Module The model to scroll. return_table: bool, optional Whether to return a with module's name and number of trainable parameters or not. Default = False print_table: bool, optional Whether to print the created table or not. Default = False add_not_trainable: bool, optional Whether to add blocks with 0 trainable parameters to the table or not. Default = False Returns ------- total_params: int The number of trainable parameters. layer_table: pd.DataFrame, optional A two column dataframe with module's name and number of trainable parameters. References ---------- .. [stacko1] https://stackoverflow.com/questions/49201236 .. [stacko2] https://discuss.pytorch.org/t/how-do-i-check-the-number-of-parameters-of-a-model/4325/9 Example ------- >>> import selfeeg.utils >>> import selfeeg.models >>> mdl = models.ShallowNet(4,8,1024) >>> for n, i in enumerate(mdl.parameters()): # bias require grad put to False ... i.requires_grad=False if n in [1,3,5,7] else True >>> a,b = utils.count_parameters(mdl, True,True,True) >>> print (b == 23760) # should return True """ table = [] total_params = 0 for name, parameter in model.named_parameters(): if not parameter.requires_grad: if add_not_trainable: params = 0 else: continue else: params = parameter.numel() table.append([name, params]) total_params += params layer_table = pd.DataFrame(table, columns=["Modules", "Parameters"]) if print_table: print(layer_table.to_string()) print("=" * len(layer_table.to_string().split("\n")[0])) char2add = len(layer_table.to_string().split("\n")[0].split("Modules")[0]) - 15 char2add2 = ( len(layer_table.to_string().split("\n")[0].split("Modules")[1]) - len(str(total_params)) - 1 ) print(" " * char2add + "TOTAL TRAINABLE PARAMS" + " " * char2add2, total_params) return (layer_table, total_params) if return_table else total_params
def _reset_seed( seed: int = None, reset_random: bool = True, reset_numpy: bool = True, reset_torch: bool = True, ) -> None: """ :meta private: """ if seed is not None: if seed <= 0: raise ValueError("seed must be a nonnegative number") if reset_numpy: np.random.seed(seed) if reset_random: random.seed(seed) if reset_torch: torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed)