FS PIPELINE: understanding how it handles files

I have a data source from which I have to download 7 large files daily via FTP. They all belong in one table but are assigned to different regions of the country, and unfortunately there is no good primary key other than an id serial number. The problem is that rows may be deleted and others added: if a row is missing from the newly downloaded data set, it should be deleted from the database table.

So I have a bash script that downloads all the files once a day in a cron job, and after everything has downloaded successfully it hard links each file into /var/lib/memsql/db_files so that a PIPELINE can pick up the data. However, it's not obvious from the documentation how exactly SingleStore handles these files when it imports them. Does it only look for appended rows?

SELECT * FROM information_schema.PIPELINES_FILES provides some clues as to what is going on, since it has file_name, file_size, and file_state columns. Can someone provide more clarity on how to use an FS PIPELINE with files that may change and have deleted rows?
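
For reference, a narrower version of that query might look like the sketch below. The pipeline name `regions_pipeline` is hypothetical; the selected columns are the ones mentioned above, filtered on the view's `pipeline_name` column.

```sql
-- List each file the pipeline has seen, with its recorded size and load state.
-- 'regions_pipeline' is a placeholder name, not one from the actual setup.
SELECT file_name, file_size, file_state
FROM information_schema.PIPELINES_FILES
WHERE pipeline_name = 'regions_pipeline'
ORDER BY file_name;
```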

Thanks

To explain some more: let's say a new file for the westcoast region comes in today. I would link it to /var/lib/memsql/db_files/westcoast.csv, replacing the link to yesterday's file. This would update the timestamp, but technically it's a new file with contents similar to the old one…

Hello, here is a response from SQrL:
SingleStore Pipelines are designed to facilitate real-time data ingestion [1]. Once data is picked up from a file by a pipeline, it does not look for additional changes to the file or track deleted rows [1]. Pipelines handle files as if they’re append-only [1].

However, you can leverage the ALTER PIPELINE ... DROP FILE command to handle files that have been replaced or removed [2]. When you download new versions of your files, before placing them in the /var/lib/memsql/db_files folder, you could run the ALTER PIPELINE ... DROP FILE command for each of the existing files. That will make the pipeline forget all metadata associated with those files [2]. When the new files are placed in the directory, the pipeline will treat them as new files to be loaded [2].
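
As a rough sketch of that sequence, assume a pipeline named `regions_pipeline` (hypothetical) and the `westcoast.csv` link from the question. The file name passed to `DROP FILE` should match the `file_name` value shown in `information_schema.PIPELINES_FILES`, which may or may not be the full path used here. Stopping and restarting the pipeline around the change is a cautious assumption, not a documented requirement.

```sql
-- Pause ingestion while the file on disk is being swapped (precaution only).
STOP PIPELINE regions_pipeline;

-- Forget the metadata for yesterday's file so that a file with the same name
-- is treated as new. Use the exact file_name reported by PIPELINES_FILES.
ALTER PIPELINE regions_pipeline DROP FILE '/var/lib/memsql/db_files/westcoast.csv';

-- Replace the hard link on disk here (outside SQL), then resume ingestion.
START PIPELINE regions_pipeline;
```

Dropping the file only makes the pipeline load it again. Rows already loaded from the previous version stay in the table unless something else removes them.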

Regarding your database with potentially missing rows in new data sets, there are a few ways to handle this. One method involves SingleStore’s support for handling Slowly Changing Dimensions [3]. This approach means that changes to data, including additions, updates, and deletions, are tracked as new rows in the database [3]. By applying this pattern to your situation, you could include a “version” or “datetime” field in your records, then when new files are loaded, older records that aren’t included in the new file could be marked as ‘expired’, based on their version or datetime [3].
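
One way to picture the 'expired' marking, as a minimal sketch: the table `region_rows` and the columns `region`, `last_seen`, and `expired_at` are hypothetical, and the sketch assumes that every row present in today's file has its `last_seen` set to today's date during the load.

```sql
-- Run after today's westcoast load has finished.
-- Rows whose last_seen was not refreshed were absent from the new file.
UPDATE region_rows
SET expired_at = NOW()
WHERE region = 'westcoast'
  AND expired_at IS NULL
  AND last_seen < CURRENT_DATE();
```

Queries that want only the current data set would then filter on `expired_at IS NULL`.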

Here’s an example where a pipeline is used to track changes to product information [3].

sql

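-- Route each batch read from the file into a stored procedure instead of a table.
CREATE PIPELINE changing_dimension_pipeline
AS LOAD DATA FS '/product_history.txt'
INTO PROCEDURE changing_dimension_proc
FIELDS TERMINATED BY ',';

-- Begin ingesting in the background.
START PIPELINE changing_dimension_pipeline;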

This pipeline loads data into a stored procedure, changing_dimension_proc [3]. This procedure could contain logic for handling changes to existing records [3].
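
For illustration, a procedure of that kind might look like the sketch below. The column list, the `product_history` table, and the `loaded_at` column are all assumptions, since the response does not show them. The procedure would need to exist before the CREATE PIPELINE statement is run.

```sql
DELIMITER //
-- The pipeline passes each batch of parsed rows in as a QUERY-typed parameter.
-- Column names and types here are hypothetical and must match the file layout.
CREATE OR REPLACE PROCEDURE changing_dimension_proc(
    batch QUERY(product_id BIGINT, name TEXT, price DECIMAL(10,2)))
AS
BEGIN
    -- Record every incoming row as a new version stamped with its load time.
    INSERT INTO product_history (product_id, name, price, loaded_at)
    SELECT product_id, name, price, NOW() FROM batch;
END //
DELIMITER ;
```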

For more customized operations, you can also use a stored procedure, where you can apply more complex logic to the incoming rows. If you need to ignore certain records (such as header or trailer records), you can use a transform function as suggested in this forum post [4]. Lastly, column changes in the files can be handled using a JSON pipeline strategy [5].

Please keep in mind that handling changing file formats and maintaining a real-time reflection of your source data in the database, particularly with deletions, can be complex. An alternative approach could involve creating a complete snapshot of your data with each daily load, and completely replacing the old data with the new.
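
A minimal sketch of that snapshot idea for a single region, assuming today's file has already been loaded into a hypothetical staging table `staging_westcoast` with the same columns as the target table (called `my_table` here), and that the target table has a `region` column:

```sql
-- Swap one region's rows inside a transaction so that other sessions
-- should see either the old rows or the new ones, not a partial load.
BEGIN;
DELETE FROM my_table WHERE region = 'westcoast';
INSERT INTO my_table SELECT * FROM staging_westcoast;
COMMIT;
```

Rows missing from the new file disappear because the region's old rows are removed wholesale, so no row-by-row comparison is needed.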


After some lengthy chat with SQrL, looks like I have to do the old school method…

However, directly replacing all the data in a table would cause the table to be unavailable during the operation. If you need to replace all data and then make it “go live”, you should consider creating a new table, loading the data into the new table, and once the data loading is done, rename the tables to switch them. Renaming tables is a metadata operation and is almost instantaneous, so the downtime should be minimal:

sql

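-- Build an empty copy of the live table with the same schema.
CREATE TABLE new_my_table LIKE my_table;
-- Load the fresh snapshot (source and options elided).
LOAD DATA INTO new_my_table ...;

-- Swap the tables, then discard the previous snapshot.
RENAME TABLE my_table TO old_my_table, new_my_table TO my_table;
DROP TABLE old_my_table;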

Remember, the above examples are generic and might need adjustment based on your exact use case and setup [1][2][3][4].