-
Notifications
You must be signed in to change notification settings - Fork 433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added kafka spring binder #3123
base: main
Are you sure you want to change the base?
Added kafka spring binder #3123
Conversation
.../org/locationtech/geomesa/spring/binder/kafka/datastore/KafkaDatastoreBinderProvisioner.java
Outdated
Show resolved
Hide resolved
try { | ||
var sft = SimpleFeatureTypeLoader.sftForName(sfName); | ||
if (sft.isDefined()) { | ||
ds.createSchema(sft.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you might want to verify here that the sft matches any existing schema in the store - it's a bit tricky to do that well though :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought createSchema created it if it was missing and did nothing if it already existed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, but you can have issues if the schema exists in the store, but it doesn't match the one loaded by the SimpleFeatureTypeLoader. maybe in this case it doesn't actually matter...
if (sft.isDefined()) { | ||
ds.createSchema(sft.get()); | ||
} else { | ||
logger.warn("Could not find a local version of {}, hoping the KDS is already defined...", sfName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could call ds.getSchema to verify
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added check.
private final DataStore ds; | ||
FeatureWriter<SimpleFeatureType, SimpleFeature> writer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a close lifecycle where you can dispose/close these?
SimpleFeatureStore fs = null; | ||
while (fs == null) { | ||
try { | ||
this.ds = dsFactory.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should dispose of this ds if we fail getting the feature source - i think you had trouble getting the metadata to refresh, hence the need to get a new ds each time, is that right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's correct.
public void doStart() { | ||
SimpleFeatureStore fs = getSimpleFeatureStore(); | ||
|
||
fs.addFeatureListener(featureEvent -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so feature listeners don't guarantee at-least-once delivery - if you want that, you need to use a GeoMessageProcessor with a stable group id: https://github.com/locationtech/geomesa/blob/main/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala#L124
I'm realizing that method is not in the docs... I'll open a ticket to add it there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
geomesa-spring/pom.xml
Outdated
<artifactId>geomesa-spring</artifactId> | ||
<packaging>pom</packaging> | ||
<modules> | ||
<module>geomesa-spring-cloud-stream-binder-kafka-datastore</module> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think i'd prefer to move this module to geomesa-kafka/geomesa-kafka-spring-cloud-stream(-binder?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved!
9d9cf2e
to
a8ecb36
Compare
No description provided.