MemSQL Pipeline error extracting data from Kafka(\)

To replace Elasticsearch, I am conducting a test to collect Json data using Kafka PIPELINE function in MemSQL. When trying to load kafka’s topic data into MemSQL, Pipeline stopped when an error occurred “Invalid JSON value for column ‘message’”.
Looking at the Info Schema table, it appears that an error has occurred because the data type in the form of " could not be loaded.
When loading data through Procedure, it seems to be a problem that does not handle special characters (characters starting with ). What’s the solution to this?(It should not be using \)

  1. Test Environment

    • Data flow : metricbeat → kafka→ MemSQL PIPELINE → MemSQL Tables
    • MemSQL Version : 7.1.7
  2. PIPELINE Configure
    2.1 Procedure


DELIMITER //
CREATE OR REPLACE PROCEDURE proc_mbeats (kafka_metricbeat query(message json))
AS
BEGIN
.
.
END //
DELIMITER ;

2.2 Pipeline

CREATE PIPELINE pl_mbeats AS LOAD DATA KAFKA 'localhost/metricbeat' INTO procedure proc_mbeats;

3.3 Error meassage & Info Schema
* Error_Message : Invalid JSON value for column ‘message’
* Error_Kind : Load / Load_Data_Line :

{"metricset":{"name":"process","period":10000},"ecs":{"version":"1.5.0"},"event":{"duration":315087751,"dataset":"system.process","module":"system"},"@version":"1","system":{"process":{"cmdline":"/usr/bin/ssh-agent /bin/sh -c exec -l /bin/bash -c \"env GNOME_SHELL_SESSION_MODE=classic gnome-session --session gnome-classic\"","state":"sleeping","cgroup":{"cpuacct":{"total":{"ns":1322048427228212},"percpu":{"2":344223580301541,"3":339673452025808,"4":335369976030739,"1":302781418870124},"stats":{"system":{"ns":550364150000000},"user":{"ns":678525700000000}},"path":"/user.slice","id":"user.slice"},"id":"user.slice","blkio":{"total":{"ios":29544670,"bytes":573001093120},"path":"/user.slice","id":"user.slice"},"cpu":{"cfs":{"period":{"us":100000},"quota":{"us":0},"shares":1024},"rt":{"period":{"us":1000000},"runtime":{"us":0}},"stats":{"throttled":{"ns":0,"periods":0},"periods":0},"path":"/user.slice","id":"user.slice"},"path":"/user.slice","memory":{"kmem":{"usage":{"max":{"bytes":0},"bytes":0},"limit":{"bytes":9223372036854771712},"failures":0},"id":"user.slice","mem":{"usage":{"max":{"bytes":6548250624},"bytes":5639651328},"limit":{"bytes":9223372036854771712},"failures":0},"memsw":{"usage":{"max":{"bytes":7810928640},"bytes":7496921088},"limit":{"bytes":9223372036854771712},"failures":0},"stats":{"page_faults":5320872892,"unevictable":{"bytes":0},"active_file":{"bytes":730714112},"swap":{"bytes":1857269760},"inactive_anon":{"bytes":924241920},"cache":{"bytes":3842818048},"pages_out":1041190970,"rss":{"bytes":1796628480},"rss_huge":{"bytes":10485760},"pages_in":1042542749,"hierarchical_memory_limit":{"bytes":9223372036854771712},"hierarchical_memsw_limit":{"bytes":9223372036854771712},"active_anon":{"bytes":3241947136},"inactive_file":{"bytes":742543360},"major_page_faults":102542,"mapped_file":{"bytes":2297708544}},"kmem_tcp":{"failures":0,"limit":{"bytes":9223372036854771712},"usage":{"max":{"bytes":0},"bytes":0}},"path":"/user.slice"}},"memory":{"rss":{"pct":0,"bytes":49152},"size":74211328,"share":0},"cpu":{"total":{"value":48980,"pct":0.001,"norm":{"pct":2.0E-4}},"start_time":"2020-07-21T02:25:14.000Z"}}},"agent":{"version":"7.9.0","hostname":"apigw.itmsg.com","id":"2e3eb88f-2eaa-45db-a34a-1cc85c3cbeca","name":"apigw.itmsg.com","type":"metricbeat","ephemeral_id":"2e09d098-68a3-4c4c-8998-c2121f32f904"},"service":{"type":"system"},"@timestamp":"2020-10-10T04:29:28.946Z","user":{"name":"itmsg"},"tags":["beats_input_raw_event"],"cloud":{"iscloud":"no"},"host":{"hostname":"apigw.itmsg.com","os":{"version":"7 (Core)","family":"redhat","name":"CentOS Linux","codename":"Core","kernel":"3.10.0-1127.el7.x86_64","platform":"centos"},"name":"apigw.itmsg.com","architecture":"x86_64","id":"812d3a73deef42039707c92b4f3de3a9","containerized":false,"ip":["192.168.0.191","fe80::a25d:431:d1a9:c168","192.168.122.1","172.17.0.1"],"mac":["00:50:56:90:21:7b","52:54:00:4c:1a:18","52:54:00:4c:1a:18","02:42:00:9d:c6:56"]},"process":{"name":"ssh-agent","ppid":22549,"pid":22694,"args":["/usr/bin/ssh-agent","/bin/sh","-c","exec -l /bin/bash -c \"env GNOME_SHELL_SESSION_MODE=classic gnome-session --session gnome-classic\""],"pgid":22694}}

