How memsql sharding Data when we ingest data from Kafka topic

I read below mentioned steps in memsql docs regarding kafka pipeline and its saying that system will auto distribute data based upon Kafka partition so my question is this if i provide Shard key so how data would be distributed in memsql side?

Following steps occur when you start a Kafka pipeline:

  1. The MemSQL cluster’s master aggregator connects to the Kafka lead broker and requests metadata about the Kafka cluster. This metadata includes information about the Kafka cluster’s brokers, topics, and partitions.
  2. The master aggregator parses the metadata and learns that there are four partitions spread across two Kafka brokers. The master aggregator decides how to process Kafka topics, which are groups of partitions.
  3. The master aggregator assigns leaf node partitions to Kafka partitions and sets the leaf nodes’ configuration. One important configuration detail is the maximum number of offsets to read per batch. Once configured, each leaf node in the cluster effectively becomes a Kafka consumer. At a lower level, each partition in a leaf node is paired with a partition in a Kafka broker.
  4. Once a leaf node’s partitions have been paired with Kafka partitions, each leaf node in the cluster begins extracting data directly from the Kafka brokers. The leaf nodes individually manage which message offsets have been read from a given Kafka partition.
  5. Offsets are ingested in batches, and the maximum number per batch is specified in the engine variables. When an extracted batch has successfully read its offset data, the batch is then optionally transformed and finally loaded into the destination table.

You have it almost exactly correct. In step 5, the data is staged in memory, and then memsql cluster performs an insert-select to the destination. With shard keys, the leaves reshuffle the data for you. If there are no shard keys, the data stays in the partition which downloaded it from Kafka.

Thank you for prompt response…

1 Like

We plan to use kafka pipeline to insert data to columnar table with UNIQUE KEY constraint.

What is the recommended way to configure kafka partition key/memsql shard key for this use-case?

does it mean that a leaf can transmit data directly to another leaf after it realize that a kafka record belongs to other shard(leaf) after the shuffle stage? or it sends it through the aggregators?

say we have 10 partitions in kafka, a memsql table sharded by client_id and 10 leafs in the cluster.
say the keys(client_id ) are distributed as a round robin a cross the 10 kafka partitions,
so now each leaf consumes 1 partition with data that only some of it belongs to him, but the rest of it will have to get to the rest of the 9 leaf, how does it handle this?

Welcome to our forums @yuvalgut!

You are correct; each leaf independentally reads ranges from kafka partitions and then performs a distributed reshuffle without going through the aggregators. This is able to take advantage of the same reshuffle logic that we use for running large scale distributed joins and other similar operations and is one of the most unique and powerful features of SingleStore Pipelines.

In addition to this feature, SingleStore Pipelines also guarantees exactly once semantics (a single record in kafka will be committed exactly once into SingleStore) via committing offset metadata along with the distributed read from Kafka.

Thanks for the fast reply!
any chance there is any plan regarding pulsar extractor?

Not currently, see the replies in Using SingleStore with Apache Pulsar for more information.

thanks again!

what is the performance penalty of extractor vs going through aggregators?
will I have to deploy as many aggregators as leaf node in order to compare the ingestion rate?

Regarding performance and multiple aggs, please see my answer in the other thread Using SingleStore with Apache Pulsar