diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 0060d8a..3c24b3d 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -6,7 +6,7 @@ on: pull_request: branches: [ master ] env: - DASK_SERVER_IP: 52.190.25.183 + DASK_SERVER_IP: ${{ secrets.DASK_SERVER_IP }} BLOB_ACCOUNT_NAME: ${{ secrets.BLOB_ACCOUNT_NAME }} BLOB_ACCOUNT_KEY: ${{ secrets.BLOB_ACCOUNT_KEY }} jobs: diff --git a/fwi/run.py b/fwi/run.py index 101e2a5..aa14911 100644 --- a/fwi/run.py +++ b/fwi/run.py @@ -251,7 +251,7 @@ def fwi_gradient(vp_in, nshots, client, solver, shots_container, auth, scale_gra solver.model.update("vp", vp) # Dask enforces this for large objects - f_solver = client.scatter(solver, broadcast=True) + f_solver = client.scatter(solver, broadcast=False) futures = [] diff --git a/fwi/shotprocessors.py b/fwi/shotprocessors.py index 764a641..f886807 100644 --- a/fwi/shotprocessors.py +++ b/fwi/shotprocessors.py @@ -27,10 +27,10 @@ def process_shot(i, solver, shots_container, auth, exclude_boundaries=True, dt=N rec0, u0, _ = solver.forward(save=True, dt=dt) residual = Receiver(name='rec', grid=solver.model.grid, data=rec0.data - rec, - time_range=solver.geometry.time_axis, + time_range=solver.geometry.time_axis, dtype=solver.model.dtype, coordinates=solver.geometry.rec_positions) - objective = .5*np.linalg.norm(residual.data.ravel())**2 + objective = (.5*np.linalg.norm(residual.data.ravel())**2).astype(solver.model.dtype) grad, _ = solver.gradient(residual, u=u0, dt=dt) diff --git a/kubernetes/dask-cluster.yaml b/kubernetes/dask-cluster.yaml index 4f4cc61..512b638 100644 --- a/kubernetes/dask-cluster.yaml +++ b/kubernetes/dask-cluster.yaml @@ -22,7 +22,7 @@ spec: "beta.kubernetes.io/os": linux containers: - name: devito-server - image: devitoaks.azurecr.io/daks-base:v8 + image: devitoaks.azurecr.io/daks-base:v9 command: ['/venv/bin/dask-scheduler'] ports: - containerPort: 8786 @@ -52,7 +52,7 @@ kind: Deployment metadata: name: devito-worker spec: - replicas: 15 + replicas: 2 strategy: rollingUpdate: maxSurge: 1 @@ -83,7 +83,7 @@ spec: value: "cores" - name: "OMP_NUM_THREADS" value: "4" - image: devitoaks.azurecr.io/daks-base:v8 + image: devitoaks.azurecr.io/daks-base:v9 command: ['/venv/bin/dask-worker', 'tcp://devito-server:8786', '--memory-limit', '13G', '--resources', 'tasks=1', '--nprocs', '1', '--nthreads', '1'] ports: - containerPort: 80 diff --git a/requirements.txt b/requirements.txt index 3224ad0..b7cc052 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ tqdm bokeh azure-storage-blob==2.1.0 devito -dask +dask==2.22.0 distributed numpy cloudpickle diff --git a/tests/conftest.py b/tests/conftest.py index 429d5ca..954d230 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,7 +15,7 @@ def auth(): def model(): initial_model_filename = "overthrust_3D_initial_model_2D.h5" datakey = "m0" - return "%s:%s" % (initial_model_filename, datakey) + return initial_model_filename, datakey @pytest.fixture @@ -40,7 +40,7 @@ def nbl(): @pytest.fixture def solver_params(model, auth, tn, so, dtype, nbl): - initial_model_filename, datakey = model.split(":") + initial_model_filename, datakey = model return {'h5_file': Blob("models", initial_model_filename, auth=auth), 'tn': tn, 'space_order': so, 'dtype': dtype, 'datakey': datakey, 'nbl': nbl, 'opt': ('noop', {'openmp': True, 'par-dynamic-work': 1000})} diff --git a/tests/test_dask.py b/tests/test_dask.py index 1f26818..f6200e7 100644 --- a/tests/test_dask.py +++ b/tests/test_dask.py @@ -20,12 +20,57 @@ def remote_test(): assert(result) -def test_dask_pickling(solver, client): +@pytest.mark.skip(reason="Platforms don't always match") +def test_dask_configuration(solver, client): + + def get_configuration(): + from devito import configuration + return str(configuration) + + config_future = client.submit(get_configuration) + + wait(config_future) + + config_remote = config_future.result() + print("Remote") + print(config_remote) + print("Local") + print(get_configuration()) + assert(config_remote == get_configuration()) + + +@pytest.mark.skip(reason="Sonames are different if platforms are different") +def test_dask_soname(solver, client): + op = solver.op_fwd() + + def soname(op): + return op._soname + + # Force local JITing rec1, u1, _ = solver.forward() + soname_future = client.submit(soname, op) + + wait(soname_future) + + soname_remote = soname_future.result() + + assert(soname_remote == op._soname) + + +def test_dask_pickling(solver, client): + def noop_function(x): return x + # Important that the solver is sent before being JITed + # See above skipped test for reason + solver_future = client.submit(noop_function, solver) + wait(solver_future) + + solver2 = solver_future.result() + + rec1, u1, _ = solver.forward() rec_future = client.submit(noop_function, rec1) u_future = client.submit(noop_function, u1) @@ -41,8 +86,26 @@ def noop_function(x): assert(np.allclose(u1.data, u2.data, atol=0., rtol=0.)) + assert(solver2.model.shape == solver.model.shape) + assert(solver2.model.nbl == solver.model.nbl) + assert(solver2.model.origin == solver.model.origin) + assert(solver2.model.spacing == solver.model.spacing) + assert(solver2.model.dtype == solver.model.dtype) + assert((solver2.geometry.src_positions == solver.geometry.src_positions).all()) + assert((solver2.geometry.rec_positions == solver.geometry.rec_positions).all()) + assert(solver2.geometry.f0 == solver.geometry.f0) + assert(solver2.geometry.tn == solver.geometry.tn) + assert(solver2.geometry.t0 == solver.geometry.t0) + assert(solver2.geometry.dt == solver.geometry.dt) + assert(solver2.geometry.nt == solver.geometry.nt) + assert(solver2.space_order == solver.space_order) + assert(solver2.kernel == solver.kernel) + assert(solver2._kwargs == solver._kwargs) + + assert(str(solver2.op_fwd()) == str(solver.op_fwd())) + assert(str(solver2.op_grad()) == str(solver.op_grad())) + -@pytest.mark.skip(reason="Numerical mismatch") def test_remote_devito(solver, client): future = client.submit(solver.forward) rec1, u1, _ = solver.forward() @@ -50,5 +113,12 @@ def test_remote_devito(solver, client): rec2, u2, _ = future.result() print(np.linalg.norm(rec1.data)) print(np.linalg.norm(rec2.data)) - assert(np.allclose(rec1.data, rec2.data, atol=0., rtol=0.)) - assert((u1.data == u2.data).all()) + print(np.linalg.norm(u1.data), np.linalg.norm(u2.data)) + error_rec = rec1.data - rec2.data + rel_rec = error_rec/rec1.data + print("rec", np.min(error_rec), np.max(error_rec), np.min(rel_rec), np.max(rel_rec)) + error_u = np.absolute(u1.data - u2.data) + rel_u = error_u/u1.data + print("u", np.min(error_u), np.max(error_u), np.min(rel_u), np.max(rel_u)) + assert(np.allclose(rec1.data, rec2.data, atol=1e-7, rtol=1e-5)) + assert(np.allclose(u1.data, u2.data, atol=1e-7, rtol=1e-5)) diff --git a/tests/test_fwi.py b/tests/test_fwi.py index 8e4ae98..b4b34b3 100644 --- a/tests/test_fwi.py +++ b/tests/test_fwi.py @@ -6,7 +6,6 @@ from fwi.run import initial_setup, fwi_gradient from fwi.dasksetup import setup_dask from fwi.io import Blob -from fwi.overthrust import overthrust_solver_iso from fwi.shotprocessors import process_shot, process_shot_checkpointed from util import mat2vec, trim_boundary, vec2mat @@ -16,7 +15,7 @@ def shots_container(): return "shots-iso-40-nbl-40-so-16" -def fwi_gradient_local(vp_in, nshots, solver, shots_container): +def fwi_gradient_local(vp_in, nshots, solver, shots_container, auth): model = solver.model vp_in = np.array(vec2mat(vp_in, solver.model.vp.shape), dtype=solver.model.dtype) @@ -30,32 +29,26 @@ def fwi_gradient_local(vp_in, nshots, solver, shots_container): grad = np.zeros(model.vp.shape) for i in range(nshots): - o, g = process_shot(i, solver, shots_container, exclude_boundaries=False) + o, g = process_shot(i, solver, shots_container, auth, exclude_boundaries=False) objective += o grad += g return objective, -mat2vec(grad).astype(np.float64) -@pytest.mark.skip(reason="Hangs indefinitely") -def test_equivalence_local_remote_single_shot(shots_container): +@pytest.mark.xfail(reason="Numerical mismatch") +def test_equivalence_local_remote_single_shot(shots_container, solver, auth, client): initial_model_filename, tn, dtype, so, nbl = "overthrust_3D_initial_model_2D.h5", 4000, np.float32, 6, 40 - model, _, bounds = initial_setup(filename=Blob("models", initial_model_filename), tn=tn, dtype=dtype, + model, _, bounds = initial_setup(filename=Blob("models", initial_model_filename, auth=auth), tn=tn, dtype=dtype, space_order=so, nbl=nbl) - solver_params = {'h5_file': Blob("models", initial_model_filename), 'tn': tn, 'space_order': so, 'dtype': dtype, - 'datakey': 'm0', 'nbl': nbl} - - solver = overthrust_solver_iso(**solver_params) - v0 = mat2vec(model.vp.data).astype(np.float64) - local_results = fwi_gradient_local(v0, 1, solver, shots_container) + remote_results = fwi_gradient(v0, 1, client, solver, shots_container, auth, exclude_boundaries=False, + scale_gradient=None, mute_water=False) - client = setup_dask() - - remote_results = fwi_gradient(v0, 1, client, solver, shots_container, exclude_boundaries=False, - scale_gradient=False, mute_water=False) + # Calling local version first leads to random "CommClosed" errors + local_results = fwi_gradient_local(v0, 1, solver, shots_container, auth) np.testing.assert_approx_equal(local_results[0], remote_results[0]) @@ -77,31 +70,18 @@ def test_vec2mat(): @pytest.mark.xfail(reason="Numerical mismatch") @pytest.mark.parametrize('shot_id', [20]) @pytest.mark.parametrize('exclude_boundaries', [True, False]) -def test_shot(shot_id, shots_container, exclude_boundaries): - initial_model_filename = "overthrust_3D_initial_model_2D.h5" - tn = 4000 - dtype = np.float32 - so = 6 - nbl = 40 - exclude_boundaries = True - shot_id = 1 - solver_params = {'h5_file': Blob("models", initial_model_filename), 'tn': tn, - 'space_order': so, 'dtype': dtype, 'datakey': 'm0', 'nbl': nbl, - 'opt': ('noop', {'openmp': True, 'par-dynamic-work': 1000})} - - solver1 = overthrust_solver_iso(**solver_params) - o1, grad1 = process_shot(shot_id, solver1, shots_container, exclude_boundaries) +def test_shot(shot_id, shots_container, exclude_boundaries, solver, auth): client = setup_dask() - future = client.submit(process_shot, shot_id, solver1, shots_container, exclude_boundaries) + future = client.submit(process_shot, shot_id, solver, shots_container, auth, exclude_boundaries) wait(future) - o2, grad2 = future.result() + o1, grad1 = process_shot(shot_id, solver, shots_container, auth, exclude_boundaries) assert(np.allclose(grad1, grad2, atol=0., rtol=0.)) assert(o1 == o2) -def test_equivalence_shot_checkpointing(shots_container, auth): +def test_equivalence_shot_checkpointing(shots_container, auth, solver): initial_model_filename = "overthrust_3D_initial_model_2D.h5" tn = 4000 dtype = np.float32 @@ -111,28 +91,20 @@ def test_equivalence_shot_checkpointing(shots_container, auth): water_depth = 20 shot_id = 1 - solver_params = {'h5_file': Blob("models", initial_model_filename, auth=auth), 'tn': tn, - 'space_order': so, 'dtype': dtype, 'datakey': 'm0', 'nbl': nbl, - 'opt': ('noop', {'openmp': True, 'par-dynamic-work': 1000})} - - solver1 = overthrust_solver_iso(**solver_params) - solver2 = overthrust_solver_iso(**solver_params) - model, geometry, _ = initial_setup(Blob("models", initial_model_filename, auth=auth), tn, dtype, so, nbl, datakey="m0", exclude_boundaries=exclude_boundaries, water_depth=water_depth) - o2, grad2 = process_shot(shot_id, solver1, shots_container, auth, exclude_boundaries) - o1, grad1 = process_shot_checkpointed(shot_id, solver2, shots_container, auth, exclude_boundaries) + o2, grad2 = process_shot(shot_id, solver, shots_container, auth, exclude_boundaries) + o1, grad1 = process_shot_checkpointed(shot_id, solver, shots_container, auth, exclude_boundaries) - np.testing.assert_approx_equal(o1, o2, significant=5) - assert(np.allclose(grad1, grad2, rtol=1e-4)) + np.testing.assert_approx_equal(o1, o2, significant=7) + assert(np.allclose(grad1, grad2, atol=0.)) -@pytest.mark.skip(reason="Hangs indefinitely") @pytest.mark.parametrize('mute_water', [True, False]) @pytest.mark.parametrize('scale_gradient', [None, 'L', 'W']) @pytest.mark.parametrize('exclude_boundaries', [True, False]) -def test_equivalence_checkpointing(shots_container, exclude_boundaries, scale_gradient, mute_water): +def test_equivalence_checkpointing(shots_container, exclude_boundaries, scale_gradient, mute_water, client, solver, auth): initial_model_filename = "overthrust_3D_initial_model_2D.h5" tn = 4000 dtype = np.float32 @@ -140,12 +112,6 @@ def test_equivalence_checkpointing(shots_container, exclude_boundaries, scale_gr nbl = 40 water_depth = 20 nshots = 1 - client = setup_dask() - - solver_params = {'h5_file': Blob("models", initial_model_filename), 'tn': tn, - 'space_order': so, 'dtype': dtype, 'datakey': 'm0', 'nbl': nbl} - - solver = overthrust_solver_iso(**solver_params) model, geometry, _ = initial_setup(initial_model_filename, tn, dtype, so, nbl, datakey="m0", exclude_boundaries=exclude_boundaries, @@ -156,23 +122,15 @@ def test_equivalence_checkpointing(shots_container, exclude_boundaries, scale_gr else: v0 = mat2vec(model.vp.data).astype(np.float64) - o1, grad1 = fwi_gradient(v0, nshots, client, solver, shots_container, scale_gradient, mute_water, + o1, grad1 = fwi_gradient(v0, nshots, client, solver, shots_container, auth, scale_gradient, mute_water, exclude_boundaries, water_depth, checkpointing=True) - o2, grad2 = fwi_gradient(v0, nshots, client, solver, shots_container, scale_gradient, mute_water, + o2, grad2 = fwi_gradient(v0, nshots, client, solver, shots_container, auth, scale_gradient, mute_water, exclude_boundaries, water_depth) - print(o1, np.linalg.norm(grad1), grad1.shape) - print(o2, np.linalg.norm(grad2), grad2.shape) - - grad1_diag = [grad1[k, k] for k in range(40)] - grad2_diag = [grad2[k, k] for k in range(40)] - - print(grad1_diag) - print(grad2_diag) - np.testing.assert_approx_equal(o1, o2, significant=5) + np.testing.assert_approx_equal(o1, o2, significant=7) - np.testing.assert_array_almost_equal(grad1, grad2, significant=5) + np.testing.assert_array_almost_equal(grad1, grad2) if __name__ == "__main__":