Ramus

  • Ramus is an in-process DAG and state machine executor designed for running agents with a focus on observability.
  • Task List

    • Up Next

      • Completely serialize and restore an agent
        • This needs to account for tool context as well, shared state between nodes, nested state machines, etc.
          • Probably split up "context" and "tools" where "context" always needs to be serializable and tools can contain functions.
          • Could go more complicated with serialize/deserialize functions but this gets messy in JS.
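A rough sketch of that split, where context stays JSON-serializable and tools are restored by key from a registry. All names here are hypothetical, not actual Ramus API:

```typescript
// Hypothetical sketch: split agent state into serializable "context"
// and a "tools" map whose functions are restored by registry key.

type ToolFn = (input: unknown, context: Record<string, unknown>) => unknown;

// Tools live in a registry so serialized state only stores their keys.
const toolRegistry = new Map<string, ToolFn>();

interface SerializedAgent {
  context: Record<string, unknown>; // must be plain JSON-safe data
  toolKeys: string[];               // references into the registry
}

function serializeAgent(
  context: Record<string, unknown>,
  tools: Map<string, ToolFn>
): string {
  const state: SerializedAgent = { context, toolKeys: [...tools.keys()] };
  return JSON.stringify(state);
}

function restoreAgent(data: string): {
  context: Record<string, unknown>;
  tools: Map<string, ToolFn>;
} {
  const parsed = JSON.parse(data) as SerializedAgent;
  const tools = new Map<string, ToolFn>();
  for (const key of parsed.toolKeys) {
    const fn = toolRegistry.get(key);
    if (!fn) throw new Error(`Unknown tool: ${key}`);
    tools.set(key, fn);
  }
  return { context: parsed.context, tools };
}
```

This sidesteps custom serialize/deserialize functions entirely: only plain data ever crosses the serialization boundary.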
      • Agent visualization
        • This should be both a generic application that can visualize agents in any database with a compliant schema, and also a set of components that make it easy to do so in a customized fashion.
    • Soon

      • Helper function for simple chain -> DAG
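A possible shape for that helper, where each step becomes a node depending on the previous one. The config shape here is made up, not the real Ramus DAG config:

```typescript
// Hypothetical sketch: turn a linear chain of functions into a DAG
// config where each node lists the previous node as its only parent.

type NodeFn = (input: unknown) => unknown;

interface DagNode {
  parents: string[];
  run: NodeFn;
}

function chainToDag(steps: Record<string, NodeFn>): Record<string, DagNode> {
  const dag: Record<string, DagNode> = {};
  let prev: string | null = null;
  for (const [name, run] of Object.entries(steps)) {
    dag[name] = { parents: prev ? [prev] : [], run };
    prev = name;
  }
  return dag;
}
```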
      • Chatbot interface that can be adapted to web, Slack, Discord, etc.
        • Standard set of incoming and outgoing events for this case
        • Implement serialization
        • Multiple levels of verbosity
      • Serializable runner functions
        • This has the runner in the config as an object instead of a function, where the object contains a key into some runner registry that points to the function, and additional arguments to the function.
        • This will be very useful for DAGs generated by agents that may not be fixed in the code.
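A sketch of the registry lookup, with hypothetical names (not the actual Ramus config):

```typescript
// Hypothetical sketch: the node config stores a registry key plus
// arguments instead of a closure, so the whole DAG stays serializable.

type Runner = (input: unknown, ...args: unknown[]) => unknown;

const runnerRegistry = new Map<string, Runner>();

// Serializable form: { runner: "addN", args: [5] } instead of a function.
interface RunnerRef {
  runner: string;
  args?: unknown[];
}

function resolveRunner(ref: RunnerRef): (input: unknown) => unknown {
  const fn = runnerRegistry.get(ref.runner);
  if (!fn) throw new Error(`Unknown runner: ${ref.runner}`);
  return (input) => fn(input, ...(ref.args ?? []));
}
```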
      • Dynamic node type that can expand its inputs into a DAG or state machine
      • Cache system
        • Mainly for tools that make network requests and for LLMs. This is mostly a debugging feature but in some cases will probably be useful for other things.
        • LLM request caching could be built into Chronicle
        • Support in-memory, file-system, and network like Redis
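One way the backends could share an interface; a hypothetical sketch, not the actual Ramus API:

```typescript
// Hypothetical sketch: a pluggable cache interface; file-system or
// Redis backends would implement the same two methods.

interface Cache {
  get(key: string): Promise<string | undefined>;
  set(key: string, value: string): Promise<void>;
}

class MemoryCache implements Cache {
  private store = new Map<string, string>();
  async get(key: string) {
    return this.store.get(key);
  }
  async set(key: string, value: string) {
    this.store.set(key, value);
  }
}

// Wrap a tool or LLM call so repeated identical requests hit the cache.
function cached<T>(cache: Cache, fn: (input: string) => Promise<T>) {
  return async (input: string): Promise<T> => {
    const hit = await cache.get(input);
    if (hit !== undefined) return JSON.parse(hit) as T;
    const result = await fn(input);
    await cache.set(input, JSON.stringify(result));
    return result;
  };
}
```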
    • Later/Maybe

      • Replay a DAG from any point in its execution
        • Basically just rerunning a node from its inputs and setting things up so that the descendant nodes will all run appropriately too.
      • Replay a state machine from any point in its execution
        • At each step, save the input/output and a diff of the context.
      • Update inputs and rerun only affected parts of a DAG
        • This adds a new config key to nodes, where they can indicate which keys in rootInput they care about
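A sketch of how the affected set could be computed from that config key; shapes are hypothetical:

```typescript
// Hypothetical sketch: nodes declare which rootInput keys they read;
// when a key changes, rerun those nodes plus all their descendants.

interface DagNode {
  parents: string[];
  inputKeys?: string[]; // keys in rootInput this node cares about
}

function affectedNodes(
  dag: Record<string, DagNode>,
  changedKeys: string[]
): Set<string> {
  const affected = new Set<string>();
  // Seed with nodes that directly read a changed key.
  for (const [name, node] of Object.entries(dag)) {
    if (node.inputKeys?.some((k) => changedKeys.includes(k))) affected.add(name);
  }
  // Propagate downstream until no new nodes are added.
  let grew = true;
  while (grew) {
    grew = false;
    for (const [name, node] of Object.entries(dag)) {
      if (!affected.has(name) && node.parents.some((p) => affected.has(p))) {
        affected.add(name);
        grew = true;
      }
    }
  }
  return affected;
}
```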
      • Ability to edit a node's output that wasn't what the user wanted
        • Say, if a node got the wrong result and the user wants to correct it.
        • Ability to cancel just a single node and its downstream
        • Modify output of any node and rerun downstream nodes in the DAG
        • Make "cancelled" state non-permanent. Nodes return to a ready state or something when a cancel occurs.
      • Dynamic modification of a DAG or state machine
        • Add new nodes to the existing DAG or state machine. This shouldn't be too hard to do and will help with introspection if doing dynamic planning.
        • This should also be able to take another DAG configuration and merge it into the current one, along with support for mapping or prefixing the node names so that multiple copies of a DAG can be merged.
        • Where do the outputs of the merged DAG go? Need to think more about how this makes sense and fits together.
        • A simple version of this is the node that is dynamically configured as a subgraph. This probably makes the most sense as a longer-term solution as well.
      • Look at integrating with https://github.com/transitive-bullshit/agentic tools
      • Short term memory system
        • Sometimes we want to do RAG or something while an agent executes but won't need most of this data afterward. This basically becomes the ability to purge and/or archive the RAG data after we don't need it anymore.
        • One-file-per-agent using LanceDB or something like that could work well here.
      • Script to parse DAG dependencies and generate a TypeScript interface with proper types for the inputs and outputs of each node
      • Manage multiple related state machines
        • e.g. multiple agents running similar tasks on different input for a user
        • or multiple parts of a single plan all executing concurrently
      • Send an event to multiple state machines based on some criteria
        • This should allow some level of arbitrary criteria; for example, we want to be able to select based on an organization, a user, a workflow type, a start time, or some other tag
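A minimal sketch of tag-based delivery with an arbitrary predicate; shapes are hypothetical:

```typescript
// Hypothetical sketch: dormant machines carry tags, and an event is
// delivered to every machine matching an arbitrary predicate.

interface MachineHandle {
  tags: Record<string, string>; // e.g. { org, user, workflowType }
  sendEvent(event: unknown): void;
}

function broadcast(
  machines: MachineHandle[],
  matches: (tags: Record<string, string>) => boolean,
  event: unknown
): number {
  let delivered = 0;
  for (const machine of machines) {
    if (matches(machine.tags)) {
      machine.sendEvent(event);
      delivered++;
    }
  }
  return delivered;
}
```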
      • Global rate limiting of active tasks
      • Evals
        • Design conversation
        • Eval database tables
          • Eval job - name, time, source (CI, manual, etc), user, optional description
          • Eval item - a single item from an eval job that links to a run. Contains additional info around the run
          • Eval score - one score for an eval run
          • Eval data set - inputs and outputs
          • Need ability to attach extra scores by manual curation.
          • Associate metadata with a run
        • Views:
          • compare results of a particular eval across different runs
          • Results for a particular eval over time
          • Total results for a particular category of run over time (e.g. for a particular workflow, the trend of success rate)
          • All results for a run
          • Filter by whatever, including metadata values
    • Done

      • Chronicle function for converting metadata into span attributes
      • Convert to multipackage monorepo
      • Allow overriding name of DAG or state machine on each run
      • Basic State machine support
        • Configuration
        • Function to execute one step of the state machine
        • Ability to call a DAG, other state machines, or an arbitrary promise-returning function in a state
        • Tracing/events on all transitions
        • Configurable transitions
        • Global error handler/state
        • Configurable override of error transition on a per-state basis
        • Send events to state machines
      • Semaphores per class of node
      • Stepwise mode for DAGs
      • Some way to send events out while still running (e.g. to send updates to the user)
      • Tracing spans for every state
      • Log state transitions, inputs, and outputs to a database
      • DAG executor
      • Spawn state machines or DAGs from an existing task and wait for them to finish
        • This is just done via normal code, nothing special.
  • The main features are:
    • "human in the loop" states
    • cyclic workflow graphs
      • with some control on the number of steps taken
    • Fully embrace "workflow as state machine" rather than just using them behind the scenes
    • arbitrary delays
    • rate limiting on how quickly an agent can make requests?
  • What do we really need a workflow engine for?
    • Arbitrary delays e.g. resume after a month
    • Stop to get user input, and resume
      • Sort of. This makes it somewhat easier but a simpler system can accomplish the same thing.
    • Global rate limiting
    • Long-running tasks so we can resume from somewhere on failure
      • This can be somewhat emulated with checkpoints or by doing a state machine inside the worker and having it save its state on every transition
    • Events that can be sent to multiple dormant workflows based on some tag
      • Can be done manually though
  • A good first effort would be to write agents as state machines where the state machine runs in the agent itself instead of being externally orchestrated
    • This simplifies execution a lot, especially for agents that don't need to sleep for a long time.
    • This leaves the external component as something that can resume and run a state machine, which doesn't have to be generic at first. It can just be a normal call to the service that runs it.
    • Downsides:
      • Not resilient to a full process crash, although with an external sweeper this could be mitigated somewhat
      • Are there any downsides around spawning sub-state machines? I think the agent software would have to be designed to allow resuming from a state like this, but otherwise it wouldn't be a problem.
    • Take a look at napi-rs or Neon, or maybe even just WASM, to see if we can write a lot of this in Rust. That would simplify code reuse and reliability in some ways and leave room for more full-featured plans. It might be more complex than it's worth, though; need to see.
    • Initial Layout
      • This is a library which can run an arbitrary state machine and persist the config and state into a database
      • Two modes
        • State machine
        • DAG
          • The difference is that because a DAG is non-looping, we can just run all the root nodes concurrently and then run other nodes as soon as their parent dependencies are done.
      • Each node in a state machine or DAG can spawn other DAGs and state machines, and wait for them to finish
      • Support running either preconfigured machine from the database or taking an ad hoc state machine
      • Some way to send events to a paused state machine to start it up again
      • State machines can run other state machines
        • Need to figure out how much explicit support this really needs
      • Ensure that we're never processing more than one state machine event at a time (except perhaps a cancel event)
      • When entering a state, supply
        • Previous state name
        • Current machine context
        • Data emitted from the previous state for this state
        • Data emitted from any submachine results that ran between the previous and current state.
      • When leaving a state, the function returns
        • Next state
        • Data for that state
        • Context updates
        • Submachines to run before transitioning, if any, and their inputs
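The enter/leave contract above could look something like this in TypeScript; type names are hypothetical:

```typescript
// Hypothetical sketch of the state function contract described above.

interface StateInput<CONTEXT, DATA> {
  previousState: string;
  context: CONTEXT;
  data: DATA; // emitted by the previous state for this one
  submachineResults: unknown[]; // from submachines run before this state
}

interface SubmachineSpec {
  machine: string; // key into some machine registry
  input: unknown;
}

interface StateOutput<CONTEXT> {
  nextState: string;
  data: unknown; // passed to the next state
  contextUpdates?: Partial<CONTEXT>; // merged into the machine context
  submachines?: SubmachineSpec[]; // run to completion before transitioning
}

type StateFn<CONTEXT, DATA> = (
  input: StateInput<CONTEXT, DATA>
) => StateOutput<CONTEXT> | Promise<StateOutput<CONTEXT>>;

// Example state using the contract:
const searchState: StateFn<{ queries: number }, string> = async ({ context, data }) => ({
  nextState: "summarize",
  data: `results for ${data}`,
  contextUpdates: { queries: context.queries + 1 },
});
```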
      • Create a tracing span for every state
      • Global error state
      • States can override the global error state with some other state
      • Parallel states?
        • Need to figure out the right configuration for merging these back into a single state again though.
        • Probably easier if either
          • These are just run from within a state
            • But then it's just a normal call. Works but not great
          • These are submachines which run to completion and there's a fixed state on the other end that receives all the results.
    • Library V2
      • Eventually have preconfigured blocks for certain common things, that can be called without any custom code
        • Maybe do this sooner than later depending on how agent planning goes
      • Built-in loop control with loop count limit.
        • Probably do loops as a submachine since the scoped context is useful here
      • Submachines
        • Scoped context for groups of states
        • Also useful for embedding state machines from other sources into a larger workflow.
  • A particular instance of a workflow can have its own priority
  • Workflows can stream events out to the caller
  • Optionally use a time-weighted priority system for queuing
  • Model workflows as a Moore state machine
    • Probably will end up with a hybrid Moore/Mealy though
  • State executor code can run child state machines
    • Have some way to time out on these, like we run 10 searches with a timeout of 15 seconds and when we reach the timeout use whatever results have come back.
    • Each submachine will run in parallel
    • I think it's best if these child machines are actually their own separate machines which can be spawned from the state, rather than part of the parent state machine config. This is because many of these child machines will be used by multiple workflows.
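A sketch of the timeout behavior for a batch of child machines, keeping whatever results arrived before the deadline (hypothetical helper, not the Ramus API):

```typescript
// Hypothetical sketch: run child machines in parallel and, once the
// deadline passes, return whatever results have come back so far.

async function runWithDeadline<T>(tasks: Promise<T>[], timeoutMs: number): Promise<T[]> {
  const results: T[] = [];
  // Each task records its own result as it finishes; failures are dropped.
  const recorded = tasks.map((task) =>
    task
      .then((value) => {
        results.push(value);
      })
      .catch(() => {})
  );
  const deadline = new Promise<void>((resolve) => setTimeout(resolve, timeoutMs));
  // Resolve when either everything finishes or the deadline hits.
  await Promise.race([Promise.all(recorded), deadline]);
  // Copy so late finishers don't mutate the returned array.
  return [...results];
}
```

The ten-searches example would then be `runWithDeadline(searches, 15_000)`.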
  • States have categories (eg makes an LLM call or not)
  • Rate limits global, per workflow type, and per category
  • Scoped context so that a set of sub-machines can have its own context separate from the global context
  • Workflow and state priority
  • States can define a transition to a particular state on timeout
  • Support websockets or 2-way gRPC for simpler one-off runs where we don't really want to integrate a whole web server
  • Support running as a library for cases where the agent is in Rust
  • States and Transitions can log events for analysis
    • Things like "agent required correction", "this is an error", "user accepted the answer"
  • Each state (or state transition? whatever runs code) also has an "error" property which defines the state that it goes to upon an uncaught error. The state machine as a whole also has an error state which can perform some action.
  • Jobs register their state machine with a version and a place to call back when triggering states. Then anything can start the job with some initial context.
    • Allow upserting a config where it makes a new version if the config is different from the latest version.
  • Some way to generate TypeScript types from state machine config
    • This should include a list of permitted next states from any particular state
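The generated output might look something like this for an imaginary machine, with the permitted next states encoded per state:

```typescript
// Hypothetical example of generated types for an imaginary machine:
// each state maps to the set of states it is allowed to transition to.

interface Transitions {
  start: "search" | "error";
  search: "summarize" | "error";
  summarize: "done" | "error";
}

// A step running in state S may only name one of its permitted next states.
interface StepResult<S extends keyof Transitions> {
  nextState: Transitions[S];
  data: unknown;
}

// Compiles because "summarize" is a permitted next state of "search";
// returning e.g. "done" here would be a type error.
const afterSearch: StepResult<"search"> = { nextState: "summarize", data: null };
```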
  • Each step returns:
    • next state. There should be some way of type checking the returned value from the worker.
    • Input for the next state, specific to that state?
    • Conditions to go to next state:
      • Optional delay before next transition. This can be encoded in the state machine config but also overridden
      • Some kind of polling of an endpoint?
        • Maybe the orchestrator calls an endpoint on some backoff schedule, and depending on the status code it either goes to the next state or tries again. Need to think about this; it might not be too useful.
    • Context updates
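A sketch of how the optional delay could resolve, with the per-step value overriding the configured default; shapes are hypothetical:

```typescript
// Hypothetical sketch: the machine config can set a default delay per
// transition, and a step's return value can override it.

interface TransitionConfig {
  delayMs?: number; // default delay encoded in the state machine config
}

interface StepResult {
  nextState: string;
  delayMs?: number; // per-step override of the configured delay
}

function effectiveDelay(config: TransitionConfig, result: StepResult): number {
  return result.delayMs ?? config.delayMs ?? 0;
}

async function scheduleTransition(
  config: TransitionConfig,
  result: StepResult,
  enterState: (state: string) => void
): Promise<void> {
  const delay = effectiveDelay(config, result);
  if (delay > 0) await new Promise((resolve) => setTimeout(resolve, delay));
  enterState(result.nextState);
}
```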
  • Bot Abstraction

    • Keep track of conversations.
    • Each "server" is a mapping of a platform (Discord, Slack, etc) and the ID of the team (Guild, Team, Org, etc.) on that platform to one of our orgs.
    • Each conversation has:
      • Our own ID
      • The server it belongs to
      • Which user started it
      • The ID of the conversation in the platform. This should be JSON since it could have multiple components (e.g. a channel and thread ID in Discord) but also should be easy to look up.
        • Might be best to just use separate tables for each platform for the platform-specific part. We do still need an indication in the main conversation table of which platform a conversation belongs to so we don't have to look it up in every table.
    • Sending events
      • Figure out which platform the conversation belongs to
      • Send the conversation ID and the standardized event to the platform adapter
      • Platform adapter translates it to the platform-specific format and sends it.
    • Receiving an event
      • Look up the conversation, if one exists
      • Translate the event to the standard format
      • Send the conversation ID and the event to the main layer
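The send/receive flow above implies an adapter interface roughly like this (all names hypothetical):

```typescript
// Hypothetical sketch: each platform implements an adapter that
// translates to and from a standardized event format, and the main
// layer routes by the conversation's platform.

interface StandardEvent {
  type: string;
  text?: string;
}

interface PlatformAdapter {
  // Translate a standard event and send it using platform-specific IDs.
  sendEvent(platformConversationId: unknown, event: StandardEvent): Promise<void>;
  // Translate an incoming platform payload into the standard format.
  translateIncoming(raw: unknown): StandardEvent;
}

const adapters = new Map<string, PlatformAdapter>();

interface Conversation {
  id: string;
  platform: string; // e.g. "discord" or "slack"
  platformConversationId: unknown; // e.g. { channelId, threadId } on Discord
}

async function sendToConversation(conv: Conversation, event: StandardEvent): Promise<void> {
  const adapter = adapters.get(conv.platform);
  if (!adapter) throw new Error(`No adapter for platform: ${conv.platform}`);
  await adapter.sendEvent(conv.platformConversationId, event);
}
```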
  • Notes from Phind

    • Here are some suggestions to improve and expand on your notes for a workflow orchestrator that models workflows as Moore state machines:
    • 1. State Machine Definition:
      • Allow defining state machines using a declarative format (e.g., YAML or JSON) for easy configuration and versioning.
      • Include additional properties for each state, such as:
        • State name and description
        • Input and output data schemas for type checking and validation
        • Error handling and retry mechanisms
        • Timeouts and maximum execution time
      • Support nested state machines or sub-workflows for modular and reusable components.
    • 2. Step Categories and Rate Limiting:
      • Define a clear set of step categories based on the type of operation (e.g., LLM calls, database queries, external API calls).
      • Implement rate limiting at both the global and category levels to control resource utilization and prevent abuse.
      • Allow configuring rate limits per category and provide options to adjust them dynamically based on load or priority.
    • 3. Job Priority and Scheduling:
      • Introduce a priority system for jobs to determine their execution order and resource allocation.
      • Support different priority levels (e.g., low, medium, high) and allow assigning priorities to individual jobs or workflows.
      • Implement a scheduler that takes into account job priorities and available resources to optimize execution order and minimize waiting times.
    • 4. Callback Mechanism:
      • Provide a flexible callback mechanism for triggering state transitions and notifying external systems.
      • Allow registering multiple callback endpoints or functions for different events (e.g., state entry, state exit, job completion).
      • Support various callback protocols (e.g., HTTP, WebSocket, message queues) to integrate with different systems.
    • 5. Context Management:
      • Enhance the context object to store and pass data between states and steps.
      • Allow defining default context values and merging them with job-specific context during execution.
      • Provide methods to update, retrieve, and delete context variables within steps.
    • 6. Error Handling and Recovery:
      • Implement robust error handling mechanisms to handle failures gracefully and prevent workflow disruptions.
      • Allow defining error handling strategies per state or step (e.g., retry, skip, abort).
      • Support compensating actions or rollback mechanisms to revert partial changes in case of failures.
    • 7. Monitoring and Observability:
      • Provide built-in monitoring and logging capabilities to track workflow execution, performance metrics, and errors.
      • Expose APIs or dashboards for real-time monitoring and visualization of workflow progress and status.
      • Integrate with external monitoring and alerting systems for centralized observability and incident management.
    • 8. Scalability and Distribution:
      • Design the orchestrator to be scalable and distributed to handle a large number of concurrent workflows.
      • Implement load balancing and horizontal scaling mechanisms to distribute workload across multiple nodes or workers.
      • Consider using distributed locking and coordination mechanisms (e.g., ZooKeeper, etcd) for managing shared resources and ensuring consistency.
    • 9. Testing and Debugging:
      • Provide testing utilities and frameworks to facilitate unit testing and integration testing of workflows and steps.
      • Allow simulating and debugging workflows in a controlled environment without affecting production systems.
      • Implement logging and tracing capabilities to aid in troubleshooting and identifying issues during development and production.
    • 10. Documentation and Examples:
      • Create comprehensive documentation that explains the concepts, architecture, and usage of the workflow orchestrator.
      • Provide clear examples and tutorials demonstrating how to define state machines, implement steps, and integrate with external systems.
      • Maintain a collection of common workflow patterns and best practices to guide users in designing efficient and maintainable workflows.
    • These suggestions aim to enhance the functionality, reliability, and usability of your workflow orchestrator. They cover aspects such as state machine definition, rate limiting, job scheduling, error handling, monitoring, scalability, testing, and documentation. Incorporating these features and improvements will make your orchestrator more robust, flexible, and suitable for a wide range of workflow orchestration scenarios.

Thanks for reading! If you have any questions or comments, please send me a note on Twitter.