|
20 | 20 |
|
21 | 21 | import dask
|
22 | 22 | import distributed
|
23 |
| -from dask.dataframe.optimize import optimize as dd_optimize |
24 | 23 | from dask.distributed import Client, get_client
|
25 | 24 |
|
26 | 25 | from crossfit.backend.gpu import HAS_GPU
|
@@ -93,60 +92,6 @@ def increase_gc_threshold():
|
93 | 92 | gc.set_threshold(g0 * 3, g1 * 3, g2 * 3)
|
94 | 93 |
|
95 | 94 |
|
96 |
| -def ensure_optimize_dataframe_graph(ddf=None, dsk=None, keys=None): |
97 |
| - """Perform HLG DataFrame optimizations |
98 |
| -
|
99 |
| - If `ddf` is specified, an optimized Dataframe |
100 |
| - collection will be returned. If `dsk` and `keys` |
101 |
| - are specified, an optimized graph will be returned. |
102 |
| -
|
103 |
| - These optimizations are performed automatically |
104 |
| - when a DataFrame collection is computed/persisted, |
105 |
| - but they are NOT always performed when statistics |
106 |
| - are computed. The purpose of this utility is to |
107 |
| - ensure that the Dataframe-based optimizations are |
108 |
| - always applied. |
109 |
| -
|
110 |
| - Parameters |
111 |
| - ---------- |
112 |
| - ddf : dask_cudf.DataFrame, optional |
113 |
| - The dataframe to optimize, by default None |
114 |
| - dsk : dask.highlevelgraph.HighLevelGraph, optional |
115 |
| - Dask high level graph, by default None |
116 |
| - keys : List[str], optional |
117 |
| - The keys to optimize, by default None |
118 |
| -
|
119 |
| - Returns |
120 |
| - ------- |
121 |
| - Union[dask_cudf.DataFrame, dask.highlevelgraph.HighLevelGraph] |
122 |
| - A dask_cudf DataFrame or dask HighLevelGraph depending |
123 |
| - on the parameters provided. |
124 |
| -
|
125 |
| - Raises |
126 |
| - ------ |
127 |
| - ValueError |
128 |
| - If ddf is not provided and one of dsk or keys are None. |
129 |
| - """ |
130 |
| - |
131 |
| - if ddf is None: |
132 |
| - if dsk is None or keys is None: |
133 |
| - raise ValueError("Must specify both `dsk` and `keys` if `ddf` is not supplied.") |
134 |
| - dsk = ddf.dask if dsk is None else dsk |
135 |
| - keys = ddf.__dask_keys__() if keys is None else keys |
136 |
| - |
137 |
| - if isinstance(dsk, dask.highlevelgraph.HighLevelGraph): |
138 |
| - with dask.config.set({"optimization.fuse.active": False}): |
139 |
| - dsk = dd_optimize(dsk, keys=keys) |
140 |
| - |
141 |
| - if ddf is None: |
142 |
| - # Return optimized graph |
143 |
| - return dsk |
144 |
| - |
145 |
| - # Return optimized ddf |
146 |
| - ddf.dask = dsk |
147 |
| - return ddf |
148 |
| - |
149 |
| - |
150 | 95 | class Distributed:
|
151 | 96 | """Distributed-Execution Context Manager
|
152 | 97 |
|
|
0 commit comments