Parquet Pipeline: Cannot extract data for pipeline - AWS 404

Hello,

I am working on setting up a MemSQL Pipeline to read data in .parquet format from an S3 bucket. I successfully got a pipeline working using the MySQL Command-Line Client when reading from a single file in .csv format using the following syntax:

mysql> CREATE OR REPLACE PIPELINE `csv_test_pipeline`
 AS LOAD DATA S3 "s3://my_bucket/csv_test_data.csv"
 SKIP DUPLICATE KEY ERRORS
 INTO TABLE `csv_test_table`
 FIELDS TERMINATED BY ','
 (`CCID_1`,
  `CCID_2`,
  `CCID_3`);
Query OK, 0 rows affected (0.16 sec)

mysql> START PIPELINE csv_test_pipeline FOREGROUND;
Query OK, 1001 rows affected (0.51 sec)

However, I can’t get a .parquet pipeline to work. I wrote out the same dataset as above to the same bucket in .parquet format instead of .csv format using Spark/Databricks. I am able to create the pipeline, but get an error when running it.

EDIT: I realized after originally posting that I had Snappy compression enabled on the parquet file by default. I have since re-written the file with compression=none.

mysql> CREATE OR REPLACE PIPELINE `parquet_test_pipeline`
 AS LOAD DATA S3 "s3://my_bucket/test_data.parquet"
 SKIP DUPLICATE KEY ERRORS
 INTO TABLE `parquet_test_table`
 (`CCID_1` <- `CCID_1`,
  `CCID_2` <- `CCID_2`,
  `CCID_3` <- `CCID_3`)
 FORMAT PARQUET;
-- Query OK, 0 rows affected (0.17 sec)

mysql> START PIPELINE parquet_test_pipeline FOREGROUND;
-- ERROR 1934 (HY000): Leaf Error (<internal_ip>:3306): Leaf Error (<internal_ip>:3306): Cannot extract data for pipeline.  AWS request failed:
NotFound: Not Found
	status code: 404, request id: 958..., host id: C3BZ...

The parquet file was written out using Spark running on Databricks. The S3 folder contains the usual Spark marker files along with (in this case) one data file:

s3://my_bucket/test_data.parquet/_SUCCESS
s3://my_bucket/test_data.parquet/_committed_5112107910452879650
s3://my_bucket/test_data.parquet/_started_5112107910452879650
s3://my_bucket/test_data.parquet/part-00000-tid-51...c000.parquet

I’ve tried different variations of the S3 path (e.g. /test_data.parquet, /test_data.parquet/, /test_data.parquet/*, etc.), but always get the same 404 error. As a test, I SSHed into the leaf node named in the error and copied the S3 object using the AWS CLI with no issues. I’m not sure how this could be an AWS access issue given that the .csv pipeline works fine.

Any help is appreciated.

It appears that the S3 key prefix is s3://my_bucket/parquet_test_data.parquet rather than what you have in the pipeline statement, s3://my_bucket/test_data.parquet.

Hi @carl,

Thank you for the reply. I see the error you pointed out - unfortunately, that was just a mistake I made when simplifying the S3 paths in question. I corrected the typo in the post. I also confirmed that the paths in question are identical in the pipeline description and using the AWS CLI, but I am still getting the AWS error.

Is there a convention for the S3 path that should be used when specifying the parquet directory in the pipeline? s3://.../data.parquet?

Hmm, if the paths are correct, the next thing I would try is providing a full path to a single parquet file. If that fails with a 404, then check your permissions. If that succeeds, then there may be an issue with iterating that folder. Are the _SUCCESS, _committed…, and _started… files empty? They might be causing issues.
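As a rough sketch of the single-file test suggested above, a throwaway pipeline could be pointed at the data file alone. The pipeline name `parquet_single_file_test` is made up for illustration, and the object key is left truncated exactly as it appears in the listing; substitute the real key. Any credentials or config clauses are omitted here, as in the original statements.

```sql
-- Hypothetical pipeline name, used only for this test.
-- The S3 key below is truncated as in the post; replace it with the full key.
CREATE PIPELINE `parquet_single_file_test`
 AS LOAD DATA S3 "s3://my_bucket/test_data.parquet/part-00000-tid-51...c000.parquet"
 SKIP DUPLICATE KEY ERRORS
 INTO TABLE `parquet_test_table`
 (`CCID_1` <- `CCID_1`,
  `CCID_2` <- `CCID_2`,
  `CCID_3` <- `CCID_3`)
 FORMAT PARQUET;

-- Run once in the foreground so any error is reported directly.
START PIPELINE parquet_single_file_test FOREGROUND;
```

If this loads rows, access to the object is fine and the problem more likely lies with the other files under the prefix.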

The code path handling downloads from s3 is written to be agnostic of the data format, so I wouldn’t expect anything particular to parquet.

After the start pipeline fails, what’s the result of select * from information_schema.pipelines_files where pipeline_name = "parquet_test_pipeline"?

We raise an error if non-empty files which match the name filter are deleted before we have a chance to load them. Because the filter includes everything prefixed by test_data.parquet, it’s possible that at some initial bucket list time, we found a temporary file (I recall Spark can stage output to a _temporary subdirectory) and added it to our internal metadata as a file to be loaded, and are now trying to load it but finding it deleted. You should see it in pipelines_files in this case. Note that, when you replace a pipeline, it preserves metadata about found vs loaded files.

We support shell-style globs in S3 paths, so if this is the issue, dropping the pipeline and recreating it with s3://my_bucket/test_data.parquet/*.parquet will likely do the trick.
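A minimal sketch of that drop-and-recreate step, reusing the table and column mapping from the original post. It assumes the same (omitted) credentials setup as the earlier statements.

```sql
-- Drop instead of CREATE OR REPLACE, so metadata about previously
-- found files is not carried over to the new pipeline.
DROP PIPELINE parquet_test_pipeline;

-- The glob limits the pipeline to keys ending in .parquet, which leaves out
-- _SUCCESS, _committed_... and _started_... under the same prefix.
CREATE PIPELINE `parquet_test_pipeline`
 AS LOAD DATA S3 "s3://my_bucket/test_data.parquet/*.parquet"
 SKIP DUPLICATE KEY ERRORS
 INTO TABLE `parquet_test_table`
 (`CCID_1` <- `CCID_1`,
  `CCID_2` <- `CCID_2`,
  `CCID_3` <- `CCID_3`)
 FORMAT PARQUET;

START PIPELINE parquet_test_pipeline FOREGROUND;
```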

Thank you, @carl and @sasha, for the insights! The issue did end up being the empty _SUCCESS etc. files.

It might be worth adding some of these notes to the MemSQL Parquet Pipeline documentation. Those empty files are common when using Spark/Hadoop/MapReduce, which are frequently the systems creating Parquet files. The notes on S3 path wildcards were also very helpful, and I didn’t see them in the documentation.

Thank you @tmiller for the feedback. I just created a ticket to track updating the documentation with this information.