import yaml
import os
import atlite
import shapely
import geopandas as gpd
import pypsa
import numpy as np
import pandas as pd
from shapely.ops import unary_union
import sys
from pathlib import Path
from colorama import Fore, Style
from typing import Optional
[docs]
def print_update(level: int = None,
                 message: str = "--",
                 alert: Optional[bool] = False):
    """Print a colour-coded, tree-style progress message.

    Args:
        level: Nesting depth; 1-3 pick distinct colours, deeper levels are
            dimmed. When given, it takes precedence over `alert`.
        message: Text to display.
        alert: When True (and `level` is None), print as a red alert.
    """
    if level is not None:
        if level == 1:
            color = Fore.YELLOW
            prefix = "└"
        elif level == 2:
            color = Fore.CYAN
            prefix = " └"
        elif level == 3:
            color = Fore.LIGHTMAGENTA_EX
            prefix = " └"
        else:
            # Originally only `level > 3` was handled here, leaving
            # `color`/`prefix` unbound (NameError) for level <= 0; treat all
            # remaining integer levels as the deepest, dimmed style.
            color = Fore.LIGHTBLACK_EX + Style.DIM
            prefix = " └─"
    elif alert:
        color = Fore.RED
        prefix = " └ X "
    else:
        color = Fore.LIGHTRED_EX + Style.DIM
        prefix = " !!!"
    print(f"{color}{prefix} {message}{Style.RESET_ALL}")
[docs]
def set_root(current_dir=None):
    """Ensure the working directory is the project root 'BC_Combined_Modelling'.

    Args:
        current_dir: Directory (Path) to check; defaults to the current
            working directory *at call time*.

    Note: the original signature used ``current_dir=Path.cwd()``, which froze
    the default at import time (defaults are evaluated once); it is now
    resolved on each call, which is backward-compatible for explicit callers.
    """
    if current_dir is None:
        current_dir = Path.cwd()
    # Check if the last folder name matches 'BC_Combined_Modelling'
    if current_dir.name == 'BC_Combined_Modelling':
        print("The current directory is 'BC_Combined_Modelling'")
    else:
        # Assume we are one level below the root and move up one directory.
        os.chdir('..')
        # Verify the change
        print(f"Current working directory set to: {os.getcwd()}")
[docs]
def find_project_root(marker="bc_combined_modelling", start_dir: str = None) -> Path:
    """
    Walk upwards from a starting directory until the project root is found.

    Args:
        marker (str): The folder or file name to identify the project root.
        start_dir (str): The starting directory for the search. Defaults to current directory.
    Returns:
        Path: The path to the project root.
    Raises:
        FileNotFoundError: If the marker is not found.
    """
    base = Path(start_dir or os.getcwd())
    resolved = base.resolve()
    # Candidates: the start directory itself, then each ancestor.
    for candidate in (resolved, *resolved.parents):
        if candidate == candidate.parent:
            # Filesystem root reached: never checked, as in the original.
            break
        if (candidate / marker).exists():
            print(f"Project root found at: {candidate}")
            return candidate
    raise FileNotFoundError(f"Project root with marker '{marker}' not found starting from '{base}'.")
[docs]
def setup_environment(marker="bc_combined_modelling", start_dir: str = None):
    """
    Make the project root importable regardless of the current directory.

    Args:
        marker (str): The folder or file name to identify the project root.
        start_dir (str): The starting directory for the search. Defaults to current directory.
    Raises:
        FileNotFoundError: Re-raised after printing when the marker is
            not found anywhere above `start_dir`.
    """
    try:
        project_root = find_project_root(marker, start_dir=start_dir)
    except FileNotFoundError as err:
        print(str(err))
        raise
    root_str = str(project_root)
    if root_str not in sys.path:
        # Allow `import <project package>` from notebooks/scripts anywhere.
        sys.path.append(root_str)
    print(f"Environment set up. Project root added to sys.path: {project_root}")
# In a Jupyter Notebook
# Add the following at the start of your notebook:
#
# from your_script_name import setup_environment
# setup_environment()
[docs]
def parse_data_value(value):
    """
    Attempts to convert strings that represent lists (e.g. '[100, 200, 300]')
    into actual lists. If not possible, returns the value as-is.

    Uses ast.literal_eval rather than eval: config values may originate from
    external files, and eval would execute arbitrary expressions. literal_eval
    safely parses Python literals (lists, dicts, numbers, strings, ...).
    """
    if isinstance(value, str):
        try:
            value = ast.literal_eval(value)
        except (ValueError, SyntaxError):
            # Not a Python literal: keep the original string.
            pass
    return value
