import geopandas as gpd
import pandas as pd
from collections import namedtuple
import warnings
from typing import Optional, Union, Tuple
from pathlib import Path
from datetime import datetime
from itertools import product
from shapely import Point
# RESource's Local
from RES.era5_cutout import ERA5Cutout
from RES import cluster
from RES import windspeed as wind
from RES.CellCapacityProcessor import CellCapacityProcessor
from RES.coders import CODERSData
from RES.power_nodes import GridNodeLocator
from RES.timeseries import Timeseries
from RES.hdf5_handler import DataHandler
from RES.AttributesParser import AttributesParser
from RES.score import CellScorer
from RES.cell import GridCells
from RES.gwa import GWACells
from RES.boundaries import GADMBoundaries
from RES.units import Units
from RES import utility as utils
# Get the current local time
current_local_time = datetime.now()
warnings.filterwarnings("ignore")
PRINT_LEVEL_BASE: int = 1
class RESources_builder(AttributesParser):
"""
Main orchestrator class for renewable energy resource assessment workflows.
__RESources_builder__ coordinates the complete workflow for assessing solar and wind potential at sub-national scales. It integrates spatial grid cell preparation, land availability analysis, weather data processing, economic evaluation, and site clustering into a unified framework.
This class implements a modular architecture where each assessment step is handled by specialized components, enabling reproducible, scalable, and transparent renewable energy assessments.
Parameters
----------
config_file_path : str or Path
Path to the YAML configuration file containing project settings
region_short_code : str
ISO or custom short code for the target region (e.g., 'BC' for British Columbia)
resource_type : {'solar', 'wind'}
Type of renewable energy resource to assess
Attributes
----------
store : Path
Root directory for data storage (HDF5 file) and caching.
units : Units
Handler for unit conversions and standardization
gridcells : GridCells
Spatial grid generation and management
timeseries : Timeseries
Climate data processing and capacity factor calculations
datahandler : DataHandler
HDF5-based data storage and retrieval interface
cell_processor : CellCapacityProcessor
Land availability and capacity potential calculations
coders : CODERSData
Canadian power system data integration (substations, transmission lines).
era5_cutout : ERA5Cutout
ERA5 climate data cutout management
scorer : CellScorer
Economic scoring and LCOE calculations
gwa_cells : GWACells
Global Wind Atlas data integration (wind resources only)
results_save_to : Path
Output directory for assessment results
region_name : str
Full name of the assessed region
Methods
-------
get_grid_cells()
Generate spatial grid cells covering the region boundary
get_cell_capacity()
Calculate potential capacity based on land availability constraints
extract_weather_data()
Process climate data for capacity factor calculations
update_gwa_scaled_params(memory_resource_limitation=False)
Integrate Global Wind Atlas wind speed corrections (wind only)
get_CF_timeseries(cells=None, force_update=False)
Generate hourly capacity factor time series
find_grid_nodes(cells=None, use_pypsa_buses=False)
Identify nearest electrical grid connection points
score_cells(cells=None)
Calculate economic scores based on LCOE methodology
get_clusters(scored_cells=None, score_tolerance=200, wcss_tolerance=None)
Perform spatial clustering of viable sites
get_cluster_timeseries(clusters=None, dissolved_indices=None, cells_timeseries=None)
Generate representative time series for each cluster
build(select_top_sites=True, use_pypsa_buses=False, memory_resource_limitation=True)
Execute complete assessment workflow
export_results(resource_type, region, resource_clusters, cluster_timeseries, save_to=Path('results'))
Export results in standardized format for downstream models
select_top_sites(sites, sites_timeseries, resource_max_capacity)
Filter results to highest-potential sites within capacity constraints
Examples
--------
Basic wind assessment workflow:
>>> from RES.RESources import RESources_builder
>>> builder = RESources_builder(
... config_file_path="config/config_BC.yaml",
... region_short_code="BC",
... resource_type="wind"
... )
>>> builder.build()  # runs the full workflow and exports results internally
Step-by-step workflow with intermediate inspection:
>>> builder = RESources_builder("config/config.yaml", "AB", "solar")
>>> cells = builder.get_grid_cells()
>>> cells, capacity_matrix = builder.get_cell_capacity()
>>> scored_cells = builder.score_cells(cells)
>>> clusters = builder.get_clusters(scored_cells)
Notes
-----
- Inherits configuration parsing capabilities from AttributesParser
- Uses HDF5 storage for efficient handling of large geospatial datasets
- Implements caching mechanisms to avoid redundant computations
- Supports both solar PV and onshore wind technologies
- Economic calculations follow NREL LCOE methodology
- Clustering uses k-means with automatic cluster number optimization
"""
def __post_init__(self):
"""
Initialize inherited attributes and orchestrate the complete RESources workflow setup.
This comprehensive initialization method:
1. Calls parent __post_init__ to inherit configuration and regional attributes
2. Sets up required arguments dictionary for consistent parameter passing
3. Initializes all specialized component classes for the assessment workflow
4. Configures data storage and persistence infrastructure
5. Establishes temporal boundaries for the analysis period
6. Prepares regional and project-specific settings
Component Initialization:
- Units: Standardized unit management and conversion
- GridCells: Spatial grid generation and boundary management
- Timeseries: Climate data processing and capacity factor calculations
- GADMBoundaries: Administrative boundary handling
- DataHandler: HDF5-based data storage and retrieval
- CellCapacityProcessor: Land availability and capacity calculations
- CODERSData: Canadian power system data (if applicable)
- ERA5Cutout: Climate data cutout management
- CellScorer: Economic evaluation and LCOE calculations
- GWACells: Global Wind Atlas integration
Inherited Capabilities:
- Configuration file parsing and validation
- Regional identification and validation
- Logging infrastructure and operation tracking
- Data storage path management
- Temporal analysis period configuration
- Coordinate reference system management
Raises:
ConfigurationError: If required configuration parameters are missing
RegionValidationError: If region_short_code is invalid
StorageError: If HDF5 store cannot be initialized
ComponentInitializationError: If any component class fails to initialize
Notes:
- Canadian power system integration requires valid region code
- Temporal snapshot automatically loaded from configuration
- All components share consistent parameter dictionary
- HDF5 storage enables efficient large dataset management
"""
# Call the parent class __post_init__ to initialize inherited attributes
super().__post_init__()
utils.print_module_title(f'Initiating RESource Builder | {__name__}')
# This dictionary will be used to pass arguments to external classes
self.required_args = { #order doesn't matter
"config_file_path" : self.config_file_path,
"region_short_code": self.region_short_code,
"resource_type": self.resource_type
}
self.country_name = self.get_country()
if self.country_name is None:
utils.print_warning("Country name is not set in the configuration file.")
# Initiate Classes
self.units=Units(**self.required_args)
self.gridcells=GridCells(**self.required_args)
self.timeseries=Timeseries(**self.required_args)
self.gadmBoundary=GADMBoundaries(**self.required_args)
self.gridNodesProcessor=GridNodeLocator(**self.required_args)
self.datahandler=DataHandler(self.store)
self.cell_processor=CellCapacityProcessor(**self.required_args)
if self.country_name == 'Canada':
self.coders=CODERSData(**self.required_args) # only for Canada
self.era5_cutout=ERA5Cutout(**self.required_args)
self.scorer=CellScorer(**self.required_args)
self.gwa_cells=GWACells(**self.required_args)
self.region_name=self.get_region_name()
# Initiate the attributes for the RESources_builder
self.store_grid_cells: Optional[gpd.GeoDataFrame] = None
self.region_grid_cells: Optional[gpd.GeoDataFrame] = None
# Snapshot (range of the temporal data)
(
self.start_date,
self.end_date,
) = self.load_snapshot()
utils.print_update(level=PRINT_LEVEL_BASE+1,
message=f"Snapshot for Resources: {self.start_date} to {self.end_date}")
# Save the configuration to the results directory
utils.print_update(level=PRINT_LEVEL_BASE+1,message=f"{__name__}| Saving configuration to results directory...")
utils.save_to_yaml(self.config, self.results_save_to/self.region_short_code/f'config_{self.region_short_code}_{self.RUN_ID}.yaml')
def get_grid_cells(self)->gpd.GeoDataFrame:
"""
Retrieves the default grid cells for the region.
Args:
None
Returns:
gpd.GeoDataFrame: A GeoDataFrame containing the grid cells with their coordinates, geometry, and unique cell ids.
Notes:
- The `get_default_grid()` method creates several attributes, such as the atlite `cutout` object and the `region_boundary`.
- Uses the `cutout.grid` attribute to create the analysis grid cells (GeoDataFrame).
_________________________________________________________________________________________________________________________
- Future Scope:
- Give users the flexibility to define their own grid resolution.
Step 0: Set up the Grid Cells and their unique indices, used to populate incremental datafields and to ease navigation to cells.
- Cells are created with unique indices generated from their x,y centroids.
"""
utils.print_update(level=PRINT_LEVEL_BASE+1,
message=f"{__name__}| Preparing Grid Cells...")
self.region_grid_cells:gpd.GeoDataFrame=self.gridcells.get_default_grid()
utils.print_update(level=PRINT_LEVEL_BASE+2,
message=f"{__name__}| Grid Cells updated.")
return self.region_grid_cells
def get_cell_capacity(self):
"""
Retrieves the potential capacity of the cells based on land availability and land-use intensity.
Returns:
tuple: The cells GeoDataFrame with their potential capacity, and the capacity matrix.
Notes:
- The capacity matrix is a 2D array with one row per grid cell.
- The potential capacity is calculated as:
- Potential capacity (MW) = available land fraction x land-use intensity (MW/sq.km) x cell area (sq.km)
- The method uses the `CellCapacityProcessor` class to process the capacity data.
- Could be parallelized with Step 2A/2C.
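Example:
A minimal sketch of the formula above with illustrative values (assumptions, not from any dataset):
>>> available_land_fraction = 0.40   # 40% of the cell is developable (assumed)
>>> land_use_intensity = 3.0         # MW per sq.km (assumed)
>>> cell_area = 100.0                # sq.km (assumed)
>>> round(available_land_fraction * land_use_intensity * cell_area, 2)  # MW
120.0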
"""
utils.print_update(level=PRINT_LEVEL_BASE+1,
message=f"{__name__}| Preparing Cells' capacity...")
self.cells_with_capacity,self.capacity_matrix=self.cell_processor.get_capacity()
utils.print_update(level=PRINT_LEVEL_BASE+2,
message=f"{__name__}| Cells' capacity updated.")
return self.cells_with_capacity,self.capacity_matrix # returns the cells GeoDataFrame and the capacity matrix
#---------------------------
def update_gwa_scaled_params(self,
memory_resource_limitation:Optional[bool]=False):
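"""
Integrates high-resolution Global Wind Atlas (GWA) windspeed data into the stored grid cells (wind resources only).
Args:
memory_resource_limitation (bool, optional): If True, runs the GWA-to-ERA5 mapping in a memory-conscious mode. Defaults to False.
Returns:
gpd.GeoDataFrame: The refreshed grid cells from the store; for wind, these carry the 'CF_IEC2', 'CF_IEC3', 'windspeed_gwa' and 'windspeed_ERA5' datafields.
Notes:
- For solar, this step is currently a no-op: the high-resolution processing is computationally expensive and the data contrast for solar does not yet justify it.
"""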
if self.resource_type=='wind':
utils.print_update(level=PRINT_LEVEL_BASE+2,
message=f"{__name__}| Preparing high resolution windspeed data from Global Wind Atlas")
if all(column in self.store_grid_cells.columns for column in ['CF_IEC2', 'CF_IEC3', 'windspeed_gwa','windspeed_ERA5']):
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| 'CF_IEC2', 'CF_IEC3', 'windspeed_gwa' are already present in the store, skipping data extraction from source")
else:
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| Data extracting from source: 'CF_IEC2', 'CF_IEC3', 'windspeed_gwa' ")
self.gwa_cells.map_GWA_cells_to_ERA5(aggregation_level=self.gadmBoundary.boundary_datafields['NAME_2'],
memory_resource_limitation=memory_resource_limitation)
elif self.resource_type=='solar':
# Not activated for solar resources yet as the high resolution data processing is computationally expensive and the data contrast for solar doesn't provide satisfactory incentive for that.
utils.print_update(level=PRINT_LEVEL_BASE+2,
message=f"{__name__}| Global Solar Atlas data not yet supported for solar.")
self.datahandler.refresh()
self.store_grid_cells=self.datahandler.from_store('cells')
return self.store_grid_cells
def get_CF_timeseries(self,
cells:gpd.GeoDataFrame=None,
force_update=False)->tuple:
"""
Extract timeseries information for the cells, e.g. static CF (yearly mean) and hourly timeseries.
Args:
cells (gpd.GeoDataFrame): Cells with their coordinates, geometry, and unique cell ids.
force_update (bool): If True, forces the update of the CF timeseries data.
Returns:
tuple: (cells_withCF, cells_timeseries) - the cells with their mean CF, and the hourly CF timeseries.
Notes:
- The method uses the `Timeseries` class to retrieve the timeseries data for the cells.
- The timeseries data is retrieved based on the resource type (e.g., 'solar' or 'wind').
- If the `cells` argument is not provided, it retrieves the cells from the data handler.
- Could be parallelized with Step 2B/2C
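Example:
A usage sketch, assuming a builder initialized as in the class-level examples:
>>> cells_withCF, cells_ts = builder.get_CF_timeseries()  # doctest: +SKIP
>>> cells_ts.head()  # hourly CF per cell  # doctest: +SKIP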
"""
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| Preparing Timeseries for the Cells...")
if cells is None:
self.datahandler.refresh()
cells=self.datahandler.from_store('cells')
cells_withCF,cells_timeseries= self.timeseries.get_timeseries(cells=cells)
return cells_withCF,cells_timeseries
def find_grid_nodes(self,
cells:gpd.GeoDataFrame=None,
use_pypsa_buses:bool=False) -> gpd.GeoDataFrame:
"""
Find the grid nodes for the given cells.
Args:
cells (gpd.GeoDataFrame, optional): Cells with their coordinates, geometry, and unique cell ids. Defaults to None.
use_pypsa_buses (bool, optional): Whether to use PyPSA buses as preferred nodes for resource connection. Defaults to False.
Returns:
gpd.GeoDataFrame: Updated grid cells with nearest grid node information
Notes:
Could be parallelized with Step 1B/C.
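Example:
A minimal sketch of the buses table expected when `use_pypsa_buses=True` (only the 'x' and 'y' coordinate columns are required; the other column is an assumption):
>>> import pandas as pd
>>> buses = pd.DataFrame({'name': ['BusA'], 'x': [-123.1], 'y': [49.3]})
>>> {'x', 'y'}.issubset(buses.columns)
True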
"""
self.cutout,self.region_boundary=self.era5_cutout.get_era5_cutout()
# Initialize cells data
if cells is None:
self.datahandler.refresh()
self.store_grid_cells=self.datahandler.from_store('cells')
else:
self.store_grid_cells = cells.copy()
utils.print_update(level=PRINT_LEVEL_BASE+1,
message=f"{__name__}| Grid Node Location initiated...")
if use_pypsa_buses:
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| Using PyPSA nodes as preferred nodes for resource connection.")
utils.print_info(f"{__name__}| PyPSA buses are configured in the config file under 'capacity\
-disaggregation/transmission/buses'")
buses_data_path=self.get_buses_path()
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| PyPSA buses loading from: {buses_data_path}")
grid_ss_df=pd.read_csv(buses_data_path)
assert 'x' in grid_ss_df.columns and 'y' in grid_ss_df.columns, \
"The buses data must contain 'x' and 'y' columns for coordinates."
self.grid_ss = gpd.GeoDataFrame(
grid_ss_df,
geometry=gpd.points_from_xy(grid_ss_df['x'], grid_ss_df['y']),
crs=self.get_default_crs(), # Set the coordinate reference system (e.g., WGS84)
)
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| Searching for nearest grid nodes for each cell...")
self.region_grid_cells_cap_with_nodes = self.gridNodesProcessor.find_grid_nodes_ERA5_cells(self.grid_ss,
self.store_grid_cells)
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| ✔ Closest grid nodes and distance calculation completed.")
self.datahandler.to_store(self.store_grid_cells,'cells')
self.datahandler.to_store(self.grid_ss,'buses')
else:
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| Using Substations (sourced from CODERS) preferred nodes for resource connection.")
if self.country_name == 'Canada':
self.grid_ss:gpd.GeoDataFrame=self.coders.get_table_provincial('substations')
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| Searching for nearest grid nodes for each cell...")
self.region_grid_cells_cap_with_nodes = self.gridNodesProcessor.find_grid_nodes_ERA5_cells(self.grid_ss,
self.store_grid_cells)
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| ✔ Closest grid nodes and distance calculation completed.")
self.datahandler.to_store(self.store_grid_cells,'cells')
self.datahandler.to_store(self.grid_ss,'substations')
else:
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| Using OSM grid lines for connection point analysis...")
self.grid_lines:gpd.GeoDataFrame=self.gridNodesProcessor.get_OSM_grid_lines()
if self.grid_lines is None or len(self.grid_lines) == 0:
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| Warning: No OSM grid lines found for region {self.region_short_code}")
return self.store_grid_cells
# self.grid_lines=self.grid_lines[self.grid_lines['max_voltage']>=11000]
self.store_grid_cells["centroid"] = self.store_grid_cells.apply(lambda row: Point(row["x"], row["y"]), axis=1)
# Apply to each row to compute the single connection point
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| Calculating nearest connection points to transmission lines...")
connection_results = self.store_grid_cells.apply(
lambda row: self.gridNodesProcessor.find_nearest_connection_point(
row["centroid"], row["geometry"], self.store_grid_cells, self.grid_lines
),
axis=1, result_type="expand"
)
self.store_grid_cells[["nearest_connection_point", "nearest_distance"]] = connection_results
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| ✔ Connection point analysis completed.")
self.datahandler.to_store(self.store_grid_cells,'cells')
self.datahandler.to_store(self.grid_lines,'lines')
# Refresh and return updated cells
self.datahandler.refresh()
self.store_grid_cells=self.datahandler.from_store('cells')
return self.store_grid_cells
'''
____________________________________________________________________________________________________________________________________________
Step 2: Set Scoring Matrix for the Cells.
____________________________________________________________________________________________________________________________________________
- We populated necessary parameters to evaluate the cells. We can set the scoring metric using the parameters.
- Typical metric includes but not limited to LCOE (Levelized Cost of Electricity in $/MWh) of cells.
- As a starter and simplified metric, we calculate Total Cost ($) and Total Energy Yield (MWh) for each Cell and compute LCOE ($/MWh).
* Remarks: Sequential Step after Step-1
* Future Scope(s):
1. Apply MCDA (Multi Criteria Decision Analysis) as Scoring Metric of the Cells.
2. Introduce proxy of the Local Regulations regarding site accessibility/placements.
3. Introduce proxy of the Local/Govt. incentives for Sites (based on Load Center based placement, land ownership, proximity to transport network etc.)
4. Introduce proxy of Weather Drought parameters for cells
e.g. i. [standardized energy indices in future climate scenarios](https://www.sciencedirect.com/science/article/pii/S0960148123011217?via%3Dihub)
ii. [Compound energy droughts](https://www.sciencedirect.com/science/article/pii/S0960148123014659?via%3Dihub#d1e724)
'''
def score_cells(self,
cells:gpd.GeoDataFrame=None):
"""
Scores the cells based on calculated LCOE ($/MWh).
Wrapper of the _.get_cell_score()_ method of the **_CellScorer_** object.
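Example:
A simplified LCOE sketch (total cost over total energy yield; illustrative values, not the _CellScorer_ internals):
>>> total_cost = 1.5e6     # $ over the asset life (assumed)
>>> energy_yield = 2.0e4   # MWh over the same horizon (assumed)
>>> total_cost / energy_yield   # LCOE in $/MWh
75.0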
"""
self.not_scored_cells=cells
if self.not_scored_cells is None:
self.datahandler.refresh()
self.not_scored_cells=self.datahandler.from_store('cells')
self.scored_cells = self.scorer.get_cell_score(self.not_scored_cells,f'{self.resource_type}_CF_mean')
# # Add new columns to the existing DataFrame
# for column in self.scored_cells.columns:
# self.not_scored_cells[column] = self.scored_cells[column].reindex(self.not_scored_cells.index)
self.datahandler.to_store(self.scored_cells,'cells',force_update=True)
# self.store_grid_cells=self.datahandler.from_store('cells')
return self.scored_cells
# def rescale_cutout_windspeed(self, cutout, era5_cells_gdf_updated):
# return wind.rescale_ERA5_cutout_windspeed_with_mapped_GWA_cells(cutout, era5_cells_gdf_updated)
'''
____________________________________________________________________________________________________________________________________________
Step 3: Clusterize the Cells to minimize the number of representative technologies in downstream models.
____________________________________________________________________________________________________________________________________________
- As a starter, we apply simplified spatial clustering by using k-means based on LCOE of the cells.
* Remarks: Sequential Step after Step-2.
* Future Scope(s):
1. Apply Spatio-temporal clustering to extract hybrid RE profile (solar + wind) for regions/clusters.
2. Use ML approaches for comparative results with classical/heuristics based approach.
'''
'''
___________________
- Step 3A:
- As a starter, we apply simplified spatial clustering by using k-means based on LCOE of the cells.
* Remarks: Sequential Step after Step-2.
___________________
'''
def get_clusters(self,
scored_cells:gpd.GeoDataFrame=None,
score_tolerance:float=200,
wcss_tolerance=None):
"""
### Args:
- **scored_cells** [optional]: Scored cells GeoDataFrame; re-scored via `score_cells()` if missing.
- **score_tolerance**: Upper LCOE bound ($/MWh) for cells to enter clustering. Defaults to 200.
- **wcss_tolerance**: WCSS (Within-Cluster Sum of Squares) tolerance; a higher tolerance gives more simplification and fewer clusters. Defaults to None, in which case the value is read from the configuration (typically 0.05).
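### Example:
A minimal sketch of how a WCSS tolerance can pick k (illustrative only; not the exact logic of `cluster.cells_to_cluster_mapping`):
>>> import numpy as np
>>> from sklearn.cluster import KMeans
>>> rng = np.random.default_rng(0)
>>> lcoe = rng.uniform(40, 120, size=(200, 1))  # synthetic $/MWh scores
>>> wcss = [KMeans(n_clusters=k, n_init=10, random_state=0).fit(lcoe).inertia_
...         for k in range(1, 8)]
>>> # pick the smallest k whose relative WCSS drop falls below the tolerance
>>> drops = [(wcss[i - 1] - wcss[i]) / wcss[0] for i in range(1, len(wcss))]
>>> optimal_k = next((i + 1 for i, d in enumerate(drops) if d < 0.05), len(wcss))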
"""
self.resource_disaggregation_config=self.get_resource_disaggregation_config()
self.wcss_tolerance=wcss_tolerance if wcss_tolerance else self.get_wcss_tolerance()
self.scored_cells=scored_cells
self.gadm_config=self.get_gadm_config()
# self.wcss_tolerance:float= self.resource_disaggregation_config['WCSS_tolerance']
utils.print_update(level=PRINT_LEVEL_BASE+1,
message=f"{__name__}| Preparing cluster of resources...")
utils.print_update(level=PRINT_LEVEL_BASE+2,
message=f"{__name__}| Clustering requires scored cells. The default scoring method is set to 'lcoe'. Checking for 'lcoe' in datafields...")
if self.scored_cells is None or f'lcoe_{self.resource_type}' not in self.scored_cells.columns:
utils.print_update(level=PRINT_LEVEL_BASE+3,
message=f"{__name__}| 'lcoe_{self.resource_type}' not found in available datafields...")
self.scored_cells = self.score_cells()
utils.print_warning(f"{__name__}| Filtering scored cells with score tolerance <= {score_tolerance} $/MWh and grid proximity threshold <= {self.get_grid_proximity_km()} km")
node_distance_col:str = utils.get_available_column(self.scored_cells, ['nearest_station_distance_km', 'nearest_distance'])
self.scored_cells_FILTERED = self.scored_cells[
(self.scored_cells[f'lcoe_{self.resource_type}'] <= score_tolerance) &
(self.scored_cells[node_distance_col] <= self.gridNodesProcessor.grid_proximity_threshold_km)
]
self.vis_dir=self.get_vis_dir()
self.ERA5_cells_cluster_map, self.region_optimal_k_df = cluster.cells_to_cluster_mapping(self.scored_cells_FILTERED,
self.vis_dir,
self.wcss_tolerance,
self.sub_national_unit_tag,
self.resource_type,
[f'lcoe_{self.resource_type}', f'potential_capacity_{self.resource_type}']
)
self.cell_cluster_gdf, self.dissolved_indices = cluster.create_cells_Union_in_clusters(self.ERA5_cells_cluster_map,
self.region_optimal_k_df,
self.sub_national_unit_tag,
self.resource_type)
self.cell_cluster_gdf['Operational_life'] = self.resource_disaggregation_config.get('Operational_life', 20)
self.cell_cluster_gdf.loc[:, 'resource_type'] = self.resource_type.lower()
# Define a namedtuple
cluster_data = namedtuple('cluster_data', ['clusters','dissolved_indices'])
self.clusters_nt:tuple=cluster_data(self.cell_cluster_gdf,self.dissolved_indices)
self.datahandler.to_store(self.cell_cluster_gdf,f'clusters/{self.resource_type}',force_update=True)
self.dissolved_cell_indices_df=pd.DataFrame(self.dissolved_indices).T
self.dissolved_cell_indices_df.index.name=self.sub_national_unit_tag
self.datahandler.to_store(self.dissolved_cell_indices_df,f'dissolved_indices/{self.resource_type}',force_update=True)
return self.clusters_nt
'''
___________________
- Step 3B:
- As a starter, we apply a simplified approach: the stepwise mean of the associated cells is set as the representative profile of a cluster.
* Remarks: Sequential Step after Step-3A.
___________________
* Future Scope(s):
1. Apply temporal clustering methods for representative profiles.
2. Collect hybrid RE profiles (solar + wind) for regions/clusters and show comparative analysis.
3. Use ML approaches for comparative results with the aforementioned classical/heuristics-based approach.
'''
def get_cluster_timeseries(self,
clusters:gpd.GeoDataFrame=None,
dissolved_indices:pd.DataFrame=None,
cells_timeseries:pd.DataFrame=None,
):
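"""
Builds a representative profile for each cluster as the stepwise mean of its member cells' timeseries.
Args:
clusters (gpd.GeoDataFrame, optional): Cluster GeoDataFrame; loaded from the store if None.
dissolved_indices (pd.DataFrame, optional): Mapping of each cluster to its member cell indices; loaded from the store if None.
cells_timeseries (pd.DataFrame, optional): Hourly cell-level CF timeseries; loaded from the store if None.
Returns:
pd.DataFrame: Representative timeseries for each cluster.
"""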
self.cells_timeseries=cells_timeseries
self.cell_cluster_gdf=clusters
self.dissolved_cell_indices_df=dissolved_indices
if self.cells_timeseries is None:
self.cells_timeseries=self.datahandler.from_store(f'timeseries/{self.resource_type}')
if self.cell_cluster_gdf is None:
self.cell_cluster_gdf=self.datahandler.from_store(f'clusters/{self.resource_type}')
utils.print_update(level=PRINT_LEVEL_BASE+1,
message=f"{__name__}| Preparing representative profiles for {len(self.cell_cluster_gdf)} clusters")
if self.dissolved_cell_indices_df is None:
self.dissolved_cell_indices_df=self.datahandler.from_store(f'dissolved_indices/{self.resource_type}')
self.cluster_ts_df=self.timeseries.get_cluster_timeseries(self.cell_cluster_gdf,
self.cells_timeseries,
self.dissolved_cell_indices_df,
self.sub_national_unit_tag)
return self.cluster_ts_df
# _________________________________________________________________________________
def build(self,
select_top_sites:Optional[bool]=True,
use_pypsa_buses:Optional[bool]=False,
memory_resource_limitation:Optional[bool]=True):
"""
Execute the specific module logic for the given resource type ('solar' or 'wind').
"""
utils.print_module_title(f"Initiating {self.resource_type} module for {self.get_region_name()}...")
self.memory_resource_limitation=memory_resource_limitation
utils.print_banner("Step 1 : Prepare Cutout and Grid Cells")
self.get_grid_cells()
utils.print_banner("Step 2 : Calculate Land availability and process capacity matrix")
self.get_cell_capacity()
utils.print_banner("Step 3 : [if Wind Resources] Collect and rescale Global Wind Atlas Data and calibrate ERA5's windspeed. ")
# Store CF data for validation purposes
self.extract_weather_data()
self.update_gwa_scaled_params(self.memory_resource_limitation)
utils.print_banner("Step 4 : Create timeseries for Resources CF")
self.get_CF_timeseries()
utils.print_banner("Step 5 : Find closed grid connection nodes")
self.find_grid_nodes(use_pypsa_buses=use_pypsa_buses)
utils.print_banner("Step 6 : Use capacity, energy yield and cost attributes to score each cell")
self.score_cells()
utils.print_banner("Step 7.1 : Use score similarities to find clusterized representation (sites) of cells")
self.get_clusters()
utils.print_banner("Step 7.2 : Prepare representative timeseries of the clusterized sites")
self.get_cluster_timeseries()
utils.print_info("To avoid confusion, Units dictionary method should be updated if any units are changed across modules. However, units dictionary is for documentation purposes only. It doesn't have any calculation impacts on any of the methods.")
self.units.create_units_dictionary()
if select_top_sites:
utils.print_banner("Step 8 : Top Site Selection for Targeted Capacity Investments/Plans")
resource_max_capacity=self.resource_disaggregation_config.get('max_capacity',10) # Collects max_capacity from resource_disaggregation_config (if set), otherwise defaults to 10 GW
resource_clusters,cluster_timeseries=self.select_top_sites(self.clusters_nt.clusters,
self.cluster_ts_df,
resource_max_capacity=resource_max_capacity)
utils.print_module_title(f"Top Sites (clusters) from {self.resource_type} module saved to {self.store} for {self.get_region_name()}...")
else: # When the user wants all of the sites
resource_clusters=self.clusters_nt.clusters
cluster_timeseries=self.cluster_ts_df
utils.print_module_title(f"All Sites (clusters) from {self.resource_type} module saved to {self.store} for {self.get_region_name()}...")
self.export_results(self.resource_type,
self.region_name,
resource_clusters,
cluster_timeseries,
self.results_save_to)
sites_summary:str=self.create_summary_info(self.resource_type,
self.region_name,
resource_clusters,
cluster_timeseries)
self.dump_export_metadata(sites_summary,
self.results_save_to)
@staticmethod
def export_results(resource_type:str,
region:str,
resource_clusters:pd.DataFrame,
cluster_timeseries:pd.DataFrame,
save_to : Optional[Path]=Path('results')):
"""
Export processed resource cluster results (geodataframe) to standard datafield csvs as input for downstream models.
### Args
- **resource_type**: The type of resource ('solar' or 'wind').
- **region**: Region name used in the output file names.
- **resource_clusters**: A DataFrame containing resource cluster information.
- **cluster_timeseries**: A DataFrame with the clusters' representative timeseries.
- **save_to** [optional]: The directory to save the output files. Defaults to 'results/*.csv'.
> Currently supports: CLEWs, PyPSA
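### Example
Output files follow the `resource_options_{resource_type}_{region}` naming used below:
>>> f"resource_options_{'wind'}_{'BC'}.csv"
'resource_options_wind_BC.csv'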
"""
# Check if resource_clusters is a DataFrame or GeoDataFrame
if not isinstance(resource_clusters, (pd.DataFrame, gpd.GeoDataFrame)):
raise TypeError(
f"Invalid input: resource_clusters must be a Pandas DataFrame or GeoDataFrame, "
f"but got {type(resource_clusters).__name__}."
)
if not isinstance(cluster_timeseries, pd.DataFrame):
raise TypeError(
f"Invalid input: cluster_timeseries must be a Pandas DataFrame, "
f"but got {type(cluster_timeseries).__name__}."
)
# Exclude all columns containing geometry-related data as these are not required for downstream models in consideration i.e. CLEWs, PyPSA
resource_clusters_excld_geom = resource_clusters[[col for col in resource_clusters.columns if col != 'geometry']]
# CSV -> Save to
save_to=utils.ensure_path(save_to)
save_to.mkdir(parents=True,exist_ok=True)
resource_clusters_excld_geom.to_csv(save_to/f'resource_options_{resource_type}_{region}.csv', index=True)
cluster_timeseries.to_csv(save_to/f'resource_options_{resource_type}_{region}_timeseries.csv', index=True)
utils.print_update(level=2, message=f"{resource_type} clusters exported to: {save_to/f'resource_options_{resource_type}_{region}.csv'} (plus the matching *_timeseries.csv)")
@staticmethod
def create_summary_info(resource_type:str,
region:str,
sites:pd.DataFrame,
timeseries:pd.DataFrame)->str:
"""
Creates summary information to be exported alongside results data.
"""
formatted_time = datetime.now().strftime("%H:%M:%S")
info = (
f"{'_'*25} Top Block Represents the latest results' summary <{'_'*25}\n"
f"{'-'*100}\n"
f"* {resource_type.upper()} for {region.upper()}*\n"
f"Total Capacity of the Sites: {sites['potential_capacity'].sum() / 1e3} GW\n"
f">> No. of Sites (Clusters): {len(sites)}\n"
f" >> Snapshot Points: {len(timeseries)}"
f"\n Results Generated on Local Time (hh:mm:ss): {formatted_time}\n"
f"{'-'*100}\n"
)
return info
@staticmethod
def select_top_sites(sites:Union[gpd.GeoDataFrame, pd.DataFrame],
sites_timeseries:pd.DataFrame,
resource_max_capacity:float,
)-> Tuple[Union[gpd.GeoDataFrame, pd.DataFrame], pd.DataFrame]:
print(f">>> Selecting TOP Sites to for {resource_max_capacity} GW Capacity Investment in BC...")
"""
Select the top sites based on potential capacity and a maximum resource capacity limit.
Args:
sites_gdf: GeoDataFrame containing cell and bucket information.
resource_max_capacity (float) : Maximum allowable capacity in GW.
Returns:
- selected_sites: GeoDataFrame with the selected top sites.
"""
print(f"{'_'*100}")
print(f"Selecting the Top Ranked Sites to invest in {resource_max_capacity} GW PV in BC")
print(f"{'_'*100}")
# Initialize variables
selected_rows:list = []
total_capacity:float = 0.0
top_sites:gpd.GeoDataFrame = sites.copy()
if top_sites['potential_capacity'].iloc[0] < resource_max_capacity * 1000:
# Iterate through the sorted GeoDataFrame
for index, row in top_sites.iterrows():
# Check if adding the current row's capacity exceeds resource capacity
if total_capacity + row['potential_capacity'] <= resource_max_capacity * 1000:
selected_rows.append(index) # Add the row to the selection
# Update the total capacity
total_capacity += row['potential_capacity']
# If adding the current row's capacity would exceed max resource capacity, stop the loop
else:
break
# Create a new GeoDataFrame with the selected rows
top_sites:gpd.GeoDataFrame = top_sites.loc[selected_rows]
# Apply the additional logic
# mask = sites['cluster_id'] > top_sites['cluster_id'].max()
mask = sites.index > top_sites.index.max()
selected_additional_sites:gpd.GeoDataFrame = sites[mask].head(1)
remaining_capacity:float = resource_max_capacity * 1000 - top_sites['potential_capacity'].sum()
if remaining_capacity > 0:
if len(selected_additional_sites) > 0:
print(f"\n!! Note: The Last cluster ({selected_additional_sites.index[-1]}) originally had {round(selected_additional_sites['potential_capacity'].iloc[0] / 1000,2)} GW potential capacity."
f"To fit the maximum capacity investment of {resource_max_capacity} GW, it has been adjusted to {round(remaining_capacity / 1000,2)} GW\n")
else:
print(f"\n!! Note: No additional sites selected. Remaining capacity: {round(remaining_capacity / 1000,2)} GW\n")
selected_additional_sites['potential_capacity'] = remaining_capacity
# Concatenate the DataFrames
top_sites = pd.concat([top_sites, selected_additional_sites])
else:
original_capacity = sites['potential_capacity'].iloc[0]
print(f"!!Note: The first cluster originally had {round(original_capacity / 1000,2)} GW potential capacity.\n"
f"To fit the maximum capacity investment of {resource_max_capacity} GW, it has been adjusted. \n")
top_sites = top_sites.iloc[:1] # Keep only the first row
# Adjust the potential_capacity of the first row
top_sites.at[top_sites.index[0], 'potential_capacity'] = resource_max_capacity * 1000
# top_sites_ts = sites_timeseries[top_sites.index.astype(str)]
# sites_timeseries.columns = sites_timeseries.columns.str.strip()
# top_sites.index = top_sites.index.str.strip()
top_sites_ts = sites_timeseries[top_sites.index]
return top_sites ,top_sites_ts # gdf
def build_resources(regions:list,
resource_types: list,
config_path: str | Path = 'config/config.yaml'):
"""
Builds resources for specified regions and resource types using the RESources_builder module.
Args:
regions (list): A list of region short codes to process.
resource_types (list): A list of resource types to build for each region.
config_path (str | Path, optional): Path to the configuration file. Defaults to 'config/config.yaml'.
Returns:
None
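Example:
A typical call (region codes and resource types as in the class-level examples):
>>> build_resources(['BC', 'AB'], ['wind', 'solar'])  # doctest: +SKIP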
"""
for region, resource in product(regions, resource_types):
RES_module = RESources_builder(
config_file_path=config_path,
region_short_code=region,
resource_type=resource
)
RES_module.build(select_top_sites=True,
use_pypsa_buses=False)