This section provides detailed documentation of the core classes and utility functions used in GenSphere.
Module: genflow.py
The GenFlow class is responsible for parsing YAML workflow definitions, constructing an execution graph, and executing nodes in the correct order. It manages the overall workflow execution process.
class GenFlow:
    def __init__(self, yaml_file, functions_filepath=None, structured_output_schema_filepath=None):
        # Initialization code

    def parse_yaml(self):
        # Parses the YAML data and constructs nodes

    def build_graph(self):
        # Builds the execution graph

    def run(self):
        # Executes the nodes in topological order
def __init__(self, yaml_file, functions_filepath=None, structured_output_schema_filepath=None):
Parameters:
yaml_file (str): Path to the main YAML file defining the workflow.
functions_filepath (str, optional): Path to the Python file containing custom function definitions.
structured_output_schema_filepath (str, optional): Path to the Python file containing structured output schemas.

Description:
Initializes the GenFlow instance by loading the YAML data and preparing the environment for execution. It verifies the validity of the provided file paths and ensures that the necessary files are accessible.
Raises:
FileNotFoundError: If the provided functions_filepath or structured_output_schema_filepath does not exist.
ValueError: If the provided file paths are not .py files.

parse_yaml
def parse_yaml(self):
Description:
Parses the YAML data from the main workflow file and constructs the nodes for execution. It also checks for the presence of nested workflows (yml_flow nodes) and composes them using YamlCompose if necessary. Validates the YAML file for consistency before parsing.
Raises:
Exception: If the YAML file fails consistency checks.

Example Usage:
flow = GenFlow('workflow.yaml', 'functions.py', 'schemas.py')
flow.parse_yaml()
build_graph
def build_graph(self):
Description:
Builds a directed acyclic graph (DAG) representing the execution order of nodes based on their dependencies. It adds nodes and edges to the graph according to the dependencies identified during parsing.
Raises:
ValueError: If the graph contains cycles or if a node depends on an undefined node or variable.

Example Usage:
flow.build_graph()
run
def run(self):
Description:
Executes the nodes in the order determined by the topological sort of the execution graph. It renders the parameters for each node using the outputs of previously executed nodes and handles iterative execution for nodes processing lists.
Raises:
Exception: If there are cycles in the graph or if an error occurs during node execution.

Example Usage:
flow.run()
After execution, the outputs from each node are stored in the outputs attribute of the GenFlow instance.
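The run method's behavior can be sketched with the standard library's graphlib: nodes are executed in topological order, and each node's outputs are stored in a shared dictionary for later nodes to consume. The node names and step callables below are hypothetical stand-ins for GenSphere nodes, not the actual implementation.

```python
from graphlib import TopologicalSorter

# Hypothetical mini-flow: each node maps to the set of nodes it depends on,
# and to a callable that receives the outputs dict and returns its own outputs.
dependencies = {"read": set(), "summarize": {"read"}, "report": {"summarize"}}
steps = {
    "read": lambda outputs: {"text": "raw data"},
    "summarize": lambda outputs: {"summary": outputs["read"]["text"][:3]},
    "report": lambda outputs: {"report": "Re: " + outputs["summarize"]["summary"]},
}

outputs = {}
# static_order() yields nodes so that dependencies always come first;
# it raises CycleError if the graph is not a DAG.
for name in TopologicalSorter(dependencies).static_order():
    outputs[name] = steps[name](outputs)

print(outputs["report"]["report"])  # -> "Re: raw"
```

This mirrors the idea that, after flow.run(), every node's result is available under its name in the outputs attribute.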
Module: genflow.py
The Node class represents an individual operation or step within the workflow. It encapsulates the logic required to execute that step, including parameter rendering and function execution.
class Node:
    def __init__(self, node_data):
        # Initialization code

    def set_flow(self, flow):
        # Sets reference to the GenFlow instance

    def get_dependencies(self, node_names):
        # Retrieves the dependencies of the node

    def render_params(self, outputs, env):
        # Renders the parameters using previous outputs

    def execute(self, params):
        # Executes the node based on its type and parameters
def __init__(self, node_data):
Parameters:
node_data (dict): Dictionary containing the node’s configuration from the YAML file.

Description:
Initializes the Node instance with the given configuration. It extracts essential information such as the node’s name, type, outputs, and parameters.
set_flow
def set_flow(self, flow):
Parameters:
flow (GenFlow): Reference to the GenFlow instance managing the workflow execution.

Description:
Sets the reference to the GenFlow instance, allowing the node to access shared resources and configurations during execution.
get_dependencies
def get_dependencies(self, node_names):
Parameters:
node_names (Iterable[str]): Iterable of all node names in the workflow.

Returns:
dependencies (Set[str]): Set of node names that the current node depends on.

Description:
Analyzes the node’s parameters to determine which other nodes it depends on. This is used to build the execution graph and ensure the correct execution order.
Example Usage:
dependencies = node.get_dependencies(flow.nodes.keys())
render_params
def render_params(self, outputs, env):
Parameters:
outputs (dict): Outputs from previously executed nodes.
env (jinja2.Environment): Jinja2 environment used for templating.

Returns:
rendered_params (dict or list of dicts): Parameters with values rendered using the outputs of previous nodes.

Description:
Renders the node’s parameters by substituting placeholders with actual values from previous outputs. Supports indexed parameters and lists for iterative processing.
Raises:
ValueError: If a referenced variable is not found or is not iterable when expected.

execute
def execute(self, params):
Parameters:
params (dict): Parameters to be used for the node execution.

Returns:
outputs (dict): Dictionary of outputs produced by the node execution.

Description:
Executes the node based on its type:
For function_call nodes, it executes a Python function.
For llm_service nodes, it interacts with an LLM service such as OpenAI.
Delegates to specific execution methods depending on the node type.
Raises:
NotImplementedError: If the node type is not supported.
Exception: If an error occurs during execution.

Example Usage:
outputs = node.execute(rendered_params)
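The dispatch on node type can be pictured as a handler table. The handlers below are hypothetical placeholders (the real implementations call user functions and LLM APIs); the sketch only illustrates the dispatch and the NotImplementedError path.

```python
def run_function_call(params):
    # Hypothetical handler: look up and call a Python function by name
    registry = {"double": lambda x: 2 * x}
    return {"result": registry[params["function"]](params["x"])}

def run_llm_service(params):
    # Hypothetical handler: a real implementation would call an LLM API here
    return {"response": "LLM answer to: " + params["prompt"]}

HANDLERS = {"function_call": run_function_call, "llm_service": run_llm_service}

def execute_node(node_type, params):
    """Dispatch execution to the handler registered for this node type."""
    try:
        handler = HANDLERS[node_type]
    except KeyError:
        raise NotImplementedError("Unsupported node type: " + node_type)
    return handler(params)

print(execute_node("function_call", {"function": "double", "x": 21}))  # {'result': 42}
```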
Module: yaml_utils.py
The YamlCompose class is responsible for composing multiple YAML workflow files into a single unified workflow. It resolves references to nested workflows (yml_flow nodes) and adjusts node names and parameters to ensure uniqueness and consistency.
class YamlCompose:
    def __init__(self, yaml_file, functions_filepath, structured_output_schema_filepath):
        # Initialization code

    def compose(self, save_combined_yaml=False, output_file='combined.yaml'):
        # Starts the composition process and returns the combined YAML data
def __init__(self, yaml_file, functions_filepath, structured_output_schema_filepath):
Parameters:
yaml_file (str): Path to the root YAML file to be composed.
functions_filepath (str): Path to the Python file containing custom functions.
structured_output_schema_filepath (str): Path to the Python file containing structured output schemas.

Description:
Initializes the YamlCompose instance and prepares for the composition process by validating the provided file paths.
Raises:
FileNotFoundError: If the provided file paths do not exist.
ValueError: If the provided file paths are not .py files.

compose
def compose(self, save_combined_yaml=False, output_file='combined.yaml'):
Parameters:
save_combined_yaml (bool, optional): If True, saves the combined YAML data to a file.
output_file (str, optional): Filename to save the combined YAML data.

Returns:
combined_data (dict): The combined YAML data after composition.

Description:
Starts the composition process by recursively processing the root YAML file and any nested sub-flows. Adjusts node names and parameter references to ensure uniqueness across the combined workflow.
Raises:
Exception: If validation fails during composition.

Example Usage:
composer = YamlCompose('main_workflow.yaml', 'functions.py', 'schemas.py')
combined_yaml_data = composer.compose(save_combined_yaml=True, output_file='combined.yaml')
After composition, the combined YAML file can be executed as a single workflow.
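One way to keep node names unique during composition is to prefix every sub-flow node and rewrite the templated references that point at it. This is a minimal sketch under assumed conventions (Jinja-style {{ node.output }} references and a double-underscore prefix); the actual YamlCompose naming scheme may differ.

```python
import re

def prefix_nodes(nodes, prefix):
    """Prefix node names and rewrite {{ node.output }} references to match."""
    mapping = {name: prefix + "__" + name for name in nodes}
    pattern = re.compile(r"\{\{\s*(\w+)\.")
    renamed = {}
    for name, params in nodes.items():
        new_params = {}
        for key, value in params.items():
            if isinstance(value, str):
                # Rewrite "{{ fetch." to "{{ subflow__fetch." etc.
                value = pattern.sub(
                    lambda m: "{{ %s." % mapping.get(m.group(1), m.group(1)), value
                )
            new_params[key] = value
        renamed[mapping[name]] = new_params
    return renamed

sub = {"fetch": {"url": "https://example.com"},
       "parse": {"html": "{{ fetch.body }}"}}
combined = prefix_nodes(sub, "subflow")
print(combined["subflow__parse"]["html"])  # {{ subflow__fetch.body }}
```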
Module: visualizer.py
The Visualizer class provides a graphical representation of GenSphere workflows using a web-based interface powered by Dash and Cytoscape. It allows users to visualize nodes, their types, and dependencies, and to inspect details of each node interactively.
class Visualizer:
    def __init__(self, yaml_file=None, functions_filepath=None, structured_output_schema_filepath=None, address='127.0.0.1', port=8050):
        # Initialization code

    def start_visualization(self):
        # Starts the Dash application for visualization
def __init__(self, yaml_file=None, functions_filepath=None, structured_output_schema_filepath=None, address='127.0.0.1', port=8050):
Parameters:
yaml_file (str, optional): Path to the YAML file defining the workflow.
functions_filepath (str, optional): Path to the Python file containing custom function definitions.
structured_output_schema_filepath (str, optional): Path to the Python file containing structured output schemas.
address (str, optional): The IP address to host the Dash app (default: '127.0.0.1').
port (int, optional): The port to host the Dash app (default: 8050).

Description:
Initializes the Visualizer instance by setting up the necessary file paths and loading the user-provided functions and schemas. It validates the existence and correctness of the provided files and prepares the environment for visualization.
Raises:
FileNotFoundError: If any of the provided file paths do not exist.
ValueError: If the provided files are not .py files.

Example Usage:
from gensphere.visualizer import Visualizer
viz = Visualizer(
    yaml_file='workflow.yaml',
    functions_filepath='functions.py',
    structured_output_schema_filepath='schemas.py',
    address='127.0.0.1',
    port=8050
)
start_visualization
def start_visualization(self):
Description:
Starts the Dash application for visualizing the GenSphere workflow. The application provides an interactive interface where nodes are displayed graphically, and users can click on nodes to view detailed information such as parameters, outputs, functions, and schemas.
Example Usage:
viz.start_visualization()
After running this method, navigate to http://127.0.0.1:8050 in your web browser to view the visualization.
Notes:
Ensure that the specified address and port are accessible from your browser.

Module: hub.py
The Hub class provides an interface to interact with the GenSphere Hub platform. It allows users to push workflows to the hub, pull workflows from the hub, and check the number of times a workflow has been pulled.
class Hub:
    def __init__(self, yaml_file=None, functions_file=None, schema_file=None, api_base_url='http://genspherehub.us-east-1.elasticbeanstalk.com/'):
        # Initialization code

    def push(self, push_name=None):
        # Pushes the workflow to the GenSphere Hub

    def pull(self, push_id, save_to_disk=True, yaml_filename=None, functions_filename=None, schema_filename=None, download_path="."):
        # Pulls a workflow from the GenSphere Hub

    def count_pulls(self, push_id):
        # Retrieves the total number of times a push has been pulled
def __init__(self, yaml_file=None, functions_file=None, schema_file=None, api_base_url='http://genspherehub.us-east-1.elasticbeanstalk.com/'):
Parameters:
yaml_file (str, optional): Path to the YAML file to be pushed.
functions_file (str, optional): Path to the functions file to be pushed.
schema_file (str, optional): Path to the schema file to be pushed.
api_base_url (str, optional): Base URL for the GenSphere Hub API.

Description:
Initializes the Hub instance with the provided file paths and API base URL. Prepares the instance for pushing and pulling workflows to and from the GenSphere Hub platform.
Example Usage:
from gensphere.hub import Hub
hub = Hub(
    yaml_file='workflow.yaml',
    functions_file='functions.py',
    schema_file='schemas.py'
)
push
def push(self, push_name=None):
Parameters:
push_name (str, optional): A descriptive name for the workflow being pushed.

Returns:
result (dict): A dictionary containing the push_id and a list of uploaded files.

Description:
Pushes the specified workflow files to the GenSphere Hub. Validates the YAML file for consistency before pushing. The returned push_id can be used to pull the workflow or check its pull count.
Raises:
ValueError: If no yaml_file is provided or if the functions or schema files are not .py files.
Exception: If validation fails or if an error occurs during the push.

Example Usage:
result = hub.push(push_name='My Awesome Workflow')
push_id = result.get('push_id')
print(f"Workflow pushed with push_id: {push_id}")
pull
def pull(self, push_id, save_to_disk=True, yaml_filename=None, functions_filename=None, schema_filename=None, download_path="."):
Parameters:
push_id (str): The push_id of the workflow to pull.
save_to_disk (bool, optional): If True, saves the pulled files to disk (default: True).
yaml_filename (str, optional): Custom filename for the YAML file.
functions_filename (str, optional): Custom filename for the functions file.
schema_filename (str, optional): Custom filename for the schema file.
download_path (str, optional): Directory to save the pulled files (default: ".").

Returns:
files_content (dict): A dictionary containing the contents of the pulled files.

Description:
Pulls a workflow from the GenSphere Hub using the provided push_id. Optionally saves the files to disk with custom filenames. Ensures that existing files are not overwritten by appending a counter if necessary.
Raises:
Exception: If an error occurs during the pull operation.

Example Usage:
files = hub.pull(
    push_id=push_id,
    save_to_disk=True,
    yaml_filename='downloaded_workflow.yaml',
    functions_filename='downloaded_functions.py',
    schema_filename='downloaded_schemas.py'
)
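The "append a counter if necessary" behavior mentioned above can be sketched as a small helper. This is an illustrative reconstruction, not the Hub's actual code; the helper name is hypothetical.

```python
import os

def unique_path(path):
    """Return path unchanged if it is free; otherwise append _1, _2, ...
    before the extension until an unused filename is found."""
    if not os.path.exists(path):
        return path
    base, ext = os.path.splitext(path)
    counter = 1
    while os.path.exists(base + "_" + str(counter) + ext):
        counter += 1
    return base + "_" + str(counter) + ext

# e.g. if 'workflow.yaml' already exists on disk, unique_path('workflow.yaml')
# would return 'workflow_1.yaml'
```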
count_pulls
def count_pulls(self, push_id):
Parameters:
push_id (str): The push_id of the workflow to check.

Returns:
pull_count (int): The total number of times the workflow has been pulled.

Description:
Retrieves the total number of times a workflow has been pulled from the GenSphere Hub using the provided push_id.
Raises:
Exception: If an error occurs during the request.

Example Usage:
pull_count = hub.count_pulls(push_id=push_id)
print(f"The workflow has been pulled {pull_count} times.")
This section documents the utility functions used within GenSphere, primarily for internal processing and validation.
Module: genflow.py
def get_function_schema(func):
Parameters:
func (function): The Python function object to generate a schema for.

Returns:
function_def (dict): A dictionary representing the function definition, including name, description, and parameters.

Description:
Generates a schema for a given function by inspecting its signature and docstring. This schema is used for OpenAI’s function-calling feature in LLM service nodes. It ensures that the function parameters are properly typed and documented.
Raises:
ValueError: If a parameter lacks a type annotation or if the function lacks a docstring.

Example Usage:
Used internally when preparing function definitions for OpenAI’s function calling.
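The general technique can be sketched with the inspect module: walk the signature, map Python annotations to JSON-schema types, and raise on missing annotations or docstrings, as described above. This is a simplified re-implementation under assumed conventions, not GenSphere's actual get_function_schema.

```python
import inspect

def get_function_schema_sketch(func):
    """Build an OpenAI-style function schema from a signature and docstring."""
    type_map = {int: "integer", str: "string", float: "number", bool: "boolean"}
    props, required = {}, []
    for name, param in inspect.signature(func).parameters.items():
        if param.annotation is inspect.Parameter.empty:
            raise ValueError("Parameter '%s' lacks a type annotation" % name)
        props[name] = {"type": type_map.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # parameters without defaults are required
    if not func.__doc__:
        raise ValueError("Function lacks a docstring")
    return {
        "name": func.__name__,
        "description": func.__doc__.strip(),
        "parameters": {"type": "object", "properties": props, "required": required},
    }

def add(a: int, b: int = 0) -> int:
    """Add two integers."""
    return a + b

schema = get_function_schema_sketch(add)
print(schema["parameters"]["required"])  # ['a']
```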
Module: yaml_utils.py
def validate_yaml(
    yaml_file,
    functions_filepath=None,
    structured_output_schema_filepath=None,
    parent_node_names=None,
    visited_files=None,
    parent_params=None,
    parent_node_outputs=None
):
Parameters:
yaml_file (str): Path to the YAML file being validated.
functions_filepath (str, optional): Path to the functions file.
structured_output_schema_filepath (str, optional): Path to the schemas file.
parent_node_names (Set[str], optional): Set of node names from the parent flow.
visited_files (Set[str], optional): Set of visited YAML files to prevent circular references.
parent_params (Set[str], optional): Set of parameter names passed from the parent flow.
parent_node_outputs (Dict[str, List[str]], optional): Dictionary of node outputs from parent flows.

Returns:
validated (bool): True if validation passes, False otherwise.
error_msgs (List[str]): List of error messages encountered during validation.
node_outputs (Dict[str, List[str]]): Dictionary of node outputs in the current flow.

Description:
Validates a YAML workflow file and any associated sub-flows for consistency and correctness. Checks for issues such as missing required node fields (e.g., name, type), undefined node or parameter references, and circular sub-flow inclusions.

Raises:
FileNotFoundError: If referenced files do not exist.
ValueError: If the YAML structure is invalid.

Example Usage:
Used internally before executing or composing workflows to ensure they are valid.
Module: yaml_utils.py
def collect_referenced_nodes_and_outputs(params):
Parameters:
params (dict): Parameters dictionary from a node.

Returns:
referenced_nodes_outputs (Set[Tuple[str, str]]): A set of tuples containing referenced node names and outputs.

Description:
Analyzes the parameters of a node to identify all referenced nodes and their outputs, which is essential for validating dependencies and ensuring that all references are valid.
Module: yaml_utils.py
def collect_used_params(yaml_data):
Parameters:
yaml_data (dict): The YAML data of a workflow.

Returns:
used_params (Set[str]): A set of parameter names used within the workflow.

Description:
Collects all parameter names that are used in the workflow, particularly in the context of nested workflows (yml_flow nodes). This helps validate that all required parameters are provided.
Module: yaml_utils.py
def collect_referenced_params(params):
Parameters:
params (dict): Parameters dictionary from a node.

Returns:
referenced_params (Set[str]): A set of parameter names referenced in the parameters.

Description:
Identifies all parameter names that are referenced within the node’s parameters, usually in templated strings. This is used to ensure that all referenced parameters are defined.
Module: yaml_utils.py
def collect_referenced_nodes(params):
Parameters:
params (dict): Parameters dictionary from a node.

Returns:
referenced_nodes (Set[str]): A set of node names referenced in the parameters.

Description:
Identifies all node names that are referenced within the node’s parameters. This is crucial for building the execution graph and determining the correct execution order.
Module: yaml_utils.py
def load_yaml_file(yaml_file):
Parameters:
yaml_file (str): Path to the YAML file to load.

Returns:
data (dict): The loaded YAML data.

Description:
Loads the YAML data from a file and handles parsing errors. Ensures that the file exists and contains valid YAML.

Raises:
FileNotFoundError: If the YAML file does not exist.
ValueError: If there is an error parsing the YAML file.

Module: yaml_utils.py
def has_yml_flow_nodes(yaml_data):
Parameters:
yaml_data (dict): The YAML data of a workflow.

Returns:
bool: True if the workflow contains any yml_flow nodes, False otherwise.

Description:
Checks whether the given YAML data contains any nested workflows (yml_flow nodes). This helps determine whether composition is necessary before execution.
Module: yaml_utils.py
def get_base_output_name(output_reference):
Parameters:
output_reference (str): A string representing an output reference (e.g., 'countries_list[i]').

Returns:
base_output_name (str): The base output name extracted from the reference.

Description:
Extracts the base output name from a complex output reference that may include indexing or attribute access. Used during validation to identify the actual outputs being referenced.
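The extraction can be done with a short regular expression that keeps only the leading identifier. This is an illustrative sketch, not the library's exact implementation.

```python
import re

def get_base_output_name_sketch(output_reference):
    """Strip indexing/attribute access: 'countries_list[i]' -> 'countries_list'."""
    match = re.match(r"(\w+)", output_reference)
    return match.group(1) if match else output_reference

print(get_base_output_name_sketch("countries_list[i]"))  # countries_list
print(get_base_output_name_sketch("result.summary"))     # result
```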
Module: graph_builder.py
def parse_yaml(yaml_file):
Parameters:
yaml_file (str): Path to the YAML file to parse.

Returns:
data (dict): Parsed YAML data.

Description:
Parses a YAML file and returns its content as a dictionary. Validates the existence of the file and handles parsing errors.

Raises:
FileNotFoundError: If the YAML file does not exist.
yaml.YAMLError: If an error occurs during YAML parsing.

Example Usage:
data = parse_yaml('workflow.yaml')
Module: graph_builder.py
def extract_referenced_nodes(template_str):
Parameters:
template_str (str): A templated string containing references to other nodes (e.g., "{{ node1.output }}").

Returns:
referenced_nodes (Set[str]): A set of referenced node names.

Description:
Extracts all referenced node names from a templated string using regular expressions. Useful for identifying dependencies between nodes in a workflow.
Example Usage:
template_str = "{{ node1.output }} and {{ node2.output }}"
referenced_nodes = extract_referenced_nodes(template_str)
# referenced_nodes will be {'node1', 'node2'}
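A regular expression capturing the identifier before the dot is enough for this kind of extraction. The sketch below assumes Jinja-style {{ node.output }} references; the actual pattern in graph_builder.py may differ.

```python
import re

# Capture the node name in references shaped like "{{ node.output }}"
NODE_REF = re.compile(r"\{\{\s*(\w+)\.\w+")

def extract_referenced_nodes_sketch(template_str):
    """Return the set of node names referenced in a templated string."""
    return set(NODE_REF.findall(template_str))

refs = extract_referenced_nodes_sketch("{{ node1.output }} and {{ node2.result }}")
print(refs)  # {'node1', 'node2'} (set order may vary)
```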
Module: graph_builder.py
def traverse_node_fields(node_value):
Parameters:
node_value (Union[str, dict, list]): The node value to traverse.

Returns:
referenced_nodes (Set[str]): A set of referenced node names found within the node value.

Description:
Recursively traverses a node’s fields to find all referenced node names. Handles strings, dictionaries, and lists. Used to identify all dependencies of a node.
Example Usage:
node_params = {
    'param1': '{{ node1.output }}',
    'param2': {
        'subparam': '{{ node2.output }}'
    }
}
referenced_nodes = traverse_node_fields(node_params)
# referenced_nodes will be {'node1', 'node2'}
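The recursion over strings, dicts, and lists can be sketched as follows; again this assumes Jinja-style references and is an illustration rather than the actual implementation.

```python
import re

REF = re.compile(r"\{\{\s*(\w+)\.")  # node name before the dot in "{{ node.x }}"

def traverse_node_fields_sketch(value):
    """Recursively collect referenced node names from strings, dicts, and lists."""
    refs = set()
    if isinstance(value, str):
        refs.update(REF.findall(value))
    elif isinstance(value, dict):
        for v in value.values():
            refs |= traverse_node_fields_sketch(v)
    elif isinstance(value, list):
        for v in value:
            refs |= traverse_node_fields_sketch(v)
    return refs

params = {"param1": "{{ node1.output }}",
          "param2": {"subparam": ["{{ node2.output }}"]}}
print(traverse_node_fields_sketch(params))  # {'node1', 'node2'} (set order may vary)
```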
Module: graph_builder.py
def identify_and_style_entrypoints_outputs(elements):
Parameters:
elements (list): List of Cytoscape elements (nodes and edges).

Returns:
elements (list): Updated list of Cytoscape elements with styled entrypoint and output nodes.

Description:
Identifies entrypoint nodes (nodes with no incoming edges) and output nodes (nodes with no outgoing edges) in the workflow graph and styles them accordingly for visualization purposes.
Example Usage:
elements = identify_and_style_entrypoints_outputs(elements)
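Entrypoints and outputs fall out of the edge lists directly: a node with no incoming edge is an entrypoint, one with no outgoing edge is an output. The sketch below assumes a minimal Cytoscape-like element shape (nodes carry data.id, edges carry data.source/data.target) and a hypothetical "classes" styling convention.

```python
def mark_entrypoints_and_outputs(elements):
    """Tag nodes with no incoming edges as entrypoints, no outgoing as outputs."""
    sources = {e["data"]["source"] for e in elements if "source" in e["data"]}
    targets = {e["data"]["target"] for e in elements if "target" in e["data"]}
    for el in elements:
        data = el["data"]
        if "source" in data:  # skip edge elements
            continue
        if data["id"] not in targets:
            el["classes"] = "entrypoint"   # nothing points at this node
        elif data["id"] not in sources:
            el["classes"] = "output"       # this node points at nothing
    return elements

elements = [
    {"data": {"id": "a"}}, {"data": {"id": "b"}}, {"data": {"id": "c"}},
    {"data": {"source": "a", "target": "b"}},
    {"data": {"source": "b", "target": "c"}},
]
styled = mark_entrypoints_and_outputs(elements)
print([el.get("classes") for el in styled if "id" in el["data"]])
# ['entrypoint', None, 'output']
```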
Module: graph_builder.py
def build_graph_data(yaml_file):
Parameters:
yaml_file (str): Path to the YAML file defining the workflow.

Returns:
elements (list): List of Cytoscape elements (nodes and edges) representing the workflow graph.

Description:
Builds graph data compatible with Cytoscape from a YAML workflow definition. It processes nodes and edges, identifies dependencies, and prepares the data for visualization.

Raises:
ValueError: If a node lacks a name or if there are duplicate node names.

Example Usage:
elements = build_graph_data('workflow.yaml')
These utility functions are primarily used internally by GenSphere to process and validate workflows. Understanding them can be helpful for advanced users who wish to extend or debug the framework.
Note: When developing custom functions or schemas for use in GenSphere workflows, ensure that they live in valid .py files and that the file paths are correct before passing them to GenFlow or YamlCompose.

For more examples and usage instructions, refer to the Tutorials.
If you have any questions or need further assistance, reach out on our GitHub Issues page.