Workflow steps:
import os, logging
## .env --> Setting the environment variable for output format programmatically
os.environ['ENV_PATH'] = 'raw_pandas.env' ## Set to `cleaned_pandas.env` or `raw_pandas.env` as needed
%run -i ./Data_Loading_and_Descriptive_Statistics.ipynb
# %run -i ./DataFrameAdapter.ipynb
The specified .env file: raw_pandas.env +-----------------------------+--------------------------------------------------------------------------------------------+-------------------+ | Variable | Description | Value | +-----------------------------+--------------------------------------------------------------------------------------------+-------------------+ | IS_TERADATA_VANTAGE | Using scable Teradata Vantage vs. local machine (Laptop) | True | | IS_DATA_IN_TERADATA_VANTAGE | Using TeradataDataFrame in scalable Vantage vs. PandasDataFrame with local *.csv/xls files | False | | SCHEMA_NAME | [Teradata Vantage] Schema Name | Air_Pollution | | TABLE_NAME | [Teradata Vantage] Table Name | Air_Pollution_Raw | | IS_JUPYTERLAB | Running in JupyterLab vs Python Dash/Vizro Dashboard | True | | IS_DEBUG | Debug mode is active or not | False | | USE_DATA_PREFIX | Prefix to use for data files: 'raw' | 'cleaned' | raw | +-----------------------------+--------------------------------------------------------------------------------------------+-------------------+ ... Logon successful Connected as: xxxxxsql://demo_user:xxxxx@host.docker.internal/dbc Engine(teradatasql://demo_user:***@host.docker.internal) ℹ️ Load Data from data/raw data/raw/raw_Penrose7-07May2020-to-30Apr2022.csv data/raw/raw_Takapuna23-07May2020-to-30Apr2022.csv ℹ️ Load Data from data/raw/raw_Penrose7-07May2020-to-30Apr2022.csv file --> rawdata DataFrame 📂 ℹ️ Load Data from data/raw/raw_Takapuna23-07May2020-to-30Apr2022.csv file --> rawdata DataFrame 📂 ℹ️ The Shape of the rawdata Dataframe: (37382, 15) ℹ️ The Shape of the Dataframe rawdata_site1 (Penrose) and rawdata_site2 (Takapuna): (20023, 12) (17359, 11) 🎓 Describing the types of each attribute as numerical_columns (Continuous), ordinal_columns (Ordinal), or nominal_columns (Nominal) ... ℹ️ Numerical Variables/Features: ['AQI', 'PM10', 'PM2.5', 'SO2', 'NO', 'NO2', 'NOx', 'Wind_Speed', 'Wind_Dir', 'Air_Temp', 'Rel_Humidity'] ℹ️ Ordinal Variables/Features: ['Timestamp'] ℹ️ Nominal Variables/Features: Index(['Site', 'Site_Class', 'Country'], dtype='object') 🎓 1. [Site 1 - Penrose][numerical_columns_S1, nominal_columns_S1] Summary Statistics of the Dataframe such as the mean, maximum and minimum values ... ℹ️ Numerical Variables/Features: ['AQI', 'PM10', 'PM2.5', 'SO2', 'NO', 'NO2', 'NOx', 'Wind_Speed', 'Wind_Dir', 'Air_Temp', 'Rel_Humidity'] ℹ️ Ordinal Variables/Features: ['Timestamp'] 🎓 2. [Site 2 - Takapuna][numerical_columns_S2, nominal_columns_S2] Summary Statistics of the {site2} Dataframe such as the mean, maximum and minimum values ... ℹ️ Numerical Variables/Features: ['AQI', 'PM10', 'PM2.5', 'NO', 'NO2', 'NOx', 'Wind_Speed', 'Wind_Dir', 'Air_Temp', 'Rel_Humidity'] ℹ️ Ordinal Variables/Features: ['Timestamp'] 🎓 Listing variables with description... +-------------------------+----------------------------------------------------------------+-----------+---------+----------+ | Variable Name | Description | All Sites | Penrose | Takapuna | +-------------------------+----------------------------------------------------------------+-----------+---------+----------+ | rawdata | Complete dataset containing all observations across all sites. | [x] | [x] | [x] | | numerical_columns_site1 | Numerical columns specific to Site 1. | [ ] | [x] | [ ] | | nominal_columns_site1 | Nominal columns specific to Site 1. | [ ] | [x] | [ ] | | numerical_columns_site2 | Numerical columns specific to Site 2. | [ ] | [ ] | [x] | | nominal_columns_site2 | Nominal columns specific to Site 2. | [ ] | [ ] | [x] | | rawdata_site1 | Subset of raw data for Site 1. | [ ] | [x] | [ ] | | rawdata_site2 | Subset of raw data for Site 2. | [ ] | [ ] | [x] | +-------------------------+----------------------------------------------------------------+-----------+---------+----------+
rawdata_site1
Timestamp | AQI | PM10 | PM2.5 | SO2 | NO | NO2 | NOx | Wind_Speed | Wind_Dir | Air_Temp | Rel_Humidity | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2020-05-07 00:00:00 | 26.0 | 16.5 | 16.1 | 1.8 | 60.3 | 40.9 | 101.2 | 0.6 | 316.0 | 8.0 | 78.0 |
1 | 2020-05-07 01:00:00 | 28.0 | 17.7 | 10.1 | 1.0 | NaN | NaN | NaN | 0.7 | 269.0 | 8.0 | 76.8 |
2 | 2020-05-07 02:00:00 | 28.0 | 15.0 | 10.3 | 0.1 | 16.0 | 29.2 | 45.3 | 1.0 | 180.0 | 8.0 | 78.4 |
3 | 2020-05-07 03:00:00 | 29.0 | 14.3 | 11.4 | 0.0 | 11.2 | 27.5 | 38.7 | 0.8 | 232.0 | 8.0 | 77.5 |
4 | 2020-05-07 04:00:00 | 30.0 | 8.8 | 10.6 | -0.1 | 12.0 | 28.5 | 40.5 | 0.8 | 274.0 | 7.0 | 80.1 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
20018 | 2022-04-30 20:00:00 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
20019 | 2022-04-30 21:00:00 | 24.0 | NaN | 1.9 | 0.6 | 5.8 | 25.4 | 31.3 | 0.8 | 124.0 | 15.0 | 85.4 |
20020 | 2022-04-30 21:00:00 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
20021 | 2022-04-30 22:00:00 | 24.0 | 8.2 | 6.0 | 0.5 | 1.8 | 17.9 | 19.7 | 0.8 | 108.0 | 14.0 | 86.0 |
20022 | 2022-04-30 23:00:00 | 24.0 | 2.3 | 5.4 | 0.5 | 1.0 | 7.3 | 8.3 | 0.9 | 93.0 | 14.0 | 84.4 |
20023 rows × 12 columns
print("\n🎓 [Site1 - Penrose] Summary Statistics of the {site1} cleaned_data1 Dataframe such as the mean, max/minimum values ...")
rawdata_site1.describe()
🎓 [Site1 - Penrose] Summary Statistics of the {site1} cleaned_data1 Dataframe such as the mean, max/minimum values ...
Timestamp | AQI | PM10 | PM2.5 | SO2 | NO | NO2 | NOx | Wind_Speed | Wind_Dir | Air_Temp | Rel_Humidity | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
count | 20023 | 17019.000000 | 16818.000000 | 16102.000000 | 16623.000000 | 16260.000000 | 16260.000000 | 16259.000000 | 17182.000000 | 17181.000000 | 17206.000000 | 17338.000000 |
mean | 2021-05-06 21:58:02.954601984 | 28.233974 | 13.881787 | 5.348180 | 0.851639 | 10.041562 | 14.123253 | 24.151350 | 2.807368 | 173.234387 | 16.748925 | 69.426087 |
min | 2020-05-07 00:00:00 | 0.000000 | -6.400000 | -12.300000 | -4.400000 | -1.900000 | -1.400000 | -1.300000 | 0.200000 | 1.000000 | 3.000000 | 26.900000 |
25% | 2020-11-07 02:30:00 | 22.000000 | 8.400000 | 2.100000 | 0.100000 | 0.800000 | 3.900000 | 5.100000 | 1.600000 | 73.000000 | 14.000000 | 60.200000 |
50% | 2021-05-08 19:00:00 | 27.000000 | 13.100000 | 4.700000 | 0.500000 | 3.600000 | 10.600000 | 14.900000 | 2.600000 | 211.000000 | 17.000000 | 70.400000 |
75% | 2021-11-04 15:30:00 | 33.000000 | 18.400000 | 7.700000 | 1.000000 | 10.900000 | 20.700000 | 31.650000 | 3.800000 | 235.000000 | 20.000000 | 79.500000 |
max | 2022-04-30 23:00:00 | 121.000000 | 196.700000 | 138.400000 | 25.200000 | 378.400000 | 83.100000 | 455.100000 | 8.200000 | 357.000000 | 28.000000 | 92.000000 |
std | NaN | 9.693312 | 8.044288 | 5.259335 | 1.507654 | 21.065762 | 12.748169 | 30.956155 | 1.538615 | 94.038611 | 4.013776 | 12.290750 |
rawdata_site2
Timestamp | AQI | PM10 | PM2.5 | NO | NO2 | NOx | Wind_Speed | Wind_Dir | Air_Temp | Rel_Humidity | |
---|---|---|---|---|---|---|---|---|---|---|---|
20023 | 2020-05-07 17:00:00 | 21.0 | 5.95 | 4.15 | 10.90 | 0.01715 | 28.00 | 2.50 | 242.0 | 15.5 | 68.15 |
20024 | 2020-05-07 18:00:00 | 21.0 | 5.65 | 5.10 | 8.20 | 0.01655 | 24.70 | 2.20 | 239.5 | 15.5 | 61.75 |
20025 | 2020-05-07 19:00:00 | 21.0 | 7.70 | 5.45 | 5.75 | 0.01325 | 19.00 | 2.10 | 244.0 | 15.0 | 59.45 |
20026 | 2020-05-07 20:00:00 | 21.0 | 8.20 | 5.45 | 3.50 | 0.00870 | 12.20 | 2.25 | 251.0 | 15.0 | 59.25 |
20027 | 2020-05-07 21:00:00 | 21.0 | 11.80 | 5.80 | 3.55 | 0.00930 | 12.90 | 2.10 | 261.0 | 15.0 | 61.75 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
37377 | 2022-04-30 19:00:00 | 14.0 | 4.75 | 3.30 | 0.60 | 0.00440 | 5.00 | 2.55 | 109.5 | 16.0 | 76.05 |
37378 | 2022-04-30 20:00:00 | 14.0 | 6.35 | 3.15 | 0.50 | 0.00365 | 4.15 | 2.45 | 105.5 | 16.0 | 74.50 |
37379 | 2022-04-30 21:00:00 | 14.0 | 6.05 | 2.80 | 0.40 | 0.00480 | 5.20 | 2.35 | 115.5 | 16.0 | 74.15 |
37380 | 2022-04-30 22:00:00 | 13.0 | 4.20 | 2.60 | 0.40 | 0.00555 | 5.90 | 1.95 | 122.5 | 16.0 | 75.95 |
37381 | 2022-04-30 23:00:00 | 13.0 | 5.00 | 2.80 | 0.35 | 0.00405 | 4.30 | 1.95 | 119.0 | 16.0 | 78.00 |
17359 rows × 11 columns
print("\n🎓 [Site2 - Takapuna] Summary Statistics of the {site2} cleaned_data2 Dataframe such as the mean, maximum and minimum values ...")
rawdata_site2.describe()
🎓 [Site2 - Takapuna] Summary Statistics of the {site2} cleaned_data2 Dataframe such as the mean, maximum and minimum values ...
Timestamp | AQI | PM10 | PM2.5 | NO | NO2 | NOx | Wind_Speed | Wind_Dir | Air_Temp | Rel_Humidity | |
---|---|---|---|---|---|---|---|---|---|---|---|
count | 17359 | 17359.000000 | 16579.000000 | 15807.000000 | 16187.000000 | 16185.000000 | 16187.000000 | 16773.000000 | 16771.000000 | 16785.000000 | 16921.000000 |
mean | 2021-05-04 08:00:00 | 25.152947 | 11.590983 | 6.251275 | 12.005912 | 0.012416 | 24.203614 | 2.501792 | 176.335132 | 16.280548 | 69.909931 |
min | 2020-05-07 17:00:00 | -13.000000 | -3.800000 | 0.450000 | -0.800000 | -0.000500 | -0.700000 | 0.200000 | 5.500000 | 3.500000 | 30.400000 |
25% | 2020-11-04 12:30:00 | 19.000000 | 7.400000 | 4.250000 | 2.450000 | 0.004000 | 7.050000 | 1.250000 | 75.500000 | 13.500000 | 61.250000 |
50% | 2021-05-04 08:00:00 | 24.000000 | 10.650000 | 5.550000 | 5.850000 | 0.009100 | 15.100000 | 2.300000 | 205.000000 | 16.000000 | 71.050000 |
75% | 2021-11-01 03:30:00 | 30.000000 | 14.500000 | 7.350000 | 12.200000 | 0.017700 | 29.400000 | 3.500000 | 261.000000 | 19.500000 | 79.850000 |
max | 2022-04-30 23:00:00 | 318.000000 | 437.250000 | 52.300000 | 352.450000 | 0.072100 | 401.950000 | 7.800000 | 346.500000 | 27.000000 | 90.350000 |
std | NaN | 10.637321 | 7.911251 | 3.507078 | 22.592626 | 0.010951 | 30.662867 | 1.468465 | 93.196025 | 3.959962 | 11.804317 |
def ensure_datetime_format(df, column_name='Timestamp'):
"""
Ensures the specified column is in datetime format.
Parameters:
- df (pd.DataFrame): The dataframe to process.
- column_name (str): The name of the column to format as datetime.
Returns:
- pd.DataFrame: DataFrame with the column in datetime format.
"""
try:
df[column_name] = pd.to_datetime(df[column_name], errors='coerce')
except Exception as e:
raise ValueError(f"Error converting {column_name} to datetime: {e}")
return df
def clean_timeseries_dataset(df, time_col='Timestamp'):
"""
Clean the dataset by ensuring datetime format for 'Timestamp',
removing rows where all values are NaN (except time_col),
and removing duplicate rows based on 'Timestamp'.
Parameters:
- df (pd.DataFrame): Input dataset.
- time_col (str): The column name for timestamp data.
Returns:
- pd.DataFrame: Cleaned dataset.
"""
print(f'\n🎓 Cleans the dataset by removing rows where all values are NaN and removing duplicate rows based on {time_col}')
if time_col not in df.columns:
raise ValueError(f"Column '{time_col}' not found in DataFrame.")
## Convert 'Timestamp' column to datetime format
df = ensure_datetime_format(df, column_name=time_col)
## Remove rows where all values are NaN, excluding 'Timestamp'
cols_except_time = [col for col in df.columns if col != time_col]
if IS_DEBUG:
rows_all_nans = df[df[cols_except_time].isna().all(axis=1)]
if not rows_all_nans.empty:
print("Rows where all values are NaN, excluding 'Timestamp':\n", rows_all_nans)
## Remove rows where all values are NaN, excluding 'Timestamp'
df = df.dropna(how='all', subset=cols_except_time)
if IS_DEBUG:
## Show duplicate rows based on 'Timestamp'
duplicates = df[df.duplicated(subset=[time_col], keep=False)]
if not duplicates.empty:
print("Duplicate rows based on 'Timestamp':\n", duplicates)
## Remove duplicate rows based on 'Timestamp'
df = df.drop_duplicates(subset=[time_col], keep='first')
## Next Steps:
# - Handling outliers in numeric columns
# - Filling missing values with appropriate imputation techniques
# - Normalizing or scaling numeric columns
# - Encoding categorical variables if necessary
## Optional: sort by 'Timestamp' for chronological order
# cleaned_df.sort_values(by=time_col, inplace=True)
return df
cleaned_data_site1 = clean_timeseries_dataset(rawdata_site1)
print(f"☑️ Original rawdata_site1 dataset shape: {rawdata_site1.shape}")
print(f"✅ Cleaned rawdata_site1 dataset shape: {cleaned_data_site1.shape}")
cleaned_data_site2 = clean_timeseries_dataset(rawdata_site2)
print(f"☑️ Original rawdata_site2 dataset shape: {rawdata_site2.shape}")
print(f"✅ Cleaned rawdata_site2 dataset shape: {cleaned_data_site2.shape}")
🎓 Cleans the dataset by removing rows where all values are NaN and removing duplicate rows based on Timestamp ☑️ Original rawdata_site1 dataset shape: (20023, 12) ✅ Cleaned rawdata_site1 dataset shape: (17375, 12) 🎓 Cleans the dataset by removing rows where all values are NaN and removing duplicate rows based on Timestamp ☑️ Original rawdata_site2 dataset shape: (17359, 11) ✅ Cleaned rawdata_site2 dataset shape: (17359, 11)
import holidays
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer, SimpleImputer
def before_and_after_values_visualization(data_site, imputed_data, method_description, numerical_columns, site, feature_column='PM10'):
"""
This function plot the comparison between the two sites before and after imputation, adjusting parameters based on the visualization type.
Applies the specified imputation function to the data, then visualizes the results using the specified visualization function.
Parameters:
- data: pd.DataFrame - The dataset to impute and visualize.
- imputed_data: pd.DataFrame - The imputed dataset.
- site: str - The name of the site/location for comparison.
- feature_column: str - The feature column on which to focus the visualization.
"""
if IS_DEBUG:
print(f"before_and_after_values_visualization {data_site.shape}:", imputed_data.shape, site, visualization_type, feature_column)
if not isinstance(data_site, pd.DataFrame): ## Ensure data is a DataFrame before proceeding
raise ValueError("Data must be a pandas DataFrame.")
## Plotting using Plotly: Setting up the figure to hold two subplots side by side
fig = make_subplots(rows=1, cols=1, shared_yaxes=True, subplot_titles=(f'Data Plot of {feature_column} at {site}'))
## Original and Modified Data for Site1: darkorange vs lightblue | dodgerblue; dash='dot' vs 'solid'
fig.add_trace(go.Scatter(x=imputed_data['Timestamp'], y=imputed_data[feature_column], mode='markers', name='Modified Site',
line=dict(color='darkorange', width=0.3, dash='dot')), row=1, col=1)
# fig.add_trace(go.Scatter(x=data_site['Timestamp'], y=data_site[feature_column], mode='lines+markers', name='Original Site',
fig.add_trace(go.Scatter(x=data_site['Timestamp'], y=data_site[feature_column], mode='markers', name='Original Site',
line=dict(color='dodgerblue', width=0.3, dash='dot'), opacity=0.8), row=1, col=1)
## Adjust the layout for a better/consistent look and display the plot
fig.update_layout(height=800, title_text=f'[{method_description}] Original vs Imputed Data Plot of {feature_column} at {site}',
xaxis_title="Timestamp", yaxis_title=feature_column,
xaxis=dict(
title="Timestamp",
tickmode='auto',
tickformat="%Y-Q%q", ## Format: Year - Q<Quarter>
dtick="M3", ## Set ticks every 3 months to indicate quarters
), legend=dict(orientation="h", ## Horizontal legend
yanchor="bottom", y=1.02, ## Position legend above the plot
xanchor="right", x=1, ## Align legend to the right
title='Legend:', ## Optional legend title
bgcolor='rgba(255,255,255,0.3)', ## Semi-transparent white background
bordercolor="lightgrey", borderwidth=1))
fig.show()
class DataImputer:
"""
Class to perform various imputation methods on a DataFrame.
Note: Imputation methods should directly accept a DataFrame and return the imputed DataFrame
"""
def __init__(self):
# if not isinstance(data, pd.DataFrame):
# raise ValueError("Data must be a pandas DataFrame")
pass
@staticmethod
def mean_mode_imputation(data, columns=None):
"""
Impute with Mean and Mode:
Replaces missing numerical values with the MEAN and missing categorical values with the MODE.
Args:
data (DataFrame): DataFrame containing the data.
columns (list, optional): Columns to impute. If None, applies to all columns.
Returns:
DataFrame: DataFrame with imputed values.
"""
if columns is None:
# Exclude 'Timestamp' column or any non-numeric columns
columns = [col for col in data.columns if data[col].dtype in ['float64', 'int64'] and col != 'Timestamp']
# print("Applying Mean/Mode Imputation...")
for col in data.columns:
if col == 'Timestamp': # Skip the 'Timestamp' column
continue
if data[col].dtype in ['float64', 'int64']:
imputer = SimpleImputer(strategy='mean')
else: ## Assuming categorical data: return_value[col].dtype.name == 'category'
imputer = SimpleImputer(strategy='most_frequent')
## The fit_transform function expects a 2D array, hence the double square brackets
data[col] = imputer.fit_transform(data[[col]])
return data
@staticmethod
def forward_backward_imputation(data):
"""
Impute with Forward and Backward propagation
Using ffill and bfill as a naive method, to complete the data.
Parameters:
data (pd.DataFrame): DataFrame to impute.
inplace (bool): If False, returns a new DataFrame, otherwise modifies the original DataFrame.
Returns:
pd.DataFrame: DataFrame with imputed values.
"""
# print("Applying Forward and Backward Imputation across the entire DataFrame ...")
## Forward fill (ffill) missing values
data.fillna(method='ffill', inplace=True)
## Backward fill (bfill) remaining missing values
data.fillna(method='bfill', inplace=True)
return data
@staticmethod
def interpolation_imputation(data, method='linear'):
"""
Impute with Interpolation
A linear interpolation method works by assuming a linear relationship between the observed points and drawing a straight line accordingly.
Polynomial curves or splines.
Interpolation is an effective approach to impute missing values in time series.
Polynomial interpolation fits a polynomial function to the observed data points and estimates the missing values based on this function.
It works best if the time series is reasonably smooth. In case there are sudden changes or outliers, a simpler approach such as forward filling might be a better option.
Parameters:
data (pd.DataFrame): DataFrame to impute.
method (str): Interpolation technique, default is 'linear'.
order (int, optional): The order of the polynomial for polynomial interpolation.
inplace (bool): If False, returns a new DataFrame, otherwise modifies the original DataFrame.
Returns:
pd.DataFrame: DataFrame with imputed values.
"""
# print("Applying Interpolation Imputation...")
if method == 'polynomial' and order is None:
raise ValueError("Order must be specified for polynomial interpolation")
data.interpolate(method=method, inplace=True, limit_direction='both')
return data
@staticmethod
def polynomial_imputation(data, method='polynomial', order=2):
"""
Impute with Interpolation
A linear interpolation method works by assuming a linear relationship between the observed points and drawing a straight line accordingly.
Polynomial curves or splines.
Interpolation is an effective approach to impute missing values in time series.
Polynomial interpolation fits a polynomial function to the observed data points and estimates the missing values based on this function.
It works best if the time series is reasonably smooth. In case there are sudden changes or outliers, a simpler approach such as forward filling might be a better option.
"""
# print("Applying Polynomial Imputation...")
if method == 'polynomial' and order is None:
raise ValueError("Order must be specified for polynomial interpolation")
# data.interpolate(method=method, order=order if order else None, inplace=True, limit_direction='both')
## Apply polynomial interpolation across columns
for column in data.select_dtypes(include=['float', 'int']).columns:
## Only interpolate if there are at least two non-NA values
if data[column].count() > 1:
data[column].interpolate(method='polynomial', order=order, inplace=True, limit_direction='both')
# return_value = return_value.interpolate(method='spline')
return data
@staticmethod
def moving_average_imputation(data, columns=None, window='168H', aggressive_iterative=False, min_periods=1, verbose=True):
# def moving_average_imputation(data, columns=None, window='168H', min_periods=1, verbose=False, inplace=True):
"""
Impute missing values using a moving average over a specified window.
This method is particularly useful for time series data where patterns such as seasonality are expected to play a significant role.
Only NaNs are filled with the rolling mean, preserving the integrity of original non-NaN values.
A moving average is better at adapting to changes by considering a few nearby data points to compute the mean.
Yet, it can still lead to biased results if the data is not missing at random.
Examples:
.rolling(window='52W' ## A year has 52 weeks (52 weeks * 7 days per week) approximately.
.rolling(window='168H', min_periods=1).mean() ## 168 hours = 7 days
Parameters:
data (DataFrame): The pandas DataFrame containing the data.
columns (list of str, optional): Specific columns to impute. If None, all numeric columns are used.
window (str): The size of the moving window (default is '168H' for 168 hours = 24 * 7 = 7 days).
min_periods (int): Minimum number of observations in window required to have a value (default is 1).
verbose (bool): If True, print additional information about the imputation process.
# inplace (bool): If True, modify the DataFrame in place. Otherwise, return a modified copy.
Returns:
DataFrame: The DataFrame with imputed values if inplace is False. Otherwise, None.
Notes:
- A rolling window imputation can adapt to changes by considering nearby data points. However, it may
introduce biases if the data is not missing at random.
- This method currently supports only numeric data for rolling calculations.
"""
## Ensure the 'Timestamp' column is in datetime format and set as index only if it's not already the index
if 'Timestamp' in data.columns:
data['Timestamp'] = pd.to_datetime(data['Timestamp'], dayfirst=True, errors='coerce')
data.set_index('Timestamp', inplace=True)
if not pd.api.types.is_datetime64_any_dtype(data.index):
raise ValueError("Data index must be a datetime type for time-based rolling windows.")
## Select columns if not provided
if columns is None:
## Select only numeric columns for rolling operation
columns = data.select_dtypes(include=[np.number]).columns.tolist()
if verbose:
print(f"\nApplying Moving Average Imputation over a specified {window} window on columns:", columns)
## Apply rolling mean to specified numeric columns
for column in columns:
## Check if the column is of a numeric type: boolean (b), integer (i), floating (f), or complex number (c)
if data[column].dtype.kind in 'bifc':
# if verbose:
# print(f"Processing column: {column}")
## Calculate the rolling mean and update NaNs only
rolling_mean = data[column].rolling(window=window, min_periods=min_periods).mean()
data[column].fillna(rolling_mean, inplace=True)
## [Option] Aggressive Iterative pass to fill remaining NaNs
if aggressive_iterative:
## Perform multiple passes over the data. In each pass, it tries to fill the NaNs with the rolling mean.
max_iterations = 10
for iteration in range(max_iterations):
change_made = False
for column in columns:
if data[column].isna().sum() > 0: ## Check if there are still NaNs to fill
# if verbose:
# print(f"Iterative imputation for column: {column}, iteration {iteration+1}")
before_fill = data[column].isna().sum()
data[column].fillna(
data[column].rolling(window=window, min_periods=min_periods).mean(),
inplace=True
)
after_fill = data[column].isna().sum()
if after_fill < before_fill:
change_made = True
if not change_made: ## No more NaNs can be filled
if verbose:
print(f"Processing column: {column}. Iterative pass to fill remaining NaNs in {iteration+1} iteration(s).")
# print(f"No more changes made in iteration {iteration+1}. Ending iterative imputation.")
break
## Check for any remaining NaNs and attempt further imputation if necessary
if not aggressive_iterative:
if data.isna().any().any():
if verbose:
print("Second pass for any remaining NaNs after initial imputation using bfill & ffill.")
for column in columns:
data[column] = data[column].fillna(method='bfill').fillna(method='ffill')
## Reset index to turn 'Timestamp' back into a column
data.reset_index(inplace=True)
return data
## TODO: Future Work!!!
#
# !pip install --upgrade prophet xgboost --user
# import functools
#
# from xgboost import XGBRegressor
# from prophet import Prophet
# from sklearn.ensemble import RandomForestRegressor
# ## TODO
# @staticmethod
# def prophet_imputation(data, date_column, target_columns, holidays=None):
# """
# Decorator for imputing missing values using Facebook/Meta Prophet time series forecasting model.
#
# Parameters:
# - dates_column: Name of the column containing the dates.
# - columns_to_impute: Columns to impute. If 'all', all columns except the dates column are imputed.
#
# Returns:
# - A wrapper function for the imputation.
# """
# # print("Applying Time-Series Prophet Imputation...")
# if date_column not in data.columns:
# raise ValueError(f"{date_column} is not a column in the DataFrame")
#
# for column in target_columns:
# ## Prepare DataFrame for Prophet
# df_prophet = pd.DataFrame({
# 'ds': data[date_column],
# 'y': data[column],
# # 'y': data[column].interpolate() ## Improved handling of missing values ?
# }).dropna()
#
# ## Initialize and fit Prophet model
# model = Prophet(
# daily_seasonality=True,
# weekly_seasonality=True,
# yearly_seasonality='auto',
# holidays=holidays
# )
# model.fit(df_prophet)
# ## TODO: Make future dataframe and predict
# # future = model.make_future_dataframe(periods=0, freq='H')
# # forecast = model.predict(future)
# forecast = model.predict(df_prophet[['ds']])
# ## Fill missing values in original data
# forecast.set_index('ds', inplace=True)
# data.set_index(date_column, inplace=True)
# # data[column].update(forecast)
# ## 'yhat' from Prophet forecast can be used as imputed values
# data[column].fillna(forecast['yhat'], inplace=True)
# data.reset_index(inplace=True)
# return data
#
#
# ## TODO
# @staticmethod
# def xgboost_imputation(data, target_columns):
# """
# Impute missing values in a DataFrame using XGBoost with Scikit-learn's IterativeImputer.
#
# Parameters:
# - data: DataFrame to be imputed.
# - target_columns: Columns to impute, could be 'all' or a list of column names.
#
# Returns:
# - The DataFrame with imputed values.
# """
# # print("Applying Machine-Learning XGBoost Interpolation Imputation...")
# ## TODO: Set default estimator if none provided -> Directly use XGBRegressor as the estimator
# # chosen_estimator = estimator if estimator is not None else XGBRegressor(n_estimators=100, random_state=0)
# # chosen_estimator = XGBRegressor(n_estimators=100, random_state=0)
# ## Perform imputation on numerical data
# # imputer = IterativeImputer(estimator=chosen_estimator, max_iter=10, random_state=0)
# imputer = IterativeImputer(
# estimator=XGBRegressor(n_estimators=100, random_state=0),
# max_iter=10,
# random_state=0
# )
# ## TODO: Apply imputer only on specified target columns
# # data[target_columns] = imputer.fit_transform(data[target_columns])
# # for column in target_columns:
# # if column not in data.columns:
# # raise ValueError(f"{column} is not a column in the DataFrame")
# # data[[column]] = imputer.fit_transform(data[[column]])
# # Perform imputation on specified target columns or all numerical columns if 'all'
# if target_columns == 'all':
# numerical_data = data.select_dtypes(include=[np.number])
# imputed_numerical_data = pd.DataFrame(IterativeImputer(estimator=chosen_estimator, max_iter=10, random_state=0).fit_transform(numerical_data), columns=numerical_data.columns, index=data.index)
# data.update(imputed_numerical_data)
# else:
# for column in target_columns:
# if column in data.columns:
# column_data = data[[column]].select_dtypes(include=[np.number])
# imputed_column_data = IterativeImputer(estimator=chosen_estimator, max_iter=10, random_state=0).fit_transform(column_data)
# data[column] = imputed_column_data
# return data
## Instantiate the class with rawdata
imputer = DataImputer()
nz_holidays = holidays.country_holidays('NZ')
# nz_holidays = holidays.country_holidays('NZ', subdiv='AUK')
## List of imputation methods to apply, directly referencing the methods of DataImputer as tuples of (method_function, method_description)
# (lambda data: DataImputer.mean_mode_imputation(data, columns=numerical_columns), "SimpleImputer Mean/Mode Imputation"),
# (lambda data: DataImputer.xgboost_imputation(data, target_columns=['PM2.5', 'PM10', 'NO2']), "Machine-Learning XGBoost Imputation"),
# (lambda data: DataImputer.prophet_imputation(data, date_column='Timestamp', target_columns=['PM2.5', 'PM10', 'NO2'], holidays=nz_holidays), "Time-Series Prophet Imputation"),
# (lambda data: DataImputer.moving_average_imputation(data, columns=timestamp_and_numerical_columns, window='168H', min_periods=1, verbose=True), "Moving Average Imputation"),
imputation_methods = [
(lambda data: DataImputer.mean_mode_imputation(data), "SimpleImputer Mean/Mode Imputation"),
(DataImputer.forward_backward_imputation, "Forward/Backward Imputation"),
(lambda data: DataImputer.interpolation_imputation(data, method='linear'), "Linear Interpolation"),
(lambda data: DataImputer.polynomial_imputation(data, method='polynomial', order=2), "Polynomial Interpolation (Order 2)"),
(lambda data: DataImputer.moving_average_imputation(data, window='24H', min_periods=1, verbose=True), "1-DAY Moving Average Imputation"),
(lambda data: DataImputer.moving_average_imputation(data, window='168H', min_periods=1, verbose=True), "1-WEEK Moving Average Imputation"),
]
## Iterate through each imputation method, apply it, and visualize the results (data vs imputed data visualization)
for method_function, method_description in imputation_methods:
# print(f"Applying imputation method function: {method_function}")
# print(f"Applying imputation method description: {method_description}")
print(f'✅ [DataImputer] {method_description}: Original vs Imputed Data Plots of PM2.5/PM10 at Penrose/Takapuna')
if IS_DEBUG:
print(f"{method_description} --------------Before Imputation--------------")
print(cleaned_data_site1.isnull().sum(axis = 0))
print(cleaned_data_site2.isnull().sum(axis = 0))
## Step 1. Apply the imputation function
imputed_cleaned_data_site1 = method_function(cleaned_data_site1.copy()) ## This calls the method on the DataImputer instance
imputed_cleaned_data_site2 = method_function(cleaned_data_site2.copy()) ## This calls the method on the DataImputer instance
if IS_DEBUG:
# print(type(imputed_cleaned_data_site1))
print(f"{method_description} --------------After Imputation--------------")
print(imputed_cleaned_data_site1.isnull().sum(axis = 0))
print(imputed_cleaned_data_site2.isnull().sum(axis = 0))
## Introduce a short pause (e.g., 30 seconds) to ensure plots are fully rendered before moving to the next item
time.sleep(60)
## Step 2. After imputation, we can now proceed with visualization or further processing
before_and_after_values_visualization(cleaned_data_site1.copy(), imputed_cleaned_data_site1.copy(), method_description, numerical_columns_site1, 'Penrose', feature_column='PM2.5')
before_and_after_values_visualization(cleaned_data_site1.copy(), imputed_cleaned_data_site1.copy(), method_description, numerical_columns_site1, 'Penrose', feature_column='PM10')
# before_and_after_values_visualization(cleaned_data_site2.copy(), imputed_cleaned_data_site2.copy(), method_description, numerical_columns_site2, 'Takapuna', feature_column='PM2.5')
# before_and_after_values_visualization(cleaned_data_site2.copy(), imputed_cleaned_data_site2.copy(), method_description, numerical_columns_site2, 'Takapuna', feature_column='PM10')
✅ [DataImputer] SimpleImputer Mean/Mode Imputation: Original vs Imputed Data Plots of PM2.5/PM10 at Penrose/Takapuna
✅ [DataImputer] Forward/Backward Imputation: Original vs Imputed Data Plots of PM2.5/PM10 at Penrose/Takapuna
✅ [DataImputer] Linear Interpolation: Original vs Imputed Data Plots of PM2.5/PM10 at Penrose/Takapuna
✅ [DataImputer] Polynomial Interpolation (Order 2): Original vs Imputed Data Plots of PM2.5/PM10 at Penrose/Takapuna
✅ [DataImputer] 1-DAY Moving Average Imputation: Original vs Imputed Data Plots of PM2.5/PM10 at Penrose/Takapuna Applying Moving Average Imputation over a specified 24H window on columns: ['AQI', 'PM10', 'PM2.5', 'SO2', 'NO', 'NO2', 'NOx', 'Wind_Speed', 'Wind_Dir', 'Air_Temp', 'Rel_Humidity'] Second pass for any remaining NaNs after initial imputation using bfill & ffill. Applying Moving Average Imputation over a specified 24H window on columns: ['AQI', 'PM10', 'PM2.5', 'NO', 'NO2', 'NOx', 'Wind_Speed', 'Wind_Dir', 'Air_Temp', 'Rel_Humidity'] Second pass for any remaining NaNs after initial imputation using bfill & ffill.
✅ [DataImputer] 1-WEEK Moving Average Imputation: Original vs Imputed Data Plots of PM2.5/PM10 at Penrose/Takapuna Applying Moving Average Imputation over a specified 168H window on columns: ['AQI', 'PM10', 'PM2.5', 'SO2', 'NO', 'NO2', 'NOx', 'Wind_Speed', 'Wind_Dir', 'Air_Temp', 'Rel_Humidity'] Second pass for any remaining NaNs after initial imputation using bfill & ffill. Applying Moving Average Imputation over a specified 168H window on columns: ['AQI', 'PM10', 'PM2.5', 'NO', 'NO2', 'NOx', 'Wind_Speed', 'Wind_Dir', 'Air_Temp', 'Rel_Humidity'] Second pass for any remaining NaNs after initial imputation using bfill & ffill.
def weekday_from_index(day_index):
"""
Convert a day index (0-6) to its corresponding weekday name.
Parameters:
- day_index (int): The index of the day where Monday is 0 and Sunday is 6.
Returns:
- str: The name of the weekday corresponding to the input index.
"""
weekdays = ["0.Monday", "1.Tuesday", "2.Wednesday", "3.Thursday", "4.Friday", "5.Saturday", "6.Sunday"]
if 0 <= day_index < len(weekdays):
return weekdays[day_index]
else:
raise ValueError("Day index must be in the range 0 to 6.")
def impute_with_historical_averages(data, column, max_lookback_periods, verbose=True):
"""
Helper function to impute missing values for a specific column in the dataset using historical averages.
Historical averages are calculated using data from the same hour and weekday from previous weeks.
Parameters:
- data (pd.DataFrame): DataFrame with a datetime index.
- column (str): The column in the DataFrame to impute.
- max_lookback_periods (int): Maximum number of weeks to look back for historical data.
- verbose (bool): If set to True, prints out additional debugging information.
Returns:
pd.Series: A series with imputed values.
"""
imputed_values = data[column].copy()
missing_indices = imputed_values[imputed_values.isna()].index
if verbose:
# print(f"Missing indices for column '{column}': {missing_indices}")
print(f"Imputing column: {column} & number of missing entries: {len(missing_indices)}")
## Loop over all missing data points
for current_missing_index in missing_indices:
historical_values = []
valid_data_found = False
debug_info = []
## Collect historical data points from previous weeks
for weeks_back in range(1, max_lookback_periods + 1):
target_hist_time = current_missing_index - pd.DateOffset(weeks=weeks_back)
if target_hist_time < data.index.min():
## Stop if there is no more historical data available
if verbose:
print(f"No historical data beyond: {data.index.min()}")
continue
if target_hist_time in data.index:
## Retrieve the historical value if it exists and is not NaN
historical_value = data.at[target_hist_time, column]
if pd.notna(historical_value):
historical_values.append(historical_value)
valid_data_found = True
## Collecting additional debug information
debug_info.append((target_hist_time, target_hist_time.weekday(), target_hist_time.hour, historical_value))
# ## TODO: In case if data is sparse
# ## while (count < max_lookback_periods) and still have previous data points (the same weekday and hour in previous weeks):
# count = 0
# while count < max_lookback_periods:
# target_hist_time = current_missing_index - pd.DateOffset(weeks=count)
# if target_hist_time < data.index.min():
# break ## Break if historical data point goes beyond the available data range
# if target_hist_time in data.index and pd.notna(data.at[target_hist_time, column]):
# ## process the data
# ## Perform calculations or data retrieval ...
# count += 1
## Compute the mean of the historical values if any are available
# if pd.notna(historical_value):
if valid_data_found:
mean_value = np.mean(historical_values)
imputed_values.at[current_missing_index] = mean_value
if verbose:
print(f"Imputed {current_missing_index} (Day: {weekday_from_index(current_missing_index.weekday())}, Hour: {current_missing_index.hour}): {mean_value}")
print("Historical data used:")
for info in debug_info:
print(f" - Date: {info[0]} (Day: {info[1]}, Hour: {info[2]}), Value: {info[3]}")
else:
if verbose:
print(f"No valid historical data found for {current_missing_index} (Day: {weekday_from_index(current_missing_index.weekday())}, Hour: {current_missing_index.hour}). Cannot impute.")
## Optional: handle cases with no available historical data
## Could assign a default value or use another imputation method HERE
## [ ] Assign a global median, mean, or mode (if precomputed)
## [ ] Use another method like interpolation
## [x] Leave as NaN if downstream processes can handle it
continue
# data[column] = imputed_values
return imputed_values
def historical_average_imputation(data, datetime_column_name='Timestamp', columns=None, max_lookback_periods=4, verbose=True):
"""
Imputes missing values in a DataFrame by calculating the average of similar (the same weekday and hour) historical data points in previous (max_lookback_periods=4) weeks.
Parameters:
- data (pd.DataFrame): DataFrame with a datetime index and column for values.
- columns (list of str, optional): Specific columns to impute. Defaults to all numeric columns if None.
- max_lookback_periods (int): Number of weeks/periods to look back for similar historical data points.
- verbose (bool): If True, print additional information about the imputation process.
Returns:
- pd.DataFrame: The DataFrame with imputed values.
"""
## Ensure the index is a DatetimeIndex
if not isinstance(data.index, pd.DatetimeIndex):
raise ValueError("Index must be of type DatetimeIndex for proper timedelta calculations.")
if columns is None:
columns = data.select_dtypes(include=[np.number]).columns.tolist()
start_time = time.time()
imputed_series = data.copy()
## Iterate over each column to apply imputation
for target_column in columns:
imputed_series[target_column] = impute_with_historical_averages(data, target_column, max_lookback_periods, verbose=verbose)
if verbose:
print(f"Starting imputation for {target_column}.")
data[target_column] = impute_with_historical_averages(data, target_column, max_lookback_periods, verbose)
## Optional: handle cases with no available historical data
if imputed_series.isna().any().any():
if verbose:
print("Second pass for any remaining NaNs after initial imputation using bfill & ffill.")
for column in columns:
imputed_series[column] = imputed_series[column].fillna(method='bfill').fillna(method='ffill')
# if imputed_series.isna().any().any():
# ## 1-WEEK Moving Average Imputation
# imputed_series = DataImputer.moving_average_imputation(imputed_series.copy(), window='168H', min_periods=1, verbose=True)
# if verbose:
elapsed_time = time.time() - start_time
print(f"Completed imputation for all columns in {elapsed_time:.2f} seconds.")
return imputed_series
if cleaned_data_site1.index.name != 'Timestamp':
## Check if 'Timestamp' is not the index and set it as the index if needed
## Set 'Timestamp' as the index of the DataFrame
cleaned_data_site1.set_index('Timestamp', inplace=True)
imputed_cleaned_data_site1 = historical_average_imputation(cleaned_data_site1.copy(), numerical_columns_site1, verbose=False)
## Check if there are any NaNs left in the simulated data
# remaining_nans = imputed_cleaned_data_site1.isna().any().any()
total_nans = imputed_cleaned_data_site1.isna().sum().sum()
print(f"Total NaNs in the DataFrame: {total_nans}")
Completed imputation for all columns in 10.92 seconds. Total NaNs in the DataFrame: 0
## Check if the index name of cleaned_data_site1 is 'Timestamp'
if imputed_cleaned_data_site1.index.name == 'Timestamp':
## Reset index to turn 'Timestamp' back into a column in imputed_cleaned_data_site1
imputed_cleaned_data_site1.reset_index(inplace=True)
print("Index 'Timestamp' has been reset to a column: imputed_cleaned_data_site1")
else:
print("Index name is not 'Timestamp'. No changes made.")
## Check if the index name of cleaned_data_site1 is 'Timestamp'
if cleaned_data_site1.index.name == 'Timestamp':
## Reset index to turn 'Timestamp' back into a column in imputed_cleaned_data_site1
cleaned_data_site1.reset_index(inplace=True)
print("Index 'Timestamp' has been reset to a column: cleaned_data_site1")
else:
print("Index name is not 'Timestamp'. No changes made.")
Index 'Timestamp' has been reset to a column: imputed_cleaned_data_site1 Index 'Timestamp' has been reset to a column: cleaned_data_site1
print(f"☑️ [Historical Average Imputation] cleaned_data_site1 shape: {cleaned_data_site1.shape}")
before_and_after_values_visualization(cleaned_data_site1.copy(), imputed_cleaned_data_site1.copy(), 'historical_average_imputation', numerical_columns_site1, 'Penrose', feature_column='PM2.5')
before_and_after_values_visualization(cleaned_data_site1.copy(), imputed_cleaned_data_site1.copy(), 'historical_average_imputation', numerical_columns_site1, 'Penrose', feature_column='PM10')
print(f"✅ [Historical Average Imputation] imputed_cleaned_data_site1 imputed shape: {imputed_cleaned_data_site1.shape}")
☑️ [Historical Average Imputation] cleaned_data_site1 shape: (17375, 12)
✅ [Historical Average Imputation] imputed_cleaned_data_site1 imputed shape: (17375, 12)
if cleaned_data_site2.index.name != 'Timestamp':
## Check if 'Timestamp' is not the index and set it as the index if needed
## Set 'Timestamp' as the index of the DataFrame
cleaned_data_site2.set_index('Timestamp', inplace=True)
imputed_cleaned_data_site2 = historical_average_imputation(cleaned_data_site2.copy(), numerical_columns_site2, verbose=False)
## Check if there are any NaNs left in the simulated data
# remaining_nans = imputed_cleaned_data_site2.isna().any().any()
total_nans = imputed_cleaned_data_site2.isna().sum().sum()
print(f"Total NaNs in the DataFrame: {total_nans}")
# if imputed_cleaned_data_site2.isna().any().any():
# ## 1-WEEK Moving Average Imputation
# imputed_cleaned_data_site2 = DataImputer.moving_average_imputation(imputed_cleaned_data_site2.copy(), window='168H', min_periods=1, verbose=True)
Completed imputation for all columns in 13.55 seconds. Total NaNs in the DataFrame: 0
## Check if the index name of cleaned_data_site1 is 'Timestamp'
if imputed_cleaned_data_site2.index.name == 'Timestamp':
## Reset index to turn 'Timestamp' back into a column in imputed_cleaned_data_site2
imputed_cleaned_data_site2.reset_index(inplace=True)
print("Index 'Timestamp' has been reset to a column: imputed_cleaned_data_site2")
else:
print("Index name is not 'Timestamp'. No changes made.")
## Check if the index name of cleaned_data_site1 is 'Timestamp'
if cleaned_data_site2.index.name == 'Timestamp':
## Reset index to turn 'Timestamp' back into a column in cleaned_data_site2
cleaned_data_site2.reset_index(inplace=True)
print("Index 'Timestamp' has been reset to a column: cleaned_data_site2")
else:
print("Index name is not 'Timestamp'. No changes made.")
Index 'Timestamp' has been reset to a column: imputed_cleaned_data_site2 Index 'Timestamp' has been reset to a column: cleaned_data_site2
print(f"☑️ [Historical Average Imputation] cleaned_data_site2 shape: {cleaned_data_site2.shape}")
# before_and_after_values_visualization(cleaned_data_site2.copy(), imputed_cleaned_data_site2, method_description, numerical_columns_site2, 'Takapuna', feature_column='PM10')
before_and_after_values_visualization(cleaned_data_site2.copy(), imputed_cleaned_data_site2, 'historical_average_imputation', numerical_columns_site1, 'Takapuna', feature_column='PM2.5')
before_and_after_values_visualization(cleaned_data_site2.copy(), imputed_cleaned_data_site2, 'historical_average_imputation', numerical_columns_site1, 'Takapuna', feature_column='PM10')
print(f"✅ [Historical Average Imputation] imputed_cleaned_data_site2 imputed shape: {imputed_cleaned_data_site2.shape}")
☑️ [Historical Average Imputation] cleaned_data_site2 shape: (17359, 11)
✅ [Historical Average Imputation] imputed_cleaned_data_site2 imputed shape: (17359, 11)
This approach first identifies the actual minimum and maximum values within the acceptable range (beyond the IQR bounds) and then calculates the proportion of data points beyond these winsorized limits. It's particularly useful when you want to apply winsorization directly based on the distribution of your data, adjusting only the extreme values that fall outside the calculated bounds. As a result, this approach is more precise and aligns better with the principle of winsorization—limiting the influence of extreme outliers without removing them from the dataset.
from scipy.stats.mstats import winsorize
IS_WINSORIZING_OUTLIERS = True
def calculate_iqr_bounds(data, column):
"""
Calculate the Interquartile Range (IQR) bounds for a given column in a DataFrame.
Args:
- data (pd.DataFrame): The DataFrame containing the data.
- column (str): The column name for which to calculate the IQR bounds.
Returns:
- tuple: A tuple containing the lower and upper bounds.
"""
## Calculate IQR
Q1 = data[column].quantile(0.25)
Q3 = data[column].quantile(0.75)
IQR = Q3 - Q1
## Determine bounds for Winsorization based on IQR: the limits for potential outliers
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
return lower_bound, upper_bound, Q1, Q3, IQR
# def detect_and_winsorize_outliers(data, numerical_columns):
def detect_and_handle_outliers(data, numerical_columns):
"""
Detects outliers in numerical columns of a DataFrame using the IQR method and handles them using Winsorization.
e.g. Apply the IQR method to detect outliers & Winsorization to handle/normalize outliers
Limiting the influence of extreme outliers without removing them from the dataset.
Args:
- data (pd.DataFrame): The DataFrame to process.
- numerical_columns (list): A list of column names to check for and handle outliers.
Returns:
- pd.DataFrame: A DataFrame with the outliers handled.
"""
## Initialize an empty list to store summary statistics
outlier_summaries = []
for col in numerical_columns:
lower_bound, upper_bound, Q1, Q3, IQR = calculate_iqr_bounds(data, col)
## Calculate_percentiles_for_winsorization
lower_winsorize_limit = data[col][data[col] < lower_bound].count() / data[col].count()
upper_winsorize_limit = data[col][data[col] > upper_bound].count() / data[col].count()
## Collecting and calculating outlier metrics for summary
# outlier_mask = (data[col] < lower_bound) | (data[col] > upper_bound)
# num_outliers = outlier_mask.sum()
## Consider only non-NaN values for total records
# num_outliers = outlier_mask.notnull().sum()
## Determine the percentages of the data that are outliers; this is for Winsorization limits
num_lower_outliers = data[col][data[col] < lower_bound].count()
num_upper_outliers = data[col][data[col] > upper_bound].count()
num_outliers = num_lower_outliers + num_upper_outliers
lower_outlier_percentage = num_lower_outliers / num_outliers
upper_outlier_percentage = num_upper_outliers / num_outliers
percentage_outliers = num_outliers / len(data) * 100
if IS_WINSORIZING_OUTLIERS:
# Calculate the actual min and max within the acceptable range
valid_min = data[col][(data[col] >= lower_bound) & (data[col] <= upper_bound)].min()
valid_max = data[col][(data[col] >= lower_bound) & (data[col] <= upper_bound)].max()
## Calculate the fractions for Winsorization
lower_winsorize_limit = data[col][data[col] < valid_min].count() / len(data[col])
upper_winsorize_limit = data[col][data[col] > valid_max].count() / len(data[col])
## Apply Winsorization
# data[col] = winsorize(data[col], limits=(lower_winsorize_limit, upper_winsorize_limit))
data[col] = winsorize(data[col], limits=(0.013653, 0.030128))
# if IS_WINSORIZING_OUTLIERS:
## Instead of removing the outliers, we apply Winsorization (Winsorize column data)
## Winsorizing the data such that values beyond the limits are capped: Winsorize data points beyond the bounds
# data[col] = winsorize(data[col], limits=(lower_outlier_percentage, upper_outlier_percentage))
# data[col] = winsorize(data[col], limits=[lower_bound, upper_bound])
# data[col] = winsorize(data[col], limits=(lower_winsorize_limit, upper_winsorize_limit))
## Applying winsorization to the column
## Note: winsorize operates on arrays, hence .values is used. It also modifies data in-place.
# winsorized_data = winsorize(data[col].values, limits=(lower_winsorize_limit, upper_winsorize_limit))
# ## Updating the column in the DataFrame with the winsorized data
# data[col] = winsorized_data
## Apply Winsorization using calculated limits
## Note: It's crucial to ensure the limits are not NaN, indicating no outliers were found
# if pd.notnull(lower_winsorize_limit) and pd.notnull(upper_winsorize_limit):
# data[col] = winsorize(data[col], limits=[lower_winsorize_limit, upper_winsorize_limit])
## Collect Outlier Summary statistics
outlier_summaries.append({
'Column': col,
'Lower Winsorization Limit': lower_winsorize_limit,
'Upper Winsorization Limit': upper_winsorize_limit,
'Q1': Q1,
'Q3': Q3,
'IQR': IQR,
'Lower Bound': lower_bound,
'Upper Bound': upper_bound,
'Number of Outliers': num_outliers,
'Percentage of Outliers': percentage_outliers,
'No. of Lower Outliers': num_lower_outliers,
'No. of Upper Outliers': num_upper_outliers,
'Lower Outlier %': num_lower_outliers / data[col].notnull().sum(),
'Upper Outlier %': num_upper_outliers / data[col].notnull().sum()
# 'Lower Outlier %': lower_outlier_percentage * 100, ## Convert to percentage
# 'Upper Outlier %': upper_outlier_percentage * 100 ## Convert to percentage
})
outlier_summary_df = pd.DataFrame(outlier_summaries)
return data, outlier_summary_df
## Detect and Handle/Retrieve Outliers
winsorized_imputed_cleaned_data_site1, outlier_summary_df1 = detect_and_handle_outliers(imputed_cleaned_data_site1.copy(), numerical_columns_site1)
winsorized_imputed_cleaned_data_site2, outlier_summary_df2 = detect_and_handle_outliers(imputed_cleaned_data_site2.copy(), numerical_columns_site2)
## Display the outlier summary
print("Outlier Summary for Site 1 (Penrose):\n")
outlier_summary_df1
## The result is a DataFrame 'rawdata' with outliers normalized/adjusted based on IQR and Winsorization
# winsorized_imputed_cleaned_data_site1.head()
# winsorized_imputed_cleaned_data_site2.head()
## TODO
## After applying IQR and Winsorization, the data is now ready for further processing such as PCA
## The `preprocessed_data` is ready for further analysis like PCA
Outlier Summary for Site 1 (Penrose):
Column | Lower Winsorization Limit | Upper Winsorization Limit | Q1 | Q3 | IQR | Lower Bound | Upper Bound | Number of Outliers | Percentage of Outliers | No. of Lower Outliers | No. of Upper Outliers | Lower Outlier % | Upper Outlier % | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | AQI | 0.003453 | 0.022964 | 22.0 | 33.0 | 11.0 | 5.50 | 49.50 | 459 | 2.641727 | 60 | 399 | 0.003453 | 0.022964 |
1 | PM10 | 0.000058 | 0.016403 | 8.5 | 18.3 | 9.8 | -6.20 | 33.00 | 286 | 1.646043 | 1 | 285 | 0.000058 | 0.016403 |
2 | PM2.5 | 0.001439 | 0.036777 | 2.2 | 7.7 | 5.5 | -6.05 | 15.95 | 664 | 3.821583 | 25 | 639 | 0.001439 | 0.036777 |
3 | SO2 | 0.000058 | 0.071252 | 0.1 | 1.0 | 0.9 | -1.25 | 2.35 | 1239 | 7.130935 | 1 | 1238 | 0.000058 | 0.071252 |
4 | NO | 0.000000 | 0.084259 | 0.9 | 10.7 | 9.8 | -13.80 | 25.40 | 1464 | 8.425899 | 0 | 1464 | 0.000000 | 0.084259 |
5 | NO2 | 0.000000 | 0.028892 | 4.1 | 20.3 | 16.2 | -20.20 | 44.60 | 502 | 2.889209 | 0 | 502 | 0.000000 | 0.028892 |
6 | NOx | 0.000000 | 0.057784 | 5.3 | 31.1 | 25.8 | -33.40 | 69.80 | 1004 | 5.778417 | 0 | 1004 | 0.000000 | 0.057784 |
7 | Wind_Speed | 0.000000 | 0.002935 | 1.6 | 3.8 | 2.2 | -1.70 | 7.10 | 51 | 0.293525 | 0 | 51 | 0.000000 | 0.002935 |
8 | Wind_Dir | 0.000000 | 0.000000 | 74.0 | 235.0 | 161.0 | -167.50 | 476.50 | 0 | 0.000000 | 0 | 0 | 0.000000 | 0.000000 |
9 | Air_Temp | 0.001151 | 0.000000 | 14.0 | 20.0 | 6.0 | 5.00 | 29.00 | 20 | 0.115108 | 20 | 0 | 0.001151 | 0.000000 |
10 | Rel_Humidity | 0.000403 | 0.000000 | 60.2 | 79.5 | 19.3 | 31.25 | 108.45 | 7 | 0.040288 | 7 | 0 | 0.000403 | 0.000000 |
## Display the outlier summary
print("Outlier Summary or Site 2 (Takapuna):\n")
outlier_summary_df2
Outlier Summary or Site 2 (Takapuna):
Column | Lower Winsorization Limit | Upper Winsorization Limit | Q1 | Q3 | IQR | Lower Bound | Upper Bound | Number of Outliers | Percentage of Outliers | No. of Lower Outliers | No. of Upper Outliers | Lower Outlier % | Upper Outlier % | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | AQI | 0.001498 | 0.036350 | 19.00000 | 30.0000 | 11.00000 | 2.500000 | 46.500000 | 657 | 3.784780 | 26 | 631 | 0.001498 | 0.036350 |
1 | PM10 | 0.000115 | 0.026787 | 7.46875 | 14.4000 | 6.93125 | -2.928125 | 24.796875 | 467 | 2.690247 | 2 | 465 | 0.000115 | 0.026787 |
2 | PM2.5 | 0.000000 | 0.045740 | 4.30000 | 7.3000 | 3.00000 | -0.200000 | 11.800000 | 794 | 4.573996 | 0 | 794 | 0.000000 | 0.045740 |
3 | NO | 0.000000 | 0.088945 | 2.35000 | 11.9000 | 9.55000 | -11.975000 | 26.225000 | 1544 | 8.894522 | 0 | 1544 | 0.000000 | 0.088945 |
4 | NO2 | 0.000000 | 0.039864 | 0.00395 | 0.0171 | 0.01315 | -0.015775 | 0.036825 | 692 | 3.986405 | 0 | 692 | 0.000000 | 0.039864 |
5 | NOx | 0.000000 | 0.072873 | 6.91250 | 28.5000 | 21.58750 | -25.468750 | 60.881250 | 1265 | 7.287286 | 0 | 1265 | 0.000000 | 0.072873 |
6 | Wind_Speed | 0.000000 | 0.003456 | 1.25000 | 3.5000 | 2.25000 | -2.125000 | 6.875000 | 60 | 0.345642 | 0 | 60 | 0.000000 | 0.003456 |
7 | Wind_Dir | 0.000000 | 0.000000 | 76.50000 | 260.0000 | 183.50000 | -198.750000 | 535.250000 | 0 | 0.000000 | 0 | 0 | 0.000000 | 0.000000 |
8 | Air_Temp | 0.001325 | 0.000000 | 13.50000 | 19.5000 | 6.00000 | 4.500000 | 28.500000 | 23 | 0.132496 | 23 | 0 | 0.001325 | 0.000000 |
9 | Rel_Humidity | 0.000288 | 0.000000 | 61.20000 | 79.7000 | 18.50000 | 33.450000 | 107.450000 | 5 | 0.028804 | 5 | 0 | 0.000288 | 0.000000 |
## TODO
## Identify Outliers with extreme values
## Let's assume Air_temp > 40°C, Rel_humidity > 100, and PM2.5 > 100 are outliers
extreme_outliers = rawdata[(rawdata['Air_Temp'] > 40) | (rawdata['Rel_Humidity'] > 100) | (rawdata['PM2.5'] > 100)]
## Print information about missing data and outliers
extreme_outliers
## FIXME: winsorize(cleaned_data[FEATURE], (lower_outlier_percentage,upper_outlier_percentage))
# ## Winsorization to handle outliers
# cleaned_data['Air_Temp'] = winsorize(cleaned_data['Air_Temp'], (0.05, 0.05))
# cleaned_data['Rel_Humidity'] = winsorize(cleaned_data['Rel_Humidity'], (0.05, 0.05))
# cleaned_data['PM2.5'] = winsorize(cleaned_data['PM2.5'], (0.05, 0.05))
# cleaned_data['PM10'] = winsorize(cleaned_data['PM10'], (0.05, 0.05))
# ## Show first 5 rows of cleaned_data
# cleaned_data.head()
Timestamp | AQI | PM10 | PM2.5 | SO2 | NO | NO2 | NOx | Wind_Speed | Wind_Dir | Air_Temp | Rel_Humidity | Site | Site_Class | Country | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
11344 | 2021-06-25 21:00:00 | 56.0 | 8.0 | 138.4 | 0.5 | 1.4 | 26.2 | 27.6 | 2.6 | 338.0 | 14.0 | 63.9 | Penrose | Industrial / Traffic | New Zealand |
# imputed_cleaned_data_site1.shape
## [What If] Visualize the difference between before and after the Winsorization
before_and_after_values_visualization(imputed_cleaned_data_site1.copy(), winsorized_imputed_cleaned_data_site1, '[What-If] Winsorization', numerical_columns_site1, 'Penrose', feature_column='PM2.5')
before_and_after_values_visualization(imputed_cleaned_data_site1.copy(), winsorized_imputed_cleaned_data_site1, '[What-If] Winsorization', numerical_columns_site1, 'Penrose', feature_column='PM10')
# imputed_cleaned_data_site2.shape
## [What If] Visualize the difference between before and after the Winsorization
before_and_after_values_visualization(imputed_cleaned_data_site2.copy(), winsorized_imputed_cleaned_data_site2, '[What-If] Winsorization', numerical_columns_site2, 'Takapuna', feature_column='PM2.5')
before_and_after_values_visualization(imputed_cleaned_data_site2.copy(), winsorized_imputed_cleaned_data_site2, '[What-If] Winsorization', numerical_columns_site2, 'Takapuna', feature_column='PM10')
from prettytable import PrettyTable
def display_site_comparison():
"""
This function creates and displays a table that lists various data attributes along with descriptions and
their applicability to different datasets across sites using the PrettyTable library.
"""
## Define the PrettyTable table columns and set column alignments
table = PrettyTable()
table.field_names = ["Variable Name", "Description", "All Sites", "Penrose", "Takapuna"]
table.align["Variable Name"] = "l"
table.align["Description"] = "l"
table.align["All Sites"] = "c"
table.align["Penrose"] = "c"
table.align["Takapuna"] = "c"
## Define the data dictionary with clear structure that containing information about the datasets and placeholders for potential future expansion
data_dictionary = [
("rawdata", "Complete dataset containing all observations across all sites.", "[x]", "[x]", "[x]"),
("numerical_columns_site1", "Numerical columns specific to Site 1.", "[ ]", "[x]", "[ ]"),
("nominal_columns_site1", "Nominal columns specific to Site 1.", "[ ]", "[x]", "[ ]"),
("numerical_columns_site2", "Numerical columns specific to Site 2.", "[ ]", "[ ]", "[x]"),
("nominal_columns_site2", "Nominal columns specific to Site 2.", "[ ]", "[ ]", "[x]"),
("rawdata_site1", "Subset of raw data for Site 1.", "[ ]", "[x]", "[ ]"),
("rawdata_site2", "Subset of raw data for Site 2.", "[ ]", "[ ]", "[x]"),
("---------------------------", "---------------------------------------------------------------------", "---------", "-------", "--------"), ## Blank line for separation
("cleaned_data", "Cleaned dataset with preprocessing applied.", "[x]", "[x]", "[x]"),
("cleaned_ordinal_columns", "Ordinal columns in the cleaned dataset.", "[x]", "[x]", "[x]"),
("cleaned_numerical_columns", "Numerical columns in the cleaned dataset.", "[x]", "[x]", "[x]"),
("cleaned_nominal_columns", "Nominal columns in the cleaned dataset.", "[x]", "[x]", "[x]"),
("cleaned_data1", "Cleaned data for Site 1.", "[ ]", "[x]", "[ ]"),
("cleaned_data2", "Cleaned data for Site 2.", "[ ]", "[ ]", "[x]"),
("---------------------------", "---------------------------------------------------------------------", "---------", "-------", "--------"), ## Blank line for separation
("imputed_cleaned_data_site1", "Imputed Cleaned data for Site 1.", "[ ]", "[x]", "[ ]"),
("imputed_cleaned_data_site2", "Imputed Cleaned data for Site 2.", "[ ]", "[ ]", "[x]"),
("---------------------------", "---------------------------------------------------------------------", "---------", "-------", "--------"), ## Blank line for separation
("winsorized_imputed_cleaned_data_site1", "Winsorized Imputed Cleaned data for Site 1.", "[ ]", "[x]", "[ ]"),
("winsorized_imputed_cleaned_data_site2", "Winsorized Imputed Cleaned data for Site 2.", "[ ]", "[ ]", "[x]"),
# ("---------------------------", "---------------------------------------------------------------------", "---------", "-------", "--------"), ## Blank line for separation
# ("top_features_PM25_site1", "Top features correlated with PM2.5 at Site 1 (Penrose).", "[ ]", "[x]", "[ ]"),
# ("top_features_PM25_site2", "Top features correlated with PM2.5 at Site 2 (Takapuna).", "[ ]", "[ ]", "[x]"),
# ("top_features_PM10_site1", "Top features correlated with PM10 at Site 1.", "[ ]", "[x]", "[ ]"),
# ("top_features_PM10_site2", "Top features correlated with PM10 at Site 2.", "[ ]", "[ ]", "[x]"),
# ("summary_stats_PM25_penrose", "[PM2.5] Summary statistics for the Penrose site after preprocessing.", "[ ]", "[x]", "[ ]"),
# ("summary_stats_PM25_takapuna", "[PM2.5] Summary statistics for the Takapuna site after preprocessing.", "[ ]", "[ ]", "[x]"),
# ("summary_stats_PM10_penrose", "[PM10] Summary statistics for the Penrose site after preprocessing.", "[ ]", "[x]", "[ ]"),
# ("summary_stats_PM10_takapuna", "[PM10] Summary statistics for the Takapuna site after preprocessing.", "[ ]", "[ ]", "[x]"),
# ("selected_features_PM25_penrose", "[PM2.5] Target + Selected Features for the Penrose site.", "[ ]", "[x]", "[ ]"),
# ("selected_features_PM25_takapuna", "[PM2.5] Target + Selected Features for the Takapuna site.", "[ ]", "[ ]", "[x]"),
# ("selected_features_PM10_penrose", "[PM10] Target + Selected Features for the Penrose site.", "[ ]", "[x]", "[ ]"),
# ("selected_features_PM10_takapuna", "[PM10] Target + Selected Features for the Takapuna site.", "[ ]", "[ ]", "[x]"),
]
## Format for better readability
for entry in data_dictionary:
## Adding rows to the table
table.add_row(entry)
## Print the table in an organized format
print(table)
## Call the function to display the table
print('\n🎓 Listing variables with description...')
display_site_comparison()
🎓 Listing variables with description... +---------------------------------------+-----------------------------------------------------------------------+-----------+---------+----------+ | Variable Name | Description | All Sites | Penrose | Takapuna | +---------------------------------------+-----------------------------------------------------------------------+-----------+---------+----------+ | rawdata | Complete dataset containing all observations across all sites. | [x] | [x] | [x] | | numerical_columns_site1 | Numerical columns specific to Site 1. | [ ] | [x] | [ ] | | nominal_columns_site1 | Nominal columns specific to Site 1. | [ ] | [x] | [ ] | | numerical_columns_site2 | Numerical columns specific to Site 2. | [ ] | [ ] | [x] | | nominal_columns_site2 | Nominal columns specific to Site 2. | [ ] | [ ] | [x] | | rawdata_site1 | Subset of raw data for Site 1. | [ ] | [x] | [ ] | | rawdata_site2 | Subset of raw data for Site 2. | [ ] | [ ] | [x] | | --------------------------- | --------------------------------------------------------------------- | --------- | ------- | -------- | | cleaned_data | Cleaned dataset with preprocessing applied. | [x] | [x] | [x] | | cleaned_ordinal_columns | Ordinal columns in the cleaned dataset. | [x] | [x] | [x] | | cleaned_numerical_columns | Numerical columns in the cleaned dataset. | [x] | [x] | [x] | | cleaned_nominal_columns | Nominal columns in the cleaned dataset. | [x] | [x] | [x] | | cleaned_data1 | Cleaned data for Site 1. | [ ] | [x] | [ ] | | cleaned_data2 | Cleaned data for Site 2. | [ ] | [ ] | [x] | | --------------------------- | --------------------------------------------------------------------- | --------- | ------- | -------- | | imputed_cleaned_data_site1 | Imputed Cleaned data for Site 1. | [ ] | [x] | [ ] | | imputed_cleaned_data_site2 | Imputed Cleaned data for Site 2. | [ ] | [ ] | [x] | | --------------------------- | --------------------------------------------------------------------- | --------- | ------- | -------- | | winsorized_imputed_cleaned_data_site1 | Winsorized Imputed Cleaned data for Site 1. | [ ] | [x] | [ ] | | winsorized_imputed_cleaned_data_site2 | Winsorized Imputed Cleaned data for Site 2. | [ ] | [ ] | [x] | +---------------------------------------+-----------------------------------------------------------------------+-----------+---------+----------+
## Filter cleaned data for the specified sites
# imputed_cleaned_data_site1 = cleaned_data[cleaned_data['Site'] == 'Penrose']
# imputed_cleaned_data_site2 = cleaned_data[cleaned_data['Site'] == 'Takapuna']
#
## Remove redundant columns before saving to *.csv
# imputed_cleaned_data_site1 = cleaned_data_site1.drop(['SO2', 'Site', 'Site_Class'], axis=1)
# imputed_cleaned_data_site2 = cleaned_data_site2.drop(['Site', 'Site_Class'], axis=1)
imputed_cleaned_data_site1['Timestamp'] = imputed_cleaned_data_site1['Timestamp'].dt.strftime('%d/%m/%Y %H:%M')
imputed_cleaned_data_site2['Timestamp'] = imputed_cleaned_data_site2['Timestamp'].dt.strftime('%d/%m/%Y %H:%M')
CLEAN_DATA_PATH = 'data/cleaned'
imputed_cleaned_data_site1.to_csv(f"{CLEAN_DATA_PATH}/cleaned_Penrose7-07May2020-to-30Apr2022.csv", index=False)
imputed_cleaned_data_site2.to_csv(f"{CLEAN_DATA_PATH}/cleaned_Takapuna23-07May2020-to-30Apr2022.csv", index=False)