@@ -70,9 +70,9 @@ def udf_step_extract(row):
70
70
_fwjr_id = row ['meta_data' ]['fwjr_id' ]
71
71
_jobtype = row ['meta_data' ]['jobtype' ]
72
72
_ts = row ['meta_data' ]['ts' ]
73
- if 'steps' in row and row ['steps' ]:
73
+ if 'steps' in row and row ['steps' ] is not None :
74
74
for step in filter (None , row ['steps' ]):
75
- if ( 'name' in step ) and step ['name' ].lower ().startswith ('cmsrun' ):
75
+ if 'name' in step and step ['name' ].lower ().startswith ('cmsrun' ):
76
76
step_res = {'Task' : _task_name , 'ts' : _ts , 'fwjr_id' : _fwjr_id , 'JobType' : _jobtype }
77
77
78
78
count += 1
@@ -88,11 +88,11 @@ def udf_step_extract(row):
88
88
step_res ['threads_total_job_time' ] = step_res ['job_time' ] * step_res ['nthreads' ]
89
89
except Exception :
90
90
step_res ['threads_total_job_time' ] = None
91
- if step ['output' ]:
91
+ if 'output' in step and step ['output' ] is not None :
92
92
for outx in step ['output' ]:
93
93
if outx ['acquisitionEra' ]:
94
94
step_res ['acquisition_era' ].append (outx ['acquisitionEra' ])
95
- if 'performance' in step :
95
+ if 'performance' in step and step [ 'performance' ] is not None :
96
96
performance = step ['performance' ]
97
97
if 'storage' in performance :
98
98
if 'writeTotalMB' in performance ['storage' ]:
@@ -174,6 +174,7 @@ def main(start_date, end_date, hdfs_out_dir, last_n_days):
174
174
.filter (f"""data.meta_data.jobstate='success'
175
175
AND data.wmats >= { start_date .timestamp ()}
176
176
AND data.wmats < { end_date .timestamp ()}
177
+ AND data.task IS NOT NULL
177
178
""" )
178
179
.filter (col ('data.meta_data.jobtype' ).isin (_PROD_CMS_JOB_TYPES_FILTER ))
179
180
)
0 commit comments