
JAF: Production-Grade Streaming Boolean Algebra Over Nested JSON

JAF (Just Another Flow) is a production-grade streaming data processing system for JSON/JSONL data. It implements boolean algebra over nested JSON structures with lazy evaluation, composable operations, and a fluent API. JAF is the full-featured, battle-tested implementation of the concepts explored in dotsuite.

The Relationship to Dotsuite

Think of it this way:

  • dotsuite: “This is HOW it works” — Pedagogical, simple, learn-by-building
  • JAF: “This is WHAT you use” — Production-ready, feature-complete, battle-tested

JAF implements the highest level of dotsuite’s architecture: boolean algebra over collections of nested documents. Where dotsuite teaches the concepts through isolated, simple tools, JAF combines them into a unified streaming framework optimized for production workloads.

The Boolean Algebra Branch

In dotsuite’s three-pillar architecture (Depth, Truth, Shape), JAF focuses on the collections layer—specifically the boolean wing that provides filtering operations with full boolean algebra:

\[ \text{filter}: (\mathcal{D} \to \mathbb{B}) \to (C \to C) \]

Where:

  • \(\mathcal{D}\) = Document space
  • \(\mathbb{B}\) = Boolean values
  • \(C\) = Collection of documents

JAF lifts boolean operations to streams with:

  • AND: Intersection of filtered streams
  • OR: Union of filtered streams
  • NOT: Complement of filtered stream
  • Composition: Chainable predicates with guaranteed homomorphism (sketched below)
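
A minimal sketch of this lifting with plain Python generators (illustrative only, not JAF's internals):

from typing import Callable, Iterable, Iterator

Doc = dict
Pred = Callable[[Doc], bool]

def and_(p: Pred, q: Pred) -> Pred:
    # Intersection: a document passes only if both predicates hold
    return lambda d: p(d) and q(d)

def or_(p: Pred, q: Pred) -> Pred:
    # Union: a document passes if either predicate holds
    return lambda d: p(d) or q(d)

def not_(p: Pred) -> Pred:
    # Complement: invert the predicate
    return lambda d: not p(d)

def filter_stream(docs: Iterable[Doc], p: Pred) -> Iterator[Doc]:
    # Lazy: matching documents are yielded one at a time
    return (d for d in docs if p(d))

Because combined predicates are themselves predicates, filtered streams chain and compose freely.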

Core Innovation: Lazy Streaming Boolean Algebra

The Problem

Traditional data processing loads entire datasets into memory:

# Eager evaluation - loads everything
all_data = load_json("huge_file.jsonl")
filtered = [d for d in all_data if d['age'] > 25]
mapped = [transform(d) for d in filtered]

This fails on large datasets and wastes resources when you only need the first 10 results.

JAF’s Solution

from jaf import stream

# Lazy evaluation - nothing executes yet
pipeline = stream("huge_file.jsonl") \
    .filter(["gt?", "@age", 25]) \
    .map(transform) \
    .take(10)

# Only processes 10 matching items
for item in pipeline.evaluate():
    process(item)

Key benefits (see the sketch below):

  • Constant memory: Processes one item at a time
  • Early termination: Stops after take(10)
  • Composable: Build complex pipelines declaratively
  • Streaming: Works with infinite streams
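
Under the hood, this evaluation model can be pictured with plain generators. A simplified sketch (assumed for illustration, not JAF's actual implementation):

import json
from itertools import islice

def read_jsonl(path):
    # Stream one parsed document per line; the file is never fully loaded
    with open(path) as f:
        for line in f:
            yield json.loads(line)

def transform(d):
    # Stand-in transformation for the sketch
    return {"name": d.get("name"), "age": d.get("age")}

docs = read_jsonl("huge_file.jsonl")
matching = (d for d in docs if d.get("age", 0) > 25)  # filter
mapped = (transform(d) for d in matching)             # map
first_ten = islice(mapped, 10)                        # take(10)

for item in first_ten:  # file reading stops after ten matches
    print(item)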

Three Query Syntaxes

JAF supports multiple query syntaxes that all compile to the same internal representation:

1. S-Expression Syntax (Lisp-like)

# Simple comparisons
(eq? @status "active")
(gt? @age 25)
(contains? @tags "python")

# Boolean logic
(and
    (gte? @age 18)
    (eq? @verified true))

# Nested expressions
(or (eq? @role "admin")
    (and (eq? @role "user")
         (gt? @score 100)))

Why S-expressions?

  • Unambiguous parsing (no precedence rules)
  • Easy to serialize/deserialize
  • Homoiconic (code is data)
  • Composable ASTs

2. JSON Array Syntax

# Same queries in JSON
["eq?", "@status", "active"]
["gt?", "@age", 25]
["contains?", "@tags", "python"]

["and",
    ["gte?", "@age", 18],
    ["eq?", "@verified", true]
]

Why JSON arrays?

  • Easy to generate programmatically (see the example below)
  • Standard JSON format
  • Network-transmissible
  • No parsing needed
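
Because queries are plain JSON arrays, they can be assembled with ordinary list operations and shipped over the wire. A hypothetical helper (the query operators are JAF's; the function is illustrative):

import json

def user_filter(min_age, roles):
    # OR over the allowed roles, ANDed with the age bound
    role_clause = ["or"] + [["eq?", "@role", r] for r in roles]
    return ["and", ["gte?", "@age", min_age], role_clause]

query = user_filter(18, ["admin", "moderator"])
payload = json.dumps({"query": query})  # network-transmissible as-is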

3. Infix DSL Syntax

# Natural infix notation
@status == "active"
@age > 25 and @verified == true
@role == "admin" or (@role == "user" and @score > 100)

Why infix?

  • Human-readable
  • Familiar syntax
  • Good for CLI usage

All three compile to the same AST—use whichever fits your use case.

Advanced Path System

JAF extends dotsuite’s simple dot notation with powerful path features:

Exact Paths

stream(data).filter(["eq?", "@user.profile.name", "Alice"])

Wildcards

# Match any array element
stream(data).map("@users.*.email")

# Recursive descent
stream(data).map("@**.id")

Regex Key Matching

# Match keys by pattern
["regex_key", "^error_\\d+$"]

Fuzzy Matching

# Tolerate typos
["fuzzy_key", "username", 0.8]  # 80% similarity

Array Operations

# Array slicing
"@items[0:5]"
"@items[-3:]"

# Predicate filtering
"@users[?(@.age > 25)]"

Streaming Operations

Filter — Boolean Algebra

# AND: Intersection
active_verified = stream("users.jsonl") \
    .filter(["eq?", "@status", "active"]) \
    .filter(["eq?", "@verified", true])

# OR: Union (using boolean logic in query)
admins_or_mods = stream("users.jsonl") \
    .filter(["or",
        ["eq?", "@role", "admin"],
        ["eq?", "@role", "moderator"]
    ])

# NOT: Complement
inactive = stream("users.jsonl") \
    .filter(["not", ["eq?", "@status", "active"]])

Map — Transformations

# Extract fields
emails = stream("users.jsonl").map("@email")

# Reshape documents
simplified = stream("users.jsonl").map(["dict",
    "name", "@firstName",
    "age", "@profile.age",
    "verified", "@verified"
])

# Conditional transformations
categorized = stream("sales.jsonl").map(["dict",
    "date", "@timestamp",
    "amount", "@amount",
    "category", ["if", ["gt?", "@amount", 1000], "high", "low"]
])

Take/Skip — Pagination

# First 100 items
stream("data.jsonl").take(100)

# Skip first 100, take next 100
stream("data.jsonl").skip(100).take(100)

Batch — Chunking

# Process in groups of 1000
for batch in stream("data.jsonl").batch(1000).evaluate():
    bulk_insert(batch)

Windowed Operations for Memory Efficiency

JAF supports windowed operations that trade accuracy for bounded memory:

Distinct

# Exact distinct (potentially unbounded memory)
stream("data.jsonl").distinct(window_size=float('inf'))

# Windowed distinct (bounded memory, approximate)
stream("data.jsonl").distinct(window_size=1000)

Groupby

# Tumbling window groupby
stream("logs.jsonl").groupby(
    key="@level",
    window_size=100  # Group every 100 items
)

Set Operations

# Exact intersection
stream1.intersect(stream2, window_size=float('inf'))

# Windowed intersection (memory-efficient)
stream1.intersect(stream2, window_size=10000)

Trade-off: Finite windows bound memory, but results become approximate: duplicates whose originals have left the window slip through, and set operations can miss matches that never share a window. Choose the window size based on your data distribution.

Real-World Examples

Log Analysis

from jaf import stream

# Find critical errors in authentication service
critical_auth_errors = stream("app.log.jsonl") \
    .filter(["and",
        ["eq?", "@level", "ERROR"],
        ["eq?", "@service", "auth"],
        ["contains?", "@message", "critical"]
    ]) \
    .map(["dict",
        "timestamp", "@timestamp",
        "user", "@context.user_id",
        "message", "@message"
    ]) \
    .evaluate()

for error in critical_auth_errors:
    alert_oncall(error)

ETL Pipeline

# Transform sales data
pipeline = stream("raw_sales.jsonl") \
    .filter(["eq?", "@status", "completed"]) \
    .map(["dict",
        "date", ["date", "@timestamp"],
        "customer_id", "@customer.id",
        "amount", "@total",
        "category", ["if",
            ["gt?", "@total", 1000],
            "enterprise",
            "standard"
        ]
    ]) \
    .batch(1000)

for batch in pipeline.evaluate():
    warehouse.bulk_insert(batch)

Data Validation

# Find invalid user records
invalid_users = stream("users.jsonl") \
    .filter(["or",
        ["not", ["exists?", "@email"]],
        ["not", ["regex-match?", "@email", r"^[^@]+@[^@]+\.[^@]+$"]],
        ["lt?", "@age", 0],
        ["gt?", "@age", 150]
    ]) \
    .map(["dict",
        "id", "@id",
        "reason", ["if",
            ["not", ["exists?", "@email"]],
            "missing_email",
            ["if",
                ["not", ["regex-match?", "@email", r"^[^@]+@[^@]+\.[^@]+$"]],
                "invalid_email",
                "invalid_age"
            ]
        ]
    ]) \
    .evaluate()

Complex Boolean Logic

# High-value customers: VIP OR (loyal AND high_spending)
high_value = stream("customers.jsonl") \
    .filter(["or",
        ["eq?", "@tier", "VIP"],
        ["and",
            ["gte?", "@years_active", 3],
            ["gte?", "@total_spent", 10000]
        ]
    ])

Command-Line Usage

JAF excels at Unix-style pipelines:

# Filter logs
jaf filter app.log.jsonl '(eq? @level "ERROR")' --eval

# Chain operations
jaf filter users.jsonl '(eq? @status "active")' | \
jaf map - "@email" | \
jaf eval -

# Complex queries
jaf filter logs.jsonl \
  '(and (eq? @level "ERROR") (gt? @timestamp "2024-01-01"))' \
  --eval

# Integrate with other tools
jaf filter data.jsonl '(exists? @metadata)' --eval | \
jq '.metadata' | \
sort | uniq -c

# Batch processing
jaf filter orders.jsonl '(gt? @amount 100)' --eval | \
ja groupby customer_id --aggregate 'total:amount:sum'

Mapping Dotsuite Concepts to JAF

| Dotsuite Tool | JAF Feature | Notes |
|---------------|-------------|-------|
| dotget | Path system (@path) | JAF adds regex, fuzzy, wildcards |
| dotstar | Wildcard paths | @users.*.name |
| dotfilter | .filter() method | S-expression queries |
| dotquery | Query language | More powerful, S-expressions |
| dotmod | .map() transformations | Immutable by default |
| dotpipe | Method chaining | Fluent API |
| dotpluck | Reshaping with ["dict", ...] | Transform documents |
| dotrelate | Planned (join/union) | Future feature |

Key difference: JAF is lazy by default, while dotsuite is mostly eager. This enables JAF to handle datasets that don’t fit in memory.

Performance Characteristics

| Operation | Memory | Time | Notes |
|-----------|--------|------|-------|
| Filter | O(1) | O(n) | Streams one item at a time |
| Map | O(1) | O(n) | Zero-copy transformations |
| Take | O(k) | O(k) | Early termination |
| Distinct | O(w) | O(n) | w = window_size |
| Groupby | O(w×k) | O(n) | k = unique keys in window |
| Batch | O(b) | O(n) | b = batch_size |

JAF achieves:

  • Constant memory for basic operations (filter, map, take)
  • Bounded memory for windowed operations
  • Early termination when using take/skip (demonstrated below)
  • Zero serialization overhead (streaming JSON parsing)
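
The early-termination claim is easy to verify with a counting source; only as many items are pulled as the take needs (a minimal check, not JAF code):

from itertools import islice

stats = {"pulled": 0}

def counting_source(n):
    # Counts how many items downstream operators actually request
    for i in range(n):
        stats["pulled"] += 1
        yield {"value": i}

evens = (d for d in counting_source(1_000_000) if d["value"] % 2 == 0)
first_five = list(islice(evens, 5))
print(stats["pulled"])  # 9: values 0..8 suffice to produce five evens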

Future Work: Probabilistic Data Structures

JAF plans to add probabilistic data structures for massive-scale operations:

Bloom Filters

# Memory-efficient approximate distinct
stream("huge.jsonl").distinct_approx(
    fpr=0.001  # False positive rate
)

Count-Min Sketch

# Heavy hitters detection
stream("events.jsonl").top_k(
    k=100,
    sketch_width=1000,
    sketch_depth=5
)

HyperLogLog

# Cardinality estimation
count = stream("data.jsonl").cardinality_estimate(
    precision=14  # ±1.04% error
)

These provide controllable accuracy/memory tradeoffs with theoretical guarantees, enabling billion-item operations on commodity hardware.
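
As a taste of the approach, here is a toy Bloom filter using the standard sizing formulas (the planned JAF APIs above are illustrative; this sketch only shows the underlying structure):

import hashlib
import math

class BloomFilter:
    def __init__(self, capacity, fpr):
        # m bits and k hashes chosen for the target false-positive rate
        self.m = math.ceil(-capacity * math.log(fpr) / math.log(2) ** 2)
        self.k = max(1, round(self.m / capacity * math.log(2)))
        self.bits = bytearray((self.m + 7) // 8)

    def _positions(self, item):
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.m

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        # May report false positives at roughly rate fpr, never false negatives
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))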

Integration with Ecosystem

JAF works seamlessly with other tools:

With jsonl-algebra

jaf filter orders.jsonl '["gt?", "@amount", 100]' --eval | \
ja groupby customer_id --aggregate 'total:amount:sum'

With jq

jaf filter data.jsonl '["exists?", "@metadata"]' --eval | \
jq '.metadata'

With Unix tools

jaf map users.jsonl "@email" --eval | sort | uniq -c

Why S-Expressions?

JAF’s choice of S-expressions for the primary query language might seem unusual, but it has deep advantages:

1. Unambiguous parsing: No operator precedence rules to remember

# Clear nesting
["and", ["gt?", "@x", 5], ["lt?", "@y", 10]]

# vs ambiguous infix
x > 5 and y < 10  # Is this (x > 5) and (y < 10) or x > (5 and y) < 10?

2. Homoiconic: Code is data

query = ["and", ["eq?", "@status", "active"], placeholder]
# Can manipulate query as data structure
query[2] = ["gt?", "@age", 18]

3. Composable ASTs:

def build_query(conditions):
    return ["and"] + conditions

query = build_query([
    ["eq?", "@verified", true],
    ["gt?", "@score", 100]
])

4. Language-agnostic: JSON arrays work in any language

{
  "query": ["and", ["eq?", "@status", "active"], ["gt?", "@age", 18]]
}

Theoretical Foundation

JAF implements boolean algebra over document collections:

Predicates as Sets

Each predicate \(p: \mathcal{D} \to \mathbb{B}\) defines a subset:

\[ S_p = \{d \in \mathcal{D} \mid p(d) = \text{true}\} \]

Boolean Operations

\[ \text{AND}(p_1, p_2) \Rightarrow S_{p_1} \cap S_{p_2} \]

\[ \text{OR}(p_1, p_2) \Rightarrow S_{p_1} \cup S_{p_2} \]

\[ \text{NOT}(p) \Rightarrow \mathcal{D} \setminus S_p \]

Homomorphism

The filter operation is a monoid homomorphism:

\[ \text{filter}(C_1 \cup C_2, p) = \text{filter}(C_1, p) \cup \text{filter}(C_2, p) \]

This guarantees (see the check below):

  • Parallelizable: Can split collection and filter in parallel
  • Composable: Sequential filters commute
  • Streaming: Can process incrementally
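
The homomorphism is easy to check on concrete data, with lists standing in for streams:

p = lambda d: d["age"] > 25
c1 = [{"age": 30}, {"age": 20}]
c2 = [{"age": 40}, {"age": 10}]

lhs = [d for d in c1 + c2 if p(d)]                       # filter over the union
rhs = [d for d in c1 if p(d)] + [d for d in c2 if p(d)]  # union of the filters
assert lhs == rhs  # holds for any split, which is what permits parallelism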

Design Principles

🚀 Lazy by Default: Build pipelines, execute when ready

🔗 Fluent API: Method chaining for readable code

🧩 Composable: Operations combine freely

📦 Source Agnostic: Files, stdin, memory, compressed, infinite streams

🛡️ Safe: Graceful handling of missing paths, malformed data

⚡ Efficient: Constant memory, early termination, windowed operations

🎯 Unix Philosophy: Works great with pipes and command-line tools

Quick Start

# Install
pip install jaf

# Basic usage
jaf filter data.jsonl '(gt? @age 25)' --eval

Python API:

from jaf import stream

# Build lazy pipeline
pipeline = stream("users.jsonl") \
    .filter(["eq?", "@status", "active"]) \
    .map("@email") \
    .take(100)

# Execute when ready
for email in pipeline.evaluate():
    send_newsletter(email)

When to Use JAF vs Dotsuite

Use JAF when:

  • ✅ Processing large datasets (GB+)
  • ✅ Need lazy evaluation
  • ✅ Production systems
  • ✅ Advanced path features (regex, fuzzy)
  • ✅ Windowed operations for memory efficiency

Use dotsuite when:

  • ✅ Learning data processing concepts
  • ✅ Teaching/prototyping
  • ✅ Small datasets that fit in memory
  • ✅ Want simple, copyable code
  • ✅ Building custom tools

Use both:

  • Prototype with dotsuite concepts
  • Productionize with JAF
  • Teach with dotsuite, build with JAF


License

MIT


JAF: Where dotsuite’s pedagogical concepts meet production reality. Boolean algebra over nested JSON, streaming all the way down.
