Using a SingleStore Kafka Pipeline and Stored Procedure to Insert Nested JSON Data into Two Tables

I am sending this data to a Kafka topic:
{
  "id": 3,
  "name": "abhi",
  "houseName": "xyz",
  "visitorId": [
    821494,
    821551,
    844766
  ]
}
I want this data to go to two different tables, people and visitor:
uqh1 = CONCAT(id, '-', name, '-', houseName)
people {
  id, name, housename, uqh1
}
uqh2 = CONCAT(id, '-', name, '-', houseName, '-', visitorId)
visitor {
  uqh1, uqh2, visitorId
}
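
To make the target rows concrete, here is a minimal sketch in Python of the mapping described above. It only illustrates the expected output for the sample event and is not SingleStore code; `flatten_event` is a hypothetical helper name, and the row keys simply mirror the column names listed above.

```python
def flatten_event(event):
    """Split one event into one people row and one visitor row per visitorId."""
    # uqh1 identifies the person: id-name-houseName
    uqh1 = f"{event['id']}-{event['name']}-{event['houseName']}"
    people_row = {
        "id": event["id"],
        "name": event["name"],
        "housename": event["houseName"],
        "uqh1": uqh1,
    }
    # uqh2 extends uqh1 with the visitor id, one row per array element
    visitor_rows = [
        {"uqh1": uqh1, "uqh2": f"{uqh1}-{vid}", "visitorId": vid}
        for vid in event["visitorId"]
    ]
    return people_row, visitor_rows


event = {
    "id": 3,
    "name": "abhi",
    "houseName": "xyz",
    "visitorId": [821494, 821551, 844766],
}
people_row, visitor_rows = flatten_event(event)
print(people_row)
for row in visitor_rows:
    print(row)
```

For the sample event this gives one people row with uqh1 = 3-abhi-xyz and three visitor rows, the first with uqh2 = 3-abhi-xyz-821494.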

I am doing something like this.

CREATE OR REPLACE PIPELINE `visitor_pipeline`
  AS LOAD DATA KAFKA "bootstrap_server/visitor_topic"
  INTO PROCEDURE visitor_migration
  (visitor <- %)
  FORMAT JSON;

DELIMITER //
CREATE OR REPLACE PROCEDURE visitor_migration (batch QUERY(visitor JSON))
AS
DECLARE uqh1 TEXT;
DECLARE uqh2 TEXT;
BEGIN
  FOR entry IN collect(batch) LOOP
    uqh1 = CONCAT(entry::id, '-', entry::name, '-', entry::housename);
    visitors = entry::visitorId;
    INSERT IGNORE INTO people (
      id, name, housename, unique_hash
    ) SELECT entry::id, entry::name, entry::housename, uqh1;
    FOR i IN 0 .. LENGTH(visitors) - 1 LOOP
      vid = visitors[i];
      uqh2 = CONCAT(uqh1, '-', vid);
      INSERT IGNORE INTO visitors (uqh1, visitor_id, uqh2) SELECT uqh1, vid, uqh2;
    END LOOP;
  END LOOP;
END //
DELIMITER ;

This is the error I am getting:
A non-scalar value entry is used as a scalar.
I also can’t pass an array type in the procedure’s batch QUERY parameter, as it only accepts scalar data types.
I don’t want to send the visitors one by one instead of as an array. That would overload the system, as I might have to send 1,000 to 10,000 events instead of one.
How can I achieve this?

Hi anilvivekabhi! :wave: Oh my. I’m so sorry you are dealing with this. Can you share what service you are running and the version number?

Hi Maria, thanks for responding.
I am using SingleStore version 7.6.5.
I didn’t understand what you mean by “what service I am running”.