Advanced Topics¶
This guide covers advanced JAF concepts, performance optimization, and extending the system.
Understanding the Streaming Architecture¶
How Lazy Evaluation Works¶
JAF builds a pipeline of operations without executing them:
# Nothing is read here: chaining operations only builds a pipeline description
pipeline = (
    stream("huge_file.jsonl")
    .filter(["gt?", "@value", 100])
    .map("@id")
    .take(10)
)
# Inspect that description: each operation wraps the previous one as a tree
# of source descriptors
print(pipeline.to_dict())
# {
#   "type": "take",
#   "n": 10,
#   "inner_source": {
#     "type": "map",
#     "expression": ["@", [["key", "id"]]],
#     "inner_source": {
#       "type": "filter",
#       "query": ["gt?", "@value", 100],
#       "inner_source": {
#         "type": "jsonl",
#         "inner_source": {
#           "type": "file",
#           "path": "huge_file.jsonl"
#         }
#       }
#     }
#   }
# }
Stream Processing Internals¶
When you call .evaluate(), JAF:
- Traverses the source tree from outer to inner
- Each operation requests items from its inner source
- Items flow through the pipeline one at a time
- Operations can short-circuit (e.g., take stops after N items)
# Items are pulled lazily, only until 10 matches have been produced
for match in pipeline.evaluate():
    print(match)  # Runs for at most 10 items
Performance Optimization¶
Memory-Efficient Patterns¶
# Good: Streaming one item at a time
def process_large_file(path):
    """Handle each matching item as it is produced, without materializing the results."""
    matching_items = stream(path).filter(expr).evaluate()
    for record in matching_items:
        process(record)
# Bad: Loading everything into memory
def process_large_file(path):
    """Anti-pattern: collects every matching item into a list before processing any."""
    buffered = [*stream(path).filter(expr).evaluate()]
    for record in buffered:
        process(record)
Filter Optimization¶
Order filters from most to least selective, so later (and costlier) filters see as few items as possible:
# Efficient: Selective filter first.
# Parentheses (not backslashes) let each step carry a trailing comment;
# a backslash cannot follow a comment.
efficient = (
    stream("data.jsonl")
    .filter(["eq?", "@type", "purchase"])  # Filters out 90% of data
    .filter(["gt?", "@amount", 1000])      # Only sees the remaining 10%
)
# Inefficient: Less selective filter first
inefficient = (
    stream("data.jsonl")
    .filter(["gt?", "@amount", 1000])      # Evaluated on 100% of data
    .filter(["eq?", "@type", "purchase"])  # Then filters out 90%
)
Avoiding Redundant Operations¶
# Inefficient: Multiple passes through data
# (Python's literal is True; JSON's lowercase true would be a NameError here)
active_users = list(stream("users.jsonl").filter(["eq?", "@active", True]).evaluate())
premium_users = list(stream("users.jsonl").filter(["eq?", "@plan", "premium"]).evaluate())
# Efficient: Single pass, checking both conditions on each item in Python
users = stream("users.jsonl")
active_users = []
premium_users = []
for user in users.evaluate():
    if user.get("active"):
        active_users.append(user)
    if user.get("plan") == "premium":
        premium_users.append(user)
