-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathnonblocking_read.py
150 lines (116 loc) · 4.38 KB
/
nonblocking_read.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#
# Copyright (C) 2024, Northwestern University and Argonne National Laboratory
# See COPYRIGHT notice in top-level directory.
#
"""
This example is the counterpart of nonblocking_write.py, but does reading
instead. It opens the output file created by nonblocking_write.py, a netcdf
file in CDF-5 format, and reads a number of 3D integer non-record variables.
To run:
mpiexec -n num_processes nonblocking_read.py [filename]
All variables are partitioned among all processes in a 3D block-block-block
fashion.
"""
import sys, os, argparse
import numpy as np
from mpi4py import MPI
import pnetcdf
def pnetcdf_io(filename):
    """Read every variable of a NetCDF file with nonblocking gets and verify it.

    Opens ``filename`` read-only, posts one ``iget_var`` request per variable
    (each rank asks for its own block of a block decomposition over all of the
    variable's dimensions), completes all requests with a single ``wait_all``
    call, and finally compares each read buffer against the pattern
    ``rank * i + 123 + j`` where ``i`` is the variable index and ``j`` the
    offset inside the local buffer.  Request errors and the first mismatch of
    each variable are printed; nothing is raised by the checks themselves.

    Relies on the module-level globals ``comm``, ``rank``, ``nprocs`` and
    ``verbose`` that the ``__main__`` section sets up.

    Args:
        filename: path of the input NetCDF file.
    """
    # Open the file for reading
    f = pnetcdf.File(filename=filename, mode='r', comm=comm, info=None)

    # obtain the number of variables defined in the input file
    nvars = len(f.variables)
    if verbose and rank == 0:
        print("number of variables =", nvars)

    buf = []
    reqs = []
    for key, var in f.variables.items():
        # print names of all variables
        if verbose and rank == 0:
            print("variable: ",key," ndim=",var.ndim)

        # obtain the number of dimensions of variable and their sizes
        ndim = var.ndim
        dims = var.get_dims()
        if verbose and rank == 0:
            for i in range(ndim):
                print("variable ",key," dim[",i,"] name=",dims[i].name," size=",dims[i].size)

        # set up subarray access pattern
        start = np.zeros(ndim, dtype=np.int32)
        count = np.zeros(ndim, dtype=np.int32)
        gsizes = np.zeros(ndim, dtype=np.int32)
        psizes = MPI.Compute_dims(nprocs, ndim)

        # Turn the rank into coordinates in the process grid, dimension 0
        # varying fastest.  The divisor for dimension d is the product of the
        # grid extents of dimensions 0..d-1, which gives every rank a distinct
        # block for any grid shape and any number of dimensions.
        # NOTE(review): the verification below only succeeds if the program
        # that wrote the file partitioned ranks the same way -- confirm that
        # nonblocking_write.py uses the same mapping.
        stride = 1
        for i in range(ndim):
            start[i] = (rank // stride) % psizes[i]
            stride *= psizes[i]

        # calculate sizes of local read buffers; bufsize is accumulated as a
        # Python int so the product cannot wrap around a 32-bit integer
        bufsize = 1
        for i in range(ndim):
            gsizes[i] = dims[i].size
            count[i] = dims[i].size // psizes[i]
            start[i] *= count[i]
            bufsize *= int(count[i])

        if verbose:
            print("rank ",rank," gsizes=",gsizes," start=",start," count=",count," bufsize=",bufsize)

        # Allocate read buffer and initialize with all -1
        rbuf = np.full(bufsize, -1, dtype=np.int32)
        buf.append(rbuf)

        # Post a nonblocking read (iget) request for this variable; the data
        # is only expected in rbuf after wait_all() below
        req_id = var.iget_var(rbuf, start=start, count=count)
        reqs.append(req_id)

    # commit posted nonblocking requests
    errs = [None] * nvars
    f.wait_all(nvars, reqs, errs)

    # check errors
    for i, err in enumerate(errs):
        if pnetcdf.strerrno(err) != "NC_NOERR":
            print(f"Error on request {i}:", pnetcdf.strerror(err))

    # verify contents of read buffers, reporting only the first mismatch of
    # each variable
    for i, rbuf in enumerate(buf):
        expected = np.arange(rbuf.size, dtype=np.int64) + (rank * i + 123)
        mismatches = np.flatnonzero(rbuf != expected)
        if mismatches.size:
            j = int(mismatches[0])
            print("Error: buf[",i,"][",j,"] expect ",int(expected[j])," but got ",rbuf[j])

    # Close the file
    f.close()
def parse_help():
    """Report whether help was requested on the command line.

    Looks for ``-h`` or ``--help`` in ``sys.argv``.  When one is present, the
    process whose module-level ``rank`` is 0 prints the usage text.

    Returns:
        True if a help flag is present in ``sys.argv``, otherwise False.
    """
    wants_help = any(flag in sys.argv for flag in ("-h", "--help"))
    if not wants_help:
        return False

    # Only rank 0 prints, so the usage text is shown once per run.
    if rank == 0:
        usage_template = (
            "Usage: {} [-h | -q] [file_name]\n"
            " [-h] Print help\n"
            " [-q] Quiet mode (reports when fail)\n"
            " [filename] (Optional) input netCDF file name\n"
        )
        print(usage_template.format(sys.argv[0]))
    return wants_help
if __name__ == "__main__":
    # MPI context; these names are read as module globals by pnetcdf_io()
    # and parse_help().
    comm = MPI.COMM_WORLD
    nprocs = comm.Get_size()
    rank = comm.Get_rank()

    # Handle -h/--help before argparse sees the command line.
    if parse_help():
        MPI.Finalize()
        sys.exit(1)

    # get command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "dir",
        nargs="?",
        type=str,
        default="testfile.nc",
        help="(Optional) input netCDF file name",
    )
    parser.add_argument(
        "-q",
        action="store_true",
        help="Quiet mode (reports when fail)",
    )
    args = parser.parse_args()

    filename = args.dir
    verbose = not args.q

    if verbose and rank == 0:
        script_name = os.path.basename(__file__)
        print("{}: example of calling nonblocking read APIs".format(script_name))

    try:
        pnetcdf_io(filename)
    except BaseException as err:
        # Report the failure, then let it propagate unchanged.
        print("Error: type:", type(err), str(err))
        raise

    MPI.Finalize()