Skip to content

Commit

Permalink
Verify keys on read (#495)
Browse files Browse the repository at this point in the history
  • Loading branch information
rhuffy authored Oct 26, 2024
1 parent c29c3d3 commit d27feca
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.utils;

import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.exceptions.InvalidMutationException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;

public class MutationVerificationHandler implements OwnershipVerificationHandler
{
private static final Logger logger = LoggerFactory.getLogger(OwnershipVerificationUtils.class);

public static final OwnershipVerificationHandler INSTANCE = new MutationVerificationHandler();

@Override
public void onViolation(Keyspace keyspace, ByteBuffer key, List<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints)
{
keyspace.metric.invalidMutations.inc();
logger.error(
"InvalidMutation! Cannot apply mutation as this host {} does not contain key {} in keyspace {}. Only hosts {} and {} do.",
SafeArg.of("address", FBUtilities.getBroadcastAddress()),
UnsafeArg.of("key", Hex.bytesToHex(key.array())),
SafeArg.of("keyspace", keyspace.getName()),
SafeArg.of("naturalEndpoints", naturalEndpoints),
SafeArg.of("pendingEndpoints", pendingEndpoints));
throw new InvalidMutationException();
}

@Override
public void onValid(Keyspace keyspace)
{
keyspace.metric.validMutations.inc();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.utils;

import org.apache.cassandra.db.Keyspace;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;

public interface OwnershipVerificationHandler
{
void onViolation(Keyspace keyspace, ByteBuffer key, List<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints);

void onValid(Keyspace keyspace);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@
package com.palantir.cassandra.utils;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.exceptions.InvalidMutationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.dht.Token;
Expand All @@ -37,72 +39,77 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;

public class MutationVerificationUtils
public class OwnershipVerificationUtils
{
private static final boolean VERIFY_KEYS_ON_WRITE = Boolean.getBoolean("palantir_cassandra.verify_keys_on_write");
private static final Logger logger = LoggerFactory.getLogger(MutationVerificationUtils.class);
private static final boolean VERIFY_KEYS_ON_READ = Boolean.getBoolean("palantir_cassandra.verify_keys_on_read");
private static final Logger logger = LoggerFactory.getLogger(OwnershipVerificationUtils.class);

private static volatile Instant lastTokenRingCacheUpdate = Instant.MIN;

private MutationVerificationUtils()
private OwnershipVerificationUtils()
{
}

public static void verifyRead(Keyspace keyspace, ByteBuffer key)
{
if (!VERIFY_KEYS_ON_READ)
{
return;
}
verifyOperation(keyspace, key, ReadVerificationHandler.INSTANCE);
}

public static void verifyMutation(Mutation mutation)
{
if (!VERIFY_KEYS_ON_WRITE)
{
return;
}
verifyOperation(Keyspace.open(mutation.getKeyspaceName()), mutation.key(), MutationVerificationHandler.INSTANCE);
}

Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
private static void verifyOperation(Keyspace keyspace, ByteBuffer key, OwnershipVerificationHandler handler)
{
if (!(keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy))
{
return;
}

Token tk = StorageService.getPartitioner().getToken(mutation.key());
List<InetAddress> cachedNaturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
String keyspaceName = keyspace.getName();
Token tk = StorageService.getPartitioner().getToken(key);
List<InetAddress> cachedNaturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);

if (mutationIsInvalid(cachedNaturalEndpoints, pendingEndpoints))
if (operationIsInvalid(cachedNaturalEndpoints, pendingEndpoints))
{
if (cacheWasRecentlyRefreshed())
{
throwInvalidMutationException(mutation, keyspace, cachedNaturalEndpoints, pendingEndpoints);
handler.onViolation(keyspace, key, cachedNaturalEndpoints, pendingEndpoints);
return;
}

refreshCache();

List<InetAddress> refreshedNaturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
List<InetAddress> refreshedNaturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);

if (mutationIsInvalid(refreshedNaturalEndpoints, pendingEndpoints))
if (operationIsInvalid(refreshedNaturalEndpoints, pendingEndpoints))
{
throwInvalidMutationException(mutation, keyspace, refreshedNaturalEndpoints, pendingEndpoints);
handler.onViolation(keyspace, key, refreshedNaturalEndpoints, pendingEndpoints);
return;
}
else
{
logger.warn("Ignoring InvalidMutation error detected using stale token ring cache. Error was originally detected for key {} in keyspace {}."
+ " Cached owners {} and {}. Actual owners {} and {}",
Hex.bytesToHex(mutation.key().array()),
mutation.getKeyspaceName(),
cachedNaturalEndpoints,
pendingEndpoints,
refreshedNaturalEndpoints,
pendingEndpoints);
logger.warn("Ignoring InvalidOwnership error detected using stale token ring cache. Error was originally detected for key {} in keyspace {}."
+ " Cached owners {}. Actual owners {}. Pending owners (non-cached) {}.",
UnsafeArg.of("key", Hex.bytesToHex(key.array())),
SafeArg.of("keyspace", keyspaceName),
SafeArg.of("cachedNaturalEndpoints", cachedNaturalEndpoints),
SafeArg.of("refreshedNaturalEndpoints", refreshedNaturalEndpoints),
SafeArg.of("pendingEndpoints", pendingEndpoints));
}
}

keyspace.metric.validMutations.inc();
}


private static void throwInvalidMutationException(Mutation mutation, Keyspace keyspace, List<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints)
{
keyspace.metric.invalidMutations.inc();
logger.error("InvalidMutation! Cannot apply mutation as this host {} does not contain key {} in keyspace {}. Only hosts {} and {} do.",
FBUtilities.getBroadcastAddress(), Hex.bytesToHex(mutation.key().array()), mutation.getKeyspaceName(), naturalEndpoints, pendingEndpoints);
throw new InvalidMutationException();
handler.onValid(keyspace);
}

private static void refreshCache()
Expand All @@ -116,7 +123,7 @@ private static boolean cacheWasRecentlyRefreshed()
return Duration.between(lastTokenRingCacheUpdate, Instant.now()).compareTo(Duration.ofMinutes(10)) < 0;
}

private static boolean mutationIsInvalid(List<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints)
private static boolean operationIsInvalid(List<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints)
{
return !naturalEndpoints.contains(FBUtilities.getBroadcastAddress()) && !pendingEndpoints.contains(FBUtilities.getBroadcastAddress());
}
Expand All @@ -126,5 +133,4 @@ static void clearLastTokenRingCacheUpdate()
{
lastTokenRingCacheUpdate = Instant.MIN;
}

}
58 changes: 58 additions & 0 deletions src/java/com/palantir/cassandra/utils/ReadVerificationHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.utils;

import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;

public class ReadVerificationHandler implements OwnershipVerificationHandler
{
private static final Logger logger = LoggerFactory.getLogger(OwnershipVerificationUtils.class);

public static final OwnershipVerificationHandler INSTANCE = new ReadVerificationHandler();

@Override
public void onViolation(Keyspace keyspace, ByteBuffer key, List<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints)
{
keyspace.metric.invalidReads.inc();
logger.error(
"Executed InvalidRead! This host {} does not contain key {} in keyspace {}. Only hosts {} and {} do.",
SafeArg.of("address", FBUtilities.getBroadcastAddress()),
UnsafeArg.of("key", Hex.bytesToHex(key.array())),
SafeArg.of("keyspace", keyspace.getName()),
SafeArg.of("naturalEndpoints", naturalEndpoints),
SafeArg.of("pendingEndpoints", pendingEndpoints));
}

@Override
public void onValid(Keyspace keyspace)
{
keyspace.metric.validReads.inc();
}
}
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.IOException;
import java.net.InetAddress;

import com.palantir.cassandra.utils.MutationVerificationUtils;
import com.palantir.cassandra.utils.OwnershipVerificationUtils;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
Expand All @@ -47,7 +47,7 @@ public void doVerb(MessageIn<Mutation> message, int id) throws IOException
replyTo = InetAddress.getByAddress(from);
}

