Debezium - Kafka - Memsql Pipeline - Schema change

Hi,
We’re looking into using MemSQL as an audit system: leveraging CDC with Debezium, streaming the changes into Kafka, and converting the JSON on the fly into a table. We’ve done a POC and have the concept working.
Question: (a new column is added to a table in Mysql for example)
What happens to the MemSQL pipeline when a schema change is made in MySQL, which Kafka will detect?
How can the change (the new column) be implemented in MemSQL? Does the pipeline need to be dropped and recreated? If a transform is used, I presume the Python script needs to be updated as well.

Depending on how you have mapped your Debezium JSON into the database, you could handle this in the transform: have it open a Python connection to the master aggregator and execute the ALTER TABLE to add the new columns, and then push your JSON to a stored procedure that executes the insert, update, or delete statement.

So the flow is: map only the before JSON, after JSON, source JSON, and operation in the pipeline; have the transform only check for schema changes and perform the ALTERs; and then have the stored procedure perform the load.

This process does push the data loading up to the aggregator, so performance is slower than a straight pipeline load.

Here is a sample stored procedure that applies the changes from Debezium.

# handle_kafka_cdc
#
# Applies a batch of Debezium change events to their target tables.
#
# Parameter:
#   batch - query with one row per change event:
#             before : JSON image of the row before the change
#             after  : JSON image of the row after the change
#             source : Debezium source block, the table key names the target table
#             op     : Debezium operation code (d = delete, u = update,
#                      anything else is treated as an insert)
#
# How it works:
#   Each flat JSON object is turned into a list of col=value pairs with plain
#   string replacement, and the resulting DELETE, UPDATE or INSERT statement is
#   run with EXECUTE IMMEDIATE.
#
# Known limitations (review before production use):
#   - The JSON to SQL conversion is textual. String values that contain a
#     quote-colon, comma-quote or closing-brace sequence, the text =null, or
#     (for inserts) the word and surrounded by spaces will be mangled.
#     Nested JSON objects are not supported.
#   - The statements are built by string concatenation from event data, so the
#     upstream topic must be trusted: this is open to SQL injection.
#   - On any error the handler rolls back, echoes the message and re-raises.
#     Insert failures are also written to the errors table.
DELIMITER $$
CREATE OR REPLACE PROCEDURE handle_kafka_cdc(batch query(before json, after json, source json, op text)) as
DECLARE
	v_table             text;
	v_op                char(1);
	arr                 ARRAY(record(cbefore json, cafter json, source json, op varchar(5)));
	nvp_array           array(text);
	v_equal             int;
	v_len               int;
	insert_columns      text;
	insert_values       text;
	v_set_string        text;
	v_update_string     text;
	v_insert_string     text;
	v_delete_string     text;
	v_stmt_text         text;
	v_error_msg         text;

	# Only referenced by the commented-out testing lines below.
	v_json_select       query(cbefore json, cafter json, source json, op varchar(5));

