diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java
index 82cef04ba..889bc67af 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java
@@ -67,8 +67,7 @@ public class StreamIdentifier {
      *         or {@link #streamName} in single-stream mode.
      */
     public String serialize() {
-        if (!streamCreationEpochOptional.isPresent()) {
-            // creation epoch is expected to be empty in single-stream mode
+        if (!isMultiStreamInstance()) {
             return streamName;
         }
 
@@ -85,6 +84,16 @@ public String toString() {
         return serialize();
     }
 
+    /**
+     * Determine whether this {@link StreamIdentifier} is a multi-stream instance.
+     *
+     * @return true if this is a multi-stream instance, false otherwise.
+     */
+    public boolean isMultiStreamInstance() {
+        // creation epoch is expected to be present if and only if in multi-stream mode
+        return streamCreationEpochOptional.isPresent();
+    }
+
     /**
      * Create a multi stream instance for StreamIdentifier from serialized stream identifier
      * of format {@link #STREAM_IDENTIFIER_PATTERN}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
index bb389ce94..c5d65f1ba 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
@@ -68,7 +68,6 @@
 import software.amazon.kinesis.leases.ShardInfo;
 import software.amazon.kinesis.leases.ShardPrioritization;
 import software.amazon.kinesis.leases.ShardSyncTaskManager;
-import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
 import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
 import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
 import software.amazon.kinesis.leases.exceptions.DependencyException;
@@ -812,7 +811,7 @@ Callable<GracefulShutdownContext> createWorkerShutdownCallable() {
             for (Lease lease : leases) {
                 ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator,
                         lease, notificationCompleteLatch, shutdownCompleteLatch);
-                ShardInfo shardInfo = DynamoDBLeaseCoordinator.convertLeaseToAssignment(lease);
+                final ShardInfo shardInfo = constructShardInfoFromLease(lease);
                 ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
                 if (consumer != null) {
                     consumer.gracefulShutdown(shutdownNotification);
@@ -895,7 +894,9 @@ private void finalShutdown() {
     }
 
     private List<ShardInfo> getShardInfoForAssignments() {
-        List<ShardInfo> assignedStreamShards = leaseCoordinator.getCurrentAssignments();
+        final List<ShardInfo> assignedStreamShards = leaseCoordinator.getAssignments().stream()
+                .map(this::constructShardInfoFromLease)
+                .collect(Collectors.toList());
         List<ShardInfo> prioritizedShards = shardPrioritization.prioritize(assignedStreamShards);
 
         if ((prioritizedShards != null) && (!prioritizedShards.isEmpty())) {
@@ -952,26 +953,20 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
                                           @NonNull final LeaseCleanupManager leaseCleanupManager) {
         ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo,
                         checkpoint);
-        // The only case where streamName is not available will be when multistreamtracker not set. In this case,
-        // get the default stream name for the single stream application.
-        final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
-
-        // Irrespective of single stream app or multi stream app, streamConfig should always be available.
-        // If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config
-        // to gracefully complete the reading.
-        StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
-        if (streamConfig == null) {
-            streamConfig = streamTracker.createStreamConfig(streamIdentifier);
+        final StreamConfig streamConfig = shardInfo.streamConfig();
+        if (!currentStreamConfigMap.containsKey(streamConfig.streamIdentifier())) {
             log.info("Created orphan {}", streamConfig);
         }
-        Validate.notNull(streamConfig, "StreamConfig should not be null");
+        /*
+         * NOTE: RecordsPublisher#createGetRecordsCache(ShardInfo, StreamConfig, MetricsFactory) is deprecated.
+         *  RecordsPublisher#createGetRecordsCache(ShardInfo, MetricsFactory) will be called directly in the future.
+         */
         RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory);
         ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
-                streamConfig.streamIdentifier(),
                 leaseCoordinator,
                 executorService,
                 cache,
-                shardRecordProcessorFactory.shardRecordProcessor(streamIdentifier),
+                shardRecordProcessorFactory.shardRecordProcessor(streamConfig.streamIdentifier()),
                 checkpoint,
                 checkpointer,
                 parentShardPollIntervalMillis,
@@ -981,7 +976,6 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
                 maxListShardsRetryAttempts,
                 processorConfig.callProcessRecordsEvenForEmptyRecordList(),
                 shardConsumerDispatchPollIntervalMillis,
-                streamConfig.initialPositionInStreamExtended(),
                 cleanupLeasesUponShardCompletion,
                 ignoreUnexpetedChildShards,
                 shardDetectorProvider.apply(streamConfig),
@@ -1039,18 +1033,6 @@ private void logExecutorState() {
         executorStateEvent.accept(diagnosticEventHandler);
     }
 
-    private StreamIdentifier getStreamIdentifier(Optional<String> streamIdentifierString) {
-        final StreamIdentifier streamIdentifier;
-        if (streamIdentifierString.isPresent()) {
-            streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierString.get());
-        } else {
-            Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
-            streamIdentifier = this.currentStreamConfigMap.values().iterator().next().streamIdentifier();
-        }
-        Validate.notNull(streamIdentifier, "Stream identifier should not be empty");
-        return streamIdentifier;
-    }
-
     /**
      * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
      * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on
@@ -1090,6 +1072,31 @@ private void resetInfoLogging() {
         }
     }
 
+    private ShardInfo constructShardInfoFromLease(final Lease lease) {
+        final boolean isMultiStreamLease = lease instanceof MultiStreamLease;
+
+        final Optional<String> streamIdentifierSerialization = isMultiStreamLease ?
+                Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty();
+        final StreamConfig streamConfig = getOrCreateStreamConfig(streamIdentifierSerialization);
+
+        final String shardId = isMultiStreamLease ? ((MultiStreamLease) lease).shardId() : lease.leaseKey();
+        return new ShardInfo(
+                shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig);
+    }
+
+    private StreamConfig getOrCreateStreamConfig(final Optional<String> streamIdentifierSerialization) {
+        if (!streamIdentifierSerialization.isPresent()) {
+            Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
+            final StreamConfig streamConfig = currentStreamConfigMap.values().iterator().next();
+            Validate.notNull(streamConfig, "StreamConfig should not be null");
+            return streamConfig;
+        }
+
+        final StreamIdentifier streamIdentifier =
+                StreamIdentifier.multiStreamInstance(streamIdentifierSerialization.get());
+        return currentStreamConfigMap.getOrDefault(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier));
+    }
+
     @Deprecated
     public Future<Void> requestShutdown() {
         return null;
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java
index 861626b63..44b2e91e3 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java
@@ -117,8 +117,8 @@ public void shutdown() {
     public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
         final Lease lease = leasePendingDeletion.lease();
         if (lease == null) {
-            log.warn("Cannot enqueue {} for {} as instance doesn't hold the lease for that shard.",
-                    leasePendingDeletion.shardInfo(), leasePendingDeletion.streamIdentifier());
+            log.warn("Cannot enqueue {} as instance doesn't hold the lease for that shard.",
+                    leasePendingDeletion.shardInfo());
         } else {
             log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
             if (!deletionQueue.add(leasePendingDeletion)) {
@@ -166,7 +166,7 @@ public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion
             InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException {
         final Lease lease = leasePendingDeletion.lease();
         final ShardInfo shardInfo = leasePendingDeletion.shardInfo();
-        final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
+        final StreamIdentifier streamIdentifier = shardInfo.streamConfig().streamIdentifier();
 
         final AWSExceptionManager exceptionManager = createExceptionManager();
 
@@ -328,7 +328,8 @@ void cleanupLeases() {
             while (!deletionQueue.isEmpty()) {
                 final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll();
                 final String leaseKey = leasePendingDeletion.lease().leaseKey();
-                final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
+                final StreamIdentifier streamIdentifier =
+                        leasePendingDeletion.shardInfo().streamConfig().streamIdentifier();
                 boolean deletionSucceeded = false;
                 try {
                     final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion,
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java
index 6437f3390..73f83018b 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java
@@ -125,8 +125,14 @@ boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String
 
     /**
      * @return Current shard/lease assignments
+     * @deprecated This method is deprecated and will be removed in future versions.
+     *             {@link LeaseCoordinator} implementations should not be required to construct and return
+     *             {@link ShardInfo} objects. {@link #getAssignments()} can be used to return the currently held leases.
      */
-    List<ShardInfo> getCurrentAssignments();
+    @Deprecated
+    default List<ShardInfo> getCurrentAssignments() {
+        throw new UnsupportedOperationException("This method is deprecated and should not be used.");
+    }
 
     /**
      * Default implementation returns an empty list and concrete implementation is expected to return all leases
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java
index aff3f6f01..5e048fecc 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardInfo.java
@@ -18,8 +18,8 @@
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Optional;
 
+import lombok.AccessLevel;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
@@ -27,6 +27,7 @@
 import lombok.NonNull;
 import lombok.ToString;
 import lombok.experimental.Accessors;
+import software.amazon.kinesis.common.StreamConfig;
 import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
 
 /**
@@ -34,15 +35,20 @@
  */
 @Getter
 @Accessors(fluent = true)
-@ToString
+@ToString(exclude = {"isMultiStreamMode", "streamIdentifierStr"})
 public class ShardInfo {
 
-    private final Optional<String> streamIdentifierSerOpt;
     private final String shardId;
     private final String concurrencyToken;
     // Sorted list of parent shardIds.
     private final List<String> parentShardIds;
     private final ExtendedSequenceNumber checkpoint;
+    private final StreamConfig streamConfig;
+
+    @Getter(AccessLevel.NONE)
+    private final boolean isMultiStreamMode;
+    @Getter(AccessLevel.NONE)
+    private final String streamIdentifierStr;
 
     /**
      * Creates a new ShardInfo object. The checkpoint is not part of the equality, but is used for debugging output.
@@ -55,28 +61,14 @@ public class ShardInfo {
      *            Parent shards of the shard identified by Kinesis shardId
      * @param checkpoint
      *            the latest checkpoint from lease
-     */
-    public ShardInfo(@NonNull final String shardId,
-            final String concurrencyToken,
-            final Collection<String> parentShardIds,
-            final ExtendedSequenceNumber checkpoint) {
-        this(shardId, concurrencyToken, parentShardIds, checkpoint, null);
-    }
-
-    /**
-     * Creates a new ShardInfo object that has an option to pass a serialized streamIdentifier.
-     * The checkpoint is not part of the equality, but is used for debugging output.
-     * @param shardId
-     * @param concurrencyToken
-     * @param parentShardIds
-     * @param checkpoint
-     * @param streamIdentifierSer
+     * @param streamConfig
+     *            The {@link StreamConfig} instance for the stream that the shard belongs to
      */
     public ShardInfo(@NonNull final String shardId,
             final String concurrencyToken,
             final Collection<String> parentShardIds,
             final ExtendedSequenceNumber checkpoint,
-            final String streamIdentifierSer) {
+            @NonNull final StreamConfig streamConfig) {
         this.shardId = shardId;
         this.concurrencyToken = concurrencyToken;
         this.parentShardIds = new LinkedList<>();
@@ -87,7 +79,9 @@ public ShardInfo(@NonNull final String shardId,
         // This makes it easy to check for equality in ShardInfo.equals method.
         Collections.sort(this.parentShardIds);
         this.checkpoint = checkpoint;
-        this.streamIdentifierSerOpt = Optional.ofNullable(streamIdentifierSer);
+        this.streamConfig = streamConfig;
+        this.isMultiStreamMode = streamConfig.streamIdentifier().isMultiStreamInstance();
+        this.streamIdentifierStr = streamConfig.streamIdentifier().serialize();
     }
 
     /**
@@ -114,7 +108,8 @@ public boolean isCompleted() {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-                .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierSerOpt.orElse("")).toHashCode();
+                .append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierStr)
+                .toHashCode();
     }
 
     /**
@@ -139,7 +134,7 @@ public boolean equals(Object obj) {
         ShardInfo other = (ShardInfo) obj;
         return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken)
                 .append(parentShardIds, other.parentShardIds).append(shardId, other.shardId)
-                .append(streamIdentifierSerOpt.orElse(""), other.streamIdentifierSerOpt.orElse("")).isEquals();
+                .append(streamIdentifierStr, other.streamIdentifierStr).isEquals();
 
     }
 
@@ -159,8 +154,8 @@ public static String getLeaseKey(ShardInfo shardInfo) {
      * @return lease key
      */
     public static String getLeaseKey(ShardInfo shardInfo, String shardIdOverride) {
-        return shardInfo.streamIdentifierSerOpt().isPresent() ?
-               MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardIdOverride) :
+        return shardInfo.isMultiStreamMode ?
+               MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierStr, shardIdOverride) :
                shardIdOverride;
     }
 
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
index da6d8e075..825b6243d 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
@@ -17,7 +17,6 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -29,7 +28,6 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import software.amazon.kinesis.annotations.KinesisClientInternalApi;
 import software.amazon.kinesis.leases.Lease;
@@ -37,8 +35,6 @@
 import software.amazon.kinesis.leases.LeaseRefresher;
 import software.amazon.kinesis.leases.LeaseRenewer;
 import software.amazon.kinesis.leases.LeaseTaker;
-import software.amazon.kinesis.leases.MultiStreamLease;
-import software.amazon.kinesis.leases.ShardInfo;
 import software.amazon.kinesis.leases.exceptions.DependencyException;
 import software.amazon.kinesis.leases.exceptions.InvalidStateException;
 import software.amazon.kinesis.leases.exceptions.LeasingException;
@@ -366,34 +362,6 @@ private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSiz
                 new LinkedTransferQueue<>(), LEASE_RENEWAL_THREAD_FACTORY);
     }
 
