Creating a Kafka pipeline

Hello guys,

I'm trying to create a Kafka pipeline that consumes JSON from a Kafka topic, but I'm having trouble inserting the elements of a list into SingleStore DB. Below is my JSON:

[Image: JSON payload]

I have two tables, Customer and Characteristic.
The Customer table correctly receives the id and name. The Characteristic table, however, only receives the object with id=3, which is the last one in the list.

Below are my procedure and pipeline:

[Image: procedure and pipeline definitions]

Can you help me out please?

Hello @Carlao, thanks for trying out JSON pipelines. Since characteristic is a JSON array, you need to reference each element explicitly by its position. Consider this pipeline as a reference:

The JSON data has this form: {"name":"metric1","values":[1612396800, "1.0"]}

CREATE PIPELINE `metrics`
AS LOAD DATA S3 '...'
INTO TABLE `t`
FORMAT JSON
(
    `t`.`name` <- `name`,
    @val <- `values` DEFAULT NULL,
    `t`.`data` <- %
)
SET
    `ts` = json_extract_bigint(@val, 0),
    `int_val` = json_extract_bigint(@val, 1);
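A quick way to check the zero-based positions used in the SET clause above is to run the same kind of extraction on the sample array directly. This is only a sketch; the literal is the "values" array from the example record, and JSON_EXTRACT_STRING is used for the second element because it is a JSON string rather than a number:

```sql
-- Position 0 is the first array element, position 1 is the second.
SELECT
    JSON_EXTRACT_BIGINT('[1612396800, "1.0"]', 0) AS ts,
    JSON_EXTRACT_STRING('[1612396800, "1.0"]', 1) AS second_value;
```

The same keypath can also start from the whole document, e.g. JSON_EXTRACT_BIGINT(doc, 'values', 0), which is handy when the array is not bound to its own variable.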

If the JSON array has a variable number of elements, you need to load the pipeline into a stored procedure (INTO PROCEDURE), as you are doing in your example, convert the JSON array into an array of JSON values with JSON_TO_ARRAY, and iterate over it. There is a good example of this in our docs here: SingleStoreDB Cloud · SingleStore Documentation

DELIMITER //
-- Formats an ARRAY(JSON) as "Values: [e1, e2, ...]", or returns "NULL" for a NULL array
CREATE OR REPLACE FUNCTION array_as_string(a ARRAY(JSON) NULL)
    RETURNS VARCHAR(255) AS
    DECLARE
        result VARCHAR(255);
    BEGIN
        -- Return early so a NULL input yields "NULL" rather than "NULL]"
        IF ISNULL(a) THEN
            RETURN "NULL";
        END IF;
        result = "Values: [";
        FOR i IN 0 .. LENGTH(a) - 1 LOOP
            IF i < LENGTH(a) - 1 THEN
                result = CONCAT(result, a[i], ", ");
            ELSE
                result = CONCAT(result, a[i]);
            END IF;
        END LOOP;
        RETURN CONCAT(result, "]");
    END //

-- Converts a JSON array into an ARRAY(JSON) with JSON_TO_ARRAY and formats it as a string
CREATE OR REPLACE FUNCTION udf_json_to_array(js JSON NULL)
    RETURNS VARCHAR(255) AS
    DECLARE
        jsonArray ARRAY(JSON) NULL;
        result VARCHAR(255);
    BEGIN
        jsonArray = JSON_TO_ARRAY(js);
        result = array_as_string(jsonArray);
        RETURN result;
    END //

DELIMITER ;
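To see what the two functions produce, you can call udf_json_to_array on a JSON array literal. The literal below is an arbitrary example, not data from this thread:

```sql
-- JSON_TO_ARRAY splits the literal into three JSON elements,
-- and array_as_string joins them after the "Values: [" prefix, separated by ", ".
SELECT udf_json_to_array('[1, 2, 3]') AS formatted;
```

The same JSON_TO_ARRAY call is what you would use inside a pipeline procedure to walk a characteristic list element by element.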

Hello @m_k,

Yesterday I came up with a solution, and it uses exactly what you mentioned. Thank you so much for your help! 🙂


By the way, this doesn't work (the problem is indexing the array element and extracting a field in the same expression):
for i in 0 .. length(v_customer_characteristic_array) - 1 loop
    insert into CHARACTERISTIC (characteristic_id, characteristic_name, value_type, characteristic_type, customer_id)
    values (v_characteristic[i]::$id, v_characteristic[i]::$name, v_characteristic[i]::$valueType, v_characteristic[i]::$@type, v_customer_id);

To make it work, I need to create a new variable:
    v_characteristic = v_customer_characteristic_array[i];

and then use that variable in the VALUES clause:
    values (v_characteristic::$id, v_characteristic::$name, v_characteristic::$valueType, v_characteristic::$@type, v_customer_id);
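For reference, here is a minimal sketch of how the pieces in this thread could fit together: a Kafka pipeline that loads into a stored procedure, which inserts the customer row and then loops over the characteristic array using the intermediate-variable workaround above. Several names are assumptions because the original JSON and procedure are not shown here: the Kafka endpoint, the procedure and pipeline names, the CUSTOMER column names, and the id/name keys for the customer are all hypothetical. The CHARACTERISTIC columns and the id, name, valueType and @type keys come from the snippets above. @type is read with JSON_EXTRACT_STRING to avoid relying on ::$ with a key that starts with @.

```sql
DELIMITER //
-- Hypothetical names: load_customer, CUSTOMER(customer_id, customer_name).
-- CHARACTERISTIC columns and JSON keys follow the snippets in this thread.
CREATE OR REPLACE PROCEDURE load_customer(
    batch QUERY(customer_id VARCHAR(64), customer_name VARCHAR(255), characteristics JSON))
AS
DECLARE
    v_array ARRAY(JSON) NULL;
    v_characteristic JSON;
BEGIN
    -- One iteration per JSON document in the Kafka batch
    FOR r IN COLLECT(batch) LOOP
        INSERT INTO CUSTOMER (customer_id, customer_name)
        VALUES (r.customer_id, r.customer_name);

        -- Turn the JSON array into an ARRAY(JSON) so every element is visited,
        -- not just the last one
        v_array = JSON_TO_ARRAY(r.characteristics);
        IF NOT ISNULL(v_array) THEN
            FOR i IN 0 .. LENGTH(v_array) - 1 LOOP
                -- Copy the element into a variable first; as reported above,
                -- v_array[i]::$id is not accepted
                v_characteristic = v_array[i];
                INSERT INTO CHARACTERISTIC
                    (characteristic_id, characteristic_name, value_type, characteristic_type, customer_id)
                VALUES (v_characteristic::$id,
                        v_characteristic::$name,
                        v_characteristic::$valueType,
                        JSON_EXTRACT_STRING(v_characteristic, '@type'),
                        r.customer_id);
            END LOOP;
        END IF;
    END LOOP;
END //
DELIMITER ;

-- Hypothetical Kafka endpoint and topic
CREATE PIPELINE customer_pipeline
AS LOAD DATA KAFKA 'kafka-host:9092/customer-topic'
INTO PROCEDURE load_customer
FORMAT JSON
(
    customer_id <- id,
    customer_name <- name,
    characteristics <- characteristic DEFAULT NULL
);
```

The loop is the same pattern as the docs example above; the ISNULL check simply skips messages that have no characteristic array.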