Building Pipelines¶
Learn how to define and structure data processing pipelines in Wurzel using the intuitive chaining syntax and modular step architecture.
What is a Wurzel Pipeline?¶
A pipeline in Wurzel is a chain of processing steps that are connected and executed in sequence. Each step processes the output of the previous one, which keeps workflows modular and reusable and lets Wurzel schedule them automatically.
Key Concepts¶
- TypedStep: Individual processing units with defined input/output contracts
- Pipeline Chaining: Steps are connected using the >> operator
- Automatic Dependency Resolution: Wurzel determines execution order automatically
Basic Pipeline Structure¶
The WZ Utility¶
Use WZ(StepClass) to create step instances and >> to chain them:
from wurzel.steps import EmbeddingStep, QdrantConnectorStep
from wurzel.steps.manual_markdown import ManualMarkdownStep
from wurzel.utils import WZ
source = WZ(ManualMarkdownStep)
embedding = WZ(EmbeddingStep)
storage = WZ(QdrantConnectorStep)
source >> embedding >> storage
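To make the chaining idea concrete, here is a minimal sketch of how a `>>` operator with input/output contracts could be modeled in plain Python. `ToyStep` and its attributes are hypothetical names invented for this illustration; this is not Wurzel's implementation of `TypedStep` or `WZ`.

```python
from typing import Optional


class ToyStep:
    """Hypothetical stand-in for a pipeline step; not Wurzel's TypedStep."""

    def __init__(self, name: str, input_type: Optional[type], output_type: type) -> None:
        self.name = name
        self.input_type = input_type
        self.output_type = output_type
        self.upstream: list["ToyStep"] = []

    def __rshift__(self, other: "ToyStep") -> "ToyStep":
        # Reject the link when the producer's output does not match the consumer's input.
        if other.input_type is not self.output_type:
            raise TypeError(
                f"{self.name} produces {self.output_type.__name__}, "
                f"but {other.name} expects {getattr(other.input_type, '__name__', None)}"
            )
        # Record the dependency and return the right-hand step so chains read left to right.
        other.upstream.append(self)
        return other


source = ToyStep("source", None, str)
embedding = ToyStep("embedding", str, list)
storage = ToyStep("storage", list, dict)

# Evaluates as (source >> embedding) >> storage and yields the last step.
last = source >> embedding >> storage
```

Because `__rshift__` returns its right-hand operand, the whole chain evaluates to the final step, which is why a pipeline function can simply return that step.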
Defining a Complete Pipeline¶
Basic Example¶
Define a function that builds the chain and returns the last step. Wurzel runs upstream steps in order:
from wurzel.step import TypedStep
from wurzel.steps import EmbeddingStep, QdrantConnectorStep
from wurzel.steps.manual_markdown import ManualMarkdownStep
from wurzel.utils import WZ
def pipeline() -> TypedStep:
    md = WZ(ManualMarkdownStep)
    embed = WZ(EmbeddingStep)
    db = WZ(QdrantConnectorStep)
    md >> embed >> db
    return db
Execution order: ManualMarkdownStep → EmbeddingStep → QdrantConnectorStep.
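One way to picture how an execution order can be derived from the returned step is a depth-first walk over upstream dependencies. The sketch below uses a plain dictionary as a hypothetical dependency map; the function name and data layout are assumptions for illustration and say nothing about how Wurzel resolves order internally. It also assumes the graph has no cycles.

```python
def execution_order(last: str, upstream: dict[str, list[str]]) -> list[str]:
    """Return steps in run order by visiting each step's inputs before the step itself."""
    ordered: list[str] = []
    seen: set[str] = set()

    def visit(step: str) -> None:
        if step in seen:
            return
        seen.add(step)
        for parent in upstream.get(step, []):
            visit(parent)
        # All inputs are already listed, so the step itself can follow.
        ordered.append(step)

    visit(last)
    return ordered


# Hypothetical dependency map mirroring md >> embed >> db.
upstream = {
    "QdrantConnectorStep": ["EmbeddingStep"],
    "EmbeddingStep": ["ManualMarkdownStep"],
    "ManualMarkdownStep": [],
}
order = execution_order("QdrantConnectorStep", upstream)
```

Here `order` lists the markdown step first and the Qdrant step last, matching the order stated above.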
Advanced Pipeline Patterns¶
Branching¶
One source can feed multiple downstream steps:
from wurzel.step import TypedStep
from wurzel.steps import EmbeddingStep, QdrantConnectorStep
from wurzel.steps.manual_markdown import ManualMarkdownStep
from wurzel.steps.splitter import SimpleSplitterStep
from wurzel.utils import WZ
def branching_pipeline() -> TypedStep:
    source = WZ(ManualMarkdownStep)
    embedding = WZ(EmbeddingStep)
    splitter = WZ(SimpleSplitterStep)
    vector_db = WZ(QdrantConnectorStep)
    # First branch: embed the documents and store the vectors.
    source >> embedding >> vector_db
    # Second branch: the same source also feeds the splitter.
    source >> splitter
    return vector_db
Conditional Processing¶
Choose steps at build time:
from wurzel.step import TypedStep
from wurzel.steps import EmbeddingStep, QdrantConnectorStep
from wurzel.steps.embedding import TruncatedEmbeddingStep
from wurzel.steps.manual_markdown import ManualMarkdownStep
from wurzel.utils import WZ
def conditional_pipeline(use_truncated: bool = False) -> TypedStep:
    source = WZ(ManualMarkdownStep)
    processor = WZ(TruncatedEmbeddingStep) if use_truncated else WZ(EmbeddingStep)
    storage = WZ(QdrantConnectorStep)
    source >> processor >> storage
    return storage
Pipeline Configuration¶
Step Settings¶
Each step can be configured through environment variables or settings classes. See Creating Custom Steps for details.
Pipeline-Level Configuration¶
Use environment variables to choose steps:
import os
from wurzel.step import TypedStep
from wurzel.steps import EmbeddingStep, QdrantConnectorStep
from wurzel.steps.embedding import TruncatedEmbeddingStep
from wurzel.steps.manual_markdown import ManualMarkdownStep
from wurzel.utils import WZ
def configurable_pipeline() -> TypedStep:
    source = WZ(ManualMarkdownStep)
    # EMBEDDING_MODEL=truncated (case-insensitive) selects the truncated variant.
    use_truncated = os.getenv("EMBEDDING_MODEL", "").lower() == "truncated"
    embedding = WZ(TruncatedEmbeddingStep) if use_truncated else WZ(EmbeddingStep)
    storage = WZ(QdrantConnectorStep)
    source >> embedding >> storage
    return storage
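The example above silently falls back to the default step for any unrecognized value. If stricter behavior is wanted, the lookup can be pulled into a small helper that normalizes the value and rejects typos. This is only a sketch: `embedding_choice`, `ALLOWED_MODELS`, and the `"default"` label are hypothetical names, and the whitespace stripping is an addition that the original check does not perform.

```python
import os
from typing import Mapping, Optional

# Hypothetical set of accepted values; the original example only checks for "truncated".
ALLOWED_MODELS = ("default", "truncated")


def embedding_choice(env: Optional[Mapping[str, str]] = None) -> str:
    """Read EMBEDDING_MODEL and normalize it, failing loudly on unknown values."""
    env = os.environ if env is None else env
    # An unset or empty variable means "default".
    choice = env.get("EMBEDDING_MODEL", "").strip().lower() or "default"
    if choice not in ALLOWED_MODELS:
        raise ValueError(f"EMBEDDING_MODEL must be one of {ALLOWED_MODELS}, got {choice!r}")
    return choice
```

Accepting an optional mapping instead of always reading `os.environ` keeps the helper easy to call with a plain dictionary.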
Testing Pipelines¶
from wurzel.step import TypedStep
from wurzel.steps import EmbeddingStep, QdrantConnectorStep
from wurzel.steps.manual_markdown import ManualMarkdownStep
from wurzel.utils import WZ
def test_markdown_step():
    step = WZ(ManualMarkdownStep)
    result = step.run(None)
    assert result is not None
    assert isinstance(result, list)


def pipeline() -> TypedStep:
    md = WZ(ManualMarkdownStep)
    embed = WZ(EmbeddingStep)
    db = WZ(QdrantConnectorStep)
    md >> embed >> db
    return db


def test_complete_pipeline():
    assert pipeline() is not None
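Pipelines that choose steps from environment variables can be tested without touching the real environment by patching `os.environ` for the duration of a test. The sketch below uses `unittest.mock.patch.dict` from the standard library; `wants_truncated` is a hypothetical helper that repeats the `EMBEDDING_MODEL` check from the configuration example so the tests stay independent of Wurzel itself.

```python
import os
from unittest.mock import patch


def wants_truncated() -> bool:
    # Same check as in the configurable pipeline example.
    return os.getenv("EMBEDDING_MODEL", "").lower() == "truncated"


def test_truncated_selected():
    # The variable is set only inside the with-block and restored afterwards.
    with patch.dict(os.environ, {"EMBEDDING_MODEL": "Truncated"}):
        assert wants_truncated()


def test_default_selected():
    # clear=True removes every variable for the duration of the block.
    with patch.dict(os.environ, {}, clear=True):
        assert not wants_truncated()
```

Both functions follow the `test_` naming used above, so a runner such as pytest would pick them up alongside the other tests.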
Pipeline Optimization¶
Parallel Execution¶
Independent branches can run in parallel (backend-dependent):
from wurzel.step import TypedStep
from wurzel.steps import EmbeddingStep, QdrantConnectorStep
from wurzel.steps.manual_markdown import ManualMarkdownStep
from wurzel.utils import WZ
def parallel_pipeline() -> TypedStep:
    source = WZ(ManualMarkdownStep)
    embedding_a = WZ(EmbeddingStep)
    embedding_b = WZ(EmbeddingStep)
    storage = WZ(QdrantConnectorStep)
    # The two embedding steps depend only on the source, not on each other.
    source >> embedding_a >> storage
    source >> embedding_b
    return storage
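To see which steps of such a graph are independent, the dependencies can be grouped into waves with the standard library's `graphlib.TopologicalSorter`. This is a sketch of the general idea rather than a description of any Wurzel backend; the step names and the `parallel_batches` helper are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each step name maps to the steps it consumes.
dependencies = {
    "source": [],
    "embedding_a": ["source"],
    "embedding_b": ["source"],
    "storage": ["embedding_a"],
}


def parallel_batches(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group steps into waves; steps in one wave do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        # Everything returned here has all of its inputs finished.
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


batches = parallel_batches(dependencies)
```

The two embedding steps land in the same wave because each depends only on `source`. Marking a whole wave as done at once is a simplification: `storage` needs only `embedding_a`, so a scheduler that reports completions individually could start it before `embedding_b` finishes.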
Steps cache their outputs and recompute them only when their inputs change; persisting the cached results is handled by the backend.
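The general idea of input-based caching can be sketched with a content hash: the same input yields the same key, so the work is skipped on a repeat run. The `HashCache` class below is a hypothetical in-memory illustration that assumes JSON-serializable inputs; it does not reflect how Wurzel or its backends store cached results.

```python
import hashlib
import json
from typing import Any, Callable


class HashCache:
    """Hypothetical in-memory cache keyed by a hash of the step input."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}
        self.misses = 0

    def run(self, func: Callable[[Any], Any], payload: Any) -> Any:
        # sort_keys makes the key independent of dictionary ordering.
        encoded = json.dumps(payload, sort_keys=True).encode("utf-8")
        key = hashlib.sha256(encoded).hexdigest()
        if key not in self._store:
            # Input not seen before: compute and remember the result.
            self.misses += 1
            self._store[key] = func(payload)
        return self._store[key]


cache = HashCache()
first = cache.run(len, ["doc one", "doc two"])
second = cache.run(len, ["doc one", "doc two"])  # served from the cache
third = cache.run(len, ["doc one", "doc two", "doc three"])  # input changed
```

The key covers only the input, not the function, so this sketch assumes one cache instance per step.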
Best Practices¶
Pipeline Design¶
- Keep steps focused: Each step should have a single, clear responsibility
- Use meaningful names: Choose descriptive names for your pipeline functions and step variables
- Document data flow: Use comments to explain complex pipeline logic
- Handle errors gracefully: Implement proper error handling in custom steps
Performance Considerations¶
- Minimize data copying: Use efficient data structures and avoid unnecessary transformations
- Batch processing: Design steps to handle multiple items efficiently
- Resource management: Be mindful of memory usage in data-intensive steps
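As an illustration of the batch-processing advice, a step can consume its input in fixed-size chunks instead of materializing everything at once. The helper below is a generic sketch; the name `in_batches` is an assumption made for this example and is not part of Wurzel.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def in_batches(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `size` items without copying the whole input."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


chunks = list(in_batches(range(5), 2))
```

Only one batch is held in memory at a time, which also helps with the resource-management point above. On Python 3.12 and later, `itertools.batched` offers similar behavior but yields tuples.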
Common Patterns¶
ETL-style (extract → transform → load)¶
from wurzel.step import TypedStep
from wurzel.steps import EmbeddingStep, QdrantConnectorStep
from wurzel.steps.manual_markdown import ManualMarkdownStep
from wurzel.steps.splitter import SimpleSplitterStep
from wurzel.utils import WZ
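def etl_pipeline() -> TypedStep:
    extractor = WZ(ManualMarkdownStep)  # extract: read the markdown documents
    splitter = WZ(SimpleSplitterStep)  # transform: split documents into chunks
    embedder = WZ(EmbeddingStep)  # transform: embed each chunk
    loader = WZ(QdrantConnectorStep)  # load: write vectors to Qdrant
    extractor >> splitter >> embedder >> loader
    return loader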
The same pattern works for document ML pipelines (load → split → embed → store).
Next Steps¶
- Create Custom Steps - Learn to build your own processing components
- Understand Data Contracts - Deep dive into type-safe data exchange
- Explore Backends - Deploy your pipelines to different platforms
Additional Resources¶
- Step Examples - Real-world step implementations
- API Documentation - Complete API reference
- Backend Guides - Platform-specific deployment instructions