Working with Infinite Streams¶
Codata Sources¶
JAF supports infinite data sources:
# Fibonacci sequence
fibs = stream({"type": "fibonacci"})
# Take only what you need. evaluate() returns a single-use iterator, so
# list() materializes exactly those 100 items for repeated use.
first_100 = list(fibs.take(100).evaluate())
# Find first Fibonacci > 1000 (next() pulls items only until the first match)
first_large = next(
    fibs.filter(["gt?", "@value", 1000]).evaluate()
)
Creating Custom Infinite Sources¶
def register_counter_source(loader):
    """Register an infinite ``"counter"`` source type on *loader*.

    A descriptor such as ``{"type": "counter", "start": 100, "step": 5}``
    then yields ``{"value": 100, "index": 0}``, ``{"value": 105, "index": 1}``,
    ... without end. ``start`` defaults to 0 and ``step`` to 1.
    """
    import itertools

    def counter_generator(start=0, step=1):
        # Derive each value from its index rather than dividing by ``step``
        # or repeatedly adding it. Division raised ZeroDivisionError for
        # step=0 and produced wrong indices for fractional steps. Repeated
        # addition accumulates floating-point drift.
        for index in itertools.count():
            yield {"value": start + index * step, "index": index}

    def load_counter(loader, source):
        # Presumably invoked by the streaming loader with itself and the
        # source descriptor dict; confirm against the loader API.
        start = source.get("start", 0)
        step = source.get("step", 1)
        return counter_generator(start, step)

    loader.streaming_loader.register_loader("counter", load_counter)
# Usage (assumes the "counter" type was registered with
# register_counter_source() on the loader that stream() uses)
counter = stream({"type": "counter", "start": 100, "step": 5})
# Yields: {"value": 100, "index": 0}, {"value": 105, "index": 1},
#         {"value": 110, "index": 2}, ...
Extending JAF¶
Custom Operators¶
You can add custom operators by extending the evaluation logic (registering them currently requires modifying jaf_eval.py):
from jaf.jaf_eval import jaf_eval
from jaf.exceptions import InvalidArgumentCountError
def custom_stddev(args, obj):
    """Return the population standard deviation of an array expression.

    The single argument is evaluated against ``obj`` with ``jaf_eval.eval``.
    A non-list result yields ``None``; an empty list yields ``0``.

    Raises:
        InvalidArgumentCountError: unless exactly one argument is supplied.
    """
    arg_count = len(args)
    if arg_count != 1:
        raise InvalidArgumentCountError("stddev", 1, arg_count)
    samples = jaf_eval.eval(args[0], obj)
    if not isinstance(samples, list):
        return None
    count = len(samples)
    if not count:
        return 0
    average = sum(samples) / count
    # Divide by the full count (population variance, not the n-1 sample form).
    squared_deviation_total = sum((sample - average) ** 2 for sample in samples)
    return (squared_deviation_total / count) ** 0.5
# Register the operator
# Note: This requires modifying jaf_eval.py
# Better approach: Use map with custom Python functions
Custom Stream Operations¶
Create new stream operations by composing existing ones:
class ExtendedStream(LazyDataStream):
    """LazyDataStream with extra operations built on top of ``filter``.

    NOTE(review): both methods pass a Python callable through a
    ``["custom", predicate]`` query. This assumes the JAF evaluator accepts
    that form; confirm before relying on it.
    """

    def sample(self, rate=0.1, seed=None):
        """Deterministically keep roughly ``rate`` of the items.

        Each item is serialized to JSON with sorted keys, optionally salted
        with ``seed``, and hashed with MD5. It is kept when the hash lands in
        the lowest ``rate`` fraction of 10,000 buckets. Identical items always
        share a fate, and the same ``seed`` always selects the same subset.
        """
        import hashlib
        import json

        threshold = rate * 10000

        def should_include(item):
            item_str = json.dumps(item, sort_keys=True)
            # Compare with None so seed=0 still salts the hash instead of
            # being treated as "no seed".
            if seed is not None:
                item_str = f"{seed}:{item_str}"
            hash_val = int(hashlib.md5(item_str.encode()).hexdigest(), 16)
            return (hash_val % 10000) < threshold

        return self.filter(["custom", should_include])

    def deduplicate(self, key="@id"):
        """Keep only the first item for each value of ``key``.

        ``key`` is evaluated per item with ``jaf_eval.eval``. List and dict
        values are tracked by their sorted-key JSON serialization, because
        they cannot be stored in a set directly.

        NOTE(review): ``seen`` is created once per ``deduplicate`` call, not
        per evaluation. If the returned stream is evaluated more than once,
        later runs presumably drop keys seen in earlier runs; confirm.
        """
        import json

        seen = set()

        def is_unique(item):
            val = jaf_eval.eval(key, item)
            if isinstance(val, (dict, list)):
                # Unhashable JSON values: use a canonical, hashable stand-in.
                val = ("json", json.dumps(val, sort_keys=True))
            if val in seen:
                return False
            seen.add(val)
            return True

        return self.filter(["custom", is_unique])
