Effectum

  • status: v0.1 released
  • url: https://github.com/dimfeld/effectum
  • A job queue based on SQLite, which will eventually be usable either embedded as a library or standalone.
  • Current Features
    • Scheduled Jobs
    • Multiple types of jobs, and workers can handle one or more types
    • Jobs can checkpoint and update their payload, so that retries start from the checkpointed payload. This can be used to reduce the risk of running non-idempotent operations more than once.
    • Jobs can submit a heartbeat while they are still working on long tasks.
    • Jobs expire and are retried if no heartbeat arrives before the expiration time.
    • Jobs can have higher priority to run sooner.
    • Jobs can have a higher "weight," so processing-intensive jobs count more against a worker's concurrency limit.
    • Workers can listen only for specific types of jobs.
    • Workers can run multiple jobs concurrently.
    • Handle Unexpected Process Restarts
      • If the process crashes we want to cleanly handle jobs that had been running at the time.
      • This involves doing a sweep for running jobs when the queue is created, and scheduling them for retries
        • Should have options to reschedule them immediately or using standard retry backoff.
      • For the future server mode we'll also want the option to not touch them, and instead reinstate them in the in-memory data structures as running jobs, since the workers will have been outside the queue process.
        • This will probably also work best if the worker details are persisted to the database too.
  • Future Features
    • Examples
      • Create example programs that demonstrate actual usage of the library.
      • Simple example
      • Retries due to failure
      • Job priority
      • Graceful shutdown
      • Checkpoint functionality
      • Expired jobs retry
    • Cancellation and Modification of Pending Jobs
      • Disallow this for jobs that are already running; otherwise, jobs should be able to be cancelled, rescheduled, or otherwise modified.
    • Recurring Jobs
      • A job template will be added to the database, which will have all the same fields as a normal job except that it will not be scheduled.
      • For this job, a corresponding entry will be added to recurring_jobs with a cron-like spec. A task in the queue will then automatically schedule the jobs for the appropriate times.
    • Server Mode
      • If you need to scale up, the queue will be able to run as its own server.
      • This should present the same (or nearly the same) interface as the local mode, to make it easy to switch from one to the other.
      • Workers will be remote processes and communicate with the server over gRPC (maybe plain REST too?)
        • Use long polling or gRPC streaming here for workers that wait for pending jobs?
  • Database Structures
    • Base Data
      • This is stored in the jobs table.
      • Fields
        • job id
        • external job id (this is a UUID)
        • job type
          • Used to filter which workers will try to run this job.
        • job priority
          • Higher priority jobs will go first when multiple jobs can be run.
        • weight
          • How much this job counts against a worker's concurrency limit. Can be increased for jobs that require heavy computation.
        • job status
          • Active
            • For Active jobs, the status resolves to Pending or Running depending on whether the job is currently assigned to a worker in active_jobs
          • Success
          • Failed
        • recurrence id (for recurring tasks)
        • Original "run at" time
          • Jobs that should run immediately will have this set to now
        • payload
        • payload from last checkpoint
        • current try
        • maximum retries allowed
        • retry backoff spec
          • Backoff initial time, multiplier, and randomization factor
        • time job was added
        • job timeout
          • Default expiration time when starting the task is (now + timeout)
        • heartbeat expiration increment
          • When receiving a heartbeat, the new expiration will be max of current expiration and (now + expiration increment)
          • This can be 0 to have a strict timeout, regardless of heartbeat
        • started at time
        • finished at time
        • run info (info about each failure for failed runs, info about success from a successful run)
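The retry backoff and heartbeat expiration fields above can be illustrated with a small sketch. This is not Effectum's actual implementation; the function names are hypothetical and just mirror the fields listed above.

```rust
// Sketch of how the retry backoff and heartbeat expiration fields
// might be used. Hypothetical; not Effectum's actual code.

/// Delay before retry number `current_try`, given an initial backoff,
/// a multiplier applied per retry, and a randomization factor that
/// jitters the result within +/- (rand_factor * delay).
fn backoff_delay_ms(
    initial_ms: u64,
    multiplier: f64,
    rand_factor: f64,
    current_try: u32,
    jitter: f64, // a sample in [-1.0, 1.0]; a real queue would draw this randomly
) -> u64 {
    let base = initial_ms as f64 * multiplier.powi(current_try as i32);
    (base * (1.0 + rand_factor * jitter)).round() as u64
}

/// New expiration after a heartbeat: the max of the current expiration
/// and (now + increment). An increment of 0 keeps the timeout strict.
fn expiration_after_heartbeat(current_expiration: u64, now: u64, increment: u64) -> u64 {
    current_expiration.max(now + increment)
}

fn main() {
    // Third try with a 1s initial backoff and 2x multiplier: 4s before jitter.
    println!("{}", backoff_delay_ms(1000, 2.0, 0.2, 2, 0.0)); // 4000
    // A zero increment never extends the deadline.
    println!("{}", expiration_after_heartbeat(500, 490, 0)); // 500
}
```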
    • Active Jobs
      • This is information specific to jobs which are running or waiting to run
      • job id
      • worker id, if currently running
      • job type (copied from the jobs table but helps make lookups more efficient when only looking for certain task types)
      • priority
      • run at time
      • started at time
      • expiration time for currently running jobs
    • Recurring Job Spec
      • Not supported yet, this is on the roadmap.
      • recurrence id
      • base data job id
        • When starting a recurring job, the row from the jobs table will be copied to a new job with a new id
      • schedule
  • In-Memory Data Structures and Tasks
    • Track "next" job run time for each job type
    • List of clients that can run for each job type
      • When a job becomes available, all the workers are notified, and then those that are currently waiting for a new job will try to grab one.
    • Database Writer
      • Instead of each task writing to the SQLite database, write access is run from a single thread, which communicates with the rest of the world using a channel. Results are returned through a oneshot associated with each request.
      • This is uniquely well suited to SQLite due to its "single writer at a time" policy. Databases with true MVCC wouldn't be as well served here.
      • But the real benefit is that this allows batching of independent requests that come in near the same time, which increases the maximum throughput of the queue by 10x-20x depending on the details.
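The single-writer-with-batching idea can be sketched as follows. Effectum's real writer is async and runs actual SQLite transactions; this version uses std threads and channels (with a plain mpsc channel standing in for the per-request oneshot) purely to show the shape of the loop.

```rust
// Minimal sketch of a single-writer loop that batches queued requests.
// Hypothetical structure; Effectum's real writer is async and runs
// real SQLite transactions where the comment indicates.
use std::sync::mpsc;
use std::thread;

struct WriteRequest {
    sql: String,
    // Per-request "oneshot": the writer sends the result back here.
    reply: mpsc::Sender<Result<(), String>>,
}

fn writer_loop(rx: mpsc::Receiver<WriteRequest>) {
    // Block for the first request, then drain whatever else is already
    // queued so independent writes can share one transaction.
    while let Ok(first) = rx.recv() {
        let mut batch = vec![first];
        while let Ok(next) = rx.try_recv() {
            batch.push(next);
        }
        // A real writer would do: BEGIN; execute each req.sql; COMMIT.
        for req in batch {
            let _ = req.reply.send(Ok(()));
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let writer = thread::spawn(move || writer_loop(rx));

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(WriteRequest {
        sql: "UPDATE jobs SET status = 'done'".into(),
        reply: reply_tx,
    })
    .unwrap();
    assert!(reply_rx.recv().unwrap().is_ok());

    drop(tx); // closing the channel lets the writer loop exit
    writer.join().unwrap();
    println!("write acknowledged");
}
```

The batching falls out naturally: any requests that arrive while the previous transaction is committing are picked up together by the `try_recv` drain on the next iteration.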
  • Job Runners
    • Each job has a name and a runner function. The runner function is a normal async function which takes the current job and a context object (for database pools and so on).
    • Every job runner for a particular worker has the same type for its context.
    • Internally, the runner function is wrapped in code that manages spawning a task and handling job completion, and that erases the unique Future type returned by each different async function.
    • Eventually there will probably be a proc macro that can automatically create a full runner from the function, but for now you have to create it manually. It's just a single line of code though, so not too bad.
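The type-erasure role of that wrapper can be sketched with plain closures. Effectum's real runners are async; plain functions keep this sketch runnable without an executor, and every name here (`JobContext`, `make_runner`, `send_email`) is hypothetical.

```rust
// Sketch of erasing per-function types behind one uniform runner type,
// the role the library's wrapper (or a future proc macro) would play.
// Hypothetical names; real runners are async functions.
struct JobContext {
    db_url: String, // stands in for database pools and other shared state
}

type Runner = Box<dyn Fn(&str, &JobContext) -> Result<String, String>>;

/// Wrap any compatible function into the uniform Runner type.
fn make_runner<F>(f: F) -> Runner
where
    F: Fn(&str, &JobContext) -> Result<String, String> + 'static,
{
    Box::new(f)
}

fn send_email(payload: &str, _ctx: &JobContext) -> Result<String, String> {
    Ok(format!("emailed: {payload}"))
}

fn main() {
    let ctx = JobContext { db_url: "sqlite://queue.db".into() };
    // Every runner now has the same type, so they can live in one registry
    // keyed by job type, regardless of each function's original signature.
    let runners: Vec<(&str, Runner)> = vec![("send_email", make_runner(send_email))];
    for (name, run) in &runners {
        println!("{name}: {:?}", run("hello", &ctx));
    }
}
```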
  • Job Workers
    • Workers run independently of each other, and are linked to the queue primarily by the notification channels that indicate when new jobs become available.
    • The worker runs a loop that waits for pending jobs if it has enough capacity to run them, and otherwise just waits for one of its own jobs to finish.
    • Cribbing from sqlxmq, workers have both a min concurrency and a max concurrency.
      • The max concurrency is the highest number of jobs it will run, and the worker will fetch jobs again when the number of jobs running falls below the min concurrency.
      • Jobs with weight > 1 will also count more against the max concurrency.
        • e.g. an encoding job that uses a lot of CPU might have a higher weight than smaller tasks like sending an email.
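The min/max concurrency bookkeeping above can be sketched as a small struct. This is an illustration with hypothetical names, not Effectum's internal representation.

```rust
// Sketch of weight-aware concurrency accounting for a worker.
// Hypothetical; the field names follow the description above.
struct WorkerSlots {
    min_concurrency: u32,
    max_concurrency: u32,
    current_weight: u32, // sum of the weights of currently running jobs
}

impl WorkerSlots {
    /// The worker only goes back to fetch more jobs once its in-flight
    /// weight drops below the minimum concurrency.
    fn should_fetch(&self) -> bool {
        self.current_weight < self.min_concurrency
    }

    /// How much weight can be added without exceeding the max
    /// concurrency; caps the size of the next fetch.
    fn available_capacity(&self) -> u32 {
        self.max_concurrency.saturating_sub(self.current_weight)
    }
}

fn main() {
    let slots = WorkerSlots { min_concurrency: 2, max_concurrency: 10, current_weight: 7 };
    println!("fetch? {}", slots.should_fetch()); // false: still above the min
    println!("capacity: {}", slots.available_capacity()); // 3
}
```

Fetching a batch up to `available_capacity` only after dropping below the minimum amortizes the cost of each fetch instead of hitting the queue once per completed job.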
  • Algorithms
    • Running a pending task
      • Get a task from the pending table whose type has waiting job runners.
      • Pull a client off the waiting list and send the task to it.
      • If the task has closed its receiver, then try the next one, and if none of them work, leave the task on the pending list.
      • If the send succeeds, put the task onto the running list.
      • Need to figure out the exact best way to do this
        • Delete from pending list right away and then add to running list once it succeeds
        • Or find the row first, and wait to delete once it succeeds
          • This might work better if we're taking a batch of rows at a time, especially since we may run out of waiting clients for a particular task type.
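The "try each waiting client, fall back to the next" step can be sketched like this, using std sync channels so that a closed receiver shows up as a send error. The types are hypothetical stand-ins for the real queue's structures.

```rust
// Sketch of handing a pending job to the first waiting worker whose
// channel is still open, per the steps above. Hypothetical types.
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

#[derive(Debug, Clone, PartialEq)]
struct Job {
    id: u64,
}

/// Try each waiting client in turn. Returns None on a successful
/// hand-off (the caller then moves the job to the running list), or
/// gives the job back if every client has gone away or is busy, so it
/// stays on the pending list.
fn dispatch(job: Job, waiting: &mut Vec<SyncSender<Job>>) -> Option<Job> {
    let mut job = job;
    while let Some(client) = waiting.pop() {
        match client.try_send(job) {
            Ok(()) => return None,
            // The client closed its receiver or is no longer accepting;
            // take the job back and try the next one.
            Err(TrySendError::Disconnected(j)) | Err(TrySendError::Full(j)) => job = j,
        }
    }
    Some(job)
}

fn main() {
    let (dead_tx, dead_rx): (SyncSender<Job>, Receiver<Job>) = sync_channel(1);
    drop(dead_rx); // this client has closed its receiver
    let (live_tx, live_rx) = sync_channel(1);

    // The dead client is popped (and skipped) first.
    let mut waiting = vec![live_tx, dead_tx];
    assert!(dispatch(Job { id: 1 }, &mut waiting).is_none());
    assert_eq!(live_rx.recv().unwrap(), Job { id: 1 });
    println!("dispatched");
}
```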
  • Queue Internal Tasks
    • Move pending jobs to running when they reach their "run at" time.
      • This moves jobs in batches until there aren't any left to move, then waits until the next "run at" time. It also needs to be updated whenever that time changes.
    • Sweeper for expired tasks
      • Update retry data and move them from running back to pending
    • Sweeper for deleting old data on successful tasks
      • Optional but keeps the DB from blowing up too much.
      • Probably have some hook to send "done" task data somewhere external too.
    • Sweeper for recurring jobs which should be scheduled or running, but are not.
      • Create a new entry in pending for these (and probably log something since other mechanisms should really handle this)
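The first of these tasks, moving pending jobs to running at their "run at" time, can be sketched with a min-heap keyed by run time: drain everything that is due, then sleep until the next run time. This is an illustration only; a real queue would also wake the task whenever a newly submitted job changes the next "run at" time.

```rust
// Sketch of the "move pending to running at run_at time" task.
// Hypothetical structure; jobs are (run_at, job_id) pairs in a
// min-heap so the earliest run time is always at the top.
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Move every due job into `running`, returning the next wake-up time,
/// or None if nothing is scheduled and the task should wait for a new
/// submission.
fn move_due(
    now: u64,
    pending: &mut BinaryHeap<Reverse<(u64, u64)>>,
    running: &mut Vec<u64>,
) -> Option<u64> {
    while let Some(&Reverse((run_at, job_id))) = pending.peek() {
        if run_at > now {
            return Some(run_at); // sleep until this time
        }
        pending.pop();
        running.push(job_id); // batch-move everything that is due
    }
    None
}

fn main() {
    let mut pending = BinaryHeap::new();
    pending.push(Reverse((100u64, 1u64)));
    pending.push(Reverse((50, 2)));
    pending.push(Reverse((200, 3)));
    let mut running = Vec::new();

    // At t=120, jobs 2 and 1 are due; job 3 sets the next wake-up.
    let next = move_due(120, &mut pending, &mut running);
    println!("running: {running:?}, next wake at {next:?}");
}
```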

Thanks for reading! If you have any questions or comments, please send me a note on Twitter or Mastodon.