-    @Override
-    public List<ShardInfo> getCurrentAssignments() {
-        Collection<Lease> leases = getAssignments();
-        return convertLeasesToAssignments(leases);
-    }
-
-    private static List<ShardInfo> convertLeasesToAssignments(final Collection<Lease> leases) {
-        if (leases == null) {
-            return Collections.emptyList();
-        }
-        return leases.stream().map(DynamoDBLeaseCoordinator::convertLeaseToAssignment).collect(Collectors.toList());
-    }
-
-    /**
-     * Utility method to convert the basic lease or multistream lease to ShardInfo
-     * @param lease
-     * @return ShardInfo
-     */
-    public static ShardInfo convertLeaseToAssignment(final Lease lease) {
-        if (lease instanceof MultiStreamLease) {
-            return new ShardInfo(((MultiStreamLease) lease).shardId(), lease.concurrencyToken().toString(), lease.parentShardIds(),
-                    lease.checkpoint(), ((MultiStreamLease) lease).streamIdentifier());
-        } else {
-            return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
-                    lease.checkpoint());
-        }
-    }
-
     /**
      * {@inheritDoc}
      *
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java
index ab81d7cea..e1ebfae96 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java
@@ -18,7 +18,6 @@
 import lombok.EqualsAndHashCode;
 import lombok.Value;
 import lombok.experimental.Accessors;
-import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.Lease;
 import software.amazon.kinesis.leases.ShardDetector;
 import software.amazon.kinesis.leases.ShardInfo;
@@ -36,7 +35,6 @@
 @Value
 public class LeasePendingDeletion {
 
-    StreamIdentifier streamIdentifier;
     Lease lease;
     ShardInfo shardInfo;
     ShardDetector shardDetector;
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
index 058b3009b..d818e9e40 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java
@@ -188,7 +188,7 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con
             return new InitializeTask(argument.shardInfo(),
                     argument.shardRecordProcessor(),
                     argument.checkpoint(),
-                    argument.recordProcessorCheckpointer(), argument.initialPositionInStream(),
+                    argument.recordProcessorCheckpointer(),
                     argument.recordsPublisher(),
                     argument.taskBackoffTimeMillis(),
                     argument.metricsFactory());
@@ -479,7 +479,6 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con
                     argument.recordProcessorCheckpointer(),
                     consumer.shutdownReason(),
                     consumer.shutdownNotification(),
-                    argument.initialPositionInStream(),
                     argument.cleanupLeasesOfCompletedShards(),
                     argument.ignoreUnexpectedChildShards(),
                     argument.leaseCoordinator(),
@@ -488,7 +487,6 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con
                     argument.hierarchicalShardSyncer(),
                     argument.metricsFactory(),
                     input == null ? null : input.childShards(),
-                    argument.streamIdentifier(),
                     argument.leaseCleanupManager());
         }
 
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java
index 7816c1e14..d4a2e95b6 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/InitializeTask.java
@@ -51,8 +51,6 @@ public class InitializeTask implements ConsumerTask {
     @NonNull
     private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
     @NonNull
-    private final InitialPositionInStreamExtended initialPositionInStream;
-    @NonNull
     private final RecordsPublisher cache;
 
     // Back off for this interval if we encounter a problem (exception)
@@ -78,6 +76,8 @@ public TaskResult call() {
             final String leaseKey = ShardInfo.getLeaseKey(shardInfo);
             Checkpoint initialCheckpointObject = checkpoint.getCheckpointObject(leaseKey);
             ExtendedSequenceNumber initialCheckpoint = initialCheckpointObject.checkpoint();
+            final InitialPositionInStreamExtended initialPositionInStream =
+                    shardInfo.streamConfig().initialPositionInStreamExtended();
             log.debug("[{}]: Checkpoint: {} -- Initial Position: {}", leaseKey, initialCheckpoint,
                     initialPositionInStream);
 
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java
index fb398cdab..fcf1fd5e9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java
@@ -23,7 +23,6 @@
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.kinesis.annotations.KinesisClientInternalApi;
 import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
-import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.ShardDetector;
 import software.amazon.kinesis.leases.ShardInfo;
 import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@@ -120,8 +119,7 @@ public TaskResult call() {
          */
         final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION);
         final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
-        shardInfo.streamIdentifierSerOpt()
-                .ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId)));
+        MetricsUtil.addStreamId(shardScope, shardInfo.streamConfig().streamIdentifier());
         MetricsUtil.addShardId(shardScope, shardInfo.shardId());
         long startTimeMillis = System.currentTimeMillis();
         boolean success = false;