BEGIN

	# FOR TESTING ONLY: read the events from a staging table instead of the batch.
	#v_json_select = select cdata::before,cdata::after,cdata::source,cdata::op from cdcdata;
	#arr = collect(v_json_select);
	arr = COLLECT(batch);

	FOR x in arr LOOP
		# ::$ extracts the key as an unquoted string, which is what a table name needs.
		v_table = x.source::$table;

		# op may arrive as a quoted JSON string or as plain text depending on how
		# the pipeline maps it, so strip any quotes and keep the first character.
		v_op = substring(replace(x.op,'"',''),1,1);

		IF v_op = 'd' THEN

			BEGIN
				# Build the WHERE clause from the before image: col=value and col=value ...
				v_delete_string = REPLACE(
					REPLACE(
					REPLACE(
					REPLACE(x.cbefore,
					'{"' ,' '),
					'":','='),
					',"',' and '),
					'}',' ');

			EXCEPTION
				WHEN OTHERS THEN
					rollback;
					v_error_msg = exception_message();
					v_error_msg = v_error_msg || '- EXCEPTION: no delete string set: ';
					echo select v_error_msg as "error";
					raise;
			END;

			BEGIN
				# col=null never matches, so rewrite it as col is null before deleting.
				v_delete_string = REPLACE(v_delete_string,'=null',' is null');
				v_stmt_text = CONCAT('DELETE FROM ',v_table,' where ',v_delete_string);
				execute immediate v_stmt_text;

			EXCEPTION
				WHEN OTHERS THEN
					rollback;
					v_error_msg = exception_message();
					v_error_msg = v_error_msg || '- EXCEPTION: error executing delete: ';
					echo select v_error_msg as "error";
					raise;
			END;

		ELSIF v_op = 'u' THEN

			BEGIN
				# Build the SET list from the after image: col=value , col=value ...
				v_set_string = REPLACE(
					REPLACE(
					REPLACE(
					REPLACE(x.cafter,
					'{"' ,' '),
					'":','='),
					',"',' , '),
					'}',' ');

			EXCEPTION
				WHEN OTHERS THEN
					rollback;
					v_error_msg = exception_message();
					v_error_msg = v_error_msg || '- EXCEPTION: error building update set string: ';
					echo select v_error_msg as "error";
					raise;
			END;

			BEGIN
				# Build the WHERE clause from the before image: col=value and col=value ...
				v_update_string = REPLACE(
					REPLACE(
					REPLACE(
					REPLACE(x.cbefore,
					'{"' ,' '),
					'":','='),
					',"',' and '),
					'}',' ');

			EXCEPTION
				WHEN OTHERS THEN
					rollback;
					v_error_msg = exception_message();
					v_error_msg = v_error_msg || '- EXCEPTION: error building update where string: ';
					echo select v_error_msg as "error";
					raise;
			END;

			BEGIN
				# Only the WHERE side needs the is null rewrite. SET col=null is valid as is.
				v_update_string = REPLACE(v_update_string,'=null',' is null');
				v_stmt_text = CONCAT('update ',v_table,' set ',v_set_string,' where ', v_update_string);
				execute immediate v_stmt_text;

			EXCEPTION
				WHEN OTHERS THEN
					rollback;
					v_error_msg = exception_message();
					v_error_msg = v_error_msg || '- EXCEPTION: error building update columns string: ';
					echo select v_error_msg as "error";
					raise;
			END;

		ELSE

			BEGIN
				# Flatten the after image into col=value pairs separated by and.
				v_insert_string = REPLACE(
					REPLACE(
					REPLACE(
					REPLACE(x.cafter,
					'{"' ,' '),
					'":','='),
					',"',' and '),
					'}',' ');

			EXCEPTION
				WHEN OTHERS THEN
					rollback;
					v_error_msg = exception_message();
					v_error_msg = v_error_msg || '- EXCEPTION: error building insert string: ';
					echo select v_error_msg as "error";
					raise;
			END;

			BEGIN
				# Split on the exact separator inserted above (with its spaces), so a
				# value that merely contains the letters a-n-d is not cut in half.
				nvp_array = split(v_insert_string, ' and ');

			EXCEPTION
				WHEN OTHERS THEN
					rollback;
					v_error_msg = exception_message();
					v_error_msg = v_error_msg || '- EXCEPTION: error splitting insert string: ';
					echo select v_error_msg as "error";
					raise;
			END;

			BEGIN
				# Separate every col=value pair at its first equals sign into a
				# column list and a value list, both comma terminated.
				insert_columns = ' ';
				insert_values = ' ';
				FOR y in nvp_array LOOP
					v_equal = instr(y,'=');
					insert_columns = CONCAT(insert_columns,substring(y,1,v_equal-1),',');
					insert_values = CONCAT(insert_values,substring(y,v_equal+1,length(y)),',');
				END LOOP;

				# Drop the trailing comma from each list.
				v_len = length(insert_columns);
				insert_columns = substring(insert_columns,1,v_len-1);

				v_len = length(insert_values);
				insert_values = substring(insert_values,1,v_len-1);

			END;

			BEGIN
				v_stmt_text = CONCAT('INSERT INTO ',v_table,' (',insert_columns,') values (', insert_values,')');
				execute immediate v_stmt_text;

			EXCEPTION
				WHEN OTHERS THEN
					rollback;
					v_error_msg = exception_message();
					v_error_msg = v_error_msg || '- EXCEPTION: error building insert columns string: ';
					echo select v_error_msg as "error";
					# Written after the rollback so the log row itself is not undone by it.
					INSERT INTO errors VALUES(v_error_msg,current_timestamp());
					raise;
			END;

		END IF;

	END LOOP;

END$$
DELIMITER ;