import xarray as xr
import os
import glob
from datetime import datetime
import boto3
import s3fs
import tempfile
import numpy as np
import rasterio
from rasterio.enums import Resampling
from rio_cogeo.cogeo import cog_translate
from rio_cogeo.profiles import cog_profiles
from botocore.exceptions import NoCredentialsError, PartialCredentialsError

GRA²PES Greenhouse Gas and Air Quality Species
Monthly, 0.036-degree resolution emissions of fossil fuel carbon dioxide (ffCO₂), carbon monoxide (CO), nitrogen oxides (NOₓ), sulfur dioxide (SO₂), and particulate matter (PM₂.₅) for the year 2021 over the Contiguous United States, from the Greenhouse gas And Air Pollutants Emissions System (GRA²PES).
This script was used to transform the GRA2PES dataset to Cloud Optimized GeoTIFF (COG) format for display in the Greenhouse Gas (GHG) Center.
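After transformation, each output file can be checked for COG validity with rio-cogeo's cog_validate; a minimal sketch (the filename below is illustrative, not a file the script is guaranteed to produce):

from rio_cogeo.cogeo import cog_validate

# cog_validate returns a (is_valid, errors, warnings) tuple.
is_valid, errors, warnings = cog_validate("output/GRA2PESv1.0_total_CO2_202101.tif")
assert is_valid, f"Invalid COG: {errors}"
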
config = {
    "data_acquisition_method": "s3",
    "raw_data_bucket": "gsfc-ghg-store",
    "raw_data_prefix": "GRA2PES/monthly_subset_regrid/2021",
    "cog_data_bucket": "ghgc-data-store-develop",
    "cog_data_prefix": "transformed_cogs/GRAAPES",
    "date_fmt": "%Y%m",
    "transformation": {},
}

session = boto3.session.Session()
s3_client = session.client("s3")
raw_data_bucket = config["raw_data_bucket"]
raw_data_prefix = config["raw_data_prefix"]
cog_data_bucket = config["cog_data_bucket"]
cog_data_prefix = config["cog_data_prefix"]
date_fmt = config["date_fmt"]
fs = s3fs.S3FileSystem()

def get_all_s3_keys(bucket, model_name, ext):
    """List all keys in an S3 bucket under the given prefix that end with ext."""
    keys = []
    kwargs = {"Bucket": bucket, "Prefix": f"{model_name}/"}
    while True:
        resp = s3_client.list_objects_v2(**kwargs)
        # Use .get() so an empty listing does not raise a KeyError
        for obj in resp.get("Contents", []):
            if obj["Key"].endswith(ext) and "historical" not in obj["Key"]:
                keys.append(obj["Key"])
        try:
            kwargs["ContinuationToken"] = resp["NextContinuationToken"]
        except KeyError:
            break
    return keys
keys = get_all_s3_keys(raw_data_bucket, raw_data_prefix, ".nc4")
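boto3 can also drive the continuation-token loop itself via a paginator; an equivalent sketch (get_all_s3_keys_paginated is a hypothetical name):

def get_all_s3_keys_paginated(bucket, prefix, ext):
    # Same filtering as get_all_s3_keys, but boto3's paginator follows
    # continuation tokens internally.
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=f"{prefix}/"):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(ext) and "historical" not in obj["Key"]:
                keys.append(obj["Key"])
    return keys
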
def download_s3_objects(bucket, keys, download_dir):
    """Download all S3 objects listed in keys to the specified local directory."""
    if not os.path.exists(download_dir):
        os.makedirs(download_dir)
    for key in keys:
        local_filename = os.path.join(download_dir, os.path.basename(key))
        try:
            s3_client.download_file(bucket, key, local_filename)
            print(f"Downloaded {key} to {local_filename}")
        except (NoCredentialsError, PartialCredentialsError) as e:
            print(f"Credentials error: {e}")
        except Exception as e:
            print(f"Failed to download {key}: {e}")
download_s3_objects(raw_data_bucket, keys, "data")
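The s3fs handle created earlier (fs) is otherwise unused; it could perform the same downloads, as a sketch:

for key in keys:
    # fs.get copies s3://bucket/key to the given local path.
    fs.get(f"{raw_data_bucket}/{key}", os.path.join("data", os.path.basename(key)))
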
def extract_date_from_key(key):
    """Extract the YYYYMM date string from an underscore-delimited filename."""
    # Split the key to isolate the part that contains the date
    parts = key.split("_")
    for part in parts:
        # Check if the part is numeric and has a length of 6 (YYYYMM format)
        if part.isdigit() and len(part) == 6:
            return part
    return None
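Note that this only matches a date token delimited by underscores on both sides; a regex variant (extract_date_from_key_re is a hypothetical name) would also catch a YYYYMM group sitting directly before the file extension:

import re

def extract_date_from_key_re(key):
    # Hypothetical alternative: six digits preceded by start-of-name or "_"
    # and followed by "_" or ".", e.g. "..._202101.nc4".
    m = re.search(r"(?:^|_)(\d{6})(?=[_.])", os.path.basename(key))
    return m.group(1) if m else None
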
COG_PROFILE = {"driver": "COG", "compress": "DEFLATE"}
OVERVIEW_LEVELS = 4
OVERVIEW_RESAMPLING = "average"
os.makedirs("output", exist_ok=True)

for key in glob.glob("data/*.nc4"):
    xds = xr.open_dataset(key)
    # Shift longitudes from 0..360 to -180..180 and sort so the raster extent is valid
    xds = xds.assign_coords(lon=(((xds.lon + 180) % 360) - 180)).sortby("lon")
    for var in ["PM25-PRI", "CO2", "CO", "NOX", "SOX"]:
        yearmonth = extract_date_from_key(key)
        filename = f"output/GRA2PESv1.0_total_{var.replace('_', '-')}_{yearmonth}.tif"
        data = getattr(xds, var)
        data.rio.set_spatial_dims("lon", "lat", inplace=True)
        data.rio.write_crs("epsg:4326", inplace=True)

        # Create a temporary file to hold the COG
        with tempfile.NamedTemporaryFile(suffix=".tif", delete=False) as temp_file:
            data.rio.to_raster(f"temp_{yearmonth}_{var}.tif", **COG_PROFILE, nodata=-9999)
            # Create COG with overviews and nodata value
            cog_translate(
                f"temp_{yearmonth}_{var}.tif",
                temp_file.name,
                cog_profiles.get("deflate"),
                overview_level=OVERVIEW_LEVELS,
                overview_resampling=OVERVIEW_RESAMPLING,
                nodata=-9999,
            )
        # Move the temporary file to the desired local path
        os.rename(temp_file.name, filename)
        if os.path.exists(f"temp_{yearmonth}_{var}.tif"):
            os.remove(f"temp_{yearmonth}_{var}.tif")
        del data
        print(f"Done for: {filename}")