Creating Custom Steps
Learn how to build your own data processing steps in Wurzel, from simple data sources to complex transformation components.
Step Types Overview
Wurzel provides two main types of steps:
- Data Source Steps (WurzelTips): Entry points that ingest data from external sources
- Processing Steps (WurzelSteps): Transform data from upstream steps
Both types follow the same TypedStep interface but serve different roles in your pipeline.
Step Architecture
The TypedStep Interface
All steps inherit from TypedStep with three type parameters: TSettings, TInput, TOutput. You implement:
- `__init__` — setup (connections, resources)
- `run(inpt)` — process input and return output
- `finalize` — optional cleanup
from typing import Generic, TypeVar
TSettings = TypeVar("TSettings")
TInput = TypeVar("TInput")
TOutput = TypeVar("TOutput")


class TypedStep(Generic[TSettings, TInput, TOutput]):
    """Simplified sketch of the step interface.

    Type parameters: ``TSettings`` is the step's settings model, ``TInput`` is
    what ``run`` receives, and ``TOutput`` is what ``run`` returns.
    """

    def __init__(self) -> None:
        """Set up long-lived resources once per step instance (no-op here)."""

    def run(self, inpt: TInput) -> TOutput:
        """Turn one input into an output; concrete steps must override this."""
        raise NotImplementedError

    def finalize(self) -> None:
        """Release resources after processing; the default does nothing."""
Step Lifecycle
- Initialization (`__init__`): Set up connections and create resources
- Execution (`run`): Process data (may be called multiple times)
- Finalization (`finalize`): Clean up resources and close connections
⚠️ Important: The `run` method may be executed multiple times for different upstream dependencies. Put setup logic in `__init__`, not in `run`.
Creating Data Source Steps
Data source steps introduce data into your pipeline. They have `None` as their input type because they don't depend on previous steps.
Basic Data Source
from pathlib import Path
from wurzel.datacontract import MarkdownDataContract
from wurzel.step import Settings, TypedStep
class MySettings(Settings):
    """Configuration for ``MyDatasourceStep``."""

    # Directory that is searched for input files.
    DATA_PATH: Path = Path("./data")
    # Glob pattern, relative to DATA_PATH, selecting which files to load.
    FILE_PATTERN: str = "*.md"
class MyDatasourceStep(TypedStep[MySettings, None, list[MarkdownDataContract]]):
    """Load every file matching ``FILE_PATTERN`` under ``DATA_PATH`` as a markdown document."""

    def __init__(self):
        super().__init__()
        self.settings = MySettings()

    def run(self, inpt: None) -> list[MarkdownDataContract]:
        """Read the matching files and wrap each one in a ``MarkdownDataContract``.

        Args:
            inpt: Unused; source steps have no upstream input.

        Returns:
            One contract per matching regular file. The file's path is used as
            ``url`` and in ``metadata``, and its stem is used as ``keywords``.
        """
        documents = []
        # Path.glob yields entries in arbitrary (filesystem) order; sort them so
        # the pipeline produces the same output on every run and machine.
        for file_path in sorted(self.settings.DATA_PATH.glob(self.settings.FILE_PATTERN)):
            # A directory whose name happens to match the pattern cannot be read.
            if not file_path.is_file():
                continue
            content = file_path.read_text(encoding="utf-8")
            documents.append(
                MarkdownDataContract(
                    md=content,
                    url=str(file_path),
                    keywords=file_path.stem,
                    metadata={"file_path": str(file_path)},
                )
            )
        return documents
Advanced Data Source with Database
This example shows a source step that opens a database connection in `__init__` and releases it in `finalize`:
import ast
import sqlite3

from wurzel.datacontract import MarkdownDataContract
from wurzel.step import Settings, TypedStep


class DatabaseSettings(Settings):
    """Configuration for ``DatabaseSourceStep``."""

    # Path of the SQLite database file.
    DB_PATH: str = "data.db"
    # Not used by this example: QUERY below names the table directly.
    TABLE_NAME: str = "documents"
    # Must return the columns content, source and metadata.
    QUERY: str = "SELECT content, source, metadata FROM documents"