[docs]
def load_config(config_file):
    '''
    Load the PyPSA_BC YAML configuration file.
    config_file: Path + filename of the configuration file. (i.e. ../config/config.yaml)
    '''
    # safe_load refuses arbitrary Python object construction from YAML.
    with open(config_file, 'r') as stream:
        return yaml.safe_load(stream)
[docs]
def create_folder(folder):
    '''
    Create a folder if it does not already exist.
    If the folder is already created it takes no action.
    folder: Path + folder name.
    '''
    if os.path.exists(folder):
        return
    os.makedirs(folder)
    print(f"Created folder @ {folder}")
[docs]
def get_region_polygon(geometry):
    '''
    Find the bounding box of a region and create a polygon for it.

    geometry: a GeoSeries/GeoDataFrame geometry column with exactly one row.

    Returns a polygon of the region's max/min bounds in terms of lats and lons.

    Raises:
        ValueError: if `geometry` does not contain exactly one geometry.
            (The original printed a message and called exit(1); raising lets
            callers handle or report the condition instead of killing the
            interpreter.)
    '''
    if len(geometry) != 1:
        raise ValueError('There remains multiple geometries')
    bounds_df = geometry.bounds
    bbox = (bounds_df['minx'].iloc[0],   # west_lon
            bounds_df['miny'].iloc[0],   # south_lat
            bounds_df['maxx'].iloc[0],   # east_lon
            bounds_df['maxy'].iloc[0])   # north_lat
    return shapely.geometry.box(*bbox, ccw=True)
[docs]
def get_bounds(polygon_list):
    '''
    Return the overall bounding box of a list of polygons.

    The polygons are merged into one geometry and its min/max lon/lat
    bounds are returned as a dict with keys west_lon, south_lat,
    east_lon, north_lat.
    '''
    merged = gpd.GeoSeries(unary_union(polygon_list))
    bounds_df = merged.geometry.bounds
    key_to_col = {
        "west_lon": "minx",
        "south_lat": "miny",
        "east_lon": "maxx",
        "north_lat": "maxy",
    }
    return {name: bounds_df[col].iloc[0] for name, col in key_to_col.items()}
[docs]
def get_cutout_path(cfg):
    '''
    Build the unique cutout file name from the region and start/end year.
    return: file path + name for the cutout described by selections in the
    cutout configuration.
    '''
    snapshots = cfg['cutout']['snapshots']
    start_year = snapshots['start'][0][:4]
    end_year = snapshots['end'][0][:4]
    prefix = cfg['cutout']['root'] + "BC"  # automate the region name
    # Single-year cutouts get "<year>.nc"; multi-year get "<start>_<end>.nc".
    suffix = start_year if start_year == end_year else f"{start_year}_{end_year}"
    return f"{prefix}_{suffix}.nc"
[docs]
def create_era5_cutout(bounds, cfg):
    '''
    Create and prepare an atlite cutout based on ERA5 data.

    bounds: dict with keys west_lon/east_lon/south_lat/north_lat.
    cfg: configuration dict; the "cutout" section supplies the module,
         grid resolution (dx, dy) and snapshot time window.
    '''
    cutout_cfg = cfg["cutout"]
    dx = cutout_cfg["dx"]
    dy = cutout_cfg['dy']
    snapshots = cutout_cfg["snapshots"]
    time_horizon = slice(snapshots['start'][0], snapshots['end'][0])
    # Extend the bounding box by one grid step (dx/dy) on every side.
    x_range = slice(bounds['west_lon'] - dx, bounds['east_lon'] + dx)
    y_range = slice(bounds['south_lat'] - dy, bounds['north_lat'] + dy)
    # get path + filename for the cutout, then build and download/prepare it
    cutout = atlite.Cutout(path=get_cutout_path(cfg),
                           module=cutout_cfg["module"],
                           x=x_range,
                           y=y_range,
                           dx=dx,
                           dy=dy,
                           time=time_horizon)
    cutout.prepare()
