We are facing a random error with Slick 3. Here is the background.
We are building an ETL pipeline with Spark, MinIO as S3 storage (MinIO is an open-source alternative to AWS S3), and Delta tables. The pipeline has a web interface built with Play Framework (Scala).
The cluster consists of:
7 worker nodes with 16 cores and 64 GB RAM each, configured in client mode.
1 Storage node
[login to view URL] and [login to view URL] are both set to 600
[login to view URL] is disabled
Application data (session data, user data, and some other records) is stored in PostgreSQL using the Slick 3 mapper.
The volume of processed data is growing exponentially and is now around 50 GB. (In production, we aim to process terabytes of data.)
The processing flow essentially consists of aggregating data with a group-by and saving the result to S3 storage, in these steps:
1. Read CSV data from storage and create the read_df DataFrame
2. Read main_db from storage and create main_df
3. Merge read_df with main_df
4. Group by a specific key (let's say user_id)
5. Save the records to storage, replacing main_db. To guarantee data integrity, this stage is split into three phases:
- Write the records to a temp object referenced by datetime
- Back up the existing database object main_db (copy to another object)
- Rename the temp object to main_db (copy and delete)
6. Update the PostgreSQL history table with processed-job information such as:
time_started, time_ended, number_of_rows_processed, size, etc. This is where the issue occurs.
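The three-phase swap in step 5 can be sketched as follows. This is a minimal illustration against the local filesystem (the real pipeline does the equivalent with S3 object copies); the object name `SafeSwap` and the file layout are hypothetical, not from our codebase.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Sketch of step 5's write / backup / rename sequence, using local files
// in place of S3 objects. Names here are illustrative only.
object SafeSwap {
  def replaceMainDb(dir: Path, newContent: Array[Byte]): Unit = {
    val stamp = LocalDateTime.now.format(DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss"))

    // Phase 1: write records to a temp object referenced by datetime
    val temp = dir.resolve(s"main_db.tmp-$stamp")
    Files.write(temp, newContent)

    // Phase 2: back up the existing main_db (copy to another object)
    val mainDb = dir.resolve("main_db")
    if (Files.exists(mainDb))
      Files.copy(mainDb, dir.resolve(s"main_db.bak-$stamp"),
        StandardCopyOption.REPLACE_EXISTING)

    // Phase 3: "rename" temp to main_db (move = copy and delete)
    Files.move(temp, mainDb, StandardCopyOption.REPLACE_EXISTING)
  }
}
```

On a real S3/MinIO backend each phase is a server-side copy plus delete, since S3 has no atomic rename.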
We are facing a random error, and we noticed it happens when a shuffle occurs after the groupBy. Sometimes we end up with 1000+ partitions. In those cases step 5 does not complete and throws the following exception:
[login to view URL]: Task [login to view URL]$DatabaseDef$$anon$3@291fad07 rejected from [login to view URL]$$anon$1$$anon$2@7345bd2c[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 26]
The completed-tasks value is sometimes lower and sometimes reaches hundreds.
Below is the code executed in step 5:
[login to view URL]([login to view URL]("[login to view URL]", mainDb), [login to view URL])
Googling the exception, we found that it can occur because connections are closed before the code is executed when using transactionally. Note that we do not use transactionally in our code. Below is the code executed when calling update():
val updateQuery = [login to view URL]([login to view URL] === id).update(db)
[login to view URL](updateQuery)
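For what it's worth, the "Task ... rejected from ... [Terminated, pool size = 0]" message is what any JVM thread pool throws when work is submitted after the pool has been shut down. The snippet below is not Slick; it is a minimal, hypothetical reproduction of that failure mode, which is what we suspect happens inside Slick's AsyncExecutor if the DatabaseDef is closed (or recreated) while step 5 is still running.

```scala
import java.util.concurrent.{Executors, RejectedExecutionException}

// Minimal reproduction of the suspected failure mode: submitting a task
// to an executor that has already been shut down. Slick's db.run would hit
// the same RejectedExecutionException if its internal pool were terminated.
object ClosedPoolDemo {
  def submitAfterShutdown(): Boolean = {
    val pool = Executors.newFixedThreadPool(2)
    pool.shutdown() // analogous to the Database object being closed
    try {
      pool.submit(new Runnable { def run(): Unit = () })
      false // submission unexpectedly succeeded
    } catch {
      case _: RejectedExecutionException => true // task rejected: pool Terminated
    }
  }
}
```

If something in the application (e.g. Play lifecycle, a re-read of the config, or an injector rebuilding the service) closes the Database instance mid-job, every subsequent db.run would fail exactly like this.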
This is the current Slick configuration:
connectionPool = "HikariCP"
dataSourceClass = "[login to view URL]"
numThreads = 100
Initially, before the errors started, it was:
numThreads = 20
maxConnections = 20
We tried queueSize = 2000, but that did not fix it.
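For reference, a more conservative pool sizing we have been considering (the values are examples, not a prescription): with HikariCP, Slick's documentation recommends keeping maxConnections equal to numThreads, and a pool of 100 threads against a single PostgreSQL instance is usually counterproductive compared to a small pool with a larger queue.

```hocon
# Example sizing only; path "slick.db" is illustrative.
slick.db {
  connectionPool = "HikariCP"
  numThreads     = 20
  maxConnections = 20   # keep equal to numThreads with HikariCP
  queueSize      = 1000
}
```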
Does anyone have a solution for us?
Furthermore, we suspect step 5 is responsible for the closed-connection issue, because the error does not happen when that step is turned off. What is the link between the threads that read/write to S3 storage (on another server) and the HikariCP (Slick) processes that are killed?
And is there a better way to guarantee data integrity (in case of failure while writing data) without this time-consuming copy, restore, and delete process?
1. After aggregation we call repartition() to reduce the number of partitions and avoid skewed data before saving the results. coalesce() crashed the driver JVM with an OOM.
2. main_df and read_df do not have the same schema, so overwriting with Delta's built-in method is not possible.
3. The update() function's Await timeout was 10 s; following the issue we increased it, but that did not fix the problem.