MutationVerificationUtils.verifyMutation(message.payload);
OwnershipVerificationUtils.verifyMutation(message.payload);

message.payload.apply();
WriteResponse response = new WriteResponse();
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.cassandra.db;

import com.palantir.cassandra.utils.MutationVerificationUtils;
import com.palantir.cassandra.utils.OwnershipVerificationUtils;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
Expand All @@ -26,7 +26,7 @@ public class ReadRepairVerbHandler implements IVerbHandler<Mutation>
{
public void doVerb(MessageIn<Mutation> message, int id)
{
MutationVerificationUtils.verifyMutation(message.payload);
OwnershipVerificationUtils.verifyMutation(message.payload);
message.payload.apply();
WriteResponse response = new WriteResponse();
MessagingService.instance().sendReply(response.createMessage(), id, message.from);
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/db/ReadVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.util.concurrent.Uninterruptibles;

import com.palantir.cassandra.utils.OwnershipVerificationUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.IsBootstrappingException;
import org.apache.cassandra.net.IVerbHandler;
Expand Down Expand Up @@ -49,6 +50,8 @@ public void doVerb(MessageIn<ReadCommand> message, int id)
ReadResponse.serializer);
Tracing.trace("Enqueuing response to {}", message.from);
MessagingService.instance().sendReply(reply, id, message.from);

OwnershipVerificationUtils.verifyRead(keyspace, command.key);
}

public static ReadResponse getResponse(ReadCommand command, Row row)
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public class KeyspaceMetrics
public final Counter invalidMutations;
/** Number of mutation requests which are valid */
public final Counter validMutations;
/** Number of read requests which were not for a row this node owned */
public final Counter invalidReads;
/** Number of read requests which are valid */
public final Counter validReads;

public final MetricNameFactory factory;
private Keyspace keyspace;
Expand Down Expand Up @@ -273,6 +277,8 @@ public Long getValue(ColumnFamilyMetrics metric)
casCommit = new LatencyMetrics(factory, "CasCommit");
invalidMutations = Metrics.counter(factory.createMetricName("InvalidMutations"));
validMutations = Metrics.counter(factory.createMetricName("ValidMutations"));
invalidReads = Metrics.counter(factory.createMetricName("InvalidReads"));
validReads = Metrics.counter(factory.createMetricName("ValidReads"));
}

/**
Expand Down
Loading

0 comments on commit d27feca

Please sign in to comment.