Skip to content

Commit

Permalink
Always shift times to UTC, and render w/ 3 digits of millis
Browse files Browse the repository at this point in the history
  • Loading branch information
kshakir committed May 10, 2019
1 parent add73f1 commit 3b0c2be
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 41 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,25 @@ disks: "/some/mnt 20 SSD"
```
Because Cromwell's AWS backend auto-sizes disks, the size specification is simply discarded.

### Time Formatting

In previous versions of Cromwell, times were converted to strings using
[the default Java formatter](https://docs.oracle.com/javase/8/docs/api/java/time/OffsetDateTime.html#toString--) which
generates a variety of ISO-8601 formats. String conversions also retained whatever server time zone generated that
specific time instance.

Going forward, times stored in Cromwell metadata, and later returned via the HTTP endpoint, are now converted to UTC
then formatted with exactly three digits of milliseconds.

For example:
- `2017-01-19T12:34:56-04:00` will now be formatted as
- `2017-01-19T16:34:56.000Z`

This change only affects newly formatted dates. Older dates already formatted and stored by previous versions of
Cromwell will not be updated however they will still return a
[valid ISO-8601 format](https://en.wikipedia.org/wiki/ISO_8601). The older format may be in various non-UTC time zones,
and may or may not include microseconds or even nanoseconds, for example `2017-01-19T12:34:56.123456789-04:00`.

### Config Changes

#### Heartbeat failure shutdown
Expand Down
13 changes: 7 additions & 6 deletions centaur/src/it/scala/centaur/reporting/BigQueryReporter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import java.time.OffsetDateTime
import java.util

import cats.effect.IO
import cats.syntax.traverse._
import cats.syntax.apply._
import cats.instances.list._
import cats.syntax.apply._
import cats.syntax.traverse._
import centaur.reporting.BigQueryReporter._
import centaur.test.CentaurTestException
import centaur.test.metadata.CallAttemptFailure
Expand All @@ -15,6 +15,7 @@ import com.google.api.services.bigquery.BigqueryScopes
import com.google.auth.Credentials
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert
import com.google.cloud.bigquery.{BigQuery, BigQueryError, BigQueryOptions, InsertAllRequest, InsertAllResponse, TableId}
import common.util.TimeUtil._
import common.validation.Validation._
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.database.sql.SqlConverters._
Expand Down Expand Up @@ -154,7 +155,7 @@ class BigQueryReporter(override val params: ErrorReporterParams) extends ErrorRe
"test_message" -> Option(centaurTestException.message),
"test_name" -> Option(testEnvironment.name),
"test_stack_trace" -> Option(ExceptionUtils.getStackTrace(centaurTestException)),
"test_timestamp" -> Option(OffsetDateTime.now.toString),
"test_timestamp" -> Option(OffsetDateTime.now.toUtcMilliString),
"test_workflow_id" -> centaurTestException.workflowIdOption,
).collect {
case (key, Some(value)) => (key, value)
Expand All @@ -165,11 +166,11 @@ class BigQueryReporter(override val params: ErrorReporterParams) extends ErrorRe
RowToInsert of Map(
"call_fully_qualified_name" -> Option(callAttemptFailure.callFullyQualifiedName),
"call_root" -> callAttemptFailure.callRootOption,
"end" -> callAttemptFailure.endOption.map(_.toString),
"end" -> callAttemptFailure.endOption.map(_.toUtcMilliString),
"job_attempt" -> Option(callAttemptFailure.jobAttempt),
"job_index" -> Option(callAttemptFailure.jobIndex),
"message" -> Option(callAttemptFailure.message),
"start" -> callAttemptFailure.startOption.map(_.toString),
"start" -> callAttemptFailure.startOption.map(_.toUtcMilliString),
"stderr" -> callAttemptFailure.stderrOption,
"stdout" -> callAttemptFailure.stdoutOption,
"workflow_id" -> Option(callAttemptFailure.workflowId),
Expand All @@ -195,7 +196,7 @@ class BigQueryReporter(override val params: ErrorReporterParams) extends ErrorRe
"job_attempt" -> metadataEntry.jobAttempt,
"job_index" -> metadataEntry.jobIndex,
"metadata_key" -> Option(metadataEntry.metadataKey),
"metadata_timestamp" -> Option(metadataEntry.metadataTimestamp.toSystemOffsetDateTime.toString),
"metadata_timestamp" -> Option(metadataEntry.metadataTimestamp.toSystemOffsetDateTime.toUtcMilliString),
"metadata_value" -> metadataEntry.metadataValue.map(_.toRawString),
"metadata_value_type" -> metadataEntry.metadataValueType,
"workflow_execution_uuid" -> Option(metadataEntry.workflowExecutionUuid),
Expand Down
22 changes: 22 additions & 0 deletions common/src/main/scala/common/util/TimeUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package common.util

import java.time.format.DateTimeFormatter
import java.time.{OffsetDateTime, ZoneOffset}

object TimeUtil {
/**
* Instead of "one of" the valid ISO-8601 formats, standardize on this one:
* https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/share/classes/java/time/OffsetDateTime.java#L1886
*/
private val Iso8601MillisecondsFormat = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSXXXXX")

implicit class EnhancedOffsetDateTime(val offsetDateTime: OffsetDateTime) extends AnyVal {
/**
* Discards the original timezone and shifts the time to UTC, then returns the ISO-8601 formatted string with
* exactly three digits of milliseconds.
*/
def toUtcMilliString: String = Option(offsetDateTime).map(
_.atZoneSameInstant(ZoneOffset.UTC).format(Iso8601MillisecondsFormat)
).orNull
}
}
22 changes: 22 additions & 0 deletions cromwellApiClient/src/main/scala/cromwell/api/model/TimeUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package cromwell.api.model

import java.time.format.DateTimeFormatter
import java.time.{OffsetDateTime, ZoneOffset}

object TimeUtil {
/**
* Instead of "one of" the valid ISO-8601 formats, standardize on this one:
* https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/share/classes/java/time/OffsetDateTime.java#L1886
*/
private val Iso8601MillisecondsFormat = DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSXXXXX")

implicit class EnhancedOffsetDateTime(val offsetDateTime: OffsetDateTime) extends AnyVal {
/**
* Discards the original timezone and shifts the time to UTC, then returns the ISO-8601 formatted string with
* exactly three digits of milliseconds.
*/
def toUtcMilliString: String = Option(offsetDateTime).map(
_.atZoneSameInstant(ZoneOffset.UTC).format(Iso8601MillisecondsFormat)
).orNull
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import cats.arrow.FunctionK
import cats.data.EitherT
import cats.effect.{ContextShift, IO, Timer}
import cromwell.api.CromwellClient.UnsuccessfulRequestException
import cromwell.api.model.TimeUtil._
import spray.json.{DefaultJsonProtocol, JsString, JsValue, RootJsonFormat}

import scala.concurrent.duration.FiniteDuration
Expand All @@ -20,7 +21,7 @@ package object model {

object OffsetDateTimeJsonFormatter extends DefaultJsonProtocol {
object OffsetDateTimeFormat extends RootJsonFormat[OffsetDateTime] {
def write(odt: OffsetDateTime) = new JsString(odt.toString)
def write(offsetDateTime: OffsetDateTime) = new JsString(offsetDateTime.toUtcMilliString)
def read(value: JsValue) = value match {
case JsString(string) => OffsetDateTime.parse(string)
case other => throw new UnsupportedOperationException(s"Cannot deserialize $other into an OffsetDateTime")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,20 @@ class CromwellQueryResultJsonFormatterSpec extends FlatSpec with Matchers {
behavior of "CromwellQueryResultJsonFormat"

val sampleQueryResult = CromwellQueryResults(results = List(
CromwellQueryResult("switcheroo", WorkflowId.fromString("bee51f36-396d-4e22-8a81-33dedff66bf6"), Failed, OffsetDateTime.parse("2017-07-24T14:44:34.010-04:00"), OffsetDateTime.parse("2017-07-24T14:44:33.227-04:00")),
CromwellQueryResult("switcheroo", WorkflowId.fromString("0071495e-39eb-478e-bc98-8614b986c91e"), Succeeded, OffsetDateTime.parse("2017-07-24T15:06:45.940-04:00"), OffsetDateTime.parse("2017-07-24T15:04:54.372-04:00"))
CromwellQueryResult(
"switcheroo",
WorkflowId.fromString("bee51f36-396d-4e22-8a81-33dedff66bf6"),
Failed,
OffsetDateTime.parse("2017-07-24T14:44:34.010Z"),
OffsetDateTime.parse("2017-07-24T14:44:33.227Z")
),
CromwellQueryResult(
"switcheroo",
WorkflowId.fromString("0071495e-39eb-478e-bc98-8614b986c91e"),
Succeeded,
OffsetDateTime.parse("2017-07-24T15:06:45.940Z"),
OffsetDateTime.parse("2017-07-24T15:04:54.372Z")
),
))

val sampleJson = """|{
Expand All @@ -21,21 +33,20 @@ class CromwellQueryResultJsonFormatterSpec extends FlatSpec with Matchers {
| "name": "switcheroo",
| "id": "bee51f36-396d-4e22-8a81-33dedff66bf6",
| "status": "Failed",
| "end": "2017-07-24T14:44:34.010-04:00",
| "start": "2017-07-24T14:44:33.227-04:00"
| "end": "2017-07-24T14:44:34.010Z",
| "start": "2017-07-24T14:44:33.227Z"
| },
| {
| "name": "switcheroo",
| "id": "0071495e-39eb-478e-bc98-8614b986c91e",
| "status": "Succeeded",
| "end": "2017-07-24T15:06:45.940-04:00",
| "start": "2017-07-24T15:04:54.372-04:00"
| "end": "2017-07-24T15:06:45.940Z",
| "start": "2017-07-24T15:04:54.372Z"
| }
| ]
|}""".stripMargin.parseJson.asJsObject

it should "write a query result as a structured JsObject" in {

sampleQueryResult.toJson shouldEqual sampleJson
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ trait WorkflowMetadataHelper {
def serviceRegistryActor: ActorRef

def pushWorkflowStart(workflowId: WorkflowId) = {
val startEvent = MetadataEvent(MetadataKey(workflowId, None, WorkflowMetadataKeys.StartTime), MetadataValue(OffsetDateTime.now.toString))
val startEvent = MetadataEvent(
MetadataKey(workflowId, None, WorkflowMetadataKeys.StartTime),
MetadataValue(OffsetDateTime.now)
)
serviceRegistryActor ! PutMetadataAction(startEvent)
}

def pushWorkflowEnd(workflowId: WorkflowId) = {
val metadataEventMsg = MetadataEvent(MetadataKey(workflowId, None, WorkflowMetadataKeys.EndTime), MetadataValue(OffsetDateTime.now.toString))
val metadataEventMsg = MetadataEvent(
MetadataKey(workflowId, None, WorkflowMetadataKeys.EndTime),
MetadataValue(OffsetDateTime.now)
)
serviceRegistryActor ! PutMetadataAction(metadataEventMsg)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package cromwell.engine.workflow.workflowstore

import java.time.{Instant, OffsetDateTime, ZoneId}
import java.time.OffsetDateTime

import cats.data.NonEmptyList
import common.validation.ErrorOr.ErrorOr
Expand Down Expand Up @@ -154,10 +154,9 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase) extends Workf
)

workflowStoreStateToStartableState(workflowStoreEntry) map { startableState =>
val instant = Instant.ofEpochMilli(workflowStoreEntry.submissionTime.getTime)
WorkflowToStart(
id = WorkflowId.fromString(workflowStoreEntry.workflowExecutionUuid),
submissionTime = OffsetDateTime.ofInstant(instant, ZoneId.of("UTC")),
submissionTime = workflowStoreEntry.submissionTime.toSystemOffsetDateTime,
sources = sources,
state = startableState)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ final case class WorkflowStoreSubmitActor(store: WorkflowStore, serviceRegistryA

processSource(_.clearEncryptedValues)(originalSourceFiles) map { sourceFiles =>
val submissionEvents: List[MetadataEvent] = List(
MetadataEvent(MetadataKey(id, None, WorkflowMetadataKeys.SubmissionTime), MetadataValue(OffsetDateTime.now.toString)),
MetadataEvent(MetadataKey(id, None, WorkflowMetadataKeys.SubmissionTime), MetadataValue(OffsetDateTime.now)),
MetadataEvent.empty(MetadataKey(id, None, WorkflowMetadataKeys.Inputs)),
MetadataEvent.empty(MetadataKey(id, None, WorkflowMetadataKeys.Outputs)),
MetadataEvent(MetadataKey(id, None, WorkflowMetadataKeys.Status), MetadataValue(actualWorkflowState)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import cromwell.services.healthmonitor.ProtoHealthMonitorServiceActor.{StatusChe
import cromwell.webservice.routes.CromwellApiService.BackendResponse
import cromwell.webservice.metadata.MetadataBuilderActor.BuiltMetadataResponse
import spray.json.{DefaultJsonProtocol, JsString, JsValue, RootJsonFormat}
import common.util.TimeUtil._

object WorkflowJsonSupport extends DefaultJsonProtocol {
implicit val workflowStatusResponseProtocol = jsonFormat2(WorkflowStatusResponse)
Expand Down Expand Up @@ -42,7 +43,7 @@ object WorkflowJsonSupport extends DefaultJsonProtocol {
implicit val successResponse = jsonFormat3(SuccessResponse)

implicit object DateJsonFormat extends RootJsonFormat[OffsetDateTime] {
override def write(obj: OffsetDateTime) = JsString(obj.toString)
override def write(offsetDateTime: OffsetDateTime) = JsString(offsetDateTime.toUtcMilliString)

override def read(json: JsValue): OffsetDateTime = json match {
case JsString(str) => OffsetDateTime.parse(str)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.time.OffsetDateTime

import cromwell.core.WorkflowId
import spray.json.{DefaultJsonProtocol, JsString, JsValue, RootJsonFormat}
import common.util.TimeUtil._

object MetadataJsonSupport extends DefaultJsonProtocol {
implicit object WorkflowIdJsonFormatter extends RootJsonFormat[WorkflowId] {
Expand All @@ -17,7 +18,7 @@ object MetadataJsonSupport extends DefaultJsonProtocol {
}

implicit object OffsetDateTimeFormatter extends RootJsonFormat[OffsetDateTime] {
def write(o: OffsetDateTime) = new JsString(o.toString)
def write(offsetDateTime: OffsetDateTime) = new JsString(offsetDateTime.toUtcMilliString)
def read(value: JsValue) = throw new UnsupportedOperationException("Reading OffsetDateTime from JSON is currently unsupported")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cromwell.core._
import cromwell.core.labels.Labels
import org.slf4j.{Logger, LoggerFactory}
import wom.values._
import common.util.TimeUtil._

case class MetadataJobKey(callFqn: String, index: Option[Int], attempt: Int)

Expand Down Expand Up @@ -62,6 +63,7 @@ object MetadataValue {
case _: Int | Long | _: java.lang.Long | _: java.lang.Integer => new MetadataValue(value.toString, MetadataInt)
case _: Double | Float | _: java.lang.Double | _: java.lang.Float => new MetadataValue(value.toString, MetadataNumber)
case _: Boolean | _: java.lang.Boolean => new MetadataValue(value.toString, MetadataBoolean)
case offsetDateTime: OffsetDateTime => new MetadataValue(offsetDateTime.toUtcMilliString, MetadataString)
case other => new MetadataValue(other.toString, MetadataString)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cromwell.services.metadata.impl
import java.time.OffsetDateTime

import com.typesafe.config.ConfigFactory
import common.util.TimeUtil._
import cromwell.core.Tags.DbmsTest
import cromwell.core._
import cromwell.core.labels.{Label, Labels}
Expand Down Expand Up @@ -70,10 +71,10 @@ class MetadataDatabaseAccessSpec extends FlatSpec with Matchers with ScalaFuture

val workflowKey = MetadataKey(workflowId, jobKey = None, key = null)
def keyAndValue(name: String) = Array(
(WorkflowMetadataKeys.SubmissionTime, OffsetDateTime.now.toString),
(WorkflowMetadataKeys.SubmissionTime, OffsetDateTime.now.toUtcMilliString),
(WorkflowMetadataKeys.Status, WorkflowSubmitted.toString),
(WorkflowMetadataKeys.Name, name),
(WorkflowMetadataKeys.StartTime, OffsetDateTime.now.toString)
(WorkflowMetadataKeys.StartTime, OffsetDateTime.now.toUtcMilliString)
) ++ labelMetadata

publishMetadataEvents(workflowKey, keyAndValue(name)).map(_ => workflowId)
Expand All @@ -85,7 +86,7 @@ class MetadataDatabaseAccessSpec extends FlatSpec with Matchers with ScalaFuture
val metadataKeys = Array(
(WorkflowMetadataKeys.Status, WorkflowRunning.toString),
(WorkflowMetadataKeys.Name, subworkflowName),
(WorkflowMetadataKeys.StartTime, OffsetDateTime.now.toString),
(WorkflowMetadataKeys.StartTime, OffsetDateTime.now.toUtcMilliString),
(WorkflowMetadataKeys.ParentWorkflowId, parentWorkflowId.toString),
(WorkflowMetadataKeys.RootWorkflowId, parentWorkflowId.toString),
)
Expand Down Expand Up @@ -154,7 +155,7 @@ class MetadataDatabaseAccessSpec extends FlatSpec with Matchers with ScalaFuture
val keyAndValue = Array(
(WorkflowMetadataKeys.Status, WorkflowRunning.toString),
(WorkflowMetadataKeys.Status, WorkflowSucceeded.toString),
(WorkflowMetadataKeys.EndTime, OffsetDateTime.now.toString))
(WorkflowMetadataKeys.EndTime, OffsetDateTime.now.toUtcMilliString))

publishMetadataEvents(workflowKey, keyAndValue)
}
Expand Down Expand Up @@ -308,27 +309,37 @@ class MetadataDatabaseAccessSpec extends FlatSpec with Matchers with ScalaFuture
}
// Filter by start date
_ <- dataAccess.queryWorkflowSummaries(WorkflowQueryParameters(Seq(
WorkflowQueryKey.StartDate.name -> workflowQueryResult2.start.get.toString))) map { case (response, _) =>
response.results partition { r => r.start.isDefined && r.start.get.compareTo(workflowQueryResult.start.get) >= 0 } match {
case (y, n) if y.nonEmpty && n.isEmpty => // good
case (y, n) => fail(s"Found ${y.size} later workflows and ${n.size} earlier")
}
WorkflowQueryKey.StartDate.name -> workflowQueryResult2.start.get.toUtcMilliString))) map {
case (response, _) =>
response.results partition {
r => r.start.isDefined && r.start.get.compareTo(workflowQueryResult.start.get) >= 0
} match {
case (y, n) if y.nonEmpty && n.isEmpty => // good
case (y, n) => fail(s"Found ${y.size} later workflows and ${n.size} earlier")
}
}
// Filter by end date
_ <- dataAccess.queryWorkflowSummaries(WorkflowQueryParameters(Seq(
WorkflowQueryKey.EndDate.name -> workflowQueryResult.end.get.toString))) map { case (response, _) =>
response.results partition { r => r.end.isDefined && r.end.get.compareTo(workflowQueryResult.end.get) <= 0 } match {
case (y, n) if y.nonEmpty && n.isEmpty => // good
case (y, n) => fail(s"Found ${y.size} earlier workflows and ${n.size} later")
}
WorkflowQueryKey.EndDate.name -> workflowQueryResult.end.get.toUtcMilliString))) map {
case (response, _) =>
response.results partition {
r => r.end.isDefined && r.end.get.compareTo(workflowQueryResult.end.get) <= 0
} match {
case (y, n) if y.nonEmpty && n.isEmpty => // good
case (y, n) => fail(s"Found ${y.size} earlier workflows and ${n.size} later")
}
}
// Filter by submission time
_ <- dataAccess.queryWorkflowSummaries(WorkflowQueryParameters(Seq(
WorkflowQueryKey.SubmissionTime.name -> workflowQueryResult2.submission.get.toString))) map { case (response, _) =>
response.results partition { r => r.submission.isDefined && r.submission.get.compareTo(workflowQueryResult2.submission.get) <= 0 } match {
case (y, n) if y.nonEmpty && n.isEmpty => // good
case (y, n) => fail(s"Found ${y.size} earlier workflows and ${n.size} later while filtering by submission timestamp")
}
WorkflowQueryKey.SubmissionTime.name -> workflowQueryResult2.submission.get.toUtcMilliString))) map {
case (response, _) =>
response.results partition { r =>
r.submission.isDefined && r.submission.get.compareTo(workflowQueryResult2.submission.get) <= 0
} match {
case (y, n) if y.nonEmpty && n.isEmpty => // good
case (y, n) =>
fail(s"Found ${y.size} earlier workflows and ${n.size} later while filtering by submission timestamp")
}
}
// Check for labels in query response
_ <- dataAccess.queryWorkflowSummaries(WorkflowQueryParameters(Seq(
Expand Down

0 comments on commit 3b0c2be

Please sign in to comment.