Schema registry lookup failed with MemSQL pipeline

I’m trying to read data into a MemSQL pipeline using the Confluent Kafka Schema Registry, as shown in the command below:

CREATE OR REPLACE PIPELINE test
AS LOAD DATA KAFKA '#host#:port/topic'
CONFIG '{"security.protocol": "",
"sasl.mechanism": "",
"ssl.certificate.location": "",
"ssl.key.location": "",
"ssl.ca.location": "",
"sasl.username": ""}'
CREDENTIALS '{"sasl.password": ""}'
INTO TABLE tablename
FORMAT AVRO
SCHEMA REGISTRY "#host#:port"
(id <- id);

But I’m encountering the error below:

Failed to lookup or compile schema id 12 of record 1 from registry. Forwarded error : “REST request failed (code-1) : HTTP request failed: Peer certificate cannot be authenticated with given CA certificates”.

Any help would be appreciated.

Hello! Can you please provide some more details about your environment? What version of SingleStore are you using? Where are Confluent Kafka and SingleStore running?
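In the meantime, if the pipeline was created successfully and the error surfaces during ingest, the pipelines error log may show more detail than the client message alone. A minimal sketch, assuming the pipeline is named test as in your post:

-- List recent errors reported for the pipeline, including forwarded
-- messages from schema registry lookups.
SELECT *
FROM information_schema.PIPELINES_ERRORS
WHERE pipeline_name = 'test';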

Hi, thanks for the response! We have SingleStore version 7.3.9 installed on our server. Both Confluent Kafka and MemSQL are installed on the same server, and we have configured all the necessary security and client certificates as well.

Hello,

It looks like the configuration fields used are incorrect; our documentation was corrected last week.

The command should be similar to:

CREATE OR REPLACE PIPELINE test
AS LOAD DATA KAFKA '#host#:port/topic'
CONFIG '{"security.protocol": "",
"sasl.mechanism": "",
"schema.registry.ssl.certificate.location": "",
"schema.registry.ssl.key.location": "",
"schema.registry.ssl.ca.location": "",
"sasl.username": ""}'
CREDENTIALS '{"sasl.password": ""}'
INTO TABLE tablename
FORMAT AVRO
SCHEMA REGISTRY "#host#:port"
(id <- id);

Hello, thanks for the update! But now, if I change the ssl.ca.location field name to schema.registry.ssl.ca.location, it says ssl.ca.location is missing. Do we also need to make some changes in the configuration of our certificates?

Sorry about that. To clarify,

"security.protocol","sasl.mechanism", "sasl.username","sasl.password", "ssl.certificate.location", "ssl.key.location", "ssl.ca.location"

refer to the configurations used for the Kafka connection, whereas

"schema.registry.ssl.certificate.location", "schema.registry.ssl.key.location", "schema.registry.ssl.ca.location", "schema.registry.ssl.key.password"

refer to the configurations used for the schema registry connection. Both are needed because there is no guarantee that Kafka and the schema registry use the same certificate, or trust the same client key and certificate, for communication.
In your case, since it looks like you are using SASL to communicate with Kafka and SSL to communicate with the schema registry, you should include the configurations for both, for example:

CREATE OR REPLACE PIPELINE test
AS LOAD DATA KAFKA '#host#:port/topic'
CONFIG '{"security.protocol": "",
"sasl.mechanism": "",
"ssl.certificate.location": "",
"ssl.key.location": "",
"ssl.ca.location": "",
"sasl.username": "",
"schema.registry.ssl.certificate.location": "",
"schema.registry.ssl.key.location": "",
"schema.registry.ssl.ca.location": ""}'
CREDENTIALS '{"sasl.password": ""}'
INTO TABLE tablename
FORMAT AVRO
SCHEMA REGISTRY "#host#:port"
(id <- id);
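
Once the pipeline is created, you can also verify both connections before starting continuous ingest. A minimal sketch, reusing the pipeline name test from your example:

-- Pull one record through the extractor without committing it to the table;
-- this exercises both the Kafka and the schema registry connections.
TEST PIPELINE test LIMIT 1;

-- Start background ingest once the test succeeds.
START PIPELINE test;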

On our end, we are going to add a documentation page explaining what each CONFIG and CREDENTIALS field is used for, to make this clearer.


The database can work on multiple simultaneous streams while adhering to exactly-once semantics!
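For instance, several pipelines can ingest in parallel against the same cluster, each tracking its own Kafka offsets. A minimal sketch with hypothetical topic and table names (orders, events); in a secured setup, the CONFIG and CREDENTIALS fields from the pipeline above would apply to each:

-- Two independent pipelines loading from separate topics at the same time;
-- per-pipeline offset tracking is what preserves exactly-once delivery.
CREATE PIPELINE orders_pipe AS LOAD DATA KAFKA '#host#:port/orders'
INTO TABLE orders FORMAT AVRO SCHEMA REGISTRY "#host#:port" (id <- id);

CREATE PIPELINE events_pipe AS LOAD DATA KAFKA '#host#:port/events'
INTO TABLE events FORMAT AVRO SCHEMA REGISTRY "#host#:port" (id <- id);

START PIPELINE orders_pipe;
START PIPELINE events_pipe;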

Thanks a lot, Bremy. It worked!! Yes, clear documentation about this would be great!!