JAF (Just Another Flow) is a streaming data processing system for JSON/JSONL data. It implements boolean algebra over nested JSON structures with lazy evaluation, composable operations, and a fluent API. JAF is the production version of the concepts I explored in dotsuite.
The Relationship to Dotsuite
The short version:
- dotsuite: “This is how it works.” Pedagogical, simple, learn-by-building.
- JAF: “This is what you use.” Feature-complete, lazy, handles real data.
JAF implements the highest level of dotsuite’s architecture: boolean algebra over collections of nested documents. Where dotsuite teaches the concepts through isolated simple tools, JAF combines them into a unified streaming framework.
The Boolean Algebra Branch
In dotsuite’s three-pillar architecture (Depth, Truth, Shape), JAF focuses on the collections layer, specifically the boolean wing that provides filtering operations with full boolean algebra:
\[ \text{filter}: (\mathcal{D} \to \mathbb{B}) \to (C \to C) \]
where \(\mathcal{D}\) is the document space, \(\mathbb{B}\) is the set of boolean values, and \(C\) is a collection of documents.
JAF lifts boolean operations to streams: AND is intersection of filtered streams, OR is union, NOT is complement, and composition gives you chainable predicates with guaranteed homomorphism.
Core Innovation: Lazy Streaming
The Problem
Traditional data processing loads entire datasets into memory:
# Eager evaluation - loads everything
all_data = load_json("huge_file.jsonl")
filtered = [d for d in all_data if d['age'] > 25]
mapped = [transform(d) for d in filtered]
This fails on large datasets and wastes resources when you only need the first 10 results.
JAF’s Solution
from jaf import stream
# Lazy evaluation - nothing executes yet
pipeline = stream("huge_file.jsonl") \
    .filter(["gt?", "@age", 25]) \
    .map(transform) \
    .take(10)
# Only processes 10 matching items
for item in pipeline.evaluate():
    process(item)
The result: constant memory (one item in flight at a time), early termination (processing stops once take(10) is satisfied), declarative composition of complex pipelines, and support for infinite streams.
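This style of laziness can be modeled with plain Python generators. The sketch below is illustrative only, not JAF's actual implementation; `lazy_filter` and `lazy_take` are hypothetical names:

```python
import itertools

def lazy_filter(items, pred):
    # A generator expression: pulls one upstream item at a time.
    return (d for d in items if pred(d))

def lazy_take(items, n):
    # islice stops pulling from upstream after n items: early termination.
    return itertools.islice(items, n)

# Nothing runs until the pipeline is iterated, so even an infinite
# stream of documents is fine.
docs = ({"age": i} for i in itertools.count())
pipeline = lazy_take(lazy_filter(docs, lambda d: d["age"] > 25), 3)
print(list(pipeline))  # [{'age': 26}, {'age': 27}, {'age': 28}]
```

Because each stage only pulls what the stage after it demands, the infinite `itertools.count()` source terminates as soon as three matches are found.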
Three Query Syntaxes
JAF supports multiple query syntaxes that all compile to the same internal representation.
S-Expression Syntax (Lisp-like)
# Simple comparisons
(eq? @status "active")
(gt? @age 25)
(contains? @tags "python")
# Boolean logic
(and
  (gte? @age 18)
  (eq? @verified true))
# Nested expressions
(or (eq? @role "admin")
    (and (eq? @role "user")
         (gt? @score 100)))
Why S-expressions? Unambiguous parsing (no precedence rules), easy serialization, homoiconicity (code is data), and naturally composable ASTs.
JSON Array Syntax
# Same queries in JSON
["eq?", "@status", "active"]
["gt?", "@age", 25]
["and",
  ["gte?", "@age", 18],
  ["eq?", "@verified", true]
]
Why JSON arrays? They are easy to generate programmatically, remain standard JSON, and are trivially network-transmissible.
Infix DSL Syntax
# Natural infix notation
@status == "active"
@age > 25 and @verified == true
@role == "admin" or (@role == "user" and @score > 100)
Why an infix DSL? It is human-readable and familiar, which makes it a good fit for CLI usage. All three syntaxes compile to the same AST.
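To see how one internal representation can serve every surface syntax, here is a minimal evaluator for the JSON-array form. This is an illustrative sketch under assumed semantics, not JAF's actual compiler; `get_path`, `OPS`, and `evaluate` are hypothetical names:

```python
def get_path(doc, path):
    # Resolve an exact path like "@a.b.c" against a nested dict.
    cur = doc
    for key in path.lstrip("@").split("."):
        cur = cur[key]
    return cur

OPS = {
    "eq?":  lambda a, b: a == b,
    "gt?":  lambda a, b: a > b,
    "gte?": lambda a, b: a >= b,
}

def evaluate(expr, doc):
    # Walk a JSON-array query AST: ["and", ...] / ["or", ...] / ["not", x]
    # are special forms; everything else is a predicate looked up in OPS.
    op, *args = expr
    if op == "and":
        return all(evaluate(a, doc) for a in args)
    if op == "or":
        return any(evaluate(a, doc) for a in args)
    if op == "not":
        return not evaluate(args[0], doc)
    resolved = [get_path(doc, a) if isinstance(a, str) and a.startswith("@") else a
                for a in args]
    return OPS[op](*resolved)

doc = {"age": 20, "verified": True}
print(evaluate(["and", ["gte?", "@age", 18], ["eq?", "@verified", True]], doc))  # True
```

A parser for the S-expression or infix forms would only need to emit these same nested arrays.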
Advanced Path System
JAF extends dotsuite’s simple dot notation with more powerful features:
Exact Paths
stream(data).filter(["eq?", "@user.profile.name", "Alice"])
Wildcards
# Match any array element
stream(data).map("@users.*.email")
# Recursive descent
stream(data).map("@**.id")
Regex Key Matching
# Match keys by pattern
["regex_key", "^error_\\d+$"]
Fuzzy Matching
# Tolerate typos
["fuzzy_key", "username", 0.8] # 80% similarity
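Fuzzy key matching can be approximated in a few lines with the standard library's `difflib`. This sketches the idea only; `fuzzy_key` is a hypothetical helper, not JAF's implementation:

```python
from difflib import SequenceMatcher

def fuzzy_key(doc, target, threshold=0.8):
    # Return values for keys whose similarity to `target` meets the threshold.
    return [v for k, v in doc.items()
            if SequenceMatcher(None, k, target).ratio() >= threshold]

doc = {"usrename": 1, "username": 2, "email": 3}
print(fuzzy_key(doc, "username"))  # the typo'd key matches too; "email" does not
```

Real implementations often use a dedicated similarity measure (e.g. Levenshtein ratio), but the threshold semantics are the same.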
Array Operations
# Array slicing
"@items[0:5]"
"@items[-3:]"
# Predicate filtering
"@users[?(@.age > 25)]"
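Wildcard resolution can be sketched as a fan-out over path segments. Illustrative only; `resolve` is a hypothetical helper that handles just dicts and lists:

```python
def resolve(doc, path):
    # Resolve a path like "@users.*.email"; "*" fans out over every
    # element of a list (or every value of a dict).
    parts = path.lstrip("@").split(".")
    results = [doc]
    for part in parts:
        nxt = []
        for node in results:
            if part == "*":
                nxt.extend(node if isinstance(node, list) else node.values())
            else:
                nxt.append(node[part])
        results = nxt
    return results

data = {"users": [{"email": "a@x.com"}, {"email": "b@x.com"}]}
print(resolve(data, "@users.*.email"))  # ['a@x.com', 'b@x.com']
```

Recursive descent (`@**`) would add one more case that walks all nesting levels instead of a single one.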
Streaming Operations
Filter (Boolean Algebra)
# AND: Intersection
active_verified = stream("users.jsonl") \
    .filter(["eq?", "@status", "active"]) \
    .filter(["eq?", "@verified", true])
# OR: Union (using boolean logic in query)
admins_or_mods = stream("users.jsonl") \
    .filter(["or",
        ["eq?", "@role", "admin"],
        ["eq?", "@role", "moderator"]
    ])
# NOT: Complement
inactive = stream("users.jsonl") \
    .filter(["not", ["eq?", "@status", "active"]])
Map (Transformations)
# Extract fields
emails = stream("users.jsonl").map("@email")
# Reshape documents
simplified = stream("users.jsonl").map(["dict",
    "name", "@firstName",
    "age", "@profile.age",
    "verified", "@verified"
])
# Conditional transformations
categorized = stream("sales.jsonl").map(["dict",
    "date", "@timestamp",
    "amount", "@amount",
    "category", ["if", ["gt?", "@amount", 1000], "high", "low"]
])
Take/Skip (Pagination)
# First 100 items
stream("data.jsonl").take(100)
# Skip first 100, take next 100
stream("data.jsonl").skip(100).take(100)
Batch (Chunking)
# Process in groups of 1000
for batch in stream("data.jsonl").batch(1000).evaluate():
    bulk_insert(batch)
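A batching operator reduces to a short generator; this sketch (hypothetical `batched` helper, equivalent in spirit to `itertools.batched` in Python 3.12+) shows why memory stays proportional to the batch size:

```python
from itertools import islice

def batched(items, size):
    # Yield lists of up to `size` items; only one chunk is ever in memory.
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

print(list(batched(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```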
Windowed Operations for Memory Efficiency
JAF supports windowed operations that trade accuracy for bounded memory:
Distinct
# Exact distinct (potentially unbounded memory)
stream("data.jsonl").distinct(window_size=float('inf'))
# Windowed distinct (bounded memory, approximate)
stream("data.jsonl").distinct(window_size=1000)
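The windowed variant can be sketched with a deque plus a set. This illustrates the trade-off only; `windowed_distinct` is a hypothetical helper, not JAF's code, and it assumes hashable items (JAF would key on document content):

```python
from collections import deque

def windowed_distinct(items, window_size):
    # Suppress duplicates among the last `window_size` *emitted* items.
    # Memory is bounded by the window, but duplicates spaced farther apart
    # than the window slip through -- the accuracy trade-off.
    window = deque()
    seen = set()
    for item in items:
        if item in seen:
            continue
        yield item
        window.append(item)
        seen.add(item)
        if len(window) > window_size:
            seen.discard(window.popleft())

print(list(windowed_distinct([1, 2, 1, 3, 1, 1, 2], window_size=2)))
```

With `window_size=2` the later repeats of 1 and 2 re-appear in the output; an unbounded window would yield each value exactly once.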
Groupby
# Tumbling window groupby
stream("logs.jsonl").groupby(
    key="@level",
    window_size=100  # Group every 100 items
)
Set Operations
# Exact intersection
stream1.intersect(stream2, window_size=float('inf'))
# Windowed intersection (memory-efficient)
stream1.intersect(stream2, window_size=10000)
The trade-off: finite windows bound memory, but related items that fall outside the same window can be missed (or, for distinct, emitted twice). Choose a window size based on your data distribution.
Real-World Examples
Log Analysis
from jaf import stream
# Find critical errors in authentication service
critical_auth_errors = stream("app.log.jsonl") \
    .filter(["and",
        ["eq?", "@level", "ERROR"],
        ["eq?", "@service", "auth"],
        ["contains?", "@message", "critical"]
    ]) \
    .map(["dict",
        "timestamp", "@timestamp",
        "user", "@context.user_id",
        "message", "@message"
    ]) \
    .evaluate()
for error in critical_auth_errors:
    alert_oncall(error)
ETL Pipeline
# Transform sales data
pipeline = stream("raw_sales.jsonl") \
    .filter(["eq?", "@status", "completed"]) \
    .map(["dict",
        "date", ["date", "@timestamp"],
        "customer_id", "@customer.id",
        "amount", "@total",
        "category", ["if",
            ["gt?", "@total", 1000],
            "enterprise",
            "standard"
        ]
    ]) \
    .batch(1000)
for batch in pipeline.evaluate():
    warehouse.bulk_insert(batch)
Data Validation
# Find invalid user records
invalid_users = stream("users.jsonl") \
    .filter(["or",
        ["not", ["exists?", "@email"]],
        ["not", ["regex-match?", "@email", r"^[^@]+@[^@]+\.[^@]+$"]],
        ["lt?", "@age", 0],
        ["gt?", "@age", 150]
    ]) \
    .map(["dict",
        "id", "@id",
        "reason", ["if",
            ["not", ["exists?", "@email"]],
            "missing_email",
            ["if",
                ["not", ["regex-match?", "@email", r"^[^@]+@[^@]+\.[^@]+$"]],
                "invalid_email",
                "invalid_age"
            ]
        ]
    ]) \
    .evaluate()
Command-Line Usage
JAF works well in Unix-style pipelines:
# Filter logs
jaf filter app.log.jsonl '(eq? @level "ERROR")' --eval
# Chain operations
jaf filter users.jsonl '(eq? @status "active")' | \
jaf map - "@email" | \
jaf eval -
# Complex queries
jaf filter logs.jsonl \
'(and (eq? @level "ERROR") (gt? @timestamp "2024-01-01"))' \
--eval
# Integrate with other tools
jaf filter data.jsonl '(exists? @metadata)' --eval | \
jq '.metadata' | \
sort | uniq -c
# Batch processing
jaf filter orders.jsonl '(gt? @amount 100)' --eval | \
ja groupby customer_id --aggregate 'total:amount:sum'
Mapping Dotsuite Concepts to JAF
| Dotsuite Tool | JAF Feature | Notes |
|---|---|---|
| dotget | Path system (@path) | JAF adds regex, fuzzy, wildcards |
| dotstar | Wildcard paths | @users.*.name |
| dotfilter | .filter() method | S-expression queries |
| dotquery | Query language | More powerful, S-expressions |
| dotmod | .map() transformations | Immutable by default |
| dotpipe | Method chaining | Fluent API |
| dotpluck | Reshaping with ["dict", ...] | Transform documents |
| dotrelate | Planned (join/union) | Future feature |
The key difference: JAF is lazy by default, while dotsuite is mostly eager. This is what lets JAF handle datasets that don’t fit in memory.
Performance Characteristics
| Operation | Memory | Time | Notes |
|---|---|---|---|
| Filter | O(1) | O(n) | Streams one item at a time |
| Map | O(1) | O(n) | Zero-copy transformations |
| Take | O(k) | O(k) | Early termination |
| Distinct | O(w) | O(n) | w = window_size |
| Groupby | O(w*k) | O(n) | k = unique keys in window |
| Batch | O(b) | O(n) | b = batch_size |
Constant memory for basic operations (filter, map, take). Bounded memory for windowed operations. Early termination when using take/skip. Zero serialization overhead via streaming JSON parsing.
Future Work: Probabilistic Data Structures
I plan to add probabilistic data structures for massive-scale operations:
Bloom filters for memory-efficient approximate distinct. Count-Min Sketch for heavy hitters detection. HyperLogLog for cardinality estimation. These provide controllable accuracy/memory tradeoffs with theoretical guarantees, enabling billion-item operations on commodity hardware.
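As an illustration of the first of these (not JAF code), a minimal Bloom filter fits in a few lines; the bit count and hash count below are arbitrary, whereas a real implementation would derive them from a target false-positive rate:

```python
import hashlib

class BloomFilter:
    # Minimal Bloom filter: no false negatives, small tunable chance
    # of false positives, fixed memory regardless of item count.
    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)

    def _positions(self, item):
        # Derive k bit positions by salting a single hash function.
        for i in range(self.num_hashes):
            h = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = 1

    def __contains__(self, item):
        return all(self.bits[pos] for pos in self._positions(item))

bf = BloomFilter()
bf.add("alice@example.com")
print("alice@example.com" in bf)  # True -- membership is never missed
```

For windowed distinct, "seen" items would be recorded in the filter instead of an exact set, trading a small false-positive rate (items wrongly treated as duplicates) for constant memory.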
Theoretical Foundation
JAF implements boolean algebra over document collections.
Predicates as Sets
Each predicate \(p: \mathcal{D} \to \mathbb{B}\) defines a subset:
\[ S_p = \lbrace d \in \mathcal{D} \mid p(d) = \text{true}\rbrace \]
Boolean Operations
\[ \text{AND}(p_1, p_2) \Rightarrow S_{p_1} \cap S_{p_2} \]
\[ \text{OR}(p_1, p_2) \Rightarrow S_{p_1} \cup S_{p_2} \]
\[ \text{NOT}(p) \Rightarrow \mathcal{D} \setminus S_p \]
Homomorphism
The filter operation is a monoid homomorphism:
\[ \text{filter}(C_1 \cup C_2, p) = \text{filter}(C_1, p) \cup \text{filter}(C_2, p) \]
This guarantees the operation is parallelizable (can split the collection and filter in parallel), composable (sequential filters commute), and streaming (can process incrementally).
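The property is easy to check concretely, modeling collections as lists and union as concatenation (an illustrative sketch; `filt` is a hypothetical stand-in for the filter operation):

```python
def filt(collection, pred):
    # filter as a function on collections; lists model ordered streams,
    # and concatenation plays the role of union.
    return [d for d in collection if pred(d)]

c1 = [{"age": 30}, {"age": 10}]
c2 = [{"age": 40}, {"age": 5}]
adult = lambda d: d["age"] >= 18

# filter(C1 ∪ C2, p) == filter(C1, p) ∪ filter(C2, p)
assert filt(c1 + c2, adult) == filt(c1, adult) + filt(c2, adult)
```

The same equation is what licenses splitting a JSONL file into shards, filtering each shard independently, and concatenating the results.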
Quick Start
# Install
pip install jaf
# Basic usage
jaf filter data.jsonl '(gt? @age 25)' --eval
Python API:
from jaf import stream
# Build lazy pipeline
pipeline = stream("users.jsonl") \
    .filter(["eq?", "@status", "active"]) \
    .map("@email") \
    .take(100)
# Execute when ready
for email in pipeline.evaluate():
    send_newsletter(email)
When to Use JAF vs Dotsuite
Use JAF for processing large datasets, when you need lazy evaluation, for production systems, when you need advanced path features (regex, fuzzy), and for windowed operations.
Use dotsuite for learning data processing concepts, teaching and prototyping, small datasets that fit in memory, when you want simple copyable code, and for building custom tools.
Or use both: prototype with dotsuite concepts, productionize with JAF.
Resources
- Repository: github.com/queelius/jaf
- Documentation: Getting Started
- API Guide: API Reference
- Query Language: Query Syntax
- Cookbook: Practical Examples
License
MIT