Skip to content

Commit 98b5ed3

Browse files
committed
Streamis scheduler module and Reconstruct the TaskService class (50%)
1 parent 94d7476 commit 98b5ed3

File tree

51 files changed

+1978
-381
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1978
-381
lines changed

db/streamis_ddl.sql

+4-2
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ CREATE TABLE `linkis_stream_job` (
139139
`job_type` varchar(30) DEFAULT NULL COMMENT '目前只支持flink.sql、flink.jar',
140140
`submit_user` varchar(100) DEFAULT NULL,
141141
`workspace_name` varchar(50) DEFAULT NULL,
142-
PRIMARY KEY (`id`) USING BTREE
142+
PRIMARY KEY (`id`) USING BTREE,
143+
UNIQUE KEY(`project_name`, `name`)
143144
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='作业表';
144145

145146
/*Table structure for table `linkis_stream_job_version` */
@@ -155,7 +156,8 @@ CREATE TABLE `linkis_stream_job_version` (
155156
`comment` varchar(255) DEFAULT NULL,
156157
`create_time` datetime DEFAULT NULL,
157158
`create_by` varchar(32) DEFAULT NULL,
158-
PRIMARY KEY (`id`) USING BTREE
159+
PRIMARY KEY (`id`) USING BTREE,
160+
UNIQUE KEY(`job_id`, `version`)
159161
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='作业表';
160162

161163
/*Table structure for table `linkis_stream_job_version_files` */

pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,12 @@
107107
<artifactId>linkis-mybatis</artifactId>
108108
<version>${linkis.version}</version>
109109
</dependency>
110+
<!--scheduler module-->
111+
<dependency>
112+
<groupId>org.apache.linkis</groupId>
113+
<artifactId>linkis-scheduler</artifactId>
114+
<version>${linkis.version}</version>
115+
</dependency>
110116
<dependency>
111117
<groupId>org.apache.linkis</groupId>
112118
<artifactId>linkis-module</artifactId>

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/java/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/state/AbstractLinkisJobStateFetcher.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,27 @@
2626
* 3) Destroy to close the http client when the system is closed
2727
* @param <T>
2828
*/
29-
public class AbstractLinkisJobStateFetcher<T extends JobState> implements JobStateFetcher<T> {
29+
public abstract class AbstractLinkisJobStateFetcher<T extends JobState> implements JobStateFetcher<T> {
3030
@Override
3131
public void init() {
3232

3333
}
3434

3535
@Override
3636
public T getState(JobInfo jobInfo) {
37-
return null;
37+
// Fetch and return FsStateInfo
38+
FsStateInfo fsStateInfo = null;
39+
return getState(fsStateInfo);
3840
}
3941

4042
@Override
4143
public void destroy() {
4244

4345
}
46+
47+
protected abstract T getState(FsStateInfo fsStateInfo);
48+
49+
protected static class FsStateInfo{
50+
51+
}
4452
}

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/FlinkJobLaunchManager.scala

+8-6
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ trait FlinkJobLaunchManager extends LinkisJobLaunchManager with Logging {
6666
* @return the job id.
6767
*/
6868
override def launch(job: LaunchJob, jobState: JobState): String = {
69-
70-
null
69+
launch(job)
7170
}
7271

7372
override def launch(job: LaunchJob): String = {
@@ -80,10 +79,13 @@ trait FlinkJobLaunchManager extends LinkisJobLaunchManager with Logging {
8079
val onceJob = buildOnceJob(job)
8180
onceJob.submit()
8281
onceJobs synchronized onceJobs.put(onceJob.getId, onceJob)
83-
val linkisJobInfo = Utils.tryCatch(createJobInfo(onceJob, job)){ t =>
84-
error(s"${job.getSubmitUser} create jobInfo failed, now stop this EngineConn ${onceJob.getId}.")
85-
stop(onceJob)
86-
throw t
82+
val linkisJobInfo = Utils.tryCatch(createJobInfo(onceJob, job)){
83+
case e: FlinkJobLaunchErrorException =>
84+
throw e
85+
case t: Throwable =>
86+
error(s"${job.getSubmitUser} create jobInfo failed, now stop this EngineConn ${onceJob.getId}.")
87+
stop(onceJob)
88+
throw new FlinkJobLaunchErrorException(-1, "Fail to obtain launched job info", t)
8789
}
8890
onceJobs synchronized onceJobIdToJobInfo.put(onceJob.getId, linkisJobInfo)
8991
onceJob.getId

streamis-jobmanager/streamis-job-launcher/streamis-job-launcher-linkis/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/launcher/linkis/job/manager/SimpleFlinkJobLaunchManager.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ class SimpleFlinkJobLaunchManager extends FlinkJobLaunchManager {
102102
jobInfo.setECMInstance(simpleOnceJob.getECMServiceInstance)
103103
case _ =>
104104
}
105-
fetchApplicationInfo(jobInfo)
105+
Utils.tryCatch(fetchApplicationInfo(jobInfo)) { t =>
106+
throw new FlinkJobLaunchErrorException(-1, "Unable to fetch the application info of launched job, maybe the engine has been shutdown", t)}
106107
jobInfo.setResources(nodeInfo.get("nodeResource").asInstanceOf[util.Map[String, Object]])
107108
jobInfo
108109
}

streamis-jobmanager/streamis-job-manager/streamis-job-manager-base/src/main/java/com/webank/wedatasphere/streamis/jobmanager/manager/dao/StreamJobMapper.java

+27-5
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,30 @@
1616
package com.webank.wedatasphere.streamis.jobmanager.manager.dao;
1717

1818
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.*;
19-
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.QueryJobListVO;
20-
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.VersionDetailVO;
19+
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.QueryJobListVo;
20+
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.VersionDetailVo;
2121
import org.apache.ibatis.annotations.Param;
2222

2323
import java.util.List;
2424

2525

2626
public interface StreamJobMapper {
2727

28-
List<QueryJobListVO> getJobLists(@Param("projectName") String projectName, @Param("name") String name,
29-
@Param("status") Integer status, @Param("createBy") String createBy);
28+
List<QueryJobListVo> getJobLists(@Param("projectName") String projectName, @Param("name") String name,
29+
@Param("status") Integer status, @Param("createBy") String createBy);
3030

3131
StreamJob getJobById(@Param("jobId") Long jobId);
3232

3333

3434
List<StreamJobVersion> getJobVersions(@Param("jobId") Long jobId);
3535

36+
/**
37+
* Get the latest job version
38+
* @param jobId job id
39+
* @return job version
40+
*/
41+
StreamJobVersion getLatestJobVersion(@Param("jobId") Long jobId);
42+
3643
StreamJobVersion getJobVersionById(@Param("jobId") Long jobId, @Param("version") String version);
3744

3845
void insertJob(StreamJob streamJob);
@@ -43,11 +50,26 @@ List<QueryJobListVO> getJobLists(@Param("projectName") String projectName, @Para
4350

4451
List<StreamJob> getJobListsByProjectName(String projectName);
4552

46-
VersionDetailVO getVersionDetail(@Param("jobId") Long jobId, @Param("version") String version);
53+
VersionDetailVo getVersionDetail(@Param("jobId") Long jobId, @Param("version") String version);
4754

4855
void insertJobVersionFiles(StreamJobVersionFiles jobVersionFiles);
4956

5057
List<StreamJobVersionFiles> getStreamJobVersionFiles(@Param("jobId") Long jobId, @Param("jobVersionId") Long jobVersionId);
5158

5259
StreamJob getCurrentJob(@Param("projectName")String projectName, @Param("jobName")String jobName);
60+
61+
/**
62+
* Query and lock current job
63+
* @param projectName project name
64+
* @param jobName job name
65+
* @return stream job
66+
*/
67+
StreamJob queryAndLockJobInCondition(@Param("projectName")String projectName, @Param("jobName")String jobName);
68+
69+
/**
70+
* Query and lock by job id
71+
* @param jobId job id
72+
* @return stream job
73+
*/
74+
StreamJob queryAndLockJobById(@Param("jobId")Long jobId);
5375
}

streamis-jobmanager/streamis-job-manager/streamis-job-manager-base/src/main/java/com/webank/wedatasphere/streamis/jobmanager/manager/dao/StreamTaskMapper.java

+10
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,17 @@ public interface StreamTaskMapper {
2727

2828
void updateTask(StreamTask streamTask);
2929

30+
/**
31+
* Update task which in specific status
32+
* @param streamTask stream task
33+
* @param status status
34+
*/
35+
int updateTaskInStatus(@Param("task")StreamTask streamTask, @Param("status")Integer status);
36+
3037
List<StreamTask> getByJobVersionId(@Param("jobVersionId") Long jobVersionId, @Param("version") String version);
3138

39+
StreamTask getLatestByJobVersionId(@Param("jobVersionId") Long jobVersionId, @Param("version") String version);
40+
3241
StreamTask getRunningTaskByJobId(@Param("jobId") Long jobId);
3342

3443
StreamTask getTaskById(@Param("id") Long id);
@@ -38,4 +47,5 @@ public interface StreamTaskMapper {
3847
List<StreamTask> getTasksByStatus(List<Integer> status);
3948

4049
String getTask(@Param("jobId") Long jobId, @Param("version") String version);
50+
4151
}

streamis-jobmanager/streamis-job-manager/streamis-job-manager-base/src/main/java/com/webank/wedatasphere/streamis/jobmanager/manager/dao/impl/StreamJobMapper.xml

+17-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
<result column="job_type" property="jobType" jdbcType="VARCHAR"/>
3232
</resultMap>
3333

34-
<resultMap id="StreamJobVOMap" type="com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.QueryJobListVO">
34+
<resultMap id="StreamJobVOMap" type="com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.QueryJobListVo">
3535
<id column="id" property="id" jdbcType="BIGINT"/>
3636
<result column="workspace_name" property="workspaceName" jdbcType="VARCHAR"/>
3737
<result column="project_name" property="projectName" jdbcType="VARCHAR"/>
@@ -124,6 +124,11 @@
124124
from linkis_stream_job_version where job_id=#{jobId} order by version desc
125125
</select>
126126

127+
<!--Get the latest job version-->
128+
<select id="getLatestJobVersion" resultMap="StreamJobVersionMap">
129+
SELECT * FROM linkis_stream_job_version WHERE job_id=#{jobId} ORDER BY version desc LIMIT 1
130+
</select>
131+
127132
<select id="getJobVersionById" resultMap="StreamJobVersionMap">
128133
select
129134
*
@@ -135,7 +140,7 @@
135140
</select>
136141

137142
<select id="getVersionDetail"
138-
resultType="com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.VersionDetailVO">
143+
resultType="com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.VersionDetailVo">
139144
SELECT j.id,v.version,j.description,DATE_FORMAT(v.create_time,"%Y-%m-%d %H:%i:%s") AS releaseTime,j.create_by AS createBy,j.project_name
140145
FROM `linkis_stream_job` j , linkis_stream_job_version v
141146
WHERE v.job_id = j.id AND j.id = #{jobId} AND v.version=#{version}
@@ -156,6 +161,16 @@
156161
ORDER BY l.create_time DESC LIMIT 1
157162
</select>
158163

164+
<select id="queryAndLockJobInCondition" parameterType="java.lang.String"
165+
resultType="com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob">
166+
SELECT <include refid="Job_Column"/> FROM
167+
linkis_stream_job WHERE project_name = #{projectName} AND `name` = #{jobName} FOR UPDATE;
168+
</select>
169+
170+
<select id="queryAndLockJobById" resultType="com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob">
171+
SELECT <include refid="Job_Column"/> FROM
172+
linkis_stream_job WHERE id = #{jobId};
173+
</select>
159174

160175
<insert id="insertJob" useGeneratedKeys="true" keyProperty="id"
161176
parameterType="com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob">

streamis-jobmanager/streamis-job-manager/streamis-job-manager-base/src/main/java/com/webank/wedatasphere/streamis/jobmanager/manager/dao/impl/StreamTaskMapper.xml

+37
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,28 @@
6464
WHERE id=#{id}
6565
</update>
6666

67+
<update id="updateTaskInStatus">
68+
UPDATE linkis_stream_task
69+
<trim prefix="set" suffixOverrides=",">
70+
<if test="status != null">
71+
status=#{task.status},
72+
</if>
73+
<if test="lastUpdateTime != null">
74+
last_update_time=#{task.lastUpdateTime},
75+
</if>
76+
<if test="linkisJobId != null">
77+
linkis_job_id=#{task.linkisJobId},
78+
</if>
79+
<if test="linkisJobInfo != null">
80+
linkis_job_info=#{task.linkisJobInfo},
81+
</if>
82+
<if test="errDesc != null">
83+
err_desc=#{task.errDesc},
84+
</if>
85+
</trim>
86+
WHERE id=#{task.id} AND status = #{status};
87+
</update>
88+
6789
<select id="getByJobVersionId" resultMap="StreamTaskMap">
6890
SELECT `id`,`job_version_id`,`job_id`, status
6991
,`start_time`,`last_update_time`,
@@ -80,6 +102,21 @@
80102
ORDER BY start_time DESC
81103
</select>
82104

105+
<select id="getLatestByJobVersionId" resultMap="StreamTaskMap">
106+
SELECT `id`,`job_version_id`,`job_id`, status
107+
,`start_time`,`last_update_time`,
108+
`err_desc`,`submit_user`, `linkis_job_id`, `linkis_job_info`
109+
FROM linkis_stream_task
110+
<where>
111+
<if test="jobVersionId != null">
112+
AND job_version_id=#{jobVersionId}
113+
</if>
114+
<if test="version != null">
115+
AND version=#{version}
116+
</if>
117+
</where>
118+
ORDER BY start_time DESC LIMIT 1
119+
</select>
83120
<select id="getRunningTaskByJobId" resultMap="StreamTaskMap">
84121
SELECT <include refid="StreamTask_Column"/>
85122
FROM linkis_stream_task

streamis-jobmanager/streamis-job-manager/streamis-job-manager-base/src/main/java/com/webank/wedatasphere/streamis/jobmanager/manager/entity/StreamTask.java

+14
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package com.webank.wedatasphere.streamis.jobmanager.manager.entity;
1717

18+
import java.util.Calendar;
1819
import java.util.Date;
1920

2021

@@ -31,6 +32,19 @@ public class StreamTask {
3132
private String version;
3233
private Integer status;
3334

35+
public StreamTask(){
36+
Calendar calendar = Calendar.getInstance();
37+
this.lastUpdateTime = calendar.getTime();
38+
this.startTime = calendar.getTime();
39+
}
40+
41+
public StreamTask(Long jobId, Long jobVersionId, String version, String submitUser){
42+
this();
43+
this.jobId = jobId;
44+
this.jobVersionId = jobVersionId;
45+
this.version = version;
46+
this.submitUser = submitUser;
47+
}
3448
public String getVersion() {
3549
return version;
3650
}
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import java.util.List;
2121

22-
public class JobDetailsVO {
22+
public class JobDetailsVo {
2323

2424
private List<RealTimeTrafficDTO> realTimeTraffic;
2525
private List<DataNumberDTO> dataNumber;
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
package com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo;
1717

18-
public class JobProgressVO {
18+
public class JobProgressVo {
1919
private Long taskId;
2020
private Integer progress;
2121

Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
package com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo;
1717

18-
public class PublishRequestVO {
18+
public class PublishRequestVo {
1919

2020
private Long projectId;
2121
/**
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import java.util.Date;
1919

20-
public class QueryJobListVO {
20+
public class QueryJobListVo {
2121
private Long id;
2222
private String name;
2323
private Long workspaceName;
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo;
1717

1818

19-
public class StreamTaskListVO {
19+
public class StreamTaskListVo {
2020
private Long taskId;
2121
private Long jobVersionId;
2222
private String jobName;
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
/**
1919
* job核心指标
2020
*/
21-
public class TaskCoreNumVO {
21+
public class TaskCoreNumVo {
2222
private Long projectId;
2323
private String projectName;
2424
//失败任务数目
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,10 @@
1515

1616
package com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo;
1717

18-
import java.util.Date;
19-
2018
/**
2119
* Created by v_wbyynie on 2021/6/18.
2220
*/
23-
public class VersionDetailVO {
21+
public class VersionDetailVo {
2422
private Long id;
2523
private String version;
2624
private String description;

0 commit comments

Comments
 (0)