")
email.getEmailContents.foreach {
case emailContent: ArrayEmailContent =>
- if (emailContent.getContent != null) {
- emailContent.getContent.foreach(content => sb.append("").append(content).append(" |
"))
- }
+ emailContent.getContent.foreach(content => sb.append("").append(content).append(" |
"))
case emailContent: StringEmailContent =>
sb.append("").append(emailContent.getContent).append(" |
")
}
@@ -52,13 +45,4 @@ class MultiEmailContentGenerator extends AbstractEmailContentGenerator with Logg
email.setContent(sb.toString)
}
- protected def setHtmlContent(email: MultiContentEmail): Unit = {
- email.getEmailContents.foreach {
- case emailContent: StringEmailContent =>
- if (emailContent.getContent != null) {
- email.setContent(emailContent.getContent)
- }
- }
- }
-
}
\ No newline at end of file
diff --git a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/AbstractEmailContentParser.scala b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/AbstractEmailContentParser.scala
index f58d908f28..2bbcb6c217 100644
--- a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/AbstractEmailContentParser.scala
+++ b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/AbstractEmailContentParser.scala
@@ -17,7 +17,6 @@
package com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.parser
import java.lang.reflect.{ParameterizedType, Type}
-import java.util
import com.webank.wedatasphere.dss.appconn.sendemail.email.Email
import com.webank.wedatasphere.dss.appconn.sendemail.email.domain.MultiContentEmail
@@ -27,7 +26,7 @@ import org.apache.linkis.common.io.resultset.ResultSetReader
import org.apache.linkis.common.io.{MetaData, Record}
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.storage.LineRecord
-import org.apache.linkis.storage.resultset.ResultSetReaderFactory
+import org.apache.linkis.storage.resultset.ResultSetReader
import org.apache.commons.io.IOUtils
abstract class AbstractEmailContentParser[T] extends EmailContentParser {
@@ -42,8 +41,8 @@ abstract class AbstractEmailContentParser[T] extends EmailContentParser {
case _ =>
}
- protected def getResultSetReader(fsPathStore: FsPathStoreEmailContent): ResultSetReader[_, _ ] = {
- val reader = ResultSetReaderFactory.getResultSetReader(fsPathStore.getFsPath.getSchemaPath)
+ protected def getResultSetReader(fsPathStore: FsPathStoreEmailContent): ResultSetReader[_ <: MetaData, _ <: Record] = {
+ val reader = ResultSetReader.getResultSetReader(fsPathStore.getFsPath.getSchemaPath)
reader.getMetaData
reader
}
@@ -56,19 +55,6 @@ abstract class AbstractEmailContentParser[T] extends EmailContentParser {
})(IOUtils.closeQuietly(reader))
}
- protected def getLineRecord(fsPathStore: FsPathStoreEmailContent): Option[String] = {
- val reader = getResultSetReader(fsPathStore)
- val result = new util.ArrayList[String]
- Utils.tryFinally(
- while (reader.hasNext) {
- reader.getRecord match {
- case record: LineRecord => result.add(record.getLine)
- }
- }
- ) (IOUtils.closeQuietly(reader))
- Option.apply(String.join("",result))
- }
-
protected def getEmailContentClass: Type = getClass.getGenericSuperclass match {
case pType: ParameterizedType => pType.getActualTypeArguments.head
}
diff --git a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/HtmlEmailContentParser.scala b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/HtmlEmailContentParser.scala
index d2b5520d88..cab083f70c 100644
--- a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/HtmlEmailContentParser.scala
+++ b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/HtmlEmailContentParser.scala
@@ -16,40 +16,12 @@
package com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.parser
-import com.webank.wedatasphere.dss.appconn.sendemail.email.domain.{CsvAttachment, MultiContentEmail, PngAttachment}
-import com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.HtmlItem
+import com.webank.wedatasphere.dss.appconn.sendemail.email.domain.MultiContentEmail
import com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.domain.HtmlEmailContent
-import org.apache.linkis.server.JSONUtils
-
-import java.nio.charset.StandardCharsets
-import java.util.Base64
-import scala.sys.error
object HtmlEmailContentParser extends AbstractEmailContentParser[HtmlEmailContent] {
override protected def parseEmailContent(emailContent: HtmlEmailContent,
multiContentEmail: MultiContentEmail): Unit = {
- getFirstLineRecord(emailContent).foreach(htmlStr =>
- emailContent.getFileType match {
- case "html" =>
- val htmlItems: Array[HtmlItem] = JSONUtils.gson.fromJson(htmlStr, classOf[Array[HtmlItem]])
- htmlItems.foreach {
- case htmlItem: HtmlItem =>
- if (htmlItem.getFileType.equals("attachment") && htmlItem.getContentType.equals("csv")) {
- val csvName = htmlItem.getFileName
- val csvContent = htmlItem.getContent
- val csvContentBytes = csvContent.getBytes
- val csvBase64BaseContent = new String(Base64.getEncoder.encode(csvContentBytes), StandardCharsets.UTF_8)
- multiContentEmail.addAttachment(new CsvAttachment(csvName, csvBase64BaseContent))
- } else if (htmlItem.getContentType.equals("image") && htmlItem.getFileType.equals("inline")) {
- multiContentEmail.addAttachment(new PngAttachment(htmlItem.getContentId, htmlItem.getContent))
- } else if (htmlItem.getContentType.equals("html")) {
- emailContent.setContent(htmlItem.getContent)
- } else {
- error("unknow content type: " + emailContent.getFileType)
- }
- }
- case _ =>
- }
- )
+ getFirstLineRecord(emailContent).foreach(emailContent.setContent)
}
}
diff --git a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/PictureEmailContentParser.scala b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/PictureEmailContentParser.scala
index 8f4ac80417..5162486081 100644
--- a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/PictureEmailContentParser.scala
+++ b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/PictureEmailContentParser.scala
@@ -17,88 +17,36 @@
package com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.parser
import java.awt.image.BufferedImage
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
-import java.util
-import java.util.{Base64, Iterator, UUID}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.util.{Base64, UUID}
-import com.webank.wedatasphere.dss.appconn.sendemail.email.domain.{AbstractEmail, ExcelAttachment, MultiContentEmail, PdfAttachment, PngAttachment}
+import com.webank.wedatasphere.dss.appconn.sendemail.email.domain.{AbstractEmail, MultiContentEmail, PngAttachment}
import com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.domain.PictureEmailContent
import org.apache.linkis.common.conf.Configuration
-import javax.imageio.{ImageIO, ImageReader}
+import javax.imageio.ImageIO
import org.apache.commons.codec.binary.Base64OutputStream
import com.webank.wedatasphere.dss.appconn.sendemail.conf.SendEmailAppConnConfiguration._
-import com.webank.wedatasphere.dss.appconn.sendemail.exception.EmailSendFailedException
-import javax.imageio.stream.ImageInputStream
-import org.apache.commons.io.IOUtils
-import org.apache.linkis.common.utils.Utils
object PictureEmailContentParser extends AbstractEmailContentParser[PictureEmailContent] {
override protected def parseEmailContent(emailContent: PictureEmailContent,
multiContentEmail: MultiContentEmail): Unit = {
- getLineRecord(emailContent).foreach { imageStr =>
- emailContent.getFileType match {
- case "checkData" =>
- //对于邮件校验数据不进行处理
- case "pdf" =>
- val pdfUUID: String = UUID.randomUUID.toString
- val pdfName = pdfUUID + ".pdf"
- val decoder = Base64.getDecoder
- val byteArr = decoder.decode(imageStr)
- multiContentEmail.addAttachment(new PdfAttachment(pdfName, Base64.getEncoder.encodeToString(byteArr)))
- case "excel" =>
- //val excelUUID: String = UUID.randomUUID.toString
- val excelName = emailContent.getFileName + ".xlsx"
- val decoder = Base64.getDecoder
- val byteArr = decoder.decode(imageStr)
- multiContentEmail.addAttachment(new ExcelAttachment(excelName, Base64.getEncoder.encodeToString(byteArr)))
- case _ =>
- var inputStream: ByteArrayInputStream = null
- Utils.tryFinally({
- val decoder = Base64.getDecoder
- val byteArr = decoder.decode(imageStr)
- if (CHECK_EMAIL_IMAGE_SWITCH.getValue) {
- checkImageSize(byteArr)
- }
- inputStream = new ByteArrayInputStream(byteArr)
- val image = ImageIO.read(inputStream)
- val contents = generateImage(image, multiContentEmail)
- emailContent.setContent(contents)
- })(IOUtils.closeQuietly(inputStream))
-
- }
+ getFirstLineRecord(emailContent).foreach { imageStr =>
+ val decoder = Base64.getDecoder
+ val byteArr = decoder.decode(imageStr)
+ val inputStream = new ByteArrayInputStream(byteArr)
+ val image = ImageIO.read(inputStream)
+ val contents = generateImage(image, multiContentEmail)
+ emailContent.setContent(contents)
}
}
- protected def checkImageSize(byteArr: Array[Byte]): Unit = {
- var reader: ImageReader = null
- val inputStream: InputStream = new ByteArrayInputStream(byteArr)
- var imageInputStream: ImageInputStream = null
- Utils.tryFinally({
- imageInputStream = ImageIO.createImageInputStream(inputStream)
- val imageReaders: util.Iterator[ImageReader] = ImageIO.getImageReaders(imageInputStream)
- if (!imageReaders.hasNext) throw new EmailSendFailedException(80002,"Unsupported image format!")
- reader = imageReaders.next
- reader.setInput(imageInputStream)
- val height = reader.getHeight(0)
- val width = reader.getWidth(0)
- if ((height * width) > EMAIL_IMAGE_MAXSIZE.getValue) {
- throw new EmailSendFailedException(80002, "too large picture size :" + (height * width) + ", expect max picture size is :" + EMAIL_IMAGE_MAXSIZE.getValue)
- }
- })({
- if (reader != null) reader.dispose()
- IOUtils.closeQuietly(imageInputStream)
- IOUtils.closeQuietly(inputStream)
- })
-
- }
-
protected def generateImage(bufferedImage: BufferedImage, email: AbstractEmail): Array[String] = {
val imageUUID: String = UUID.randomUUID.toString
val width: Int = bufferedImage.getWidth
val height: Int = bufferedImage.getHeight
// 只支持修改visualis图片大小,后续如果有新增其他类型的邮件需要修改图片大小,需要在if中加上该邮件类型
- val imagesCuts = if (height > EMAIL_IMAGE_HEIGHT.getValue) {
+ val imagesCuts = if (email.getEmailType.contains("visualis") && height > EMAIL_IMAGE_HEIGHT.getValue) {
val numOfCut = Math.ceil(height.toDouble / EMAIL_IMAGE_HEIGHT.getValue).toInt
val realHeight = height / numOfCut
(0 until numOfCut).map(i => bufferedImage.getSubimage(0, i * realHeight, width, realHeight)).toArray
@@ -114,7 +62,8 @@ object PictureEmailContentParser extends AbstractEmailContentParser[PictureEmail
var iHeight = image.getHeight
var iWidth = image.getWidth
- if (iWidth > EMAIL_IMAGE_WIDTH.getValue) {
+
+ if (email.getEmailType.contains("visualis") && iWidth > EMAIL_IMAGE_WIDTH.getValue) {
iHeight = ((EMAIL_IMAGE_WIDTH.getValue.toDouble / iWidth.toDouble) * iHeight.toDouble).toInt
iWidth = EMAIL_IMAGE_WIDTH.getValue
}
diff --git a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/svgs/sendemail.svg b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/svgs/sendemail.svg
deleted file mode 100644
index b27d35c970..0000000000
--- a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/svgs/sendemail.svg
+++ /dev/null
@@ -1 +0,0 @@
-
\ No newline at end of file
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/pom.xml b/dss-appconn/appconns/dss-sparketl-appconn/pom.xml
deleted file mode 100644
index 5e79c264a3..0000000000
--- a/dss-appconn/appconns/dss-sparketl-appconn/pom.xml
+++ /dev/null
@@ -1,166 +0,0 @@
-
-
-
- dss
- com.webank.wedatasphere.dss
- 1.1.2
- ../../../pom.xml
-
- 4.0.0
-
- dss-sparketl-appconn
-
-
-
- com.webank.wedatasphere.dss
- dss-appconn-core
- ${dss.version}
-
-
- linkis-common
- org.apache.linkis
-
-
- json4s-jackson_2.11
- org.json4s
-
-
- com.webank.wedatasphere.dss
- dss-origin-sso-integration-standard
-
-
-
-
-
- com.webank.wedatasphere.dss
- dss-development-process-standard
- ${dss.version}
-
-
-
- com.webank.wedatasphere.dss
- dss-development-process-standard-execution
- ${dss.version}
-
-
-
- org.apache.linkis
- linkis-module
- ${linkis.version}
- provided
-
-
- httpclient
- org.apache.httpcomponents
-
-
- true
-
-
- org.apache.linkis
- linkis-cs-common
- ${linkis.version}
- compile
-
-
- linkis-bml-client
-
-
- gson
- com.google.code.gson
-
-
- org.apache.linkis
- ${linkis.version}
- provided
- true
-
-
-
- org.apache.linkis
- linkis-httpclient
- ${linkis.version}
-
-
- linkis-common
- org.apache.linkis
-
-
- json4s-jackson_2.11
- org.json4s
-
-
-
-
-
- org.apache.linkis
- linkis-storage
- ${linkis.version}
- provided
-
-
- linkis-common
- org.apache.linkis
-
-
-
-
-
- com.webank.wedatasphere.dss
- dss-common
- ${dss.version}
- provided
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-deploy-plugin
-
-
-
- net.alchim31.maven
- scala-maven-plugin
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
-
- org.apache.maven.plugins
- maven-assembly-plugin
- 2.3
- false
-
-
- make-assembly
- package
-
- single
-
-
-
- src/main/assembly/distribution.xml
-
-
-
-
-
- false
- out
- false
- false
-
- src/main/assembly/distribution.xml
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/assembly/distribution.xml b/dss-appconn/appconns/dss-sparketl-appconn/src/main/assembly/distribution.xml
deleted file mode 100644
index e54763fcae..0000000000
--- a/dss-appconn/appconns/dss-sparketl-appconn/src/main/assembly/distribution.xml
+++ /dev/null
@@ -1,76 +0,0 @@
-
-
-
- dss-sparketl-appconn
-
- dir
-
- true
- sparketl
-
-
-
-
-
- lib
- true
- true
- false
- true
- true
-
-
-
-
-
- ${basedir}/src/main/resources
-
- appconn.properties
-
- 0777
- /
- unix
-
-
-
- ${basedir}/src/main/resources
-
- log4j.properties
- log4j2.xml
-
- 0777
- conf
- unix
-
-
-
- ${basedir}/src/main/resources
-
- init.sql
-
- 0777
- db
-
-
-
-
-
-
-
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/icons/sparketl.icon b/dss-appconn/appconns/dss-sparketl-appconn/src/main/icons/sparketl.icon
deleted file mode 100644
index bf047fb7b2..0000000000
--- a/dss-appconn/appconns/dss-sparketl-appconn/src/main/icons/sparketl.icon
+++ /dev/null
@@ -1 +0,0 @@
-
\ No newline at end of file
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/SparkEtlAppConn.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/SparkEtlAppConn.java
deleted file mode 100644
index 1f6dfc3dfc..0000000000
--- a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/SparkEtlAppConn.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.webank.wedatasphere.dss.appconn.sparketl;
-
-import com.webank.wedatasphere.dss.appconn.core.ext.OnlyDevelopmentAppConn;
-import com.webank.wedatasphere.dss.appconn.core.impl.AbstractAppConn;
-import com.webank.wedatasphere.dss.appconn.sparketl.standard.SparkEtlDevelopmentStandard;
-import com.webank.wedatasphere.dss.standard.app.development.standard.DevelopmentIntegrationStandard;
-import org.apache.linkis.common.conf.CommonVars;
-
-
-public class SparkEtlAppConn extends AbstractAppConn implements OnlyDevelopmentAppConn {
-
- private SparkEtlDevelopmentStandard developmentStandard;
-
- @Override
- protected void initialize() {
- developmentStandard = new SparkEtlDevelopmentStandard();
- }
-
- @Override
- public DevelopmentIntegrationStandard getOrCreateDevelopmentStandard() {
- return developmentStandard;
- }
-
-}
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/constant/SparkEtlTaskStatusEnum.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/constant/SparkEtlTaskStatusEnum.java
deleted file mode 100644
index d7bc0dd50a..0000000000
--- a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/constant/SparkEtlTaskStatusEnum.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package com.webank.wedatasphere.dss.appconn.sparketl.constant;
-
-public enum SparkEtlTaskStatusEnum {
-
- /**
- * Task status enum
- */
- SUBMITTED(1, "已提交", "SUBMITTED"),
- INITED(2, "初始化", "INITED"),
- RUNNING(3, "运行中", "RUNNING"),
- SUCCEED(4, "成功", "SUCCEED"),
- PASS_CHECKOUT(5, "通过校验", "PASS_CHECKOUT"),
- FAIL_CHECKOUT(6, "未通过校验", "FAIL_CHECKOUT"),
- FAILED(7, "失败", "FAILED"),
- TASK_NOT_EXIST(8, "Task不存在", "TASK_NOT_EXIST"),
- CANCELLED(9, "取消", "CANCELLED"),
- TIMEOUT(10, "超时", "TIMEOUT"),
- SCHEDULED(11, "调度中", "SCHEDULED"),
- SUBMIT_PENDING(12, "提交阻塞", "SUBMIT_PENDING");
-
- private Integer code;
- private String message;
- private String state;
-
- SparkEtlTaskStatusEnum(Integer code, String message, String state) {
- this.code = code;
- this.message = message;
- this.state = state;
- }
-
- public Integer getCode() {
- return code;
- }
-
- public String getMessage() {
- return message;
- }
-
- public String getState() {
- return state;
- }
-
-}
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlExecutionService.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlExecutionService.java
deleted file mode 100644
index b8e9cd0f34..0000000000
--- a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlExecutionService.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.webank.wedatasphere.dss.appconn.sparketl.execution;
-
-import com.webank.wedatasphere.dss.standard.app.development.operation.RefExecutionOperation;
-import com.webank.wedatasphere.dss.standard.app.development.service.AbstractRefExecutionService;
-
-public class SparkEtlExecutionService extends AbstractRefExecutionService {
-
- @Override
- public RefExecutionOperation createRefExecutionOperation() {
- return new SparkEtlRefExecutionOperation();
- }
-
-}
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionAction.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionAction.java
deleted file mode 100644
index fc94e8c84b..0000000000
--- a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionAction.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2019 WeBank
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.webank.wedatasphere.dss.appconn.sparketl.execution;
-
-import com.webank.wedatasphere.dss.standard.app.development.listener.common.AbstractRefExecutionAction;
-import com.webank.wedatasphere.dss.standard.app.development.listener.common.LongTermRefExecutionAction;
-
-public class SparkEtlRefExecutionAction extends AbstractRefExecutionAction implements LongTermRefExecutionAction {
-
- private String applicationId;
- private String executionUser;
- private int schedulerId;
-
- public SparkEtlRefExecutionAction() {
- }
-
- public SparkEtlRefExecutionAction(String applicationId) {
- this.applicationId = applicationId;
- }
-
- public SparkEtlRefExecutionAction(String applicationId, String executionUser) {
- this.applicationId = applicationId;
- this.executionUser = executionUser;
- }
-
- public String getApplicationId() {
- return applicationId;
- }
-
- public void setApplicationId(String applicationId) {
- this.applicationId = applicationId;
- }
-
- public String getExecutionUser() {
- return executionUser;
- }
-
- public void setExecutionUser(String executionUser) {
- this.executionUser = executionUser;
- }
-
- @Override
- public void setSchedulerId(int schedulerId) {
- this.schedulerId = schedulerId;
- }
-
- @Override
- public int getSchedulerId() {
- return schedulerId;
- }
-}
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionOperation.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionOperation.java
deleted file mode 100644
index 22a6db0841..0000000000
--- a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionOperation.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Copyright 2019 WeBank
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.webank.wedatasphere.dss.appconn.sparketl.execution;
-
-import com.google.gson.Gson;
-import com.webank.wedatasphere.dss.appconn.sparketl.constant.SparkEtlTaskStatusEnum;
-import com.webank.wedatasphere.dss.appconn.sparketl.utils.HttpUtils;
-import com.webank.wedatasphere.dss.standard.app.development.listener.common.RefExecutionState;
-import com.webank.wedatasphere.dss.standard.app.development.listener.common.RefExecutionAction;
-import com.webank.wedatasphere.dss.standard.app.development.listener.core.LongTermRefExecutionOperation;
-import com.webank.wedatasphere.dss.standard.app.development.listener.core.Killable;
-import com.webank.wedatasphere.dss.standard.app.development.listener.core.Procedure;
-import com.webank.wedatasphere.dss.standard.app.development.listener.ref.ExecutionResponseRef;
-import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException;
-import com.webank.wedatasphere.dss.standard.app.development.listener.ref.RefExecutionRequestRef.RefExecutionProjectWithContextRequestRef;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.ws.rs.HttpMethod;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.web.client.RestTemplate;
-
-public class SparkEtlRefExecutionOperation extends LongTermRefExecutionOperation
- implements Killable, Procedure {
- private static final String SUBMIT_TASK_PATH = "sparketl/outer/api/v1/execution";
- private static final String GET_TASK_STATUS_PATH = "sparketl/outer/api/v1/application/{applicationId}/status/";
- private static final String GET_TASK_RESULT_PATH = "sparketl/outer/api/v1/application/{applicationId}/result/";
- private static final String KILL_TASK_PATH = "sparketl/outer/api/v1/execution/application/kill/{applicationId}/{executionUser}";
- private static final String NODE_NAME_KEY = "nodeName";
- private static final String EXECUTION_USER_KEY = "executionUser";
- private static final String WDS_SUBMIT_USER_KEY = "wds.dss.workflow.submit.user";
-
- private static Logger LOGGER = LoggerFactory.getLogger(SparkEtlRefExecutionOperation.class);
-
- private String appId = "linkis_id";
- private String appToken = "***REMOVED***";
-
- private static final String FILTER = "filter";
-
- @Override
- public boolean kill(RefExecutionAction action) {
- String applicationId = ((SparkEtlRefExecutionAction) action).getApplicationId();
- String executtionUser = ((SparkEtlRefExecutionAction) action).getExecutionUser();
- if (applicationId == null) {
- LOGGER.error("Cannot get application id from SparkEtlNodeExecutionAction. Kill SparkEtl job failed.");
- return false;
- }
- try {
- // Send request and get response
- RestTemplate restTemplate = new RestTemplate();
- HttpHeaders headers = new HttpHeaders();
- HttpEntity entity = new HttpEntity(headers);
-
- String path = KILL_TASK_PATH.replace("{applicationId}", applicationId).replace("{executionUser}", executtionUser);
- URI url = HttpUtils.buildUrI(getBaseUrl(), path, appId, appToken, RandomStringUtils.randomNumeric(5), String.valueOf(System.currentTimeMillis()));
- String startLog = String.format("Start to kill job. url: %s, method: %s, body: %s", url, HttpMethod.GET, entity);
- LOGGER.info(startLog);
- Map response = restTemplate.getForEntity(url.toString(), Map.class).getBody();
-
- if (response == null) {
- String errorMsg = String.format("Error! Can not get kill result, job_id: %s, response is null", applicationId);
- LOGGER.error(errorMsg);
- return false;
- }
-
- if (!checkResponse(response)) {
- String message = (String) response.get("message");
- String errorMsg = String.format("Error! Can not get kill result, exception: {}", message);
- LOGGER.error(errorMsg);
- return false;
- }
-
- String finishLog = String.format("Succeed to get kill result. response: %s", response);
- LOGGER.info(finishLog);
- return true;
- } catch (Exception e) {
- String errorMsg = String.format("Error! Can not kill job result, job_id: %s", applicationId);
- LOGGER.error(errorMsg, e);
- return false;
- }
-
- }
-
- @Override
- protected RefExecutionAction submit(RefExecutionProjectWithContextRequestRef requestRef) throws ExternalOperationFailedException {
- try {
- Map jobContent = requestRef.getRefJobContent();
- Map runtimeMap = requestRef.getExecutionRequestRefContext().getRuntimeMap();
- String executionUser = String.valueOf(runtimeMap.get(WDS_SUBMIT_USER_KEY).toString());
- String realExecutionUser = String.valueOf(runtimeMap.get(EXECUTION_USER_KEY) != null ? runtimeMap.get(EXECUTION_USER_KEY).toString() : "");
- String nodeName = String.valueOf(runtimeMap.get(NODE_NAME_KEY));
- if (StringUtils.isEmpty(nodeName)) {
- nodeName = requestRef.getName();
- }
- LOGGER.info("The node name: " + nodeName);
-
- if (nodeName == null) {
- String errorMsg = "Error! Can not submit job, node name is null";
- LOGGER.error(errorMsg);
- return null;
- }
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
-
- Gson gson = new Gson();
-
- Map requestPayLoad = new HashMap<>();
- requestPayLoad.put("execution_user", StringUtils.isNotBlank(realExecutionUser) ? realExecutionUser : executionUser);
- requestPayLoad.put("create_user", executionUser);
- requestPayLoad.put("node_name", nodeName);
-
- LOGGER.info("The execution user: " + (StringUtils.isNotBlank(realExecutionUser) ? realExecutionUser : executionUser));
- // Get parameters.
- LOGGER.info("The execution request: " + runtimeMap);
-
- StringBuffer executionParam = new StringBuffer();
-
- getVariables(requestRef, runtimeMap, requestPayLoad, executionParam);
-
- HttpEntity