Complex Data Processing Patterns¶
Sessionization¶
Group events into sessions based on time gaps:
def _build_session(session_events):
    """Summarize one user's session (a non-empty, time-ordered event list)."""
    first, last = session_events[0], session_events[-1]
    return {
        "session_id": f"{first['user']}_{first['time']}",
        "user": first["user"],
        "start": first["time"],
        "end": last["time"],
        "duration": last["time"] - first["time"],
        "events": session_events,
    }


def sessionize_events(events_file, gap_seconds=1800):
    """Group each user's events into sessions separated by inactivity gaps.

    Events without a ``timestamp`` are skipped. The rest are tracked per
    ``user_id``. A user's session closes when that user's next event arrives
    more than ``gap_seconds`` after their previous one (the default of 1800 s
    is 30 minutes). Sessions still open when the stream ends are yielded last.

    Assumes timestamps are numeric (e.g. epoch seconds) and that each user's
    events arrive in non-decreasing time order; TODO confirm for your data.
    Every user's open session is held in memory until it closes.

    Yields:
        dicts with ``session_id``, ``user``, ``start``, ``end``, ``duration``
        and the session's ``events``.
    """
    # user -> events of that user's currently open session. Keying by user
    # keeps interleaved users from being merged into a single session.
    open_sessions = {}
    events = (
        stream(events_file)
        .filter(["exists?", "@timestamp"])
        .map(["dict",
            "time", "@timestamp",
            "user", "@user_id",
            "action", "@action",
            "data", "@"
        ])
    )
    for event in events.evaluate():
        user = event["user"]
        session = open_sessions.get(user)
        # Explicit None check (not timestamp truthiness), so a timestamp of 0
        # still participates in gap detection.
        if session is not None and event["time"] - session[-1]["time"] > gap_seconds:
            yield _build_session(session)
            session = None
        if session is None:
            session = open_sessions[user] = []
        session.append(event)
    # Don't forget the sessions still open at end of stream
    for session in open_sessions.values():
        yield _build_session(session)
Streaming Joins¶
While JAF doesn't have built-in joins (use ja for that), you can implement a simple streaming join when one side is small enough to hold in memory:
def stream_join_small_lookup(main_file, lookup_file, join_key):
    """Join a stream against a lookup table small enough to hold in memory.

    Every item of ``lookup_file`` is indexed by its ``join_key`` value (a JAF
    expression such as ``"@id"``). Later lookup items with the same key
    overwrite earlier ones. Each ``main_file`` item becomes
    ``{"main": item, "lookup": match}``, and items without a match are
    presumably removed by the final ``exists?`` filter.

    NOTE(review): this relies on a ``"get"`` operator that accepts an
    in-memory dict and evaluates its key argument per item; confirm against
    the JAF operator set.
    """
    # Load lookup table into memory
    lookup = {}
    for item in stream(lookup_file).evaluate():
        lookup[jaf_eval.eval(join_key, item)] = item
    # Pass join_key itself (an expression) so it is evaluated against each
    # main item. Evaluating it here in Python would happen only once,
    # against a placeholder, giving every item the same key.
    return (
        stream(main_file)
        .map(["dict",
            "main", "@",
            "lookup", ["get", lookup, join_key]
        ])
        .filter(["exists?", "@lookup"])
    )
