JDBC sink connector looks up the topic-value subject instead of the record name

Below is the configuration payload for my JDBC Sink Connector:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
table.name.format=Subs_V1
behavior.on.null.values=ignore
connection.password=***********
topics=Subscription
tasks.max=1
batch.size=500
value.converter.schema.registry.url=http://localhost:8081
auto.evolve=true
connection.user=admin
name=sink-jdbc-connector-Subs
errors.tolerance=all
auto.create=true
value.converter=io.confluent.connect.avro.AvroConverter
connection.url=jdbc:sqlserver://MsSql-Instanse:dbName.    
insert.mode=upsert
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
pk.mode=record_value
pk.fields=id

The Schema Registry logs show this:

at java.base/java.lang.Thread.run(Thread.java:829)
[2022-01-19 08:24:35,474] INFO 127.0.0.1 - - [19/Jan/2022:08:24:35 +0000] "POST /subjects/Subscription-value?deleted=true HTTP/1.1" 404 96  1 (io.confluent.rest-utils.requests:62)
[2022-01-19 08:24:35,475] INFO Schema lookup under subject Subscription-value, deleted true, type null (io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource:91)
[2022-01-19 08:24:35,475] ERROR Request Failed with exception  (io.confluent.rest.exceptions.DebuggableExceptionMapper:62)
io.confluent.rest.exceptions.RestNotFoundException: Subject 'Subscription-value' not found.
    at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:69)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource.lookUpSchemaUnderSubject(SubjectsResource.java:105)
    at jdk.internal.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at 

How can I point my JDBC sink connector at the Subscription.Value subject instead of Subscription-value?

My connector is running and reads the records, but because the schema is not found it does not write them to the database.


Solution 1:

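By default (TopicNameStrategy), an Avro producer registers its value schema under the <topic>-value subject, so Subscription-value should already exist. If it does not, the producer most likely used a different subject name strategy, and the converter has to be configured with the same one. For example, if the producer used RecordNameStrategy, you need to set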

value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
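
To make the difference concrete, here is a minimal Python sketch of how the three Confluent subject name strategies derive the subject that gets looked up. The function name subject_name is hypothetical, and the record full name Subscription.Value is an assumption inferred from the question (namespace Subscription, record name Value); it only mirrors the naming rules and is not the actual serializer code.

```python
def subject_name(strategy, topic, record_full_name, is_key=False):
    """Return the Schema Registry subject a (de)serializer would look up.

    Illustrative helper only: it mimics the naming rules of the Confluent
    subject name strategies and does not talk to a registry.
    """
    suffix = "key" if is_key else "value"
    if strategy == "TopicNameStrategy":
        # Default: subject depends only on the topic and key/value side.
        return f"{topic}-{suffix}"
    if strategy == "RecordNameStrategy":
        # Subject is the fully qualified Avro record name; topic is ignored.
        return record_full_name
    if strategy == "TopicRecordNameStrategy":
        # Subject combines the topic and the fully qualified record name.
        return f"{topic}-{record_full_name}"
    raise ValueError(f"unknown strategy: {strategy}")


if __name__ == "__main__":
    # Assumed record name "Subscription.Value", taken from the question.
    for strategy in ("TopicNameStrategy", "RecordNameStrategy",
                     "TopicRecordNameStrategy"):
        print(strategy, "->",
              subject_name(strategy, "Subscription", "Subscription.Value"))
```

With the default strategy the lookup goes to Subscription-value, which is the subject in the 404 from the logs. With RecordNameStrategy it goes to Subscription.Value instead.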

Also, ideally your keys are not Avro, but rather simple types like strings or ints.