JSON Parsing Error

I am trying to create a procedure and a pipeline as follows.

DELIMITER //

CREATE OR REPLACE PROCEDURE party_proc (q QUERY(msg JSON))
AS
BEGIN
    INSERT INTO party_interaction
    SELECT msg::$specversion FROM q;
END //

DELIMITER ;

CREATE OR REPLACE PIPELINE party_pip AS LOAD DATA KAFKA 'gcp:9092/newParty'
CONFIG '{"sasl.username":"xxxxxx", "sasl.mechanism":"PLAIN", "security.protocol": "SASL_SSL"}'
CREDENTIALS '{"sasl.password":"xxxxx"}'
INTO PROCEDURE party_proc
(msg <- %)
FORMAT JSON;

test pipeline party_pip;

When I test the pipeline, I get the following error.

ERROR 2328 ER_REPLAY_CATCHING_UP: Leaf Error (localhost:3306): Leaf Error (127.0.0.1:3307): JSON syntax error at byte 0 of object 1. Error: The document is empty. Contents just before that byte: .

Hi Himawari,
A possible reason is that the Kafka message starts with the byte 0x0, which is not valid JSON.

Can you please look at the message that is failing and check?
You can find the Kafka offset of the exact failing message in the information_schema.PIPELINES_ERRORS table: see the BATCH_SOURCE_PARTITION_ID and BATCH_EARLIEST_OFFSET columns for the error. These give you the failing Kafka partition and the range of offsets from that partition that we were trying to load.
Then you can fetch that particular offset from Kafka and see the message that failed.
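
The lookup described above could look roughly like the query below. This is only a sketch: BATCH_SOURCE_PARTITION_ID and BATCH_EARLIEST_OFFSET are the columns mentioned above, while PIPELINE_NAME, BATCH_LATEST_OFFSET, ERROR_MESSAGE and ERROR_UNIX_TIMESTAMP are assumed column names that you should check against your version of information_schema.PIPELINES_ERRORS.

```sql
-- Sketch: show the most recent errors recorded for the pipeline from the question.
-- Assumption: the columns PIPELINE_NAME, BATCH_LATEST_OFFSET, ERROR_MESSAGE and
-- ERROR_UNIX_TIMESTAMP exist in your version; adjust the list if they do not.
SELECT
    BATCH_SOURCE_PARTITION_ID,  -- Kafka partition the failing batch was read from
    BATCH_EARLIEST_OFFSET,      -- first offset in the failing batch
    BATCH_LATEST_OFFSET,        -- last offset in the failing batch (assumed column)
    ERROR_MESSAGE
FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'party_pip'
ORDER BY ERROR_UNIX_TIMESTAMP DESC
LIMIT 10;
```

With the partition and offset in hand, you can read that single message from the newParty topic with whatever Kafka client you normally use and check whether its first byte is 0x0 or the payload is empty.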