Debugging and Monitoring¶
Pipeline Inspection¶
def inspect_pipeline(pipeline, sample_size=5):
    """Print a pipeline's structure and its first ``sample_size`` results.

    Never pulls more than ``sample_size`` items from ``pipeline.evaluate()``,
    so the cost of inspection stays proportional to the sample.
    """
    import itertools
    import json

    # Get pipeline info
    info = pipeline.info()
    print(f"Pipeline structure: {info['pipeline']}")
    # Sample some results. islice stops without requesting an extra item,
    # unlike an enumerate-then-break loop, which evaluates item sample_size + 1.
    print(f"\nFirst {sample_size} results:")
    for i, item in enumerate(itertools.islice(pipeline.evaluate(), sample_size)):
        print(f"{i}: {json.dumps(item, indent=2)}")
Performance Profiling¶
import time
from contextlib import contextmanager
@contextmanager
def time_pipeline(name):
    """Time a block of pipeline work and report its throughput.

    Yields a ``counter`` function. Wrap any iterable in it to count the items
    that flow through. When the ``with`` block exits normally, this prints
    the item count, the elapsed time and items per second. If the block
    raises, nothing is printed and the exception propagates.
    """
    # perf_counter is monotonic and high-resolution, unlike time.time().
    start = time.perf_counter()
    count = 0

    def counter(items):
        nonlocal count
        for item in items:
            count += 1
            yield item

    yield counter
    elapsed = time.perf_counter() - start
    # Guard against a zero interval (very fast blocks or coarse clocks).
    rate = count / elapsed if elapsed > 0 else float("inf")
    print(f"{name}: processed {count} items in {elapsed:.2f}s ({rate:.0f} items/s)")
# Usage: count every item the evaluated stream yields while the block is timed
with time_pipeline("Filter operation") as counter:
    results = list(counter(stream("large_file.jsonl").filter(complex_query).evaluate()))
Error Tracking¶
def safe_pipeline(pipeline, error_file="errors.jsonl"):
    """Yield items from ``pipeline``, logging an evaluation failure instead of raising.

    If producing the next item raises, one JSON line is appended to
    ``error_file`` and iteration stops, because a generator that has raised
    cannot be resumed. The line records ``index`` (the number of items
    yielded before the failure), ``error`` (the message) and ``type`` (the
    exception class name). ``error_file`` is always (re)created, even when
    no error occurs. A warning is printed when an error was logged.
    """
    import json

    error_count = 0
    index = 0
    with open(error_file, "w", encoding="utf-8") as f:
        items = iter(pipeline.evaluate())
        while True:
            # Guard the call that actually runs the pipeline. Wrapping only
            # the ``yield`` would never see evaluation errors.
            try:
                item = next(items)
            except StopIteration:
                break
            except Exception as e:
                error_count += 1
                error_record = {
                    "index": index,
                    "error": str(e),
                    "type": type(e).__name__,
                }
                f.write(json.dumps(error_record) + "\n")
                break
            yield item
            index += 1
    if error_count > 0:
        print(f"Warning: {error_count} errors logged to {error_file}")
Integration with Data Science Tools¶
Pandas Integration¶
import pandas as pd
def jaf_to_dataframe(pipeline, max_rows=None):
    """Collect a JAF pipeline's results into a pandas DataFrame.

    ``max_rows`` caps the row count via ``pipeline.take``. ``None`` (the
    default) loads every result into memory. An explicit ``0`` is passed
    through to ``take`` rather than being treated as "no limit".
    """
    source = pipeline if max_rows is None else pipeline.take(max_rows)
    return pd.DataFrame(list(source.evaluate()))
