Smelter

Written — Updated
  • This is a project to do map-reduce style calculations using a bunch of serverless workers reading from S3.
  • Task List

    • Up Next

    • Soon

    • Later/Maybe

      • Platform adapters could make it easier to build the local and platform-side code
        • Not sure how useful this actually is, but this would mostly be a preconfigured way for a binary to determine if its running in the platform or not and then run your manager code or your worker code when you're doing the single-source-code configuration
        • I think a simple abstraction would be something like:
          • The spawner has an easy way to set a "SMELTER_TASK" variable in the environment (or whatever is the equivalent) of the spawned task.
          • Provide a function that retrieves this value, and then you can match on it and run your own code. If it's not set, then you assume that you are the job manager and run that code instead.
          • Ultimately this isn't too different from doing it yourself, might be a nice convenience though especially when developing the pipeline.
      • Ability for spawner to send an event when a container goes from pending to running
      • Option to run a server within the worker container that can collect logs and stream them back to the manager
      • Option to timeout if we don't see any logs for too long
        • This depends on being able to get logs shipped from the container down to the manager.
      • AWS Lambda adapter
      • Support Fargate spot instances
        • A way for SpawnedTask to indicate that a task failure is from a spot instance shutdown
          • This can be a new error code alongside TaskFailed, call it TaskPreempted
        • Specify which capacity provider to use for Spot vs. Normal launching
        • An option to launch it as a non-spot instance if the spot instance gets preempted too much or if it takes too long to start.
        • Currently Fargate doesn't support spot on ARM
      • Convert a single SQL statement into a multi-stage set of tasks
        • This may be better done by integrating with something like Datafusion which already supports creating query plans
        • Consider also supporting Substrait query plan format
          • DuckDB supports consuming Substrait so I may be able to generate Substrait from SQL, split it into stages, and then run the nodes as needed
      • Communication between manager and worker, after worker is started
        • This would allow better inter-stage communication
        • For example stage 1 runs a bunch of "map" jobs, and stage 2 does the reduce, but both stages are started at the same time, and as the jobs in stage 1 finish their result locations are streamed to stage 2.
        • If everything is in the same VPC then this is easy; but otherwise it requires either a VPN or some other type of coordinator.
      • Clean up inter-level channels
        • There are a bunch of channels for communicating cancel state, task results, etc. between the tasks, stages, job, and manager. See if there's a way to make this cleaner.
    • Done

      • Retry inside the spawner when Fargate returns a capacity error
      • Cancel all the jobs when an error occurs, and wait for them to be cancelled
      • Handle rate exceeded errors from Fargate API
        • The Rust AWS SDK doesn't properly handle ThrottlingError and flag it for retry, but with a custom retry layer this can be done.
      • Workers automatically track RAM usage, load factor, time running, etc., and return these with the task results
      • Full test of Fargate jobs
      • Allow a cancel signal to kill all containers
      • StatusSender needs to work with a raw StatusUpdateItem, and the StatusCollector should be able to handle native StatusUpdateItem and have its own separate mechanism for the read operations. (Maybe just use a mutex here to simplify?)
      • Build fargate container
      • Have something that helps generate task definitions for fargate
      • AWS Fargate Spawner
        • How to figure out if a container succeeded or not? — exit code is exposed though there's no other way to pass back data
      • AWS Fargate Worker Framework
      • Example to run workers as local processes
      • Remove the Spawner trait altogether
        • It's no longer necessary and isn't sufficiently expressive for most spawners
  • Test Datasets

  • Initial Features

    • Split up queries across a number of workers
      • Split queries into chunks
      • Max number of concurrent workers
    • Run workers
    • Gather results
    • Reduce results using 1 or more reducers
    • Retryable workers with some threshold for retrying
      • e.g. autoretry remaining workers once the first 90% have returned
  • Pluggable Backends

    • Execution
      • AWS Lambda
      • Docker/Kubernetes/Nomad
    • Storage
      • Various Cloud storage
  • Applications

    • Query over some data and output a DuckDB file with the results for further analysis.
  • From Honeycomb's O11y Book But serverless functions and cloud object storage aren't 100% reliable. In practice, the latency at the tails of the distribution of invocation time can be significant orders of magnitude higher than the median time. That last 5% to 10% of results may take tens of seconds to return, or may never complete. In the Retriever implementation, we use impatience to return results in a timely manner. Once 90% of the requests to process segments have completed, the remaining 10% are re-requested, without canceling the still-pending requests. The parallel attempts race each other, with whichever returns first being used to populate the query result. Even if it is 10% more expensive to always retry the slowest 10% of subqueries, a different read attempt against the cloud provider's backend will likely perform faster than a "stuck" query blocked on S3, or network I/O that may never finish before it times out. What happens if someone attempts to group results by a high-cardinality field? How can you still return accurate values without running out of memory? The simplest solution is to fan out the reduce step by assigning reduce workers to handle only a proportional subset of the possible groups. For instance, you could follow the pattern Chord does by creating a hash of the group and looking up the hash correspondence in a ring covering the keyspace.


Thanks for reading! If you have any questions or comments, please send me a note on Twitter.