ERROR 1970 ER_SUBPROCESS_TIMEOUT_ERROR: because no metadata was received from source

Hello,

I am seeing an error when creating a SingleStore pipeline for a Kafka topic.

I’ve tried multiple setups, and all of them hit the same issue:

  1. Ran a Kafka producer on my local machine, which has a public IP. A Python consumer can subscribe and consume messages, but the SingleStore pipeline throws the error below.

  2. Deployed a Kafka producer on AWS with TCP port 9092 open. Again, outside of SingleStore, I can use the public IP, subscribe to specific Kafka topics, and consume messages without any issues from a Python consumer script. But the SingleStore pipeline throws the same error.

ERROR 1970 ER_SUBPROCESS_TIMEOUT_ERROR: Forwarding Error (node-…): Remote connection timed out because no metadata was received from source. Stderr: Batch starting with new consumer. rd_kafka_version_str: 1.9.2-21-gff6b70

Based on the SingleStore documentation, this seems to be related to connectivity, but as I said, I am able to subscribe to messages outside of SingleStore. Maybe I need to try increasing the timeout…

Here is an example of the pipeline:

CREATE OR REPLACE PIPELINE rp_trades_stream AS
LOAD DATA KAFKA '184.169.233.95:9092/Trade'
CONFIG '{"security.protocol":"plaintext"}'
INTO TABLE Trades;

I tried adding CONFIG '{"security.protocol":"plaintext"}', but it did not make any difference, and I think this is the default setting anyway.

Maybe, since my data is in JSON format, I need to specify how to consume it? I also tried adding other parameters, but based on other forum threads and the docs, this looks like a connectivity issue.

Thank you,
MP

Currently I am running SingleStore on “GCP / US East 4 (N. Virginia)” via portal.singlestore.com.
The Kafka producer is on AWS, but that should not matter, since it is reachable from anywhere.
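
One thing that may be worth checking here, as a rough sketch rather than a confirmed diagnosis: after the bootstrap connection, a Kafka client reconnects to whatever host and port the broker advertises in its metadata, so a broker that advertises a private or loopback address can work for a client on the same network and still be unreachable from elsewhere. The helper below uses only the Python standard library to flag advertised addresses that are not publicly routable. The function names and the sample broker list are hypothetical, not taken from the setup above.

```python
import ipaddress
import socket


def resolve(host):
    """Return the set of IP address strings that `host` maps to."""
    try:
        # Already an IP literal: no DNS lookup needed.
        return {str(ipaddress.ip_address(host))}
    except ValueError:
        pass
    try:
        return {info[4][0] for info in socket.getaddrinfo(host, None)}
    except socket.gaierror:
        # The name does not resolve from this machine.
        return set()


def is_publicly_routable(host):
    """True only if `host` resolves and every address is globally routable."""
    addresses = resolve(host)
    return bool(addresses) and all(
        # Drop any IPv6 zone suffix (e.g. "%eth0") before parsing.
        ipaddress.ip_address(a.split("%")[0]).is_global
        for a in addresses
    )


def non_public_brokers(brokers):
    """Return "host:port" for each advertised broker that is not public."""
    return [
        f"{b['host']}:{b['port']}"
        for b in brokers
        if not is_publicly_routable(b["host"])
    ]


# Hypothetical advertised broker list, for illustration only.
sample = [
    {"host": "10.0.0.5", "port": 9092},        # private address
    {"host": "184.169.233.95", "port": 9092},  # public address
]
print(non_public_brokers(sample))  # ['10.0.0.5:9092']
```

The broker list would have to come from the cluster metadata. With kafka-python, `KafkaAdminClient.describe_cluster()` is one possible source, assuming the installed version provides it and returns `host` and `port` for each broker.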

Here is the Python consumer script that works for me with any Kafka setup, local or AWS:


import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=["MY-PUBLIC-IP:9092"],
    group_id="MY-GROUP-NAME",
    auto_offset_reset="earliest",
    enable_auto_commit=False,
    consumer_timeout_ms=1000,
    value_deserializer=lambda m: m  # Use raw message content without decoding
)

consumer.subscribe(["Trade"])  # subscribe() expects a list of topic names

try:
    for message in consumer:
        topic_info = f"topic: {message.topic} ({message.partition}|{message.offset})"
        message_info = f"key: {message.key}, {message.value}"
        print(f"{topic_info}, {message_info}")

        try:
            # Attempt to deserialize the JSON value
            value = json.loads(message.value.decode('utf-8'))  # Decode the bytes to UTF-8 before JSON parsing
            print("Deserialized JSON value:", value)
        except json.JSONDecodeError as json_error:
            print("JSON decoding error:", json_error)

except Exception as e:
    print(f"Error occurred while consuming messages: {e}")
finally:
    consumer.close()