Carbon Dioxide and Methane Concentrations from the Indianapolis Flux Experiment (INFLUX)

Documentation of data transformation
Author: Paridhi Parajuli

Published: September 19, 2024

This script was used to transform the NIST INFLUX dataset into consolidated CSV files suitable for ingestion into the vector dataset.

import pandas as pd
import glob
import os
import zipfile
import wget
from collections import defaultdict
from io import StringIO
import re
import warnings
from datetime import datetime, timedelta
# Ignore the FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning)

selected_level = "level1"
base_dir = "data/"
output_dir = "output/"
dat_file_pattern = f"{base_dir}/*/*.dat"
output_base_dataset_name = "PSU_INFLUX_INSITU"
constant_variables = ["datetime", "latitude", "longitude", "level", "elevation_m", "intake_height_m", "Instr"]
variables = [['CO2(ppm)'], ['CH4(ppb)']]  # exclude CO
metadata_link = "UrbanTestBed-Metadata - INFLUX.csv"
# Functions
def filter_dict(site_dict, selected_level):
    return {key: [x for x in value if selected_level in x] for key, value in site_dict.items()}
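# Illustrative example with hypothetical paths: keys are kept, values are narrowed to the
# entries whose path contains selected_level, e.g.
# filter_dict({"Site01": ["data/level1/a.dat", "data/level2/a.dat"]}, "level1")
# -> {"Site01": ["data/level1/a.dat"]}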

def flag_desired_level(df, desired_level):
    df['is_max_height_data'] = df['level'] == desired_level
    return df

def add_location(link, site_number):
    meta = pd.read_csv(link)
    # Look up the station by its two-digit site number, e.g. "Site 02"
    location = meta[meta['Station Code'] == f"Site {site_number[-2:]}"][['City', 'State']]
    return location['City'].item() + "," + location['State'].item()
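# Assumes the metadata CSV provides 'Station Code' (formatted "Site NN"), 'City', and 'State'
# columns; a hypothetical row with City="Indianapolis" and State="IN" would yield "Indianapolis,IN".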

def convert_to_datetime(row):
    year = int(row['Year'])
    doy = int(row['DOY'])
    hour = int(row['Hour'])
    
    # Create a datetime object for the start of the year
    date = datetime(year, 1, 1) + timedelta(days=doy - 1)
    # Add the hours
    datetime_obj = date + timedelta(hours=hour)
    # Format as yyyy-mm-ddThh:mm:ssZ
    return datetime_obj.strftime('%Y-%m-%dT%H:%M:%SZ')
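# Worked example: Year=2020, DOY=32, Hour=5 gives datetime(2020, 1, 1) + 31 days + 5 hours,
# i.e. the string '2020-02-01T05:00:00Z'.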

def download_and_extract_zip_files(base_dir, levels):
    """
    Download, extract, and delete zip files for the specified levels.

    Parameters:
    base_dir (str): The base directory for storing the downloaded files.
    levels (list): A list of levels to download and extract.
    """
    # Ensure the base directory exists
    os.makedirs(base_dir, exist_ok=True)

    # Loop through the levels and handle the download and extraction
    for level in levels:
        download_link = f"https://www.datacommons.psu.edu/download/meteorology/influx/influx-tower-data/wmo-x2019-scale/level{level}.zip"
        fname = download_link.split("/")[-1]
        target_path = os.path.join(base_dir, fname)
        
        # Download the zip file
        wget.download(download_link, target_path)
        print(f"Downloaded {download_link} to {target_path}")

        # Extract the zip file
        with zipfile.ZipFile(target_path, 'r') as zip_ref:
            zip_ref.extractall(base_dir)
            print(f"Extracted {fname}")

        # Delete the zip file after extraction
        os.remove(target_path)
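# For example, level 1 resolves to
# https://www.datacommons.psu.edu/download/meteorology/influx/influx-tower-data/wmo-x2019-scale/level1.zip,
# which is saved to data/level1.zip, extracted into data/, and then removed.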

def create_site_dict(pattern):
    """
    Creates a dictionary where keys are site numbers extracted from file paths,
    and values are lists of file paths corresponding to each site number.
    
    Args:
    - pattern (str): Glob pattern to match files.
    
    Returns:
    - dict: Dictionary mapping site numbers to lists of file paths.
    """
    all_files = glob.glob(pattern)
    site_dict = defaultdict(list)
    
    for file_path in all_files:
        site_number = file_path.split('_')[-4]
        site_dict[site_number].append(file_path)
    
    return dict(site_dict)
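# The site number is assumed to be the fourth underscore-separated token from the end of the
# path; for a hypothetical file such as "data/level1/INFLUX_hourly_Site02_level1_co2_2020.dat",
# split('_')[-4] returns "Site02".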

def process_site_files(site_number, file_list):
    """
    Process files for a given site number and save the combined DataFrame to CSV.
    
    Args:
    - site_number (str): Site number to process.
    - file_list (list): List of file paths corresponding to the site number.
    """
    df = pd.DataFrame()
    
    for file_path in file_list:
        with open(file_path, 'r') as file:
            data = file.read()
            
        contents = data.split("\nSite")
        lat_match = re.search(r'LATITUDE:\s*([0-9.]+)\s*([NS])', contents[0])
        lat = float(lat_match.group(1))
        lat_hemisphere = lat_match.group(2)

        lon_match = re.search(r'LONGITUDE:\s*([0-9.]+)\s*([EW])', contents[0])
        lon = float(lon_match.group(1))
        lon_hemisphere = lon_match.group(2)
        
        level = file_path.split("/")[-2]

        elevation = re.search(r'ALTITUDE:\s*([0-9.]+)\s*m\s*ASL', contents[0]).group(1)
        intake_height = re.search(r'SAMPLING HEIGHT:\s*([0-9.]+)\s*m\s*AGL', contents[0]).group(1)

        
        data_io = StringIO(contents[1])
        tmp_data = pd.read_csv(data_io, delim_whitespace=True)
        tmp_data = tmp_data.reset_index().rename(columns={'index': 'Site'})
        tmp = tmp_data.query("Flag==1").copy()  # Flag 1 = no known problem; 0 = not recommended; 9 = instrument issue (unrealistic values)
        #tmp['SiteCode'] = int(re.search(r'\d+', site_number).group()) 
        tmp['latitude'] = lat
        tmp['longitude'] = lon
        tmp['level'] = int(re.search(r'\d+', level).group())
        tmp['elevation_m'] = elevation
        tmp['intake_height_m']= intake_height

        if lat_hemisphere == 'S':
            tmp['latitude'] = -1* tmp["latitude"]
        if lon_hemisphere == 'W':
            tmp['longitude'] = -1* tmp["longitude"]

        df = pd.concat([df, tmp], ignore_index=True)

    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(output_dir+"PSU_INFLUX_INSITU/", exist_ok=True)
    

    df['datetime'] = df[["Year","DOY","Hour"]].apply(convert_to_datetime, axis=1)
    df.reset_index(drop=True, inplace=True)
    for v in variables:
        tmp_file = df[constant_variables + v].copy()
        tmp_file['unit'] = v[0][-4:-1]  # e.g. "CO2(ppm)" -> "ppm": keep only the unit inside the parentheses
        
        tmp_file.rename(columns={v[0]: 'value'}, inplace=True)
        tmp_file['location']= add_location(metadata_link, site_number)
        tmp_file = flag_desired_level(tmp_file, 1) # Flagging only level 1 data

        # Remove nan
        tmp_file.dropna(subset=["value"], inplace=True)

        #filter 0 values
        tmp_file[tmp_file["value"]!=0].to_csv(f"{output_dir}/PSU_INFLUX_INSITU/NIST-FLUX-IN-{site_number}-{v[0][:-5]}-hourly-concentrations.csv", index=False)
        print(f"CSV Created for Site {site_number}-{v[0][:-5]}!!!")
    return 
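# Each output CSV holds one gas per site, with the columns datetime, latitude, longitude,
# level, elevation_m, intake_height_m, Instr, value, unit, location, and is_max_height_data.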



# Download and extract the zip files for levels 1-4 (uncomment to fetch the data)
levels_to_download = range(1, 5)
#download_and_extract_zip_files(base_dir=base_dir, levels=levels_to_download)

# Create site dictionary
site_dict = create_site_dict(dat_file_pattern)

# Uncomment to keep only files from selected_level; by default all levels are processed
#site_dict = filter_dict(site_dict, selected_level)

# Process each site's files
for site_number, file_list in site_dict.items():
    print(f"Processing Site Number: {site_number}, Total Files: {len(file_list)}")
    process_site_files(site_number, file_list)
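# The resulting files are written to output/PSU_INFLUX_INSITU/ as
# NIST-FLUX-IN-<site_number>-CO2-hourly-concentrations.csv and
# NIST-FLUX-IN-<site_number>-CH4-hourly-concentrations.csv.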