class DatabaseSourceStep(TypedStep[DatabaseSettings, None, list[MarkdownDataContract]]):
    """Read documents from SQLite; the connection lives from ``__init__`` until ``finalize``."""

    def __init__(self):
        super().__init__()
        self.settings = DatabaseSettings()
        # Open the connection once here rather than in run(), which may be called repeatedly.
        self.connection = sqlite3.connect(self.settings.DB_PATH)
        # Lets rows be indexed by column name, e.g. row["content"].
        self.connection.row_factory = sqlite3.Row

    def run(self, inpt: None) -> list[MarkdownDataContract]:
        """Execute ``QUERY`` and convert each row into a ``MarkdownDataContract``.

        Args:
            inpt: Unused; source steps have no upstream input.

        Returns:
            One contract per row. An empty/NULL ``metadata`` value becomes ``{}``.

        Raises:
            ValueError, SyntaxError (or another ``ast.literal_eval`` error): if a
            non-empty ``metadata`` value is not a valid Python literal.
        """
        cursor = self.connection.cursor()
        try:
            cursor.execute(self.settings.QUERY)
            rows = cursor.fetchall()
        finally:
            cursor.close()

        documents = []
        for row in rows:
            # literal_eval only accepts Python literals (dicts, lists, strings, ...),
            # so unlike eval() it cannot execute code stored in the database.
            meta = ast.literal_eval(row["metadata"]) if row["metadata"] else {}
            documents.append(
                MarkdownDataContract(
                    md=row["content"],
                    url=row["source"],
                    keywords="",
                    metadata=meta,
                )
            )
        return documents

    def finalize(self) -> None:
        """Close the database connection opened in ``__init__``."""
        if self.connection:
            self.connection.close()
Creating Processing Steps
Processing steps transform data from upstream steps. They can filter, validate, transform, or enrich data.
Filter Step
from wurzel.datacontract import MarkdownDataContract
from wurzel.step import Settings, TypedStep
class FilterSettings(Settings):
    """Configuration for ``DocumentFilterStep``."""

    # Inclusive bounds on the length of a document's markdown text.
    MIN_LENGTH: int = 100
    MAX_LENGTH: int = 10000
    # Every keyword must occur in the text (case-insensitive); an empty list disables the check.
    REQUIRED_KEYWORDS: list[str] = []
class DocumentFilterStep(
    TypedStep[FilterSettings, list[MarkdownDataContract], list[MarkdownDataContract]]
):
    """Keep documents whose length is in range and that mention every required keyword."""

    def __init__(self):
        super().__init__()
        self.settings = FilterSettings()

    def run(self, inpt: list[MarkdownDataContract]) -> list[MarkdownDataContract]:
        """Return the documents from ``inpt`` that pass both filters, in input order.

        A document is kept when ``MIN_LENGTH <= len(md) <= MAX_LENGTH`` and, if
        ``REQUIRED_KEYWORDS`` is non-empty, every keyword occurs in ``md`` as a
        case-insensitive substring.
        """
        # Lower-case the keywords once per run instead of once per document.
        keywords = [kw.lower() for kw in self.settings.REQUIRED_KEYWORDS]
        filtered_docs = []
        for doc in inpt:
            if not self.settings.MIN_LENGTH <= len(doc.md) <= self.settings.MAX_LENGTH:
                continue
            if keywords:
                # Lower-case the document once, not once per keyword.
                text = doc.md.lower()
                if not all(kw in text for kw in keywords):
                    continue
            filtered_docs.append(doc)
        return filtered_docs
Transformation Step
from wurzel.datacontract import MarkdownDataContract
from wurzel.step import Settings, TypedStep
class TransformSettings(Settings):
    """Configuration for ``DocumentTransformStep``."""

    # Text prepended to each document's markdown.
    PREFIX: str = "[processed] "
class DocumentTransformStep(
    TypedStep[TransformSettings, list[MarkdownDataContract], list[MarkdownDataContract]]
):
    """Prefix every document's markdown with ``PREFIX``, keeping its other fields."""

    def __init__(self):
        super().__init__()
        self.settings = TransformSettings()

    def run(self, inpt: list[MarkdownDataContract]) -> list[MarkdownDataContract]:
        """Build a new contract per input document; the input objects are not modified."""
        transformed: list[MarkdownDataContract] = []
        for source_doc in inpt:
            new_doc = MarkdownDataContract(
                md=self.settings.PREFIX + source_doc.md,
                url=source_doc.url,
                keywords=source_doc.keywords,
                metadata=source_doc.metadata,
            )
            transformed.append(new_doc)
        return transformed
