We also use Postgres, but we don't have many jobs. It's usually 10-20 schedules that create hourly-to-monthly jobs, and they are mostly independent. Currently it's a custom-made solution, but we are going to update it to use SKIP LOCKED plus NOTIFY/LISTEN with an interval fallback to handle jobs. There is a really good video about it on YouTube called "Queues in PostgreSQL" from Citus Con.
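A minimal sketch of that NOTIFY/LISTEN wiring, assuming a hypothetical jobs table and a channel named job_queue (neither is from the original post):

    -- fire a notification whenever a job is enqueued
    CREATE OR REPLACE FUNCTION notify_job() RETURNS trigger AS $$
    BEGIN
      PERFORM pg_notify('job_queue', NEW.id::text);
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;

    CREATE TRIGGER jobs_notify
      AFTER INSERT ON jobs
      FOR EACH ROW EXECUTE FUNCTION notify_job();

    -- each worker subscribes, and also polls on an interval as a fallback,
    -- since notifications are not delivered to disconnected clients
    LISTEN job_queue;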
You can go an awfully long way with just SELECT … FOR UPDATE … SKIP LOCKED
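For anyone who hasn't seen the pattern, the basic shape looks like this (table and column names invented for illustration):

    -- claim one pending job; rows already locked by other workers
    -- are skipped instead of blocking
    SELECT id, payload
      FROM jobs
     WHERE status = 'pending'
     ORDER BY id
     LIMIT 1
       FOR UPDATE SKIP LOCKED;

Run it inside a transaction, do the work or mark the row, and commit; concurrent workers never fight over the same row.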
I've never found a satisfying way to avoid holding the lock for the full duration of the task that's still resilient to workers dying. And Postgres isn't happy holding a bunch of long-lived locks like that. You end up having to register and track workers with health checks, plus a cleanup job to prune dead workers, just so you can give jobs exclusivity for a time.
I use a visibility timeout.
So when selecting a message, it doesn't just look for ones that haven't started; it also picks up in-progress ones that have been running longer than the timeout (see the sketch below).
The update sets the status, start time, and attempt counter.
If the attempt counter hits 3 when the update happens, it sets the message to failed instead. The caller looks at the returned status, sees failed, and raises a notification.
Then if it's a fix like correcting data or something, I just reset the state to have it reprocess.
Never needed to track workers, cleanup jobs, etc.
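Under an assumed schema (a messages table with status, started_at, and attempts columns; the 15-minute timeout is illustrative), that claim can be a single statement:

    -- claim the next message: either never started, or in progress
    -- past the visibility timeout
    WITH next AS (
      SELECT id
        FROM messages
       WHERE status = 'pending'
          OR (status = 'in_progress'
              AND started_at < now() - interval '15 minutes')
       ORDER BY id
       LIMIT 1
         FOR UPDATE SKIP LOCKED
    )
    UPDATE messages m
       SET status     = CASE WHEN m.attempts + 1 >= 3
                             THEN 'failed'
                             ELSE 'in_progress' END,
           started_at = now(),
           attempts   = m.attempts + 1
      FROM next
     WHERE m.id = next.id
    RETURNING m.id, m.status, m.payload;

The caller inspects the returned status: 'failed' raises the notification, and resetting the row back to 'pending' after a data fix re-queues it.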
Hold the lock and write a row with a timestamp at the time you read.
That row indicates you are the one processing the data and no one else should. When reading, abort the read if someone else wrote that row first.
When you're finished processing, hold the lock again and update the row you added earlier to indicate processing is complete.
The timestamp can be used to time out the claim. A rough sketch follows.
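If I'm reading this right, it comes out something like the following, where item_claims is a made-up marker table keyed by item_id and the 10-minute timeout is arbitrary:

    -- claim: take the row lock, then try to write (or steal an expired)
    -- marker row; if this affects 0 rows, someone else holds a live
    -- claim, so abort the read
    BEGIN;
    SELECT id FROM items WHERE id = 42 FOR UPDATE;
    INSERT INTO item_claims (item_id, claimed_at, done)
    VALUES (42, now(), false)
    ON CONFLICT (item_id) DO UPDATE
       SET claimed_at = now(), done = false
     WHERE item_claims.done = false
       AND item_claims.claimed_at < now() - interval '10 minutes';
    COMMIT;

    -- on completion: take the lock again and mark the claim done
    BEGIN;
    SELECT id FROM items WHERE id = 42 FOR UPDATE;
    UPDATE item_claims SET done = true WHERE item_id = 42;
    COMMIT;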
If you go that route, couldn't you spawn a parent process and have the work done in a child process?
Built right in with a group of pg functions, with a library, or with a Python-based tool that happens to use pg for the queue.
pgmq https://github.com/pgmq/pgmq
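Per the project README, the core API is just a handful of SQL functions; roughly (check the README for exact signatures):

    SELECT pgmq.create('tasks');                     -- create a queue
    SELECT pgmq.send('tasks', '{"task": "email"}');  -- enqueue a message
    SELECT * FROM pgmq.read('tasks', 30, 1);         -- read 1 msg, 30s visibility timeout
    SELECT pgmq.delete('tasks', 1);                  -- ack: delete msg_id 1
    SELECT pgmq.archive('tasks', 1);                 -- or archive it instead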
Just SELECT … FOR UPDATE SKIP LOCKED. The table is partitioned to keep the unprocessed set small (sketched below).
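A sketch of that layout, assuming list-partitioning on status (all names illustrative). Note the partition key must be part of the primary key, and updating status moves the row out of the hot partition:

    CREATE TABLE jobs (
      id      bigserial,
      status  text NOT NULL DEFAULT 'pending',
      payload jsonb,
      PRIMARY KEY (id, status)
    ) PARTITION BY LIST (status);

    CREATE TABLE jobs_pending PARTITION OF jobs
      FOR VALUES IN ('pending', 'in_progress');
    CREATE TABLE jobs_done PARTITION OF jobs
      FOR VALUES IN ('done', 'failed');

    -- workers only ever scan the small pending partition
    SELECT id, payload FROM jobs
     WHERE status = 'pending'
     ORDER BY id LIMIT 1
       FOR UPDATE SKIP LOCKED;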