Avro format insertion into SingleStore

Hi, I am a beginner working on a POC. I'm trying to load Avro data from Confluent Cloud Kafka into a SingleStore DB, but I am unable to do so.

My Avro schema in Confluent Cloud is as follows:

{
  "doc": "Schema for Employee records",
  "fields": [
    {
      "doc": "Employee ID",
      "name": "emp_id",
      "type": "int"
    },
    {
      "doc": "Employee first name",
      "name": "emp_fname",
      "type": "string"
    },
    {
      "doc": "Employee last name",
      "name": "emp_lname",
      "type": "string"
    }
  ],
  "name": "Employee",
  "namespace": "com.example",
  "type": "record"
}

I am running SingleStore on localhost for the following steps.
I wrote a procedure as follows:

DELIMITER //
CREATE OR REPLACE PROCEDURE employeedetail(q QUERY(emp_id INT, emp_fname TEXT, emp_lname TEXT)) AS
BEGIN
  INSERT INTO employee
  SELECT * FROM q;
END //
DELIMITER ;

I wrote a pipeline as follows:

CREATE OR REPLACE PIPELINE p
AS LOAD DATA KAFKA 'cloud gcp ip/ topic_name'
CONFIG '{"sasl.username": "username",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL"}'
CREDENTIALS '{"sasl.password": "password"}'
INTO PROCEDURE employeedetail FORMAT AVRO SCHEMA REGISTRY 'cloud url'
(emp_id <- emp_id, emp_fname <- emp_fname, emp_lname <- emp_lname);

When I run TEST PIPELINE p, I get the following error:

test pipeline p
ERROR 2341 ER_FOR_UPDATE_SUBQUERY: Leaf Error (localhost:3306): Leaf Error (127.0.0.1:3307): Missing Confluent byte in record header. This input is most likely not schema registry aware Avro. See Load Data · SingleStore Documentation for details on the accepted Avro sub-formats.

Kindly help me with this


Hello, himawari.

Pipeline creation succeeded, so the communication between the Kafka broker and S2 pipelines was successful. The problem is with the data itself.

This error message suggests that the data produced to the Kafka topic isn't in the format expected by the Confluent Schema Registry protocol. How was the data produced to the Kafka topic? If it's Avro-encoded data, was it serialized with a Confluent Schema Registry serializer?

To give more detail: the format version byte wasn't found at the start of the message, so most likely the data is in a different format.
https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
Per the Confluent Schema Registry wire format, we expect one byte (the format version number, currently 0), followed by a 4-byte schema ID, followed by the serialized Avro data.
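
If you want to check a raw message from the topic yourself, here is a minimal Python sketch of that header check. It assumes the wire format from the Confluent link above (a version byte of 0, then a big-endian 4-byte schema ID). The function name split_confluent_frame and the sample bytes are made up for illustration; this is not SingleStore's actual implementation.

```python
import struct

# Format version byte defined by the Confluent wire format (assumed to be 0).
MAGIC_BYTE = 0
HEADER_SIZE = 5  # 1 version byte + 4-byte schema ID


def split_confluent_frame(message: bytes):
    """Hypothetical helper: split a raw Kafka message value into
    (schema_id, avro_payload), or raise ValueError if the header is missing."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message is shorter than the 5-byte header")
    # ">" = big-endian with no padding, "B" = unsigned byte, "I" = unsigned 32-bit int
    magic, schema_id = struct.unpack(">BI", message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected first byte {magic}; not schema registry framed")
    return schema_id, message[HEADER_SIZE:]


if __name__ == "__main__":
    # A made-up framed message: version byte 0, schema ID 100001, then a payload.
    framed = struct.pack(">BI", MAGIC_BYTE, 100001) + b"\x02\x06abc"
    print(split_confluent_frame(framed))

    # Plain JSON (or bare Avro) has no such header and is rejected.
    try:
        split_confluent_frame(b'{"emp_id": 1}')
    except ValueError as err:
        print("rejected:", err)
```

A message that fails this check was probably produced without a schema registry aware serializer. If it passes, the data framing is fine and the cause is more likely elsewhere in the pipeline setup.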

Hi, I resolved the error by specifying the schema registry username and password in the credentials field. Thanks.