@@ -86,10 +86,9 @@ class TaskMonitorService extends Logging {
86
86
streamTasks.filter(shouldMonitor).foreach { streamTask =>
87
87
val job = streamJobMapper.getJobById(streamTask.getJobId)
88
88
if (! JobConf .SUPPORTED_MANAGEMENT_JOB_TYPES .getValue.contains(job.getJobType)) {
89
- val userList = Sets .newHashSet(job.getSubmitUser, job.getCreateBy)
90
- userList.addAll(getAlertUsers(job))
89
+ val userList = getAlertUsers(job)
91
90
val alertMsg = s " Spark Streaming应用[ ${job.getName}]已经超过 ${Utils .msDurationToString(System .currentTimeMillis - streamTask.getLastUpdateTime.getTime)} 没有更新状态, 请及时确认应用是否正常! "
92
- alert(jobService.getAlertLevel(job), alertMsg, new util. ArrayList [ String ]( userList) , streamTask)
91
+ alert(jobService.getAlertLevel(job), alertMsg, userList, streamTask)
93
92
} else {
94
93
streamTask.setLastUpdateTime(new Date )
95
94
streamTaskMapper.updateTask(streamTask)
@@ -110,7 +109,6 @@ class TaskMonitorService extends Logging {
110
109
} else {
111
110
// 连续三次还是出现异常,说明Linkis的Manager已经不能正常提供服务,告警并不再尝试获取状态,等待下次尝试
112
111
val users = getAlertUsers(job)
113
- users.add(job.getCreateBy)
114
112
alert(jobService.getAlertLevel(job), s " 请求LinkisManager失败,Linkis集群出现异常,请关注!影响任务[ ${job.getName}] " , users, streamTask)
115
113
}
116
114
}
@@ -138,9 +136,8 @@ class TaskMonitorService extends Logging {
138
136
}
139
137
case _ =>
140
138
}
141
- val userList = Sets .newHashSet(job.getSubmitUser, job.getCreateBy)
142
- userList.addAll(getAlertUsers(job))
143
- alert(jobService.getAlertLevel(job), alertMsg, new util.ArrayList [String ](userList), streamTask)
139
+ val userList = getAlertUsers(job)
140
+ alert(jobService.getAlertLevel(job), alertMsg, userList, streamTask)
144
141
}
145
142
}
146
143
}
@@ -159,12 +156,19 @@ class TaskMonitorService extends Logging {
159
156
}
160
157
161
158
protected def getAlertUsers (job : StreamJob ): util.List [String ] = {
162
- var users = jobService.getAlertUsers(job)
163
- if (users == null ) {
164
- users = new util.ArrayList [String ]()
159
+ val allUsers = new util.LinkedHashSet [String ]()
160
+ val alertUsers = jobService.getAlertUsers(job)
161
+ if (alertUsers!= null ) {
162
+ alertUsers.foreach(user => {
163
+ allUsers.add(user)
164
+ })
165
165
}
166
- users.addAll(util.Arrays .asList(JobConf .STREAMIS_DEVELOPER .getValue.split(" ," ):_* ))
167
- users
166
+ allUsers.add(job.getSubmitUser)
167
+ allUsers.add(job.getCreateBy)
168
+ util.Arrays .asList(JobConf .STREAMIS_DEVELOPER .getValue.split(" ," ):_* ).foreach(user => {
169
+ allUsers.add(user)
170
+ })
171
+ new util.ArrayList [String ](allUsers)
168
172
}
169
173
170
174
protected def alert (alertLevel : AlertLevel , alertMsg : String , users : util.List [String ], streamTask: StreamTask ): Unit = alerters.foreach{ alerter =>
0 commit comments