@@ -218,8 +216,7 @@ private void callProcessRecords(ProcessRecordsInput input, List<KinesisClientRec
                 .millisBehindLatest(input.millisBehindLatest()).build();
 
         final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
-        shardInfo.streamIdentifierSerOpt()
-                .ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId)));
+        MetricsUtil.addStreamId(scope, shardInfo.streamConfig().streamIdentifier());
         MetricsUtil.addShardId(scope, shardInfo.shardId());
         final long startTime = System.currentTimeMillis();
         try {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
index 4162ea812..6eb6d76ad 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
@@ -133,7 +133,7 @@ public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executor
         this.recordsPublisher = recordsPublisher;
         this.executorService = executorService;
         this.shardInfo = shardInfo;
-        this.streamIdentifier = shardInfo.streamIdentifierSerOpt().orElse("single_stream_mode");
+        this.streamIdentifier = shardInfo.streamConfig().streamIdentifier().serialize();
         this.shardConsumerArgument = shardConsumerArgument;
         this.logWarningForTaskAfterMillis = logWarningForTaskAfterMillis;
         this.taskExecutionListener = taskExecutionListener;
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
index 0518b830d..ec322d3a7 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java
@@ -20,8 +20,6 @@
 import lombok.experimental.Accessors;
 import software.amazon.kinesis.annotations.KinesisClientInternalApi;
 import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
-import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.LeaseCleanupManager;
 import software.amazon.kinesis.leases.LeaseCoordinator;
 import software.amazon.kinesis.leases.ShardDetector;
@@ -43,8 +41,6 @@ public class ShardConsumerArgument {
     @NonNull
     private final ShardInfo shardInfo;
     @NonNull
-    private final StreamIdentifier streamIdentifier;
-    @NonNull
     private final LeaseCoordinator leaseCoordinator;
     @NonNull
     private final ExecutorService executorService;
@@ -63,8 +59,6 @@ public class ShardConsumerArgument {
     private final int maxListShardsRetryAttempts;
     private final boolean shouldCallProcessRecordsEvenForEmptyRecordList;
     private final long idleTimeInMilliseconds;
-    @NonNull
-    private final InitialPositionInStreamExtended initialPositionInStream;
     private final boolean cleanupLeasesOfCompletedShards;
     private final boolean ignoreUnexpectedChildShards;
     @NonNull
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
index d8c9d3791..ca1ed16d4 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java
@@ -27,8 +27,6 @@
 import software.amazon.awssdk.utils.CollectionUtils;
 import software.amazon.kinesis.annotations.KinesisClientInternalApi;
 import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
-import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
 import software.amazon.kinesis.leases.HierarchicalShardSyncer;
 import software.amazon.kinesis.leases.Lease;
@@ -88,8 +86,6 @@ public class ShutdownTask implements ConsumerTask {
     @NonNull
     private final ShutdownReason reason;
     private final ShutdownNotification shutdownNotification;
-    @NonNull
-    private final InitialPositionInStreamExtended initialPositionInStream;
     private final boolean cleanupLeasesOfCompletedShards;
     private final boolean ignoreUnexpectedChildShards;
     @NonNull
@@ -106,8 +102,6 @@ public class ShutdownTask implements ConsumerTask {
 
     private final List<ChildShard> childShards;
     @NonNull
-    private final StreamIdentifier streamIdentifier;
-    @NonNull
     private final LeaseCleanupManager leaseCleanupManager;
 
     /*
@@ -198,8 +192,8 @@ private void takeShardEndAction(Lease currentShardLease,
             createLeasesForChildShardsIfNotExist(scope);
             updateLeaseWithChildShards(currentShardLease);
         }
-        final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease,
-                shardInfo, shardDetector);
+        final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(
+                currentShardLease, shardInfo, shardDetector);
         if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
             boolean isSuccess = false;
             try {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java
index 5703e1af7..8edd6e4ae 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalFactory.java
@@ -23,11 +23,31 @@
  *
  */
 public interface RetrievalFactory {
-    GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory);
 
+    /**
+     * @deprecated This method was only used by specific implementations of {@link RetrievalFactory} and should not be
+     *             required to be implemented; will be removed in future versions.
+     */
     @Deprecated
+    default GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory) {
+        throw new UnsupportedOperationException("This method is deprecated and should not be used.");
+    }
+
+    /**
+     * Creates a {@link RecordsPublisher} instance to retrieve records for the specified shard.
+     *
+     * @param shardInfo The {@link ShardInfo} representing the shard for which records are to be retrieved.
+     * @param metricsFactory The {@link MetricsFactory} for recording metrics.
+     * @return A {@link RecordsPublisher} instance for retrieving records from the shard.
+     */
     RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory);
 
+    /**
+     * @deprecated {@link ShardInfo} now includes a reference to {@link StreamConfig}.
+     *             Please use {@link #createGetRecordsCache(ShardInfo, MetricsFactory)} instead.
+     *             This method will be removed in future versions.
+     */
+    @Deprecated
     default RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, StreamConfig streamConfig, MetricsFactory metricsFactory) {
         return createGetRecordsCache(shardInfo, metricsFactory);
     }
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java
index bcfb1081d..039d15335 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/fanout/FanOutRetrievalFactory.java
@@ -19,17 +19,14 @@
 import lombok.RequiredArgsConstructor;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.kinesis.annotations.KinesisClientInternalApi;
-import software.amazon.kinesis.common.StreamConfig;
 import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.ShardInfo;
 import software.amazon.kinesis.metrics.MetricsFactory;
-import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
 import software.amazon.kinesis.retrieval.RecordsPublisher;
 import software.amazon.kinesis.retrieval.RetrievalFactory;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.function.Function;
 
 @RequiredArgsConstructor
@@ -41,36 +38,25 @@ public class FanOutRetrievalFactory implements RetrievalFactory {
     private final String defaultConsumerArn;
     private final Function<String, String> consumerArnCreator;
 
-    private Map<StreamIdentifier, String> implicitConsumerArnTracker = new HashMap<>();
-
-    @Override
-    public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final ShardInfo shardInfo,
-            final MetricsFactory metricsFactory) {
-        return null;
-    }
+    private final Map<StreamIdentifier, String> implicitConsumerArnTracker = new HashMap<>();
 
+    /**
+     + {@inheritDoc}
+     */
     @Override
     public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
-            final StreamConfig streamConfig,
             final MetricsFactory metricsFactory) {
-        final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
-        if (streamIdentifierStr.isPresent()) {
-            final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get());
+        final StreamIdentifier streamIdentifier = shardInfo.streamConfig().streamIdentifier();
+        if (streamIdentifier.isMultiStreamInstance()) {
             return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
-                    getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()),
-                    streamIdentifierStr.get());
+                    getOrCreateConsumerArn(streamIdentifier, shardInfo.streamConfig().consumerArn()),
+                    streamIdentifier.serialize());
         } else {
-            final StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(defaultStreamName);
             return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
                     getOrCreateConsumerArn(streamIdentifier, defaultConsumerArn));
         }
     }
 
-    @Override
-    public RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory) {
-        throw new UnsupportedOperationException("FanoutRetrievalFactory needs StreamConfig Info");
-    }
-
     private String getOrCreateConsumerArn(StreamIdentifier streamIdentifier, String consumerArn) {
         return consumerArn != null ? consumerArn : implicitConsumerArnTracker
                     .computeIfAbsent(streamIdentifier, sId -> consumerArnCreator.apply(sId.streamName()));
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java
index 071763fc4..21cf9ac71 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousBlockingRetrievalFactory.java
@@ -21,11 +21,9 @@
 import lombok.NonNull;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.kinesis.annotations.KinesisClientInternalApi;
-import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.ShardInfo;
 import software.amazon.kinesis.metrics.MetricsFactory;
 import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
-import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
 import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
 import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
 import software.amazon.kinesis.retrieval.RecordsPublisher;
@@ -50,20 +48,6 @@ public class SynchronousBlockingRetrievalFactory implements RetrievalFactory {
 
     private final Function<DataFetcherProviderConfig, DataFetcher> dataFetcherProvider;
 
-    @Deprecated
-    public SynchronousBlockingRetrievalFactory(String streamName,
-                                               KinesisAsyncClient kinesisClient,
-                                               RecordsFetcherFactory recordsFetcherFactory,
-                                               int maxRecords,
-                                               Duration kinesisRequestTimeout) {
-        this(streamName,
-                kinesisClient,
-                recordsFetcherFactory,
-                maxRecords,
-                kinesisRequestTimeout,
-                defaultDataFetcherProvider(kinesisClient));
-    }
-
     public SynchronousBlockingRetrievalFactory(String streamName,
                                                KinesisAsyncClient kinesisClient,
                                                RecordsFetcherFactory recordsFetcherFactory,
@@ -79,28 +63,19 @@ public SynchronousBlockingRetrievalFactory(String streamName,
                 defaultDataFetcherProvider(kinesisClient) : dataFetcherProvider;
     }
 
-    @Deprecated
-    public SynchronousBlockingRetrievalFactory(String streamName,
-                                               KinesisAsyncClient kinesisClient,
-                                               RecordsFetcherFactory recordsFetcherFactory,
-                                               int maxRecords) {
-        this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, PollingConfig.DEFAULT_REQUEST_TIMEOUT);
-    }
-
     private static Function<DataFetcherProviderConfig, DataFetcher> defaultDataFetcherProvider(
             KinesisAsyncClient kinesisClient) {
         return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
-                                                                         @NonNull final MetricsFactory metricsFactory) {
-        final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
-                StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
-                StreamIdentifier.singleStreamInstance(streamName);
-
+    public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
+            @NonNull final MetricsFactory metricsFactory) {
         final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig(
-                streamIdentifier,
+                shardInfo.streamConfig().streamIdentifier(),
                 shardInfo.shardId(),
                 metricsFactory,
                 maxRecords,
@@ -108,13 +83,7 @@ public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull fi
 
         final DataFetcher dataFetcher = this.dataFetcherProvider.apply(kinesisDataFetcherProviderConfig);
 
-        return new SynchronousGetRecordsRetrievalStrategy(dataFetcher);
-    }
-
-    @Override
-    public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
-            @NonNull final MetricsFactory metricsFactory) {
-        return recordsFetcherFactory.createRecordsFetcher(createGetRecordsRetrievalStrategy(shardInfo, metricsFactory),
+        return recordsFetcherFactory.createRecordsFetcher(new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
                 shardInfo.shardId(), metricsFactory, maxRecords);
     }
 }
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java
deleted file mode 100644
index efa11e701..000000000
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/SynchronousPrefetchingRetrievalFactory.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright 2019 Amazon.com, Inc. or its affiliates.
- * Licensed under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package software.amazon.kinesis.retrieval.polling;
-
-import java.time.Duration;
-import java.util.concurrent.ExecutorService;
-import lombok.NonNull;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.kinesis.annotations.KinesisClientInternalApi;
-import software.amazon.kinesis.common.StreamIdentifier;
-import software.amazon.kinesis.leases.ShardInfo;
-import software.amazon.kinesis.metrics.MetricsFactory;
-import software.amazon.kinesis.retrieval.GetRecordsRetrievalStrategy;
-import software.amazon.kinesis.retrieval.KinesisDataFetcherProviderConfig;
-import software.amazon.kinesis.retrieval.RecordsFetcherFactory;
-import software.amazon.kinesis.retrieval.RecordsPublisher;
-import software.amazon.kinesis.retrieval.RetrievalFactory;
-
-/**
- *
- */
-@KinesisClientInternalApi
-public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory {
-    @NonNull
-    private final String streamName;
-    @NonNull
-    private final KinesisAsyncClient kinesisClient;
-    @NonNull
-    private final RecordsFetcherFactory recordsFetcherFactory;
-    private final int maxRecords;
-    @NonNull
-    private final ExecutorService executorService;
-    private final long idleMillisBetweenCalls;
-    private final Duration maxFutureWait;
-
-    @Deprecated
-    public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient,
-            RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService,
-            long idleMillisBetweenCalls) {
-        this(streamName, kinesisClient, recordsFetcherFactory, maxRecords, executorService, idleMillisBetweenCalls,
-                PollingConfig.DEFAULT_REQUEST_TIMEOUT);
-    }
-
-    public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncClient kinesisClient,
-            RecordsFetcherFactory recordsFetcherFactory, int maxRecords, ExecutorService executorService,
-            long idleMillisBetweenCalls, Duration maxFutureWait) {
-        this.streamName = streamName;
-        this.kinesisClient = kinesisClient;
-        this.recordsFetcherFactory = recordsFetcherFactory;
-        this.maxRecords = maxRecords;
-        this.executorService = executorService;
-        this.idleMillisBetweenCalls = idleMillisBetweenCalls;
-        this.maxFutureWait = maxFutureWait;
-    }
-
-    @Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
-            @NonNull final MetricsFactory metricsFactory) {
-        final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
-                StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
-                StreamIdentifier.singleStreamInstance(streamName);
-
-        return new SynchronousGetRecordsRetrievalStrategy(
-                new KinesisDataFetcher(kinesisClient, new KinesisDataFetcherProviderConfig(
-                        streamIdentifier,
-                        shardInfo.shardId(),
-                        metricsFactory,
-                        maxRecords,
-                        maxFutureWait
-                )));
-    }
-
-    @Override
-    public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
-            @NonNull final MetricsFactory metricsFactory) {
-        return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(),
-                recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords,
-                createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls,
-                metricsFactory, "Prefetching", shardInfo.shardId());
-    }
-}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java
index 98ce1dc58..74ee4f687 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/checkpoint/ShardShardRecordProcessorCheckpointerTest.java
@@ -30,6 +30,10 @@
 import org.mockito.runners.MockitoJUnitRunner;
 
 import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.StreamConfig;
