Kafka PipeLine To Proc

Hi All,

I am trying to configure a kafka pipe line to pass data to a stored proc. A standard load into a table works however, for some reason passing it to the proc fails.

My Proc creation is

Create or replace PROCEDURE proc_sale_trans(batch query(sale_transaction json))
AS
BEGIN
INSERT INTO sale_trans(transaction_id, location_id)
SELECT sale_transaction::Transaction_id, sale_transaction::Location_id
FROM batch;
END

with the pipeline creation as;
CREATE or replace PIPELINE sale_trans
AS LOAD DATA KAFKA ‘kafka/paul_sales’
INTO PROCEDURE proc_sale_trans
FIELDS TERMINATED BY ‘,’;

when i test the pipeline, i get the following error
ERROR 1262 ER_WARN_TOO_MANY_RECORDS: Leaf Error (127.0.0.1:3306): Leaf Error (127.0.0.1:3307): Row 1 was truncated; it contained more data than there were input columns

Regards,

Paul

Was the successful query identical, except with into table sale_trans in place of into procedure proc_sale_trans? If so, then you likely want to change the type of batch to:

query(transaction_id int, sale_transaction int))

The error here is that there are more fields in the input CSV than MemSQL expects. In particular, when not explicitly specified, we expect exactly as many fields as there are columns in the destination table or (as in the case) stored procedure QTV parameter.

So the formulation with query(sale_transaction json) will expect 1 field per line, and will try to interpret the value of that field as JSON. I suspect it doesn’t actually look like that, hence my suggestion above.

But if the file really is newline-separated JSON, then I’d suspect the issue has to do with commas inside the JSON appearing as field terminators. Rather than try to work around that, I’d recommend just using a FORMAT JSON pipeline.

Hi @sasha,

Yes, the successful query was identical to except i used the into table clause. I’ve tried your suggestion but am getting lost. So here is an example of my json:

{“Application_Id”:“5”,“Cash_Drawer_No”:“1”,“Exported”:“1”,“Group_Sale_No”:“1679715”,“Location_Id1”:“31462”,“Location_id”:“31462”,“New_Card_Cost_Tax_Id”:“314000001”,“PCash_Paid”:“0.0000”,“POS_Product_Id”:“314000488”,“Product_Id”:“314000488”,“Quantity”:“1”,“Sale_Id”:“1”,“Sale_Invoice_No”:“1679715”,“Sub_Product_Id”:“314000488”,“Sublocation_Id”:“0”,“Tax1_Amount”:“0.0000”,“Tax2_Amount”:“0.0000”,“Tax_Id”:“314000001”,“Transaction_Amount”:“50000.0000”,“Transaction_DateTime”:“27/10/2018 3:35:45 PM”,“Transaction_Id1”:“25260”,“Transaction_Type”:“101”,“Transaction_id”:“25260”,“User_Id”:“1”,“sys_change_type”:“Insert”}

I’ve tried to change my proc to
create proc_sale_trans(query(transaction_id int, sale_id int))
AS
BEGIN
INSERT INTO sale_trans(transaction_id, location_id)
SELECT Transaction_id, Location_id
FROM query
END

and my pipeline to
CREATE or replace PIPELINE sale_trans
AS LOAD DATA KAFKA ‘kafka/paul_sales’
INTO PROCEDURE proc_sale_trans
format json;

However, i still get errors.
My apologies in advance if this is obvious :slight_smile:

Is it possible to point me to a worked through example?

Regards,

Paul

I assume the error is something like could not find field "transaction_id"? So your CREATE PIPELINE is omitting the subclause that tells memsql how to map fields of the input data to columns of the destination table/qtv. In this case FORMAT JSON will attempt to find fields with the same names as the destination columns. That search is case sensitive, so it’ll fail because the cases don’t match here. Not much to do but explicitly provide the mapping clause.

Try:

INTO PROCEDURE proc_sale_trans (transaction_id ← Transaction_id, location_id ← Location_id)