Hi,
Is there a way to ignore empty messages in a Kafka pipeline? We're getting NULLs from a Flink upsert-kafka sink.
BR
Is this a CSV pipeline? Those support a SKIP PARSER ERRORS clause, which skips input lines with an unexpected number of columns and logs a warning in pipelines_errors. Alternatively, if there's a column that is always non-NULL in valid messages, you could use CREATE PIPELINE ... WHERE col IS NOT NULL
to filter out rows from invalid messages.
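For example, a minimal sketch of that WHERE approach (pipeline, topic, table, and column names here are placeholders, not from the setup above):

```sql
-- Sketch only: adapt names and the filter column to your schema.
-- Rows where some_col extracts as NULL are silently discarded.
CREATE PIPELINE my_filtered_pipeline
AS LOAD DATA KAFKA 'host:9092/my-topic'
INTO TABLE my_table
WHERE some_col IS NOT NULL;
```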
If it's not a CSV pipeline (FORMAT JSON, etc.), empty messages should be ignored simply because they result in no extra input being fed to our parsers.
Hi,
No, it’s a JSON pipeline. We get these errors.
The pipeline definition is:
CREATE OR REPLACE PIPELINE test_flink_pipeline
AS LOAD DATA KAFKA 'kafka:9092/as-test-output-flink-topic'
BATCH_INTERVAL 2500
SKIP PARSER ERRORS
INTO PROCEDURE test_flink_insert_json
(I know now that SKIP PARSER ERRORS does not work with JSON.)
And the stored procedure:
CREATE or REPLACE PROCEDURE test_flink_insert_json(message query(data json) NULL)
AS
BEGIN
INSERT INTO test_flink (account_id, user_id, device_id, time, zapatos, naranjas)
select
data::$account_id,
data::$user_id,
data::$device_id,
data::$time28,
data::zapatos,
data::$naranjas
from message;
END
When the kafka message is not NULL, it works fine.
I'd say that's a CSV pipeline being fed JSON input. I mean "CSV pipeline" in the sense of the FORMAT clause of CREATE PIPELINE. With no explicit FORMAT clause, we parse the input as CSV (and also happen to treat empty messages slightly differently). That's generally problematic for JSON input, for more reasons than the empty-message behavior.
Try FORMAT JSON, as described here: LOAD DATA · SingleStore Documentation.
In your case, you’d want:
create or replace pipeline
…
into table test_flink
(
account_id <- data::account_id default NULL,
user_id <- data::user_id default NULL,
…
)
format json
Note that this also means you can drop the stored procedure entirely.
As to why SKIP PARSER ERRORS didn't help there: I'd guess it's because that pipeline expects exactly one CSV column, and an empty message will indeed appear to have one empty CSV column, so no parser error occurs in the first place.
You can always try to edit the stored procedure and add
…
from message
where data::$account_id is not null
or something similar.
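Putting that together, the edited procedure might look like this (a sketch based on the procedure posted above; the WHERE condition is just one plausible validity check, so swap in whatever field is always present in valid messages):

```sql
-- Sketch: same procedure as above, now skipping rows whose
-- account_id is missing or NULL in the incoming JSON.
CREATE OR REPLACE PROCEDURE test_flink_insert_json(message query(data json) NULL)
AS
BEGIN
  INSERT INTO test_flink (account_id, user_id, device_id, time, zapatos, naranjas)
  SELECT
    data::$account_id,
    data::$user_id,
    data::$device_id,
    data::$time28,
    data::zapatos,
    data::$naranjas
  FROM message
  WHERE data::$account_id IS NOT NULL;
END
```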
But judging from the error message itself, it looks like the JSON structure as it comes from Kafka is flawed,
so you should check what the message looks like on Kafka when it generates errors.
Cool ways to test pipelines:
TEST PIPELINE pipeline_name; - this only reads the data from Kafka and shows it to you.
START PIPELINE pipeline_name FOREGROUND LIMIT 1 BATCHES; - this executes the whole deal once (reading from Kafka, then applying the stored procedure).
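For example, with the pipeline defined earlier in this thread (assuming it has been created and is currently stopped):

```sql
-- Inspect what the pipeline would extract, without writing anything:
TEST PIPELINE test_flink_pipeline;

-- Run exactly one batch end-to-end (read from Kafka, then run the
-- stored procedure), reporting any errors synchronously:
START PIPELINE test_flink_pipeline FOREGROUND LIMIT 1 BATCHES;
```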
hope this helps