import sys
import json
import pandas as pd
# Atmospheric Carbon Dioxide and Methane Concentrations from NOAA Global
# Monitoring Laboratory.
#
# Documentation of data transformation:
# This script was used to transform the CO2 and CH4 datasets in txt format
# with hourly granularity to JSON in daily and monthly granularity for
# visualization in the Greenhouse Gas (GHG) Center.
def daily_aggregate(filepath):
    """
    Read hourly data from a .txt file, aggregate it to daily means, and
    return a list of JSON-ready objects for chart visualization.

    Parameters:
        filepath (str): The path to the file containing the data to be aggregated.

    Returns:
        list: A list of dictionaries, each containing 'date' (ISO-8601 string)
        and 'value' (daily mean, rounded to 2 decimals) keys.
        str: An error message if the file is missing or processing fails.

    Notes:
        - The input file's first line is expected to state the number of
          header lines after a colon (NOAA GML convention), e.g.
          "# header_lines : 161".
        - Rows are kept only when qcflag == "..." and value is neither 0
          nor -999 (NOAA missing-data sentinel).

    Example:
        aggregated_data = daily_aggregate("/path/to/data_file.txt")
    """
    try:
        with open(filepath, "r", encoding="utf-8") as file:
            file_content_str = file.read()
        # Split the raw text into individual lines.
        file_content_list = file_content_str.split("\n")
        # The header-line count is stated after the colon on the first line.
        header_lines = file_content_list[0].split(":")[-1]
        header_lines = int(header_lines)
        # Slice off the header (keeping the column-name line at index
        # header_lines - 1) and drop the trailing empty element produced
        # by the final newline.
        str_datas = file_content_list[header_lines - 1: -1]
        data = [line.replace("\n", "").split(" ") for line in str_datas]
        # Separate table head (column names) and body to form a DataFrame.
        table_head = data[0]
        table_body = data[1:]
        dataframe = pd.DataFrame(table_body, columns=table_head)
        dataframe['value'] = dataframe['value'].astype(float)
        # Filter out flagged rows and missing-value sentinels.
        mask = (dataframe["qcflag"] == "...") & (dataframe["value"] != 0) & (dataframe["value"] != -999)
        filtered_df = dataframe[mask].reset_index(drop=True)
        # Aggregate hourly samples into daily means.
        aggregated_df = filtered_df.groupby(['year', 'month', 'day'])['value'].mean().reset_index()
        aggregated_df['value'] = aggregated_df['value'].round(2)
        # Assemble an ISO-8601 timestamp from the year/month/day columns.
        aggregated_df['datetime'] = pd.to_datetime(aggregated_df[['year', 'month', 'day']])
        aggregated_df['datetime'] = aggregated_df['datetime'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
        processed_df = aggregated_df[['datetime', 'value']]
        processed_df = processed_df.sort_values(by='datetime')
        # Dict formation, needed for frontend: [{date: ..., value: ...}]
        json_list = []
        for _, row in processed_df.iterrows():
            json_obj = {'date': row['datetime'], 'value': row['value']}
            json_list.append(json_obj)
        return json_list
    except FileNotFoundError:
        return "File not found"
    except Exception as e:
        return f"Exception occurred {e}"
def monthly_aggregate(filepath):
    """
    Read hourly data from a .txt file, aggregate it to monthly means, and
    return a list of JSON-ready objects for chart visualization.

    Parameters:
        filepath (str): The path to the file containing the data to be aggregated.

    Returns:
        list: A list of dictionaries, each containing 'date' (ISO-8601 string,
        first day of the month) and 'value' (monthly mean, rounded to 2
        decimals) keys.
        str: An error message if the file is missing or processing fails.

    Notes:
        - The input file's first line is expected to state the number of
          header lines after a colon (NOAA GML convention), e.g.
          "# header_lines : 161".
        - Rows are kept only when qcflag == "..." and value is neither 0
          nor -999 (NOAA missing-data sentinel).

    Example:
        aggregated_data = monthly_aggregate("/path/to/data_file.txt")
    """
    try:
        with open(filepath, "r", encoding="utf-8") as file:
            file_content_str = file.read()
        # Split the raw text into individual lines.
        file_content_list = file_content_str.split("\n")
        # The header-line count is stated after the colon on the first line.
        header_lines = file_content_list[0].split(":")[-1]
        header_lines = int(header_lines)
        # Slice off the header (keeping the column-name line at index
        # header_lines - 1) and drop the trailing empty element produced
        # by the final newline.
        str_datas = file_content_list[header_lines - 1: -1]
        data = [line.replace("\n", "").split(" ") for line in str_datas]
        # Separate table head (column names) and body to form a DataFrame.
        table_head = data[0]
        table_body = data[1:]
        dataframe = pd.DataFrame(table_body, columns=table_head)
        dataframe['value'] = dataframe['value'].astype(float)
        # Filter out flagged rows and missing-value sentinels.
        mask = (dataframe["qcflag"] == "...") & (dataframe["value"] != 0) & (dataframe["value"] != -999)
        filtered_df = dataframe[mask].reset_index(drop=True)
        # Aggregate hourly samples into monthly means.
        aggregated_df = filtered_df.groupby(['year', 'month'])['value'].mean().reset_index()
        aggregated_df['value'] = aggregated_df['value'].round(2)
        # Assemble an ISO-8601 timestamp pinned to the 1st of each month.
        aggregated_df['datetime'] = pd.to_datetime(aggregated_df[['year', 'month']].assign(day=1))
        aggregated_df['datetime'] = aggregated_df['datetime'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
        processed_df = aggregated_df[['datetime', 'value']]
        processed_df = processed_df.sort_values(by='datetime')
        # Dict formation, needed for frontend: [{date: ..., value: ...}]
        json_list = []
        for _, row in processed_df.iterrows():
            json_obj = {'date': row['datetime'], 'value': row['value']}
            json_list.append(json_obj)
        return json_list
    except FileNotFoundError:
        return "File not found"
    except Exception as e:
        return f"Exception occurred {e}"
if __name__ == "__main__":
    # Validate CLI arguments: the script needs a frequency keyword and a
    # filepath (sys.argv[0] is the script name, so 3 entries in total).
    # NOTE: the original check was `!= 2`, which always failed for valid
    # invocations because sys.argv[2] is read below.
    if len(sys.argv) != 3:
        print("Usage: python aggregrate.py <daily|monthly> <filepath>")
        sys.exit(1)

    # Get the frequency and filepath from the command line.
    frequency = sys.argv[1]
    hourly_data_filepath = sys.argv[2]

    # Call the matching aggregate function with the provided filepath.
    if frequency == "daily":
        result = daily_aggregate(hourly_data_filepath)
    elif frequency == "monthly":
        result = monthly_aggregate(hourly_data_filepath)
    else:
        print("Usage: python aggregrate.py <daily|monthly> <filepath>")
        sys.exit(1)

    if result is not None:
        print(result)
        # Save the JSON file for reference, named after the input file.
        # (Single quotes inside the f-string: double quotes here are a
        # syntax error before Python 3.12.)
        out_path = f"{hourly_data_filepath.split('/')[-1]}.json"
        with open(out_path, "w", encoding="utf-8") as file:
            json.dump(result, file)