+import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.ShardInfo;
 import software.amazon.kinesis.processor.Checkpointer;
 import software.amazon.kinesis.processor.PreparedCheckpointer;
@@ -40,6 +44,11 @@
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ShardShardRecordProcessorCheckpointerTest {
+    private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
+    private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED =
+            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+    private static final StreamConfig TEST_STREAM_CONFIG =
+            new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED);
     private String startingSequenceNumber = "13";
     private ExtendedSequenceNumber startingExtendedSequenceNumber = new ExtendedSequenceNumber(startingSequenceNumber);
     private String testConcurrencyToken = "testToken";
@@ -57,7 +66,8 @@ public void setup() throws Exception {
         checkpoint.setCheckpoint(shardId, startingExtendedSequenceNumber, testConcurrencyToken);
         assertThat(this.startingExtendedSequenceNumber, equalTo(checkpoint.getCheckpoint(shardId)));
 
-        shardInfo = new ShardInfo(shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
+        shardInfo = new ShardInfo(
+                shardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG);
     }
 
     /**
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
index 9671bb78e..20faa671a 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
@@ -17,6 +17,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
@@ -39,6 +40,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -47,11 +49,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
+import com.amazonaws.util.CollectionUtils;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Sets;
 import io.reactivex.rxjava3.plugins.RxJavaPlugins;
@@ -61,11 +66,13 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.runners.MockitoJUnitRunner;
 
 import org.mockito.stubbing.OngoingStubbing;
+import software.amazon.awssdk.arns.Arn;
 import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
@@ -78,6 +85,7 @@
 import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.exceptions.KinesisClientLibException;
 import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException;
+import software.amazon.kinesis.leases.Lease;
 import software.amazon.kinesis.leases.LeaseCleanupManager;
 import software.amazon.kinesis.leases.HierarchicalShardSyncer;
 import software.amazon.kinesis.leases.LeaseCoordinator;
@@ -111,6 +119,7 @@
 import software.amazon.kinesis.processor.ProcessorConfig;
 import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
 import software.amazon.kinesis.processor.ShardRecordProcessor;
+import software.amazon.kinesis.processor.StreamTracker;
 import software.amazon.kinesis.retrieval.RecordsPublisher;
 import software.amazon.kinesis.retrieval.RetrievalConfig;
 import software.amazon.kinesis.retrieval.RetrievalFactory;
@@ -129,6 +138,10 @@ public class SchedulerTest {
     private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000L;
     private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
     private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
+    private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION =
+            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+    private static final String TEST_SHARD_ID = "shardId-000000000001";
+    private static final UUID TEST_CONCURRENCY_TOKEN = UUID.randomUUID();
 
     private Scheduler scheduler;
     private ShardRecordProcessorFactory shardRecordProcessorFactory;
@@ -139,6 +152,7 @@ public class SchedulerTest {
     private MetricsConfig metricsConfig;
     private ProcessorConfig processorConfig;
     private RetrievalConfig retrievalConfig;
+    private StreamConfig streamConfig;
 
     @Mock
     private KinesisAsyncClient kinesisClient;
@@ -196,6 +210,7 @@ public void setup() {
 
         scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
                 metricsConfig, processorConfig, retrievalConfig);
+        streamConfig = scheduler.currentStreamConfigMap().values().iterator().next();
     }
 
     /**
@@ -212,9 +227,9 @@ public void testGetStageName() {
 
     @Test
     public final void testCreateOrGetShardConsumer() {
-        final String shardId = "shardId-000000000000";
         final String concurrencyToken = "concurrencyToken";
-        final ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
+        final ShardInfo shardInfo =
+                new ShardInfo(TEST_SHARD_ID, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, streamConfig);
         final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager);
         assertNotNull(shardConsumer1);
         final ShardConsumer shardConsumer2 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager);
@@ -223,8 +238,8 @@ public final void testCreateOrGetShardConsumer() {
         assertSame(shardConsumer1, shardConsumer2);
 
         final String anotherConcurrencyToken = "anotherConcurrencyToken";
-        final ShardInfo shardInfo2 = new ShardInfo(shardId, anotherConcurrencyToken, null,
-                ExtendedSequenceNumber.TRIM_HORIZON);
+        final ShardInfo shardInfo2 = new ShardInfo(
+                TEST_SHARD_ID, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, streamConfig);
         final ShardConsumer shardConsumer3 = scheduler.createOrGetShardConsumer(shardInfo2, shardRecordProcessorFactory, leaseCleanupManager);
         assertNotNull(shardConsumer3);
 
@@ -234,33 +249,29 @@ public final void testCreateOrGetShardConsumer() {
     // TODO: figure out the behavior of the test.
     @Test
     public void testWorkerLoopWithCheckpoint() throws Exception {
-        final String shardId = "shardId-000000000000";
-        final String concurrencyToken = "concurrencyToken";
         final ExtendedSequenceNumber firstSequenceNumber = ExtendedSequenceNumber.TRIM_HORIZON;
         final ExtendedSequenceNumber secondSequenceNumber = new ExtendedSequenceNumber("1000");
         final ExtendedSequenceNumber finalSequenceNumber = new ExtendedSequenceNumber("2000");
-
-        final List<ShardInfo> initialShardInfo = Collections.singletonList(
-                new ShardInfo(shardId, concurrencyToken, null, firstSequenceNumber));
-        final List<ShardInfo> firstShardInfo = Collections.singletonList(
-                new ShardInfo(shardId, concurrencyToken, null, secondSequenceNumber));
-        final List<ShardInfo> secondShardInfo = Collections.singletonList(
-                new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber));
-
         final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null);
+        when(checkpoint.getCheckpointObject(eq(TEST_SHARD_ID))).thenReturn(firstCheckpoint);
 
-        when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
-        when(checkpoint.getCheckpointObject(eq(shardId))).thenReturn(firstCheckpoint);
+        when(leaseCoordinator.getAssignments()).thenReturn(
+                Stream.of(firstSequenceNumber, secondSequenceNumber, finalSequenceNumber)
+                        .map(SchedulerTest::constructLease)
+                        .collect(Collectors.toList()));
 
         Scheduler schedulerSpy = spy(scheduler);
         schedulerSpy.runProcessLoop();
         schedulerSpy.runProcessLoop();
         schedulerSpy.runProcessLoop();
 
-        verify(schedulerSpy).buildConsumer(same(initialShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager));
-        verify(schedulerSpy, never()).buildConsumer(same(firstShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager));
-        verify(schedulerSpy, never()).buildConsumer(same(secondShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager));
-        verify(checkpoint).getCheckpointObject(eq(shardId));
+        final ShardInfo expectedShardInfo =
+                new ShardInfo(TEST_SHARD_ID, TEST_CONCURRENCY_TOKEN.toString(), null, null, streamConfig);
+        verify(schedulerSpy).buildConsumer(
+                eq(expectedShardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager));
+        // consumer is only built a total of once for all the three returned assignments
+        verify(schedulerSpy, times(1)).buildConsumer(any(), any(), any());
+        verify(checkpoint).getCheckpointObject(eq(TEST_SHARD_ID));
     }
 
     @Test
@@ -270,10 +281,12 @@ public final void testCleanupShardConsumers() {
         final String concurrencyToken = "concurrencyToken";
         final String anotherConcurrencyToken = "anotherConcurrencyToken";
 
-        final ShardInfo shardInfo0 = new ShardInfo(shard0, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
-        final ShardInfo shardInfo0WithAnotherConcurrencyToken = new ShardInfo(shard0, anotherConcurrencyToken, null,
-                ExtendedSequenceNumber.TRIM_HORIZON);
-        final ShardInfo shardInfo1 = new ShardInfo(shard1, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
+        final ShardInfo shardInfo0 =
+                new ShardInfo(shard0, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, streamConfig);
+        final ShardInfo shardInfo0WithAnotherConcurrencyToken =
+                new ShardInfo(shard0, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, streamConfig);
+        final ShardInfo shardInfo1 =
+                new ShardInfo(shard1, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, streamConfig);
 
         final ShardConsumer shardConsumer0 = scheduler.createOrGetShardConsumer(shardInfo0, shardRecordProcessorFactory, leaseCleanupManager);
         final ShardConsumer shardConsumer0WithAnotherConcurrencyToken =
@@ -362,25 +375,23 @@ public final void testMultiStreamInitializationWithFailures() {
 
     @Test
     public final void testMultiStreamConsumersAreBuiltOncePerAccountStreamShard() throws KinesisClientLibException {
-        final String shardId = "shardId-000000000000";
-        final String concurrencyToken = "concurrencyToken";
         final ExtendedSequenceNumber firstSequenceNumber = ExtendedSequenceNumber.TRIM_HORIZON;
         final ExtendedSequenceNumber secondSequenceNumber = new ExtendedSequenceNumber("1000");
         final ExtendedSequenceNumber finalSequenceNumber = new ExtendedSequenceNumber("2000");
 
-        final List<ShardInfo> initialShardInfo = multiStreamTracker.streamConfigList().stream()
-                .map(sc -> new ShardInfo(shardId, concurrencyToken, null, firstSequenceNumber,
-                        sc.streamIdentifier().serialize())).collect(Collectors.toList());
-        final List<ShardInfo> firstShardInfo = multiStreamTracker.streamConfigList().stream()
-                .map(sc -> new ShardInfo(shardId, concurrencyToken, null, secondSequenceNumber,
-                        sc.streamIdentifier().serialize())).collect(Collectors.toList());
-        final List<ShardInfo> secondShardInfo = multiStreamTracker.streamConfigList().stream()
-                .map(sc -> new ShardInfo(shardId, concurrencyToken, null, finalSequenceNumber,
-                        sc.streamIdentifier().serialize())).collect(Collectors.toList());
+        final List<Lease> initialLeases = multiStreamTracker.streamConfigList().stream()
+                .map(sc -> constructMultiStreamLease(sc.streamIdentifier().serialize(), firstSequenceNumber))
+                .collect(Collectors.toList());
+        final List<Lease> firstLeases = multiStreamTracker.streamConfigList().stream()
+                .map(sc -> constructMultiStreamLease(sc.streamIdentifier().serialize(), secondSequenceNumber))
+                .collect(Collectors.toList());
+        final List<Lease> secondLeases = multiStreamTracker.streamConfigList().stream()
+                .map(sc -> constructMultiStreamLease(sc.streamIdentifier().serialize(), finalSequenceNumber))
+                .collect(Collectors.toList());
 
         final Checkpoint firstCheckpoint = new Checkpoint(firstSequenceNumber, null, null);
 
-        when(leaseCoordinator.getCurrentAssignments()).thenReturn(initialShardInfo, firstShardInfo, secondShardInfo);
+        when(leaseCoordinator.getAssignments()).thenReturn(initialLeases, firstLeases, secondLeases);
         when(checkpoint.getCheckpointObject(anyString())).thenReturn(firstCheckpoint);
         retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
                 .retrievalFactory(retrievalFactory);
@@ -391,12 +402,14 @@ public final void testMultiStreamConsumersAreBuiltOncePerAccountStreamShard() th
         schedulerSpy.runProcessLoop();
         schedulerSpy.runProcessLoop();
 
-        initialShardInfo.forEach(
-                shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), same(leaseCleanupManager)));
-        firstShardInfo.forEach(
-                shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)));
-        secondShardInfo.forEach(
-                shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)));
+        final List<ShardInfo> expectedShardInfos = multiStreamTracker.streamConfigList().stream()
+                .map(sc -> new ShardInfo(TEST_SHARD_ID, TEST_CONCURRENCY_TOKEN.toString(), null, null, sc))
+                .collect(Collectors.toList());
+
+        expectedShardInfos.forEach(shardInfo -> verify(schedulerSpy).buildConsumer(
+                eq(shardInfo), eq(shardRecordProcessorFactory), same(leaseCleanupManager)));
+        // consumer is only built once for each of the unique shards assigned to it
+        verify(schedulerSpy, times(multiStreamTracker.streamConfigList().size())).buildConsumer(any(), any(), any());
     }
 
     @Test
@@ -1080,6 +1093,114 @@ private void mockListLeases(List<StreamConfig> configs) throws ProvisionedThroug
                         .shardId("some_random_shard_id")).collect(Collectors.toList()));
     }
 
+    @Test
+    public void testShardInfoConstructionFromSingleStreamLease() {
+        final List<String> parentShardIds = Arrays.asList("shardId-000000000000", "shardId-000000000001");
+        final ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("1234");
+
+        final Lease lease = constructLease(parentShardIds, checkpoint);
+        when(leaseCoordinator.getAssignments()).thenReturn(Collections.singletonList(lease));
+
+        final Scheduler schedulerSpy = spy(scheduler);
+        schedulerSpy.runProcessLoop();
+
+        final ArgumentCaptor<ShardInfo> shardInfoArgumentCaptor = ArgumentCaptor.forClass(ShardInfo.class);
+        verify(schedulerSpy).buildConsumer(shardInfoArgumentCaptor.capture(), any(), any());
+
+        final ShardInfo expectedShardInfo = new ShardInfo(
+                TEST_SHARD_ID, TEST_CONCURRENCY_TOKEN.toString(), parentShardIds, checkpoint, streamConfig);
+        final ShardInfo actualShardInfo = shardInfoArgumentCaptor.getValue();
+        assertEquals(expectedShardInfo, actualShardInfo);
+        // checkpoint is not included in ShardInfo equality check
+        assertEquals(expectedShardInfo.checkpoint(), actualShardInfo.checkpoint());
+    }
+
+    @Test
+    public void testShardInfoConstructionFromMultiStreamLeases() {
+        final StreamConfig streamConfigWithSerialization = new StreamConfig(StreamIdentifier.multiStreamInstance(
+                "123456789012:streamBySerialization:1111111111"), TEST_INITIAL_POSITION);
+        final StreamConfig streamConfigWithArn = new StreamConfig(StreamIdentifier.multiStreamInstance(
+                Arn.fromString("arn:aws:kinesis:us-east-1:123456789012:stream/streamByArn"), 2222222222L),
+                TEST_INITIAL_POSITION);
+        final StreamConfig streamConfigForOrphanStream = new StreamConfig(StreamIdentifier.multiStreamInstance(
+                "123456789012:streamNotProvidedInStreamConfigList:3333333333"), TEST_INITIAL_POSITION);
+
+        final StreamTracker streamTracker = new MultiStreamTracker() {
+            @Override
+            public List<StreamConfig> streamConfigList() {
+                return Arrays.asList(streamConfigWithSerialization, streamConfigWithArn);
+            }
+            @Override
+            public FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy() {
+                return new FormerStreamsLeasesDeletionStrategy.NoLeaseDeletionStrategy();
+            }
+        };
+
+        retrievalConfig =
+                new RetrievalConfig(kinesisClient, streamTracker, applicationName).retrievalFactory(retrievalFactory);
+        scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
+                metricsConfig, processorConfig, retrievalConfig);
+        when(leaseCoordinator.getAssignments()).thenReturn(
+                Stream.of(streamConfigWithSerialization, streamConfigWithArn, streamConfigForOrphanStream)
+                        .map(streamConfig -> constructMultiStreamLease(streamConfig.streamIdentifier().serialize()))
+                        .collect(Collectors.toList()));
+
+        final Scheduler schedulerSpy = spy(scheduler);
+        schedulerSpy.runProcessLoop();
+
+        final ArgumentCaptor<ShardInfo> shardInfoArgumentCaptor = ArgumentCaptor.forClass(ShardInfo.class);
+        verify(schedulerSpy, times(3)).buildConsumer(
+                shardInfoArgumentCaptor.capture(),
+                any(ShardRecordProcessorFactory.class),
+                any(LeaseCleanupManager.class));
+
+        // shardInfo should be constructed with a reference to the original streamConfig from streamConfigList
+        assertSame(streamConfigWithSerialization, shardInfoArgumentCaptor.getAllValues().get(0).streamConfig());
+        assertSame(streamConfigWithArn, shardInfoArgumentCaptor.getAllValues().get(1).streamConfig());
+
+        // for a streamConfig that is not present in the streamConfigList/currentStreamConfigMap,
+        // shardInfo is constructed with a newly created StreamConfig instance
+        final StreamConfig actualStreamConfigForOrphanStream =
+                shardInfoArgumentCaptor.getAllValues().get(2).streamConfig();
+        assertEquals(streamConfigForOrphanStream.streamIdentifier(),
+                actualStreamConfigForOrphanStream.streamIdentifier());
+        assertNotSame(streamConfigForOrphanStream, actualStreamConfigForOrphanStream);
+        assertNotEquals(streamConfigForOrphanStream.initialPositionInStreamExtended(),
+                actualStreamConfigForOrphanStream.initialPositionInStreamExtended());
+        assertEquals(multiStreamTracker.orphanedStreamInitialPositionInStream(),
+                actualStreamConfigForOrphanStream.initialPositionInStreamExtended());
+    }
+
+    private static Lease constructMultiStreamLease(String streamIdentifier) {
+        return constructMultiStreamLease(streamIdentifier, ExtendedSequenceNumber.TRIM_HORIZON);
+    }
+
+    private static Lease constructMultiStreamLease(String streamIdentifier, ExtendedSequenceNumber checkpoint) {
+        final MultiStreamLease lease = new MultiStreamLease();
+        lease.streamIdentifier(streamIdentifier);
+        lease.shardId(TEST_SHARD_ID);
+        return updateLease(lease, String.join(":", streamIdentifier, TEST_SHARD_ID), null, checkpoint);
+    }
+
+    private static Lease constructLease(ExtendedSequenceNumber checkpoint) {
+        return constructLease(null, checkpoint);
+    }
+
+    private static Lease constructLease(Collection<String> parentShardIds, ExtendedSequenceNumber checkpoint) {
+        return updateLease(new Lease(), TEST_SHARD_ID, parentShardIds, checkpoint);
+    }
+
+    private static Lease updateLease(Lease leaseToUpdate, String leaseKey, Collection<String> parentShardIds,
+            ExtendedSequenceNumber checkpoint) {
+        leaseToUpdate.leaseKey(leaseKey);
+        leaseToUpdate.concurrencyToken(TEST_CONCURRENCY_TOKEN);
+        if (!CollectionUtils.isNullOrEmpty(parentShardIds)) {
+            leaseToUpdate.parentShardIds(parentShardIds);
+        }
+        leaseToUpdate.checkpoint(checkpoint);
+        return leaseToUpdate;
+    }
+
     /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception {
         final int numberOfRecordsPerShard = 10;
         final String kinesisShardPrefix = "kinesis-0-";
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java
index 9a731f802..d0ce2841b 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java
@@ -23,6 +23,9 @@
 import org.mockito.runners.MockitoJUnitRunner;
 import software.amazon.awssdk.services.kinesis.model.ChildShard;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.StreamConfig;
 import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
 import software.amazon.kinesis.metrics.MetricsFactory;
@@ -45,10 +48,13 @@
 @RunWith(MockitoJUnitRunner.class)
 public class LeaseCleanupManagerTest {
 
+    private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
+    private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED =
+            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+    private static final StreamConfig TEST_STREAM_CONFIG =
+            new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED);
     private static final ShardInfo SHARD_INFO = new ShardInfo("shardId", "concurrencyToken",
-            Collections.emptySet(), ExtendedSequenceNumber.LATEST);
-
-    private static final StreamIdentifier STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
+            Collections.emptySet(), ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
 
     private final long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis();
     private final long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
@@ -175,7 +181,7 @@ private void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenc
     @Test
     public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception {
         final ShardInfo shardInfo = new ShardInfo("shardId-0", "concurrencyToken", Collections.singleton("parent"),
-                ExtendedSequenceNumber.LATEST);
+                ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
 
         verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0);
     }
@@ -296,6 +302,6 @@ private List<ChildShard> childShardsForMerge() {
     }
 
     private LeasePendingDeletion createLeasePendingDeletion(final Lease lease, final ShardInfo shardInfo) {
-        return new LeasePendingDeletion(STREAM_IDENTIFIER, lease, shardInfo, shardDetector);
+        return new LeasePendingDeletion(lease, shardInfo, shardDetector);
     }
 }
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java
index 5147ba796..daa797045 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ParentsFirstShardPrioritizationUnitTest.java
@@ -25,9 +25,18 @@
 
 import org.junit.Test;
 
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.StreamConfig;
+import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
 
 public class ParentsFirstShardPrioritizationUnitTest {
+    private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
+    private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED =
+            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+    private static final StreamConfig TEST_STREAM_CONFIG =
+            new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED);
 
     @Test(expected = IllegalArgumentException.class)
     public void testMaxDepthNegativeShouldFail() {
@@ -193,7 +202,7 @@ ShardInfoBuilder withCheckpoint(ExtendedSequenceNumber checkpoint) {
         }
 
         ShardInfo build() {
-            return new ShardInfo(shardId, concurrencyToken, parentShardIds, checkpoint);
+            return new ShardInfo(shardId, concurrencyToken, parentShardIds, checkpoint, TEST_STREAM_CONFIG);
         }
     }
 
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java
index 4ccafe523..f96f6048f 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardInfoTest.java
@@ -17,6 +17,7 @@
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -29,9 +30,18 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.StreamConfig;
+import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
 
 public class ShardInfoTest {
+    private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
+    private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED =
+            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+    private static final StreamConfig TEST_STREAM_CONFIG =
+            new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED);
     private static final String CONCURRENCY_TOKEN = UUID.randomUUID().toString();
     private static final String SHARD_ID = "shardId-test";
     private final Set<String> parentShardIds = new HashSet<>();
@@ -43,12 +53,14 @@ public void setUpPacboyShardInfo() {
         parentShardIds.add("shard-1");
         parentShardIds.add("shard-2");
 
-        testShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST);
+        testShardInfo = new ShardInfo(
+                SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
     }
 
     @Test
     public void testPacboyShardInfoEqualsWithSameArgs() {
-        ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST);
+        final ShardInfo equalShardInfo = new ShardInfo(
+                SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
         assertTrue("Equal should return true for arguments all the same", testShardInfo.equals(equalShardInfo));
     }
 
@@ -59,11 +71,15 @@ public void testPacboyShardInfoEqualsWithNull() {
 
     @Test
     public void testPacboyShardInfoEqualsForfToken() {
-        ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, UUID.randomUUID().toString(), parentShardIds, ExtendedSequenceNumber.LATEST);
-        assertFalse("Equal should return false with different concurrency token",
-                diffShardInfo.equals(testShardInfo));
-        diffShardInfo = new ShardInfo(SHARD_ID, null, parentShardIds, ExtendedSequenceNumber.LATEST);
-        assertFalse("Equal should return false for null concurrency token", diffShardInfo.equals(testShardInfo));
+        final ShardInfo shardInfoWithDifferentConcurrencyToken = new ShardInfo(SHARD_ID, UUID.randomUUID().toString(),
+                parentShardIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
+        assertNotEquals("Equal should return false with different concurrency token",
+                shardInfoWithDifferentConcurrencyToken, testShardInfo);
+
+        final ShardInfo shardInfoWithNullConcurrencyToken =
+                new ShardInfo(SHARD_ID, null, parentShardIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
+        assertNotEquals("Equal should return false for null concurrency token",
+                shardInfoWithNullConcurrencyToken, testShardInfo);
     }
 
     @Test
@@ -72,7 +88,8 @@ public void testPacboyShardInfoEqualsForDifferentlyOrderedParentIds() {
         differentlyOrderedParentShardIds.add("shard-2");
         differentlyOrderedParentShardIds.add("shard-1");
         ShardInfo shardInfoWithDifferentlyOrderedParentShardIds =
-                new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, differentlyOrderedParentShardIds, ExtendedSequenceNumber.LATEST);
+                new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, differentlyOrderedParentShardIds,
+                        ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
         assertTrue("Equal should return true even with parent shard Ids reordered",
                 shardInfoWithDifferentlyOrderedParentShardIds.equals(testShardInfo));
     }
@@ -82,20 +99,24 @@ public void testPacboyShardInfoEqualsForParentIds() {
         Set<String> diffParentIds = new HashSet<>();
         diffParentIds.add("shard-3");
         diffParentIds.add("shard-4");
-        ShardInfo diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, diffParentIds, ExtendedSequenceNumber.LATEST);
-        assertFalse("Equal should return false with different parent shard Ids",
-                diffShardInfo.equals(testShardInfo));
-        diffShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, null, ExtendedSequenceNumber.LATEST);
-        assertFalse("Equal should return false with null parent shard Ids", diffShardInfo.equals(testShardInfo));
+        final ShardInfo shardInfoWithDifferentParents = new ShardInfo(
+                SHARD_ID, CONCURRENCY_TOKEN, diffParentIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
+        assertNotEquals("Equal should return false with different parent shard Ids",
+                shardInfoWithDifferentParents, testShardInfo);
+        final ShardInfo shardInfoWithNullParents = new ShardInfo(
+                SHARD_ID, CONCURRENCY_TOKEN, null, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
+        assertNotEquals("Equal should return false with null parent shard Ids",
+                shardInfoWithNullParents, testShardInfo);
     }
 
     @Test
     public void testShardInfoCheckpointEqualsHashCode() {
-        ShardInfo baseInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds,
-                ExtendedSequenceNumber.TRIM_HORIZON);
-        ShardInfo differentCheckpoint = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds,
-                new ExtendedSequenceNumber("1234"));
-        ShardInfo nullCheckpoint = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, null);
+        final ShardInfo baseInfo = new ShardInfo(
+                SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG);
+        final ShardInfo differentCheckpoint = new ShardInfo(
+                SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, new ExtendedSequenceNumber("1234"), TEST_STREAM_CONFIG);
+        final ShardInfo nullCheckpoint = new ShardInfo(
+                SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, null, TEST_STREAM_CONFIG);
 
         assertThat("Checkpoint should not be included in equality.", baseInfo.equals(differentCheckpoint), is(true));
         assertThat("Checkpoint should not be included in equality.", baseInfo.equals(nullCheckpoint), is(true));
@@ -108,7 +129,8 @@ public void testShardInfoCheckpointEqualsHashCode() {
 
     @Test
     public void testPacboyShardInfoSameHashCode() {
-        ShardInfo equalShardInfo = new ShardInfo(SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST);
+        final ShardInfo equalShardInfo = new ShardInfo(
+                SHARD_ID, CONCURRENCY_TOKEN, parentShardIds, ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
         assertTrue("Shard info objects should have same hashCode for the same arguments",
                 equalShardInfo.hashCode() == testShardInfo.hashCode());
     }
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java
index 61473833f..e58df5bd6 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BlockOnParentShardTaskTest.java
@@ -26,6 +26,10 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.StreamConfig;
+import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.Lease;
 import software.amazon.kinesis.leases.LeaseRefresher;
 import software.amazon.kinesis.leases.ShardInfo;
@@ -38,16 +42,28 @@
  *
  */
 public class BlockOnParentShardTaskTest {
+    private static final String TEST_STREAM_NAME = "stream";
+    private static final String TEST_ACCOUNT_ID = "123456789012";
+    private static final long TEST_CREATION_EPOCH = 1234567890L;
+    private static final String TEST_STREAM_ID_SERIALIZATION =
+            String.join(":", TEST_ACCOUNT_ID, TEST_STREAM_NAME, String.valueOf(TEST_CREATION_EPOCH));
+    private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED =
+            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+    private static final StreamConfig TEST_STREAM_CONFIG = new StreamConfig(
+            StreamIdentifier.singleStreamInstance(TEST_STREAM_NAME), TEST_INITIAL_POSITION_IN_STREAM_EXTENDED);
+    private static final StreamConfig TEST_MULTI_STREAM_CONFIG = new StreamConfig(
+            StreamIdentifier.multiStreamInstance(TEST_STREAM_ID_SERIALIZATION),
+            TEST_INITIAL_POSITION_IN_STREAM_EXTENDED);
     private final long backoffTimeInMillis = 50L;
     private final String shardId = "shardId-97";
-    private final String streamId = "123:stream:146";
     private final String concurrencyToken = "testToken";
     private final List<String> emptyParentShardIds = new ArrayList<>();
     private ShardInfo shardInfo;
 
     @Before
     public void setup() {
-        shardInfo = new ShardInfo(shardId, concurrencyToken, emptyParentShardIds, ExtendedSequenceNumber.TRIM_HORIZON);
+        shardInfo = new ShardInfo(shardId, concurrencyToken, emptyParentShardIds,
+                ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG);
     }
 
     /**
@@ -94,14 +110,16 @@ public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinished()
 
         // test single parent
         parentShardIds.add(parent1ShardId);
-        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON);
+        shardInfo = new ShardInfo(
+                shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG);
         task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
         result = task.call();
         assertNull(result.getException());
 
         // test two parents
         parentShardIds.add(parent2ShardId);
-        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON);
+        shardInfo = new ShardInfo(
+                shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG);
         task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
         result = task.call();
         assertNull(result.getException());
@@ -118,8 +136,8 @@ public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinishedMu
             throws DependencyException, InvalidStateException, ProvisionedThroughputException {
         ShardInfo shardInfo = null;
         BlockOnParentShardTask task = null;
-        String parent1LeaseKey = streamId + ":" + "shardId-1";
-        String parent2LeaseKey = streamId + ":" + "shardId-2";
+        final String parent1LeaseKey = TEST_STREAM_ID_SERIALIZATION + ":" + "shardId-1";
+        final String parent2LeaseKey = TEST_STREAM_ID_SERIALIZATION + ":" + "shardId-2";
         String parent1ShardId = "shardId-1";
         String parent2ShardId = "shardId-2";
         List<String> parentShardIds = new ArrayList<>();
@@ -136,15 +154,16 @@ public final void testCallShouldNotThrowBlockedOnParentWhenParentsHaveFinishedMu
 
         // test single parent
         parentShardIds.add(parent1ShardId);
-        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON,
-                streamId);
+        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds,
+                ExtendedSequenceNumber.TRIM_HORIZON, TEST_MULTI_STREAM_CONFIG);
         task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
         result = task.call();
         assertNull(result.getException());
 
         // test two parents
         parentShardIds.add(parent2ShardId);
-        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId);
+        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds,
+                ExtendedSequenceNumber.TRIM_HORIZON, TEST_MULTI_STREAM_CONFIG);
         task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
         result = task.call();
         assertNull(result.getException());
@@ -178,14 +197,16 @@ public final void testCallWhenParentsHaveNotFinished()
 
         // test single parent
         parentShardIds.add(parent1ShardId);
-        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON);
+        shardInfo = new ShardInfo(
+                shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG);
         task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
         result = task.call();
         assertNotNull(result.getException());
 
         // test two parents
         parentShardIds.add(parent2ShardId);
-        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON);
+        shardInfo = new ShardInfo(
+                shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG);
         task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
         result = task.call();
         assertNotNull(result.getException());
@@ -203,8 +224,8 @@ public final void testCallWhenParentsHaveNotFinishedMultiStream()
 
         ShardInfo shardInfo = null;
         BlockOnParentShardTask task = null;
-        String parent1LeaseKey = streamId + ":" + "shardId-1";
-        String parent2LeaseKey = streamId + ":" + "shardId-2";
+        String parent1LeaseKey = TEST_STREAM_ID_SERIALIZATION + ":" + "shardId-1";
+        String parent2LeaseKey = TEST_STREAM_ID_SERIALIZATION + ":" + "shardId-2";
         String parent1ShardId = "shardId-1";
         String parent2ShardId = "shardId-2";
         List<String> parentShardIds = new ArrayList<>();
@@ -222,14 +243,16 @@ public final void testCallWhenParentsHaveNotFinishedMultiStream()
 
         // test single parent
         parentShardIds.add(parent1ShardId);
-        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId);
+        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds,
+                ExtendedSequenceNumber.TRIM_HORIZON, TEST_MULTI_STREAM_CONFIG);
         task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
         result = task.call();
         assertNotNull(result.getException());
 
         // test two parents
         parentShardIds.add(parent2ShardId);
-        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, streamId);
+        shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds,
+                ExtendedSequenceNumber.TRIM_HORIZON, TEST_MULTI_STREAM_CONFIG);
         task = new BlockOnParentShardTask(shardInfo, leaseRefresher, backoffTimeInMillis);
         result = task.call();
         assertNotNull(result.getException());
@@ -249,7 +272,8 @@ public final void testCallBeforeAndAfterAParentFinishes()
         String parentShardId = "shardId-1";
         List<String> parentShardIds = new ArrayList<>();
         parentShardIds.add(parentShardId);
-        ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON);
+        final ShardInfo shardInfo = new ShardInfo(
+                shardId, concurrencyToken, parentShardIds, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG);
         TaskResult result = null;
         Lease parentLease = new Lease();
         LeaseRefresher leaseRefresher = mock(LeaseRefresher.class);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
index 6551f9496..9e23f4d5e 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java
@@ -41,6 +41,7 @@
 import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
 import software.amazon.kinesis.common.InitialPositionInStream;
 import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.StreamConfig;
 import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.LeaseCleanupManager;
 import software.amazon.kinesis.leases.LeaseCoordinator;
@@ -113,15 +114,15 @@ public class ConsumerStatesTest {
 
     @Before
     public void setup() {
-        argument = new ShardConsumerArgument(shardInfo, StreamIdentifier.singleStreamInstance(STREAM_NAME),
-                leaseCoordinator, executorService, recordsPublisher,
+        argument = new ShardConsumerArgument(shardInfo, leaseCoordinator, executorService, recordsPublisher,
                 shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis,
                 taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
                 maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
-                INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
+                cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
                 new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager, schemaRegistryDecoder);
         when(shardInfo.shardId()).thenReturn("shardId-000000000000");
-        when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize()));
+        when(shardInfo.streamConfig()).thenReturn(new StreamConfig(
+                StreamIdentifier.singleStreamInstance(STREAM_NAME), INITIAL_POSITION_IN_STREAM));
         consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,
                 argument, taskExecutionListener, 0));
         when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java
index 1e89d8cc5..576d6d065 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java
@@ -69,6 +69,10 @@
 import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
+import software.amazon.kinesis.common.InitialPositionInStream;
+import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.StreamConfig;
+import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.ShardDetector;
 import software.amazon.kinesis.leases.ShardInfo;
 import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
@@ -85,6 +89,11 @@
 
 @RunWith(MockitoJUnitRunner.class)
 public class ProcessTaskTest {
+    private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
+    private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED =
+            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+    private static final StreamConfig TEST_STREAM_CONFIG =
+            new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED);
     private static final long IDLE_TIME_IN_MILLISECONDS = 100L;
     private static final Schema SCHEMA_REGISTRY_SCHEMA = new Schema("{}", "AVRO", "demoSchema");
     private static final byte[] SCHEMA_REGISTRY_PAYLOAD = new byte[] {01, 05, 03, 05};
@@ -119,7 +128,7 @@ public class ProcessTaskTest {
     public void setUpProcessTask() {
         when(checkpointer.checkpointer()).thenReturn(mock(Checkpointer.class));
 
-        shardInfo = new ShardInfo(shardId, null, null, null);
+        shardInfo = new ShardInfo(shardId, null, null, null, TEST_STREAM_CONFIG);
     }
 
     private ProcessTask makeProcessTask(ProcessRecordsInput processRecordsInput) {
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java
index 4299c1632..348931c2f 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java
@@ -60,8 +60,11 @@
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
+import software.amazon.kinesis.common.InitialPositionInStream;
 import software.amazon.kinesis.common.InitialPositionInStreamExtended;
 import software.amazon.kinesis.common.RequestDetails;
+import software.amazon.kinesis.common.StreamConfig;
+import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.ShardInfo;
 import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
 import software.amazon.kinesis.retrieval.KinesisClientRecord;
@@ -73,6 +76,11 @@
 @Slf4j
 @RunWith(MockitoJUnitRunner.class)
 public class ShardConsumerSubscriberTest {
+    private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
+    private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED =
+            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+    private static final StreamConfig TEST_STREAM_CONFIG =
+            new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED);
 
     private final Object processedNotifier = new Object();
 
@@ -104,8 +112,8 @@ public void before() {
                 .setNameFormat("test-" + testName.getMethodName() + "-%04d").setDaemon(true).build());
         recordsPublisher = new TestPublisher();
 
-        ShardInfo shardInfo = new ShardInfo("shard-001", "", Collections.emptyList(),
-                ExtendedSequenceNumber.TRIM_HORIZON);
+        final ShardInfo shardInfo = new ShardInfo(
+                "shard-001", "", Collections.emptyList(), ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG);
         when(shardConsumer.shardInfo()).thenReturn(shardInfo);
 
         processRecordsInput = ProcessRecordsInput.builder().records(Collections.emptyList())
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java
index 62fd13ef5..97444c85c 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java
@@ -71,8 +71,11 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import lombok.extern.slf4j.Slf4j;
+import software.amazon.kinesis.common.InitialPositionInStream;
 import software.amazon.kinesis.common.InitialPositionInStreamExtended;
 import software.amazon.kinesis.common.RequestDetails;
+import software.amazon.kinesis.common.StreamConfig;
+import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.ShardInfo;
 import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
 import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
@@ -88,7 +91,11 @@
 @RunWith(MockitoJUnitRunner.class)
 @Slf4j
 public class ShardConsumerTest {
-
+    private static final StreamIdentifier TEST_STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
+    private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION_IN_STREAM_EXTENDED =
+            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
+    private static final StreamConfig TEST_STREAM_CONFIG =
+            new StreamConfig(TEST_STREAM_IDENTIFIER, TEST_INITIAL_POSITION_IN_STREAM_EXTENDED);
     private final String shardId = "shardId-0-0";
     private final String concurrencyToken = "TestToken";
     private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails();
@@ -148,7 +155,8 @@ public class ShardConsumerTest {
 
     @Before
     public void before() {
-        shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
+        shardInfo = new ShardInfo(
+                shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON, TEST_STREAM_CONFIG);
         ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test-" + testName.getMethodName() + "-%04d")
                 .setDaemon(true).build();
         executorService = new ThreadPoolExecutor(4, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
index b79ffc036..8b82e314d 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java
@@ -49,6 +49,7 @@
 import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
 import software.amazon.kinesis.common.InitialPositionInStream;
 import software.amazon.kinesis.common.InitialPositionInStreamExtended;
+import software.amazon.kinesis.common.StreamConfig;
 import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
 import software.amazon.kinesis.leases.HierarchicalShardSyncer;
@@ -86,13 +87,15 @@ public class ShutdownTaskTest {
     private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
 
     private static final StreamIdentifier STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
+    private static final StreamConfig TEST_STREAM_CONFIG =
+            new StreamConfig(STREAM_IDENTIFIER, INITIAL_POSITION_TRIM_HORIZON);
 
     /**
      * Shard id for the default-provided {@link ShardInfo} and {@link Lease}.
      */
     private static final String SHARD_ID = "shardId-0";
     private static final ShardInfo SHARD_INFO = new ShardInfo(SHARD_ID, "concurrencyToken",
-            Collections.emptySet(), ExtendedSequenceNumber.LATEST);
+            Collections.emptySet(), ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
 
     private ShutdownTask task;
 
@@ -274,7 +277,7 @@ public final void testMergeChildWhereBothParentsHaveLeases() throws Exception {
     public final void testCallWhenShardNotFound() throws Exception {
         final Lease lease = setupLease("shardId-4", Collections.emptyList());
         final ShardInfo shardInfo = new ShardInfo(lease.leaseKey(), "concurrencyToken", Collections.emptySet(),
-                ExtendedSequenceNumber.LATEST);
+                ExtendedSequenceNumber.LATEST, TEST_STREAM_CONFIG);
 
         final TaskResult result = createShutdownTask(SHARD_END, Collections.emptyList(), shardInfo).call();
 
@@ -394,16 +397,16 @@ private ShutdownTask createShutdownTask(final ShutdownReason reason, final List<
     private ShutdownTask createShutdownTask(final ShutdownReason reason, final List<ChildShard> childShards,
             final ShardInfo shardInfo) {
         return new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
-                reason, null, INITIAL_POSITION_TRIM_HORIZON, false, false,
+                reason, null, false, false,
                 leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer,
-                NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager);
+                NULL_METRICS_FACTORY, childShards, leaseCleanupManager);
     }
 
     private ShutdownTask createShutdownTaskWithNotification(final ShutdownReason reason,
             final List<ChildShard> childShards) {
         return new ShutdownTask(SHARD_INFO, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
-                reason, shutdownNotification, INITIAL_POSITION_TRIM_HORIZON, false, false,
+                reason, shutdownNotification, false, false,
                 leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer,
-                NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager);
+                NULL_METRICS_FACTORY, childShards, leaseCleanupManager);
     }
 }
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java
index 584540875..0b80d2005 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/fanout/FanOutConfigTest.java
@@ -37,13 +37,12 @@
 
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.kinesis.common.StreamConfig;
+import software.amazon.kinesis.common.StreamIdentifier;
 import software.amazon.kinesis.leases.ShardInfo;
 import software.amazon.kinesis.leases.exceptions.DependencyException;
 import software.amazon.kinesis.metrics.MetricsFactory;
 import software.amazon.kinesis.retrieval.RetrievalFactory;
 
