import pandas as pd
import glob
import os
import zipfile
import wget
from collections import defaultdict
from io import StringIO
import re
import warnings
from datetime import datetime, timedelta
# Ignore the FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning)

Carbon Dioxide and Methane Concentrations from the Indianapolis Flux Experiment (INFLUX)
Documentation of data transformation
This script was used to transform the NIST INFLUX dataset into meaningful CSV files for ingestion into the vector dataset.
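Based on the code below, each output CSV holds hourly concentrations for a single site and gas (CO2 or CH4), with the columns datetime, latitude, longitude, level, elevation_m, intake_height_m, Instr, value, unit, location, and is_max_height_data.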
selected_level = "level1"
base_dir = "data/"
output_dir = "output/"
dat_file_pattern = f"{base_dir}/*/*.dat"
output_base_dataset_name = "PSU_INFLUX_INSITU"
constant_variables = ["datetime", "latitude", "longitude", "level", "elevation_m", "intake_height_m", "Instr"]
variables = [['CO2(ppm)'], ['CH4(ppb)']]  # exclude CO
metadata_link = "UrbanTestBed-Metadata - INFLUX.csv"

# Functions
def filter_dict(site_dict, selected_level):
    return {key: [x for x in value if selected_level in x] for key, value in site_dict.items()}
def flag_desired_level(df, desired_level):
    df['is_max_height_data'] = df['level'] == desired_level
    return df
def add_location(link, site_number):
    meta = pd.read_csv(link)
    # Match the station code in the metadata (e.g. "Site 01") using the numeric part of the site number
    location = meta[meta['Station Code'] == f"Site {site_number[-2:]}"][['City', 'State']]
    return location['City'].item() + "," + location['State'].item()
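# Note: add_location assumes the metadata CSV exposes 'Station Code', 'City', and 'State' columns;
# for an INFLUX site it would return something like "Indianapolis,IN".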
def convert_to_datetime(row):
    year = int(row['Year'])
    doy = int(row['DOY'])
    hour = int(row['Hour'])
    # Create a datetime object for the start of the year, then add the day-of-year offset
    date = datetime(year, 1, 1) + timedelta(days=doy - 1)
    # Add the hours
    datetime_obj = date + timedelta(hours=hour)
    # Format as yyyy-mm-ddThh:mm:ssZ
    return datetime_obj.strftime('%Y-%m-%dT%H:%M:%SZ')
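# Quick sanity check of convert_to_datetime (values are illustrative, not taken from the dataset):
# convert_to_datetime({'Year': 2021, 'DOY': 32, 'Hour': 5}) returns '2021-02-01T05:00:00Z'
# (Jan 1 2021 + 31 days = Feb 1 2021, plus 5 hours).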
def download_and_extract_zip_files(base_dir, levels):
    """
    Download, extract, and delete zip files for the specified levels.

    Parameters:
        base_dir (str): The base directory for storing the downloaded files.
        levels (list): A list of levels to download and extract.
    """
    # Ensure the base directory exists
    os.makedirs(base_dir, exist_ok=True)
    # Loop through the levels and handle the download and extraction
    for level in levels:
        download_link = f"https://www.datacommons.psu.edu/download/meteorology/influx/influx-tower-data/wmo-x2019-scale/level{level}.zip"
        fname = download_link.split("/")[-1]
        target_path = os.path.join(base_dir, fname)
        # Download the zip file
        wget.download(download_link, target_path)
        print(f"Downloaded {download_link} to {target_path}")
        # Extract the zip file
        with zipfile.ZipFile(target_path, 'r') as zip_ref:
            zip_ref.extractall(base_dir)
        print(f"Extracted {fname}")
        # Delete the zip file after extraction
        os.remove(target_path)
def create_site_dict(pattern):
    """
    Creates a dictionary where keys are site numbers extracted from file paths,
    and values are lists of file paths corresponding to each site number.

    Args:
    - pattern (str): Glob pattern to match files.

    Returns:
    - dict: Dictionary mapping site numbers to lists of file paths.
    """
    all_files = glob.glob(pattern)
    site_dict = defaultdict(list)
    for file_path in all_files:
        site_number = file_path.split('_')[-4]
        site_dict[site_number].append(file_path)
    return dict(site_dict)
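# The site number is taken as the fourth-from-last underscore-separated token of each file path,
# so the resulting dictionary groups the .dat files per site,
# e.g. {"<site>": ["<path to level1 file>", "<path to level2 file>", ...]} (paths shown are placeholders).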
def process_site_files(site_number, file_list):
    """
    Process files for a given site number and save the combined DataFrame to CSV.

    Args:
    - site_number (str): Site number to process.
    - file_list (list): List of file paths corresponding to the site number.
    """
    df = pd.DataFrame()
    for file_path in file_list:
        with open(file_path, 'r') as file:
            data = file.read()
        contents = data.split("\nSite")
        lat = float((re.search(r'LATITUDE:\s*([0-9.]+)\s*[NS]', contents[0])).group(1))
        lat_hemisphere = (re.search(r'LATITUDE:\s*([0-9.]+)\s*[NS]', contents[0])).group(0)[-1]
        lon = float((re.search(r'LONGITUDE:\s*([0-9.]+)\s*[EW]', contents[0])).group(1))
        lon_hemisphere = (re.search(r'LONGITUDE:\s*([0-9.]+)\s*[EW]', contents[0])).group(0)[-1]
        level = file_path.split("/")[-2]
        elevation = re.search(r'ALTITUDE:\s*([0-9.]+)\s*m\s*ASL', contents[0]).group(1)
        intake_height = re.search(r'SAMPLING HEIGHT:\s*([0-9.]+)\s*m\s*AGL', contents[0]).group(1)
        data_io = StringIO(contents[1])
        tmp_data = pd.read_csv(data_io, delim_whitespace=True)
        tmp_data = tmp_data.reset_index().rename(columns={'index': 'Site'})
        tmp = tmp_data.query("Flag==1").copy()  # Flag 1: no known problem; 0: not recommended; 9: instrument issue (unrealistic)
        #tmp['SiteCode'] = int(re.search(r'\d+', site_number).group())
        tmp['latitude'] = lat
        tmp['longitude'] = lon
        tmp['level'] = int(re.search(r'\d+', level).group())
        tmp['elevation_m'] = elevation
        tmp['intake_height_m'] = intake_height
        if lat_hemisphere == 'S':
            tmp['latitude'] = -1 * tmp["latitude"]
        if lon_hemisphere == 'W':
            tmp['longitude'] = -1 * tmp["longitude"]
        df = pd.concat([df, tmp], ignore_index=True)
    # Ensure the output directories exist
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(output_dir + "PSU_INFLUX_INSITU/", exist_ok=True)
    df['datetime'] = df[["Year", "DOY", "Hour"]].apply(convert_to_datetime, axis=1)
    df.reset_index(drop=True, inplace=True)
    for v in variables:
        tmp_file = df[constant_variables + v].copy()
        tmp_file['unit'] = v[0][-4:-1]  # e.g. 'CO2(ppm)' -> 'ppm'
        tmp_file.rename(columns={v[0]: 'value'}, inplace=True)
        tmp_file['location'] = add_location(metadata_link, site_number)
        tmp_file = flag_desired_level(tmp_file, 1)  # mark level 1 rows in 'is_max_height_data'
        # Remove NaN values
        tmp_file.dropna(subset=["value"], inplace=True)
        # Filter out zero values and write the CSV
        tmp_file[tmp_file["value"] != 0].to_csv(f"{output_dir}/PSU_INFLUX_INSITU/NIST-FLUX-IN-{site_number}-{v[0][:-5]}-hourly-concentrations.csv", index=False)
        print(f"CSV Created for Site {site_number}-{v[0][:-5]}!!!")
    return
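# Illustration only: for a hypothetical site number "<site>", the CO2 output above would be written
# under output/PSU_INFLUX_INSITU/ as "NIST-FLUX-IN-<site>-CO2-hourly-concentrations.csv".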
# Download and extract zip files (uncomment the call below to fetch the raw data)
levels_to_download = range(1, 5)
# download_and_extract_zip_files(base_dir=base_dir, levels=levels_to_download)
# Create site dictionary
site_dict = create_site_dict(dat_file_pattern)
# Uncomment to restrict processing to the selected level; leave commented to process all levels
# site_dict = filter_dict(site_dict, selected_level)
# Process each site's files
for site_number, file_list in site_dict.items():
    print(f"Processing Site Number: {site_number}, Total Files: {len(file_list)}")
    process_site_files(site_number, file_list)