# Usage: keep sales over 100 and project just the columns needed for analysis
sales = (
    stream("sales.jsonl")
    .filter(["gt?", "@amount", 100])
    .map(["dict",
        "date", "@date",
        "amount", "@amount",
        "category", "@product.category"
    ])
)
df = jaf_to_dataframe(sales)
# Now use pandas for analytics: total amount per calendar month
months = pd.to_datetime(df['date']).dt.to_period('M')
monthly_sales = df.groupby(months)['amount'].sum()
DuckDB Integration¶
import duckdb
def _quote_ident(name):
    """Quote *name* as a SQL identifier, doubling any embedded double quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def _duckdb_column_type(value):
    """Choose a DuckDB column type from a sample value of the first item."""
    # bool is a subclass of int, so it must be tested first.
    if isinstance(value, bool):
        return "BOOLEAN"
    # DOUBLE rather than NUMERIC: DuckDB's bare NUMERIC/DECIMAL is
    # DECIMAL(18,3), which would round values to three decimal places.
    if isinstance(value, (int, float)):
        return "DOUBLE"
    return "VARCHAR"


def jaf_to_duckdb(pipeline, table_name="jaf_data", batch_size=1000):
    """Stream JAF results into a new table on a fresh DuckDB connection.

    The schema is inferred from the first item: bool -> BOOLEAN, other
    numbers -> DOUBLE, everything else -> VARCHAR. Later items are inserted
    by the first item's keys, so missing keys become NULL and extra keys are
    ignored. Rows are inserted ``batch_size`` at a time. ``table_name`` is
    interpolated as-is, so it must be trusted. Returns the connection; when
    the pipeline is empty (or its first item is empty), no table is created.
    """
    conn = duckdb.connect()
    # Use one iterator so the pipeline is evaluated once and the first item
    # is not inserted twice.
    items = iter(pipeline.evaluate())
    first_item = next(items, None)
    if not first_item:
        return conn
    keys = list(first_item)
    # Column names come from the data; quote them so spaces, SQL keywords
    # or quotes in keys cannot break (or inject into) the statement.
    columns = ", ".join(
        f"{_quote_ident(key)} {_duckdb_column_type(first_item[key])}" for key in keys
    )
    conn.execute(f"CREATE TABLE {table_name} ({columns})")
    insert_sql = f"INSERT INTO {table_name} VALUES ({', '.join(['?'] * len(keys))})"
    # Insert data in batches
    batch = [first_item]
    for item in items:
        batch.append(item)
        if len(batch) >= batch_size:
            conn.executemany(insert_sql, [[row.get(k) for k in keys] for row in batch])
            batch = []
    # Insert remaining
    if batch:
        conn.executemany(insert_sql, [[row.get(k) for k in keys] for row in batch])
    return conn
# Usage
conn = jaf_to_duckdb(
    stream("transactions.jsonl")
    .filter(["eq?", "@status", "completed"])
)
# Now use SQL. JSON dates arrive as text (VARCHAR columns), so cast them
# explicitly instead of relying on an implicit VARCHAR-to-timestamp
# conversion. Assumes ISO-8601 date strings.
result = conn.execute("""
    SELECT
        DATE_TRUNC('month', CAST("date" AS TIMESTAMP)) AS month,
        SUM(amount) AS total
    FROM jaf_data
    GROUP BY month
    ORDER BY month
""").fetchall()
Best Practices for Production¶
- Use Type Hints: Add type hints to your pipeline functions
- Validate Early: Add validation filters at the start of pipelines
- Monitor Memory: Use batching for operations that accumulate state
- Handle Errors: Use try-except in custom operations
- Document Pipelines: Save complex pipelines with descriptive names
- Test with Samples: Always test on small data before running on full datasets
- Profile Performance: Measure throughput for critical pipelines
- Version Control: Track pipeline definitions in git
Future Enhancements¶
The JAF ecosystem continues to evolve. Some areas being explored:
- Windowed Operations: Time or count-based windows for streaming aggregations
- Parallel Processing: Multi-threaded evaluation for CPU-bound operations
- Schema Validation: Built-in JSON Schema validation
- Pipeline Optimization: Automatic query reordering for performance
- Extended Type System: Better support for custom types and validations
For the latest updates and to contribute, visit the JAF GitHub repository.