[docs]
def convert_cid_2_aid(cid,old_aid):
'''
This creates an asset id (aid) based on the component id (cid).
Common Example:
cid -> BC_ZBL03_GEN
old_aid -> BC_ZBL_GSS
new_aid -> BC_ZBL_GSS
Example:
cid -> BC_BR0101_GEN
old_aid -> BC_BR1_DFS
new_aid -> BC_BR1_DFS
Example:
cid -> BC_BR0102_GEN
old_aid -> BC_BR2_GSS
new_aid -> BC_BR2_GSS
'''
aid_start = old_aid.split('_')[0]
cid_2_aid = cid.split('_')[1][:3]
aid_end = old_aid.split('_')[-1]
if aid_start != cid.split('_')[0]: # error check
print('Error detected in convert_cid_2_aid')
exit(3)
new_aid= "_".join([aid_start, cid_2_aid, aid_end])
return new_aid
[docs]
def write_pickle(data_dict, filepath):
    '''
    Write a pickle file based on a dictionary.

    data_dict: object (typically a dict) to serialize.
    filepath: destination path for the pickle file.
    '''
    # pd.to_pickle accepts a path directly; the previous version opened a
    # handle and also called f.close() inside the `with` block (redundant,
    # since the context manager already closes the file).
    pd.to_pickle(data_dict, filepath)
    print(f'Wrote pickle file {filepath}')
[docs]
def read_pickle(filepath):
    '''
    Read a pickle file and return the stored object (typically a dict).

    Note: the original docstring said "json"; this reads pickle. Do not
    unpickle files from untrusted sources — pickle can execute code.
    '''
    # pd.read_pickle accepts a path directly; no manual handle needed.
    return pd.read_pickle(filepath)
[docs]
def create_standard_gen_bus_map(buses):
    '''
    Map each unique node code to the lowest voltage among its buses.

    Bus names look like "<voltage>_<node_code>". The underlying assumption
    used in selecting a bus to connect a generator to is that generators are
    connected to the lowest voltage bus at their given node location.
    Example:
        Buses = ["230_ABN_GSS", "138_ABN_GSS","500_MCA_GSS", "63_MCA_GSS"] -> bus_dict = {ABN_GSS:138, MCA_GSS:63}
    '''
    bus_dict = {}
    for bus in buses:
        voltage_str, _, node = bus.partition('_')
        voltage = int(voltage_str)
        # Keep the smallest voltage seen so far for this node code.
        bus_dict[node] = min(voltage, bus_dict.get(node, voltage))
    return bus_dict
[docs]
def get_gen_bus(node_code, bus_dict):
    '''
    Return the correct standardized electric bus name for a generator.
    Example:
        node_code = "BC_ABS_GSS"
        bus_dict = {"ABS_GSS":230, "MCA_GSS":63}
        return -> "230_ABS_GSS"
    '''
    # Drop the leading region token: i.e. BC_ABN_GSS -> ABN_GSS
    suffix = node_code.partition('_')[2]
    return f"{bus_dict[suffix]}_{suffix}"
[docs]
def get_multi_link_override():
    '''
    Get the multi-link override. Needed for cascaded hydroelectric.

    From the PyPSA CHP example: adds a second output (bus2/efficiency2/p2)
    to the Link component, so a single link can serve bus1 AND bus2.
    '''
    # Attribute rows: [type, unit, default, description, status].
    extra_link_attrs = {
        "bus2": ["string", np.nan, np.nan, "2nd bus", "Input (optional)"],
        "efficiency2": ["static or series", "per unit", 1.0,
                        "2nd bus efficiency", "Input (optional)"],
        "p2": ["series", "MW", 0.0, "2nd bus output", "Output"],
    }
    override_component_attrs = pypsa.descriptors.Dict(
        {k: v.copy() for k, v in pypsa.components.component_attrs.items()}
    )
    for attr_name, row in extra_link_attrs.items():
        override_component_attrs["Link"].loc[attr_name] = row
    return override_component_attrs
[docs]
def get_multi_gen_override():
    '''
    Get the multi-generator override component attributes.

    Adds a second output (bus2/efficiency2/p2) to the Link component,
    mirroring get_multi_link_override.

    Returns:
        pypsa.descriptors.Dict of overridden component attributes.
        (The original version built the overrides but was missing the
        return statement, so it always returned None.)
    '''
    override_component_attrs = pypsa.descriptors.Dict(
        {k: v.copy() for k, v in pypsa.components.component_attrs.items()}
    )
    override_component_attrs["Link"].loc["bus2"] = [
        "string",
        np.nan,
        np.nan,
        "2nd bus",
        "Input (optional)",
    ]
    override_component_attrs["Link"].loc["efficiency2"] = [
        "static or series",
        "per unit",
        1.0,
        "2nd bus efficiency",
        "Input (optional)",
    ]
    override_component_attrs["Link"].loc["p2"] = [
        "series",
        "MW",
        0.0,
        "2nd bus output",
        "Output",
    ]
    return override_component_attrs
