Skip to content

Commit

Permalink
AlibabaCloud add docker registry, callcaching, glob supports
Browse files Browse the repository at this point in the history
  • Loading branch information
ysp0606 committed Jul 18, 2019
1 parent 6ab134d commit b9d442d
Show file tree
Hide file tree
Showing 47 changed files with 1,372 additions and 249 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Cromwell Change Log

## 45 Release Notes

### BCS backend new Features support

#### New docker registry
Alibaba Cloud Container Registry is now supported for the `docker` runtime attribute, and the previous `dockerTag`
runtime attribute continues to be available for Alibaba Cloud OSS Registry.
#### Call caching
Cromwell now supports Call caching when using the BCS backend.
#### Workflow output glob
Globs can be used to define outputs for BCS backend.
#### NAS mount
Alibaba Cloud NAS is now supported for the `mounts` runtime attribute.

## 44 Release Notes

### Improved PAPI v2 Preemptible VM Support
Expand Down
1 change: 1 addition & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ docker {
}
dockerhub.num-threads = 10
quay.num-threads = 10
alibabacloudcr.num-threads = 10
}
}

Expand Down
1 change: 0 additions & 1 deletion cromwell.example.backends/BCS.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ backend {
#reserveOnFail: true
#autoReleaseJob: true
#verbose: false
#workerPath: "oss://bcs-bucket/workflow/worker.tar.gz"
#systemDisk: "cloud 50"
#dataDisk: "cloud 250 /home/data/"
#timeout: 3000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import common.validation.Validation._
import cromwell.core.actor.StreamIntegration.{BackPressure, StreamContext}
import cromwell.core.{Dispatcher, DockerConfiguration}
import cromwell.docker.DockerInfoActor._
import cromwell.docker.registryv2.flows.alibabacloudcrregistry._
import cromwell.docker.registryv2.DockerRegistryV2Abstract
import cromwell.docker.registryv2.flows.dockerhub.DockerHubRegistry
import cromwell.docker.registryv2.flows.gcr.GcrRegistry
Expand Down Expand Up @@ -237,7 +238,8 @@ object DockerInfoActor {
List(
("dockerhub", { c: DockerRegistryConfig => new DockerHubRegistry(c) }),
("gcr", gcrConstructor),
("quay", { c: DockerRegistryConfig => new QuayRegistry(c) })
("quay", { c: DockerRegistryConfig => new QuayRegistry(c) }),
("alibabacloudcr", {c: DockerRegistryConfig => new AlibabaCloudCRRegistry(c)})
).traverse[ErrorOr, DockerRegistry]({
case (configPath, constructor) => DockerRegistryConfig.fromConfig(config.as[Config](configPath)).map(constructor)
}).unsafe("Docker registry configuration")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package cromwell.docker.registryv2.flows.alibabacloudcrregistry

import akka.stream._
import cats.effect.IO
import com.aliyuncs.DefaultAcsClient
import com.aliyuncs.auth.{AlibabaCloudCredentials, BasicCredentials, BasicSessionCredentials}
import com.aliyuncs.cr.model.v20160607.GetRepoTagsRequest
import com.aliyuncs.profile.DefaultProfile
import com.aliyuncs.profile.IClientProfile
import cromwell.docker.DockerHashResult
import cromwell.docker.DockerInfoActor._
import cromwell.docker._
import cromwell.docker.registryv2.DockerRegistryV2Abstract
import org.http4s.Header
import org.http4s.client.Client
import scala.util.matching.Regex
import scala.util.{Failure, Success, Try}
import spray.json.DefaultJsonProtocol._
import spray.json._

class AlibabaCloudCRRegistry(config: DockerRegistryConfig) extends DockerRegistryV2Abstract(config) {
val ProductName = "cr"
val HashAlg = "sha256"
val regionPattern = """[^\s]+"""
val validAlibabaCloudCRHosts: Regex = s"""registry.($regionPattern).aliyuncs.com""".r


def isValidAlibabaCloudCRHost(host: Option[String]): Boolean = {
host.exists {
_ match {
case validAlibabaCloudCRHosts(_) => true
case _ => false
}
}
}

override def accepts(dockerImageIdentifier: DockerImageIdentifier): Boolean = isValidAlibabaCloudCRHost(dockerImageIdentifier.host)

override protected def getToken(dockerInfoContext: DockerInfoContext)(implicit client: Client[IO]): IO[Option[String]] = {
IO.pure(None)
}

override protected def registryHostName(dockerImageIdentifier: DockerImageIdentifier): String = ""
override protected def authorizationServerHostName(dockerImageIdentifier: DockerImageIdentifier): String = ""
override protected def buildTokenRequestHeaders(dockerInfoContext: DockerInfoContext): List[Header] = List.empty

override protected def getDockerResponse(token: Option[String], dockerInfoContext: DockerInfoContext)(implicit client: Client[IO]): IO[DockerInfoSuccessResponse] = {
getManifest(dockerInfoContext) match {
case success: DockerInfoSuccessResponse => IO(success)
case fail: DockerInfoFailedResponse => IO.raiseError(new Exception(fail.reason))
case other => IO.raiseError(new Exception(s"Get manifest failed, $other"))
}
}

private def getManifest(context: DockerInfoContext): DockerInfoResponse = {

val regionId = context.dockerImageID.host match {
case Some(validAlibabaCloudCRHosts(region)) => region
case _ => throw new Exception(s"The host ${context.dockerImageID.host} does not have the expected region id")
}

val endpoint = ProductName + "." + regionId + ".aliyuncs.com"
DefaultProfile.addEndpoint(regionId, ProductName, endpoint)

val profile: IClientProfile = getAliyunCredentialFromContext(context) match {
case Some(cred: BasicCredentials) => DefaultProfile.getProfile(regionId, cred.getAccessKeyId(), cred.getAccessKeySecret())
case Some(sCred: BasicSessionCredentials) => DefaultProfile.getProfile(regionId, sCred.getAccessKeyId(), sCred.getAccessKeySecret(), sCred.getSessionToken())
case _ => throw new Exception(s"Invalid credential from context, ${context}")
}

val client: DefaultAcsClient = new DefaultAcsClient(profile)
val request: GetRepoTagsRequest = new GetRepoTagsRequest()
val dockerImageID = context.dockerImageID
request.setRepoName(dockerImageID.image)
dockerImageID.repository foreach { repository => request.setRepoNamespace(repository) }

manifestResponseHandler(client, request, context)
.getOrElse(new Exception(s"handle response fail, please make sure the image id is correct: ${context.dockerImageID}")) match {
case succ: DockerInfoSuccessResponse => succ
case fail: DockerInfoFailedResponse => fail
case ex: Exception => throw new Exception(s"Get AliyunCr manifest failed, ${ex.getMessage}")
}
}

private[alibabacloudcrregistry] def getAliyunCredentialFromContext(context: DockerInfoContext): Option[AlibabaCloudCredentials] = {
context.credentials find {
_.isInstanceOf[AlibabaCloudCredentials]
} match {
case Some(cred: BasicCredentials) => Some(cred)
case Some(sCred: BasicSessionCredentials) => Some(sCred)
case _ => None
}
}

private def matchTag(jsObject: JsObject, dockerHashContext: DockerInfoContext): Boolean = {
val tag = dockerHashContext.dockerImageID.reference
jsObject.fields.get("tag") match {
case Some(tagObj: JsString) if tagObj.value == tag => true
case _ => false
}
}

private[alibabacloudcrregistry] def extractDigestFromBody(jsObject: JsObject, dockerHashContext: DockerInfoContext): DockerInfoResponse = {
val tags = jsObject.fields.get("data") match {
case Some(data) => data.asJsObject().convertTo[Map[String, JsValue]].get("tags") match {
case Some(tag) => tag.convertTo[Seq[JsObject]]
case None => throw new Exception(s"Manifest response did not contain a tags field, ${jsObject}")
}
case None => throw new Exception(s"Manifest response did not contain a data field, Please make sure the existence of image, ${jsObject}")
}

tags find { matchTag(_, dockerHashContext)} match {
case Some(tagObj) =>
tagObj.fields.get("digest") match {
case Some(digest: JsString) =>
DockerHashResult.fromString(HashAlg + ":" + digest.value) match {
case Success(r) => DockerInfoSuccessResponse(DockerInformation(r, None), dockerHashContext.request)
case Failure(t) => DockerInfoFailedResponse(t, dockerHashContext.request)
}
case Some(_) => DockerInfoFailedResponse((new Exception(s"Manifest response contains a non-string digest field, ${jsObject}")), dockerHashContext.request)
case None => DockerInfoFailedResponse((new Exception(s"Manifest response did not contain a digest field, ${jsObject}")), dockerHashContext.request)
}
case None => DockerInfoFailedResponse((new Exception(s"Manifest response did not contain a expected tag: ${dockerHashContext.dockerImageID.reference}, ${jsObject}")), dockerHashContext.request)
}
}

private def manifestResponseHandler(client: DefaultAcsClient, request: GetRepoTagsRequest, dockerHashContext: DockerInfoContext): Try[DockerInfoResponse] = {
for {
response <- Try(client.doAction(request))
jsObj <- Try(if (response.isSuccess) response.getHttpContentString.parseJson.asJsObject()
else throw new Exception(s"Get manifest request not success: ${response}"))
dockInfoRes <- Try(extractDigestFromBody(jsObj, dockerHashContext))
} yield dockInfoRes
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ class DockerImageIdentifierSpec extends FlatSpec with Matchers with TableDrivenP
("sourceString", "host", "repo", "image", "reference"),
// Without tags -> latest
("ubuntu", None, None, "ubuntu", "latest"),
("broad/cromwell", None, Some("broad"), "cromwell", "latest"),
("broad/cromwell", None, Option("broad"), "cromwell", "latest"),
("index.docker.io/ubuntu", Option("index.docker.io"), None, "ubuntu", "latest"),
("broad/cromwell/submarine", None, Some("broad/cromwell"), "submarine", "latest"),
("gcr.io/google/slim", Option("gcr.io"), Some("google"), "slim", "latest"),
("broad/cromwell/submarine", None, Option("broad/cromwell"), "submarine", "latest"),
("gcr.io/google/slim", Option("gcr.io"), Option("google"), "slim", "latest"),
// With tags
("ubuntu:latest", None, None, "ubuntu", "latest"),
("ubuntu:1235-SNAP", None, None, "ubuntu", "1235-SNAP"),
("ubuntu:V3.8-5_1", None, None, "ubuntu", "V3.8-5_1"),
("index.docker.io:9999/ubuntu:170904", Some("index.docker.io:9999"), None, "ubuntu", "170904"),
("localhost:5000/capture/transwf:170904", Some("localhost:5000"), Some("capture"), "transwf", "170904"),
("quay.io/biocontainers/platypus-variant:0.8.1.1--htslib1.5_0", Option("quay.io"), Some("biocontainers"), "platypus-variant", "0.8.1.1--htslib1.5_0")
("index.docker.io:9999/ubuntu:170904", Option("index.docker.io:9999"), None, "ubuntu", "170904"),
("localhost:5000/capture/transwf:170904", Option("localhost:5000"), Option("capture"), "transwf", "170904"),
("quay.io/biocontainers/platypus-variant:0.8.1.1--htslib1.5_0", Option("quay.io"), Option("biocontainers"), "platypus-variant", "0.8.1.1--htslib1.5_0"),
("registry.cn-shanghai.aliyuncs.com/batchcompute/ubuntu:0.2", Option("registry.cn-shanghai.aliyuncs.com"), Option("batchcompute"), "ubuntu", "0.2")
)

forAll(valid) { (dockerString, host, repo, image, reference) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package cromwell.docker.registryv2.flows.alibabacloudcrregistry

import com.aliyuncs.auth.{BasicCredentials, BasicSessionCredentials}
import cromwell.docker.DockerInfoActor.{DockerInfoContext, DockerInfoFailedResponse, DockerInfoSuccessResponse, DockerInformation}
import cromwell.docker.{DockerHashResult, DockerImageIdentifier, DockerInfoRequest, DockerRegistryConfig}

import net.ceedubs.ficus.Ficus._
import com.typesafe.config.{Config, ConfigFactory}
import cromwell.core.TestKitSuite
import org.scalatest.{BeforeAndAfter, FlatSpecLike, Matchers}
import org.scalatest.mockito.MockitoSugar
import spray.json._

object AlibabaCloudCRRegistrySpec {

val AlibabaCloudCRRegistryConfigString =
s"""
|enable = true
|# How should docker hashes be looked up. Possible values are "local" and "remote"
|# "local": Lookup hashes on the local docker daemon using the cli
|# "remote": Lookup hashes on docker hub and gcr
|method = "remote"
|alibabacloudcr {
| num-threads = 5
| auth {
| access-id = "test-access-id"
| access-key = "test-access-key"
| security-token = "test-security-token"
| }
|}
|
""".stripMargin

val AlibabaCloudCRRegistryConfig = ConfigFactory.parseString(AlibabaCloudCRRegistryConfigString)
}

class AlibabaCloudCRRegistrySpec extends TestKitSuite with FlatSpecLike with Matchers with MockitoSugar with BeforeAndAfter {
behavior of "AlibabaCloudCRRegistry"

val hashValue = "fcf39ed78ef0fa27bcc74713b85259alop1b12e6a201e3083af50fd8eda1cbe1"
val tag = "0.2"
val notExistTag = "0.3"
val CRResponse =
s"""
|{
| "data": {
| "total": 2,
| "pageSize": 30,
| "page": 1,
| "tags": [
| {
| "imageUpdate": 1514432549000,
| "imageId": "2842876c9b98f8c7607c1123ks18ff040b76a1d932c6d60c96aa3c283bd221cd",
| "digest": "83414d2c3b04e0lo1q7693e31aeca95b82c61949ea8de858579bf16bd92490c6",
| "imageSize": 715764,
| "tag": "0.1",
| "imageCreate": 1514432549000,
| "status": "NORMAL"
| },
| {
| "imageUpdate": 1514372113000,
| "imageId": "414e6daa772a8cd5dfloqpe503e6e313c372d2e15958ab649709daf9b1065479",
| "digest": "$hashValue",
| "imageSize": 715653,
| "tag": "$tag",
| "imageCreate": 1514372044000,
| "status": "NORMAL"
| }
| ]
| },
| "requestId": "9AFB52D3-6631-4B00-A857-932492097726"
|}""".stripMargin


it should "have correct Alibaba Cloud CR image" in {
val configPath = "alibabacloudcr"
val registry = new AlibabaCloudCRRegistry(DockerRegistryConfig.fromConfig(AlibabaCloudCRRegistrySpec.AlibabaCloudCRRegistryConfig.as[Config](configPath)).getOrElse(DockerRegistryConfig.default))

val testCRDockerImage = s"registry.cn-shanghai.aliyuncs.com/batchcompute/ubuntu:$tag"
val testInvalidCRDockerImage = "registry.cn-not-exist.aliyuncs.com/batchcompute/ubuntu:0.2"
registry.accepts(DockerImageIdentifier.fromString(testCRDockerImage).get) shouldEqual true
registry.isValidAlibabaCloudCRHost(Some(testInvalidCRDockerImage)) shouldEqual false
registry.isValidAlibabaCloudCRHost(None) shouldEqual false
}

it should "successfully extract digest from body" in {
val configPath = "alibabacloudcr"
val registry = new AlibabaCloudCRRegistry(DockerRegistryConfig.fromConfig(AlibabaCloudCRRegistrySpec.AlibabaCloudCRRegistryConfig.as[Config](configPath)).getOrElse(DockerRegistryConfig.default))

val testCRDockerImage = s"registry.cn-shanghai.aliyuncs.com/batchcompute/ubuntu:$tag"

val expectedDockerHashResult = DockerHashResult("sha256", hashValue)
val expectedDockerInfomation = DockerInformation(expectedDockerHashResult, None)
val dockerRequest = DockerInfoRequest(DockerImageIdentifier.fromString(testCRDockerImage).get, List.empty)
val expectedDockerResponse = DockerInfoSuccessResponse(expectedDockerInfomation, dockerRequest)

val context: DockerInfoContext = DockerInfoContext(dockerRequest, null)
registry.extractDigestFromBody(CRResponse.parseJson.asJsObject(), context) shouldEqual expectedDockerResponse
}

it should "NOT successfully extract digest from body" in {
val configPath = "alibabacloudcr"
val registry = new AlibabaCloudCRRegistry(DockerRegistryConfig.fromConfig(AlibabaCloudCRRegistrySpec.AlibabaCloudCRRegistryConfig.as[Config](configPath)).getOrElse(DockerRegistryConfig.default))

val testCRDockerImageTagNotExist = s"registry.cn-shanghai.aliyuncs.com/batchcompute/ubuntu:$notExistTag"

val dockerRequest = DockerInfoRequest(DockerImageIdentifier.fromString(testCRDockerImageTagNotExist).get, List.empty)
val context: DockerInfoContext = DockerInfoContext(dockerRequest, null)

val cRResponseJsObj = CRResponse.parseJson.asJsObject()
registry.extractDigestFromBody(cRResponseJsObj, context) match {
case DockerInfoFailedResponse(t, _) => t.getMessage should be(s"Manifest response did not contain a expected tag: $notExistTag, ${cRResponseJsObj}")
case _ => fail("Failed to get a DockerInfoFailedResponse result.")
}
}

it should "successfully get the correct credentials from context" in {
val configPath = "alibabacloudcr"
val registry = new AlibabaCloudCRRegistry(DockerRegistryConfig.fromConfig(AlibabaCloudCRRegistrySpec.AlibabaCloudCRRegistryConfig.as[Config](configPath)).getOrElse(DockerRegistryConfig.default))

val testCRDockerImageTagNotExist = s"registry.cn-shanghai.aliyuncs.com/batchcompute/ubuntu:$tag"
val access_id = "test-access-id"
val access_key = "test-access-key"
val security_token = "test-token"

val basicCredential = new BasicCredentials(access_id, access_key)
val sessionCredential = new BasicSessionCredentials(access_id, access_key, security_token)

val dockerRequest = DockerInfoRequest(DockerImageIdentifier.fromString(testCRDockerImageTagNotExist).get, List(basicCredential))
val context: DockerInfoContext = DockerInfoContext(dockerRequest, null)
registry.getAliyunCredentialFromContext(context) shouldEqual Some(basicCredential)

val sessionDockerRequest = DockerInfoRequest(DockerImageIdentifier.fromString(testCRDockerImageTagNotExist).get, List(sessionCredential))
val sessionContext: DockerInfoContext = DockerInfoContext(sessionDockerRequest, null)
registry.getAliyunCredentialFromContext(sessionContext) shouldEqual Some(sessionCredential)

val invalidDockerRequest = DockerInfoRequest(DockerImageIdentifier.fromString(testCRDockerImageTagNotExist).get, List.empty)
val invalidContext: DockerInfoContext = DockerInfoContext(invalidDockerRequest, null)
registry.getAliyunCredentialFromContext(invalidContext) shouldEqual None
}

}
Loading

0 comments on commit b9d442d

Please sign in to comment.