22
22
import markupsafe
23
23
import pytest
24
24
25
- from airflow .dag_processing . processor import DagFileProcessor
25
+ from airflow .models . errors import ParseImportError
26
26
from airflow .security import permissions
27
27
from airflow .utils .state import State
28
28
from airflow .www .utils import UIAlert
37
37
client_with_login ,
38
38
)
39
39
40
- pytestmark = pytest .mark .db_test
40
+ pytestmark = [ pytest .mark .db_test , pytest . mark . need_serialized_dag ]
41
41
42
42
43
43
def clean_db ():
@@ -203,81 +203,58 @@ def client_single_dag_edit(app, user_single_dag_edit):
203
203
TEST_TAGS = ["example" , "test" , "team" , "group" ]
204
204
205
205
206
- def _process_file (file_path ):
207
- dag_file_processor = DagFileProcessor (dag_ids = [], dag_directory = "/tmp" , log = mock .MagicMock ())
208
- dag_file_processor .process_file (file_path , [])
209
-
210
-
211
206
@pytest .fixture
212
- def _working_dags (tmp_path ):
213
- dag_contents_template = "from airflow import DAG\n dag = DAG('{}', schedule=None, tags=['{}'])"
207
+ def _working_dags (dag_maker ):
214
208
for dag_id , tag in zip (TEST_FILTER_DAG_IDS , TEST_TAGS ):
215
- path = tmp_path / f" { dag_id } .py"
216
- path . write_text ( dag_contents_template . format ( dag_id , tag ))
217
- _process_file ( path )
209
+ with dag_maker ( dag_id = dag_id , fileloc = f"/ { dag_id } .py", tags = [ tag ]):
210
+ # We need to enter+exit the dag maker context for it to create the dag
211
+ pass
218
212
219
213
220
214
@pytest .fixture
221
- def _working_dags_with_read_perm (tmp_path ):
222
- dag_contents_template = "from airflow import DAG\n dag = DAG('{}', schedule=None, tags=['{}'])"
223
- dag_contents_template_with_read_perm = (
224
- "from airflow import DAG\n dag = DAG('{}', schedule=None, tags=['{}'], "
225
- "access_control={{'role_single_dag':{{'can_read'}}}}) "
226
- )
215
+ def _working_dags_with_read_perm (dag_maker ):
227
216
for dag_id , tag in zip (TEST_FILTER_DAG_IDS , TEST_TAGS ):
228
- path = tmp_path / f"{ dag_id } .py"
229
217
if dag_id == "filter_test_1" :
230
- path . write_text ( dag_contents_template_with_read_perm . format ( dag_id , tag ))
218
+ access_control = { "role_single_dag" : { "can_read" }}
231
219
else :
232
- path .write_text (dag_contents_template .format (dag_id , tag ))
233
- _process_file (path )
220
+ access_control = None
221
+
222
+ with dag_maker (dag_id = dag_id , fileloc = f"/{ dag_id } .py" , tags = [tag ], access_control = access_control ):
223
+ pass
234
224
235
225
236
226
@pytest .fixture
237
- def _working_dags_with_edit_perm (tmp_path ):
238
- dag_contents_template = "from airflow import DAG\n dag = DAG('{}', schedule=None, tags=['{}'])"
239
- dag_contents_template_with_read_perm = (
240
- "from airflow import DAG\n dag = DAG('{}', schedule=None, tags=['{}'], "
241
- "access_control={{'role_single_dag':{{'can_edit'}}}}) "
242
- )
227
+ def _working_dags_with_edit_perm (dag_maker ):
243
228
for dag_id , tag in zip (TEST_FILTER_DAG_IDS , TEST_TAGS ):
244
- path = tmp_path / f"{ dag_id } .py"
245
229
if dag_id == "filter_test_1" :
246
- path . write_text ( dag_contents_template_with_read_perm . format ( dag_id , tag ))
230
+ access_control = { "role_single_dag" : { "can_edit" }}
247
231
else :
248
- path .write_text (dag_contents_template .format (dag_id , tag ))
249
- _process_file (path )
232
+ access_control = None
250
233
251
-
252
- @pytest .fixture
253
- def _broken_dags (tmp_path , _working_dags ):
254
- for dag_id in TEST_FILTER_DAG_IDS :
255
- path = tmp_path / f"{ dag_id } .py"
256
- path .write_text ("airflow DAG" )
257
- _process_file (path )
234
+ with dag_maker (dag_id = dag_id , fileloc = f"/{ dag_id } .py" , tags = [tag ], access_control = access_control ):
235
+ pass
258
236
259
237
260
238
@pytest .fixture
261
- def _broken_dags_with_read_perm (tmp_path , _working_dags_with_read_perm ):
239
+ def _broken_dags (session ):
240
+ from airflow .models .errors import ParseImportError
241
+
262
242
for dag_id in TEST_FILTER_DAG_IDS :
263
- path = tmp_path / f"{ dag_id } .py"
264
- path .write_text ("airflow DAG" )
265
- _process_file (path )
243
+ session .add (ParseImportError (filename = f"/{ dag_id } .py" , stacktrace = "Some Error\n Traceback:\n " ))
244
+ session .commit ()
266
245
267
246
268
247
@pytest .fixture
269
- def _broken_dags_after_working (tmp_path ):
248
+ def _broken_dags_after_working (dag_maker , session ):
270
249
# First create and process a DAG file that works
271
- path = tmp_path / "all_in_one.py"
272
- contents = "from airflow import DAG\n "
273
- for i , dag_id in enumerate (TEST_FILTER_DAG_IDS ):
274
- contents += f"dag{ i } = DAG('{ dag_id } ', schedule=None)\n "
275
- path .write_text (contents )
276
- _process_file (path )
250
+ path = "/all_in_one.py"
251
+ for dag_id in TEST_FILTER_DAG_IDS :
252
+ with dag_maker (dag_id = dag_id , fileloc = path , session = session ):
253
+ pass
277
254
278
- contents += "foobar()"
279
- path . write_text ( contents )
280
- _process_file ( path )
255
+ # Then create an import error against that file
256
+ session . add ( ParseImportError ( filename = path , stacktrace = "Some Error \n Traceback: \n " ) )
257
+ session . commit ( )
281
258
282
259
283
260
def test_home_filter_tags (_working_dags , admin_client ):
@@ -289,6 +266,7 @@ def test_home_filter_tags(_working_dags, admin_client):
289
266
assert flask .session [FILTER_TAGS_COOKIE ] is None
290
267
291
268
269
+ @pytest .mark .usefixtures ("_broken_dags" , "_working_dags" )
292
270
def test_home_importerrors (_broken_dags , user_client ):
293
271
# Users with "can read on DAGs" gets all DAG import errors
294
272
resp = user_client .get ("home" , follow_redirects = True )
@@ -297,6 +275,7 @@ def test_home_importerrors(_broken_dags, user_client):
297
275
check_content_in_response (f"/{ dag_id } .py" , resp )
298
276
299
277
278
+ @pytest .mark .usefixtures ("_broken_dags" , "_working_dags" )
300
279
def test_home_no_importerrors_perm (_broken_dags , client_no_importerror ):
301
280
# Users without "can read on import errors" don't see any import errors
302
281
resp = client_no_importerror .get ("home" , follow_redirects = True )
@@ -315,7 +294,8 @@ def test_home_no_importerrors_perm(_broken_dags, client_no_importerror):
315
294
"home?lastrun=all_states" ,
316
295
],
317
296
)
318
- def test_home_importerrors_filtered_singledag_user (_broken_dags_with_read_perm , client_single_dag , page ):
297
+ @pytest .mark .usefixtures ("_working_dags_with_read_perm" , "_broken_dags" )
298
+ def test_home_importerrors_filtered_singledag_user (client_single_dag , page ):
319
299
# Users that can only see certain DAGs get a filtered list of import errors
320
300
resp = client_single_dag .get (page , follow_redirects = True )
321
301
check_content_in_response ("Import Errors" , resp )
0 commit comments