Sink connector looking for topic -value schema instead of record name
Below is the configuration for my JDBC sink connector:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
table.name.format=Subs_V1
behavior.on.null.values=ignore
connection.password=***********
topics=Subscription
tasks.max=1
batch.size=500
value.converter.schema.registry.url=http://localhost:8081
auto.evolve=true
connection.user=admin
name=sink-jdbc-connector-Subs
errors.tolerance=all
auto.create=true
value.converter=io.confluent.connect.avro.AvroConverter
connection.url=jdbc:sqlserver://MsSql-Instanse:dbName.
insert.mode=upsert
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
pk.mode=record_value
pk.fields=id
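As a quick sanity check (a sketch, assuming Schema Registry is reachable at `http://localhost:8081` as in the config above), you can list which subjects are actually registered and see whether `Subscription-value` exists:

```python
import json
import urllib.request

SCHEMA_REGISTRY = "http://localhost:8081"  # assumed from the connector config above

def subjects_url(base: str) -> str:
    # Schema Registry REST endpoint that lists all registered subjects
    return f"{base.rstrip('/')}/subjects"

def list_subjects(base: str = SCHEMA_REGISTRY) -> list:
    # GET /subjects returns a JSON array of subject names
    with urllib.request.urlopen(subjects_url(base)) as resp:
        return json.loads(resp.read())

# e.g. list_subjects() might return ["Subscription.Value"] but not
# "Subscription-value", which would explain the 404 in the logs.
```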
The Schema Registry logs show the following:
[2022-01-19 08:24:35,474] INFO 127.0.0.1 - - [19/Jan/2022:08:24:35 +0000] "POST /subjects/Subscription-value?deleted=true HTTP/1.1" 404 96 1 (io.confluent.rest-utils.requests:62)
[2022-01-19 08:24:35,475] INFO Schema lookup under subject Subscription-value, deleted true, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource:91)
[2022-01-19 08:24:35,475] ERROR Request Failed with exception (io.confluent.rest.exceptions.DebuggableExceptionMapper:62)
io.confluent.rest.exceptions.RestNotFoundException: Subject 'Subscription-value' not found.
at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:69)
at io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource.lookUpSchemaUnderSubject(SubjectsResource.java:105)
at jdk.internal.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
How can I point my JDBC sink connector at the Subscription.Value subject instead of Subscription-value?
The connector reads the records, but because the schema lookup fails it never writes them to the database.
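For context, with the default TopicNameStrategy the converter derives the subject from the topic name, which is why the connector asks the registry for `Subscription-value` rather than the record's full name. A minimal sketch of the two strategies:

```python
# How Confluent converters derive the Schema Registry subject name.
# TopicNameStrategy (the default) uses the topic name;
# RecordNameStrategy uses the Avro record's fully qualified name.

def topic_name_subject(topic: str, is_key: bool = False) -> str:
    """Default strategy: '<topic>-key' or '<topic>-value'."""
    return f"{topic}-{'key' if is_key else 'value'}"

def record_name_subject(record_full_name: str) -> str:
    """RecordNameStrategy: the record's full name is the subject."""
    return record_full_name

# The sink's converter looks up 'Subscription-value' by default:
print(topic_name_subject("Subscription"))        # Subscription-value
# A producer using RecordNameStrategy would have registered, e.g.:
print(record_name_subject("Subscription.Value")) # Subscription.Value
```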
Solution 1:
By default, any Avro producer would have registered that `-value` subject. If it did not, your producer is using a different subject name strategy, and the connector's converter must be configured to match it. For example, set
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
Also, ideally your keys should not be Avro, but simple types such as strings or ints.
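A sketch of the adjusted converter settings, assuming the producer registered the value schema under the record's full name and that the keys can be handled as plain strings (`StringConverter` is a standard Kafka Connect converter):

```properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
key.converter=org.apache.kafka.connect.storage.StringConverter
```

With this in place, the converter will look up the schema under the record's fully qualified name (e.g. `Subscription.Value`) instead of `Subscription-value`.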