Source code for src.Welsch_intraday2
import pandas as pd
import numpy as np
from collections import Counter
[docs]
def intraday_welsch2(storage_path, blocksperday, timeslices, representative_days, chronological_sequence, storage_level_start, output_path):
# Carregar os arquivos
storagelevel_df = pd.read_csv(storage_path)
# Criar os vetores de controle
dailytimebracket_vector = np.tile(np.arange(1, blocksperday + 1), (timeslices // blocksperday + 1))[:timeslices]
daytype_vector = np.repeat(np.arange(1, len(representative_days) + 1), (timeslices // len(representative_days)))[:timeslices]
# Criar o DataFrame a partir dos vetores esperados
expected_df = pd.DataFrame({
'DAYTYPE': daytype_vector,
'DAILYTIMEBRACKET': dailytimebracket_vector
})
# Inserir chave única para evitar problemas de repetição nos joins
expected_df['Key'] = expected_df['DAYTYPE'].astype(str) + '-' + expected_df['DAILYTIMEBRACKET'].astype(str)
storagelevel_df['Key'] = storagelevel_df['DAYTYPE'].astype(str) + '-' + storagelevel_df['DAILYTIMEBRACKET'].astype(str)
# Juntar o DataFrame esperado com o DataFrame original baseado na chave
result_df = pd.merge(expected_df, storagelevel_df.drop(columns=['DAYTYPE', 'DAILYTIMEBRACKET']), on='Key', how='left')
result_df['VALUE'] = result_df['VALUE'].fillna(0) # Preencher valores faltantes com 0
# Remover a chave temporária
result_df.drop(columns='Key', inplace=True)
# Repetir os resultados das linhas dos mesmos dias típicos
count = Counter(chronological_sequence)
daysindaytype = [count[day] for day in representative_days]
repeated_rows = []
for day, repeat_count in zip(range(1, len(representative_days) + 1), daysindaytype):
day_rows = result_df[result_df['DAYTYPE'] == day]
repeated_rows.append(pd.concat([day_rows] * repeat_count, ignore_index=True))
result_df = pd.concat(repeated_rows, ignore_index=True)
# Cria a coluna 'VALUEACCUMULATED' para iniciar com 0 e depois acumular os valores
result_df['VALUEACCUMULATED'] = result_df['VALUE'].shift(1).fillna(0).cumsum() + storage_level_start
# Salvar o DataFrame corrigido em um arquivo CSV
result_df.to_csv(output_path, index=False)
return output_path