Validation Step
This step delegates per-document checks to a helper method and raises once too many documents have failed validation:
from wurzel.datacontract import MarkdownDataContract
from wurzel.step import Settings, TypedStep
class ValidationSettings(Settings):
    """Configuration for ``DocumentValidationStep``."""

    # Reject documents whose text cannot be encoded as UTF-8.
    CHECK_ENCODING: bool = True
    # Reject blank documents and documents shorter than 10 characters.
    CHECK_STRUCTURE: bool = True
    # Number of rejected documents at which the step raises.
    MAX_ERRORS: int = 5
class DocumentValidationStep(
    TypedStep[
        ValidationSettings, list[MarkdownDataContract], list[MarkdownDataContract]
    ]
):
    """Drop invalid documents and abort once too many have been rejected."""

    def __init__(self):
        super().__init__()
        self.settings = ValidationSettings()
        # Rejections are counted across all run() calls on this instance.
        self.error_count = 0

    def run(self, inpt: list[MarkdownDataContract]) -> list[MarkdownDataContract]:
        """Return the valid documents of ``inpt``.

        Raises:
            RuntimeError: as soon as the running rejection count reaches ``MAX_ERRORS``.
        """
        accepted = []
        for candidate in inpt:
            if self._validate_document(candidate):
                accepted.append(candidate)
                continue
            self.error_count += 1
            if self.error_count >= self.settings.MAX_ERRORS:
                raise RuntimeError(f"Too many validation errors: {self.error_count}")
        return accepted

    def _validate_document(self, doc: MarkdownDataContract) -> bool:
        """Apply the checks enabled in settings; ``True`` means the document is valid."""
        if self.settings.CHECK_ENCODING:
            try:
                doc.md.encode("utf-8")
            except UnicodeEncodeError:
                return False
        if not self.settings.CHECK_STRUCTURE:
            return True
        # Valid structure: some non-whitespace content and at least 10 characters overall.
        return bool(doc.md.strip()) and len(doc.md) >= 10
Advanced Patterns
Multi-Input Steps
Steps can collect data from multiple upstream runs (simplified here: each run receives a single input list):
from wurzel.datacontract import MarkdownDataContract
from wurzel.step import Settings, TypedStep
class MergerSettings(Settings):
    """Configuration for ``DocumentMergerStep``."""

    # "concatenate" returns every document collected so far; any other value passes the input through.
    MERGE_STRATEGY: str = "concatenate"
class DocumentMergerStep(
    TypedStep[MergerSettings, list[MarkdownDataContract], list[MarkdownDataContract]]
):
    """Remember the input of every ``run`` call and optionally merge them."""

    def __init__(self):
        super().__init__()
        self.settings = MergerSettings()
        # One entry per run() call, in call order.
        self.collected_inputs: list[list[MarkdownDataContract]] = []

    def run(self, inpt: list[MarkdownDataContract]) -> list[MarkdownDataContract]:
        """Record ``inpt`` and return the merged result for the configured strategy.

        With "concatenate", a new list of all documents from every call so far is
        returned. Any other strategy returns ``inpt`` itself.
        """
        self.collected_inputs.append(inpt)
        if self.settings.MERGE_STRATEGY != "concatenate":
            return inpt
        # NOTE(review): the result is cumulative, so a later run repeats documents
        # already returned by earlier runs; confirm this matches how the executor
        # consumes per-run outputs.
        return [doc for batch in self.collected_inputs for doc in batch]
Stateful Processing
Keep state in `__init__` and perform optional cleanup in `finalize`:
import hashlib
import logging
from collections import defaultdict

from wurzel.datacontract import MarkdownDataContract
from wurzel.step import Settings, TypedStep

logger = logging.getLogger(__name__)


class DeduplicationSettings(Settings):
    """Configuration for ``DeduplicationStep``."""

    # Any hashlib algorithm with a fixed-length digest (e.g. "md5", "sha1", "sha256").
    # Unsupported names fall back to "sha256".
    HASH_ALGORITHM: str = "md5"


