@@ -12,12 +12,14 @@ mod cmd {
12
12
use std:: path:: PathBuf ;
13
13
14
14
use async_trait:: async_trait;
15
+ use fluvio_sc_schema:: partition:: PartitionMirrorConfig ;
16
+ use fluvio_sc_schema:: topic:: { MirrorConfig , PartitionMap , ReplicaSpec , TopicSpec } ;
15
17
#[ cfg( feature = "producer-file-io" ) ]
16
18
use futures:: future:: join_all;
17
19
use clap:: Parser ;
18
20
use tracing:: { error, warn} ;
19
21
use humantime:: parse_duration;
20
- use anyhow:: Result ;
22
+ use anyhow:: { bail , Result } ;
21
23
22
24
use fluvio:: {
23
25
Compression , Fluvio , FluvioError , TopicProducerPool , TopicProducerConfigBuilder , RecordKey ,
@@ -173,8 +175,12 @@ mod cmd {
173
175
pub transforms_line : Vec < String > ,
174
176
175
177
/// Partition id
176
- #[ arg( short = 'p' , long, value_name = "integer" ) ]
178
+ #[ arg( short = 'p' , long, value_name = "integer" , conflicts_with = "mirror" ) ]
177
179
pub partition : Option < PartitionId > ,
180
+
181
+ /// Remote cluster to consume from
182
+ #[ arg( short = 'm' , long, conflicts_with = "partition" ) ]
183
+ pub mirror : Option < String > ,
178
184
}
179
185
180
186
fn validate_key_separator ( separator : & str ) -> std:: result:: Result < String , String > {
@@ -247,6 +253,46 @@ mod cmd {
247
253
let config_builder =
248
254
config_builder. smartmodules ( self . smartmodule_invocations ( initial_param) ?) ;
249
255
256
+ let config_builder = if let Some ( mirror) = & self . mirror {
257
+ let admin = fluvio. admin ( ) . await ;
258
+ let topics = admin. all :: < TopicSpec > ( ) . await ?;
259
+ let partition = topics. into_iter ( ) . find_map ( |t| match t. spec . replicas ( ) {
260
+ ReplicaSpec :: Mirror ( MirrorConfig :: Home ( home_mirror_config) ) => {
261
+ let partitions_maps =
262
+ Vec :: < PartitionMap > :: from ( home_mirror_config. as_partition_maps ( ) ) ;
263
+ partitions_maps. iter ( ) . find_map ( |p| {
264
+ if let Some ( PartitionMirrorConfig :: Home ( remote) ) = & p. mirror {
265
+ if remote. remote_cluster == * mirror && remote. source {
266
+ return Some ( p. id ) ;
267
+ }
268
+ }
269
+ None
270
+ } )
271
+ }
272
+ ReplicaSpec :: Mirror ( MirrorConfig :: Remote ( remote_mirror_config) ) => {
273
+ let partitions_maps =
274
+ Vec :: < PartitionMap > :: from ( remote_mirror_config. as_partition_maps ( ) ) ;
275
+ partitions_maps. iter ( ) . find_map ( |p| {
276
+ if let Some ( PartitionMirrorConfig :: Remote ( remote) ) = & p. mirror {
277
+ if remote. home_cluster == * mirror && remote. target {
278
+ return Some ( p. id ) ;
279
+ }
280
+ }
281
+ None
282
+ } )
283
+ }
284
+ _ => None ,
285
+ } ) ;
286
+
287
+ if let Some ( partition) = partition {
288
+ config_builder. set_specific_partitioner ( partition)
289
+ } else {
290
+ bail ! ( "No partition found for mirror '{}'" , mirror) ;
291
+ }
292
+ } else {
293
+ config_builder
294
+ } ;
295
+
250
296
let config_builder = if let Some ( partition) = self . partition {
251
297
config_builder. set_specific_partitioner ( partition)
252
298
} else {
0 commit comments