-import java.util.Optional;
-
 @RunWith(MockitoJUnitRunner.class)
 public class FanOutConfigTest {
 
@@ -51,6 +50,12 @@ public class FanOutConfigTest {
     private static final String TEST_APPLICATION_NAME = "TestApplication";
     private static final String TEST_STREAM_NAME = "TestStream";
     private static final String TEST_CONSUMER_NAME = "TestConsumerName";
+    private static final String TEST_ACCOUNT_ID = "123456789012";
+    private static final long TEST_CREATION_EPOCH = 1234567890L;
+    private static final String TEST_STREAM_IDENTIFIER_SERIALIZATION =
+            String.join(":", TEST_ACCOUNT_ID, TEST_STREAM_NAME, String.valueOf(TEST_CREATION_EPOCH));
+    private static final StreamIdentifier TEST_STREAM_IDENTIFIER =
+            StreamIdentifier.multiStreamInstance(TEST_STREAM_IDENTIFIER_SERIALIZATION);
 
     @Mock
     private FanOutConsumerRegistration consumerRegistration;
@@ -85,23 +90,23 @@ public void testNoRegisterIfConsumerArnSet() {
 
     @Test
     public void testRegisterCalledWhenConsumerArnUnset() throws Exception {
-        getRecordsCache(null);
+        getRecordsCache();
 
         verify(consumerRegistration).getOrCreateStreamConsumerArn();
     }
 
     @Test
     public void testRegisterNotCalledWhenConsumerArnSetInMultiStreamMode() throws Exception {
-        when(streamConfig.consumerArn()).thenReturn("consumerArn");
+        when(streamConfig.consumerArn()).thenReturn(TEST_CONSUMER_ARN);
 
-        getRecordsCache("123456789012:stream:12345");
+        getRecordsCache();
 
         verify(consumerRegistration, never()).getOrCreateStreamConsumerArn();
     }
 
     @Test
     public void testRegisterCalledWhenConsumerArnNotSetInMultiStreamMode() throws Exception {
-        getRecordsCache("123456789012:stream:12345");
+        getRecordsCache();
 
         verify(consumerRegistration).getOrCreateStreamConsumerArn();
     }
@@ -112,7 +117,7 @@ public void testDependencyExceptionInConsumerCreation() throws Exception {
         when(consumerRegistration.getOrCreateStreamConsumerArn()).thenThrow(de);
 
         try {
-            getRecordsCache(null);
+            getRecordsCache();
             Assert.fail("should throw");
         } catch (RuntimeException e) {
             verify(consumerRegistration).getOrCreateStreamConsumerArn();
@@ -122,7 +127,7 @@ public void testDependencyExceptionInConsumerCreation() throws Exception {
 
     @Test
     public void testCreationWithApplicationName() {
-        getRecordsCache(null);
+        getRecordsCache();
 
         assertEquals(TEST_STREAM_NAME, config.streamName());
         assertEquals(TEST_APPLICATION_NAME, config.applicationName());
@@ -134,7 +139,7 @@ public void testCreationWithConsumerName() {
                 // unset common parameters
                 .applicationName(null);
 
-        getRecordsCache(null);
+        getRecordsCache();
 
         assertEquals(TEST_STREAM_NAME, config.streamName());
         assertEquals(TEST_CONSUMER_NAME, config.consumerName());
@@ -144,7 +149,7 @@ public void testCreationWithConsumerName() {
     public void testCreationWithBothConsumerApplication() {
         config = config.consumerName(TEST_CONSUMER_NAME);
 
-        getRecordsCache(null);
+        getRecordsCache();
 
         assertEquals(TEST_STREAM_NAME, config.streamName());
         assertEquals(TEST_CONSUMER_NAME, config.consumerName());
@@ -197,9 +202,10 @@ private void testInvalidState(final String streamName, final String consumerArn)
         }
     }
 
-    private void getRecordsCache(final String streamIdentifer) {
+    private void getRecordsCache() {
         final ShardInfo shardInfo = mock(ShardInfo.class);
-        when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.ofNullable(streamIdentifer));
+        when(shardInfo.streamConfig()).thenReturn(streamConfig);
+        when(streamConfig.streamIdentifier()).thenReturn(TEST_STREAM_IDENTIFIER);
 
         final RetrievalFactory factory = config.retrievalFactory();
         factory.createGetRecordsCache(shardInfo, streamConfig, mock(MetricsFactory.class));