class DeduplicationStep(
    TypedStep[
        DeduplicationSettings, list[MarkdownDataContract], list[MarkdownDataContract]
    ]
):
    """Drop documents whose markdown content was already seen by this step instance."""

    def __init__(self):
        super().__init__()
        self.settings = DeduplicationSettings()
        self._hash_name = self._resolve_hash_name(self.settings.HASH_ALGORITHM)
        # Hashes of all content seen so far, across run() calls.
        self.seen_hashes: set[str] = set()
        # content hash -> URLs of every document with that content (the first one is kept).
        self.document_index: defaultdict[str, list[str]] = defaultdict(list)

    @staticmethod
    def _resolve_hash_name(name: str) -> str:
        """Return ``name`` if hashlib supports it as a fixed-length digest, else ``"sha256"``."""
        try:
            # The hash is only used for deduplication, not security, so allow md5
            # and similar algorithms on FIPS-restricted builds.
            hashlib.new(name, usedforsecurity=False).hexdigest()
        except (ValueError, TypeError):
            # ValueError: unknown algorithm; TypeError: variable-length digest (shake_*).
            logger.warning("Unsupported HASH_ALGORITHM %r, falling back to sha256", name)
            return "sha256"
        return name

    def run(self, inpt: list[MarkdownDataContract]) -> list[MarkdownDataContract]:
        """Return the documents of ``inpt`` whose content has not been seen before, in order."""
        unique_docs = []
        for doc in inpt:
            content_hash = hashlib.new(
                self._hash_name, doc.md.encode(), usedforsecurity=False
            ).hexdigest()
            if content_hash not in self.seen_hashes:
                self.seen_hashes.add(content_hash)
                unique_docs.append(doc)
            self.document_index[content_hash].append(doc.url)
        return unique_docs

    def finalize(self) -> None:
        """Log how many unique documents were kept and how many duplicates were dropped."""
        duplicates = sum(len(urls) - 1 for urls in self.document_index.values())
        logger.info(
            "Deduplication kept %d unique documents and dropped %d duplicates",
            len(self.seen_hashes),
            duplicates,
        )
Step Settings and Configuration
Environment Variable Integration
Settings fields map to environment variables (prefix = step name in UPPERCASE, e.g. `MYSTEP__API_KEY`):
from wurzel.step import Settings
class APISettings(Settings):
    """Example settings for a step that talks to an external HTTP API."""

    # No default value: it must be supplied, e.g. through the environment.
    API_KEY: str
    BASE_URL: str = "https://api.example.com"
    # Presumably seconds; confirm against the client that uses it.
    TIMEOUT: int = 30
    MAX_RETRIES: int = 3
Nested Configuration
Nested settings are addressed with a double underscore (`__`) separator in environment variable names:
from wurzel.step import Settings
class DatabaseConfig(Settings):
    """Connection parameters for a database, used as a nested settings group."""

    HOST: str = "localhost"
    PORT: int = 5432
    DATABASE: str = "wurzel"
    # Placeholder credentials: override them via the environment and never commit real ones.
    USERNAME: str = "user"
    PASSWORD: str = "password"
class ProcessingConfig(Settings):
    """Batching and parallelism knobs, used as a nested settings group."""

    BATCH_SIZE: int = 100
    PARALLEL_WORKERS: int = 4
class ComplexStepSettings(Settings):
    """Top-level settings that compose the nested database and processing groups."""

    # Nested groups; their fields are addressed with a "__" separator in env var names.
    database: DatabaseConfig = DatabaseConfig()
    processing: ProcessingConfig = ProcessingConfig()
    DEBUG_MODE: bool = False
Testing Custom Steps
Unit Testing
from pathlib import Path
from unittest.mock import patch
from wurzel.datacontract import MarkdownDataContract
from wurzel.step import TypedStep
from wurzel.steps.manual_markdown import ManualMarkdownStep
from wurzel.utils import WZ
def test_markdown_step_returns_list(tmp_path: Path):
    """One markdown file in the configured folder yields one MarkdownDataContract."""
    markdown_file = tmp_path / "doc.md"
    markdown_file.write_text("# Hello")
    env_overrides = {"MANUALMARKDOWNSTEP__FOLDER_PATH": str(tmp_path)}
    # The step reads its settings when constructed, so build it while the override is active.
    with patch.dict("os.environ", env_overrides):
        step = WZ(ManualMarkdownStep)
        result = step.run(None)
    assert isinstance(result, list)
    assert len(result) == 1
    assert all(isinstance(d, MarkdownDataContract) for d in result)
