
API Reference

This section provides detailed documentation of the core classes and utility functions used in GenSphere.


Core Classes


GenFlow

Module: genflow.py

The GenFlow class is responsible for parsing YAML workflow definitions, constructing an execution graph, and executing nodes in the correct order. It manages the overall workflow execution process.

Class Definition

class GenFlow:
    def __init__(self, yaml_file, functions_filepath=None, structured_output_schema_filepath=None):
        # Initialization code

    def parse_yaml(self):
        # Parses the YAML data and constructs nodes

    def build_graph(self):
        # Builds the execution graph

    def run(self):
        # Executes the nodes in topological order

Constructor

def __init__(self, yaml_file, functions_filepath=None, structured_output_schema_filepath=None):

Parameters:

- yaml_file (str): Path to the main YAML workflow file.
- functions_filepath (str, optional): Path to the Python file containing user-defined functions. Defaults to None.
- structured_output_schema_filepath (str, optional): Path to the Python file containing structured output schemas. Defaults to None.

Description:

Initializes the GenFlow instance by loading the YAML data and preparing the environment for execution. It verifies the validity of provided file paths and ensures that the necessary files are accessible.

Raises:

Methods

parse_yaml
def parse_yaml(self):

Description:

Parses the YAML data from the main workflow file and constructs the nodes for execution. It also checks for the presence of nested workflows (yml_flow nodes) and composes them using YamlCompose if necessary. Validates the YAML file for consistency before parsing.

Raises:

Example Usage:

flow = GenFlow('workflow.yaml', 'functions.py', 'schemas.py')
flow.parse_yaml()

build_graph
def build_graph(self):

Description:

Builds a directed acyclic graph (DAG) representing the execution order of nodes based on their dependencies. It adds nodes and edges to the graph according to the dependencies identified during parsing.

Raises:

Example Usage:

flow.build_graph()

run
def run(self):

Description:

Executes the nodes in the order determined by the topological sort of the execution graph. It renders the parameters for each node using the outputs of previously executed nodes and handles iterative execution for nodes processing lists.

Raises:

Example Usage:

flow.run()

After execution, the outputs from each node are stored in the outputs attribute of the GenFlow instance.
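To make the ordering concrete, here is a minimal sketch of topological execution using Python's standard-library graphlib. The node names and dependency map are hypothetical, and this is not the actual GenFlow implementation:

```python
# A hypothetical dependency map: each node maps to the set of nodes it
# depends on. graphlib resolves a valid execution order, or raises
# CycleError if the graph is not a DAG.
from graphlib import TopologicalSorter

dependencies = {
    "read_input": set(),          # entrypoint: no dependencies
    "summarize": {"read_input"},  # needs read_input's output
    "save_result": {"summarize"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['read_input', 'summarize', 'save_result']
```

GenFlow performs the equivalent ordering over the graph produced by build_graph before executing each node in turn.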


Node

Module: genflow.py

The Node class represents an individual operation or step within the workflow. It encapsulates the logic required to execute that step, including parameter rendering and function execution.

Class Definition

class Node:
    def __init__(self, node_data):
        # Initialization code

    def set_flow(self, flow):
        # Sets reference to the GenFlow instance

    def get_dependencies(self, node_names):
        # Retrieves the dependencies of the node

    def render_params(self, outputs, env):
        # Renders the parameters using previous outputs

    def execute(self, params):
        # Executes the node based on its type and parameters

Constructor

def __init__(self, node_data):

Parameters:

- node_data (dict): The node's configuration as parsed from the YAML file, including its name, type, params, and outputs.

Description:

Initializes the Node instance with the given configuration. It extracts essential information such as the node’s name, type, outputs, and parameters.

Methods

set_flow
def set_flow(self, flow):

Parameters:

- flow (GenFlow): The GenFlow instance that owns this node.

Description:

Sets the reference to the GenFlow instance, allowing the node to access shared resources and configurations during execution.

get_dependencies
def get_dependencies(self, node_names):

Parameters:

- node_names (iterable of str): Names of all nodes defined in the workflow.

Returns:

- set of str: The names of the nodes this node depends on.

Description:

Analyzes the node’s parameters to determine which other nodes it depends on. This is used to build the execution graph and ensure correct execution order.

Example Usage:

dependencies = node.get_dependencies(flow.nodes.keys())

render_params
def render_params(self, outputs, env):

Parameters:

- outputs (dict): Outputs of previously executed nodes, keyed by node name.
- env: The templating environment used to render parameter placeholders.

Returns:

- dict: The node's parameters with all placeholders substituted.

Description:

Renders the node’s parameters by substituting placeholders with actual values from previous outputs. Supports handling of indexed parameters and lists for iterative processing.
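A minimal sketch of this substitution, assuming Jinja-style `{{ node_name.output_name }}` placeholders. The syntax, node names, and logic below are illustrative, not the actual implementation:

```python
import re

# Sketch: recursively substitute "{{ node.output }}" placeholders in
# strings, dicts, and lists with values from previous nodes' outputs.
def render(params, outputs):
    def substitute(value):
        if isinstance(value, str):
            return re.sub(
                r"\{\{\s*(\w+)\.(\w+)\s*\}\}",
                lambda m: str(outputs[m.group(1)][m.group(2)]),
                value,
            )
        if isinstance(value, dict):
            return {k: substitute(v) for k, v in value.items()}
        if isinstance(value, list):
            return [substitute(v) for v in value]
        return value
    return {k: substitute(v) for k, v in params.items()}

outputs = {"read_input": {"text": "hello"}}
rendered = render({"prompt": "Summarize: {{ read_input.text }}"}, outputs)
print(rendered)  # {'prompt': 'Summarize: hello'}
```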

Raises:

execute
def execute(self, params):

Parameters:

- params (dict): The node's rendered parameters.

Returns:

- dict: The outputs produced by the node, keyed by output name.

Description:

Executes the node based on its type:

Delegates to specific execution methods depending on the node type.

Raises:

Example Usage:

outputs = node.execute(rendered_params)

YamlCompose

Module: yaml_utils.py

The YamlCompose class is responsible for composing multiple YAML workflow files into a single unified workflow. It resolves references to nested workflows (yml_flow nodes) and adjusts node names and parameters to ensure uniqueness and consistency.

Class Definition

class YamlCompose:
    def __init__(self, yaml_file, functions_filepath, structured_output_schema_filepath):
        # Initialization code

    def compose(self, save_combined_yaml=False, output_file='combined.yaml'):
        # Starts the composition process and returns the combined YAML data

Constructor

def __init__(self, yaml_file, functions_filepath, structured_output_schema_filepath):

Parameters:

- yaml_file (str): Path to the root YAML workflow file.
- functions_filepath (str): Path to the Python file containing user-defined functions.
- structured_output_schema_filepath (str): Path to the Python file containing structured output schemas.

Description:

Initializes the YamlCompose instance and prepares for the composition process by validating the provided file paths.

Raises:

Methods

compose
def compose(self, save_combined_yaml=False, output_file='combined.yaml'):

Parameters:

- save_combined_yaml (bool, optional): If True, saves the combined workflow to disk. Defaults to False.
- output_file (str, optional): Filename for the saved combined workflow. Defaults to 'combined.yaml'.

Returns:

- dict: The combined YAML data.

Description:

Starts the composition process by recursively processing the root YAML file and any nested sub-flows. Adjusts node names and parameter references to ensure uniqueness across the combined workflow.

Raises:

Example Usage:

composer = YamlCompose('main_workflow.yaml', 'functions.py', 'schemas.py')
combined_yaml_data = composer.compose(save_combined_yaml=True, output_file='combined.yaml')

After composition, the combined YAML file can be executed as a single workflow.


Visualizer

Module: visualizer.py

The Visualizer class provides a graphical representation of GenSphere workflows using a web-based interface powered by Dash and Cytoscape. It allows users to visualize nodes, their types, dependencies, and inspect details of each node interactively.

Class Definition

class Visualizer:
    def __init__(self, yaml_file=None, functions_filepath=None, structured_output_schema_filepath=None, address='127.0.0.1', port=8050):
        # Initialization code

    def start_visualization(self):
        # Starts the Dash application for visualization

Constructor

def __init__(self, yaml_file=None, functions_filepath=None, structured_output_schema_filepath=None, address='127.0.0.1', port=8050):

Parameters:

- yaml_file (str, optional): Path to the YAML workflow file to visualize.
- functions_filepath (str, optional): Path to the Python file containing user-defined functions.
- structured_output_schema_filepath (str, optional): Path to the Python file containing structured output schemas.
- address (str, optional): Host address for the Dash application. Defaults to '127.0.0.1'.
- port (int, optional): Port for the Dash application. Defaults to 8050.

Description:

Initializes the Visualizer instance by setting up the necessary file paths and loading the user-provided functions and schemas. It validates the existence and correctness of the provided files and prepares the environment for visualization.

Raises:

Example Usage:

from gensphere.visualizer import Visualizer

viz = Visualizer(
    yaml_file='workflow.yaml',
    functions_filepath='functions.py',
    structured_output_schema_filepath='schemas.py',
    address='127.0.0.1',
    port=8050
)

Methods

start_visualization

def start_visualization(self):

Description:

Starts the Dash application for visualizing the GenSphere workflow. The application provides an interactive interface where nodes are displayed graphically, and users can click on nodes to view detailed information such as parameters, outputs, functions, and schemas.

Features:

Example Usage:

viz.start_visualization()

After running this method, navigate to http://127.0.0.1:8050 in your web browser to view the visualization.

Notes:


Hub

Module: hub.py

The Hub class provides an interface to interact with the GenSphere Hub platform. It allows users to push workflows to the hub, pull workflows from the hub, and check the number of times a workflow has been pulled.

Class Definition

class Hub:
    def __init__(self, yaml_file=None, functions_file=None, schema_file=None, api_base_url='http://genspherehub.us-east-1.elasticbeanstalk.com/'):
        # Initialization code

    def push(self, push_name=None):
        # Pushes the workflow to the GenSphere Hub

    def pull(self, push_id, save_to_disk=True, yaml_filename=None, functions_filename=None, schema_filename=None, download_path="."):
        # Pulls a workflow from the GenSphere Hub

    def count_pulls(self, push_id):
        # Retrieves the total number of times a push has been pulled

Constructor

def __init__(self, yaml_file=None, functions_file=None, schema_file=None, api_base_url='http://genspherehub.us-east-1.elasticbeanstalk.com/'):

Parameters:

- yaml_file (str, optional): Path to the YAML workflow file.
- functions_file (str, optional): Path to the Python file containing user-defined functions.
- schema_file (str, optional): Path to the Python file containing structured output schemas.
- api_base_url (str, optional): Base URL of the GenSphere Hub API.

Description:

Initializes the Hub instance with the provided file paths and API base URL. Prepares the instance for pushing and pulling workflows to and from the GenSphere Hub platform.

Example Usage:

from gensphere.hub import Hub

hub = Hub(
    yaml_file='workflow.yaml',
    functions_file='functions.py',
    schema_file='schemas.py'
)

Methods

push

def push(self, push_name=None):

Parameters:

- push_name (str, optional): A human-readable name for the push.

Returns:

- dict: The API response, including the push_id of the uploaded workflow.

Description:

Pushes the specified workflow files to the GenSphere Hub. Validates the YAML file for consistency before pushing. The push_id returned can be used to pull the workflow or check its pull count.

Raises:

Example Usage:

result = hub.push(push_name='My Awesome Workflow')
push_id = result.get('push_id')
print(f"Workflow pushed with push_id: {push_id}")

pull

def pull(self, push_id, save_to_disk=True, yaml_filename=None, functions_filename=None, schema_filename=None, download_path="."):

Parameters:

- push_id: Identifier of the workflow to pull.
- save_to_disk (bool, optional): If True, saves the pulled files to disk. Defaults to True.
- yaml_filename (str, optional): Filename for the saved YAML file.
- functions_filename (str, optional): Filename for the saved functions file.
- schema_filename (str, optional): Filename for the saved schema file.
- download_path (str, optional): Directory in which to save the files. Defaults to the current directory.

Returns:

- The pulled workflow files.

Description:

Pulls a workflow from the GenSphere Hub using the provided push_id. Optionally saves the files to disk with custom filenames. Ensures that existing files are not overwritten by appending a counter if necessary.

Raises:

Example Usage:

files = hub.pull(
    push_id=push_id,
    save_to_disk=True,
    yaml_filename='downloaded_workflow.yaml',
    functions_filename='downloaded_functions.py',
    schema_filename='downloaded_schemas.py'
)
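The non-overwriting behavior described above can be sketched as follows. This is illustrative only; the actual Hub implementation may differ:

```python
import os

# Sketch of counter-based collision avoidance: append "_1", "_2", ...
# to the filename until an unused path is found.
def unique_path(path):
    base, ext = os.path.splitext(path)
    candidate, counter = path, 1
    while os.path.exists(candidate):
        candidate = f"{base}_{counter}{ext}"
        counter += 1
    return candidate
```

For example, if downloaded_workflow.yaml already exists in the download path, the pulled file would be saved as downloaded_workflow_1.yaml instead.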

count_pulls

def count_pulls(self, push_id):

Parameters:

- push_id: Identifier of the workflow.

Returns:

- int: The total number of times the workflow has been pulled.

Description:

Retrieves the total number of times a workflow has been pulled from the GenSphere Hub using the provided push_id.

Raises:

Example Usage:

pull_count = hub.count_pulls(push_id=push_id)
print(f"The workflow has been pulled {pull_count} times.")

Utility Functions

This section documents the utility functions used within GenSphere, primarily for internal processing and validation.


get_function_schema

Module: genflow.py

def get_function_schema(func):

Parameters:

- func (callable): The function to inspect.

Returns:

- dict: A schema describing the function's name, description, and parameters.

Description:

Generates a schema for a given function by inspecting its signature and docstring. This schema is used for OpenAI’s function calling feature in LLM service nodes. It ensures that the function parameters are properly typed and documented.

Raises:

Example Usage:

Used internally when preparing function definitions for OpenAI’s function calling.


validate_yaml

Module: yaml_utils.py

def validate_yaml(
    yaml_file,
    functions_filepath=None,
    structured_output_schema_filepath=None,
    parent_node_names=None,
    visited_files=None,
    parent_params=None,
    parent_node_outputs=None
):

Parameters:

- yaml_file (str): Path to the YAML workflow file to validate.
- functions_filepath (str, optional): Path to the Python file containing user-defined functions.
- structured_output_schema_filepath (str, optional): Path to the Python file containing structured output schemas.
- parent_node_names, visited_files, parent_params, parent_node_outputs (optional): Internal arguments used when recursively validating nested sub-flows.

Returns:

Description:

Validates a YAML workflow file and any associated sub-flows for consistency and correctness, checking for issues such as references to undefined nodes or outputs, missing or undefined parameters, and inaccessible function or schema files.

Raises:

Example Usage:

Used internally before executing or composing workflows to ensure they are valid.


collect_referenced_nodes_and_outputs

Module: yaml_utils.py

def collect_referenced_nodes_and_outputs(params):

Parameters:

- params: The node's parameters (strings, dicts, or lists).

Returns:

- The node names and output names referenced within the parameters.

Description:

Analyzes the parameters of a node to identify all referenced nodes and their outputs, which is essential for validating dependencies and ensuring that all references are valid.


collect_used_params

Module: yaml_utils.py

def collect_used_params(yaml_data):

Parameters:

- yaml_data (dict): The parsed YAML workflow data.

Returns:

- The set of parameter names used in the workflow.

Description:

Collects all parameter names that are used in the workflow, particularly in the context of nested workflows (yml_flow nodes). This helps in validating that all required parameters are provided.


collect_referenced_params

Module: yaml_utils.py

def collect_referenced_params(params):

Parameters:

- params: The node's parameters.

Returns:

- The set of parameter names referenced within the parameters.

Description:

Identifies all parameter names that are referenced within the node’s parameters, usually in templated strings. This is used to ensure that all referenced parameters are defined.


collect_referenced_nodes

Module: yaml_utils.py

def collect_referenced_nodes(params):

Parameters:

- params: The node's parameters.

Returns:

- The set of node names referenced within the parameters.

Description:

Identifies all node names that are referenced within the node’s parameters. This is crucial for building the execution graph and determining the correct execution order.


load_yaml_file

Module: yaml_utils.py

def load_yaml_file(yaml_file):

Parameters:

- yaml_file (str): Path to the YAML file to load.

Returns:

- dict: The parsed YAML data.

Description:

Loads the YAML data from a file and handles parsing errors. Ensures that the file exists and contains valid YAML.

Raises:


has_yml_flow_nodes

Module: yaml_utils.py

def has_yml_flow_nodes(yaml_data):

Parameters:

- yaml_data (dict): The parsed YAML workflow data.

Returns:

- bool: True if the workflow contains any yml_flow nodes, False otherwise.

Description:

Checks whether the given YAML data contains any nested workflows (yml_flow nodes). This helps determine if composition is necessary before execution.
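A plausible sketch of this check, assuming the parsed YAML is a dict whose top-level "nodes" list holds dicts with a "type" field (the structure and node types shown are assumptions based on the rest of this reference):

```python
# Sketch only: return True if any node in the workflow is a nested
# sub-flow (type "yml_flow").
def has_yml_flow_nodes(yaml_data):
    return any(node.get("type") == "yml_flow" for node in yaml_data.get("nodes", []))

yaml_data = {
    "nodes": [
        {"name": "step1", "type": "function_call"},
        {"name": "sub_flow", "type": "yml_flow"},
    ]
}
print(has_yml_flow_nodes(yaml_data))  # True
```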


get_base_output_name

Module: yaml_utils.py

def get_base_output_name(output_reference):

Parameters:

- output_reference (str): An output reference that may include indexing or attribute access.

Returns:

- str: The base output name.

Description:

Extracts the base output name from a complex output reference that may include indexing or attribute access. Used during validation to identify the actual outputs being referenced.
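For illustration, the base-name extraction could look like the following sketch; the reference format is an assumption, not the actual implementation:

```python
import re

# Sketch: strip indexing ("[0]") and attribute access (".field") from
# an output reference to recover the base output name.
def get_base_output_name(output_reference):
    match = re.match(r"\w+", output_reference)
    return match.group(0) if match else output_reference

print(get_base_output_name("results[0]"))     # 'results'
print(get_base_output_name("results.count"))  # 'results'
```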


parse_yaml

Module: graph_builder.py

def parse_yaml(yaml_file):

Parameters:

- yaml_file (str): Path to the YAML file to parse.

Returns:

- dict: The file's content.

Description:

Parses a YAML file and returns its content as a dictionary. Validates the existence of the file and handles parsing errors.

Raises:

Example Usage:

data = parse_yaml('workflow.yaml')

extract_referenced_nodes

Module: graph_builder.py

def extract_referenced_nodes(template_str):

Parameters:

- template_str (str): A templated string that may reference node outputs.

Returns:

- set of str: The names of the referenced nodes.

Description:

Extracts all referenced node names from a templated string using regular expressions. Useful for identifying dependencies between nodes in a workflow.

Example Usage:

template_str = "{{ node1.output }} and {{ node2.output }}"
referenced_nodes = extract_referenced_nodes(template_str)
# referenced_nodes will be {'node1', 'node2'}
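Under the assumption that references use Jinja-style `{{ node_name.output }}` syntax, the extraction could be sketched as:

```python
import re

# Sketch: collect every node name appearing before a dot inside
# "{{ ... }}" placeholders. The placeholder syntax is an assumption.
def extract_referenced_nodes(template_str):
    return set(re.findall(r"\{\{\s*(\w+)\.", template_str))

refs = extract_referenced_nodes("{{ node1.output }} and {{ node2.output }}")
print(sorted(refs))  # ['node1', 'node2']
```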

traverse_node_fields

Module: graph_builder.py

def traverse_node_fields(node_value):

Parameters:

- node_value: A node field value (string, dict, or list).

Returns:

- set of str: The names of all referenced nodes found.

Description:

Recursively traverses a node’s fields to find all referenced node names. Handles strings, dictionaries, and lists. Used to identify all dependencies for a node.

Example Usage:

node_params = {
    'param1': '{{ node1.output }}',
    'param2': {
        'subparam': '{{ node2.output }}'
    }
}
referenced_nodes = traverse_node_fields(node_params)
# referenced_nodes will be {'node1', 'node2'}
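A sketch of such a recursive traversal, again assuming Jinja-style placeholders (illustrative, not the actual implementation):

```python
import re

# Sketch: recursively walk strings, dicts, and lists, collecting node
# names from "{{ node.output }}"-style placeholders (syntax assumed).
def traverse_node_fields(node_value):
    refs = set()
    if isinstance(node_value, str):
        refs.update(re.findall(r"\{\{\s*(\w+)\.", node_value))
    elif isinstance(node_value, dict):
        for value in node_value.values():
            refs |= traverse_node_fields(value)
    elif isinstance(node_value, list):
        for item in node_value:
            refs |= traverse_node_fields(item)
    return refs

params = {"a": "{{ node1.out }}", "b": {"c": ["{{ node2.out }}"]}}
print(sorted(traverse_node_fields(params)))  # ['node1', 'node2']
```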

identify_and_style_entrypoints_outputs

Module: graph_builder.py

def identify_and_style_entrypoints_outputs(elements):

Parameters:

- elements (list): Cytoscape-compatible graph elements (nodes and edges).

Returns:

- list: The elements with entrypoint and output nodes styled.

Description:

Identifies entrypoint nodes (nodes with no incoming edges) and output nodes (nodes with no outgoing edges) in the workflow graph and styles them accordingly for visualization purposes.

Example Usage:

elements = identify_and_style_entrypoints_outputs(elements)
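The underlying classification can be sketched as follows, assuming Cytoscape-style elements where edges carry `source` and `target` keys (styling is omitted, and the element structure is an assumption):

```python
# Sketch: nodes with no incoming edges are entrypoints; nodes with no
# outgoing edges are outputs.
def classify(elements):
    nodes = {e["data"]["id"] for e in elements if "source" not in e["data"]}
    sources = {e["data"]["source"] for e in elements if "source" in e["data"]}
    targets = {e["data"]["target"] for e in elements if "source" in e["data"]}
    return nodes - targets, nodes - sources

elements = [
    {"data": {"id": "a"}}, {"data": {"id": "b"}}, {"data": {"id": "c"}},
    {"data": {"source": "a", "target": "b"}},
    {"data": {"source": "b", "target": "c"}},
]
entrypoints, outputs = classify(elements)
print(entrypoints, outputs)  # {'a'} {'c'}
```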

build_graph_data

Module: graph_builder.py

def build_graph_data(yaml_file):

Parameters:

- yaml_file (str): Path to the YAML workflow file.

Returns:

- list: Cytoscape-compatible graph elements representing the workflow.

Description:

Builds graph data compatible with Cytoscape from a YAML workflow definition. It processes nodes and edges, identifies dependencies, and prepares the data for visualization.

Raises:

Example Usage:

elements = build_graph_data('workflow.yaml')

Additional Information

These utility functions are primarily used internally by GenSphere to process and validate workflows. Understanding them can be helpful for advanced users who wish to extend or debug the framework.


Note: When developing custom functions or schemas for use in GenSphere workflows, ensure that your functions have properly typed and documented signatures, and that all file paths you provide are valid and accessible.


Conclusion

For more examples and usage instructions, refer to the Tutorials.


If you have any questions or need further assistance, reach out on our GitHub Issues page.