JAF (Just Another Flow) is a production-grade streaming data processing system for JSON/JSONL data. It implements boolean algebra over nested JSON structures with lazy evaluation, composable operations, and a fluent API. JAF is the full-featured, battle-tested implementation of the concepts explored in dotsuite.
The Relationship to Dotsuite
Think of it this way:
- dotsuite: “This is HOW it works” — Pedagogical, simple, learn-by-building
- JAF: “This is WHAT you use” — Production-ready, feature-complete, battle-tested
JAF implements the highest level of dotsuite’s architecture: boolean algebra over collections of nested documents. Where dotsuite teaches the concepts through isolated, simple tools, JAF combines them into a unified streaming framework optimized for production workloads.
The Boolean Algebra Branch
In dotsuite’s three-pillar architecture (Depth, Truth, Shape), JAF focuses on the collections layer—specifically the boolean wing that provides filtering operations with full boolean algebra:
\[ \text{filter}: (\mathcal{D} \to \mathbb{B}) \to (C \to C) \]

Where:
- \(\mathcal{D}\) = Document space
- \(\mathbb{B}\) = Boolean values
- \(C\) = Collection of documents
JAF lifts boolean operations to streams with:
- AND: Intersection of filtered streams
- OR: Union of filtered streams
- NOT: Complement of filtered stream
- Composition: Chainable predicates with guaranteed homomorphism
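The lifting above can be sketched with plain Python generators. This is an illustration of the idea, not JAF's internals: predicates combine with ordinary boolean operators, and a generator-based filter keeps the stream lazy.

```python
# Illustrative sketch (not JAF internals): lifting boolean predicates
# to lazy streams with plain Python generators.

def lift_and(p1, p2):
    # AND: a document passes only if both predicates hold (set intersection).
    return lambda d: p1(d) and p2(d)

def lift_or(p1, p2):
    # OR: a document passes if either predicate holds (set union).
    return lambda d: p1(d) or p2(d)

def lift_not(p):
    # NOT: complement of the predicate's set.
    return lambda d: not p(d)

def filter_stream(docs, pred):
    # Lazy filter: yields matching documents one at a time.
    return (d for d in docs if pred(d))

docs = [{"age": 30, "verified": True}, {"age": 17, "verified": True}]
adult = lambda d: d["age"] >= 18
verified = lambda d: d["verified"]
result = list(filter_stream(docs, lift_and(adult, verified)))
# result == [{"age": 30, "verified": True}]
```

Because `filter_stream` returns a generator, nothing is evaluated until the result is consumed, which is the same property JAF's pipelines rely on.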
Core Innovation: Lazy Streaming Boolean Algebra
The Problem
Traditional data processing loads entire datasets into memory:
# Eager evaluation - loads everything
all_data = load_json("huge_file.jsonl")
filtered = [d for d in all_data if d['age'] > 25]
mapped = [transform(d) for d in filtered]
This fails on large datasets and wastes resources when you only need the first 10 results.
JAF’s Solution
from jaf import stream
# Lazy evaluation - nothing executes yet
pipeline = stream("huge_file.jsonl") \
.filter(["gt?", "@age", 25]) \
.map(transform) \
.take(10)
# Only processes 10 matching items
for item in pipeline.evaluate():
    process(item)
Key benefits:
- ✅ Constant memory: Processes one item at a time
- ✅ Early termination: Stops after take(10)
- ✅ Composable: Build complex pipelines declaratively
- ✅ Streaming: Works with infinite streams
Three Query Syntaxes
JAF supports multiple query syntaxes that all compile to the same internal representation:
1. S-Expression Syntax (Lisp-like)
# Simple comparisons
(eq? @status "active")
(gt? @age 25)
(contains? @tags "python")
# Boolean logic
(and
(gte? @age 18)
(eq? @verified true))
# Nested expressions
(or (eq? @role "admin")
(and (eq? @role "user")
(gt? @score 100)))
Why S-expressions?
- Unambiguous parsing (no precedence rules)
- Easy to serialize/deserialize
- Homoiconic (code is data)
- Composable ASTs
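The "easy to serialize" and "code is data" points fall out of how small an S-expression reader can be. The following is a minimal toy reader, not JAF's actual parser (it doesn't handle strings containing spaces, for instance); it turns the textual form into the same nested JSON arrays used by the second syntax below.

```python
# Toy S-expression reader: '(and (gte? @age 18) (eq? @verified true))'
# becomes nested lists. A sketch only, not JAF's parser.

def tokenize(src):
    return src.replace("(", " ( ").replace(")", " ) ").split()

def parse(tokens):
    tok = tokens.pop(0)
    if tok == "(":
        expr = []
        while tokens[0] != ")":
            expr.append(parse(tokens))
        tokens.pop(0)  # discard ")"
        return expr
    return atom(tok)

def atom(tok):
    # Quoted strings, booleans, and integers; bare words
    # (operators, @paths) stay as strings.
    if tok.startswith('"') and tok.endswith('"'):
        return tok[1:-1]
    if tok in ("true", "false"):
        return tok == "true"
    try:
        return int(tok)
    except ValueError:
        return tok

ast = parse(tokenize('(and (gte? @age 18) (eq? @verified true))'))
# ast == ["and", ["gte?", "@age", 18], ["eq?", "@verified", True]]
```

Because the output is plain lists, the AST can be serialized as JSON, transmitted, or manipulated in place, which is exactly the homoiconicity argument made later in this document.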
2. JSON Array Syntax
# Same queries in JSON
["eq?", "@status", "active"]
["gt?", "@age", 25]
["contains?", "@tags", "python"]
["and",
["gte?", "@age", 18],
["eq?", "@verified", true]
]
Why JSON arrays?
- Easy to generate programmatically
- Standard JSON format
- Network-transmissible
- No parsing needed
3. Infix DSL Syntax
# Natural infix notation
@status == "active"
@age > 25 and @verified == true
@role == "admin" or (@role == "user" and @score > 100)
Why infix?
- Human-readable
- Familiar syntax
- Good for CLI usage
All three compile to the same AST—use whichever fits your use case.
Advanced Path System
JAF extends dotsuite’s simple dot notation with powerful path features:
Exact Paths
stream(data).filter(["eq?", "@user.profile.name", "Alice"])
Wildcards
# Match any array element
stream(data).map("@users.*.email")
# Recursive descent
stream(data).map("@**.id")
Regex Key Matching
# Match keys by pattern
["regex_key", "^error_\\d+$"]
Fuzzy Matching
# Tolerate typos
["fuzzy_key", "username", 0.8] # 80% similarity
Array Operations
# Array slicing
"@items[0:5]"
"@items[-3:]"
# Predicate filtering
"@users[?(@.age > 25)]"
Streaming Operations
Filter — Boolean Algebra
# AND: Intersection
active_verified = stream("users.jsonl") \
.filter(["eq?", "@status", "active"]) \
.filter(["eq?", "@verified", true])
# OR: Union (using boolean logic in query)
admins_or_mods = stream("users.jsonl") \
.filter(["or",
["eq?", "@role", "admin"],
["eq?", "@role", "moderator"]
])
# NOT: Complement
inactive = stream("users.jsonl") \
.filter(["not", ["eq?", "@status", "active"]])
Map — Transformations
# Extract fields
emails = stream("users.jsonl").map("@email")
# Reshape documents
simplified = stream("users.jsonl").map(["dict",
"name", "@firstName",
"age", "@profile.age",
"verified", "@verified"
])
# Conditional transformations
categorized = stream("sales.jsonl").map(["dict",
"date", "@timestamp",
"amount", "@amount",
"category", ["if", ["gt?", "@amount", 1000], "high", "low"]
])
Take/Skip — Pagination
# First 100 items
stream("data.jsonl").take(100)
# Skip first 100, take next 100
stream("data.jsonl").skip(100).take(100)
Batch — Chunking
# Process in groups of 1000
for batch in stream("data.jsonl").batch(1000).evaluate():
    bulk_insert(batch)
Windowed Operations for Memory Efficiency
JAF supports windowed operations that trade accuracy for bounded memory:
Distinct
# Exact distinct (potentially unbounded memory)
stream("data.jsonl").distinct(window_size=float('inf'))
# Windowed distinct (bounded memory, approximate)
stream("data.jsonl").distinct(window_size=1000)
Groupby
# Tumbling window groupby
stream("logs.jsonl").groupby(
key="@level",
window_size=100 # Group every 100 items
)
Set Operations
# Exact intersection
stream1.intersect(stream2, window_size=float('inf'))
# Windowed intersection (memory-efficient)
stream1.intersect(stream2, window_size=10000)
Trade-off: Finite windows bound memory but are approximate—duplicates (or matches, for set operations) that fall farther apart than the window may slip through. Choose a window size based on your data distribution.
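The mechanics of the trade-off can be sketched in a few lines. This is the assumed semantics of a windowed distinct, not JAF's implementation: only the last `window_size` keys are remembered, so a duplicate that arrives after its original has been evicted is emitted again.

```python
# Sketch of windowed distinct (assumed semantics, not JAF's code):
# remember only the last `window_size` keys.
from collections import OrderedDict

def windowed_distinct(items, window_size):
    seen = OrderedDict()  # insertion-ordered set with bounded size
    for item in items:
        if item in seen:
            continue  # duplicate within the window: drop it
        yield item
        seen[item] = None
        if len(seen) > window_size:
            seen.popitem(last=False)  # evict the oldest remembered key

result = list(windowed_distinct([1, 2, 1, 3, 1], window_size=2))
# result == [1, 2, 3, 1]: the third 1 arrives after 1 was evicted,
# so it is emitted again — bounded memory, approximate answer.
```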
Real-World Examples
Log Analysis
from jaf import stream
# Find critical errors in authentication service
critical_auth_errors = stream("app.log.jsonl") \
.filter(["and",
["eq?", "@level", "ERROR"],
["eq?", "@service", "auth"],
["contains?", "@message", "critical"]
]) \
.map(["dict",
"timestamp", "@timestamp",
"user", "@context.user_id",
"message", "@message"
]) \
.evaluate()
for error in critical_auth_errors:
    alert_oncall(error)
ETL Pipeline
# Transform sales data
pipeline = stream("raw_sales.jsonl") \
.filter(["eq?", "@status", "completed"]) \
.map(["dict",
"date", ["date", "@timestamp"],
"customer_id", "@customer.id",
"amount", "@total",
"category", ["if",
["gt?", "@total", 1000],
"enterprise",
"standard"
]
]) \
.batch(1000)
for batch in pipeline.evaluate():
    warehouse.bulk_insert(batch)
Data Validation
# Find invalid user records
invalid_users = stream("users.jsonl") \
.filter(["or",
["not", ["exists?", "@email"]],
["not", ["regex-match?", "@email", r"^[^@]+@[^@]+\.[^@]+$"]],
["lt?", "@age", 0],
["gt?", "@age", 150]
]) \
.map(["dict",
"id", "@id",
"reason", ["if",
["not", ["exists?", "@email"]],
"missing_email",
["if",
["not", ["regex-match?", "@email", r"^[^@]+@[^@]+\.[^@]+$"]],
"invalid_email",
"invalid_age"
]
]
]) \
.evaluate()
Complex Boolean Logic
# High-value customers: VIP OR (loyal AND high_spending)
high_value = stream("customers.jsonl") \
.filter(["or",
["eq?", "@tier", "VIP"],
["and",
["gte?", "@years_active", 3],
["gte?", "@total_spent", 10000]
]
])
Command-Line Usage
JAF excels at Unix-style pipelines:
# Filter logs
jaf filter app.log.jsonl '(eq? @level "ERROR")' --eval
# Chain operations
jaf filter users.jsonl '(eq? @status "active")' | \
jaf map - "@email" | \
jaf eval -
# Complex queries
jaf filter logs.jsonl \
'(and (eq? @level "ERROR") (gt? @timestamp "2024-01-01"))' \
--eval
# Integrate with other tools
jaf filter data.jsonl '(exists? @metadata)' --eval | \
jq '.metadata' | \
sort | uniq -c
# Batch processing
jaf filter orders.jsonl '(gt? @amount 100)' --eval | \
ja groupby customer_id --aggregate 'total:amount:sum'
Mapping Dotsuite Concepts to JAF
| Dotsuite Tool | JAF Feature | Notes |
|---|---|---|
| dotget | Path system (@path) | JAF adds regex, fuzzy, wildcards |
| dotstar | Wildcard paths | @users.*.name |
| dotfilter | .filter() method | S-expression queries |
| dotquery | Query language | More powerful, S-expressions |
| dotmod | .map() transformations | Immutable by default |
| dotpipe | Method chaining | Fluent API |
| dotpluck | Reshaping with ["dict", ...] | Transform documents |
| dotrelate | Planned (join/union) | Future feature |
Key difference: JAF is lazy by default, while dotsuite is mostly eager. This enables JAF to handle datasets that don’t fit in memory.
Performance Characteristics
| Operation | Memory | Time | Notes |
|---|---|---|---|
| Filter | O(1) | O(n) | Streams one item at a time |
| Map | O(1) | O(n) | Zero-copy transformations |
| Take | O(k) | O(k) | Early termination |
| Distinct | O(w) | O(n) | w = window_size |
| Groupby | O(w×k) | O(n) | k = unique keys in window |
| Batch | O(b) | O(n) | b = batch_size |
JAF achieves:
- Constant memory for basic operations (filter, map, take)
- Bounded memory for windowed operations
- Early termination when using take/skip
- Zero serialization overhead (streaming JSON parsing)
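The early-termination and constant-memory claims rest on generator semantics, which can be demonstrated directly. The sketch below uses itertools rather than JAF's API, but the mechanism is the same: a take on the end of the pipeline stops pulling from the source.

```python
# Demonstration of early termination with generators — the mechanism
# lazy pipelines rely on. `islice` plays the role of take(3).
from itertools import islice

pulls = 0

def source():
    global pulls
    i = 0
    while True:  # an infinite stream
        pulls += 1
        yield {"n": i}
        i += 1

evens = (d for d in source() if d["n"] % 2 == 0)
first_three = list(islice(evens, 3))  # "take(3)"
# first_three == [{"n": 0}, {"n": 2}, {"n": 4}]
# pulls == 5: only five items were ever read from the infinite source.
```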
Future Work: Probabilistic Data Structures
JAF plans to add probabilistic data structures for massive-scale operations:
Bloom Filters
# Memory-efficient approximate distinct
stream("huge.jsonl").distinct_approx(
fpr=0.001 # False positive rate
)
Count-Min Sketch
# Heavy hitters detection
stream("events.jsonl").top_k(
k=100,
sketch_width=1000,
sketch_depth=5
)
HyperLogLog
# Cardinality estimation
count = stream("data.jsonl").cardinality_estimate(
precision=14 # ±1.04% error
)
These provide controllable accuracy/memory tradeoffs with theoretical guarantees, enabling billion-item operations on commodity hardware.
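To make the trade-off concrete, here is a minimal Bloom filter sketch of the idea behind approximate distinct. The API shown above is planned, not final, and this code is only an illustration: a Bloom filter answers "definitely new" or "probably seen" in fixed memory, and production versions derive the bit-array size and hash count from the target false-positive rate.

```python
# Minimal Bloom filter sketch (illustration only, not JAF's planned code).
import hashlib

class Bloom:
    def __init__(self, m_bits=1 << 16, k=4):
        self.m, self.k = m_bits, k
        self.bits = bytearray(m_bits // 8)  # fixed memory, regardless of items

    def _positions(self, item):
        # Derive k bit positions from salted SHA-256 digests.
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.m

    def add(self, item):
        for p in self._positions(item):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, item):
        # "No" is exact; "yes" is probabilistic (false positives possible).
        return all(self.bits[p // 8] & (1 << (p % 8))
                   for p in self._positions(item))

bf = Bloom()
bf.add("user-1")
assert bf.might_contain("user-1")        # definitely added
assert not bf.might_contain("user-999")  # almost certainly absent
```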
Integration with Ecosystem
JAF works seamlessly with other tools:
With jsonl-algebra
jaf filter orders.jsonl '["gt?", "@amount", 100]' --eval | \
ja groupby customer_id --aggregate 'total:amount:sum'
With jq
jaf filter data.jsonl '["exists?", "@metadata"]' --eval | \
jq '.metadata'
With Unix tools
jaf map users.jsonl "@email" --eval | sort | uniq -c
Why S-Expressions?
JAF’s choice of S-expressions for the primary query language might seem unusual, but it has deep advantages:
1. Unambiguous parsing: No operator precedence rules to remember
# Clear nesting
["and", ["gt?", "@x", 5], ["lt?", "@y", 10]]
# vs ambiguous infix
x > 5 and y < 10 # Is this (x > 5) and (y < 10) or x > (5 and y) < 10?
2. Homoiconic: Code is data
query = ["and", ["eq?", "@status", "active"], placeholder]
# Can manipulate query as data structure
query[2] = ["gt?", "@age", 18]
3. Composable ASTs:
def build_query(conditions):
    return ["and"] + conditions

query = build_query([
    ["eq?", "@verified", True],
    ["gt?", "@score", 100]
])
4. Language-agnostic: JSON arrays work in any language
{
"query": ["and", ["eq?", "@status", "active"], ["gt?", "@age", 18]]
}
Theoretical Foundation
JAF implements boolean algebra over document collections:
Predicates as Sets
Each predicate \(p: \mathcal{D} \to \mathbb{B}\) defines a subset:
\[ S_p = \{d \in \mathcal{D} \mid p(d) = \text{true}\} \]

Boolean Operations
\[ \text{AND}(p_1, p_2) \Rightarrow S_{p_1} \cap S_{p_2} \]
\[ \text{OR}(p_1, p_2) \Rightarrow S_{p_1} \cup S_{p_2} \]
\[ \text{NOT}(p) \Rightarrow \mathcal{D} \setminus S_p \]

Homomorphism
The filter operation is a monoid homomorphism:
\[ \text{filter}(C_1 \cup C_2, p) = \text{filter}(C_1, p) \cup \text{filter}(C_2, p) \]

This guarantees:
- ✅ Parallelizable: Can split collection and filter in parallel
- ✅ Composable: Sequential filters commute
- ✅ Streaming: Can process incrementally
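The homomorphism property can be checked directly. With lists standing in for ordered collections, filtering the concatenation of two collections equals concatenating the filtered parts:

```python
# Checking the filter homomorphism: filter(C1 ∪ C2, p) == filter(C1, p) ∪ filter(C2, p).

def filt(coll, pred):
    return [d for d in coll if pred(d)]

p = lambda d: d["age"] > 25
c1 = [{"age": 30}, {"age": 20}]
c2 = [{"age": 40}]

assert filt(c1 + c2, p) == filt(c1, p) + filt(c2, p)
```

This identity is precisely why a stream can be split, filtered in parallel, and recombined without changing the result.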
Design Principles
🚀 Lazy by Default: Build pipelines, execute when ready
🔗 Fluent API: Method chaining for readable code
🧩 Composable: Operations combine freely
📦 Source Agnostic: Files, stdin, memory, compressed, infinite streams
🛡️ Safe: Graceful handling of missing paths, malformed data
⚡ Efficient: Constant memory, early termination, windowed operations
🎯 Unix Philosophy: Works great with pipes and command-line tools
Quick Start
# Install
pip install jaf
# Basic usage
jaf filter data.jsonl '(gt? @age 25)' --eval
Python API:
from jaf import stream
# Build lazy pipeline
pipeline = stream("users.jsonl") \
.filter(["eq?", "@status", "active"]) \
.map("@email") \
.take(100)
# Execute when ready
for email in pipeline.evaluate():
    send_newsletter(email)
When to Use JAF vs Dotsuite
Use JAF when:
- ✅ Processing large datasets (GB+)
- ✅ Need lazy evaluation
- ✅ Production systems
- ✅ Advanced path features (regex, fuzzy)
- ✅ Windowed operations for memory efficiency
Use dotsuite when:
- ✅ Learning data processing concepts
- ✅ Teaching/prototyping
- ✅ Small datasets that fit in memory
- ✅ Want simple, copyable code
- ✅ Building custom tools
Use both:
- Prototype with dotsuite concepts
- Productionize with JAF
- Teach with dotsuite, build with JAF
Resources
- Repository: github.com/queelius/jaf
- Documentation: Getting Started
- API Guide: API Reference
- Query Language: Query Syntax
- Cookbook: Practical Examples
- Dotsuite Pedagogy: Understanding JAF Through Dotsuite
License
MIT
JAF: Where dotsuite’s pedagogical concepts meet production reality. Boolean algebra over nested JSON, streaming all the way down.