-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cassandra 19480 Additional task execution specific instrumentation of job stats #51
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks good in general. A rebase is required though.
668a44e
to
7bb12eb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments
|
||
import java.util.Map; | ||
|
||
public class JobEventDetail |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a javadoc here?
@@ -61,6 +61,14 @@ public CassandraBulkSourceRelation(BulkWriterContext writerContext, SQLContext s | |||
this.sqlContext = sqlContext; | |||
this.sparkContext = JavaSparkContext.fromSparkContext(sqlContext.sparkContext()); | |||
this.broadcastContext = sparkContext.<BulkWriterContext>broadcast(writerContext); | |||
this.jobStatsListener = new JobStatsListener((jobEventDetail) -> { | |||
if (writerContext.job().getId().toString().equals(jobEventDetail.internalJobID())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a comment in code mentioning why we need this condition here?
rowCount, | ||
totalBytesWritten, | ||
hasClusterTopologyChanged); | ||
publishSuccessfulJobStats(rowCount, totalBytesWritten, hasClusterTopologyChanged); | ||
} | ||
catch (Throwable throwable) | ||
{ | ||
publishFailureJobStats(throwable.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure what happens here, do we no longer publish failure stats?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we are not explicitly publishing failure stats at the point of failure. Instead, we rely on the job failure event, and the listener now publishes these stats.
put("sparkVersion", sparkVersion); | ||
put("keyspace", jobInfo.getId().toString()); | ||
put("table", jobInfo.getId().toString()); | ||
put("keyspace", jobInfo.getId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the jobInfo.getId()
the keyspace? shouldn't we use qualifiedTableName().keyspace()
here instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, probably got mixed-up during rebase. Corrected.
put("keyspace", jobInfo.getId().toString()); | ||
put("table", jobInfo.getId().toString()); | ||
put("keyspace", jobInfo.getId()); | ||
put("table", jobInfo.qualifiedTableName().toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be qualifiedTableName().table()
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, probably got mixed-up during rebase. Corrected.
*/ | ||
void publish(Map<String, String> stats); | ||
|
||
Map<String, String> stats(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadocs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is no longer being used. Removed.
@@ -129,6 +129,7 @@ protected static CassandraDataLayer createAndInitCassandraDataLayer( | |||
|
|||
dataLayer.startupValidate(); | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: unnecessary extra line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an opportunity here to move all the hardcoded stat names to a Consts file. Maybe worth a separate ticket?
put("clusterResizeDetected", String.valueOf(hasClusterTopologyChanged)); | ||
put("jobElapsedTimeMillis", Long.toString(elapsedTimeMillis())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we removing the jobElapsedTimeMillis stat?
@@ -258,28 +269,17 @@ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, Str | |||
|
|||
private void publishSuccessfulJobStats(long rowCount, long totalBytesWritten, boolean hasClusterTopologyChanged) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to keep the Successful name on the method if we are ignoring failure stats?
@@ -79,6 +82,15 @@ public CassandraBulkSourceRelation(BulkWriterContext writerContext, SQLContext s | |||
ReplicaAwareFailureHandler<RingInstance> failureHandler = new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); | |||
this.writeValidator = new BulkWriteValidator(writerContext, failureHandler); | |||
onCloudStorageTransport(ignored -> this.heartbeatReporter = new HeartbeatReporter()); | |||
this.jobStatsListener = new JobStatsListener((jobEventDetail) -> { | |||
// Note: Consumers are called for all jobs and tasks. We only publish for the existing job | |||
if (writerContext.job().getId().equals(jobEventDetail.internalJobID())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also check for !internalJobId.isEmpty()
?
jobStats.put("failureReason", reason); | ||
jobStats.put("jobElapsedTimeMillis", String.valueOf(elapsedTimeMillis)); | ||
|
||
LOGGER.debug("Job END for jobId:{} status:{} Reason:{} ElapsedTime: {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an extra space after ElapsedTime
@@ -81,12 +82,14 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo | |||
new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)}); | |||
private ConsistencyLevel.CL consistencyLevel; | |||
private int sstableDataSizeInMB = 128; | |||
private int sstableWriteBatchSize = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this new variable doing? Where is it used?
… job stats
Changes
jobGroup
based UUID to reader, similar to existing implementation in writer. This is used as thejobId
to uniquely identify and potentially merge multiple stats published from the same job.Testing