
API Reference

Welcome to the complete API reference for JSONL Algebra. Every function and class is listed here with its full documentation.

ja.core

Core relational operations for the JSONL algebra system.

This module implements the fundamental set and relational operations that form the algebra for manipulating collections of JSON objects. All operations are designed to work with lists of dictionaries, making them suitable for processing JSONL data.

Classes

Functions

select(data, expr, use_jmespath=False)

Filter rows based on an expression.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | Relation | List of dictionaries to filter | required |
| expr | str | Expression to evaluate (simple expression or JMESPath) | required |
| use_jmespath | bool | If True, use JMESPath evaluation | False |

Returns:

| Type | Description |
| --- | --- |
| Relation | List of rows where the expression evaluates to true |

Source code in ja/core.py
def select(
    data: Relation, expr: str, use_jmespath: bool = False
) -> Relation:
    """Filter rows based on an expression.

    Args:
        data: List of dictionaries to filter
        expr: Expression to evaluate (simple expression or JMESPath)
        use_jmespath: If True, use JMESPath evaluation

    Returns:
        List of rows where the expression evaluates to true
    """
    if use_jmespath:
        compiled_expr = jmespath.compile(expr)
        return [row for row in data if compiled_expr.search(row)]

    # Use simple expression parser
    parser = ExprEval()
    result = []

    # Handle 'and' at the command level for simplicity
    if " and " in expr:
        # Multiple conditions with 'and'
        conditions = expr.split(" and ")
        for row in data:
            if all(parser.evaluate(cond.strip(), row) for cond in conditions):
                result.append(row)
    elif " or " in expr:
        # Multiple conditions with 'or'
        conditions = expr.split(" or ")
        for row in data:
            if any(parser.evaluate(cond.strip(), row) for cond in conditions):
                result.append(row)
    else:
        # Single condition
        for row in data:
            if parser.evaluate(expr, row):
                result.append(row)

    return result
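
A minimal usage sketch (the simple-expression syntax is inferred from examples elsewhere in this module; exact capabilities depend on ExprEval):

>>> from ja.core import select
>>> rows = [{"name": "Alice", "age": 34}, {"name": "Bob", "age": 17}]
>>> select(rows, "age >= 18")
[{'name': 'Alice', 'age': 34}]
>>> select(rows, "age >= `18`", use_jmespath=True)  # JMESPath literal syntax
[{'name': 'Alice', 'age': 34}]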

project(data, fields, use_jmespath=False)

Project specific fields from each row.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | Relation | List of dictionaries to project | required |
| fields | List[str] \| str | Comma-separated field names or expressions | required |
| use_jmespath | bool | If True, use JMESPath for projection | False |

Returns:

| Type | Description |
| --- | --- |
| Relation | List of dictionaries with only the specified fields |

Source code in ja/core.py
def project(
    data: Relation, fields: List[str] | str, use_jmespath: bool = False
) -> Relation:
    """Project specific fields from each row.

    Args:
        data: List of dictionaries to project
        fields: Comma-separated field names or expressions
        use_jmespath: If True, use JMESPath for projection

    Returns:
        List of dictionaries with only the specified fields
    """

    if use_jmespath:
        compiled_expr = jmespath.compile(fields)
        return [compiled_expr.search(row) for row in data]

    # Parse field specifications
    result = []
    parser = ExprEval()
    field_specs = fields if isinstance(fields, list) else fields.split(",")

    for row in data:
        new_row = {}

        for spec in field_specs:
            if "=" in spec:
                # Computed field: "total=amount*1.1" or "is_adult=age>=18"
                name, expr = spec.split("=", 1)
                name = name.strip()
                expr = expr.strip()

                # Check if it's an arithmetic expression
                arith_result = parser.evaluate_arithmetic(expr, row)
                if arith_result is not None:
                    new_row[name] = arith_result
                else:
                    # Try as boolean expression
                    new_row[name] = parser.evaluate(expr, row)
            else:
                # Simple field projection
                value = parser.get_field_value(row, spec)
                if value is not None:
                    # Build nested structure
                    parser.set_field_value(new_row, spec, value)

        result.append(new_row)

    return result
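
A brief sketch of both projection styles; the computed-field form mirrors the comments in the source above, and the exact expression support is an assumption about ExprEval:

>>> from ja.core import project
>>> rows = [{"name": "Alice", "age": 34, "email": "a@example.com"}]
>>> project(rows, "name,age")
[{'name': 'Alice', 'age': 34}]
>>> project(rows, "name,is_adult=age>=18")
[{'name': 'Alice', 'is_adult': True}]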

join(left, right, on, how='inner')

Join two relations with support for multiple join types.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| left | Relation | Left relation (list of dictionaries) | required |
| right | Relation | Right relation (list of dictionaries) | required |
| on | List[Tuple[str, str]] | List of (left_key, right_key) tuples specifying join columns | required |
| how | str | Join type: "inner" (only matching rows from both sides, the default), "left" (all rows from left, nulls where right has no match), "right" (all rows from right, nulls where left has no match), "outer" (all rows from both sides, nulls where no match), or "cross" (Cartesian product; ignores the on parameter) | 'inner' |

Returns:

| Type | Description |
| --- | --- |
| Relation | Joined relation as list of dictionaries |

Examples:

>>> left = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
>>> right = [{"user_id": 1, "order": "Book"}]
>>> join(left, right, [("id", "user_id")], how="left")
[{"id": 1, "name": "Alice", "order": "Book"},
 {"id": 2, "name": "Bob", "order": None}]
Source code in ja/core.py
def join(left: Relation,
         right: Relation,
         on: List[Tuple[str, str]],
         how: str = "inner") -> Relation:
    """Join two relations with support for multiple join types.

    Args:
        left: Left relation (list of dictionaries)
        right: Right relation (list of dictionaries)
        on: List of (left_key, right_key) tuples specifying join columns
        how: Join type - "inner", "left", "right", "outer", or "cross"
            - inner: Only matching rows from both sides (default)
            - left: All rows from left, matching rows from right (nulls if no match)
            - right: All rows from right, matching rows from left (nulls if no match)
            - outer: All rows from both sides (nulls where no match)
            - cross: Cartesian product (ignores 'on' parameter)

    Returns:
        Joined relation as list of dictionaries

    Examples:
        >>> left = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
        >>> right = [{"user_id": 1, "order": "Book"}]
        >>> join(left, right, [("id", "user_id")], how="left")
        [{"id": 1, "name": "Alice", "order": "Book"},
         {"id": 2, "name": "Bob", "order": None}]
    """
    how = how.lower()
    valid_types = {"inner", "left", "right", "outer", "cross"}
    if how not in valid_types:
        raise ValueError(f"Invalid join type '{how}'. Must be one of: {', '.join(sorted(valid_types))}")

    # Cross join is special - no key matching
    if how == "cross":
        return product(left, right)

    parser = ExprEval()

    # Index right side by join keys
    right_index: Dict[Tuple[Any, ...], List[Row]] = defaultdict(list)
    for r in right:
        key = tuple(parser.get_field_value(r, rk) for _, rk in on)
        if all(v is not None for v in key):
            right_index[key].append(r)

    # Roots of every RHS join path (e.g. 'user.id' → 'user')
    rhs_roots = {re.split(r"[.\[]", rk, 1)[0] for _, rk in on}

    # Get all right-side field names for null placeholders
    right_fields = set()
    for r in right:
        right_fields.update(r.keys())
    # Remove join key roots from right fields
    right_fields -= rhs_roots

    # Get all left-side field names for null placeholders
    left_fields = set()
    for l in left:
        left_fields.update(l.keys())

    def merge_rows(l_row: Optional[Row], r_row: Optional[Row]) -> Row:
        """Merge left and right rows, handling nulls."""
        merged = {}

        if r_row is not None:
            for k, v in r_row.items():
                # Skip right-side join key roots
                root = re.split(r"[.\[]", k, 1)[0]
                if root not in rhs_roots:
                    merged[k] = v
        else:
            # No right match - add null placeholders for right fields
            for field in right_fields:
                merged[field] = None

        if l_row is not None:
            merged.update(l_row)  # Left wins on collision
        else:
            # No left match - add null placeholders for left fields
            for field in left_fields:
                merged[field] = None

        return merged

    joined: Relation = []
    matched_right_keys: set = set()

    # Process left side
    for l in left:
        l_key = tuple(parser.get_field_value(l, lk) for lk, _ in on)

        # Skip rows with null join keys for inner join
        if not all(v is not None for v in l_key):
            if how in ("left", "outer"):
                # Include unmatched left rows for left/outer joins
                joined.append(merge_rows(l, None))
            continue

        matches = right_index.get(l_key, [])

        if matches:
            matched_right_keys.add(l_key)
            for r in matches:
                joined.append(merge_rows(l, r))
        elif how in ("left", "outer"):
            # No match but include left row for left/outer joins
            joined.append(merge_rows(l, None))

    # For right and outer joins, add unmatched right rows
    if how in ("right", "outer"):
        for r in right:
            r_key = tuple(parser.get_field_value(r, rk) for _, rk in on)
            if all(v is not None for v in r_key) and r_key not in matched_right_keys:
                joined.append(merge_rows(None, r))

    return joined
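
To complement the docstring's left-join example, a sketch of an outer join. Items are printed sorted because the null-placeholder field order comes from a set and is not guaranteed:

>>> users = [{"id": 1, "name": "Alice"}]
>>> orders = [{"user_id": 1, "sku": "A"}, {"user_id": 2, "sku": "B"}]
>>> for row in join(users, orders, [("id", "user_id")], how="outer"):
...     print(sorted(row.items()))
[('id', 1), ('name', 'Alice'), ('sku', 'A')]
[('id', None), ('name', None), ('sku', 'B')]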

product(left, right)

Cartesian product; colliding keys from right are prefixed with b_.

Source code in ja/core.py
def product(left: Relation, right: Relation) -> Relation:
    """Cartesian product; colliding keys from *right* are prefixed with ``b_``."""
    result: Relation = []
    for l in left:
        for r in right:
            merged = l.copy()
            for k, v in r.items():
                if k in merged:
                    merged[f"b_{k}"] = v
                else:
                    merged[k] = v
            result.append(merged)
    return result
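
A small sketch of the collision handling:

>>> product([{"id": 1}], [{"id": 9, "x": "y"}])
[{'id': 1, 'b_id': 9, 'x': 'y'}]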

rename(data, mapping)

Rename fields in each row.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | Relation | List of dictionaries | required |
| mapping | Dict[str, str] | Dictionary mapping old names to new names | required |

Returns:

| Type | Description |
| --- | --- |
| Relation | List with renamed fields |

Source code in ja/core.py
def rename(data: Relation, mapping: Dict[str, str]) -> Relation:
    """Rename fields in each row.

    Args:
        data: List of dictionaries
        mapping: Dictionary mapping old names to new names

    Returns:
        List with renamed fields
    """
    result = []
    for row in data:
        new_row = {}
        for key, value in row.items():
            new_key = mapping.get(key, key)
            new_row[new_key] = value
        result.append(new_row)
    return result
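
Keys absent from the mapping pass through unchanged:

>>> rename([{"user_name": "Alice", "age": 34}], {"user_name": "name"})
[{'name': 'Alice', 'age': 34}]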

union(left, right)

Compute the union of two collections.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| left | Relation | First collection | required |
| right | Relation | Second collection | required |

Returns:

| Type | Description |
| --- | --- |
| Relation | Union of the two collections |

Source code in ja/core.py
def union(
    left: Relation, right: Relation
) -> Relation:
    """Compute the union of two collections.

    Args:
        left: First collection
        right: Second collection

    Returns:
        Union of the two collections
    """
    return left + right
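
Note that this is a bag union: the lists are concatenated and duplicates are preserved. Compose with distinct for set semantics:

>>> union([{"id": 1}], [{"id": 1}, {"id": 2}])
[{'id': 1}, {'id': 1}, {'id': 2}]
>>> distinct(union([{"id": 1}], [{"id": 1}, {"id": 2}]))
[{'id': 1}, {'id': 2}]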

intersection(left, right)

Compute the intersection of two collections.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| left | Relation | First collection | required |
| right | Relation | Second collection | required |

Returns:

| Type | Description |
| --- | --- |
| Relation | Intersection of the two collections |

Source code in ja/core.py
def intersection(
    left: Relation, right: Relation
) -> Relation:
    """Compute the intersection of two collections.

    Args:
        left: First collection
        right: Second collection

    Returns:
        Intersection of the two collections
    """
    # Convert right to a set of tuples for efficient lookup
    right_set = {tuple(sorted(row.items())) for row in right}

    result = []
    for row in left:
        if tuple(sorted(row.items())) in right_set:
            result.append(row)

    return result
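
Rows are compared by their full sorted key/value pairs, so their values must be hashable (rows containing nested lists or dicts would raise TypeError here):

>>> intersection([{"id": 1}, {"id": 2}], [{"id": 2}, {"id": 3}])
[{'id': 2}]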

difference(left, right)

Compute the difference of two collections.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| left | Relation | First collection | required |
| right | Relation | Second collection | required |

Returns:

| Type | Description |
| --- | --- |
| Relation | Elements in left but not in right |

Source code in ja/core.py
def difference(
    left: Relation, right: Relation
) -> Relation:
    """Compute the difference of two collections.

    Args:
        left: First collection
        right: Second collection

    Returns:
        Elements in left but not in right
    """
    # Convert right to a set of tuples for efficient lookup
    right_set = {tuple(sorted(row.items())) for row in right}

    result = []
    for row in left:
        if tuple(sorted(row.items())) not in right_set:
            result.append(row)

    return result
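
Like intersection, this compares whole rows:

>>> difference([{"id": 1}, {"id": 2}], [{"id": 2}])
[{'id': 1}]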

distinct(data)

Remove duplicate rows from a collection.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | Relation | List of dictionaries | required |

Returns:

| Type | Description |
| --- | --- |
| Relation | List with duplicates removed |

Source code in ja/core.py
def distinct(data: Relation) -> Relation:
    """Remove duplicate rows from a collection.

    Args:
        data: List of dictionaries

    Returns:
        List with duplicates removed
    """
    seen = set()
    result = []

    for row in data:
        # Convert to tuple for hashability
        row_tuple = tuple(sorted(row.items()))
        if row_tuple not in seen:
            seen.add(row_tuple)
            result.append(row)

    return result
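
Because rows are compared by their sorted items, key order does not affect equality, and the first occurrence is kept:

>>> distinct([{"a": 1, "b": 2}, {"b": 2, "a": 1}])
[{'a': 1, 'b': 2}]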

collect(data)

Collect metadata-grouped rows into actual groups.

This function takes rows with _groups metadata (from groupby operations) and collects them into explicit groups. Each output row represents one group with all its members in a _rows array.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | Relation | List of dictionaries with _groups metadata | required |

Returns:

| Type | Description |
| --- | --- |
| Relation | List where each dict represents a group with _rows array |

Example

Input:

[
    {"id": 1, "region": "North", "_groups": [{"field": "region", "value": "North"}]},
    {"id": 2, "region": "North", "_groups": [{"field": "region", "value": "North"}]},
    {"id": 3, "region": "South", "_groups": [{"field": "region", "value": "South"}]}
]

Output:

[
    {"region": "North", "_rows": [{"id": 1, "region": "North"}, {"id": 2, "region": "North"}]},
    {"region": "South", "_rows": [{"id": 3, "region": "South"}]}
]

Source code in ja/core.py
def collect(data: Relation) -> Relation:
    """Collect metadata-grouped rows into actual groups.

    This function takes rows with _groups metadata (from groupby operations)
    and collects them into explicit groups. Each output row represents one
    group with all its members in a _rows array.

    Args:
        data: List of dictionaries with _groups metadata

    Returns:
        List where each dict represents a group with _rows array

    Example:
        Input:
        [
            {"id": 1, "region": "North", "_groups": [{"field": "region", "value": "North"}]},
            {"id": 2, "region": "North", "_groups": [{"field": "region", "value": "North"}]},
            {"id": 3, "region": "South", "_groups": [{"field": "region", "value": "South"}]}
        ]

        Output:
        [
            {"region": "North", "_rows": [{"id": 1, "region": "North"}, {"id": 2, "region": "North"}]},
            {"region": "South", "_rows": [{"id": 3, "region": "South"}]}
        ]
    """
    if not data:
        return []

    # Check if data has grouping metadata
    if "_groups" not in data[0]:
        # No grouping metadata - treat entire dataset as one group
        return [{"_rows": data}]

    # Collect rows by their group keys
    groups = defaultdict(list)

    for row in data:
        # Build group key from metadata
        group_key = tuple((g["field"], g["value"]) for g in row["_groups"])

        # Create clean row without metadata
        clean_row = {k: v for k, v in row.items() if not k.startswith("_")}

        groups[group_key].append(clean_row)

    # Build output with one row per group
    result = []
    for group_key, rows in groups.items():
        group_dict = {}

        # Add group fields to output
        for field, value in group_key:
            group_dict[field] = value

        # Add collected rows
        group_dict["_rows"] = rows

        result.append(group_dict)

    return result
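
A runnable version of the docstring example, condensed to one row per group:

>>> rows = [
...     {"id": 1, "region": "North", "_groups": [{"field": "region", "value": "North"}]},
...     {"id": 2, "region": "South", "_groups": [{"field": "region", "value": "South"}]},
... ]
>>> for group in collect(rows):
...     print(group)
{'region': 'North', '_rows': [{'id': 1, 'region': 'North'}]}
{'region': 'South', '_rows': [{'id': 2, 'region': 'South'}]}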

ja.cli

Command-line interface for JSONL algebra operations.

This module provides the main CLI entry point and argument parsing for all JSONL algebra operations including relational algebra, schema inference, data import/export, and interactive REPL mode.

Functions

json_error(error_type, message, details=None, exit_code=1)

Output error in JSON format and exit.

Parameters:

| Name | Description | Default |
| --- | --- | --- |
| error_type | Type of error (e.g., "ParseError", "IOError") | required |
| message | Human-readable error message | required |
| details | Optional dict with additional error details | None |
| exit_code | Exit code | 1 |
Source code in ja/cli.py
def json_error(error_type, message, details=None, exit_code=1):
    """Output error in JSON format and exit.

    Args:
        error_type: Type of error (e.g., "ParseError", "IOError")
        message: Human-readable error message
        details: Optional dict with additional error details
        exit_code: Exit code (default: 1)
    """
    error_obj = {"error": {"type": error_type, "message": message}}
    if details:
        error_obj["error"]["details"] = details

    # If stderr is not a tty (redirected/piped), always output JSON
    # If stderr is a tty, output human-readable unless JA_JSON_ERRORS is set
    if not sys.stderr.isatty() or os.environ.get("JA_JSON_ERRORS"):
        print(json.dumps(error_obj), file=sys.stderr)
    else:
        # Human-readable format for terminal
        print(f"ja: error: {message}", file=sys.stderr)
        if details:
            for key, value in details.items():
                if value is not None and key != "traceback":
                    print(f"  {key}: {value}", file=sys.stderr)

    sys.exit(exit_code)
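
A sketch of how the output mode is selected; the error values here are illustrative, and note that the call terminates the process:

import os
from ja.cli import json_error

os.environ["JA_JSON_ERRORS"] = "1"  # force JSON output even when stderr is a terminal
json_error("ParseError", "unexpected token", details={"line": 3}, exit_code=2)
# stderr: {"error": {"type": "ParseError", "message": "unexpected token", "details": {"line": 3}}}
# process exits with status 2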

handle_export_command_group(args)

Handle export subcommands by delegating to appropriate handlers.

Parameters:

| Name | Description | Default |
| --- | --- | --- |
| args | Parsed command line arguments with export_cmd attribute | required |
Source code in ja/cli.py
def handle_export_command_group(args):
    """Handle export subcommands by delegating to appropriate handlers.

    Args:
        args: Parsed command line arguments with export_cmd attribute.
    """
    export_command_handlers = {
        "array": handle_to_array,
        "jsonl": handle_to_jsonl,
        "explode": handle_explode,
        "csv": handle_to_csv,
    }
    handler = export_command_handlers.get(args.export_cmd)
    if handler:
        handler(args)
    else:
        # This should not be reached if subcommands are handled correctly in main
        print(f"Unknown export command: {args.export_cmd}", file=sys.stderr)
        sys.exit(1)

handle_import_command_group(args)

Handle import subcommands by delegating to appropriate handlers.

Parameters:

| Name | Description | Default |
| --- | --- | --- |
| args | Parsed command line arguments with import_cmd attribute | required |
Source code in ja/cli.py
def handle_import_command_group(args):
    """Handle import subcommands by delegating to appropriate handlers.

    Args:
        args: Parsed command line arguments with import_cmd attribute.
    """
    import_command_handlers = {
        "csv": handle_import_csv,
        "implode": handle_implode,
    }
    handler = import_command_handlers.get(args.import_cmd)
    if handler:
        handler(args)
    else:
        # This should not be reached if subcommands are handled correctly in main
        print(f"Unknown import command: {args.import_cmd}", file=sys.stderr)
        sys.exit(1)

ja.repl

Interactive REPL (Read-Eval-Print Loop) for JSONL algebra operations.

This module provides a powerful, interactive shell for JSONL data manipulation with named datasets, immediate execution, and a non-destructive design.

Key features:

- Named datasets: Load and manage multiple JSONL files by name
- Safe operations: All transformations require unique output names
- Immediate execution: See results right away, no pipeline building
- File-based streaming: Store paths, not data (memory efficient)
- Shell integration: Execute bash commands with !

Classes

ReplSession

Interactive REPL session for JSONL data manipulation.

This class manages a workspace of named datasets (JSONL files) and provides commands for loading, transforming, and exploring data interactively.

Design principles:

- Non-destructive: Operations create new datasets, never modify originals
- Explicit: All operations require unique output names
- Streaming: Store file paths, not data in memory

Source code in ja/repl.py
class ReplSession:
    """Interactive REPL session for JSONL data manipulation.

    This class manages a workspace of named datasets (JSONL files) and provides
    commands for loading, transforming, and exploring data interactively.

    Design principles:
    - Non-destructive: Operations create new datasets, never modify originals
    - Explicit: All operations require unique output names
    - Streaming: Store file paths, not data in memory
    """

    def __init__(self):
        """Initialize the REPL session."""
        self.datasets: Dict[str, str] = {}  # name -> file_path
        self.current_dataset: Optional[str] = None
        self.settings = {
            "window_size": 20,  # Default preview limit
        }
        self.temp_dir = tempfile.mkdtemp(prefix="ja_repl_")
        self.temp_counter = 0

    def _get_temp_file(self, name: str) -> str:
        """Generate a unique temp file path for a dataset name."""
        self.temp_counter += 1
        return os.path.join(self.temp_dir, f"{name}_{self.temp_counter}.jsonl")

    def _check_name_conflict(self, name: str) -> None:
        """Raise error if dataset name already exists."""
        if name in self.datasets:
            raise ValueError(
                f"Dataset '{name}' already exists. Use a different name."
            )

    def _require_current(self) -> str:
        """Raise error if no current dataset is set."""
        if self.current_dataset is None:
            raise ValueError(
                "No current dataset. Use 'load <file>' or 'cd <name>' first."
            )
        return self.current_dataset

    def _execute_ja_command(self, cmd_parts: list) -> subprocess.CompletedProcess:
        """Execute a ja command and return the result."""
        cmd = shlex.join(cmd_parts)
        try:
            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                text=True,
                check=False,
            )
            return result
        except Exception as e:
            raise RuntimeError(f"Failed to execute command: {e}")

    # ==================== Command Handlers ====================

    def handle_load(self, args):
        """Load a JSONL file into the workspace.

        Usage: load <file> [name]

        If name is not provided, uses the file stem (filename without extension).
        The loaded dataset becomes the current dataset.
        """
        if not args:
            print("Error: 'load' requires a file path.")
            print("Usage: load <file> [name]")
            return

        file_path = args[0]
        if not os.path.exists(file_path):
            print(f"Error: File not found: {file_path}")
            return

        # Determine dataset name
        if len(args) > 1:
            name = args[1]
        else:
            name = Path(file_path).stem

        # Check for conflicts
        try:
            self._check_name_conflict(name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        # Register the dataset
        self.datasets[name] = os.path.abspath(file_path)
        self.current_dataset = name
        print(f"Loaded: {name} (current)")
        print(f"  Path: {self.datasets[name]}")

    def handle_cd(self, args):
        """Switch to a different dataset.

        Usage: cd <name>
        """
        if not args:
            print("Error: 'cd' requires a dataset name.")
            print("Usage: cd <name>")
            return

        name = args[0]
        if name not in self.datasets:
            print(f"Error: Unknown dataset '{name}'.")
            print(f"Available datasets: {', '.join(self.datasets.keys())}")
            return

        self.current_dataset = name
        print(f"Current dataset: {name}")

    def handle_pwd(self, args):
        """Show the current dataset name and path.

        Usage: pwd
        Alias: current
        """
        if self.current_dataset is None:
            print("No current dataset.")
            return

        print(f"Current dataset: {self.current_dataset}")
        print(f"  Path: {self.datasets[self.current_dataset]}")

    def handle_current(self, args):
        """Alias for pwd."""
        self.handle_pwd(args)

    def handle_datasets(self, args):
        """List all registered datasets.

        Usage: datasets

        Shows all loaded datasets with a marker for the current one.
        """
        if not self.datasets:
            print("No datasets loaded.")
            return

        print("Registered datasets:")
        for name in sorted(self.datasets.keys()):
            marker = " (current)" if name == self.current_dataset else ""
            print(f"  {name}{marker}")
            print(f"    {self.datasets[name]}")

    def handle_save(self, args):
        """Save the current dataset to a file.

        Usage: save <file>

        Writes the current dataset to the specified file path.
        Does not register the file as a new dataset.
        """
        if not args:
            print("Error: 'save' requires a file path.")
            print("Usage: save <file>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = args[0]
        input_file = self.datasets[current]

        # Copy the current dataset to the output file
        try:
            with open(input_file, 'r') as inf, open(output_file, 'w') as outf:
                outf.write(inf.read())
            print(f"Saved {current} to: {output_file}")
        except Exception as e:
            print(f"Error saving file: {e}")

    def handle_ls(self, args):
        """Preview a dataset.

        Usage: ls [name] [--limit N]

        Shows the first N lines of the dataset (default: window_size setting).
        If name is omitted, shows the current dataset.
        """
        # Parse arguments
        dataset_name = None
        limit = self.settings["window_size"]

        i = 0
        while i < len(args):
            arg = args[i]
            if arg == "--limit":
                if i + 1 >= len(args):
                    print("Error: --limit requires a number.")
                    return
                try:
                    limit = int(args[i + 1])
                    i += 2
                except ValueError:
                    print(f"Error: Invalid limit value '{args[i + 1]}'.")
                    return
            elif arg.startswith("--limit="):
                try:
                    limit = int(arg.split("=", 1)[1])
                    i += 1
                except ValueError:
                    print(f"Error: Invalid limit value in '{arg}'.")
                    return
            else:
                dataset_name = arg
                i += 1

        # Determine which dataset to show
        if dataset_name is None:
            try:
                dataset_name = self._require_current()
            except ValueError as e:
                print(f"Error: {e}")
                return
        elif dataset_name not in self.datasets:
            print(f"Error: Unknown dataset '{dataset_name}'.")
            return

        file_path = self.datasets[dataset_name]

        # Use head to show first N lines
        try:
            result = subprocess.run(
                ["head", f"-n{limit}", file_path],
                capture_output=True,
                text=True,
                check=False,
            )
            if result.returncode == 0:
                print(result.stdout.rstrip())
            else:
                print(f"Error reading dataset: {result.stderr}")
        except Exception as e:
            print(f"Error: {e}")

    def handle_shell(self, args):
        """Execute a shell command.

        Usage: !<command>

        Passes the command directly to the shell.
        """
        # args is already the full command (without the !)
        cmd = " ".join(args)
        try:
            result = subprocess.run(
                cmd,
                shell=True,
                check=False,
            )
        except Exception as e:
            print(f"Error executing shell command: {e}")

    def handle_window_size(self, args):
        """Get or set the window size setting.

        Usage: window-size [N]

        Without arguments, shows the current value.
        With a number, sets the value.
        """
        if not args:
            print(f"window-size: {self.settings['window_size']}")
            return

        try:
            new_size = int(args[0])
            if new_size <= 0:
                print("Error: window-size must be a positive integer.")
                return
            self.settings["window_size"] = new_size
            print(f"window-size set to: {new_size}")
        except ValueError:
            print(f"Error: Invalid number '{args[0]}'.")

    def handle_info(self, args):
        """Show statistics and information about a dataset.

        Usage: info [name]

        If name is omitted, shows info for the current dataset.
        Displays: row count, file size, fields, and a sample row.
        """
        # Determine which dataset to show info for
        dataset_name = None
        if args:
            dataset_name = args[0]
            if dataset_name not in self.datasets:
                print(f"Error: Unknown dataset '{dataset_name}'.")
                return
        else:
            try:
                dataset_name = self._require_current()
            except ValueError as e:
                print(f"Error: {e}")
                return

        file_path = self.datasets[dataset_name]

        try:
            import json
            import os

            # Get file size
            file_size = os.path.getsize(file_path)
            if file_size < 1024:
                size_str = f"{file_size} B"
            elif file_size < 1024 * 1024:
                size_str = f"{file_size / 1024:.1f} KB"
            else:
                size_str = f"{file_size / (1024 * 1024):.1f} MB"

            # Count rows and collect field names
            row_count = 0
            all_fields = set()
            first_row = None

            with open(file_path, 'r') as f:
                for line in f:
                    if line.strip():
                        row_count += 1
                        try:
                            obj = json.loads(line)
                            if first_row is None:
                                first_row = obj
                            # Collect field names (flatten nested objects)
                            self._collect_fields(obj, all_fields)
                        except json.JSONDecodeError:
                            pass  # Skip malformed lines

            # Sort fields for consistent display
            fields = sorted(all_fields)

            # Display info
            print(f"\nDataset: {dataset_name}")
            print(f"Path: {file_path}")
            print(f"Rows: {row_count:,}")
            print(f"Size: {size_str}")

            if fields:
                # Limit field display to avoid clutter
                if len(fields) <= 20:
                    print(f"Fields: {', '.join(fields)}")
                else:
                    print(f"Fields ({len(fields)} total): {', '.join(fields[:20])}, ...")

            if first_row:
                print(f"\nSample (first row):")
                print(f"  {json.dumps(first_row, indent=2)}")

            print()

        except FileNotFoundError:
            print(f"Error: File not found: {file_path}")
        except Exception as e:
            print(f"Error reading dataset: {e}")

    def _collect_fields(self, obj, field_set, prefix=""):
        """Recursively collect field names from a JSON object.

        Nested fields are represented with dot notation (e.g., 'user.name').
        """
        if isinstance(obj, dict):
            for key, value in obj.items():
                full_key = f"{prefix}.{key}" if prefix else key
                field_set.add(full_key)
                if isinstance(value, dict):
                    self._collect_fields(value, field_set, full_key)
                elif isinstance(value, list) and value and isinstance(value[0], dict):
                    # For arrays of objects, show the array field plus nested fields
                    self._collect_fields(value[0], field_set, full_key)
        elif isinstance(obj, list) and obj and isinstance(obj[0], dict):
            self._collect_fields(obj[0], field_set, prefix)

    # ==================== Unary Operations ====================

    def handle_select(self, args):
        """Filter rows with an expression.

        Usage: select '<expr>' <output_name>

        Creates a new dataset with filtered rows.
        """
        if len(args) < 2:
            print("Error: 'select' requires an expression and output name.")
            print("Usage: select '<expr>' <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        expr = args[0]
        output_name = args[1]

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        # Create temp file for output
        output_file = self._get_temp_file(output_name)
        input_file = self.datasets[current]

        # Execute: ja select '<expr>' <input> > <output>
        cmd_parts = ["ja", "select", expr, input_file]
        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            # Save output to temp file
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    def handle_project(self, args):
        """Select specific fields.

        Usage: project <fields> <output_name>

        Creates a new dataset with only the specified fields.
        """
        if len(args) < 2:
            print("Error: 'project' requires fields and output name.")
            print("Usage: project <fields> <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        fields = args[0]
        output_name = args[1]

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = self._get_temp_file(output_name)
        input_file = self.datasets[current]

        cmd_parts = ["ja", "project", fields, input_file]
        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    def handle_rename(self, args):
        """Rename fields.

        Usage: rename <mapping> <output_name>

        Example: rename old=new,foo=bar output
        """
        if len(args) < 2:
            print("Error: 'rename' requires a mapping and output name.")
            print("Usage: rename <old=new,...> <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        mapping = args[0]
        output_name = args[1]

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = self._get_temp_file(output_name)
        input_file = self.datasets[current]

        cmd_parts = ["ja", "rename", mapping, input_file]
        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    def handle_distinct(self, args):
        """Remove duplicate rows.

        Usage: distinct <output_name>
        """
        if len(args) < 1:
            print("Error: 'distinct' requires an output name.")
            print("Usage: distinct <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_name = args[0]

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = self._get_temp_file(output_name)
        input_file = self.datasets[current]

        cmd_parts = ["ja", "distinct", input_file]
        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    def handle_sort(self, args):
        """Sort rows by key(s).

        Usage: sort <keys> [--desc] <output_name>

        Example: sort age,name output
        Example: sort age --desc output
        """
        if len(args) < 2:
            print("Error: 'sort' requires keys and output name.")
            print("Usage: sort <keys> [--desc] <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        # Parse args: keys, optional --desc, output_name
        keys = args[0]
        desc = False
        output_name = args[-1]

        if "--desc" in args:
            desc = True

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = self._get_temp_file(output_name)
        input_file = self.datasets[current]

        cmd_parts = ["ja", "sort", keys, input_file]
        if desc:
            cmd_parts.append("--desc")

        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    def handle_groupby(self, args):
        """Group rows by a key.

        Usage: groupby <key> [--agg <spec>] <output_name>

        Example: groupby region output
        Example: groupby region --agg count,sum(amount) output
        """
        if len(args) < 2:
            print("Error: 'groupby' requires a key and output name.")
            print("Usage: groupby <key> [--agg <spec>] <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        key = args[0]
        output_name = args[-1]

        # Check for --agg
        agg_spec = None
        if "--agg" in args:
            agg_idx = args.index("--agg")
            if agg_idx + 1 < len(args) - 1:  # -1 because last is output_name
                agg_spec = args[agg_idx + 1]

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = self._get_temp_file(output_name)
        input_file = self.datasets[current]

        cmd_parts = ["ja", "groupby", key, input_file]
        if agg_spec:
            cmd_parts.extend(["--agg", agg_spec])

        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    # ==================== Binary Operations ====================

    def handle_join(self, args):
        """Join with another dataset.

        Usage: join <dataset_name> --on <mapping> <output_name>

        Example: join orders --on user_id=id user_orders
        """
        if len(args) < 4 or "--on" not in args:
            print("Error: 'join' requires a dataset, --on mapping, and output name.")
            print("Usage: join <dataset> --on <mapping> <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        right_name = args[0]
        if right_name not in self.datasets:
            print(f"Error: Unknown dataset '{right_name}'.")
            return

        on_idx = args.index("--on")
        on_mapping = args[on_idx + 1]
        output_name = args[-1]

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = self._get_temp_file(output_name)
        left_file = self.datasets[current]
        right_file = self.datasets[right_name]

        cmd_parts = ["ja", "join", left_file, right_file, "--on", on_mapping]
        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    def handle_union(self, args):
        """Union with another dataset.

        Usage: union <dataset_name> <output_name>
        """
        if len(args) < 2:
            print("Error: 'union' requires a dataset name and output name.")
            print("Usage: union <dataset> <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        right_name = args[0]
        output_name = args[1]

        if right_name not in self.datasets:
            print(f"Error: Unknown dataset '{right_name}'.")
            return

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = self._get_temp_file(output_name)
        left_file = self.datasets[current]
        right_file = self.datasets[right_name]

        cmd_parts = ["ja", "union", left_file, right_file]
        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    def handle_intersection(self, args):
        """Intersection with another dataset.

        Usage: intersection <dataset_name> <output_name>
        """
        if len(args) < 2:
            print("Error: 'intersection' requires a dataset name and output name.")
            print("Usage: intersection <dataset> <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        right_name = args[0]
        output_name = args[1]

        if right_name not in self.datasets:
            print(f"Error: Unknown dataset '{right_name}'.")
            return

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = self._get_temp_file(output_name)
        left_file = self.datasets[current]
        right_file = self.datasets[right_name]

        cmd_parts = ["ja", "intersection", left_file, right_file]
        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    def handle_difference(self, args):
        """Difference with another dataset.

        Usage: difference <dataset_name> <output_name>
        """
        if len(args) < 2:
            print("Error: 'difference' requires a dataset name and output name.")
            print("Usage: difference <dataset> <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        right_name = args[0]
        output_name = args[1]

        if right_name not in self.datasets:
            print(f"Error: Unknown dataset '{right_name}'.")
            return

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = self._get_temp_file(output_name)
        left_file = self.datasets[current]
        right_file = self.datasets[right_name]

        cmd_parts = ["ja", "difference", left_file, right_file]
        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    def handle_product(self, args):
        """Cartesian product with another dataset.

        Usage: product <dataset_name> <output_name>
        """
        if len(args) < 2:
            print("Error: 'product' requires a dataset name and output name.")
            print("Usage: product <dataset> <output_name>")
            return

        try:
            current = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

        right_name = args[0]
        output_name = args[1]

        if right_name not in self.datasets:
            print(f"Error: Unknown dataset '{right_name}'.")
            return

        try:
            self._check_name_conflict(output_name)
        except ValueError as e:
            print(f"Error: {e}")
            return

        output_file = self._get_temp_file(output_name)
        left_file = self.datasets[current]
        right_file = self.datasets[right_name]

        cmd_parts = ["ja", "product", left_file, right_file]
        result = self._execute_ja_command(cmd_parts)

        if result.returncode == 0:
            with open(output_file, 'w') as f:
                f.write(result.stdout)

            self.datasets[output_name] = output_file
            self.current_dataset = output_name
            print(f"Created: {output_name} (current)")
        else:
            print(f"Error: {result.stderr}")

    def handle_help(self, args):
        """Display help message."""
        help_text = """
JSONL Algebra REPL - Interactive Data Manipulation

DATASET MANAGEMENT:
  load <file> [name]           Load a JSONL file (default name: file stem)
  cd <name>                    Switch to a dataset
  pwd / current                Show current dataset
  datasets                     List all registered datasets
  info [name]                  Show dataset statistics (rows, size, fields)
  save <file>                  Save current dataset to file

UNARY OPERATIONS (operate on current dataset):
  select '<expr>' <output>     Filter rows with expression
  project <fields> <output>    Select specific fields (comma-separated)
  rename <old=new> <output>    Rename fields
  distinct <output>            Remove duplicates
  sort <keys> [--desc] <out>   Sort by keys
  groupby <key> [--agg <spec>] <output>
                               Group and optionally aggregate

BINARY OPERATIONS (combine current with another dataset):
  join <dataset> --on <map> <output>
                               Join datasets on keys
  union <dataset> <output>     Union of datasets
  intersection <dataset> <out> Intersection of datasets
  difference <dataset> <out>   Difference of datasets
  product <dataset> <output>   Cartesian product

VIEWING & EXPLORATION:
  ls [name] [--limit N]        Preview dataset (default: current)
  !<command>                   Execute shell command

SETTINGS:
  window-size [N]              Get/set preview window size

META:
  help                         Show this help
  exit                         Quit REPL

NOTES:
- All operations create NEW datasets with unique names
- Use dot notation for nested fields (e.g., user.name)
- Current dataset is used as input for operations
- Operations automatically switch to the new output dataset

EXAMPLES:
  ja> load users.jsonl
  ja> select 'age > 30' adults
  ja> project name,email adults_contact
  ja> load orders.jsonl
  ja> cd adults_contact
  ja> join orders --on user_id=id final
  ja> ls --limit 5
  ja> save results.jsonl
"""
        print(help_text)

    def parse_command(self, line: str):
        """Parse a command line into command and arguments."""
        try:
            parts = shlex.split(line)
        except ValueError as e:
            print(f"Error parsing command: {e}")
            return None, None

        if not parts:
            return None, None

        command = parts[0].lower()
        args = parts[1:]
        return command, args

    def process(self, line: str):
        """Process a single command line."""
        if not line or line.strip() == "":
            return

        # Handle shell commands
        if line.startswith("!"):
            cmd = line[1:].strip()
            self.handle_shell(shlex.split(cmd))
            return

        command, args = self.parse_command(line)
        if command is None:
            return

        # Command routing
        handlers = {
            "load": self.handle_load,
            "cd": self.handle_cd,
            "pwd": self.handle_pwd,
            "current": self.handle_current,
            "datasets": self.handle_datasets,
            "info": self.handle_info,
            "save": self.handle_save,
            "ls": self.handle_ls,
            "window-size": self.handle_window_size,
            "select": self.handle_select,
            "project": self.handle_project,
            "rename": self.handle_rename,
            "distinct": self.handle_distinct,
            "sort": self.handle_sort,
            "groupby": self.handle_groupby,
            "join": self.handle_join,
            "union": self.handle_union,
            "intersection": self.handle_intersection,
            "difference": self.handle_difference,
            "product": self.handle_product,
            "help": self.handle_help,
            "exit": lambda _: sys.exit(0),
        }

        handler = handlers.get(command)
        if handler:
            try:
                handler(args)
            except Exception as e:
                print(f"Error: {e}")
                import traceback
                traceback.print_exc()
        else:
            print(f"Unknown command: '{command}'. Type 'help' for available commands.")

    def run(self, initial_args=None):
        """Run the REPL main loop."""
        print("Welcome to ja REPL. Type 'help' for commands, 'exit' to quit.")

        # Handle initial args (e.g., ja repl data.jsonl)
        if initial_args and len(initial_args) > 0:
            # Auto-load the file
            initial_line = f"load {shlex.join(initial_args)}"
            self.process(initial_line)

        while True:
            try:
                line = input("ja> ").strip()
                self.process(line)
            except EOFError:
                print("\nExiting...")
                sys.exit(0)
            except KeyboardInterrupt:
                print("\nInterrupted. Type 'exit' or Ctrl-D to quit.")
Functions
__init__()

Initialize the REPL session.

Source code in ja/repl.py
def __init__(self):
    """Initialize the REPL session."""
    self.datasets: Dict[str, str] = {}  # name -> file_path
    self.current_dataset: Optional[str] = None
    self.settings = {
        "window_size": 20,  # Default preview limit
    }
    self.temp_dir = tempfile.mkdtemp(prefix="ja_repl_")
    self.temp_counter = 0
handle_load(args)

Load a JSONL file into the workspace.

Usage: load <file> [name]

If name is not provided, uses the file stem (filename without extension). The loaded dataset becomes the current dataset.

Source code in ja/repl.py
def handle_load(self, args):
    """Load a JSONL file into the workspace.

    Usage: load <file> [name]

    If name is not provided, uses the file stem (filename without extension).
    The loaded dataset becomes the current dataset.
    """
    if not args:
        print("Error: 'load' requires a file path.")
        print("Usage: load <file> [name]")
        return

    file_path = args[0]
    if not os.path.exists(file_path):
        print(f"Error: File not found: {file_path}")
        return

    # Determine dataset name
    if len(args) > 1:
        name = args[1]
    else:
        name = Path(file_path).stem

    # Check for conflicts
    try:
        self._check_name_conflict(name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    # Register the dataset
    self.datasets[name] = os.path.abspath(file_path)
    self.current_dataset = name
    print(f"Loaded: {name} (current)")
    print(f"  Path: {self.datasets[name]}")
handle_cd(args)

Switch to a different dataset.

Usage: cd <name>

Source code in ja/repl.py
def handle_cd(self, args):
    """Switch to a different dataset.

    Usage: cd <name>
    """
    if not args:
        print("Error: 'cd' requires a dataset name.")
        print("Usage: cd <name>")
        return

    name = args[0]
    if name not in self.datasets:
        print(f"Error: Unknown dataset '{name}'.")
        print(f"Available datasets: {', '.join(self.datasets.keys())}")
        return

    self.current_dataset = name
    print(f"Current dataset: {name}")
handle_pwd(args)

Show the current dataset name and path.

Usage: pwd
Alias: current

Source code in ja/repl.py
def handle_pwd(self, args):
    """Show the current dataset name and path.

    Usage: pwd
    Alias: current
    """
    if self.current_dataset is None:
        print("No current dataset.")
        return

    print(f"Current dataset: {self.current_dataset}")
    print(f"  Path: {self.datasets[self.current_dataset]}")
handle_current(args)

Alias for pwd.

Source code in ja/repl.py
def handle_current(self, args):
    """Alias for pwd."""
    self.handle_pwd(args)
handle_datasets(args)

List all registered datasets.

Usage: datasets

Shows all loaded datasets with a marker for the current one.

Source code in ja/repl.py
def handle_datasets(self, args):
    """List all registered datasets.

    Usage: datasets

    Shows all loaded datasets with a marker for the current one.
    """
    if not self.datasets:
        print("No datasets loaded.")
        return

    print("Registered datasets:")
    for name in sorted(self.datasets.keys()):
        marker = " (current)" if name == self.current_dataset else ""
        print(f"  {name}{marker}")
        print(f"    {self.datasets[name]}")
handle_save(args)

Save the current dataset to a file.

Usage: save <file>

Writes the current dataset to the specified file path. Does not register the file as a new dataset.

Source code in ja/repl.py
def handle_save(self, args):
    """Save the current dataset to a file.

    Usage: save <file>

    Writes the current dataset to the specified file path.
    Does not register the file as a new dataset.
    """
    if not args:
        print("Error: 'save' requires a file path.")
        print("Usage: save <file>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = args[0]
    input_file = self.datasets[current]

    # Copy the current dataset to the output file
    try:
        with open(input_file, 'r') as inf, open(output_file, 'w') as outf:
            outf.write(inf.read())
        print(f"Saved {current} to: {output_file}")
    except Exception as e:
        print(f"Error saving file: {e}")
handle_ls(args)

Preview a dataset.

Usage: ls [name] [--limit N]

Shows the first N lines of the dataset (default: window_size setting). If name is omitted, shows the current dataset.

Source code in ja/repl.py
def handle_ls(self, args):
    """Preview a dataset.

    Usage: ls [name] [--limit N]

    Shows the first N lines of the dataset (default: window_size setting).
    If name is omitted, shows the current dataset.
    """
    # Parse arguments
    dataset_name = None
    limit = self.settings["window_size"]

    i = 0
    while i < len(args):
        arg = args[i]
        if arg == "--limit":
            if i + 1 >= len(args):
                print("Error: --limit requires a number.")
                return
            try:
                limit = int(args[i + 1])
                i += 2
            except ValueError:
                print(f"Error: Invalid limit value '{args[i + 1]}'.")
                return
        elif arg.startswith("--limit="):
            try:
                limit = int(arg.split("=", 1)[1])
                i += 1
            except ValueError:
                print(f"Error: Invalid limit value in '{arg}'.")
                return
        else:
            dataset_name = arg
            i += 1

    # Determine which dataset to show
    if dataset_name is None:
        try:
            dataset_name = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return
    elif dataset_name not in self.datasets:
        print(f"Error: Unknown dataset '{dataset_name}'.")
        return

    file_path = self.datasets[dataset_name]

    # Use head to show first N lines
    try:
        result = subprocess.run(
            ["head", f"-n{limit}", file_path],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode == 0:
            print(result.stdout.rstrip())
        else:
            print(f"Error reading dataset: {result.stderr}")
    except Exception as e:
        print(f"Error: {e}")
handle_shell(args)

Execute a shell command.

Usage: !<command>

Passes the command directly to the shell.

Source code in ja/repl.py
def handle_shell(self, args):
    """Execute a shell command.

    Usage: !<command>

    Passes the command directly to the shell.
    """
    # args is already the full command (without the !)
    cmd = " ".join(args)
    try:
        result = subprocess.run(
            cmd,
            shell=True,
            check=False,
        )
    except Exception as e:
        print(f"Error executing shell command: {e}")
handle_window_size(args)

Get or set the window size setting.

Usage: window-size [N]

Without arguments, shows the current value. With a number, sets the value.

Source code in ja/repl.py
def handle_window_size(self, args):
    """Get or set the window size setting.

    Usage: window-size [N]

    Without arguments, shows the current value.
    With a number, sets the value.
    """
    if not args:
        print(f"window-size: {self.settings['window_size']}")
        return

    try:
        new_size = int(args[0])
        if new_size <= 0:
            print("Error: window-size must be a positive integer.")
            return
        self.settings["window_size"] = new_size
        print(f"window-size set to: {new_size}")
    except ValueError:
        print(f"Error: Invalid number '{args[0]}'.")
handle_info(args)

Show statistics and information about a dataset.

Usage: info [name]

If name is omitted, shows info for the current dataset. Displays: row count, file size, fields, and a sample row.

Source code in ja/repl.py
def handle_info(self, args):
    """Show statistics and information about a dataset.

    Usage: info [name]

    If name is omitted, shows info for the current dataset.
    Displays: row count, file size, fields, and a sample row.
    """
    # Determine which dataset to show info for
    dataset_name = None
    if args:
        dataset_name = args[0]
        if dataset_name not in self.datasets:
            print(f"Error: Unknown dataset '{dataset_name}'.")
            return
    else:
        try:
            dataset_name = self._require_current()
        except ValueError as e:
            print(f"Error: {e}")
            return

    file_path = self.datasets[dataset_name]

    try:
        import json
        import os

        # Get file size
        file_size = os.path.getsize(file_path)
        if file_size < 1024:
            size_str = f"{file_size} B"
        elif file_size < 1024 * 1024:
            size_str = f"{file_size / 1024:.1f} KB"
        else:
            size_str = f"{file_size / (1024 * 1024):.1f} MB"

        # Count rows and collect field names
        row_count = 0
        all_fields = set()
        first_row = None

        with open(file_path, 'r') as f:
            for line in f:
                if line.strip():
                    row_count += 1
                    try:
                        obj = json.loads(line)
                        if first_row is None:
                            first_row = obj
                        # Collect field names (flatten nested objects)
                        self._collect_fields(obj, all_fields)
                    except json.JSONDecodeError:
                        pass  # Skip malformed lines

        # Sort fields for consistent display
        fields = sorted(all_fields)

        # Display info
        print(f"\nDataset: {dataset_name}")
        print(f"Path: {file_path}")
        print(f"Rows: {row_count:,}")
        print(f"Size: {size_str}")

        if fields:
            # Limit field display to avoid clutter
            if len(fields) <= 20:
                print(f"Fields: {', '.join(fields)}")
            else:
                print(f"Fields ({len(fields)} total): {', '.join(fields[:20])}, ...")

        if first_row:
            print(f"\nSample (first row):")
            print(f"  {json.dumps(first_row, indent=2)}")

        print()

    except FileNotFoundError:
        print(f"Error: File not found: {file_path}")
    except Exception as e:
        print(f"Error reading dataset: {e}")
handle_select(args)

Filter rows with an expression.

Usage: select '<expr>' <output_name>

Creates a new dataset with filtered rows.

Source code in ja/repl.py
def handle_select(self, args):
    """Filter rows with an expression.

    Usage: select '<expr>' <output_name>

    Creates a new dataset with filtered rows.
    """
    if len(args) < 2:
        print("Error: 'select' requires an expression and output name.")
        print("Usage: select '<expr>' <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    expr = args[0]
    output_name = args[1]

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    # Create temp file for output
    output_file = self._get_temp_file(output_name)
    input_file = self.datasets[current]

    # Execute: ja select '<expr>' <input> > <output>
    cmd_parts = ["ja", "select", expr, input_file]
    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        # Save output to temp file
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_project(args)

Select specific fields.

Usage: project <fields> <output_name>

Creates a new dataset with only the specified fields.

Source code in ja/repl.py
def handle_project(self, args):
    """Select specific fields.

    Usage: project <fields> <output_name>

    Creates a new dataset with only the specified fields.
    """
    if len(args) < 2:
        print("Error: 'project' requires fields and output name.")
        print("Usage: project <fields> <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    fields = args[0]
    output_name = args[1]

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = self._get_temp_file(output_name)
    input_file = self.datasets[current]

    cmd_parts = ["ja", "project", fields, input_file]
    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_rename(args)

Rename fields.

Usage: rename <mapping> <output_name>

Example: rename old=new,foo=bar output

Source code in ja/repl.py
def handle_rename(self, args):
    """Rename fields.

    Usage: rename <mapping> <output_name>

    Example: rename old=new,foo=bar output
    """
    if len(args) < 2:
        print("Error: 'rename' requires a mapping and output name.")
        print("Usage: rename <old=new,...> <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    mapping = args[0]
    output_name = args[1]

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = self._get_temp_file(output_name)
    input_file = self.datasets[current]

    cmd_parts = ["ja", "rename", mapping, input_file]
    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_distinct(args)

Remove duplicate rows.

Usage: distinct <output_name>

Source code in ja/repl.py
def handle_distinct(self, args):
    """Remove duplicate rows.

    Usage: distinct <output_name>
    """
    if len(args) < 1:
        print("Error: 'distinct' requires an output name.")
        print("Usage: distinct <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_name = args[0]

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = self._get_temp_file(output_name)
    input_file = self.datasets[current]

    cmd_parts = ["ja", "distinct", input_file]
    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_sort(args)

Sort rows by key(s).

Usage: sort <keys> [--desc] <output_name>

Example: sort age,name output
Example: sort age --desc output

Source code in ja/repl.py
def handle_sort(self, args):
    """Sort rows by key(s).

    Usage: sort <keys> [--desc] <output_name>

    Example: sort age,name output
    Example: sort age --desc output
    """
    if len(args) < 2:
        print("Error: 'sort' requires keys and output name.")
        print("Usage: sort <keys> [--desc] <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    # Parse args: keys, optional --desc, output_name
    keys = args[0]
    desc = False
    output_name = args[-1]

    if "--desc" in args:
        desc = True

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = self._get_temp_file(output_name)
    input_file = self.datasets[current]

    cmd_parts = ["ja", "sort", keys, input_file]
    if desc:
        cmd_parts.append("--desc")

    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_groupby(args)

Group rows by a key.

Usage: groupby <key> [--agg <spec>] <output_name>

Example: groupby region output
Example: groupby region --agg count,sum(amount) output

Source code in ja/repl.py
def handle_groupby(self, args):
    """Group rows by a key.

    Usage: groupby <key> [--agg <spec>] <output_name>

    Example: groupby region output
    Example: groupby region --agg count,sum(amount) output
    """
    if len(args) < 2:
        print("Error: 'groupby' requires a key and output name.")
        print("Usage: groupby <key> [--agg <spec>] <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    key = args[0]
    output_name = args[-1]

    # Check for --agg
    agg_spec = None
    if "--agg" in args:
        agg_idx = args.index("--agg")
        if agg_idx + 1 < len(args) - 1:  # -1 because last is output_name
            agg_spec = args[agg_idx + 1]

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = self._get_temp_file(output_name)
    input_file = self.datasets[current]

    cmd_parts = ["ja", "groupby", key, input_file]
    if agg_spec:
        cmd_parts.extend(["--agg", agg_spec])

    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_join(args)

Join with another dataset.

Usage: join <dataset_name> --on <mapping> <output_name>

Example: join orders --on user_id=id user_orders

Source code in ja/repl.py
def handle_join(self, args):
    """Join with another dataset.

    Usage: join <dataset_name> --on <mapping> <output_name>

    Example: join orders --on user_id=id user_orders
    """
    if len(args) < 4 or "--on" not in args:
        print("Error: 'join' requires a dataset, --on mapping, and output name.")
        print("Usage: join <dataset> --on <mapping> <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    right_name = args[0]
    if right_name not in self.datasets:
        print(f"Error: Unknown dataset '{right_name}'.")
        return

    on_idx = args.index("--on")
    on_mapping = args[on_idx + 1]
    output_name = args[-1]

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = self._get_temp_file(output_name)
    left_file = self.datasets[current]
    right_file = self.datasets[right_name]

    cmd_parts = ["ja", "join", left_file, right_file, "--on", on_mapping]
    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_union(args)

Union with another dataset.

Usage: union <dataset_name> <output_name>

Source code in ja/repl.py
def handle_union(self, args):
    """Union with another dataset.

    Usage: union <dataset_name> <output_name>
    """
    if len(args) < 2:
        print("Error: 'union' requires a dataset name and output name.")
        print("Usage: union <dataset> <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    right_name = args[0]
    output_name = args[1]

    if right_name not in self.datasets:
        print(f"Error: Unknown dataset '{right_name}'.")
        return

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = self._get_temp_file(output_name)
    left_file = self.datasets[current]
    right_file = self.datasets[right_name]

    cmd_parts = ["ja", "union", left_file, right_file]
    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_intersection(args)

Intersection with another dataset.

Usage: intersection <dataset_name> <output_name>

Source code in ja/repl.py
def handle_intersection(self, args):
    """Intersection with another dataset.

    Usage: intersection <dataset_name> <output_name>
    """
    if len(args) < 2:
        print("Error: 'intersection' requires a dataset name and output name.")
        print("Usage: intersection <dataset> <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    right_name = args[0]
    output_name = args[1]

    if right_name not in self.datasets:
        print(f"Error: Unknown dataset '{right_name}'.")
        return

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = self._get_temp_file(output_name)
    left_file = self.datasets[current]
    right_file = self.datasets[right_name]

    cmd_parts = ["ja", "intersection", left_file, right_file]
    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_difference(args)

Difference with another dataset.

Usage: difference <dataset_name> <output_name>

Source code in ja/repl.py
def handle_difference(self, args):
    """Difference with another dataset.

    Usage: difference <dataset_name> <output_name>
    """
    if len(args) < 2:
        print("Error: 'difference' requires a dataset name and output name.")
        print("Usage: difference <dataset> <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    right_name = args[0]
    output_name = args[1]

    if right_name not in self.datasets:
        print(f"Error: Unknown dataset '{right_name}'.")
        return

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = self._get_temp_file(output_name)
    left_file = self.datasets[current]
    right_file = self.datasets[right_name]

    cmd_parts = ["ja", "difference", left_file, right_file]
    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_product(args)

Cartesian product with another dataset.

Usage: product <dataset_name> <output_name>

Source code in ja/repl.py
def handle_product(self, args):
    """Cartesian product with another dataset.

    Usage: product <dataset_name> <output_name>
    """
    if len(args) < 2:
        print("Error: 'product' requires a dataset name and output name.")
        print("Usage: product <dataset> <output_name>")
        return

    try:
        current = self._require_current()
    except ValueError as e:
        print(f"Error: {e}")
        return

    right_name = args[0]
    output_name = args[1]

    if right_name not in self.datasets:
        print(f"Error: Unknown dataset '{right_name}'.")
        return

    try:
        self._check_name_conflict(output_name)
    except ValueError as e:
        print(f"Error: {e}")
        return

    output_file = self._get_temp_file(output_name)
    left_file = self.datasets[current]
    right_file = self.datasets[right_name]

    cmd_parts = ["ja", "product", left_file, right_file]
    result = self._execute_ja_command(cmd_parts)

    if result.returncode == 0:
        with open(output_file, 'w') as f:
            f.write(result.stdout)

        self.datasets[output_name] = output_file
        self.current_dataset = output_name
        print(f"Created: {output_name} (current)")
    else:
        print(f"Error: {result.stderr}")
handle_help(args)

Display help message.

Source code in ja/repl.py
def handle_help(self, args):
    """Display help message."""
    help_text = """
JSONL Algebra REPL - Interactive Data Manipulation

DATASET MANAGEMENT:
  load <file> [name]           Load a JSONL file (default name: file stem)
  cd <name>                    Switch to a dataset
  pwd / current                Show current dataset
  datasets                     List all registered datasets
  info [name]                  Show dataset statistics (rows, size, fields)
  save <file>                  Save current dataset to file

UNARY OPERATIONS (operate on current dataset):
  select '<expr>' <output>     Filter rows with expression
  project <fields> <output>    Select specific fields (comma-separated)
  rename <old=new> <output>    Rename fields
  distinct <output>            Remove duplicates
  sort <keys> [--desc] <out>   Sort by keys
  groupby <key> [--agg <spec>] <output>
                               Group and optionally aggregate

BINARY OPERATIONS (combine current with another dataset):
  join <dataset> --on <map> <output>
                               Join datasets on keys
  union <dataset> <output>     Union of datasets
  intersection <dataset> <out> Intersection of datasets
  difference <dataset> <out>   Difference of datasets
  product <dataset> <output>   Cartesian product

VIEWING & EXPLORATION:
  ls [name] [--limit N]        Preview dataset (default: current)
  !<command>                   Execute shell command

SETTINGS:
  window-size [N]              Get/set preview window size

META:
  help                         Show this help
  exit                         Quit REPL

NOTES:
- All operations create NEW datasets with unique names
- Use dot notation for nested fields (e.g., user.name)
- Current dataset is used as input for operations
- Operations automatically switch to the new output dataset

EXAMPLES:
  ja> load users.jsonl
  ja> select 'age > 30' adults
  ja> project name,email adults_contact
  ja> load orders.jsonl
  ja> cd adults_contact
  ja> join orders --on user_id=id final
  ja> ls --limit 5
  ja> save results.jsonl
"""
    print(help_text)
parse_command(line)

Parse a command line into command and arguments.

Source code in ja/repl.py
def parse_command(self, line: str):
    """Parse a command line into command and arguments."""
    try:
        parts = shlex.split(line)
    except ValueError as e:
        print(f"Error parsing command: {e}")
        return None, None

    if not parts:
        return None, None

    command = parts[0].lower()
    args = parts[1:]
    return command, args
process(line)

Process a single command line.

Source code in ja/repl.py
def process(self, line: str):
    """Process a single command line."""
    if not line or line.strip() == "":
        return

    # Handle shell commands
    if line.startswith("!"):
        cmd = line[1:].strip()
        self.handle_shell(shlex.split(cmd))
        return

    command, args = self.parse_command(line)
    if command is None:
        return

    # Command routing
    handlers = {
        "load": self.handle_load,
        "cd": self.handle_cd,
        "pwd": self.handle_pwd,
        "current": self.handle_current,
        "datasets": self.handle_datasets,
        "info": self.handle_info,
        "save": self.handle_save,
        "ls": self.handle_ls,
        "window-size": self.handle_window_size,
        "select": self.handle_select,
        "project": self.handle_project,
        "rename": self.handle_rename,
        "distinct": self.handle_distinct,
        "sort": self.handle_sort,
        "groupby": self.handle_groupby,
        "join": self.handle_join,
        "union": self.handle_union,
        "intersection": self.handle_intersection,
        "difference": self.handle_difference,
        "product": self.handle_product,
        "help": self.handle_help,
        "exit": lambda _: sys.exit(0),
    }

    handler = handlers.get(command)
    if handler:
        try:
            handler(args)
        except Exception as e:
            print(f"Error: {e}")
            import traceback
            traceback.print_exc()
    else:
        print(f"Unknown command: '{command}'. Type 'help' for available commands.")
run(initial_args=None)

Run the REPL main loop.

Source code in ja/repl.py
def run(self, initial_args=None):
    """Run the REPL main loop."""
    print("Welcome to ja REPL. Type 'help' for commands, 'exit' to quit.")

    # Handle initial args (e.g., ja repl data.jsonl)
    if initial_args and len(initial_args) > 0:
        # Auto-load the file
        initial_line = f"load {shlex.join(initial_args)}"
        self.process(initial_line)

    while True:
        try:
            line = input("ja> ").strip()
            self.process(line)
        except EOFError:
            print("\nExiting...")
            sys.exit(0)
        except KeyboardInterrupt:
            print("\nInterrupted. Type 'exit' or Ctrl-D to quit.")

Functions

repl(parsed_cli_args)

Entry point for the ja repl command.

Source code in ja/repl.py
def repl(parsed_cli_args):
    """Entry point for the ja repl command."""
    session = ReplSession()
    initial_args = getattr(parsed_cli_args, "initial_args", [])
    session.run(initial_args)
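
A typical session, launched with an initial file argument, might look like this (a sketch with a hypothetical data.jsonl; the printed messages follow the handlers above, output abridged):

$ ja repl data.jsonl
Welcome to ja REPL. Type 'help' for commands, 'exit' to quit.
Loaded: data (current)
  Path: /home/user/data.jsonl
ja> select 'age > 30' adults
Created: adults (current)
ja> save adults.jsonl
Saved adults to: adults.jsonl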

ja.commands

Command handlers for the JSONL algebra CLI.

This module connects the command-line interface to the core data processing functions. Each handle_* function is responsible for reading input data, calling the appropriate core function, and writing the results to stdout.

Functions

get_input_stream(file_path)

Yield a readable file-like object.

  - If file_path is None or '-', yield sys.stdin.
  - Otherwise open the given path for reading.
Source code in ja/commands.py
@contextmanager
def get_input_stream(file_path):
    """
    Yield a readable file-like object.

    - If file_path is None or '-', yield sys.stdin.
    - Otherwise open the given path for reading.
    """
    if file_path is not None and file_path != "-":
        f = open(file_path, "r")
        try:
            yield f
        finally:
            f.close()
    else:
        yield sys.stdin
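
A handler can treat files and stdin uniformly; a minimal sketch (example.jsonl is a hypothetical path):

with get_input_stream("example.jsonl") as f:  # opens the file, closes it on exit
    data = read_jsonl(f)

with get_input_stream("-") as f:  # yields sys.stdin without closing it
    data = read_jsonl(f)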

read_jsonl(input_stream)

Read JSONL data from a file-like object.

Source code in ja/commands.py
def read_jsonl(input_stream) -> List[Dict[str, Any]]:
    """Read JSONL data from a file-like object."""
    return [json.loads(line) for line in input_stream]

write_jsonl(rows)

Write a collection of objects as JSONL to stdout.

Source code in ja/commands.py
def write_jsonl(rows: List[Dict[str, Any]]) -> None:
    """Write a collection of objects as JSONL to stdout."""
    for row in rows:
        print(json.dumps(row))

write_json_object(obj)

Write a single object as pretty-printed JSON to stdout.

Source code in ja/commands.py
def write_json_object(obj: Any) -> None:
    """Write a single object as pretty-printed JSON to stdout."""
    print(json.dumps(obj, indent=2))

json_error(error_type, message, details=None)

Print a JSON error message to stderr and exit.

Source code in ja/commands.py
def json_error(error_type: str, message: str, details: Dict[str, Any] = None) -> None:
    """Print a JSON error message to stderr and exit."""
    error_info = {
        "error": {
            "type": error_type,
            "message": message,
        }
    }
    if details:
        error_info["error"]["details"] = details
    print(json.dumps(error_info), file=sys.stderr)
    sys.exit(1)
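
For example, the call handle_window makes for a missing argument emits one JSON line on stderr and terminates the process:

json_error("MissingArgument", "ntile requires --n argument")
# stderr: {"error": {"type": "MissingArgument", "message": "ntile requires --n argument"}}
# exits with status 1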

handle_select(args)

Handle select command.

Source code in ja/commands.py
def handle_select(args):
    """Handle select command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    use_jmespath = hasattr(args, "jmespath") and args.jmespath

    try:
        result = select(data, args.expr, use_jmespath=use_jmespath)
        write_jsonl(result)
    except jmespath.exceptions.ParseError as e:
        json_error(
            "JMESPathParseError",
            f"Invalid JMESPath expression: {e}",
            {"expression": args.expr},
        )

handle_project(args)

Handle project command.

Source code in ja/commands.py
def handle_project(args):
    """Handle project command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    use_jmespath = hasattr(args, "jmespath") and args.jmespath

    try:
        result = project(data, args.expr, use_jmespath=use_jmespath)
        write_jsonl(result)
    except jmespath.exceptions.ParseError as e:
        json_error(
            "JMESPathParseError",
            f"Invalid JMESPath expression: {e}",
            {"expression": args.expr},
        )

handle_join(args)

Handle join command.

Source code in ja/commands.py
def handle_join(args):
    """Handle join command."""
    with get_input_stream(args.left) as f:
        left = read_jsonl(f)
    with get_input_stream(args.right) as f:
        right = read_jsonl(f)

    lcol_str, rcol_str = args.on.split("=", 1)
    lcol = lcol_str.strip()
    rcol = rcol_str.strip()

    how = getattr(args, "how", "inner")
    result = join(left, right, [(lcol, rcol)], how=how)
    write_jsonl(result)
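
The REPL drives this handler with a command of the form ja join <left> <right> --on <mapping>, where the key name before the = belongs to the left input and the one after it to the right. For example (users.jsonl and orders.jsonl are hypothetical files):

ja join users.jsonl orders.jsonl --on id=user_id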

handle_product(args)

Handle product command.

Source code in ja/commands.py
def handle_product(args):
    """Handle product command."""
    with get_input_stream(args.left) as f:
        left_data = read_jsonl(f)
    with get_input_stream(args.right) as f:
        right_data = read_jsonl(f)

    result = product(left_data, right_data)
    write_jsonl(result)

handle_rename(args)

Handle rename command.

Source code in ja/commands.py
def handle_rename(args):
    """Handle rename command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    mapping_pairs = args.mapping.split(",")
    mapping = {}
    for pair_str in mapping_pairs:
        parts = pair_str.split("=", 1)
        if len(parts) == 2:
            old_name, new_name = parts
            mapping[old_name.strip()] = new_name.strip()
        else:
            print(
                f"Warning: Malformed rename pair '{pair_str.strip()}' ignored.",
                file=sys.stderr,
            )

    result = rename(data, mapping)
    write_jsonl(result)
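
Invocation follows the ja rename <mapping> <file> shape the REPL uses; malformed pairs are skipped with a warning on stderr. For example:

ja rename old=new,foo=bar data.jsonl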

handle_union(args)

Handle union command.

Source code in ja/commands.py
def handle_union(args):
    """Handle union command."""
    with get_input_stream(args.left) as f:
        left_data = read_jsonl(f)
    with get_input_stream(args.right) as f:
        right_data = read_jsonl(f)

    result = union(left_data, right_data)
    write_jsonl(result)

handle_intersection(args)

Handle intersection command.

Source code in ja/commands.py
def handle_intersection(args):
    """Handle intersection command."""
    with get_input_stream(args.left) as f:
        left_data = read_jsonl(f)
    with get_input_stream(args.right) as f:
        right_data = read_jsonl(f)

    result = intersection(left_data, right_data)
    write_jsonl(result)

handle_difference(args)

Handle difference command.

Source code in ja/commands.py
def handle_difference(args):
    """Handle difference command."""
    with get_input_stream(args.left) as f:
        left_data = read_jsonl(f)
    with get_input_stream(args.right) as f:
        right_data = read_jsonl(f)

    result = difference(left_data, right_data)
    write_jsonl(result)

handle_distinct(args)

Handle distinct command.

Source code in ja/commands.py
def handle_distinct(args):
    """Handle distinct command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    result = distinct(data)
    write_jsonl(result)

handle_sort(args)

Handle sort command.

Source code in ja/commands.py
def handle_sort(args):
    """Handle sort command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    result = sort_by(data, args.keys, descending=args.desc)
    write_jsonl(result)

handle_groupby(args)

Handle groupby command.

Source code in ja/commands.py
def handle_groupby(args):
    """Handle groupby command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    if hasattr(args, "agg") and args.agg:
        # Traditional groupby with aggregation
        result = groupby_agg(data, args.key, args.agg)
    else:
        # Check if input is already grouped - look for new format
        if data and "_groups" in data[0]:
            # This is a chained groupby
            result = groupby_chained(data, args.key)
        else:
            # First groupby
            result = groupby_with_metadata(data, args.key)

    write_jsonl(result)
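
The REPL invokes this as ja groupby <key> <file> [--agg <spec>]. Because get_input_stream falls back to stdin, groupby calls can plausibly be chained through a pipe, with the second call detecting the _groups metadata (a sketch, assuming the file argument may be omitted in favor of stdin):

ja groupby region data.jsonl --agg "count, total=sum(amount)"  # group and aggregate in one step
ja groupby region data.jsonl | ja groupby city                 # chained grouping via _groups metadata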

handle_agg(args)

Handle agg command.

Source code in ja/commands.py
def handle_agg(args):
    """Handle agg command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    if not data:
        write_jsonl([])
        return

    # Check if input has group metadata - use new format
    if "_groups" in data[0]:
        # Process grouped data
        result = aggregate_grouped_data(data, args.agg)
    else:
        # Process ungrouped data
        result = [aggregate_single_group(data, args.agg)]

    write_jsonl(result)
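
A natural companion to the metadata-producing groupby is a pipeline that groups first and aggregates afterwards (a sketch, assuming the aggregation spec is positional and input defaults to stdin):

ja groupby region data.jsonl | ja agg "count, total=sum(amount)"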

handle_schema_infer(args)

Handle schema infer command.

Source code in ja/commands.py
def handle_schema_infer(args):
    """Handle schema infer command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    schema = infer_schema(data)
    write_json_object(schema)

handle_to_array(args)

Handle to-array command.

Source code in ja/commands.py
def handle_to_array(args):
    """Handle to-array command."""
    with get_input_stream(args.file) as input_stream:
        array_string = jsonl_to_json_array_string(input_stream)
        print(array_string)

handle_to_jsonl(args)

Handle to-jsonl command.

Source code in ja/commands.py
def handle_to_jsonl(args):
    """Handle to-jsonl command."""
    with get_input_stream(args.file) as input_stream:
        try:
            for line in json_array_to_jsonl_lines(input_stream):
                print(line)
        except ValueError as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)

handle_explode(args)

Handle explode command.

Source code in ja/commands.py
def handle_explode(args):
    """Handle explode command."""
    input_filename_stem = "jsonl_output"  # Default stem
    if args.file and args.file != "-":
        input_filename_stem = Path(args.file).stem

    output_directory = args.output_dir if args.output_dir else input_filename_stem

    with get_input_stream(args.file) as input_stream:
        try:
            jsonl_to_dir(input_stream, output_directory, input_filename_stem)
        except Exception as e:
            print(f"Error during explode operation: {e}", file=sys.stderr)
            sys.exit(1)

handle_implode(args)

Handle implode command.

Source code in ja/commands.py
def handle_implode(args):
    """Handle implode command."""
    try:
        for line in dir_to_jsonl_lines(
            args.input_dir, args.add_filename_key, args.recursive
        ):
            print(line)
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"An unexpected error occurred during implode: {e}", file=sys.stderr)
        sys.exit(1)

handle_import_csv(args)

Handle import-csv command.

Source code in ja/commands.py
def handle_import_csv(args):
    """Handle import-csv command."""
    with get_input_stream(args.file) as input_stream:
        try:
            for line in csv_to_jsonl_lines(
                input_stream, has_header=args.has_header, infer_types=args.infer_types
            ):
                print(line)
        except Exception as e:
            print(
                f"An unexpected error occurred during CSV import: {e}", file=sys.stderr
            )
            sys.exit(1)

handle_to_csv(args)

Handle to-csv command.

Source code in ja/commands.py
def handle_to_csv(args):
    """Handle to-csv command."""
    column_functions = {}
    if args.apply:
        for col, expr_str in args.apply:
            try:
                # WARNING: eval() is a security risk if the expression is not from a trusted source.
                func = eval(expr_str)
                if not callable(func):
                    raise ValueError(
                        f"Expression for column '{col}' did not evaluate to a callable function."
                    )
                column_functions[col] = func
            except Exception as e:
                print(
                    f"Error parsing --apply expression for column '{col}': {e}",
                    file=sys.stderr,
                )
                sys.exit(1)

    with get_input_stream(args.file) as input_stream:
        try:
            jsonl_to_csv_stream(
                input_stream,
                sys.stdout,
                flatten=args.flatten,
                flatten_sep=args.flatten_sep,
                column_functions=column_functions,
            )
        except Exception as e:
            print(
                f"An unexpected error occurred during CSV export: {e}", file=sys.stderr
            )
            sys.exit(1)
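
Given the for col, expr_str in args.apply loop, --apply presumably takes repeatable (column, expression) pairs; a hypothetical invocation (flag spellings assumed from the args attributes):

ja to-csv data.jsonl --flatten --apply price "lambda v: round(v, 2)"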

handle_schema_validate(args)

Handle schema validate command.

Source code in ja/commands.py
def handle_schema_validate(args):
    """Handle schema validate command."""
    try:
        import jsonschema
    except ImportError:
        print(
            "jsonschema is not installed. Please install it with: pip install jsonschema",
            file=sys.stderr,
        )
        sys.exit(1)

    # Can't read both from stdin
    if args.schema == "-" and (not args.file or args.file == "-"):
        print(
            "Error: When reading schema from stdin, a file argument for the data to validate must be provided.",
            file=sys.stderr,
        )
        sys.exit(1)

    try:
        with get_input_stream(args.schema) as f:
            schema = json.load(f)
    except (IOError, json.JSONDecodeError) as e:
        print(
            f"Error reading or parsing schema file {args.schema}: {e}", file=sys.stderr
        )
        sys.exit(1)

    # If schema was from stdin, the file MUST be from a file, not stdin.
    data_source = args.file if args.schema == "-" else (args.file or "-")

    with get_input_stream(data_source) as lines:
        validation_failed = False
        for i, line in enumerate(lines, 1):
            try:
                instance = json.loads(line)
                jsonschema.validate(instance=instance, schema=schema)
                print(line.strip())
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON on line {i}: {e}", file=sys.stderr)
                validation_failed = True
            except jsonschema.exceptions.ValidationError as e:
                print(f"Validation error on line {i}: {e.message}", file=sys.stderr)
                validation_failed = True

    if validation_failed:
        sys.exit(1)

handle_collect(args)

Handle collect command.

Source code in ja/commands.py
def handle_collect(args):
    """Handle collect command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    if not data:
        write_jsonl([])
        return

    # Check for streaming flag
    if hasattr(args, "streaming") and args.streaming:
        json_error(
            "StreamingError",
            "Collect operation requires seeing all data and cannot be performed in streaming mode. "
            "Remove --streaming flag or use window-based processing with --window-size",
        )
        return

    # Handle window-based collection
    if hasattr(args, "window_size") and args.window_size:
        # Process data in windows
        window_size = args.window_size
        for i in range(0, len(data), window_size):
            window = data[i : i + window_size]
            result = collect(window)
            write_jsonl(result)
    else:
        # Collect all data at once
        result = collect(data)
        write_jsonl(result)
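
The error message above points to --window-size for bounded-memory collection; usage might look like this (flag spelling assumed from that message and args.window_size):

ja collect data.jsonl                    # collect everything at once
ja collect data.jsonl --window-size 100  # collect in windows of 100 rows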

handle_window(args)

Handle window function command.

Source code in ja/commands.py
def handle_window(args):
    """Handle window function command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    if not data:
        write_jsonl([])
        return

    func_name = args.function
    partition_by = getattr(args, "partition_by", None)
    order_by = getattr(args, "order_by", None)
    output_field = getattr(args, "output_field", None)

    # Map function names to implementations
    window_funcs = {
        "row_number": row_number,
        "rank": rank,
        "dense_rank": dense_rank,
        "percent_rank": percent_rank,
        "cume_dist": cume_dist,
    }

    # Functions that need a field parameter
    field_funcs = {
        "lag": lag,
        "lead": lead,
        "first_value": first_value,
        "last_value": last_value,
    }

    if func_name in window_funcs:
        kwargs = {"partition_by": partition_by, "order_by": order_by}
        if output_field:
            kwargs["output_field"] = output_field
        result = window_funcs[func_name](data, **kwargs)

    elif func_name in field_funcs:
        field = getattr(args, "field", None)
        if not field:
            json_error("MissingArgument", f"{func_name} requires --field argument")
            return

        kwargs = {
            "field": field,
            "partition_by": partition_by,
            "order_by": order_by,
        }
        if output_field:
            kwargs["output_field"] = output_field

        if func_name in ("lag", "lead"):
            kwargs["offset"] = getattr(args, "offset", 1)
            default = getattr(args, "default", None)
            if default is not None:
                # Try to parse as JSON, fall back to string
                try:
                    kwargs["default"] = json.loads(default)
                except json.JSONDecodeError:
                    kwargs["default"] = default

        result = field_funcs[func_name](data, **kwargs)

    elif func_name == "ntile":
        n = getattr(args, "n", None)
        if not n:
            json_error("MissingArgument", "ntile requires --n argument")
            return

        kwargs = {"n": n, "partition_by": partition_by, "order_by": order_by}
        if output_field:
            kwargs["output_field"] = output_field
        result = ntile(data, **kwargs)

    else:
        json_error("UnknownFunction", f"Unknown window function: {func_name}")
        return

    write_jsonl(result)
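
At the Python level the handler simply forwards keyword arguments; a lag call would look roughly like this (a sketch assuming the window functions accept the keywords built above; the import path is hypothetical):

from ja.window import lag  # hypothetical module path

result = lag(
    data,
    field="price",          # value to shift
    partition_by="ticker",  # restart the window per partition
    order_by="date",        # ordering within each partition
    offset=1,               # look one row back
    default=None,           # fill for the first row(s) of each partition
)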

ja.group

Grouping operations for JSONL algebra.

This module provides grouping functionality that supports both immediate aggregation and metadata-based chaining for multi-level grouping.

Classes

Functions

groupby_with_metadata(data, group_key)

Group data and add metadata fields.

This function enables chained groupby operations by adding special metadata fields to each row:

- _groups: List of {field, value} objects representing the grouping hierarchy
- _group_size: Total number of rows in this group
- _group_index: This row's index within its group

Parameters:

Name Type Description Default
data Relation

List of dictionaries to group

required
group_key str

Field to group by (supports dot notation)

required

Returns:

Type Description
Relation

List with group metadata added to each row

Source code in ja/group.py
def groupby_with_metadata(data: Relation, group_key: str) -> Relation:
    """Group data and add metadata fields.

    This function enables chained groupby operations by adding special
    metadata fields to each row:
    - _groups: List of {field, value} objects representing the grouping hierarchy
    - _group_size: Total number of rows in this group
    - _group_index: This row's index within its group

    Args:
        data: List of dictionaries to group
        group_key: Field to group by (supports dot notation)

    Returns:
        List with group metadata added to each row
    """
    parser = ExprEval()

    # First pass: collect groups
    groups = defaultdict(list)
    for row in data:
        key_value = parser.get_field_value(row, group_key)
        try:
            groups[key_value].append(row)
        except TypeError:
            # Unhashable key (e.g. a list or dict): fall back to a stable,
            # JSON-serialized string key
            key_value = json.dumps(key_value, ensure_ascii=False, sort_keys=True)
            groups[key_value].append(row)

    # Second pass: add metadata and flatten
    result = []

    for i, (group_value, group_rows) in enumerate(groups.items()):
        group_size = len(group_rows)
        for index, row in enumerate(group_rows):
            # Create new row with metadata
            new_row = row.copy()
            # Check if group_value is a serialized json value
            if isinstance(group_value, str):
                try:
                    group_value = json.loads(group_value)
                except json.JSONDecodeError:
                    pass
            new_row["_groups"] = [{"field": group_key, "value": group_value}]
            new_row["_group_size"] = group_size
            new_row["_group_index"] = index
            result.append(new_row)

    return result
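
A small sketch of the metadata added:

rows = [
    {"region": "east", "amount": 10},
    {"region": "east", "amount": 20},
    {"region": "west", "amount": 5},
]
groupby_with_metadata(rows, "region")[0]
# {"region": "east", "amount": 10,
#  "_groups": [{"field": "region", "value": "east"}],
#  "_group_size": 2, "_group_index": 0}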

groupby_chained(grouped_data, new_group_key)

Apply groupby to already-grouped data.

This function handles multi-level grouping by building on existing group metadata.

Parameters:

Name Type Description Default
grouped_data Relation

Data with existing group metadata

required
new_group_key str

Field to group by

required

Returns:

Type Description
Relation

List with nested group metadata

Source code in ja/group.py
def groupby_chained(grouped_data: Relation, new_group_key: str) -> Relation:
    """Apply groupby to already-grouped data.

    This function handles multi-level grouping by building on existing
    group metadata.

    Args:
        grouped_data: Data with existing group metadata
        new_group_key: Field to group by

    Returns:
        List with nested group metadata
    """
    parser = ExprEval()

    # Group within existing groups
    nested_groups = defaultdict(list)

    for row in grouped_data:
        # Get existing groups
        existing_groups = row.get("_groups", [])
        new_key_value = parser.get_field_value(row, new_group_key)

        # Create a tuple key for grouping (for internal use only)
        group_tuple = tuple((g["field"], g["value"]) for g in existing_groups)
        group_tuple += ((new_group_key, new_key_value),)

        try:
            nested_groups[group_tuple].append(row)
        except Exception:
            # Make group_tuple hashable
            group_tuple = tuple(map(str, group_tuple))
            nested_groups[group_tuple].append(row)

    # Add new metadata
    result = []
    for group_tuple, group_rows in nested_groups.items():
        group_size = len(group_rows)

        for index, row in enumerate(group_rows):
            new_row = row.copy()
            value = parser.get_field_value(row, new_group_key)

            # Extend the groups list
            new_row["_groups"] = row.get("_groups", []).copy()
            new_row["_groups"].append({
                "field": new_group_key,
                "value": value
            })

            new_row["_group_size"] = group_size
            new_row["_group_index"] = index
            result.append(new_row)

    return result
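
Chaining extends the _groups list rather than replacing it; for example, for rows shaped like the previous sketch but carrying a city field (values illustrative):

rows = groupby_with_metadata(data, "region")  # first level
rows = groupby_chained(rows, "city")          # second level
rows[0]["_groups"]
# [{"field": "region", "value": "east"}, {"field": "city", "value": "Boston"}]
# _group_size and _group_index now describe the (region, city) subgroup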

groupby_agg(data, group_key, agg_spec)

Group and aggregate in one operation.

This function is kept for backward compatibility and for the --agg flag. It's more efficient for simple cases but less flexible than chaining.

Parameters:

Name Type Description Default
data Relation

List of dictionaries to group and aggregate

required
group_key str

Field to group by

required
agg_spec Union[str, List[Tuple[str, str]]]

Aggregation specification

required

Returns:

Type Description
Relation

List of aggregated results, one per group

Source code in ja/group.py
def groupby_agg(data: Relation, group_key: str, agg_spec: Union[str, List[Tuple[str, str]]]) -> Relation:
    """Group and aggregate in one operation.

    This function is kept for backward compatibility and for the --agg flag.
    It's more efficient for simple cases but less flexible than chaining.

    Args:
        data: List of dictionaries to group and aggregate
        group_key: Field to group by
        agg_spec: Aggregation specification

    Returns:
        List of aggregated results, one per group
    """
    parser = ExprEval()

    # Group data
    groups = defaultdict(list)
    for row in data:
        key = parser.get_field_value(row, group_key)
        groups[key].append(row)

    # Apply aggregations
    result = []

    # Handle both string and list inputs for backward compatibility
    if isinstance(agg_spec, str):
        agg_specs = parse_agg_specs(agg_spec)
    else:
        # Convert old list format to new format
        supported_funcs = ["count", "sum", "avg", "min", "max", "first", "last", "list"]
        agg_specs = []
        for name, field in agg_spec:
            if name == "count":
                agg_specs.append(("count", "count"))
            elif name in ["sum", "avg", "min", "max", "first", "last", "list"]:
                agg_specs.append((f"{name}_{field}", f"{name}({field})"))
            else:
                raise ValueError(f"Unknown aggregation function: '{name}'. "
                               f"Supported functions: {', '.join(sorted(supported_funcs))}")

    for key, group_rows in groups.items():
        row_result = {group_key: key}
        for spec in agg_specs:
            row_result.update(apply_single_agg(spec, group_rows))
        result.append(row_result)

    return result
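
A quick sketch with hypothetical data (assumes avg computes the arithmetic mean, per the supported-function list in the code):

rows = [
    {"dept": "eng", "salary": 100},
    {"dept": "eng", "salary": 80},
    {"dept": "ops", "salary": 60},
]
groupby_agg(rows, "dept", "count, avg_salary=avg(salary)")
# -> [{"dept": "eng", "count": 2, "avg_salary": 90.0},
#     {"dept": "ops", "count": 1, "avg_salary": 60.0}]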

ja.agg

Aggregation engine for JSONL algebra operations.

This module provides all aggregation functionality including parsing aggregation specifications, applying aggregations to data, and all built-in aggregation functions (sum, avg, min, max, etc.).

Classes

Functions

parse_agg_specs(agg_spec)

Parse aggregation specification string.

Parameters:

Name Type Description Default
agg_spec str

Aggregation specification (e.g., "count, avg_age=avg(age)")

required

Returns:

Type Description
List[Tuple[str, str]]

List of (name, expression) tuples

Source code in ja/agg.py
def parse_agg_specs(agg_spec: str) -> List[Tuple[str, str]]:
    """Parse aggregation specification string.

    Args:
        agg_spec: Aggregation specification (e.g., "count, avg_age=avg(age)")

    Returns:
        List of (name, expression) tuples
    """
    specs = []
    for part in agg_spec.split(","):
        part = part.strip()
        if "=" in part:
            # Named aggregation: avg_age=avg(age)
            name, expr = part.split("=", 1)
            specs.append((name.strip(), expr.strip()))
        else:
            # Simple aggregation: count
            specs.append((part, part))
    return specs
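
For example, the spec string from the docstring parses as:

parse_agg_specs("count, avg_age=avg(age)")
# -> [('count', 'count'), ('avg_age', 'avg(age)')]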

apply_single_agg(spec, data)

Apply a single aggregation to data.

Parameters:

Name Type Description Default
spec Tuple[str, str]

(name, expression) tuple

required
data Relation

List of dictionaries

required

Returns:

Type Description
Dict[str, Any]

Dictionary with aggregation result

Source code in ja/agg.py
def apply_single_agg(spec: Tuple[str, str], data: Relation) -> Dict[str, Any]:
    """Apply a single aggregation to data.

    Args:
        spec: (name, expression) tuple
        data: List of dictionaries

    Returns:
        Dictionary with aggregation result
    """
    name, expr = spec
    parser = ExprEval()

    # Parse the aggregation expression
    if "(" in expr and expr.endswith(")"):
        func_name = expr[:expr.index("(")]
        field_expr = expr[expr.index("(") + 1:-1].strip()
    else:
        func_name = expr
        field_expr = ""

    # Handle conditional aggregations
    if "_if" in func_name:
        # e.g., count_if(status == active) or sum_if(amount, status == paid)
        base_func = func_name.replace("_if", "")

        if "," in field_expr:
            # sum_if(amount, status == paid)
            field, condition = field_expr.split(",", 1)
            field = field.strip()
            condition = condition.strip()

            # Filter data based on condition
            filtered_data = [row for row in data if parser.evaluate(condition, row)]

            # Apply base aggregation to filtered data
            if base_func == "sum":
                values = [parser.get_field_value(row, field) for row in filtered_data]
                return {name: sum(v for v in values if v is not None)}
            elif base_func == "avg":
                values = [parser.get_field_value(row, field) for row in filtered_data]
                values = [v for v in values if v is not None]
                return {name: sum(values) / len(values) if values else 0}
            elif base_func == "count":
                return {name: len(filtered_data)}
        else:
            # count_if(status == active)
            filtered_data = [row for row in data if parser.evaluate(field_expr, row)]
            return {name: len(filtered_data)}

    # Regular aggregations
    if func_name == "count":
        return {name: len(data)}

    elif func_name in AGGREGATION_FUNCTIONS:
        if func_name in ["first", "last"]:
            # Special handling for first/last
            if not data:
                return {name: None}
            row = data[0] if func_name == "first" else data[-1]
            value = parser.get_field_value(row, field_expr) if field_expr else row
            return {name: value}
        else:
            # Collect values for aggregation
            values = []
            for row in data:
                if field_expr:
                    # Try arithmetic evaluation first
                    val = parser.evaluate_arithmetic(field_expr, row)
                    if val is None:
                        val = parser.get_field_value(row, field_expr)
                else:
                    val = row
                if val is not None:
                    values.append(val)

            # Apply aggregation function
            result = AGGREGATION_FUNCTIONS[func_name](values)
            return {name: result}

    # Unknown aggregation function
    known_funcs = ["count"] + list(AGGREGATION_FUNCTIONS.keys())
    raise ValueError(f"Unknown aggregation function: '{func_name}'. "
                    f"Supported functions: {', '.join(sorted(known_funcs))}")

aggregate_single_group(data, agg_spec)

Aggregate ungrouped data as a single group.

Parameters:

Name Type Description Default
data Relation

List of dictionaries

required
agg_spec str

Aggregation specification

required

Returns:

Type Description
Dict[str, Any]

Dictionary with aggregation results

Source code in ja/agg.py
def aggregate_single_group(data: Relation, agg_spec: str) -> Dict[str, Any]:
    """Aggregate ungrouped data as a single group.

    Args:
        data: List of dictionaries
        agg_spec: Aggregation specification

    Returns:
        Dictionary with aggregation results
    """
    agg_specs = parse_agg_specs(agg_spec)
    result = {}

    for spec in agg_specs:
        result.update(apply_single_agg(spec, data))

    return result
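
For example (hypothetical data; sums come back as floats because values are collected via arithmetic evaluation):

rows = [{"amount": 10}, {"amount": 5}, {"amount": 7}]
aggregate_single_group(rows, "count, total=sum(amount)")
# -> {"count": 3, "total": 22.0}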

aggregate_grouped_data(grouped_data, agg_spec)

Aggregate data that has group metadata.

Parameters:

Name Type Description Default
grouped_data Relation

Data with group metadata

required
agg_spec str

Aggregation specification

required

Returns:

Type Description
Relation

List of aggregated results

Source code in ja/agg.py
def aggregate_grouped_data(grouped_data: Relation, agg_spec: str) -> Relation:
    """Aggregate data that has group metadata.

    Args:
        grouped_data: Data with group metadata
        agg_spec: Aggregation specification

    Returns:
        List of aggregated results
    """
    from collections import defaultdict

    # Group by the combination of all grouping fields
    groups = defaultdict(list)
    group_keys = {}

    for row in grouped_data:
        # Use the _groups list to create a grouping key
        groups_list = row.get("_groups", [])

        # Create a tuple key for internal grouping
        group_tuple = tuple((g["field"], g["value"]) for g in groups_list)

        # Store the groups for this tuple
        if group_tuple not in group_keys:
            group_keys[group_tuple] = groups_list

        # Remove metadata for aggregation
        clean_row = {k: v for k, v in row.items() if not k.startswith("_group")}
        groups[group_tuple].append(clean_row)

    # Apply aggregations
    result = []

    for group_tuple, group_rows in groups.items():
        # Start with all grouping fields
        agg_result = {}

        # Add all grouping fields from the metadata
        for group_info in group_keys[group_tuple]:
            agg_result[group_info["field"]] = group_info["value"]

        # Parse and apply aggregations
        agg_specs = parse_agg_specs(agg_spec)
        for spec in agg_specs:
            agg_result.update(apply_single_agg(spec, group_rows))

        result.append(agg_result)

    return result
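
A sketch using rows that already carry the _groups metadata produced by the grouping functions in ja.group:

grouped = [
    {"dept": "eng", "salary": 100,
     "_groups": [{"field": "dept", "value": "eng"}]},
    {"dept": "eng", "salary": 80,
     "_groups": [{"field": "dept", "value": "eng"}]},
    {"dept": "ops", "salary": 60,
     "_groups": [{"field": "dept", "value": "ops"}]},
]
aggregate_grouped_data(grouped, "count, avg_salary=avg(salary)")
# -> [{"dept": "eng", "count": 2, "avg_salary": 90.0},
#     {"dept": "ops", "count": 1, "avg_salary": 60.0}]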

ja.export

Utilities for exporting JSONL data to other formats.

This module provides a collection of functions for converting JSONL data into various other formats. It powers the ja export command group, enabling transformations like converting JSONL to a standard JSON array or "exploding" a JSONL file into a directory of individual JSON files.

Functions

jsonl_to_json_array_string(jsonl_input_stream)

Read JSONL from a stream and return a JSON array string.

Parameters:

Name Type Description Default
jsonl_input_stream

Input stream containing JSONL data.

required

Returns:

Type Description
str

A JSON array string containing all records.

Source code in ja/export.py
def jsonl_to_json_array_string(jsonl_input_stream) -> str:
    """Read JSONL from a stream and return a JSON array string.

    Args:
        jsonl_input_stream: Input stream containing JSONL data.

    Returns:
        A JSON array string containing all records.
    """
    records = []
    for line in jsonl_input_stream:
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as e:
            print(
                f"Skipping invalid JSON line: {line.strip()} - Error: {e}",
                file=sys.stderr,
            )
            continue
    return json.dumps(records, indent=2)
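
For example, fed from an in-memory stream:

import io

stream = io.StringIO('{"a": 1}\n{"a": 2}\n')
print(jsonl_to_json_array_string(stream))
# Prints a pretty-printed (indent=2) JSON array containing both records.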

json_array_to_jsonl_lines(json_array_input_stream)

Read a JSON array from a stream and yield each element as a JSONL line.

Parameters:

Name Type Description Default
json_array_input_stream

Input stream containing a JSON array.

required

Yields:

Type Description

JSON strings representing each array element.

Raises:

Type Description
ValueError

If the input is not a valid JSON array.

Source code in ja/export.py
def json_array_to_jsonl_lines(json_array_input_stream):
    """Read a JSON array from a stream and yield each element as a JSONL line.

    Args:
        json_array_input_stream: Input stream containing a JSON array.

    Yields:
        JSON strings representing each array element.

    Raises:
        ValueError: If the input is not a valid JSON array.
    """
    try:
        json_string = "".join(json_array_input_stream)
        data = json.loads(json_string)
        if not isinstance(data, list):
            raise ValueError("Input is not a JSON array.")
        for record in data:
            yield json.dumps(record)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON array input: {e}") from e
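
A quick sketch:

import io

for line in json_array_to_jsonl_lines(io.StringIO('[{"a": 1}, {"a": 2}]')):
    print(line)
# {"a": 1}
# {"a": 2}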

jsonl_to_dir(jsonl_input_stream, output_dir_path_str, input_filename_stem='data')

Exports JSONL lines to individual JSON files in a directory. If output_dir_path_str does not yet exist, it is used as the target directory; otherwise a subdirectory named after input_filename_stem is created inside it. Files are named item-<index>.json.

Source code in ja/export.py
def jsonl_to_dir(
    jsonl_input_stream, output_dir_path_str: str, input_filename_stem: str = "data"
):
    """
    Exports JSONL lines to individual JSON files in a directory.
    The output directory is named after input_filename_stem if output_dir_path_str is not specific.
    Files are named item-<index>.json.
    """
    output_dir = pathlib.Path(output_dir_path_str)

    # If output_dir_path_str points to a new directory (it does not exist and
    # does not look like a file name), use it directly as the target.
    # Otherwise, create a subdirectory named after input_filename_stem.
    if not output_dir.exists() and not output_dir.name.endswith((".jsonl", ".json")):
        pass
    else:
        output_dir = output_dir / input_filename_stem

    output_dir.mkdir(parents=True, exist_ok=True)

    count = 0
    for i, line in enumerate(jsonl_input_stream):
        try:
            record = json.loads(line)
            file_path = output_dir / f"item-{i}.json"
            with open(file_path, "w") as f:
                json.dump(record, f, indent=2)
            count += 1
        except json.JSONDecodeError as e:
            print(
                f"Skipping invalid JSON line during export: {line.strip()} - Error: {e}",
                file=sys.stderr,
            )
            continue
    print(f"Exported {count} items to {output_dir.resolve()}", file=sys.stderr)

dir_to_jsonl(input_dir_path_str, add_filename_key=None, recursive=False)

Converts JSON files in a directory to JSONL lines. Files are sorted by the 'item-<index>.json' pattern if applicable, otherwise lexicographically. Optionally adds the filename as a key to each JSON object.

Source code in ja/export.py
def dir_to_jsonl(
    input_dir_path_str: str, add_filename_key: str = None, recursive: bool = False
):
    """
    Converts JSON files in a directory to JSONL lines.
    Files are sorted by 'item-<index>.json' pattern if applicable, otherwise lexicographically.
    Optionally adds filename as a key to each JSON object.
    """
    input_dir = pathlib.Path(input_dir_path_str)
    if not input_dir.is_dir():
        raise ValueError(f"Input path is not a directory: {input_dir_path_str}")

    json_files_paths = []
    if recursive:
        for root, _, files in os.walk(input_dir):
            for file in files:
                if file.lower().endswith(".json"):
                    json_files_paths.append(pathlib.Path(root) / file)
    else:
        for item in input_dir.iterdir():
            if item.is_file() and item.name.lower().endswith(".json"):
                json_files_paths.append(item)

    sorted_file_paths = _sort_files_for_implode(json_files_paths)

    for file_path in sorted_file_paths:
        try:
            with open(file_path, "r") as f:
                data = json.load(f)

            if add_filename_key:
                # Use relative path from the input_dir to keep it cleaner
                relative_filename = str(file_path.relative_to(input_dir))
                actual_key = _ensure_unique_key(data, add_filename_key)
                data[actual_key] = relative_filename

            yield json.dumps(data)
        except json.JSONDecodeError as e:
            print(
                f"Skipping invalid JSON file: {file_path} - Error: {e}", file=sys.stderr
            )
            continue
        except Exception as e:
            print(f"Error processing file {file_path}: {e}", file=sys.stderr)
            continue
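
For example, round-tripping a directory produced by jsonl_to_dir (the path here is hypothetical):

for line in dir_to_jsonl("exported/data", add_filename_key="source_file"):
    print(line)
# e.g. {"a": 1, "source_file": "item-0.json"}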

ja.exporter

Export your JSONL data to other popular formats like CSV.

This module is your gateway to the wider data ecosystem. It provides powerful and flexible tools to convert your JSONL data into formats that are easy to use with spreadsheets, traditional databases, or other data analysis tools.

The key feature is its intelligent handling of nested JSON, which can be "flattened" into separate columns, making complex data accessible in a simple CSV format.

Functions

jsonl_to_csv_stream(jsonl_stream, output_stream, flatten=True, flatten_sep='.', column_functions=None)

Convert a stream of JSONL data into a CSV stream.

This is a highly flexible function for exporting your data. It reads JSONL records, intelligently discovers all possible headers (even if they vary between lines), and writes to a CSV format.

It shines when dealing with nested data. By default, it will flatten structures like {"user": {"name": "X"}} into a user.name column. You can also provide custom functions to transform data on the fly.

Parameters:

Name Type Description Default
jsonl_stream

An input stream (like a file handle) yielding JSONL strings.

required
output_stream

An output stream (like sys.stdout or a file handle) where the CSV data will be written.

required
flatten bool

If True, nested dictionaries are flattened into columns with dot-separated keys. Defaults to True.

True
flatten_sep str

The separator to use when flattening keys. Defaults to ".".

'.'
column_functions dict

A dictionary mapping column names to functions that will be applied to that column's data before writing to CSV. For example, {"price": float}.

None
Source code in ja/exporter.py
def jsonl_to_csv_stream(
    jsonl_stream,
    output_stream,
    flatten: bool = True,
    flatten_sep: str = ".",
    column_functions: dict = None,
):
    """Convert a stream of JSONL data into a CSV stream.

    This is a highly flexible function for exporting your data. It reads JSONL
    records, intelligently discovers all possible headers (even if they vary
    between lines), and writes to a CSV format.

    It shines when dealing with nested data. By default, it will flatten
    structures like `{"user": {"name": "X"}}` into a `user.name` column.
    You can also provide custom functions to transform data on the fly.

    Args:
        jsonl_stream: An input stream (like a file handle) yielding JSONL strings.
        output_stream: An output stream (like `sys.stdout` or a file handle)
                       where the CSV data will be written.
        flatten (bool): If `True`, nested dictionaries are flattened into columns
                        with dot-separated keys. Defaults to `True`.
        flatten_sep (str): The separator to use when flattening keys.
                           Defaults to ".".
        column_functions (dict): A dictionary mapping column names to functions
                                 that will be applied to that column's data
                                 before writing to CSV. For example,
                                 `{"price": float}`.
    """
    if column_functions is None:
        column_functions = {}

    # First pass: Discover all possible headers from the entire stream
    records = [json.loads(line) for line in jsonl_stream if line.strip()]
    if not records:
        return

    # Apply column functions before flattening
    for rec in records:
        for col, func in column_functions.items():
            if col in rec:
                try:
                    rec[col] = func(rec[col])
                except Exception as e:
                    # Optionally, log this error or handle it as needed
                    print(
                        f"Error applying function to column '{col}' for a record: {e}",
                        file=sys.stderr,
                    )

    if flatten:
        processed_records = [(_flatten_dict(rec, sep=flatten_sep)) for rec in records]
    else:
        processed_records = []
        for rec in records:
            processed_rec = {}
            for k, v in rec.items():
                if isinstance(v, (dict, list)):
                    processed_rec[k] = json.dumps(v)
                else:
                    processed_rec[k] = v
            processed_records.append(processed_rec)

    # Discover all unique keys to form the CSV header
    headers = []
    header_set = set()
    for rec in processed_records:
        for key in rec.keys():
            if key not in header_set:
                header_set.add(key)
                headers.append(key)

    # Second pass: Write to the output stream
    writer = csv.DictWriter(output_stream, fieldnames=headers, lineterminator="\n")
    writer.writeheader()
    writer.writerows(processed_records)
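
A minimal sketch writing to stdout (hypothetical record; the nested user object becomes a user.name column, and the column function coerces price to a float):

import io
import sys

src = io.StringIO('{"user": {"name": "Ann"}, "price": "9.5"}\n')
jsonl_to_csv_stream(src, sys.stdout, column_functions={"price": float})
# user.name,price
# Ann,9.5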

ja.importer

Import data from other formats like CSV into the world of JSONL.

This module is the bridge that brings your existing data into the JSONL Algebra ecosystem. It provides a collection of powerful functions for converting various data formats—such as CSV or directories of individual JSON files—into the clean, line-oriented JSONL format that ja is built to handle.

Functions

dir_to_jsonl_lines(dir_path)

Stream a directory of .json or .jsonl files as a single JSONL stream.

A handy utility for consolidating data. It reads all files ending in .json or .jsonl from a specified directory and yields each JSON object as a separate line. This is perfect for preparing a dataset that has been stored as many small files.

  • For .json files, the entire file is treated as a single JSON object.
  • For .jsonl files, each line is treated as a separate JSON object.

Parameters:

Name Type Description Default
dir_path str

The path to the directory to read.

required

Yields:

Type Description

A string for each JSON object found, ready for processing.

Source code in ja/importer.py
def dir_to_jsonl_lines(dir_path):
    """Stream a directory of .json or .jsonl files as a single JSONL stream.

    A handy utility for consolidating data. It reads all files ending in `.json`
    or `.jsonl` from a specified directory and yields each JSON object as a
    separate line. This is perfect for preparing a dataset that has been
    stored as many small files.

    - For `.json` files, the entire file is treated as a single JSON object.
    - For `.jsonl` files, each line is treated as a separate JSON object.

    Args:
        dir_path (str): The path to the directory to read.

    Yields:
        A string for each JSON object found, ready for processing.
    """
    for filename in sorted(os.listdir(dir_path)):
        file_path = os.path.join(dir_path, filename)
        if filename.endswith(".json"):
            try:
                with open(file_path, "r") as f:
                    yield f.read().strip()
            except (IOError, json.JSONDecodeError) as e:
                print(f"Error reading or parsing {file_path}: {e}", file=sys.stderr)
        elif filename.endswith(".jsonl"):
            try:
                with open(file_path, "r") as f:
                    for line in f:
                        yield line.strip()
            except IOError as e:
                print(f"Error reading {file_path}: {e}", file=sys.stderr)

csv_to_jsonl_lines(csv_input_stream, has_header, infer_types=False)

Convert a stream of CSV data into a stream of JSONL lines.

This function reads CSV data and transforms each row into a JSON object. It can automatically handle headers to use as keys and can even infer the data types of your values, converting them from strings to numbers or booleans where appropriate.

Parameters:

Name Type Description Default
csv_input_stream

An input stream (like a file handle) containing CSV data.

required
has_header bool

Set to True if the first row of the CSV is a header that should be used for JSON keys.

required
infer_types bool

If True, automatically convert values to int, float, bool, or None. Defaults to False.

False

Yields:

Type Description

A JSON-formatted string for each row in the CSV data.

Source code in ja/importer.py
def csv_to_jsonl_lines(csv_input_stream, has_header: bool, infer_types: bool = False):
    """Convert a stream of CSV data into a stream of JSONL lines.

    This function reads CSV data and transforms each row into a JSON object.
    It can automatically handle headers to use as keys and can even infer the
    data types of your values, converting them from strings to numbers or
    booleans where appropriate.

    Args:
        csv_input_stream: An input stream (like a file handle) containing CSV data.
        has_header (bool): Set to `True` if the first row of the CSV is a header
                           that should be used for JSON keys.
        infer_types (bool): If `True`, automatically convert values to `int`,
                            `float`, `bool`, or `None`. Defaults to `False`.

    Yields:
        A JSON-formatted string for each row in the CSV data.
    """

    def process_row(row):
        if not infer_types:
            return row
        return {k: _infer_value(v) for k, v in row.items()}

    if has_header:
        # Use DictReader which handles headers automatically
        reader = csv.DictReader(csv_input_stream)
        for row in reader:
            yield json.dumps(process_row(row))
    else:
        # Use the standard reader and manually create dictionaries
        reader = csv.reader(csv_input_stream)
        headers = []
        try:
            first_row = next(reader)
            # Generate headers based on the number of columns in the first row
            headers = [f"col_{i}" for i in range(len(first_row))]
            # Yield the first row which we've already consumed
            row_dict = dict(zip(headers, first_row))
            yield json.dumps(process_row(row_dict))
        except StopIteration:
            return  # Handle empty file

        # Yield the rest of the rows
        for row in reader:
            row_dict = dict(zip(headers, row))
            yield json.dumps(process_row(row_dict))
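
For example (assuming _infer_value recognizes the numeric and boolean literals, as the docstring describes):

import io

csv_data = io.StringIO("name,age,active\nAlice,30,true\n")
for line in csv_to_jsonl_lines(csv_data, has_header=True, infer_types=True):
    print(line)
# {"name": "Alice", "age": 30, "active": true}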

ja.schema

Discover the structure of your data automatically.

This module provides powerful tools to infer a JSON Schema from your JSONL files. A schema acts as a blueprint for your data, describing its fields, types, and which fields are required. This is incredibly useful for validation, documentation, and ensuring data quality.

Functions

get_json_type(value)

Determine the appropriate JSON Schema type for a given Python value.

Maps Python types to their corresponding JSON Schema type names.

Parameters:

Name Type Description Default
value

Any Python value.

required

Returns:

Type Description

The JSON Schema type name as a string.

Example

>>> get_json_type("hello")
'string'
>>> get_json_type(42)
'integer'

Source code in ja/schema.py
def get_json_type(value):
    """Determine the appropriate JSON Schema type for a given Python value.

    Maps Python types to their corresponding JSON Schema type names.

    Args:
        value: Any Python value.

    Returns:
        The JSON Schema type name as a string.

    Example:
        >>> get_json_type("hello")
        'string'
        >>> get_json_type(42)
        'integer'
    """
    if isinstance(value, str):
        return "string"
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "number"
    if value is None:
        return "null"
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        return "object"
    return "unknown"

merge_schemas(s1, s2)

Intelligently merge two JSON schemas into one.

This is the secret sauce that allows schema inference to work across many different JSON objects, even if they have different fields or types. It handles type unions (e.g., a field that is sometimes a string, sometimes an integer) and recursively merges nested object properties and array item schemas.

Parameters:

Name Type Description Default
s1

First JSON schema dictionary or None.

required
s2

Second JSON schema dictionary or None.

required

Returns:

Type Description

A merged schema dictionary combining both inputs.

Example

>>> s1 = {"type": "string"}
>>> s2 = {"type": "integer"}
>>> merge_schemas(s1, s2)
{'type': ['integer', 'string']}

Source code in ja/schema.py
def merge_schemas(s1, s2):
    """Intelligently merge two JSON schemas into one.

    This is the secret sauce that allows schema inference to work across many
    different JSON objects, even if they have different fields or types. It handles
    type unions (e.g., a field that is sometimes a string, sometimes an integer)
    and recursively merges nested object properties and array item schemas.

    Args:
        s1: First JSON schema dictionary or None.
        s2: Second JSON schema dictionary or None.

    Returns:
        A merged schema dictionary combining both inputs.

    Example:
        >>> s1 = {"type": "string"}
        >>> s2 = {"type": "integer"}
        >>> merge_schemas(s1, s2)
        {'type': ['integer', 'string']}
    """
    if s1 is None:
        return s2
    if s2 is None:
        return s1
    if s1 == s2:
        return s1

    # Merge types
    type1 = s1.get("type", [])
    if not isinstance(type1, list):
        type1 = [type1]
    type2 = s2.get("type", [])
    if not isinstance(type2, list):
        type2 = [type2]

    merged_types = sorted(list(set(type1) | set(type2)))
    if "integer" in merged_types and "number" in merged_types:
        merged_types.remove("integer")

    merged_schema = {}
    if len(merged_types) == 1:
        merged_schema["type"] = merged_types[0]
    else:
        merged_schema["type"] = merged_types

    # If both schemas could be objects, merge properties
    if "object" in type1 and "object" in type2:
        props1 = s1.get("properties", {})
        props2 = s2.get("properties", {})
        all_keys = set(props1.keys()) | set(props2.keys())
        merged_props = {
            key: merge_schemas(props1.get(key), props2.get(key)) for key in all_keys
        }
        if merged_props:
            merged_schema["properties"] = merged_props

    # If both schemas could be arrays, merge items
    if "array" in type1 and "array" in type2:
        items1 = s1.get("items")
        items2 = s2.get("items")
        merged_items = merge_schemas(items1, items2)
        if merged_items:
            merged_schema["items"] = merged_items

    return merged_schema
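
The merge also recurses into object properties; note how integer widens to number when both appear for the same field:

s1 = {"type": "object", "properties": {"a": {"type": "integer"}}}
s2 = {"type": "object", "properties": {"a": {"type": "number"}}}
merge_schemas(s1, s2)
# -> {'type': 'object', 'properties': {'a': {'type': 'number'}}}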

infer_value_schema(value)

Infer a JSON Schema for a single Python value.

Creates a schema that describes the structure and type of the given value, handling nested objects and arrays recursively.

Parameters:

Name Type Description Default
value

Any JSON-serializable Python value.

required

Returns:

Type Description

A JSON schema dictionary describing the value.

Example

>>> infer_value_schema({"name": "Alice", "age": 30})
{'type': 'object', 'properties': {'name': {'type': 'string'}, 'age': {'type': 'integer'}}}

Source code in ja/schema.py
def infer_value_schema(value):
    """Infer a JSON Schema for a single Python value.

    Creates a schema that describes the structure and type of the given value,
    handling nested objects and arrays recursively.

    Args:
        value: Any JSON-serializable Python value.

    Returns:
        A JSON schema dictionary describing the value.

    Example:
        >>> infer_value_schema({"name": "Alice", "age": 30})
        {'type': 'object', 'properties': {'name': {'type': 'string'}, 'age': {'type': 'integer'}}}
    """
    type_name = get_json_type(value)
    schema = {"type": type_name}
    if type_name == "object":
        schema["properties"] = {k: infer_value_schema(v) for k, v in value.items()}
    elif type_name == "array":
        if value:
            item_schema = None
            for item in value:
                item_schema = merge_schemas(item_schema, infer_value_schema(item))
            if item_schema:
                schema["items"] = item_schema
    return schema

add_required_fields(schema, data_samples)

Refine a schema by identifying which fields are always present.

This function analyzes a list of data samples and updates the schema to mark fields as 'required' if they appear in every single sample. This process is applied recursively to nested objects, making the resulting schema more precise.

Parameters:

Name Type Description Default
schema

The schema dictionary to modify in place.

required
data_samples

A list of data samples to analyze for required fields.

required
Example

If all samples in data_samples have 'name' and 'age' fields, this function adds {"required": ["age", "name"]} to the schema.

Source code in ja/schema.py
def add_required_fields(schema, data_samples):
    """Refine a schema by identifying which fields are always present.

    This function analyzes a list of data samples and updates the schema to mark
    fields as 'required' if they appear in every single sample. This process is
    applied recursively to nested objects, making the resulting schema more precise.

    Args:
        schema: The schema dictionary to modify in place.
        data_samples: A list of data samples to analyze for required fields.

    Example:
        If all samples in `data_samples` have 'name' and 'age' fields, this
        function adds `{"required": ["age", "name"]}` to the schema.
    """
    if schema.get("type") == "object" and "properties" in schema:
        # For object schemas, find fields present in all samples
        dict_samples = [s for s in data_samples if isinstance(s, dict)]
        if dict_samples:
            required_keys = set(dict_samples[0].keys())
            for sample in dict_samples[1:]:
                required_keys.intersection_update(sample.keys())
            if required_keys:
                schema["required"] = sorted(list(required_keys))

        # Recursively add required fields to nested object properties
        for prop_name, prop_schema in schema["properties"].items():
            prop_samples = [s.get(prop_name) for s in dict_samples if prop_name in s]
            if prop_samples:
                add_required_fields(prop_schema, prop_samples)

    elif schema.get("type") == "array" and "items" in schema:
        # For array schemas, collect all array items and add required fields
        array_items = []
        for sample in data_samples:
            if isinstance(sample, list):
                array_items.extend(sample)
        if array_items:
            add_required_fields(schema["items"], array_items)
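
A small sketch: 'age' is missing from one sample, so only 'name' ends up required.

samples = [{"name": "A", "age": 1}, {"name": "B"}]
schema = merge_schemas(infer_value_schema(samples[0]),
                       infer_value_schema(samples[1]))
add_required_fields(schema, samples)
schema["required"]
# -> ['name']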

infer_schema(data)

Infer a complete JSON schema from a collection of data records.

This is the main entry point for schema inference. Give it an iterable of JSON objects (like a list of dictionaries), and it will return a complete JSON Schema that describes the entire dataset. It automatically handles varying fields, mixed types, nested structures, and identifies required fields.

Parameters:

Name Type Description Default
data

An iterable of data records (typically dictionaries).

required

Returns:

Type Description

A JSON schema dictionary with $schema, type, properties, and required fields.

Example

>>> data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
>>> schema = infer_schema(data)
>>> schema["properties"]["name"]
{'type': 'string'}
>>> schema["required"]
['age', 'name']

Source code in ja/schema.py
def infer_schema(data):
    """Infer a complete JSON schema from a collection of data records.

    This is the main entry point for schema inference. Give it an iterable of
    JSON objects (like a list of dictionaries), and it will return a complete
    JSON Schema that describes the entire dataset. It automatically handles
    varying fields, mixed types, nested structures, and identifies required fields.

    Args:
        data: An iterable of data records (typically dictionaries).

    Returns:
        A JSON schema dictionary with `$schema`, type, properties, and required fields.

    Example:
        >>> data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
        >>> schema = infer_schema(data)
        >>> schema["properties"]["name"]
        {'type': 'string'}
        >>> schema["required"]
        ['age', 'name']
    """
    records = list(data)
    if not records:
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {},
        }

    # Infer schema for each record
    inferred_schemas = [infer_value_schema(rec) for rec in records]

    # Merge all inferred schemas
    merged_schema = None
    for s in inferred_schemas:
        merged_schema = merge_schemas(merged_schema, s)

    # Add required fields recursively
    if merged_schema:
        add_required_fields(merged_schema, records)

    # Add the meta-schema URL
    final_schema = {"$schema": "http://json-schema.org/draft-07/schema#"}
    if merged_schema:
        final_schema.update(merged_schema)

    return final_schema

ja.expr

Expression parser for ja commands.

This module provides a lightweight expression parser that allows intuitive syntax without quotes for most common cases.

Classes

ExprEval

Parse and evaluate expressions for filtering, comparison, and arithmetic.

Source code in ja/expr.py
class ExprEval:
    """Parse and evaluate expressions for filtering, comparison, and arithmetic."""

    def __init__(self):
        # Operators in precedence order (longest first to handle >= before >)
        self.operators = [
            ("==", operator.eq),
            ("!=", operator.ne),
            (">=", operator.ge),
            ("<=", operator.le),
            (">", operator.gt),
            ("<", operator.lt),
        ]

    def parse_value(self, value_str: str) -> Any:
        """Parse a value string into appropriate Python type.

        Examples:
            "123" -> 123
            "12.5" -> 12.5
            "true" -> True
            "false" -> False
            "null" -> None
            "active" -> "active" (string)
        """
        value_str = value_str.strip()

        # Empty string
        if not value_str:
            return ""

        # Boolean literals (case-insensitive)
        if value_str.lower() == "true":
            return True
        if value_str.lower() == "false":
            return False

        # Null literal
        if value_str.lower() in ("null", "none"):
            return None

        # Numbers
        try:
            if "." in value_str:
                return float(value_str)
            return int(value_str)
        except ValueError:
            pass

        # Quoted strings (remove quotes)
        if (value_str.startswith('"') and value_str.endswith('"')) or (
            value_str.startswith("'") and value_str.endswith("'")
        ):
            return value_str[1:-1]

        # Unquoted strings (the nice default!)
        return value_str

    def get_field_value(self, obj: Dict[str, Any], field_path: str) -> Any:
        """Get value from nested object using dot notation.

        Examples:
            get_field_value({"user": {"name": "Alice"}}, "user.name") -> "Alice"
            get_field_value({"items": [{"id": 1}]}, "items[0].id") -> 1
        """
        if not field_path:
            return obj

        current = obj

        # Handle array indexing and dots
        parts = re.split(r"\.|\[|\]", field_path)
        parts = [p for p in parts if p]  # Remove empty strings

        for part in parts:
            if current is None:
                return None

            # Try as dict key
            if isinstance(current, dict):
                current = current.get(part)
            # Try as array index
            elif isinstance(current, list):
                try:
                    idx = int(part)
                    current = current[idx] if 0 <= idx < len(current) else None
                except (ValueError, IndexError):
                    return None
            else:
                return None

        return current

    def set_field_value(self, obj: Dict[str, Any], field_path: str, value: Any) -> None:
        """Set value in nested object using dot notation."""
        if not field_path:
            return

        parts = field_path.split(".")
        current = obj

        # Navigate to the parent of the target field
        for part in parts[:-1]:
            if part not in current:
                current[part] = {}
            current = current[part]

        # Set the value
        current[parts[-1]] = value

    def evaluate_comparison(self, left: Any, op_str: str, right: Any) -> bool:
        """Evaluate a comparison operation."""
        op_func = None
        for op, func in self.operators:
            if op == op_str:
                op_func = func
                break

        if op_func is None:
            raise ValueError(f"Unknown operator: {op_str}")

        # Special handling for null comparisons
        if left is None or right is None:
            if op_str == "==":
                return left == right
            elif op_str == "!=":
                return left != right
            else:
                return False

        # Type coercion for comparison
        try:
            return op_func(left, right)
        except (TypeError, ValueError):
            # If comparison fails, try string comparison
            try:
                return op_func(str(left), str(right))
            except Exception:
                return False

    def evaluate(self, expr: str, context: Dict[str, Any]) -> bool:
        """Parse and evaluate an expression.

        Examples:
            "status == active"
            "age > 30"
            "user.type == premium"
        """
        expr = expr.strip()

        # Empty expression is false
        if not expr:
            return False

        # Check for operators
        for op_str, op_func in self.operators:
            if op_str in expr:
                # Split on the FIRST occurrence of the operator
                parts = expr.split(op_str, 1)
                if len(parts) == 2:
                    left_expr = parts[0].strip()
                    right_expr = parts[1].strip()

                    # Left side is always a field path
                    left_val = self.get_field_value(context, left_expr)

                    # Right side: check if it's a field or a literal
                    # A token is a field if it exists as a key in the context
                    # and is not a boolean/null keyword.
                    if right_expr in context and right_expr.lower() not in ['true', 'false', 'null', 'none']:
                        # It's a field reference
                        right_val = self.get_field_value(context, right_expr)
                    else:
                        # It's a literal value
                        right_val = self.parse_value(right_expr)

                    return self.evaluate_comparison(left_val, op_str, right_val)

        # No operator found - treat as existence/truthiness check
        value = self.get_field_value(context, expr)
        return bool(value)

    def evaluate_arithmetic(
        self, expr: str, context: Dict[str, Any]
    ) -> Optional[float]:
        """Evaluate simple arithmetic expressions.

        Examples:
            "amount * 1.1"
            "score + bonus"
        """
        # Simple arithmetic support
        for op, func in [
            ("*", operator.mul),
            ("+", operator.add),
            ("-", operator.sub),
            ("/", operator.truediv),
        ]:
            if op in expr:
                parts = expr.split(op, 1)
                if len(parts) == 2:
                    left_str = parts[0].strip()
                    right_str = parts[1].strip()

                    # Get left value (field or literal)
                    left_val = self.get_field_value(context, left_str)
                    if left_val is None:
                        left_val = self.parse_value(left_str)

                    # Get right value (field or literal)
                    right_val = self.get_field_value(context, right_str)
                    if right_val is None:
                        right_val = self.parse_value(right_str)

                    try:
                        return func(float(left_val), float(right_val))
                    except (TypeError, ValueError):
                        return None

        # No operator - try as field or literal
        val = self.get_field_value(context, expr)
        if val is None:
            val = self.parse_value(expr)

        try:
            return float(val)
        except (TypeError, ValueError):
            return None

Functions

parse_value(value_str)

Parse a value string into appropriate Python type.

Examples:

"123" -> 123 "12.5" -> 12.5 "true" -> True "false" -> False "null" -> None "active" -> "active" (string)

Source code in ja/expr.py
def parse_value(self, value_str: str) -> Any:
    """Parse a value string into appropriate Python type.

    Examples:
        "123" -> 123
        "12.5" -> 12.5
        "true" -> True
        "false" -> False
        "null" -> None
        "active" -> "active" (string)
    """
    value_str = value_str.strip()

    # Empty string
    if not value_str:
        return ""

    # Boolean literals (case-insensitive)
    if value_str.lower() == "true":
        return True
    if value_str.lower() == "false":
        return False

    # Null literal
    if value_str.lower() in ("null", "none"):
        return None

    # Numbers
    try:
        if "." in value_str:
            return float(value_str)
        return int(value_str)
    except ValueError:
        pass

    # Quoted strings (remove quotes)
    if (value_str.startswith('"') and value_str.endswith('"')) or (
        value_str.startswith("'") and value_str.endswith("'")
    ):
        return value_str[1:-1]

    # Unquoted strings (the nice default!)
    return value_str

get_field_value(obj, field_path)

Get value from nested object using dot notation.

Examples:

get_field_value({"user": {"name": "Alice"}}, "user.name") -> "Alice"
get_field_value({"items": [{"id": 1}]}, "items[0].id") -> 1

Source code in ja/expr.py
def get_field_value(self, obj: Dict[str, Any], field_path: str) -> Any:
    """Get value from nested object using dot notation.

    Examples:
        get_field_value({"user": {"name": "Alice"}}, "user.name") -> "Alice"
        get_field_value({"items": [{"id": 1}]}, "items[0].id") -> 1
    """
    if not field_path:
        return obj

    current = obj

    # Handle array indexing and dots
    parts = re.split(r"\.|\[|\]", field_path)
    parts = [p for p in parts if p]  # Remove empty strings

    for part in parts:
        if current is None:
            return None

        # Try as dict key
        if isinstance(current, dict):
            current = current.get(part)
        # Try as array index
        elif isinstance(current, list):
            try:
                idx = int(part)
                current = current[idx] if 0 <= idx < len(current) else None
            except (ValueError, IndexError):
                return None
        else:
            return None

    return current

set_field_value(obj, field_path, value)

Set value in nested object using dot notation.

Source code in ja/expr.py
def set_field_value(self, obj: Dict[str, Any], field_path: str, value: Any) -> None:
    """Set value in nested object using dot notation."""
    if not field_path:
        return

    parts = field_path.split(".")
    current = obj

    # Navigate to the parent of the target field
    for part in parts[:-1]:
        if part not in current:
            current[part] = {}
        current = current[part]

    # Set the value
    current[parts[-1]] = value

evaluate_comparison(left, op_str, right)

Evaluate a comparison operation.

Source code in ja/expr.py
def evaluate_comparison(self, left: Any, op_str: str, right: Any) -> bool:
    """Evaluate a comparison operation."""
    op_func = None
    for op, func in self.operators:
        if op == op_str:
            op_func = func
            break

    if op_func is None:
        raise ValueError(f"Unknown operator: {op_str}")

    # Special handling for null comparisons
    if left is None or right is None:
        if op_str == "==":
            return left == right
        elif op_str == "!=":
            return left != right
        else:
            return False

    # Type coercion for comparison
    try:
        return op_func(left, right)
    except (TypeError, ValueError):
        # If comparison fails, try string comparison
        try:
            return op_func(str(left), str(right))
        except Exception:
            return False

evaluate(expr, context)

Parse and evaluate an expression.

Examples:

"status == active" "age > 30" "user.type == premium"

Source code in ja/expr.py
def evaluate(self, expr: str, context: Dict[str, Any]) -> bool:
    """Parse and evaluate an expression.

    Examples:
        "status == active"
        "age > 30"
        "user.type == premium"
    """
    expr = expr.strip()

    # Empty expression is false
    if not expr:
        return False

    # Check for operators
    for op_str, op_func in self.operators:
        if op_str in expr:
            # Split on the FIRST occurrence of the operator
            parts = expr.split(op_str, 1)
            if len(parts) == 2:
                left_expr = parts[0].strip()
                right_expr = parts[1].strip()

                # Left side is always a field path
                left_val = self.get_field_value(context, left_expr)

                # Right side: check if it's a field or a literal
                # A token is a field if it exists as a key in the context
                # and is not a boolean/null keyword.
                if right_expr in context and right_expr.lower() not in ['true', 'false', 'null', 'none']:
                    # It's a field reference
                    right_val = self.get_field_value(context, right_expr)
                else:
                    # It's a literal value
                    right_val = self.parse_value(right_expr)

                return self.evaluate_comparison(left_val, op_str, right_val)

    # No operator found - treat as existence/truthiness check
    value = self.get_field_value(context, expr)
    return bool(value)
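
A few representative calls (hypothetical contexts):

parser = ExprEval()
parser.evaluate("age >= 18", {"age": 21})          # True
parser.evaluate("user.type == premium",
                {"user": {"type": "premium"}})     # True
parser.evaluate("deleted", {"deleted": False})     # False (truthiness check)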

evaluate_arithmetic(expr, context)

Evaluate simple arithmetic expressions.

Examples:

"amount * 1.1" "score + bonus"

Source code in ja/expr.py
def evaluate_arithmetic(
    self, expr: str, context: Dict[str, Any]
) -> Optional[float]:
    """Evaluate simple arithmetic expressions.

    Examples:
        "amount * 1.1"
        "score + bonus"
    """
    # Simple arithmetic support
    for op, func in [
        ("*", operator.mul),
        ("+", operator.add),
        ("-", operator.sub),
        ("/", operator.truediv),
    ]:
        if op in expr:
            parts = expr.split(op, 1)
            if len(parts) == 2:
                left_str = parts[0].strip()
                right_str = parts[1].strip()

                # Get left value (field or literal)
                left_val = self.get_field_value(context, left_str)
                if left_val is None:
                    left_val = self.parse_value(left_str)

                # Get right value (field or literal)
                right_val = self.get_field_value(context, right_str)
                if right_val is None:
                    right_val = self.parse_value(right_str)

                try:
                    return func(float(left_val), float(right_val))
                except (TypeError, ValueError):
                    return None

    # No operator - try as field or literal
    val = self.get_field_value(context, expr)
    if val is None:
        val = self.parse_value(expr)

    try:
        return float(val)
    except (TypeError, ValueError):
        return None
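
A couple of representative calls (hypothetical contexts; results are floats, subject to the usual floating-point rounding):

parser = ExprEval()
parser.evaluate_arithmetic("score + bonus", {"score": 10, "bonus": 5})  # 15.0
parser.evaluate_arithmetic("amount * 1.1", {"amount": 100})             # ~110.0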