dotpipe¶
Composable transformation pipelines
Part of the Shape pillar, dotpipe enables building complex data transformations through function composition.
Overview¶
dotpipe provides a clean, functional approach to chaining data transformations, making complex operations readable and maintainable.
Basic Usage¶
from shape.dotpipe import pipe, compose
# Create a pipeline
pipeline = pipe(
lambda x: x * 2, # Double
lambda x: x + 10, # Add 10
lambda x: x / 2 # Halve
)
result = pipeline(5) # ((5 * 2) + 10) / 2 = 10
Working with Nested Data¶
Combine with other dotsuite tools:
from shape.dotpipe import pipe
from shape.dotmod import set_, update
from depth.dotget import get
# Complex transformation pipeline
process_user = pipe(
lambda d: set_(d, "user.verified", True),
lambda d: update(d, "user.name", str.upper),
lambda d: set_(d, "user.processed_at", "2024-01-01"),
lambda d: delete(d, "user.temp_data")
)
data = {
"user": {
"name": "alice",
"verified": False,
"temp_data": "cleanup"
}
}
result = process_user(data)
# {
# "user": {
# "name": "ALICE",
# "verified": True,
# "processed_at": "2024-01-01"
# }
# }
Compose vs Pipe¶
pipe: Left-to-right composition (first → last)compose: Right-to-left composition (last → first)
from shape.dotpipe import pipe, compose
# These are equivalent:
pipeline1 = pipe(f, g, h) # h(g(f(x)))
pipeline2 = compose(h, g, f) # h(g(f(x)))
Partial Pipelines¶
Build reusable transformation components:
# Reusable transformations
normalize_user = pipe(
lambda d: update(d, "email", str.lower),
lambda d: update(d, "name", str.title)
)
add_timestamps = pipe(
lambda d: set_(d, "created_at", now()),
lambda d: set_(d, "updated_at", now())
)
validate_user = pipe(
lambda d: d if get(d, "email") else raise_error("Email required"),
lambda d: d if get(d, "name") else raise_error("Name required")
)
# Combine into full pipeline
process_new_user = pipe(
normalize_user,
validate_user,
add_timestamps
)
Real-World Examples¶
Data Cleaning Pipeline¶
clean_product_data = pipe(
# Normalize strings
lambda d: update(d, "name", lambda n: n.strip().title()),
lambda d: update(d, "sku", lambda s: s.upper().replace("-", "")),
# Validate price
lambda d: set_(d, "price", max(0, get(d, "price") or 0)),
# Add computed fields
lambda d: set_(d, "display_price", f"${get(d, 'price'):.2f}"),
# Remove internal fields
lambda d: delete(d, "_internal_id"),
lambda d: delete(d, "_raw_import")
)
API Response Transformation¶
transform_api_response = pipe(
# Extract data envelope
lambda r: get(r, "data") or {},
# Rename fields
lambda d: set_(d, "userId", get(d, "user_id")),
lambda d: delete(d, "user_id"),
# Flatten nested structure
lambda d: {**d, **get(d, "attributes", {})},
lambda d: delete(d, "attributes"),
# Add metadata
lambda d: set_(d, "fetched_at", datetime.now().isoformat())
)
Form Processing¶
process_form_submission = pipe(
# Sanitize inputs
lambda f: {k: v.strip() if isinstance(v, str) else v
for k, v in f.items()},
# Validate required fields
lambda f: f if all(get(f, field) for field in ["email", "name"])
else raise_error("Missing required fields"),
# Normalize email
lambda f: update(f, "email", str.lower),
# Hash password if present
lambda f: update(f, "password", hash_password) if get(f, "password") else f,
# Add metadata
lambda f: set_(f, "submitted_at", datetime.now()),
lambda f: set_(f, "ip_address", get_client_ip())
)
Async Pipelines¶
For async transformations:
from shape.dotpipe import async_pipe
process_async = async_pipe(
fetch_user_data, # async function
enrich_with_api, # async function
save_to_database # async function
)
result = await process_async(user_id)
Mathematical Foundation¶
dotpipe implements function composition from category theory:
- Associativity: pipe(f, pipe(g, h)) = pipe(pipe(f, g), h)
- Identity: pipe(identity, f) = f = pipe(f, identity)
- Functorial: Preserves structure of transformations