import pandas as pd
import glob
import os
import zipfile
import wget
from collections import defaultdict
from io import StringIO
import re
import warnings
from datetime import datetime, timedelta
# Ignore the FutureWarning
"ignore", category=FutureWarning) warnings.filterwarnings(
# Carbon Dioxide and Methane Concentrations from the Indianapolis Flux Experiment (INFLUX)
#
# Documentation of data transformation: this script transforms the NIST INFLUX
# dataset into per-site, per-gas CSV files for ingestion into the vector dataset.
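#
# Each output CSV carries the columns listed in `constant_variables` below
# (datetime, latitude, longitude, level, elevation_m, intake_height_m, Instr)
# plus the columns added during processing: value, unit, location, and
# is_max_height_data.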
="level1"
selected_level= "data/"
base_dir = "output/"
output_dir = f"{base_dir}/*/*.dat"
dat_file_pattern = "PSU_INFLUX_INSITU"
output_base_dataset_name = ["datetime","latitude","longitude","level","elevation_m","intake_height_m","Instr"]
constant_variables =[['CO2(ppm)'],['CH4(ppb)']] # exclude CO
variables = "UrbanTestBed-Metadata - INFLUX.csv" metadata_link
# Functions
def filter_dict(site_dict, selected_level):
return {key: [x for x in value if selected_level in x] for key, value in site_dict.items()}
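
# Illustrative use of filter_dict (paths are hypothetical, not actual INFLUX
# file names): given {'010': ['data/level1/a.dat', 'data/level2/a.dat']} and
# selected_level = "level1", it returns {'010': ['data/level1/a.dat']}.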
def flag_desired_level(df, desired_level):
    df['is_max_height_data'] = df['level'] == desired_level
    return df
def add_location(link, site_number):
    meta = pd.read_csv(link)
    location = meta[meta['Station Code'] == f"Site {site_number[-2:]}"][['City', 'State']]  # get the actual site number
    return location['City'].item() + "," + location['State'].item()
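
# Illustrative call (city/state values are hypothetical): add_location matches
# the metadata row whose 'Station Code' equals f"Site {site_number[-2:]}" and
# returns "City,State", e.g. "Indianapolis,Indiana".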
def convert_to_datetime(row):
    year = int(row['Year'])
    doy = int(row['DOY'])
    hour = int(row['Hour'])

    # Create a datetime object for the start of the year
    date = datetime(year, 1, 1) + timedelta(days=doy - 1)
    # Add the hours
    datetime_obj = date + timedelta(hours=hour)
    # Format as yyyy-mm-ddThh:mm:ssZ
    return datetime_obj.strftime('%Y-%m-%dT%H:%M:%SZ')
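
# Worked example: a row with Year=2023, DOY=32, Hour=5 becomes
# datetime(2023, 1, 1) + 31 days + 5 hours, i.e. '2023-02-01T05:00:00Z'.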
def download_and_extract_zip_files(base_dir, levels):
"""
Download, extract, and delete zip files for the specified levels.
Parameters:
base_dir (str): The base directory for storing the downloaded files.
levels (list): A list of levels to download and extract.
"""
# Ensure the base directory exists
    os.makedirs(base_dir, exist_ok=True)
# Loop through the levels and handle the download and extraction
for level in levels:
= f"https://www.datacommons.psu.edu/download/meteorology/influx/influx-tower-data/wmo-x2019-scale/level{level}.zip"
download_link = download_link.split("/")[-1]
fname = os.path.join(base_dir, fname)
target_path
# Download the zip file
        wget.download(download_link, target_path)
        print(f"Downloaded {download_link} to {target_path}")
# Extract the zip file
with zipfile.ZipFile(target_path, 'r') as zip_ref:
            zip_ref.extractall(base_dir)
        print(f"Extracted {fname}")
# Delete the zip file after extraction
os.remove(target_path)
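
# Example: download_and_extract_zip_files("data/", range(1, 5)) downloads
# level1.zip through level4.zip into data/ and unpacks each archive there
# (this mirrors the commented-out call at the bottom of the script).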
def create_site_dict(pattern):
"""
Creates a dictionary where keys are site numbers extracted from file paths,
and values are lists of file paths corresponding to each site number.
Args:
- pattern (str): Glob pattern to match files.
Returns:
- dict: Dictionary mapping site numbers to lists of file paths.
"""
    all_files = glob.glob(pattern)
    site_dict = defaultdict(list)
for file_path in all_files:
        site_number = file_path.split('_')[-4]
site_dict[site_number].append(file_path)
return dict(site_dict)
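
# Illustrative shape of the result (file names are hypothetical; the site
# number is taken as the fourth-from-last underscore-separated token):
#   {'010': ['data/level1/PSU_a_b_010_c_d_e.dat', ...], '011': [...]}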
def process_site_files(site_number, file_list):
"""
Process files for a given site number and save the combined DataFrame to CSV.
Args:
- site_number (str): Site number to process.
- file_list (list): List of file paths corresponding to the site number.
"""
    df = pd.DataFrame()
for file_path in file_list:
with open(file_path, 'r') as file:
            data = file.read()
        contents = data.split("\nSite")
        lat = float(re.search(r'LATITUDE:\s*([0-9.]+)\s*[NS]', contents[0]).group(1))
        lat_hemisphere = re.search(r'LATITUDE:\s*([0-9.]+)\s*[NS]', contents[0]).group(0)[-1]

        lon = float(re.search(r'LONGITUDE:\s*([0-9.]+)\s*[EW]', contents[0]).group(1))
        lon_hemisphere = re.search(r'LONGITUDE:\s*([0-9.]+)\s*[EW]', contents[0]).group(0)[-1]

        level = file_path.split("/")[-2]

        elevation = re.search(r'ALTITUDE:\s*([0-9.]+)\s*m\s*ASL', contents[0]).group(1)
        intake_height = re.search(r'SAMPLING HEIGHT:\s*([0-9.]+)\s*m\s*AGL', contents[0]).group(1)
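
        # The regexes above assume file headers with lines like the following
        # (illustrative values, not taken from an actual file):
        #   LATITUDE: 39.95 N    LONGITUDE: 86.00 W
        #   ALTITUDE: 250.0 m ASL    SAMPLING HEIGHT: 40.0 m AGL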
        data_io = StringIO(contents[1])
        tmp_data = pd.read_csv(data_io, delim_whitespace=True)
        tmp_data = tmp_data.reset_index().rename(columns={'index': 'Site'})
        tmp = tmp_data.query("Flag==1").copy()  # 1 means no known problem, 0 is not recommended, 9 is instrument issue (unrealistic)
        #tmp['SiteCode'] = int(re.search(r'\d+', site_number).group())
        tmp['latitude'] = lat
        tmp['longitude'] = lon
        tmp['level'] = int(re.search(r'\d+', level).group())
        tmp['elevation_m'] = elevation
        tmp['intake_height_m'] = intake_height

        if lat_hemisphere == 'S':
            tmp['latitude'] = -1 * tmp['latitude']
        if lon_hemisphere == 'W':
            tmp['longitude'] = -1 * tmp['longitude']

        df = pd.concat([df, tmp], ignore_index=True)
# Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(output_dir + "PSU_INFLUX_INSITU/", exist_ok=True)

    df['datetime'] = df[["Year", "DOY", "Hour"]].apply(convert_to_datetime, axis=1)
    df.reset_index(drop=True, inplace=True)

    for v in variables:
        tmp_file = df[constant_variables + v].copy()
        tmp_file['unit'] = v[0][-4:-1]  # e.g. 'CO2(ppm)' -> 'ppm' (get the unit only)

        tmp_file.rename(columns={v[0]: 'value'}, inplace=True)
        tmp_file['location'] = add_location(metadata_link, site_number)
        tmp_file = flag_desired_level(tmp_file, 1)  # flag only level 1 data
        # Remove NaN values
        tmp_file.dropna(subset=["value"], inplace=True)

        # Filter out zero values and write the CSV
        tmp_file[tmp_file["value"] != 0].to_csv(
            f"{output_dir}/PSU_INFLUX_INSITU/NIST-FLUX-IN-{site_number}-{v[0][:-5]}-hourly-concentrations.csv",
            index=False,
        )
        print(f"CSV Created for Site {site_number}-{v[0][:-5]}!!!")
return
# Download and extract zip files
levels_to_download = range(1, 5)
#download_and_extract_zip_files(base_dir=base_dir, levels=levels_to_download)
# Create site dictionary
site_dict = create_site_dict(dat_file_pattern)
# Uncomment to keep only data from the selected level
#site_dict = filter_dict(site_dict, selected_level)
# Process each site's files
for site_number, file_list in site_dict.items():
print(f"Processing Site Number: {site_number}, Total Files: {len(file_list)}")
process_site_files(site_number, file_list)
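
# Quick sanity check on one generated file (the path is hypothetical and
# depends on the site numbers actually found on disk):
# pd.read_csv(f"{output_dir}/PSU_INFLUX_INSITU/NIST-FLUX-IN-010-CO2-hourly-concentrations.csv").head()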