Data write issue when loading from Kafka through a Java pipeline transform

Hi SingleStore,

We are trying to load data from Kafka and transform it with a Java application. The transform runs the Java JAR file from a bash script.

Pipeline

CREATE PIPELINE test_pipeline_transform
    AS LOAD DATA KAFKA '<kafka-hostname:port>/<topic-name>'
    WITH TRANSFORM('https://xxxx/xxxx/maven-app-transform.tar.gz','executable.sh','') 
    INTO TABLE test_table;

Bash script used as the transform entry point (executable.sh inside maven-app-transform.tar.gz)

#!/bin/bash
# Run the transform JAR located next to this script, forwarding any arguments
exec java -jar "$(dirname -- "$0")/my-app-1.0-SNAPSHOT.jar" "$@"

Java file

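import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.Arrays;
import java.util.List;

public class App
{
    public static void main( String[] args ) throws IOException {
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(System.out));
        String input = readInput();
        // substring(1) skips the first character read from stdin (workaround for the
        // extra character); every remaining line is treated as one record
        List<String> records = Arrays.asList(input.substring(1).split("\n"));
        for (String row : records) {
            // Emit one output row "<cell>,test-<cell>-123" per comma-separated cell
            StringBuilder output = new StringBuilder();
            for (String cell : row.split(",")) {
                output.append(cell).append(",test-").append(cell).append("-123\n");
            }
            bufferedWriter.write(output.toString());
        }
        bufferedWriter.flush();
    }

    // Reads all of stdin as text, line by line, re-joining the lines with "\n"
    public static String readInput() throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        StringBuilder output = new StringBuilder();
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            output.append(line);
            output.append("\n");
        }
        bufferedReader.close();
        return output.toString();
    }
}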

We published the following messages to the Kafka topic:

test 123
new line value
abc

The text is transformed, but the first message inserted into the table contains one extra leading character, as shown below. If we skip the first character in the Java code, the extra character disappears.
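A possible explanation, offered as an assumption to verify: as far as I recall the SingleStore documentation on writing pipeline transforms, a Kafka pipeline does not hand the transform newline-delimited text. Instead, each Kafka message on stdin is preceded by an 8-byte length in native byte order (little-endian on x86). If that holds for 8.0.4, "test 123" (8 bytes) arrives as 08 00 00 00 00 00 00 00 followed by the text, so substring(1) removes only the first of those eight prefix bytes, and later messages in the same batch keep their prefixes. Worse, a message that is exactly 10 bytes long would get a prefix starting with 0x0A, which the line-based reader treats as a newline. The sketch below parses the frames explicitly instead of skipping characters. The class FramedTransform and its method names are hypothetical; the per-cell output logic is copied from App.

```java
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FramedTransform {

    // ASSUMPTION: each Kafka message on stdin is preceded by an 8-byte
    // little-endian length. Verify this against the SingleStore transform docs.
    public static List<String> readMessages(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        List<String> messages = new ArrayList<>();
        byte[] header = new byte[8];
        int first;
        // A clean end of input is only accepted before a new header starts
        while ((first = data.read()) != -1) {
            header[0] = (byte) first;
            data.readFully(header, 1, 7); // EOFException if the header is truncated
            long length = ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN).getLong();
            if (length < 0 || length > Integer.MAX_VALUE) {
                throw new IOException("Unsupported message length: " + length);
            }
            byte[] payload = new byte[(int) length];
            data.readFully(payload); // message body only; the prefix is already consumed
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }

    // Same output logic as App: one "<cell>,test-<cell>-123" row per
    // comma-separated cell of every line in the message.
    public static String transform(String message) {
        StringBuilder out = new StringBuilder();
        for (String line : message.split("\n")) {
            for (String cell : line.split(",")) {
                out.append(cell).append(",test-").append(cell).append("-123\n");
            }
        }
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        OutputStream out = new BufferedOutputStream(System.out);
        for (String message : readMessages(System.in)) {
            out.write(transform(message).getBytes(StandardCharsets.UTF_8));
        }
        out.flush();
    }
}
```

Because the body is read with readFully using the decoded length, the parser never depends on newline bytes, so prefix bytes such as 0x0A cannot split a record. This sketch collects a whole batch in memory; for large batches, transforming each message as soon as it is read would keep memory flat.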

[Screenshot: inserted table rows, with the extra character before the "test 123" value highlighted]

Is this approach correct, or did we miss something?
Please share any references for writing pipeline transforms in Java, if available.

Configurations:

  • SingleStore version: 8.0.4

  • Setup: Cluster-in-a-Box

  • OS: Red Hat

  • Java version: 11

Thanks in advance

Hello, that might be a collation issue. What character would you expect there?

“test 123” is expected, but there is one extra character (?) before the “test 123” value, as highlighted.

Could it be related to the same issue as this Stack Overflow question: "Two strange bytes at the beginning of each message of a Kafka message produced by my Kafka Connector"?
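To make the "extra character" concrete, here is a small sketch that builds one frame for "test 123" under an assumed format (an 8-byte little-endian length prefix, which is not confirmed in this thread and should be checked against the SingleStore transform docs) and prints it as hex. If the assumption holds, the first byte is 0x08, a backspace control character that a client may render as ? or a box, followed by seven 0x00 bytes and only then the text. That would also match the "strange bytes at the beginning of each message" symptom from the linked question. FrameHexDump and its --stdin flag are hypothetical; with --stdin it dumps the first 64 raw bytes of whatever you pipe into it, for example a sample you captured yourself.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FrameHexDump {

    // ASSUMPTION: frame format is an 8-byte little-endian length + UTF-8 payload.
    public static byte[] frame(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(8 + payload.length)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(payload.length)
                .put(payload)
                .array();
    }

    // Formats bytes as space-separated, two-digit lowercase hex.
    public static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        if (args.length > 0 && args[0].equals("--stdin")) {
            // Dump the first 64 raw bytes piped in on stdin
            byte[] all = System.in.readAllBytes();
            System.out.println(hex(Arrays.copyOf(all, Math.min(all.length, 64))));
        } else {
            // Prints: 08 00 00 00 00 00 00 00 74 65 73 74 20 31 32 33
            System.out.println(hex(frame("test 123")));
        }
    }
}
```

Comparing the --stdin dump of real transform input with this synthetic frame is a quick way to confirm or rule out the length-prefix explanation before changing the parser.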