Multiple Avro schemas in a MemSQL pipeline

I have a Kafka topic that carries records in multiple Avro schemas, and the schemas have no fields in common. The producer writes records using both schemas. Is it possible to define multiple schemas in one MemSQL pipeline so that I can consume both kinds of records from Kafka into my MemSQL tables?

Presumably that Avro is being produced by a Confluent schema registry aware producer? I’m also assuming you’re using a stored procedure to manage inserting into multiple tables, though this response should also be relevant if you’re not.

With schema registry Avro, if a pipeline is asked to extract a nonexistent field from a record, it will by default load NULL instead of throwing an error. You can also explicitly control which value is loaded when a field is missing by using the DEFAULT clause, as in this example from the SingleStore documentation (SingleStoreDB Cloud · SingleStore Documentation):

CREATE OR REPLACE PIPELINE p AS LOAD DATA FS "/path/to/files/data.avro"
  INTO TABLE t
  FORMAT AVRO
  SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
  (id <- %::id,
   color <- %::color,
   price <- %::price DEFAULT NULL,
   input_record <- %);

NULL will be loaded for each record in which the price field is missing.

This behavior was intended for schema evolution and union handling, but it should work for your case as well. In particular, I’d use the following design:

  • Define your stored procedure’s input columns as the union of the columns among the destination tables, with each one being nullable.
  • Define the pipeline’s field → column mapping clause to extract values for every one of those columns, and expect the DEFAULT-ing behavior to kick in for fields not in a given record’s schema.
  • Define the body of the stored procedure to expect that, for a given input row, the fields relevant to one table will all be defined, and the rest will be NULL or whatever the DEFAULT was.
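
To make the routing logic in that design concrete, here is a minimal Python model of what the stored procedure body would do with each input row. It is not MemSQL code, and the table names (orders, users) and column names are hypothetical, made up only for illustration. It assumes each schema has a field that is always non-NULL when the record uses that schema, so that field can serve as the discriminator.

```python
from typing import Dict, List, Optional

# Hypothetical column sets for two destination tables (illustrative names only).
ORDER_COLUMNS = ("order_id", "amount")
USER_COLUMNS = ("user_id", "email")

Row = Dict[str, Optional[object]]


def route_batch(batch: List[Row]) -> Dict[str, List[Row]]:
    """Split rows over the union of columns into per-table rows.

    Each input row carries every column from both tables; the columns that
    do not belong to the record's schema are None (the NULL default).
    """
    routed: Dict[str, List[Row]] = {"orders": [], "users": []}
    for row in batch:
        # Assumption: order_id / user_id are never NULL in their own schema,
        # so they identify which table the row belongs to.
        if row.get("order_id") is not None:
            routed["orders"].append({c: row.get(c) for c in ORDER_COLUMNS})
        elif row.get("user_id") is not None:
            routed["users"].append({c: row.get(c) for c in USER_COLUMNS})
        else:
            raise ValueError("row matches neither schema: %r" % (row,))
    return routed
```

In the actual stored procedure the same idea would typically be one INSERT ... SELECT per destination table, each filtering the input on its own discriminator column being non-NULL, so the defaulted columns never reach either table.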

For convenience, you could also likely take advantage of the fact that, if asked to extract a whole Avro record into a column, pipelines will convert that record to JSON. However, I’d be careful to measure the performance of that compared to the “fully structured” union approach I described above, as I’d expect Avro->JSON conversion (and temporary JSON in general) to add noticeable overhead.

Thanks for your response. However, I’m using version 7.1.7, so I’m defining my schema in the pipeline itself rather than using a schema registry URL.

Also, if I upgrade and use the schema registry, how will the pipeline know which schema to match for a particular record? My Kafka topic has two completely different schemas with no relationship between them, and the records arrive in batches that contain both types.

And what if I don’t want to fill in any default value for the fields that a record doesn’t contain? I have a separate table dedicated to each type of record, so default values for the other record type’s fields would have no meaning for me.

Avro written by a registry-aware producer is written in a Confluent-specific binary format which prepends each encoded record with an id that uniquely identifies that record’s schema. If an id hasn’t previously been seen, the registry server can be queried to get the full text of the schema matching that id. That’s how pipelines know which record has which schema. I’m assuming your data was written in that format.
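
As a rough illustration of that framing, the Confluent wire format puts a single zero "magic" byte and a 4-byte big-endian schema id in front of the Avro-encoded body. The sketch below only splits that header off a raw Kafka message value; the function name is made up for this example, and it does not query the registry or decode the Avro payload.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0    # first byte of every Confluent-framed message
HEADER_SIZE = 5   # 1 magic byte + 4-byte big-endian schema id


def split_confluent_message(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload) for a Confluent-framed message value."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message is too short to contain a Confluent header")
    # ">BI" = big-endian, unsigned byte followed by unsigned 32-bit int.
    magic, schema_id = struct.unpack(">BI", message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte %d; not Confluent-framed" % magic)
    return schema_id, message[HEADER_SIZE:]
```

Running this over a few sample messages from the topic is a quick way to check whether the data was written by a registry-aware producer, and how many distinct schema ids are actually in use.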

I don’t think there’s a better option than temporarily filling in default values for fields which don’t exist, if I understand your question correctly. That’s not a particularly expensive thing to do if the default is just NULL. Defaulted columns don’t need to (and shouldn’t) exist in both destination tables, just in the column list of the stored procedure’s input QTV.

On 7.1.7, I think your best option would be to use the TRANSFORM clause to run a Python script that uses a schema registry aware Avro reader/writer. The script would convert incoming records (via Avro’s schema resolution system) into instances of a single schema which is the union (in the sense of a literal Avro union type) of the incoming schemas. You would then make that the schema of the pipeline, but otherwise use the same design that I described above.

The behavior of pipelines with Avro unions is largely identical to that of missing fields in the multi-schema case: you request to extract values from specific union branches, and if those branches aren’t active in a given record, you get NULL or the DEFAULT value, as in the example here: LOAD DATA · SingleStore Documentation.