Confluent Kafka Avro pipeline "libavro-c++ failed to parse record 1 from stream"

PLEASE HELP - I would love to use MemSQL, but I can’t get something this simple to work!
I’m simply trying to load the entire JSON of each Avro message into a column, and it won’t even do that
(based on SingleStoreDB Cloud · SingleStore Documentation)

trying to run a pipeline
12:13:56: Running: start pipeline pl_FutsFillsAvro foreground

getting this error:
12:13:56: ERROR 2339 ER_AVRO_BAD_DATAFILE_CONTENTS: Leaf Error (127.0.0.1:3307): Leaf Error (127.0.0.1:3307): libavro-c++ failed to parse record 1 from stream. Forwarded error: "EOF reached prematurely in skip().".

DETAILS:

create a table

create table FutsFillsAvro (avroMsg text);

create a pipeline
(note: the SCHEMA below is copied directly from Confluent’s CtrlPanel)

create or replace pipeline pl_FutsFillsAvro
as load data KAFKA '10.11.102.120/fixinc.fill.cmestp'
into table FutsFillsAvro
format avro
(avroMsg <- %)
SCHEMA
'{
  "type" : "record",
  "name" : "fixinc.fill.cmestp",
  "fields" : [
    { "name" : "tradeReportTransType" , "type" : "string" },
    { "name" : "sendingTime" , "type" : "string" },
    { "name" : "transactTime" , "type" : "string" },
    { "name" : "tradeDate" , "type" : "string" },
    { "name" : "side" , "type" : "string" },
    { "name" : "lastQuantity" , "type" : "double" },
    { "name" : "lastPrice" , "type" : "double" },
    { "name" : "account" , "type" : "string" },
    { "name" : "securityExchange" , "type" : "string" },
    { "name" : "securityId" , "type" : "string" },
    { "name" : "symbol" , "type" : "string" },
    { "name" : "maturityMonthYear" , "type" : "string" },
    { "name" : "securityType" , "type" : "string" },
    { "name" : "putOrCall" , "type" : "string" },
    { "name" : "strike" , "type" : "double" }
  ]
}';

I get the same error when attempting to do what I’m actually trying to do:


create table FutsFillsAvro
(
tradeReportTransType  varchar(20), 
sendingTime  varchar(40), 
transactTime  varchar(40), 
tradeDate  varchar(40), 
side  varchar(20), 
lastQuantity  double, 
lastPrice  double, 
account  varchar(20), 
securityExchange  varchar(20), 
securityId  varchar(20), 
symbol  varchar(20), 
maturityMonthYear  varchar(20), 
securityType  varchar(20), 
putOrCall  varchar(10), 
strike  varchar(20)
);


create or replace pipeline pl_FutsFillsAvro
as load data KAFKA '10.11.102.120/fixinc.fill.cmestp'
into table FutsFillsAvro
format Avro
(
`FutsFillsAvro`.`tradeReportTransType` <- `tradeReportTransType`,
`FutsFillsAvro`.`sendingTime` <- `sendingTime`,
`FutsFillsAvro`.`transactTime` <- `transactTime`,
`FutsFillsAvro`.`tradeDate` <- `tradeDate`,
`FutsFillsAvro`.`side` <- `side`,
`FutsFillsAvro`.`lastQuantity` <- `lastQuantity`,
`FutsFillsAvro`.`lastPrice` <- `lastPrice`,
`FutsFillsAvro`.`account` <- `account`,
`FutsFillsAvro`.`securityExchange` <- `securityExchange`,
`FutsFillsAvro`.`securityId` <- `securityId`,
`FutsFillsAvro`.`symbol` <- `symbol`,
`FutsFillsAvro`.`maturityMonthYear` <- `maturityMonthYear`,
`FutsFillsAvro`.`securityType` <- `securityType`,
`FutsFillsAvro`.`putOrCall` <- `putOrCall`,
`FutsFillsAvro`.`strike` <- `strike`
)
SCHEMA
'{
    "type": "record",
    "name": "ExecutionReport",
    "namespace": "co.dvtrading.conduit.fixinc.model",
    "fields": [
        {
        "name": "tradeReportTransType",
        "type": "string"
        },
        {
        "name": "sendingTime",
        "type": "string"
        },
        {
        "name": "transactTime",
        "type": "string"
        },
        {
        "name": "tradeDate",
        "type": "string"
        },
        {
        "name": "side",
        "type": "string"
        },
        {
        "name": "lastQuantity",
        "type": "double"
        },
        {
        "name": "lastPrice",
        "type": "double"
        },
        {
        "name": "account",
        "type": "string"
        },
        {
        "name": "securityExchange",
        "type": "string"
        },
        {
        "name": "securityId",
        "type": "string"
        },
        {
        "name": "symbol",
        "type": "string"
        },
        {
        "name": "maturityMonthYear",
        "type": "string"
        },
        {
        "name": "securityType",
        "type": "string"
        },
        {
        "name": "putOrCall",
        "type": "string"
        },
        {
        "name": "strike",
        "type": "string"
        }
    ]
}';


start pipeline pl_FutsFillsAvro foreground;

The issue is likely that the data doesn’t quite match the provided SCHEMA clause, because Confluent’s Kafka Avro producers add a 5-byte header to every record: a magic byte followed by a 4-byte schema id used for schema registry lookups. It’s documented here:

https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format

You can tell MemSQL to expect and ignore the header by modelling it as an Avro field in the SCHEMA clause: either add it as a fixed field of size 5 at the start of the original top-level record, or add it as a field in a wrapper record around the original schema.

For example, you could replace example_record in the schema below with your current schema:

CREATE PIPELINE P
AS LOAD DATA KAFKA "..."
INTO TABLE t
FORMAT AVRO
(a <- data::field1)
SCHEMA
'
{
  "type": "record",
  "name": "confluent_kafka_wrapper",
  "fields": [
    {
      "name": "metadata",
      "type": {
        "type": "fixed",
        "size": 5,
        "name": "unused_confluent_metadata"
      }
    },
    {
      "name": "data",
      "type": {
        "type": "record",
        "name": "example_record",
        "fields": [
          {
            "name": "field1",
            "type": "long"
          }
        ]
      }
    }
  ]
}
'
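
If it helps, here is how that wrapper might look applied to your second pipeline. This is only a sketch based on the statements you posted (same Kafka endpoint, table, and field list), so double-check it against your actual schema:

CREATE OR REPLACE PIPELINE pl_FutsFillsAvro
AS LOAD DATA KAFKA '10.11.102.120/fixinc.fill.cmestp'
INTO TABLE FutsFillsAvro
FORMAT AVRO
-- Every column is now taken from a subfield of `data`, which holds the
-- original ExecutionReport record; `metadata` is never referenced, so the
-- 5-byte Confluent header is read and discarded.
(
  tradeReportTransType <- data::tradeReportTransType,
  sendingTime <- data::sendingTime,
  transactTime <- data::transactTime,
  tradeDate <- data::tradeDate,
  side <- data::side,
  lastQuantity <- data::lastQuantity,
  lastPrice <- data::lastPrice,
  account <- data::account,
  securityExchange <- data::securityExchange,
  securityId <- data::securityId,
  symbol <- data::symbol,
  maturityMonthYear <- data::maturityMonthYear,
  securityType <- data::securityType,
  putOrCall <- data::putOrCall,
  strike <- data::strike
)
SCHEMA
'{
  "type": "record",
  "name": "confluent_kafka_wrapper",
  "fields": [
    {
      "name": "metadata",
      "type": { "type": "fixed", "size": 5, "name": "unused_confluent_metadata" }
    },
    {
      "name": "data",
      "type": {
        "type": "record",
        "name": "ExecutionReport",
        "namespace": "co.dvtrading.conduit.fixinc.model",
        "fields": [
          { "name": "tradeReportTransType", "type": "string" },
          { "name": "sendingTime", "type": "string" },
          { "name": "transactTime", "type": "string" },
          { "name": "tradeDate", "type": "string" },
          { "name": "side", "type": "string" },
          { "name": "lastQuantity", "type": "double" },
          { "name": "lastPrice", "type": "double" },
          { "name": "account", "type": "string" },
          { "name": "securityExchange", "type": "string" },
          { "name": "securityId", "type": "string" },
          { "name": "symbol", "type": "string" },
          { "name": "maturityMonthYear", "type": "string" },
          { "name": "securityType", "type": "string" },
          { "name": "putOrCall", "type": "string" },
          { "name": "strike", "type": "string" }
        ]
      }
    }
  ]
}';

Your first pipeline (whole message into avroMsg) should work the same way: wrap your fixinc.fill.cmestp record inside the wrapper and map avroMsg <- data instead of avroMsg <- %.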

We’ll be adding a note about this to our documentation.

Note that this workaround assumes that the schema of incoming data is in fact fixed over the lifetime of the pipeline. Do you plan to use Confluent’s schema registry/evolution features?

Thank you for the reply and the details. As for your question regarding the schema registry, what impacts would there be should I choose to use it or not?

The cost of ingesting registry-aware Avro with a never-changing schema (e.g. because the Confluent libraries are otherwise convenient to use) is just the cost of transferring and skipping past those extra 5 bytes per record, which is likely negligible.

But if the schema ever actually evolves, the pipeline will fail to parse, because it expects input data to match the SCHEMA clause. MemSQL doesn’t currently have special schema registry integration. Handling evolution automatically and/or without downtime would be quite hard - especially considering that the destination table would likely have to evolve too - though likely still possible with a mix of pipeline transforms and external scripts.
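
For example, if the producer’s schema later gained a new field, the change would have to be applied by hand, roughly along these lines (clearingFirm is a hypothetical new field, used purely for illustration):

-- Hypothetical: the producer's schema gains a new string field, clearingFirm.
-- The destination table needs a matching column...
ALTER TABLE FutsFillsAvro ADD COLUMN clearingFirm varchar(20);

-- ...and the pipeline has to be recreated (CREATE OR REPLACE PIPELINE) with
--   clearingFirm <- data::clearingFirm
-- added to the mapping list and
--   { "name": "clearingFirm", "type": "string" }
-- added to the inner record's fields in the SCHEMA clause, then restarted.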


@sasha, sorry, but I am still unable to understand the confluent_kafka_wrapper in your solution schema. Can you explain the solution a bit more?