Source code for magpysv.denoise

# -*- coding: utf-8 -*-
#    Copyright (C) 2016  Grace Cox (University of Liverpool)
#
#    Released under the MIT license, a copy of which is located at the root of
#    this project.
"""Module containing functions to remove external signal from geomagnetic data.

Part of the MagPySV package for geomagnetic data analysis. This module provides
various functions to denoise geomagnetic data by performing principal component
analysis and identifying and removing outliers. Also contains an outlier
detection function based on median absolute deviation from the median (MAD).
"""


import pandas as pd
import magpysv.plots as plots
import numpy as np
from sklearn.decomposition import PCA as sklearnPCA
try:
    from sklearn.preprocessing import Imputer
except ImportError:
    from sklearn.impute import SimpleImputer as Imputer


[docs]def eigenvalue_analysis_impute(*, dates, obs_data, model_data, residuals, proxy_number=1): """Remove external signal from SV data using Principal Component Analysis. Perform principal component analysis (PCA) on secular variation residuals (the difference between the observed SV and that predicted by a geomagnetic field model) calculated from annual differences of monthly means at several observatories. Uses the imputer from sklearn.preprocessing to fill in missing data points and calculates the singular values of the data matrix for n observatories (uses Singular Values Decomposition, SVD). The residuals are rotated into the eigendirections and denoised using the method detailed in Wardinski & Holme (2011, GJI, https://doi.org/10.1111/j.1365-246X.2011.04988.x). The SV residuals of the noisy component for all observatories combined are used as a proxy for the unmodelled external signal. The denoised data are then rotated back into geographic coordinates. The pca algorithm outputs the singular values (these are equal to the square root of the eigenvalues of the covariance matrix) sorted from largest to smallest, so the corresponding eigenvector matrix has the 'noisy' direction in the first column and the 'clean' direction in the final column. Note that the SVD algorithm cannot be used if any data are missing, which is why imputation is needed with this method. The function denoise.eigenvalue_analysis permits missing values and does not infill them - that is the more robust function. Smallest eigenvalue: 'quiet' direction Largest eiegenvalue: 'noisy' direction Args: dates (datetime.datetime): dates of the time series measurements. obs_data (pandas.DataFrame): dataframe containing columns for monthly/annual means of the X, Y and Z components of the secular variation at the observatories of interest. model_data (pandas.DataFrame): dataframe containing columns for field model prediction of the X, Y and Z components of the secular variation at the same observatories as in obs_data. residuals (pandas.DataFrame): dataframe containing the SV residuals (difference between the observed data and model prediction). proxy_number (int): the number of 'noisy' directions used to create the proxy for the external signal removal. Default value is 1 (only the residual in the direction of the largest eigenvalue is used). Using n directions means that proxy is the sum of the SV residuals in the n noisiest eigendirections. Returns: (tuple): tuple containing: - denoised_sv (*pandas.DataFrame*): dataframe with dates in the first column and columns for the denoised X, Y and Z secular variation components at each of the observatories for which data were provided. - proxy (*array*): the signal that was used as a proxy for unmodelled external magnetic field in the denoising stage. - eig_values (*array*): the singular values of the obs_data matrix. - eig_vectors (*array*): the eigenvectors associated with the n largest singular values of the data matrix. For example, if the residuals in the two 'noisiest' directions are used as the proxy for external signal, then these two eigenvectors are returned. - projected_residuals (*array*): SV residuals rotated into the eigendirections. - corrected_residuals (*array*): SV residuals after the denoising process. """ # Fill in missing SV values (indicated as NaN in the data files) imp = Imputer(missing_values='NaN', strategy='mean', axis=0) imputed_residuals = imp.fit_transform(residuals) pca = sklearnPCA() projected_residuals = pca.fit_transform(imputed_residuals) eig_values = pca.explained_variance_ eig_vectors = pca.components_ # Use the method of Wardinski & Holme (2011) to remove unmodelled external # signal in the SV residuals. The variable 'proxy' contains the noisy # component residual for all observatories combined corrected_residuals = [] if proxy_number == 1: noisy_direction = eig_vectors[:, 0] proxy = projected_residuals[:, 0] for idx in range(len(proxy)): corrected_residuals.append( imputed_residuals.data[idx, :] - proxy[idx] * noisy_direction) elif proxy_number > 1: noisy_direction = eig_vectors[:, 0:proxy_number] proxy = np.sum(projected_residuals[:, 0:proxy_number], axis=1) for idx in range(len(projected_residuals[:, 0])): corrected = imputed_residuals.data[idx, :] for direction in range(proxy_number): corrected = corrected - projected_residuals[idx, direction] \ * noisy_direction[:, direction] corrected_residuals.append(corrected) corrected_residuals = pd.DataFrame(corrected_residuals, columns=obs_data.columns) denoised_sv = pd.DataFrame( corrected_residuals.values + model_data.values, columns=obs_data.columns) denoised_sv.insert(0, 'date', dates) return denoised_sv, proxy, eig_values, eig_vectors, projected_residuals,\ corrected_residuals.astype('float')
[docs]def eigenvalue_analysis(*, dates, obs_data, model_data, residuals, proxy_number=1): """Remove external signal from SV data using principal Component Analysis. Perform principal component analysis (PCA) on secular variation residuals (the difference between the observed SV and that predicted by a geomagnetic field model) calculated from annual differences of monthly means at several observatories. Uses masked arrays to discount missing data points and calculates the eigenvalues/vectors of the (3nx3n) covariance matrix for n observatories. The residuals are rotated into the eigendirections and denoised using the method detailed in Wardinski & Holme (2011, GJI, https://doi.org/10.1111/j.1365-246X.2011.04988.x). The SV residuals of the noisy component for all observatories combined are used as a proxy for the unmodelled external signal. The denoised data are then rotated back into geographic coordinates. The PCA algorithm outputs the eigenvalues sorted from largest to smallest, so the corresponding eigenvector matrix has the 'noisy' direction in the first column and the 'clean' direction in the final column. This algorithm masks missing data so that they are not taken into account during the PCA. Missing values are not infilled or estimated, so NaN values in the input dataframe are given as NaN values in the output. Smallest eigenvalue 'quiet' direction Largest eiegenvalue 'noisy' direction Args: dates (datetime.datetime): dates of the time series measurements. obs_data (pandas.DataFrame): dataframe containing columns for monthly/annual means of the X, Y and Z components of the secular variation at the observatories of interest. model_data (pandas.DataFrame): dataframe containing columns for field model prediction of the X, Y and Z components of the secular variation at the same observatories as in obs_data. residuals (pandas.DataFrame): dataframe containing the SV residuals (difference between the observed data and model prediction). proxy_number (int): the number of 'noisy' directions used to create the proxy for the external signal removal. Default value is 1 (only the residual in the direction of the largest eigenvalue is used). Using n directions means that proxy is the sum of the SV residuals in the n noisiest eigendirections. Returns: (tuple): tuple containing: - denoised_sv (*pandas.DataFrame*): dataframe with datetime objects in the first column and columns for the denoised X, Y and Z SV components at each of the observatories for which data were provided. - proxy (*array*): the signal that was used as a proxy for unmodelled external magnetic field in the denoising stage. - eig_values (*array*): the eigenvalues of the obs_data matrix. - eig_vectors (*array*): the eigenvectors associated with the n largest eigenvalues of the data matrix. For example, if the residuals in the two 'noisiest' directions are used as the proxy for external signal, then these two eigenvectors are returned. - projected_residuals (*array*): SV residuals rotated into the eigendirections. - corrected_residuals (*array*): SV residuals after the denoising process. - covariance_matrix (*array*): residuals covariance matrix. """ # Create a masked version of the residuals array so that we can perform the # PCA ignoring all nan values masked_residuals = np.ma.array(residuals, mask=np.isnan(residuals)) # Calculate the covariance matrix of the masked residuals array covariance_matrix = np.ma.cov(masked_residuals, rowvar=False, allow_masked=True) # Calculate the eigenvalues and eigenvectors of the covariance matrix eig_values, eig_vectors = np.linalg.eig(covariance_matrix) # Sort the absolute values of the eigenvalues in decreasing order idx = np.argsort(np.abs(eig_values))[::-1] eig_values = eig_values[idx] # Sort the eigenvectors according to the same index eig_vectors = eig_vectors[:, idx] # Project the residuals onto the eigenvectors projected_residuals = np.ma.dot(masked_residuals, eig_vectors) # Use the method of Wardinski & Holme (2011) to remove unmodelled external # signal in the SV residuals. The variable 'proxy' contains the noisy # component residual for all observatories combined corrected_residuals = [] if proxy_number == 1: noisy_direction = eig_vectors[:, 0] proxy = projected_residuals[:, 0] for idx in range(len(proxy)): corrected_residuals.append( masked_residuals.data[idx, :] - proxy[idx] * noisy_direction) elif proxy_number > 1: noisy_direction = eig_vectors[:, 0:proxy_number] proxy = np.sum(projected_residuals[:, 0:proxy_number], axis=1) for idx in range(len(projected_residuals[:, 0])): corrected = masked_residuals.data[idx, :] for direction in range(proxy_number): corrected = corrected - projected_residuals[idx, direction] \ * noisy_direction[:, direction] corrected_residuals.append(corrected) corrected_residuals = pd.DataFrame(corrected_residuals, columns=obs_data.columns) denoised_sv = pd.DataFrame( corrected_residuals.values + model_data.values, columns=obs_data.columns) denoised_sv.insert(0, 'date', dates) return denoised_sv, proxy, np.abs(eig_values), eig_vectors,\ projected_residuals, corrected_residuals.astype('float'),\ covariance_matrix
[docs]def detect_outliers(*, dates, signal, obs_name, window_length, threshold, signal_type='SV', plot_fig=False, save_fig=False, write_path=None, fig_size=(8, 6), font_size=12, label_size=16): """Detect outliers in a time series and remove them. Use the median absolute deviation from the median (MAD) to identify outliers. The time series are long and highly variable so it is not appropriate to use single values of median to represent the whole series. The function uses a running median to better characterise the series (the window length and a threshold value stating many MADs from the median a point must be before it is classed as an outlier are user-specified). Args: dates (datetime.datetime): dates of the time series measurements. signal (array): array (or column from a pandas.DataFrame) containing the time series of interest. obs_name (str): states the component of interest and the three digit IAGA observatory name. window_length (int): number of months over which to take the running median. threshold (float): the minimum number of median absolute deviations a point must be away from the median in order to be considered an outlier. signal_type (str): specify whether magnetic field ('MF') or secular variation ('SV') is plotted. Defaults to SV. plot_fig (bool): option to plot figure of the time series and identified outliers. Defaults to False. save_fig (bool): option to save figure if plotted. Defaults to False. write_path (str): output path for figure if saved. fig_size (array): figure size in inches. Defaults to 8 inches by 6 inches. font_size (int): font size for axes. Defaults to 12 pt. label_size (int): font size for axis labels. Defaults to 16 pt. Returns: signal (array): the input signal with identified outliers removed (set to NaN). """ signal_temp = pd.DataFrame(data=signal.copy()) # Account for missing values when using rolling_median and rolling_std. # ffill (bfill) propagates the closest value forwards (backwards) through # nan values. E.g. [np.nan, np.nan, 1, 9, 7, np.nan, np.nan] returns as # [1, 1, 1, 9, 7, 7, 7]. The limit of half the window length is used so the # first ffill cannot overwrite the beginning of the next valid interval # (bfill values are used there instead). signal_temp = signal_temp.ffill(limit=int(window_length / 2 + 1)).bfill() # calculate the running median and median absolute standard deviation running_median = signal_temp.rolling(window=window_length, center=True).median().bfill().ffill() diff = (signal_temp - running_median).abs() med_abs_deviation = diff.rolling(window=window_length, center=True).median().bfill().ffill() # Normalise the median abolute deviation modified_z_score = diff / med_abs_deviation # Identify outliers outliers = signal_temp[modified_z_score > threshold] # Plot the outliers and original time series if required if plot_fig is True: plots.plot_outliers(dates=dates, obs_name=obs_name, signal=signal, outliers=outliers, save_fig=save_fig, write_path=write_path, fig_size=fig_size, font_size=font_size, label_size=label_size, signal_type=signal_type) # Set the outliers to NaN idx = np.where(modified_z_score > threshold)[0] signal.iloc[idx] = np.nan return signal.astype('float')