Verifying Pipeline completion - integrity checks?

We recently had a problem with some data being lost. To explain it, I’ll first give an overview of our process:

  1. Our application writes CSV files to Azure Blob Storage. Once an entire file has been written successfully, it renames the file to have the prefix “completed_{pipelinetype}”, where “{pipelinetype}” is replaced by a code identifying the appropriate pipeline.
  2. In a SingleStore database we create a number of pipelines, each filtering on the prefix “completed_{pipelinetype}”, that read from Azure Blob Storage into staging tables (currently columnstore tables, without a key).
  3. We poll the PIPELINES_FILES table until all expected files are in there in a “Loaded” state; we consider this to indicate completion of the pipeline.
  4. We run a proc to move the staging tables into an optimised reporting structure. This joins on some other tables and could be the problem.
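
A minimal sketch of the check described in step 3, for illustration only and not the actual implementation. It assumes the rows have already been fetched from PIPELINES_FILES as dictionaries; the `FILE_NAME` column, the helper name `files_not_loaded`, the sample file names, and any state other than “Loaded” are assumptions.

```python
from typing import Iterable, Mapping


def files_not_loaded(pipeline_files: Iterable[Mapping[str, str]],
                     expected_files: Iterable[str]) -> dict[str, str]:
    """Return {file_name: state} for every expected file that is not 'Loaded'.

    A file that does not appear in the pipeline's file list at all is
    reported with the placeholder state 'Missing'.
    """
    states = {row["FILE_NAME"]: row["FILE_STATE"] for row in pipeline_files}
    return {
        name: states.get(name, "Missing")
        for name in expected_files
        if states.get(name) != "Loaded"
    }


# Hypothetical rows, shaped like a SELECT from PIPELINES_FILES.
rows = [
    {"FILE_NAME": "completed_fact_001.csv", "FILE_STATE": "Loaded"},
    {"FILE_NAME": "completed_fact_002.csv", "FILE_STATE": "Unloaded"},
]
expected = ["completed_fact_001.csv", "completed_fact_002.csv", "completed_fact_003.csv"]
pending = files_not_loaded(rows, expected)
print(pending)
```

The pipeline is treated as complete only when the returned dictionary is empty. Note that this says nothing about how many rows each file contributed, which is the gap the questions below are about.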

The SingleStore DB runs in K8s with a single height-1 leaf (32GB, 8 cores) and a single height-1 aggregator.

At some point something went wrong and of the 14 million rows, 2000 did not come through.

  • We know the rows are in the source file, and that they were there from the start, because the “completed_” rename step ensures only fully written files are loaded.
  • We know the rows are not in the end table.
  • We do not know if they made it into staging, because the table is cleared upon completion of the process.
  • We therefore do not know if they went into staging but failed to move from staging to the final tables.
  • We re-ran the process from step 2 (i.e. we did not regenerate the source files, but did recreate the pipelines and import again into staging etc.) and the rows successfully imported.

I am worried the failure may have been in the pipeline, because all the missing rows were in the same file (one of about 400 for that pipeline type). There is nothing particular about that file or the data in it that would differentiate it from the others: the same keys exist across other rows that made it in, and there does not appear to be any pattern to what was dropped beyond sharing a file. I could well be wrong here, but that’s my current thinking.

So I have the following questions:

  1. Is our process of checking the PIPELINES_FILES table for the FILE_STATE value sufficient to conclude that the data is loaded, that it can be selected from, and that the pipeline can be deleted?
  2. What verification checks are built into a pipeline, before the file is changed to this state, to confirm that all rows were imported? Right now we are tempted to write our own verification based on row count, but this is pointless if we’re reproducing SingleStore behaviour.
  3. Is there any built-in verification process we can run to validate a pipeline before we confirm completion and delete it?
  4. Do you know of any current reason why a pipeline might drop rows? If so, how should we deal with it?
  5. Is there any race condition possible around the pipeline completion and the columnstore table that might mean the data wasn’t quite inserted before we tried to move it?

Thanks,
Will

Hi Will! Sorry that you are encountering an issue here. I have some clarifying questions and remarks which should help us get to the bottom of it.

  1. Are you using append blobs? If not, how are you uploading the file chunks?
  2. Are you using (or have you considered) the Put Block / Put Block List APIs? This seems to be the recommended approach for uploading large files in parallel and with retry. Some information here.
  3. Can you modify your procedure such that instead of clearing the rows from staging it moves them into another table called “loaded” and see if you can repro the issue? I don’t suggest this permanently, but it may be a good idea to debug your procedure to ensure rows aren’t being lost at that point.

In general, assuming the blobs are entirely static and available in Azure when we download them, we will load 100% of the data. There are only three cases where we could load less data:

  1. the blob is not entirely uploaded - it’s possible that renaming an append blob in Azure is not atomic with regard to the final append committing
  2. the pipeline is configured to ignore errors or duplicates - or the csv parsing is somehow mis-parsing certain rows
  3. the procedure is not moving all of the rows

Hi Carl,

Thanks for your reply. To answer your questions:

  1. We are using Block Blobs with Azure DataLake File storage (i.e. Gen2 storage accounts with hierarchical storage on), and writing them using the “Azure.Storage.Files.DataLake” package. We don’t append the data; it’s a single continuous write to the initial stream.
  2. I believe the Gen2 API write operations are a separate set of operations and not compatible with the Gen1 operations. However, we are using the standard MS libraries for writing the data.
  3. Yes, we will do this, though the issue has not recurred.

With regard to your suggested cases:

  1. We’re using block blobs. Also, the missing data was not at the end of the file; it appeared to be a section about two-thirds of the way down.
  2. The pipeline is not ignoring duplicates or errors. It usually fails if we have bad data.
  3. This is possible. What makes it unlikely is that all the data is loaded as a single cohesive set, all of it inserted via the pipelines; no persistent data is used in the load operation. The data loaded successfully on retry, which suggests that this is a non-deterministic problem.

This does bring me to the main question, though: given it’s near impossible to pin down the cause, I’m more interested in what verification checks we might have available post-load. Are there any checksums or row counts we can verify? Is there a recommended process for doing this? We can definitely store a checksum of the written file, but obviously we cannot verify that against the data the pipeline loaded. Otherwise I suppose we will need to implement row count checks ourselves.

Regards,
Will

I appreciate the detailed overview.

Regarding writing the files, I am not super familiar with the low-level implementation of the Gen2 API. However, assuming you are using Upload(stream), then Flush, then Rename, this approach should be fine. Also, given that the missing data appeared about two-thirds of the way through the stream, it’s even less likely that this is due to a filesystem sync issue.

To debug this further I will need to see your stored procedure and CREATE PIPELINE statement. Feel free to DM them to me (or contact me to arrange secure transfer) if needed.

Regarding verification checks: right now we can only help with row-count-based checks, but it would be a good idea for us to provide checksums or hashes of the files that we download. For row counts, I think you have two options (probably implement both):

  1. Take a look at the PIPELINE* tables in information_schema. For example, you could inspect the PIPELINES_CURSORS table, which provides byte-level cursor offsets and total size for every file we load from the source. (docs)

  2. Depending on performance sensitivity, you could add a row count check within the procedure. Check the size of the incoming batch, the size of the staging table, and the size of the destination post-merge. Obviously this may be tricky depending on what else is going on in the workload, but it is something to think about.
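
As a rough illustration of option 2, here is a minimal Python sketch of a count reconciliation done outside the procedure. It assumes the data rows in each source CSV can be counted at write time and that COUNT(*) for staging and the destination are fetched separately. `count_csv_rows` and `reconcile` are hypothetical helper names, blank lines are assumed not to count as rows, and the staging-to-destination comparison only holds if every join in the procedure matches exactly one dimension row per fact row.

```python
import csv
import io
from typing import Iterable


def count_csv_rows(lines: Iterable[str], header_rows: int = 1) -> int:
    """Count CSV records after the header.

    The csv module is used instead of counting line breaks so that a quoted
    field containing a line break is counted as one record, not two.
    """
    reader = csv.reader(lines)
    for _ in range(header_rows):
        next(reader, None)  # skip the header, mirroring IGNORE 1 LINES
    return sum(1 for row in reader if row)  # empty lines are not counted


def reconcile(expected: int, staged: int, moved: int) -> list[str]:
    """Compare source, staging and destination counts; return any mismatches."""
    problems = []
    if staged != expected:
        problems.append(f"staging has {staged} rows, source files have {expected}")
    if moved != staged:
        problems.append(f"destination received {moved} rows, staging had {staged}")
    return problems


# Made-up sample: three data rows, one with a line break inside quotes.
sample = 'id,name\r\n1,"a"\r\n2,"line one\r\nline two"\r\n3,c\r\n'
expected_rows = count_csv_rows(io.StringIO(sample, newline=""))
print(expected_rows)
print(reconcile(expected=expected_rows, staged=3, moved=2))
```

An empty list from `reconcile` means the counts agree at both hops. A mismatch at the first hop points at the load into staging, and a mismatch at the second points at the procedure.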

Cheers,
Carl

Hi Carl,

I will not be able to share the exact procedure, but the logic that might exclude rows is really very simple:
INSERT INTO factTable
[columnlist]
SELECT f.factcol1, f.factcol2, d1.dimension1id, d2.dimension2id…
FROM staging_factTable f
JOIN dimension_dimension1 d1 ON f.Dimension1Code = d1.Dimension1Code AND f.JobId = d1.JobId
… [Many More Dimensions] …
WHERE f.JobId = jobIdParam
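
Because these are inner joins, a staging row with no matching dimension row for the same JobId at the moment the statement runs is dropped without any error. A minimal sketch of an anti-join that would surface such rows is below. It runs against in-memory SQLite purely as a stand-in (not SingleStore), covers only the first dimension, and uses made-up sample data; `build_demo` and `find_unmatched` are hypothetical helper names.

```python
import sqlite3


def build_demo() -> sqlite3.Connection:
    """Create tiny stand-in tables with one fact row that has no dimension match."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE staging_factTable (JobId INT, Dimension1Code TEXT, factcol1 INT);
        CREATE TABLE dimension_dimension1 (JobId INT, Dimension1Code TEXT, dimension1id INT);
        INSERT INTO staging_factTable VALUES (1, 'A', 10), (1, 'B', 20), (1, 'C', 30);
        -- 'C' is deliberately absent, simulating a dimension row that is not there yet
        INSERT INTO dimension_dimension1 VALUES (1, 'A', 100), (1, 'B', 101);
    """)
    return conn


# LEFT JOIN keeps every staging row; the IS NULL filter keeps only those
# for which the inner join in the load statement would have found no match.
UNMATCHED_SQL = """
    SELECT f.Dimension1Code, f.factcol1
    FROM staging_factTable f
    LEFT JOIN dimension_dimension1 d1
      ON f.Dimension1Code = d1.Dimension1Code AND f.JobId = d1.JobId
    WHERE f.JobId = ? AND d1.Dimension1Code IS NULL
"""


def find_unmatched(conn: sqlite3.Connection, job_id: int) -> list[tuple]:
    """Return staging rows for job_id that the inner join would drop."""
    return conn.execute(UNMATCHED_SQL, (job_id,)).fetchall()


print(find_unmatched(build_demo(), 1))  # [('C', 30)]
```

Running the equivalent query per dimension before staging is cleared would show whether rows were lost at the join rather than at the load.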

All dimensions and the staging fact are loaded via pipelines defined in code as:
$@"CREATE PIPELINE {pipeLineName}
AS LOAD DATA AZURE '{dataPath}'
CONFIG '{"suffixes":["csv"]}'
CREDENTIALS '{credentials}'
INTO TABLE {tableName}({string.Join(',', columns)})
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '""'
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES
SET {string.Join(", ", setColumnsStatement)}"

The setColumnsStatement is used to get nulls coming through for data types such as DateTime.

As you can see, there’s nothing very particular about it, and I think it’s unlikely to be something you’ll be able to divine the cause from.

A better approach is for us to implement the verification checks discussed, so that if this occurs again we will catch it; then, if it’s a persistent problem, I can follow up with you guys with a bit more detail. Thanks for the suggestions on those. We already have something in the works to verify counts from staging -> fact (unfortunately it was not ready when this occurred), and I will look into the pipeline tables you suggested so that we can verify the files have been loaded in their entirety and catch this issue if it recurs.

Thanks for your help,
Will

Thanks for the additional details. Makes sense to see if it repros along with more data.

As an aside - I am curious why you are loading the data into a staging table and then calling the procedure for each job. Have you considered using pipelines into stored procedures to eliminate the need for a staging table?

I am not suggesting this because of the issue you described; I mainly want to understand whether or not pipelines into stored procedures would fit your use case.

Cheers,
Carl

Hi Carl,

I haven’t looked into running pipelines into procs. However, the proc we have can only run once all pipelines have completed, and I’m not sure whether that would rule it out. The main reason we won’t be going down that route, though, is that we are aiming to move away from procs completely and run all logic via the Entity Framework so that we can reproduce the logic in our tests.

Regards,
Will

Will,
Thank you for the details. As a developer myself, I feel your pain and annoyance with Stored Procedures - it’s certainly not the same as working on code you can test and run normally.

Quick tangent: we have some interesting projects in this space - one of which is experimenting with embedding WASM in the engine. You can see a very early overview of this in a video we recently published: Bailey Hayes demonstrates #WebAssembly with #SingleStore - YouTube. Stay tuned for more announcements along these lines in the future.

Keep us in the loop if the original problem happens again. Otherwise we wish you luck with the Entity Framework approach and are available to assist if you need anything else.

Cheers,
Carl
