Effectum

  • url: https://github.com/dimfeld/effectum
  • A job queue based on SQLite, which is embeddable as a library and will eventually run standalone as well.
  • Task List

    • Up Next

      • Status callbacks
        • e.g. to report failures to Sentry, update some other status table with job status
    • Soon

      • A job can reschedule itself to run again
      • Server mode: allow adding jobs remotely
      • Server mode: Job workers can be in other processes that pull jobs from the server
      • Make it easy to launch jobs on an orchestrator (Fly Machines, AWS Lambda, etc.)
    • Later/Maybe

      • Enqueue jobs using Postgres transactional outbox pattern
        • This is mostly done but isn't important until server mode is done
      • Pluggable storage engine
        • Would require rewriting most of the library but might want to run against Postgres at some point.
        • Probably won't do this.
      • Wrapper function which can enqueue a job, wait for it to finish, and return the result from the job
        • This is only really useful when the worker task is running outside the current process. Otherwise just spawn a task and do it there.
      • Use from Node.js
      • Examples
        • Example: Retries due to failure
        • Example: Job priority
        • Example: Graceful shutdown
        • Example: Checkpoint functionality
        • Example: Expired jobs retry
    • Done

  • Server Mode

    • Ability to run the queue as its own server if you need to scale up.
    • This should present the same (or nearly the same) interface as the local mode, to make it easy to switch from one to the other.
    • Workers will be remote processes and communicate with the server over gRPC or normal HTTP
      • Use long polling or gRPC streaming here for workers that wait for pending jobs?
  • Database Structures

    • Base Data

      • This is stored in the jobs table.
      • Fields
        • job id
        • external job id (this is a UUID)
        • job type
          • Used to filter which workers will try to run this job.
        • job priority
          • Higher priority jobs will go first when multiple jobs can be run.
        • weight
          • How much this job counts against a worker's concurrency limit. Can be increased for jobs that require heavy computation.
        • job status
          • Active
            • For Active jobs, the status resolves to Pending or Running depending on whether the job is currently assigned to a worker in active_jobs
          • Success
          • Failed
        • recurrence id (for recurring tasks)
        • Original "run at" time
          • Jobs that should run immediately will have this set to the current time
        • payload
        • payload from last checkpoint
        • current try
        • maximum retries allowed
        • retry backoff spec
          • Backoff initial time, multiplier, and randomization factor (see the sketch after this field list)
        • time job was added
        • job timeout
          • Default expiration time when starting the task is (now + timeout)
        • heartbeat expiration increment
          • When receiving a heartbeat, the new expiration will be max of current expiration and (now + expiration increment)
          • This can be 0 to have a strict timeout, regardless of heartbeat
        • started at time
        • finished at time
        • run info (info about each failure for failed runs, info about success from a successful run)
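A minimal sketch of how a few of these fields behave, using hypothetical Rust types rather than Effectum's actual internals: the status resolution, the retry backoff calculation, and the heartbeat expiration rule (the `rand` crate is assumed for the jitter).

```rust
use std::time::{Duration, SystemTime};

// Hypothetical types for illustration; these are not Effectum's actual API.

/// Stored status. An Active job resolves to Pending or Running depending on
/// whether it currently appears in active_jobs with a worker assigned.
enum JobStatus {
    Active,
    Success,
    Failed,
}

/// Retry backoff spec: initial time, multiplier, and randomization factor.
struct RetryBackoff {
    initial: Duration,
    multiplier: f64,
    randomization: f64, // e.g. 0.2 applies up to ±20% jitter
}

impl RetryBackoff {
    /// Delay before the next attempt, given how many tries have already run.
    fn next_delay(&self, current_try: u32) -> Duration {
        let base = self.initial.as_secs_f64() * self.multiplier.powi(current_try as i32);
        // Jitter keeps a batch of failed jobs from all retrying at the same moment.
        let jitter = 1.0 + self.randomization * (2.0 * rand::random::<f64>() - 1.0);
        Duration::from_secs_f64(base * jitter)
    }
}

/// Heartbeat rule: the new expiration is the max of the current expiration and
/// (now + increment), so an increment of 0 gives a strict timeout.
fn expiration_after_heartbeat(current: SystemTime, increment: Duration) -> SystemTime {
    current.max(SystemTime::now() + increment)
}
```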
    • Active Jobs

      • This is information specific to jobs which are running or waiting to run (sketched as a struct below)
      • job id
      • worker id, if currently running
      • job type (copied from the jobs table but helps make lookups more efficient when only looking for certain task types)
      • priority
      • run at time
      • started at time
      • expiration time for currently running jobs
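The same fields as a hypothetical Rust struct; the names are illustrative rather than the actual schema.

```rust
use std::time::SystemTime;

/// Hypothetical shape of an active_jobs row.
struct ActiveJob {
    job_id: i64,
    /// Set only while a worker is running the job; None means it is pending.
    worker_id: Option<i64>,
    /// Copied from the jobs table so type-filtered lookups don't need a join.
    job_type: String,
    priority: i32,
    run_at: SystemTime,
    started_at: Option<SystemTime>,
    /// Only meaningful for currently running jobs.
    expires_at: Option<SystemTime>,
}
```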
    • Recurring Job Spec

      • Not supported yet, this is on the roadmap.
      • recurrence id
      • base data job id
        • When starting a recurring job, the row from the jobs table will be copied to a new job with a new id
      • schedule
  • In-Memory Data Structures and Tasks

    • Track "next" job run time for each job type
    • List of clients that can run for each job type
      • When a job becomes available, all the workers are notified, and then those that are currently waiting for a new job will try to grab one.
    • Database Writer
      • Instead of each task writing to the SQLite database, write access is run from a single thread, which communicates with the rest of the world using a channel. Results are returned through a oneshot associated with each request.
      • This is uniquely well suited to SQLite due to its "single writer at a time" policy. Databases with true MVCC wouldn't be as well served here.
      • But the real benefit is that this allows batching of independent requests that come in near the same time, which increases the maximum throughput of the queue by 10x-20x depending on the details. A sketch of the pattern follows below.
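A rough sketch of the single-writer pattern, assuming a hypothetical WriteRequest type, rusqlite for the database, and a tokio oneshot for returning results; this is not Effectum's actual internal code.

```rust
use std::sync::mpsc;
use tokio::sync::oneshot;

/// Hypothetical request type; the real writer handles many operation kinds.
struct WriteRequest {
    sql: String,
    done: oneshot::Sender<Result<(), String>>,
}

/// Dedicated writer thread: every SQLite write funnels through this loop, so
/// requests that arrive at about the same time can share a single transaction.
fn db_writer(rx: mpsc::Receiver<WriteRequest>, conn: rusqlite::Connection) {
    while let Ok(first) = rx.recv() {
        // Drain whatever else is already queued so it can batch into one commit.
        let mut batch = vec![first];
        while batch.len() < 64 {
            match rx.try_recv() {
                Ok(req) => batch.push(req),
                Err(_) => break,
            }
        }

        let result = (|| {
            let tx = conn.unchecked_transaction().map_err(|e| e.to_string())?;
            for req in &batch {
                tx.execute(&req.sql, rusqlite::params![]).map_err(|e| e.to_string())?;
            }
            tx.commit().map_err(|e| e.to_string())
        })();

        // Each caller gets the outcome back through its own oneshot channel.
        for req in batch {
            let _ = req.done.send(result.clone());
        }
    }
}
```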
  • Job Runners

    • Each job has a name and a runner function. The runner function is a normal async function which takes the current job and a context object (for database pools and so on), as sketched below.
    • Every job runner for a particular worker has the same type for its context.
    • Internally, the runner function is wrapped in some other code which manages things like spawning a task and handling the job completion, and also removing some of the need to explicitly deal with the unique Future type returned from each different async function.
    • Eventually there will probably be a proc macro that can automatically create a full runner from the function, but for now you have to create it manually. It's just a single line of code though, so not too bad.
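A hypothetical runner function in that shape. The types here are illustrative stand-ins (using `serde_json` to decode the payload), not Effectum's real API.

```rust
use std::sync::Arc;

/// Hypothetical shared context; every runner on a worker uses the same type.
struct JobContext {
    // database pools, HTTP clients, configuration, etc.
}

/// Hypothetical handle for the job being run; the real one would also expose
/// the checkpoint payload, heartbeat/checkpoint methods, and so on.
struct RunningJob {
    payload: Vec<u8>,
}

/// A runner is just an ordinary async function over the job and the context.
async fn send_welcome_email(job: RunningJob, _ctx: Arc<JobContext>) -> Result<(), String> {
    let recipient: String = serde_json::from_slice(&job.payload).map_err(|e| e.to_string())?;
    // ... actually deliver the email here ...
    println!("sending welcome email to {recipient}");
    Ok(())
}
```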
  • Job Workers

    • Workers run independently of each other, and are linked to the queue primarily by the notification channels that indicate when new jobs become available.
    • The worker runs a loop that waits for pending jobs if it has enough capacity to run them, and otherwise just waits for one of its own jobs to finish.
    • Cribbing from sqlxmq, workers have both a min concurrency and a max concurrency.
      • The max concurrency is the highest number of jobs it will run, and the worker will fetch jobs again when the number of jobs running falls below the min concurrency (see the sketch below).
      • Jobs with weight > 1 will also count more against the max concurrency.
        • e.g. an encoding job that uses a lot of CPU might have a higher weight than smaller tasks like sending an email.
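A sketch of the min/max concurrency bookkeeping described above, with hypothetical names:

```rust
/// Hypothetical worker capacity tracking; job weights count against concurrency.
struct WorkerCapacity {
    min_concurrency: u32,
    max_concurrency: u32,
    /// Sum of the weights of all currently running jobs.
    running_weight: u32,
}

impl WorkerCapacity {
    /// The worker only goes back to the queue once it falls below the min,
    /// and then fetches enough weight to fill back up to the max.
    fn weight_to_fetch(&self) -> u32 {
        if self.running_weight < self.min_concurrency {
            self.max_concurrency - self.running_weight
        } else {
            0
        }
    }
}
```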
  • Algorithms

    • Running a pending task (sketched in code after this list)
      • Get a task from the pending table whose task type has waiting job runners.
      • Pull a client off the waiting list and send the task to it.
      • If the task has closed its receiver, then try the next one, and if none of them work, leave the task on the pending list.
      • If the send succeeds, put the task onto the running list.
      • Need to figure out the exact best way to do this
        • Delete from pending list right away and then add to running list once it succeeds
        • Or find the row first, and wait to delete once it succeeds
          • This might work better if we're taking a batch of rows at a time, especially since we may run out of waiting clients for a particular task type.
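One way the dispatch step could look, assuming each waiting worker is represented by a tokio channel sender; this is a sketch of the options above, not a final implementation.

```rust
use tokio::sync::mpsc::{self, error::TrySendError};

/// Hypothetical pending job handle.
struct PendingJob {
    id: i64,
}

/// Try each waiting worker in turn. Returns None if a worker accepted the job
/// (the caller then moves it to the running list); otherwise hands the job
/// back so it stays on the pending list.
fn dispatch(
    mut job: PendingJob,
    waiting_workers: &mut Vec<mpsc::Sender<PendingJob>>,
) -> Option<PendingJob> {
    while let Some(worker) = waiting_workers.pop() {
        match worker.try_send(job) {
            Ok(()) => return None,
            // The worker's receiver is closed or full; take the job back and
            // try the next waiting worker.
            Err(TrySendError::Closed(j)) | Err(TrySendError::Full(j)) => job = j,
        }
    }
    Some(job)
}
```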
  • Queue Internal Tasks

    • Move pending jobs to running when they reach their "run at" time.
      • This moves jobs in batches until there aren't any left to move, then waits until the next "run at" time to do anything else. It also needs to be updated whenever that time changes (see the sketch below).
    • Sweeper for expired tasks
      • Update retry data and move them from running back to pending
    • Sweeper for deleting old data on successful tasks
      • Optional but keeps the DB from blowing up too much.
      • Probably have some hook to send "done" task data somewhere external too.
    • Sweeper for recurring jobs which should be scheduled or running, but are not.
      • Create a new entry in pending for these (and probably log something since other mechanisms should really handle this)
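A sketch of that first internal task, with the queue operations stubbed out; `next_run_at_changed` is assumed to be notified whenever a job's "run at" time moves earlier.

```rust
use std::time::Duration;
use tokio::sync::Notify;

/// Hypothetical loop for moving pending jobs once they reach their "run at"
/// time; the two helper functions stand in for real queue operations.
async fn pending_job_mover(next_run_at_changed: &Notify) {
    loop {
        // Move ready jobs in batches until there are none left to move.
        while move_ready_jobs_batch().await > 0 {}

        // Sleep until the next known "run at" time, but wake early if a newly
        // added or rescheduled job changes that time.
        let wait = time_until_next_run_at().await.unwrap_or(Duration::from_secs(60));
        tokio::select! {
            _ = tokio::time::sleep(wait) => {}
            _ = next_run_at_changed.notified() => {}
        }
    }
}

// Stand-ins for the real database operations.
async fn move_ready_jobs_batch() -> usize { 0 }
async fn time_until_next_run_at() -> Option<Duration> { None }
```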

Thanks for reading! If you have any questions or comments, please send me a note on Twitter.