[docs]
def add_generic_columns(assets, gen_generic, gen_type):
    '''
    Add generic columns of costs and efficiencies to an asset table.
    This is needed before creating the .csv for OSeMOSYS via Otoole.
    # MODIFIED 2024-10-01 for CODERS update

    assets: DataFrame of assets to enrich (not modified in place).
    gen_generic: DataFrame of generic parameters, one row per gen_type.
    gen_type: generation type whose generic row is broadcast onto assets.
    '''
    generic_cols = [
        'variable_om_cost_CAD_per_MWh',
        'average_fuel_price_CAD_per_MMBtu',
        'average_fuel_price_CAD_per_GJ',
        'carbon_emissions',  # _tCO2eq_per_MWh
        'heat_rate',  # _MMBtu_per_MWh
        'spinning_reserve_capability',
        'economic_life',
        'capital_cost_CAD_per_kW',
        'capital_overhead_CAD_per_kW',
        'overnight_capital_cost_CAD_per_kW',
        'interest_during_construction_CAD_per_kW',
        'annualized_capital_cost_CAD_per_MWyear',
        'total_project_cost_2020_CAD_per_kW',
    ]
    # Take the first generic row matching this gen_type (e.g. "Wind_onshore")
    # and broadcast its values as new constant columns on every asset row.
    matches = gen_generic['gen_type'] == gen_type
    generic_values = gen_generic.loc[matches, generic_cols].iloc[0].to_dict()
    return assets.assign(**generic_values)
[docs]
def add_generic_columns_tpp(assets, gen_generic):
    '''
    Add generic columns of costs and efficiencies for thermal power plants.
    This is needed before creating the .csv for OSeMOSYS via Otoole.

    Rows are matched on the shared 'gen_type' column via a left join, so
    every asset row is preserved even without a generic match.
    '''
    generic_cols = [
        'variable_om_cost_CAD_per_MWh',
        'average_fuel_price_CAD_per_MMBtu',
        'average_fuel_price_CAD_per_GJ',
        'carbon_emissions',
        'heat_rate',
        'spinning_reserve_capability',
        'economic_life',
        'capital_cost_CAD_per_kW',
        'capital_overhead_CAD_per_kW',
        'overnight_capital_cost_CAD_per_kW',
        'interest_during_construction_CAD_per_kW',
        'annualized_capital_cost_CAD_per_MWyear',
        'total_project_cost_2020_CAD_per_kW',
        'ramp_rate_percent_per_min',
        "min_up_time_hours",
        "min_down_time_hours",
    ]
    lookup = gen_generic[generic_cols + ['gen_type']]
    return pd.merge(assets, lookup, on='gen_type', how='left')
[docs]
def fix_df_ts_index(
        df: pd.DataFrame,
        start_date: str = '2021-01-01 00:00:00',
        end_date: str = '2021-12-31 23:00:00'):
    '''
    Replace df's index with an hourly DatetimeIndex (8760 entries for the
    default 2021 window).

    Mutates df in place and also returns it. df must already have exactly
    as many rows as there are hours in [start_date, end_date].
    '''
    df.index = pd.date_range(start=start_date, end=end_date, freq='h')
    return df
[docs]
def fix_coders_update(data, col_to_correct, codes):
    '''
    Remap outdated CODERS identifier strings, in place.
    Modified 2024-10-01: CODERS updates to buses creating mismatches in string match incorrectly.
    data: DataFrame to correct (modified in place; returns None).
    col_to_correct: column name containing the values in which we plan to map to corrections for CODERS.
    codes: codes to map from an old string to a new one, example of codes...
    codes = {'BC_BR0101_GEN':'BC_BR101_GEN','BC_BR0102_GEN':'BC_BR102_GEN','BC_BR0103_GEN':'BC_BR103_GEN','BC_BR0104_GEN':'BC_BR104_GEN',
    'BC_BR0201_GEN':'BC_BR201_GEN','BC_BR0202_GEN':'BC_BR202_GEN','BC_BR0203_GEN':'BC_BR203_GEN','BC_BR0204_GEN':'BC_BR204_GEN'}
    '''
    needs_fix = data[col_to_correct].isin(codes)
    # .map is equivalent to the previous per-row apply: every masked value
    # is guaranteed (by the isin mask) to be a key of `codes`.
    data.loc[needs_fix, col_to_correct] = data.loc[needs_fix, col_to_correct].map(codes)