In addition, I added the column name as below, but the situation is the same.

STOP  PIPELINE pl_mbeats;
DROP   PIPELINE pl_mbeats;
CREATE PIPELINE pl_mbeats AS LOAD DATA KAFKA 'localhost/metricbeat' INTO procedure proc_mbeats (message);
TEST  PIPELINE pl_mbeats LIMIT 1;
START PIPELINE pl_mbeats

information_schema.pipelines_errors table
* Error_Message : Invalid JSON value for column ‘message’
* Error_Kind : Load
* Load_Data_Line :

{"user":{"name":"itmsg"},"agent":{"id":"2e3eb88f-2eaa-45db-a34a-1cc85c3cbeca","type":"metricbeat","ephemeral_id":"d35c8bcf-0053-47c2-9868-6b0945449d28","hostname":"apigw.itmsg.com","name":"apigw.itmsg.com","version":"7.9.0"},"tags":["beats_input_raw_event"],"system":{"process":{"cmdline":"/usr/bin/ssh-agent /bin/sh -c exec -l /bin/bash -c \"env GNOME_SHELL_SESSION_MODE=classic gnome-session --session gnome-classic\"","memory":{"share":0,"size":74211328,"rss":{"pct":0,"bytes":49152}},"cgroup":{"id":"user.slice","path":"/user.slice","memory":{"id":"user.slice","path":"/user.slice","memsw":{"usage":{"max":{"bytes":7810928640},"bytes":7645310976},"limit":{"bytes":9223372036854771712},"failures":0},"kmem":{"usage":{"max":{"bytes":0},"bytes":0},"limit":{"bytes":9223372036854771712},"failures":0},"stats":{"pages_in":1076281878,"hierarchical_memory_limit":{"bytes":9223372036854771712},"pages_out":1074907863,"hierarchical_memsw_limit":{"bytes":9223372036854771712},"active_anon":{"bytes":3344973824},"active_file":{"bytes":737796096},"swap":{"bytes":1914695680},"page_faults":5507004419,"cache":{"bytes":3926863872},"major_page_faults":112743,"mapped_file":{"bytes":2304925696},"inactive_file":{"bytes":818876416},"rss":{"bytes":1803661312},"inactive_anon":{"bytes":828837888},"rss_huge":{"bytes":10485760},"unevictable":{"bytes":0}},"mem":{"usage":{"max":{"bytes":6548250624},"bytes":5730615296},"limit":{"bytes":9223372036854771712},"failures":0},"kmem_tcp":{"usage":{"max":{"bytes":0},"bytes":0},"limit":{"bytes":9223372036854771712},"failures":0}},"cpuacct":{"id":"user.slice","path":"/user.slice","total":{"ns":1363356787668632},"stats":{"system":{"ns":565265630000000},"user":{"ns":701970730000000}},"percpu":{"3":350289131550440,"4":345755768882701,"1":312352102946165,"2":354959784289326}},"cpu":{"id":"user.slice","path":"/user.slice","cfs":{"shares":1024,"quota":{"us":0},"period":{"us":100000}},"rt":{"period":{"us":1000000},"runtime":{"us":0}},"stats":{"throttled":{"periods":0,"ns":0},"periods":0}},"blkio":{"id":"user.slice","path":"/user.slice","total":{"ios":30659566,"bytes":593764645888}}},"state":"sleeping","cpu":{"total":{"pct":0,"norm":{"pct":0},"value":0},"start_time":"2020-07-21T02:25:14.000Z"}}},"process":{"ppid":22549,"pid":22694,"pgid":22694,"args":["/usr/bin/ssh-agent","/bin/sh","-c","exec -l /bin/bash -c \"env GNOME_SHELL_SESSION_MODE=classic gnome-session --session gnome-classic\""],"name":"ssh-agent"},"metricset":{"period":10000,"name":"process"},"service":{"type":"system"},"ecs":{"version":"1.5.0"},"host":{"id":"812d3a73deef42039707c92b4f3de3a9","architecture":"x86_64","containerized":false,"ip":["192.168.0.191","fe80::a25d:431:d1a9:c168","192.168.122.1","172.17.0.1"],"mac":["00:50:56:90:21:7b","52:54:00:4c:1a:18","52:54:00:4c:1a:18","02:42:00:9d:c6:56"],"hostname":"apigw.itmsg.com","os":{"family":"redhat","codename":"Core","platform":"centos","kernel":"3.10.0-1127.el7.x86_64","name":"CentOS Linux","version":"7 (Core)"},"name":"apigw.itmsg.com"},"@version":"1","@timestamp":"2020-10-12T13:26:14.408Z","cloud":{"iscloud":"no"},"event":{"dataset":"system.process","duration":175970329,"module":"system"}}

The error is likely because the input data is actually being parsed as TSV, as that’s the default data format when otherwise unspecified. I believe TSV escape translation should in fact turn that into invalid JSON.

Try it like this:

CREATE PIPELINE pl_mbeats AS LOAD DATA KAFKA ‘localhost/metricbeat’ INTO procedure proc_mbeats (message <- %) format json;

1 Like

Thank you for your answer.
I didn’t quite understand your answer. But now I totally get it.
Thank you for your help.

1 Like