Upgrade pyPDAF to be compatible with PDAF V2.2.1 #4

Merged
6 commits merged on Jul 24, 2024
Changes from 1 commit
separate model ensemble and model integrator
yumengch committed Jul 22, 2024
commit ef11b9df19789013ea63b7adbe7677ce81811f05
35 changes: 0 additions & 35 deletions example/PDAF_system.py
@@ -28,7 +28,6 @@
import parallelisation
import prepost_processing
import state_vector
import step.shift

import pyPDAF.PDAF as PDAF

@@ -187,40 +186,6 @@ def init_ens_pdaf(self, filtertype:int, dim_p:int, dim_ens:int,
ens_p[:, i] = np.loadtxt(self.initial_ensemble_filename.format(i=i+1))[:, offset:offset+nx_p].ravel()
return state_p, uinv, ens_p, status_pdaf

def forward(self, nsteps:int) -> None:
# When each model task runs one ensemble member,
# i.e. no need to run each ensemble member sequentially,
# we call this full parallel implementation
if self.pe.dim_ens_l == 1:
self.forward_full_parallel(nsteps)
else:
self.forward_flexible(nsteps)

def forward_full_parallel(self, nsteps:int) -> None:
for i in range(nsteps):
self.model_ens[0].field_p = step.shift.step(self.model_ens[0].field_p, self.pe, i+1, config.USE_PDAF)
if config.USE_PDAF:
self.assimilate_full_parallel()

def forward_flexible(self, nsteps:int) -> None:
# create directory to
current_step = 0
# full DA system integration loop
while current_step < nsteps:
if config.USE_PDAF:
# model integration
for _ in range(self.steps_for):
for i in range(self.pe.dim_ens_l):
self.model_ens[i].field_p = step.shift.step(self.model_ens[i].field_p, self.pe, current_step + 1, config.USE_PDAF)
current_step += 1

self.assimilate_flexible()
else:
for i in range(self.pe.dim_ens_l):
self.model_ens[i].field_p = step.shift.step(self.model_ens[i].field_p, self.pe, current_step + 1, config.USE_PDAF)
current_step += 1


def assimilate_full_parallel(self) -> None:
"""Assimilation function for the full parallel implementation
"""
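The forward() logic removed here (and reintroduced in model_integrator.py below) branches on pe.dim_ens_l: when each model task holds exactly one ensemble member the fully parallel path is used, otherwise the flexible path steps several members per task sequentially. As a rough sketch of how that number typically arises in PDAF-style task parallelism, the helper below uses assumed names (dim_ens, n_modeltasks) and an even split; the example's parallelisation class may compute it differently:

# Hypothetical helper illustrating the local ensemble size per model task.
def local_ensemble_size(dim_ens: int, n_modeltasks: int) -> int:
    # dim_ens_l members are integrated sequentially by each model task:
    # dim_ens_l == 1 -> every member has its own task -> forward_full_parallel
    # dim_ens_l > 1  -> members share a task          -> forward_flexible
    assert dim_ens % n_modeltasks == 0, 'sketch assumes an even split'
    return dim_ens // n_modeltasks

# e.g. 8 members on 8 model tasks -> 1 (fully parallel)
#      8 members on 4 model tasks -> 2 (flexible)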
11 changes: 8 additions & 3 deletions example/main.py
@@ -21,6 +21,7 @@

import config
import model
import model_integrator
from parallelisation import parallelisation
from PDAF_system import PDAF_system

@@ -39,13 +40,17 @@ def main():
model_ens[0].print_info(pe)

if not config.USE_PDAF:
for model_t in model_ens:
model_t.init_field(pe.mype_model)
model_ens.init_field(pe.mype_model)

# Initialise model integrator
integrator = model_integrator.model_integrator(model_ens)

# Initialise PDAF system
das = PDAF_system(pe, model_ens)
if config.USE_PDAF:
das.init_pdaf(screen=2)
das.forward(config.nsteps)

integrator.forward(config.nsteps, das)

pe.finalize_parallel()

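With this change the driver only wires the components together: model_integrator owns the ensemble time stepping, while PDAF_system keeps DA initialisation and the assimilation calls. A minimal sketch of the resulting control flow, assuming the interfaces visible in this diff (creation of pe and model_ens is elided):

# Sketch of the separated responsibilities after this commit;
# pe and model_ens are assumed to be set up as in example/main.py.
integrator = model_integrator.model_integrator(model_ens)  # time stepping only
das = PDAF_system(pe, model_ens)                           # DA initialisation and analysis only
if config.USE_PDAF:
    das.init_pdaf(screen=2)
integrator.forward(config.nsteps, das)  # the integrator advances the models and calls back into das
pe.finalize_parallel()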
95 changes: 95 additions & 0 deletions example/model_integrator.py
@@ -0,0 +1,95 @@
import logging
from mpi4py import MPI
import numpy as np

import config
import model
import PDAF_system

class model_integrator:
def __init__(self, model_ens: list[model.model]) -> None:
self.model_ens: list[model.model] = model_ens

def forward(self, nsteps:int, PDAF_system:PDAF_system.PDAF_system) -> None:
# When each model task runs one ensemble member,
# i.e. no need to run each ensemble member sequentially,
# we call this full parallel implementation
if PDAF_system.pe.dim_ens_l == 1:
self.forward_full_parallel(nsteps, PDAF_system)
else:
self.forward_flexible(nsteps, PDAF_system)

def forward_full_parallel(self, nsteps:int, PDAF_system:PDAF_system.PDAF_system) -> None:
for i in range(nsteps):
self.model_ens[0].field_p = self.step(self.model_ens[0].field_p, PDAF_system.pe, i+1, config.USE_PDAF)
if config.USE_PDAF:
PDAF_system.assimilate_full_parallel()

def forward_flexible(self, nsteps:int, PDAF_system:PDAF_system.PDAF_system) -> None:
# create directory to
current_step = 0
# full DA system integration loop
while current_step < nsteps:
if config.USE_PDAF:
# model integration
for _ in range(PDAF_system.steps_for):
for i in range(PDAF_system.pe.dim_ens_l):
self.model_ens[i].field_p = self.step(self.model_ens[i].field_p, PDAF_system.pe, current_step + 1, config.USE_PDAF)
current_step += 1

PDAF_system.assimilate_flexible()
else:
for i in range(PDAF_system.pe.dim_ens_l):
self.model_ens[i].field_p = self.step(self.model_ens[i].field_p, PDAF_system.pe, current_step + 1, config.USE_PDAF)
current_step += 1

def step(self, field_p:np.ndarray, pe, step:int, USE_PDAF:bool) -> np.ndarray:
"""shifting model forward 'integration'

Parameters
----------
field_p : `np.ndarray`
model field
pe : `parallelization.parallelization`
parallelization object from example
step : int
current time step
USE_PDAF : bool
whether PDAF is performed
"""
if pe.task_id == 1 and pe.mype_model == 0:
logging.info(f'model step: {step}')

field_p = np.roll(field_p, 1, axis=0)

if USE_PDAF:
return field_p

# collect the length of state vector on each processor
ny, _ = field_p.shape
n_gp = len(field_p.ravel())
send_counts = np.array(pe.comm_model.gather(n_gp, root=0))
if pe.mype_model == 0:
# get the length of the full state vector
n_gp_total = np.sum(send_counts)
# get nx as domain decomposition only occurs at nx direction
nx = n_gp_total//ny
# declare the full ensemble
field = np.zeros(n_gp_total)
# displacement of each of the full ensemble
displacements = np.insert(np.cumsum(send_counts), 0, 0)[0:-1]
else:
displacements = None
field = None

pe.comm_model.Gatherv([field_p.ravel(order='F'), MPI.DOUBLE],
[field,
send_counts, displacements, MPI.DOUBLE],
root=0)

if pe.task_id == 1 and pe.mype_model == 0:
assert field is not None, '...ERROR: field is None'
field = field.reshape(ny, nx, order='F')
np.savetxt(f'true_step{step}.txt', field)

return field_p
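The step() method above gathers the domain-decomposed field onto rank 0 with a variable-count Gatherv before the true state is written to disk. Below is a stripped-down, standalone sketch of that gather pattern; it is independent of the example's classes and only assumes mpi4py and NumPy (run under mpirun):

# Minimal sketch of gathering unequal-sized local arrays onto rank 0 with Gatherv,
# mirroring the pattern used in model_integrator.step().
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

local = np.full(rank + 2, float(rank))    # each rank owns a chunk of a different length
counts = comm.gather(local.size, root=0)  # per-rank element counts, collected on root

if rank == 0:
    counts = np.asarray(counts)
    displs = np.insert(np.cumsum(counts), 0, 0)[:-1]  # start offset of each chunk
    full = np.zeros(int(counts.sum()))                # receive buffer for the full field
    recvbuf = [full, counts, displs, MPI.DOUBLE]
else:
    recvbuf = None                                    # only significant on the root rank

comm.Gatherv([local, MPI.DOUBLE], recvbuf, root=0)

if rank == 0:
    print(full)  # all chunks concatenated in rank order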
13 changes: 10 additions & 3 deletions example/prepost_processing.py
@@ -50,15 +50,18 @@ def get_full_ens(self, dim_p:int, dim_ens:int, ens_p:np.ndarray) -> np.ndarray |
# collect the length of state vector on each processor (local domain)
all_dim_p:np.ndarray = np.array(self.pe.comm_filter.gather(dim_p, root=0))

displacements : np.ndarray | None
send_counts : np.ndarray | None
ens : np.ndarray | None
if self.pe.mype_filter == 0:
# number of elements of the array on each processor
send_counts:np.ndarray = all_dim_p*dim_ens
send_counts = all_dim_p*dim_ens
# get the length of the full state vector
dim:int = np.sum(all_dim_p)
# declare the full ensemble
ens:np.ndarray = np.zeros(dim*dim_ens)
ens = np.zeros(dim*dim_ens)
# displacement of each of the full ensemble
displacements:np.ndarray = np.insert(np.cumsum(send_counts), 0, 0)[0:-1]
displacements = np.insert(np.cumsum(send_counts), 0, 0)[0:-1]
else:
displacements = None
ens = None
@@ -74,6 +77,7 @@ def get_full_ens(self, dim_p:int, dim_ens:int, ens_p:np.ndarray) -> np.ndarray |
send_counts, displacements, MPI.DOUBLE],
root=0)
if self.pe.mype_filter == 0:
assert isinstance(ens, np.ndarray), 'ens should be a numpy array'
ens = ens.reshape(dim, dim_ens)
# As a consequence of domain decomposition in nx instead of ny
# (following the PDAF tutorial)
@@ -96,6 +100,7 @@ def initial_process(self, step:int, dim_p:int, dim_ens:int, dim_ens_p:int,
"""
ens = self.get_full_ens(dim_p, dim_ens, ens_p)
if self.pe.mype_filter == 0:
assert isinstance(ens, np.ndarray), 'ens should be a numpy array'
logging.info (f'RMS error according to sampled variance: {np.sqrt(np.mean(np.var(ens, axis=1, ddof=1)))}')
return state_p, uinv, ens_p

@@ -104,6 +109,7 @@ def preprocess(self, step:int, dim_p:int, dim_ens:int, ens_p:np.ndarray) -> None
"""
ens = self.get_full_ens(dim_p, dim_ens, ens_p)
if self.pe.mype_filter == 0:
assert isinstance(ens, np.ndarray), 'ens should be a numpy array'
logging.info (f'Forecast RMS error according to sampled variance: {np.sqrt(np.mean(np.var(ens, axis=1, ddof=1)))}')
os.makedirs('outputs', exist_ok=True)
for i in range(dim_ens):
@@ -114,6 +120,7 @@ def postprocess(self, step:int, dim_p:int, dim_ens:int, ens_p:np.ndarray) -> None
"""
ens = self.get_full_ens(dim_p, dim_ens, ens_p)
if self.pe.mype_filter == 0:
assert isinstance(ens, np.ndarray), 'ens should be a numpy array'
logging.info (f'Analysis RMS error according to sampled variance: {np.sqrt(np.mean(np.var(ens_p, axis=1, ddof=1)))}')
os.makedirs('outputs', exist_ok=True)
for i in range(dim_ens):
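The annotations and asserts added in this file are mainly for static type checking: ens and displacements are None away from the filter root, and the assert narrows the Optional type so a checker such as mypy accepts the subsequent reshape and variance calculations. A small standalone illustration of the pattern, unrelated to the example's classes:

# Sketch of Optional narrowing: without the assert, a static type checker
# flags the reshape because 'data' may be None.
import numpy as np

def collect(rank: int) -> np.ndarray | None:
    data: np.ndarray | None = np.arange(6, dtype=float) if rank == 0 else None
    if rank == 0:
        assert isinstance(data, np.ndarray), 'data should be a numpy array'
        data = data.reshape(2, 3)  # checker now knows data is an ndarray here
    return data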
18 changes: 0 additions & 18 deletions example/step/__init__.py

This file was deleted.

74 changes: 0 additions & 74 deletions example/step/shift.py

This file was deleted.