def test_filter_step():
    """A minimal length filter keeps only documents with at least MIN_LENGTH characters."""
    from wurzel.step import Settings

    class FilterSettings(Settings):
        MIN_LENGTH: int = 10

    class SimpleFilterStep(
        TypedStep[
            FilterSettings, list[MarkdownDataContract], list[MarkdownDataContract]
        ]
    ):
        def __init__(self):
            super().__init__()
            self.settings = FilterSettings()

        def run(self, inpt: list[MarkdownDataContract]) -> list[MarkdownDataContract]:
            kept = []
            for candidate in inpt:
                if len(candidate.md) >= self.settings.MIN_LENGTH:
                    kept.append(candidate)
            return kept

    short_doc = MarkdownDataContract(md="Hi", url="u1", keywords="k1")
    long_doc = MarkdownDataContract(md="Long enough content here", url="u2", keywords="k2")

    result = WZ(SimpleFilterStep).run([short_doc, long_doc])

    assert len(result) == 1
    assert result[0].md == "Long enough content here"
Integration Testing
from wurzel.steps.manual_markdown import ManualMarkdownStep
from wurzel.utils import WZ
def test_pipeline_structure():
    """WZ(...) returns a step object that exposes an ``inputs`` attribute."""
    source_step = WZ(ManualMarkdownStep)
    assert source_step is not None
    assert hasattr(source_step, "inputs")
Best Practices
Error Handling
Catch errors per item, and fail the step only if too many items fail:
import logging

from wurzel.datacontract import MarkdownDataContract
from wurzel.step import NoSettings, TypedStep

logger = logging.getLogger(__name__)


class RobustProcessingStep(
    TypedStep[NoSettings, list[MarkdownDataContract], list[MarkdownDataContract]]
):
    """Process documents one at a time, skip failures, and abort if too many fail."""

    # Fraction of failed documents above which the whole run fails.
    MAX_ERROR_RATIO = 0.5

    def run(self, inpt: list[MarkdownDataContract]) -> list[MarkdownDataContract]:
        """Return the successfully processed documents, in input order.

        Raises:
            RuntimeError: if more than ``MAX_ERROR_RATIO`` of the documents failed.
        """
        processed_docs = []
        error_count = 0
        for i, doc in enumerate(inpt):
            try:
                processed = self._process_document(doc)
            except Exception as e:  # per-item boundary: one bad document must not stop the batch
                error_count += 1
                logger.warning("Failed to process document %d: %s", i, e)
            else:
                processed_docs.append(processed)
        if error_count > len(inpt) * self.MAX_ERROR_RATIO:
            raise RuntimeError(f"Too many processing errors: {error_count}/{len(inpt)}")
        return processed_docs

    def _process_document(self, doc: MarkdownDataContract) -> MarkdownDataContract:
        """Process a single document; override this. The default returns it unchanged."""
        return doc
Resource Management
Use `finalize()` to release connections and temporary files:
import os
from typing import Any
from wurzel.datacontract import MarkdownDataContract
from wurzel.step import NoSettings, TypedStep
class ResourceManagedStep(
    TypedStep[NoSettings, list[MarkdownDataContract], list[MarkdownDataContract]]
):
    """Template for a step that owns a connection and temporary files."""

    def __init__(self):
        super().__init__()
        # Any object with a close() method; None while no connection is held.
        self.connection: Any = None
        # Paths of temporary files to delete during cleanup.
        self.temp_files: list[str] = []

    def run(self, inpt: list[MarkdownDataContract]) -> list[MarkdownDataContract]:
        """Process ``inpt`` (placeholder: returns it unchanged), releasing resources on failure."""
        try:
            return inpt
        except Exception:
            # Release early on failure. Cleanup is idempotent, so a later finalize() is harmless.
            self._cleanup_resources()
            raise

    def finalize(self) -> None:
        """Release all resources held by the step."""
        self._cleanup_resources()

    def _cleanup_resources(self) -> None:
        """Close the connection and delete temp files; safe to call more than once.

        Temp files are removed even if closing the connection raises. That
        exception still propagates after the files have been handled.
        """
        # Detach first so a failing close() is not retried by a later cleanup call.
        connection, self.connection = self.connection, None
        try:
            if connection is not None:
                connection.close()
        finally:
            for temp_file in self.temp_files:
                try:
                    os.unlink(temp_file)
                except OSError:
                    # Best-effort: a missing or locked file must not block the remaining cleanup.
                    pass
            self.temp_files.clear()
Next Steps
- Understand Data Contracts - Learn about type-safe data exchange
- Explore Backend Integration - Deploy your custom steps
Additional Resources
- API Documentation - Complete TypedStep API reference
- Built-in Steps - Examples of existing step implementations
- Testing Guidelines - Best practices for testing steps