
API Reference

Welcome to the complete API reference for JSONL Algebra. Every function and class is listed here with its full documentation.

ja.core

Core relational operations for the JSONL algebra system.

This module implements the fundamental set and relational operations that form the algebra for manipulating collections of JSON objects. All operations are designed to work with lists of dictionaries, making them suitable for processing JSONL data.

collect(data)

Collect metadata-grouped rows into actual groups.

This function takes rows with _groups metadata (from groupby operations) and collects them into explicit groups. Each output row represents one group with all its members in a _rows array.

Parameters:

    data (Relation): List of dictionaries with _groups metadata. Required.

Returns:

    Relation: List where each dict represents a group with a _rows array.

Example

Input:

[
    {"id": 1, "region": "North", "_groups": [{"field": "region", "value": "North"}]},
    {"id": 2, "region": "North", "_groups": [{"field": "region", "value": "North"}]},
    {"id": 3, "region": "South", "_groups": [{"field": "region", "value": "South"}]}
]

Output:

[
    {"region": "North", "_rows": [{"id": 1, "region": "North"}, {"id": 2, "region": "North"}]},
    {"region": "South", "_rows": [{"id": 3, "region": "South"}]}
]

Source code in ja/core.py
def collect(data: Relation) -> Relation:
    """Collect metadata-grouped rows into actual groups.

    This function takes rows with _groups metadata (from groupby operations)
    and collects them into explicit groups. Each output row represents one
    group with all its members in a _rows array.

    Args:
        data: List of dictionaries with _groups metadata

    Returns:
        List where each dict represents a group with _rows array

    Example:
        Input:
        [
            {"id": 1, "region": "North", "_groups": [{"field": "region", "value": "North"}]},
            {"id": 2, "region": "North", "_groups": [{"field": "region", "value": "North"}]},
            {"id": 3, "region": "South", "_groups": [{"field": "region", "value": "South"}]}
        ]

        Output:
        [
            {"region": "North", "_rows": [{"id": 1, "region": "North"}, {"id": 2, "region": "North"}]},
            {"region": "South", "_rows": [{"id": 3, "region": "South"}]}
        ]
    """
    if not data:
        return []

    # Check if data has grouping metadata
    if "_groups" not in data[0]:
        # No grouping metadata - treat entire dataset as one group
        return [{"_rows": data}]

    # Collect rows by their group keys
    groups = defaultdict(list)

    for row in data:
        # Build group key from metadata
        group_key = tuple((g["field"], g["value"]) for g in row["_groups"])

        # Create clean row without metadata
        clean_row = {k: v for k, v in row.items() if not k.startswith("_")}

        groups[group_key].append(clean_row)

    # Build output with one row per group
    result = []
    for group_key, rows in groups.items():
        group_dict = {}

        # Add group fields to output
        for field, value in group_key:
            group_dict[field] = value

        # Add collected rows
        group_dict["_rows"] = rows

        result.append(group_dict)

    return result
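
A minimal usage sketch with hypothetical rows. When the input carries no _groups metadata, the entire dataset is collected into a single group:

rows = [{"id": 1}, {"id": 2}]
collect(rows)
# -> [{"_rows": [{"id": 1}, {"id": 2}]}]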

difference(left, right)

Compute the difference of two collections.

Parameters:

    left (Relation): First collection. Required.
    right (Relation): Second collection. Required.

Returns:

    Relation: Elements in left but not in right.

Source code in ja/core.py
def difference(
    left: Relation, right: Relation
) -> Relation:
    """Compute the difference of two collections.

    Args:
        left: First collection
        right: Second collection

    Returns:
        Elements in left but not in right
    """
    # Convert right to a set of tuples for efficient lookup
    right_set = {tuple(sorted(row.items())) for row in right}

    result = []
    for row in left:
        if tuple(sorted(row.items())) not in right_set:
            result.append(row)

    return result
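
A minimal usage sketch with hypothetical rows; rows are compared by their full set of key-value pairs:

left = [{"id": 1}, {"id": 2}]
right = [{"id": 2}]
difference(left, right)
# -> [{"id": 1}]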

distinct(data)

Remove duplicate rows from a collection.

Parameters:

    data (Relation): List of dictionaries. Required.

Returns:

    Relation: List with duplicates removed.

Source code in ja/core.py
def distinct(data: Relation) -> Relation:
    """Remove duplicate rows from a collection.

    Args:
        data: List of dictionaries

    Returns:
        List with duplicates removed
    """
    seen = set()
    result = []

    for row in data:
        # Convert to tuple for hashability
        row_tuple = tuple(sorted(row.items()))
        if row_tuple not in seen:
            seen.add(row_tuple)
            result.append(row)

    return result
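
A minimal usage sketch with hypothetical rows; the first occurrence of each duplicate is kept:

distinct([{"id": 1}, {"id": 1}, {"id": 2}])
# -> [{"id": 1}, {"id": 2}]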

intersection(left, right)

Compute the intersection of two collections.

Parameters:

    left (Relation): First collection. Required.
    right (Relation): Second collection. Required.

Returns:

    Relation: Intersection of the two collections.

Source code in ja/core.py
def intersection(
    left: Relation, right: Relation
) -> Relation:
    """Compute the intersection of two collections.

    Args:
        left: First collection
        right: Second collection

    Returns:
        Intersection of the two collections
    """
    # Convert right to a set of tuples for efficient lookup
    right_set = {tuple(sorted(row.items())) for row in right}

    result = []
    for row in left:
        if tuple(sorted(row.items())) in right_set:
            result.append(row)

    return result
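
A minimal usage sketch with hypothetical rows:

left = [{"id": 1}, {"id": 2}]
right = [{"id": 2}, {"id": 3}]
intersection(left, right)
# -> [{"id": 2}]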

join(left, right, on)

Inner join with nested-key support.

Source code in ja/core.py
def join(left: Relation,
         right: Relation,
         on: List[Tuple[str, str]]) -> Relation:
    """Inner join with nested-key support."""
    parser = ExprEval()

    # index right side
    right_index: Dict[Tuple[Any, ...], List[Row]] = defaultdict(list)
    for r in right:
        key = tuple(parser.get_field_value(r, rk) for _, rk in on)
        if all(v is not None for v in key):
            right_index[key].append(r)

    # roots of every RHS join path (e.g. 'user.id' → 'user')
    rhs_roots = {re.split(r"[.\[]", rk, 1)[0] for _, rk in on}

    joined: Relation = []
    for l in left:
        l_key = tuple(parser.get_field_value(l, lk) for lk, _ in on)
        if not all(v is not None for v in l_key):
            continue
        for r in right_index.get(l_key, []):
            merged = r.copy()
            merged.update(l)          # left wins
            # drop right-side join columns
            for root in rhs_roots:
                merged.pop(root, None)
            joined.append(merged)
    return joined
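
A minimal usage sketch with hypothetical rows. The on argument is a list of (left_key, right_key) pairs; on key collisions the left row's values win, and the right-side join columns are dropped from the output:

users = [{"id": 1, "name": "Ada"}]
orders = [{"user_id": 1, "amount": 10}]
join(users, orders, on=[("id", "user_id")])
# -> [{"amount": 10, "id": 1, "name": "Ada"}]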

product(left, right)

Cartesian product; colliding keys from right are prefixed with b_.

Source code in ja/core.py
def product(left: Relation, right: Relation) -> Relation:
    """Cartesian product; colliding keys from *right* are prefixed with ``b_``."""
    result: Relation = []
    for l in left:
        for r in right:
            merged = l.copy()
            for k, v in r.items():
                if k in merged:
                    merged[f"b_{k}"] = v
                else:
                    merged[k] = v
            result.append(merged)
    return result
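
A minimal usage sketch with hypothetical rows; the colliding key id from the right side comes back as b_id:

product([{"id": 1, "x": "a"}], [{"id": 9, "y": "b"}])
# -> [{"id": 1, "x": "a", "b_id": 9, "y": "b"}]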

project(data, fields, use_jmespath=False)

Project specific fields from each row.

Parameters:

    data (Relation): List of dictionaries to project. Required.
    fields (List[str] | str): Comma-separated field names or expressions. Required.
    use_jmespath (bool): If True, use JMESPath for projection. Default: False.

Returns:

    Relation: List of dictionaries with only the specified fields.

Source code in ja/core.py
def project(
    data: Relation, fields: List[str] | str, use_jmespath: bool = False
) -> Relation:
    """Project specific fields from each row.

    Args:
        data: List of dictionaries to project
        fields: Comma-separated field names or expressions
        use_jmespath: If True, use JMESPath for projection

    Returns:
        List of dictionaries with only the specified fields
    """

    if use_jmespath:
        compiled_expr = jmespath.compile(fields)
        return [compiled_expr.search(row) for row in data]

    # Parse field specifications
    result = []
    parser = ExprEval()
    field_specs = fields if isinstance(fields, list) else fields.split(",")

    for row in data:
        new_row = {}

        for spec in field_specs:
            if "=" in spec:
                # Computed field: "total=amount*1.1" or "is_adult=age>=18"
                name, expr = spec.split("=", 1)
                name = name.strip()
                expr = expr.strip()

                # Check if it's an arithmetic expression
                arith_result = parser.evaluate_arithmetic(expr, row)
                if arith_result is not None:
                    new_row[name] = arith_result
                else:
                    # Try as boolean expression
                    new_row[name] = parser.evaluate(expr, row)
            else:
                # Simple field projection
                value = parser.get_field_value(row, spec)
                if value is not None:
                    # Build nested structure
                    parser.set_field_value(new_row, spec, value)

        result.append(new_row)

    return result
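
A minimal usage sketch with hypothetical rows, showing a plain field alongside a computed field of the form name=expr (the exact numeric output depends on the expression evaluator):

rows = [{"name": "Ada", "amount": 100}]
project(rows, "name,total=amount*1.1")
# -> roughly [{"name": "Ada", "total": 110.0}]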

rename(data, mapping)

Rename fields in each row.

Parameters:

    data (Relation): List of dictionaries. Required.
    mapping (Dict[str, str]): Dictionary mapping old names to new names. Required.

Returns:

    Relation: List with renamed fields.

Source code in ja/core.py
def rename(data: Relation, mapping: Dict[str, str]) -> Relation:
    """Rename fields in each row.

    Args:
        data: List of dictionaries
        mapping: Dictionary mapping old names to new names

    Returns:
        List with renamed fields
    """
    result = []
    for row in data:
        new_row = {}
        for key, value in row.items():
            new_key = mapping.get(key, key)
            new_row[new_key] = value
        result.append(new_row)
    return result
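
A minimal usage sketch with hypothetical rows; keys absent from the mapping pass through unchanged:

rename([{"id": 1, "loc": "NY"}], {"loc": "location"})
# -> [{"id": 1, "location": "NY"}]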

select(data, expr, use_jmespath=False)

Filter rows based on an expression.

Parameters:

    data (Relation): List of dictionaries to filter. Required.
    expr (str): Expression to evaluate (simple expression or JMESPath). Required.
    use_jmespath (bool): If True, use JMESPath evaluation. Default: False.

Returns:

    Relation: List of rows where the expression evaluates to true.

Source code in ja/core.py
def select(
    data: Relation, expr: str, use_jmespath: bool = False
) -> Relation:
    """Filter rows based on an expression.

    Args:
        data: List of dictionaries to filter
        expr: Expression to evaluate (simple expression or JMESPath)
        use_jmespath: If True, use JMESPath evaluation

    Returns:
        List of rows where the expression evaluates to true
    """
    if use_jmespath:
        compiled_expr = jmespath.compile(expr)
        return [row for row in data if compiled_expr.search(row)]

    # Use simple expression parser
    parser = ExprEval()
    result = []

    # Handle 'and' at the command level for simplicity
    if " and " in expr:
        # Multiple conditions with 'and'
        conditions = expr.split(" and ")
        for row in data:
            if all(parser.evaluate(cond.strip(), row) for cond in conditions):
                result.append(row)
    elif " or " in expr:
        # Multiple conditions with 'or'
        conditions = expr.split(" or ")
        for row in data:
            if any(parser.evaluate(cond.strip(), row) for cond in conditions):
                result.append(row)
    else:
        # Single condition
        for row in data:
            if parser.evaluate(expr, row):
                result.append(row)

    return result
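
A minimal usage sketch with hypothetical rows, assuming the simple expression syntax shown elsewhere in these docs (e.g. "age > 30"). Top-level " and " / " or " conjunctions are split and each condition is evaluated per row:

rows = [{"age": 25}, {"age": 42}]
select(rows, "age > 30")
# -> [{"age": 42}]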

union(left, right)

Compute the union of two collections.

Parameters:

    left (Relation): First collection. Required.
    right (Relation): Second collection. Required.

Returns:

    Relation: Union of the two collections.

Source code in ja/core.py
def union(
    left: Relation, right: Relation
) -> Relation:
    """Compute the union of two collections.

    Args:
        left: First collection
        right: Second collection

    Returns:
        Union of the two collections
    """
    return left + right
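
A minimal usage sketch with hypothetical rows. At this level, union simply concatenates the two collections; chain it with distinct for set-style semantics:

union([{"id": 1}], [{"id": 1}, {"id": 2}])
# -> [{"id": 1}, {"id": 1}, {"id": 2}]
distinct(union([{"id": 1}], [{"id": 1}, {"id": 2}]))
# -> [{"id": 1}, {"id": 2}]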

ja.cli

Command-line interface for JSONL algebra operations.

This module provides the main CLI entry point and argument parsing for all JSONL algebra operations including relational algebra, schema inference, data import/export, and interactive REPL mode.

handle_export_command_group(args)

Handle export subcommands by delegating to appropriate handlers.

Parameters:

    args: Parsed command line arguments with export_cmd attribute. Required.

Source code in ja/cli.py
def handle_export_command_group(args):
    """Handle export subcommands by delegating to appropriate handlers.

    Args:
        args: Parsed command line arguments with export_cmd attribute.
    """
    export_command_handlers = {
        "array": handle_to_array,
        "jsonl": handle_to_jsonl,
        "explode": handle_explode,
        "csv": handle_to_csv,
    }
    handler = export_command_handlers.get(args.export_cmd)
    if handler:
        handler(args)
    else:
        # This should not be reached if subcommands are handled correctly in main
        print(f"Unknown export command: {args.export_cmd}", file=sys.stderr)
        sys.exit(1)

handle_import_command_group(args)

Handle import subcommands by delegating to appropriate handlers.

Parameters:

    args: Parsed command line arguments with import_cmd attribute. Required.

Source code in ja/cli.py
def handle_import_command_group(args):
    """Handle import subcommands by delegating to appropriate handlers.

    Args:
        args: Parsed command line arguments with import_cmd attribute.
    """
    import_command_handlers = {
        "csv": handle_import_csv,
        "implode": handle_implode,
    }
    handler = import_command_handlers.get(args.import_cmd)
    if handler:
        handler(args)
    else:
        # This should not be reached if subcommands are handled correctly in main
        print(f"Unknown import command: {args.import_cmd}", file=sys.stderr)
        sys.exit(1)

json_error(error_type, message, details=None, exit_code=1)

Output error in JSON format and exit.

Parameters:

    error_type: Type of error (e.g., "ParseError", "IOError"). Required.
    message: Human-readable error message. Required.
    details: Optional dict with additional error details. Default: None.
    exit_code: Exit code. Default: 1.

Source code in ja/cli.py
def json_error(error_type, message, details=None, exit_code=1):
    """Output error in JSON format and exit.

    Args:
        error_type: Type of error (e.g., "ParseError", "IOError")
        message: Human-readable error message
        details: Optional dict with additional error details
        exit_code: Exit code (default: 1)
    """
    error_obj = {"error": {"type": error_type, "message": message}}
    if details:
        error_obj["error"]["details"] = details

    # If stderr is not a tty (redirected/piped), always output JSON
    # If stderr is a tty, output human-readable unless JA_JSON_ERRORS is set
    if not sys.stderr.isatty() or os.environ.get("JA_JSON_ERRORS"):
        print(json.dumps(error_obj), file=sys.stderr)
    else:
        # Human-readable format for terminal
        print(f"ja: error: {message}", file=sys.stderr)
        if details:
            for key, value in details.items():
                if value is not None and key != "traceback":
                    print(f"  {key}: {value}", file=sys.stderr)

    sys.exit(exit_code)
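
A minimal usage sketch (the error details are hypothetical). With stderr redirected, or with the JA_JSON_ERRORS environment variable set, the error is emitted as a single JSON object; on a terminal it is printed in human-readable form. The call exits the process with the given status:

json_error("IOError", "cannot open input file", details={"path": "data.jsonl"})
# stderr (redirected): {"error": {"type": "IOError", "message": "cannot open input file", "details": {"path": "data.jsonl"}}}
# stderr (terminal):   ja: error: cannot open input file
#                        path: data.jsonl
# then exits with status 1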

ja.repl

Interactive REPL (Read-Eval-Print Loop) for JSONL algebra operations.

This module provides a friendly, interactive shell for chaining JSONL algebra operations together. It's a great way to explore your data, build up complex transformation pipelines step-by-step, and see the results instantly.

Think of it as a command-line laboratory for your JSONL data!

ReplCompiler

Compiles and executes a sequence of JSONL algebra commands.

This class is the engine of the REPL. It manages the state of the command pipeline, parses user input, and translates the pipeline into a shell command that can be executed. It's designed to provide an intuitive, interactive experience for building data workflows.

Source code in ja/repl.py
class ReplCompiler:
    """Compiles and executes a sequence of JSONL algebra commands.

    This class is the engine of the REPL. It manages the state of the command
    pipeline, parses user input, and translates the pipeline into a shell command
    that can be executed. It's designed to provide an intuitive, interactive
    experience for building data workflows.
    """

    def __init__(self):
        """Initialize the REPL compiler with an empty pipeline."""
        self.pipeline = []
        self.current_input_source = None  # Can be a file path or None (implying stdin)
        self.handlers = {}  # Command handlers are registered in the `run` method.

    def parse_command(self, line):
        """Parse a line of input into a command and its arguments.

        Uses `shlex` to handle quoted arguments correctly.

        Args:
            line (str): The raw input line from the user.

        Returns:
            A tuple of (command, args_list), or (None, None) if parsing fails.
        """
        try:
            parts = shlex.split(line)
        except ValueError as e:
            print(f"Error parsing command: {e}. Check your quotes.")
            return None, None
        if not parts:
            return None, None
        command = parts[0].lower()
        args = parts[1:]
        return command, args

    def handle_from(self, args):
        """Set the initial data source for the pipeline (e.g., a file).

        This command must be the first one used when starting a new pipeline
        with a file source.

        Args:
            args (list): A list containing the file path or "stdin".
        """
        if not args:
            print("Error: 'from' requires a file path (or 'stdin').")
            return
        if self.pipeline:
            print(
                "Error: 'from' can only be used at the beginning of a new pipeline. Use 'reset' first."
            )
            return
        self.current_input_source = args[0]
        if self.current_input_source.lower() == "stdin":
            self.current_input_source = None  # Internally, None means stdin for clarity
            print("Input source set to: stdin")
        else:
            print(f"Input source set to: {self.current_input_source}")

    def add_to_pipeline(self, command_name, args, cli_command_name=None):
        """Add a new command step to the current pipeline.

        Args:
            command_name (str): The name of the REPL command (e.g., "project").
            args (list): The list of arguments for the command.
            cli_command_name (str, optional): The corresponding `ja` CLI command name.
                                             Defaults to `command_name`.
        """
        if not cli_command_name:
            cli_command_name = command_name
        # Ensure 'from' is not added to the pipeline steps directly
        if command_name.lower() == "from":
            print(
                "Error: 'from' is a directive, not a pipeline step. Use 'reset' then 'from <file>'."
            )
            return
        self.pipeline.append(
            {
                "repl_command": command_name,
                "cli_command": cli_command_name,
                "args": args,
            }
        )
        print(f"Added: {command_name} {' '.join(shlex.quote(a) for a in args)}")

    def handle_select(self, args):
        """Handle the 'select' command by adding it to the pipeline."""
        if not args:
            print("Error: 'select' requires an expression.")
            return
        self.add_to_pipeline("select", args)

    def handle_project(self, args):
        """Handle the 'project' command by adding it to the pipeline."""
        if not args:
            print("Error: 'project' requires column names.")
            return
        self.add_to_pipeline("project", args)

    def handle_join(self, args):
        """Handle the 'join' command by adding it to the pipeline."""
        if len(args) < 3 or args[-2].lower() != "--on":
            print("Error: 'join' requires <right_file> --on <key_map>.")
            print("Example: join orders.jsonl --on user.id=customer_id")
            return
        self.add_to_pipeline("join", args)

    def handle_rename(self, args):
        """Handle the 'rename' command by adding it to the pipeline."""
        if not args:
            print("Error: 'rename' requires a mapping (e.g., old_name=new_name).")
            return
        self.add_to_pipeline("rename", args)

    def handle_distinct(self, args):
        """Handle the 'distinct' command by adding it to the pipeline."""
        if args:
            print("Warning: 'distinct' does not take arguments in REPL mode. Ignoring.")
        self.add_to_pipeline("distinct", [])

    def handle_sort(self, args):
        """Handle the 'sort' command by adding it to the pipeline."""
        if not args:
            print("Error: 'sort' requires column names.")
            return
        self.add_to_pipeline("sort", args)

    def handle_groupby(self, args):
        """Handle the 'groupby' command by adding it to the pipeline."""
        # Support both chained groupby (no --agg) and immediate aggregation (with --agg)
        if "--agg" in args:
            # Traditional groupby with immediate aggregation
            if len(args) < 3 or args[-2].lower() != "--agg":
                print("Error: 'groupby --agg' requires <key> --agg <spec>.")
                print("Example: groupby user.location --agg count,sum(amount)")
                return
        else:
            # Chained groupby mode
            if not args:
                print("Error: 'groupby' requires a key.")
                print("Example: groupby region")
                return
        self.add_to_pipeline("groupby", args)

    def handle_agg(self, args):
        """Handle the 'agg' command by adding it to the pipeline."""
        if not args:
            print("Error: 'agg' requires an aggregation specification.")
            print("Example: agg count,total=sum(amount)")
            return
        self.add_to_pipeline("agg", args)

    def handle_product(self, args):
        """Handle the 'product' command by adding it to the pipeline."""
        if not args:
            print("Error: 'product' requires a right file path.")
            return
        self.add_to_pipeline("product", args)

    def handle_union(self, args):
        """Handle the 'union' command by adding it to the pipeline."""
        if not args:
            print("Error: 'union' requires a file path.")
            return
        self.add_to_pipeline("union", args)

    def handle_intersection(self, args):
        """Handle the 'intersection' command by adding it to the pipeline."""
        if not args:
            print("Error: 'intersection' requires a file path.")
            return
        self.add_to_pipeline("intersection", args)

    def handle_difference(self, args):
        """Handle the 'difference' command by adding it to the pipeline."""
        if not args:
            print("Error: 'difference' requires a file path.")
            return
        self.add_to_pipeline("difference", args)

    def _generate_pipeline_command_string_and_segments(self):
        """Construct the full shell command string from the pipeline steps.

        This is the core logic that translates the user's interactive steps into
        a runnable `ja ... | ja ...` shell command.

        Returns:
            A tuple containing:
            - The full, executable shell command string.
            - A list of individual command segments for display.
            - An error message string, if any.
        """
        if not self.pipeline:
            return None, None, "Pipeline is empty."

        display_segments = []
        execution_segments = []

        for i, step in enumerate(self.pipeline):
            current_ja_cmd_parts = ["ja", step["cli_command"]]
            is_first_command_in_pipe = i == 0

            if step["cli_command"] in ["join", "product", "union", "intersection", "difference"]:
                # REPL args: <right_file> [--on <key_map>] for join
                # REPL args: <right_file> for product
                # CLI: ja join <left> <right> --on <key_map>
                # CLI: ja product <left> <right>
                right_file_repl_arg = step["args"][0]

                if is_first_command_in_pipe:
                    left_input_for_cli = (
                        self.current_input_source if self.current_input_source else "-"
                    )
                else:
                    left_input_for_cli = "-"

                current_ja_cmd_parts.append(left_input_for_cli)
                current_ja_cmd_parts.append(right_file_repl_arg)

                if step["cli_command"] == "join":
                    current_ja_cmd_parts.extend(step["args"][1:])  # --on <key_map>

            elif step["cli_command"] == "groupby":
                # REPL args: <key> [--agg <spec>]
                # CLI: ja groupby <key> <file_or_stdin> [--agg <spec>]
                key_repl_arg = step["args"][0]
                other_args = step["args"][1:]

                current_ja_cmd_parts.append(key_repl_arg)

                if is_first_command_in_pipe:
                    input_file_for_cli = (
                        self.current_input_source if self.current_input_source else "-"
                    )
                else:
                    input_file_for_cli = "-"
                current_ja_cmd_parts.append(input_file_for_cli)
                current_ja_cmd_parts.extend(other_args)

            elif step["cli_command"] == "agg":
                # REPL args: <spec>
                # CLI: ja agg <spec> [file_or_stdin]
                current_ja_cmd_parts.extend(step["args"])

                if is_first_command_in_pipe:
                    if self.current_input_source:
                        current_ja_cmd_parts.append(self.current_input_source)

            else:  # select, project, rename, distinct, sort
                # REPL args: <command_specific_args>
                # CLI: ja <command> [command_specific_args...] [file_if_first_and_not_stdin]
                current_ja_cmd_parts.extend(step["args"])

                if is_first_command_in_pipe:
                    if self.current_input_source:
                        current_ja_cmd_parts.append(self.current_input_source)

            joined_segment = shlex.join(current_ja_cmd_parts)
            display_segments.append(joined_segment)
            execution_segments.append(joined_segment)

        executable_command_string = " | ".join(execution_segments)
        return executable_command_string, display_segments, None

    def handle_compile(self, cmd_args):
        """Generate and print a bash script for the current pipeline."""
        _executable_cmd_str, display_segments, error_msg = (
            self._generate_pipeline_command_string_and_segments()
        )

        if error_msg:
            print(error_msg)
            return

        print("\n--- Compiled Bash Script ---")
        print("#!/bin/bash")
        print("# Generated by ja REPL")

        if not display_segments:
            print("# Pipeline is empty.")
        elif len(display_segments) == 1:
            print(display_segments[0])
        else:
            # Build the pretty-printed pipeline string
            script_str = display_segments[0]
            for i in range(1, len(display_segments)):
                script_str += f" | \\\n  {display_segments[i]}"
            print(script_str)
        print("--------------------------\n")

    def handle_execute(self, cmd_args):
        """Execute the current pipeline and display the output."""
        limit_lines = None
        if cmd_args:
            if cmd_args[0].startswith("--lines="):
                try:
                    limit_lines = int(cmd_args[0].split("=")[1])
                    if limit_lines <= 0:
                        print("Error: --lines must be a positive integer.")
                        return
                except (ValueError, IndexError):
                    print(
                        "Error: Invalid format for --lines. Use --lines=N (e.g., --lines=10)."
                    )
                    return
            else:
                print(
                    f"Warning: Unknown argument '{cmd_args[0]}' for execute. Ignoring. Did you mean --lines=N?"
                )

        command_to_execute, _display_segments, error_msg = (
            self._generate_pipeline_command_string_and_segments()
        )

        if error_msg:
            print(error_msg)
            return
        if not command_to_execute:
            print(
                "Internal error: No command to execute."
            )  # Should be caught by error_msg
            return

        print(f"Executing: {command_to_execute}")

        try:
            process = subprocess.run(
                command_to_execute,
                shell=True,  # Essential for pipes
                capture_output=True,
                text=True,
                check=False,  # Manually check returncode
            )

            print("\n--- Output ---")
            if process.stdout:
                output_lines_list = process.stdout.splitlines()
                if limit_lines is not None:
                    for i, line_content in enumerate(output_lines_list):
                        if i < limit_lines:
                            print(line_content)
                        else:
                            print(
                                f"... (output truncated to {limit_lines} lines, total {len(output_lines_list)})"
                            )
                            break
                else:
                    print(process.stdout.strip())
            elif process.returncode == 0:
                print("(No output produced)")

            if process.stderr:
                print("\n--- Errors ---")
                print(process.stderr.strip())

            if process.returncode != 0:
                print(f"\nCommand exited with status {process.returncode}")
            elif not process.stdout and not process.stderr and process.returncode == 0:
                print("(Execution successful: No output and no errors)")

            print("--------------\n")

        except FileNotFoundError:  # pragma: no cover
            print(f"Error: Command 'ja' not found. Make sure it's in your PATH.")
        except Exception as e:  # pragma: no cover
            print(
                f"An unexpected error occurred while trying to execute the command: {e}"
            )
            # import traceback
            # traceback.print_exc()

    def handle_reset(self, args):
        """Clear the current pipeline and reset the input source."""
        self.pipeline = []
        self.current_input_source = None
        print("Pipeline reset.")

    def handle_pipeline_show(self, args):
        """Display the steps in the current pipeline."""
        if not self.pipeline:
            print("Pipeline is empty.")
        else:
            print("Current pipeline:")
            if self.current_input_source:
                print(f"  Input: {self.current_input_source}")
            else:
                print(
                    f"  Input: stdin (assumed for the first command if 'from' not used)"
                )
            for idx, step in enumerate(self.pipeline):
                print(
                    f"  {idx + 1}. {step['repl_command']} {' '.join(shlex.quote(a) for a in step['args'])}"
                )
        print("")

    def handle_help(self, args):
        """Display the help message with all available REPL commands."""
        print("\nWelcome to the `ja` REPL! Build data pipelines interactively.")
        print("Here are the available commands:\n")
        print("  from <file|stdin>      : Start a new pipeline from a file or stdin.")
        print("                           Example: from users.jsonl")
        print("  select '<expr>'        : Filter rows with a Python expression.")
        print("                           Example: select 'user.age > 30'")
        print(
            "  project <cols>         : Pick columns, supporting nested data with dot notation."
        )
        print("                           Example: project id,user.name,user.location")
        print("  join <file> --on <L=R> : Join with another file on one or more keys.")
        print("                           Example: join orders.jsonl --on id=user_id")
        print("  rename <old=new,...>   : Rename columns. Supports dot notation.")
        print("                           Example: rename user.id=user_id,location=loc")
        print("  distinct               : Remove duplicate rows based on all columns.")
        print(
            "  sort <cols>            : Sort rows by one or more columns (supports dot notation)."
        )
        print("                           Example: sort user.age,id")
        print("  groupby <key> --agg <> : Group rows and aggregate data.")
        print(
            "                           Example: groupby cat --agg count,avg:user.score"
        )
        print(
            "  product <file>         : Create a Cartesian product with another file."
        )
        print("                           Example: product features.jsonl")
        print("  union <file>           : Combine rows from another file (deduplicated).")
        print("                           Example: union archived_users.jsonl")
        print("  intersection <file>    : Keep only rows present in another file.")
        print("                           Example: intersection active_users.jsonl")
        print("  difference <file>      : Remove rows present in another file.")
        print("                           Example: difference temp_users.jsonl")

        print("\n--- Pipeline Control ---")
        print(
            "  execute [--lines=N]    : Run the pipeline and see output (e.g., execute --lines=10)."
        )
        print(
            "  compile                : Show the bash script for the current pipeline."
        )
        print("  pipeline               : Show the steps in the current pipeline.")
        print("  reset                  : Clear the current pipeline.")
        print("  help                   : Show this help message.")
        print("  exit                   : Exit the REPL.\n")

        print("Tips:")
        print("- Use dot notation (e.g., `user.address.city`) for nested JSON fields.")
        print(
            """- Wrap arguments with spaces in quotes (e.g., select 'name == "John Doe"').\n"""
        )

    def process(self, line):
        """Process a single line of input from the REPL.

        This method parses the line, finds the appropriate handler for the
        command, and invokes it.

        Args:
            line (str): The line of input to process.
        """
        try:
            if not line:
                return

            command, cmd_args = self.parse_command(line)
            if command is None:  # Parsing error
                return

            if command in self.handlers:
                self.handlers[command](cmd_args)
            elif command:
                print(
                    f"Unknown command: '{command}'. Type 'help' for available commands."
                )

        except EOFError:
            print("\nExiting...")
        except KeyboardInterrupt:
            print("\nInterrupted. Use 'exit' to quit.")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            import traceback

            traceback.print_exc()

    def run(self, initial_command_list=None):  # Renamed 'args' for clarity
        """Start the main REPL event loop.

        This method prints a welcome message, registers all command handlers,
        and enters an infinite loop to read and process user input.

        Args:
            initial_command_list (list, optional): A list of command-line arguments
                                                   to process before starting the
                                                   interactive loop.
        """
        print("Welcome to ja REPL. Type 'help' for commands, 'exit' to quit.")
        self.handlers = {  # Assign to self.handlers so self.process() can use it
            "from": self.handle_from,
            "select": self.handle_select,
            "project": self.handle_project,
            "join": self.handle_join,
            "rename": self.handle_rename,
            "distinct": self.handle_distinct,
            "sort": self.handle_sort,
            "groupby": self.handle_groupby,
            "product": self.handle_product,
            "union": self.handle_union,
            "intersection": self.handle_intersection,
            "difference": self.handle_difference,
            "compile": self.handle_compile,
            "execute": self.handle_execute,
            "agg": self.handle_agg,
            "reset": self.handle_reset,
            "pipeline": self.handle_pipeline_show,
            "help": self.handle_help,
            "exit": lambda _args: sys.exit(
                0
            ),  # Consistent signature with other handlers
        }

        if initial_command_list and len(initial_command_list) > 0:
            processed_initial_parts = list(initial_command_list)
            # If the first token of initial_command_list is not a known REPL command,
            # assume 'from' should be prepended.
            # This allows `ja repl myfile.jsonl` to be treated as `from myfile.jsonl`.
            if processed_initial_parts[0].lower() not in self.handlers:
                processed_initial_parts.insert(0, "from")

            initial_line = shlex.join(
                processed_initial_parts
            )  # Use shlex.join for safety
            self.process(initial_line)

        while True:
            try:
                line = input("ja> ").strip()
                if not line:
                    continue
                # 'exit' command will be handled by self.process -> self.handlers['exit']
                self.process(line)
            except EOFError:
                print("\nExiting...")
                sys.exit(0)  # Ensure clean exit
            except KeyboardInterrupt:
                print("\nInterrupted. Type 'exit' or Ctrl-D to quit.")

__init__()

Initialize the REPL compiler with an empty pipeline.

Source code in ja/repl.py
def __init__(self):
    """Initialize the REPL compiler with an empty pipeline."""
    self.pipeline = []
    self.current_input_source = None  # Can be a file path or None (implying stdin)
    self.handlers = {}  # Command handlers are registered in the `run` method.

add_to_pipeline(command_name, args, cli_command_name=None)

Add a new command step to the current pipeline.

Parameters:

    command_name (str): The name of the REPL command (e.g., "project"). Required.
    args (list): The list of arguments for the command. Required.
    cli_command_name (str, optional): The corresponding ja CLI command name. Defaults to command_name.

Source code in ja/repl.py
def add_to_pipeline(self, command_name, args, cli_command_name=None):
    """Add a new command step to the current pipeline.

    Args:
        command_name (str): The name of the REPL command (e.g., "project").
        args (list): The list of arguments for the command.
        cli_command_name (str, optional): The corresponding `ja` CLI command name.
                                         Defaults to `command_name`.
    """
    if not cli_command_name:
        cli_command_name = command_name
    # Ensure 'from' is not added to the pipeline steps directly
    if command_name.lower() == "from":
        print(
            "Error: 'from' is a directive, not a pipeline step. Use 'reset' then 'from <file>'."
        )
        return
    self.pipeline.append(
        {
            "repl_command": command_name,
            "cli_command": cli_command_name,
            "args": args,
        }
    )
    print(f"Added: {command_name} {' '.join(shlex.quote(a) for a in args)}")

handle_agg(args)

Handle the 'agg' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_agg(self, args):
    """Handle the 'agg' command by adding it to the pipeline."""
    if not args:
        print("Error: 'agg' requires an aggregation specification.")
        print("Example: agg count,total=sum(amount)")
        return
    self.add_to_pipeline("agg", args)

handle_compile(cmd_args)

Generate and print a bash script for the current pipeline.

Source code in ja/repl.py
def handle_compile(self, cmd_args):
    """Generate and print a bash script for the current pipeline."""
    _executable_cmd_str, display_segments, error_msg = (
        self._generate_pipeline_command_string_and_segments()
    )

    if error_msg:
        print(error_msg)
        return

    print("\n--- Compiled Bash Script ---")
    print("#!/bin/bash")
    print("# Generated by ja REPL")

    if not display_segments:
        print("# Pipeline is empty.")
    elif len(display_segments) == 1:
        print(display_segments[0])
    else:
        # Build the pretty-printed pipeline string
        script_str = display_segments[0]
        for i in range(1, len(display_segments)):
            script_str += f" | \\\n  {display_segments[i]}"
        print(script_str)
    print("--------------------------\n")

handle_difference(args)

Handle the 'difference' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_difference(self, args):
    """Handle the 'difference' command by adding it to the pipeline."""
    if not args:
        print("Error: 'difference' requires a file path.")
        return
    self.add_to_pipeline("difference", args)

handle_distinct(args)

Handle the 'distinct' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_distinct(self, args):
    """Handle the 'distinct' command by adding it to the pipeline."""
    if args:
        print("Warning: 'distinct' does not take arguments in REPL mode. Ignoring.")
    self.add_to_pipeline("distinct", [])

handle_execute(cmd_args)

Execute the current pipeline and display the output.

Source code in ja/repl.py
def handle_execute(self, cmd_args):
    """Execute the current pipeline and display the output."""
    limit_lines = None
    if cmd_args:
        if cmd_args[0].startswith("--lines="):
            try:
                limit_lines = int(cmd_args[0].split("=")[1])
                if limit_lines <= 0:
                    print("Error: --lines must be a positive integer.")
                    return
            except (ValueError, IndexError):
                print(
                    "Error: Invalid format for --lines. Use --lines=N (e.g., --lines=10)."
                )
                return
        else:
            print(
                f"Warning: Unknown argument '{cmd_args[0]}' for execute. Ignoring. Did you mean --lines=N?"
            )

    command_to_execute, _display_segments, error_msg = (
        self._generate_pipeline_command_string_and_segments()
    )

    if error_msg:
        print(error_msg)
        return
    if not command_to_execute:
        print(
            "Internal error: No command to execute."
        )  # Should be caught by error_msg
        return

    print(f"Executing: {command_to_execute}")

    try:
        process = subprocess.run(
            command_to_execute,
            shell=True,  # Essential for pipes
            capture_output=True,
            text=True,
            check=False,  # Manually check returncode
        )

        print("\n--- Output ---")
        if process.stdout:
            output_lines_list = process.stdout.splitlines()
            if limit_lines is not None:
                for i, line_content in enumerate(output_lines_list):
                    if i < limit_lines:
                        print(line_content)
                    else:
                        print(
                            f"... (output truncated to {limit_lines} lines, total {len(output_lines_list)})"
                        )
                        break
            else:
                print(process.stdout.strip())
        elif process.returncode == 0:
            print("(No output produced)")

        if process.stderr:
            print("\n--- Errors ---")
            print(process.stderr.strip())

        if process.returncode != 0:
            print(f"\nCommand exited with status {process.returncode}")
        elif not process.stdout and not process.stderr and process.returncode == 0:
            print("(Execution successful: No output and no errors)")

        print("--------------\n")

    except FileNotFoundError:  # pragma: no cover
        print(f"Error: Command 'ja' not found. Make sure it's in your PATH.")
    except Exception as e:  # pragma: no cover
        print(
            f"An unexpected error occurred while trying to execute the command: {e}"
        )

handle_from(args)

Set the initial data source for the pipeline (e.g., a file).

This command must be the first one used when starting a new pipeline with a file source.

Parameters:

    args (list): A list containing the file path or "stdin". Required.

Source code in ja/repl.py
def handle_from(self, args):
    """Set the initial data source for the pipeline (e.g., a file).

    This command must be the first one used when starting a new pipeline
    with a file source.

    Args:
        args (list): A list containing the file path or "stdin".
    """
    if not args:
        print("Error: 'from' requires a file path (or 'stdin').")
        return
    if self.pipeline:
        print(
            "Error: 'from' can only be used at the beginning of a new pipeline. Use 'reset' first."
        )
        return
    self.current_input_source = args[0]
    if self.current_input_source.lower() == "stdin":
        self.current_input_source = None  # Internally, None means stdin for clarity
        print("Input source set to: stdin")
    else:
        print(f"Input source set to: {self.current_input_source}")

handle_groupby(args)

Handle the 'groupby' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_groupby(self, args):
    """Handle the 'groupby' command by adding it to the pipeline."""
    # Support both chained groupby (no --agg) and immediate aggregation (with --agg)
    if "--agg" in args:
        # Traditional groupby with immediate aggregation
        if len(args) < 3 or args[-2].lower() != "--agg":
            print("Error: 'groupby --agg' requires <key> --agg <spec>.")
            print("Example: groupby user.location --agg count,sum(amount)")
            return
    else:
        # Chained groupby mode
        if not args:
            print("Error: 'groupby' requires a key.")
            print("Example: groupby region")
            return
    self.add_to_pipeline("groupby", args)

handle_help(args)

Display the help message with all available REPL commands.

Source code in ja/repl.py
def handle_help(self, args):
    """Display the help message with all available REPL commands."""
    print("\nWelcome to the `ja` REPL! Build data pipelines interactively.")
    print("Here are the available commands:\n")
    print("  from <file|stdin>      : Start a new pipeline from a file or stdin.")
    print("                           Example: from users.jsonl")
    print("  select '<expr>'        : Filter rows with a Python expression.")
    print("                           Example: select 'user.age > 30'")
    print(
        "  project <cols>         : Pick columns, supporting nested data with dot notation."
    )
    print("                           Example: project id,user.name,user.location")
    print("  join <file> --on <L=R> : Join with another file on one or more keys.")
    print("                           Example: join orders.jsonl --on id=user_id")
    print("  rename <old=new,...>   : Rename columns. Supports dot notation.")
    print("                           Example: rename user.id=user_id,location=loc")
    print("  distinct               : Remove duplicate rows based on all columns.")
    print(
        "  sort <cols>            : Sort rows by one or more columns (supports dot notation)."
    )
    print("                           Example: sort user.age,id")
    print("  groupby <key> --agg <> : Group rows and aggregate data.")
    print(
        "                           Example: groupby cat --agg count,avg:user.score"
    )
    print(
        "  product <file>         : Create a Cartesian product with another file."
    )
    print("                           Example: product features.jsonl")
    print("  union <file>           : Combine rows from another file (deduplicated).")
    print("                           Example: union archived_users.jsonl")
    print("  intersection <file>    : Keep only rows present in another file.")
    print("                           Example: intersection active_users.jsonl")
    print("  difference <file>      : Remove rows present in another file.")
    print("                           Example: difference temp_users.jsonl")

    print("\n--- Pipeline Control ---")
    print(
        "  execute [--lines=N]    : Run the pipeline and see output (e.g., execute --lines=10)."
    )
    print(
        "  compile                : Show the bash script for the current pipeline."
    )
    print("  pipeline               : Show the steps in the current pipeline.")
    print("  reset                  : Clear the current pipeline.")
    print("  help                   : Show this help message.")
    print("  exit                   : Exit the REPL.\n")

    print("Tips:")
    print("- Use dot notation (e.g., `user.address.city`) for nested JSON fields.")
    print(
        """- Wrap arguments with spaces in quotes (e.g., select 'name == "John Doe"').\n"""
    )

handle_intersection(args)

Handle the 'intersection' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_intersection(self, args):
    """Handle the 'intersection' command by adding it to the pipeline."""
    if not args:
        print("Error: 'intersection' requires a file path.")
        return
    self.add_to_pipeline("intersection", args)

handle_join(args)

Handle the 'join' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_join(self, args):
    """Handle the 'join' command by adding it to the pipeline."""
    if len(args) < 3 or args[-2].lower() != "--on":
        print("Error: 'join' requires <right_file> --on <key_map>.")
        print("Example: join orders.jsonl --on user.id=customer_id")
        return
    self.add_to_pipeline("join", args)

handle_pipeline_show(args)

Display the steps in the current pipeline.

Source code in ja/repl.py
def handle_pipeline_show(self, args):
    """Display the steps in the current pipeline."""
    if not self.pipeline:
        print("Pipeline is empty.")
    else:
        print("Current pipeline:")
        if self.current_input_source:
            print(f"  Input: {self.current_input_source}")
        else:
            print(
                f"  Input: stdin (assumed for the first command if 'from' not used)"
            )
        for idx, step in enumerate(self.pipeline):
            print(
                f"  {idx + 1}. {step['repl_command']} {' '.join(shlex.quote(a) for a in step['args'])}"
            )
    print("")

handle_product(args)

Handle the 'product' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_product(self, args):
    """Handle the 'product' command by adding it to the pipeline."""
    if not args:
        print("Error: 'product' requires a right file path.")
        return
    self.add_to_pipeline("product", args)

handle_project(args)

Handle the 'project' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_project(self, args):
    """Handle the 'project' command by adding it to the pipeline."""
    if not args:
        print("Error: 'project' requires column names.")
        return
    self.add_to_pipeline("project", args)

handle_rename(args)

Handle the 'rename' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_rename(self, args):
    """Handle the 'rename' command by adding it to the pipeline."""
    if not args:
        print("Error: 'rename' requires a mapping (e.g., old_name=new_name).")
        return
    self.add_to_pipeline("rename", args)

handle_reset(args)

Clear the current pipeline and reset the input source.

Source code in ja/repl.py
def handle_reset(self, args):
    """Clear the current pipeline and reset the input source."""
    self.pipeline = []
    self.current_input_source = None
    print("Pipeline reset.")

handle_select(args)

Handle the 'select' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_select(self, args):
    """Handle the 'select' command by adding it to the pipeline."""
    if not args:
        print("Error: 'select' requires an expression.")
        return
    self.add_to_pipeline("select", args)

handle_sort(args)

Handle the 'sort' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_sort(self, args):
    """Handle the 'sort' command by adding it to the pipeline."""
    if not args:
        print("Error: 'sort' requires column names.")
        return
    self.add_to_pipeline("sort", args)

handle_union(args)

Handle the 'union' command by adding it to the pipeline.

Source code in ja/repl.py
def handle_union(self, args):
    """Handle the 'union' command by adding it to the pipeline."""
    if not args:
        print("Error: 'union' requires a file path.")
        return
    self.add_to_pipeline("union", args)

parse_command(line)

Parse a line of input into a command and its arguments.

Uses shlex to handle quoted arguments correctly.

Parameters:

Name Type Description Default
line str

The raw input line from the user.

required

Returns:

Type Description

A tuple of (command, args_list), or (None, None) if parsing fails.

Source code in ja/repl.py
def parse_command(self, line):
    """Parse a line of input into a command and its arguments.

    Uses `shlex` to handle quoted arguments correctly.

    Args:
        line (str): The raw input line from the user.

    Returns:
        A tuple of (command, args_list), or (None, None) if parsing fails.
    """
    try:
        parts = shlex.split(line)
    except ValueError as e:
        print(f"Error parsing command: {e}. Check your quotes.")
        return None, None
    if not parts:
        return None, None
    command = parts[0].lower()
    args = parts[1:]
    return command, args
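
A quick standalone sketch of the same tokenization, showing why shlex matters for quoted expressions (the example line is hypothetical):

import shlex

# shlex.split keeps quoted arguments intact, unlike str.split.
line = """select 'name == "John Doe"'"""
parts = shlex.split(line)

command, args = parts[0].lower(), parts[1:]
print(command)  # select
print(args)     # ['name == "John Doe"']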

process(line)

Process a single line of input from the REPL.

This method parses the line, finds the appropriate handler for the command, and invokes it.

Parameters:

Name Type Description Default
line str

The line of input to process.

required
Source code in ja/repl.py
def process(self, line):
    """Process a single line of input from the REPL.

    This method parses the line, finds the appropriate handler for the
    command, and invokes it.

    Args:
        line (str): The line of input to process.
    """
    try:
        if not line:
            return

        command, cmd_args = self.parse_command(line)
        if command is None:  # Parsing error
            return

        if command in self.handlers:
            self.handlers[command](cmd_args)
        elif command:
            print(
                f"Unknown command: '{command}'. Type 'help' for available commands."
            )

    except EOFError:
        print("\nExiting...")
    except KeyboardInterrupt:
        print("\nInterrupted. Use 'exit' to quit.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        import traceback

        traceback.print_exc()

run(initial_command_list=None)

Start the main REPL event loop.

This method prints a welcome message, registers all command handlers, and enters an infinite loop to read and process user input.

Parameters:

Name Type Description Default
initial_command_list list

A list of command-line arguments to process before starting the interactive loop.

None
Source code in ja/repl.py
def run(self, initial_command_list=None):  # Renamed 'args' for clarity
    """Start the main REPL event loop.

    This method prints a welcome message, registers all command handlers,
    and enters an infinite loop to read and process user input.

    Args:
        initial_command_list (list, optional): A list of command-line arguments
                                               to process before starting the
                                               interactive loop.
    """
    print("Welcome to ja REPL. Type 'help' for commands, 'exit' to quit.")
    self.handlers = {  # Assign to self.handlers so self.process() can use it
        "from": self.handle_from,
        "select": self.handle_select,
        "project": self.handle_project,
        "join": self.handle_join,
        "rename": self.handle_rename,
        "distinct": self.handle_distinct,
        "sort": self.handle_sort,
        "groupby": self.handle_groupby,
        "product": self.handle_product,
        "union": self.handle_union,
        "intersection": self.handle_intersection,
        "difference": self.handle_difference,
        "compile": self.handle_compile,
        "execute": self.handle_execute,
        "agg": self.handle_agg,
        "reset": self.handle_reset,
        "pipeline": self.handle_pipeline_show,
        "help": self.handle_help,
        "exit": lambda _args: sys.exit(
            0
        ),  # Consistent signature with other handlers
    }

    if initial_command_list and len(initial_command_list) > 0:
        processed_initial_parts = list(initial_command_list)
        # If the first token of initial_command_list is not a known REPL command,
        # assume 'from' should be prepended.
        # This allows `ja repl myfile.jsonl` to be treated as `from myfile.jsonl`.
        if processed_initial_parts[0].lower() not in self.handlers:
            processed_initial_parts.insert(0, "from")

        initial_line = shlex.join(
            processed_initial_parts
        )  # Use shlex.join for safety
        self.process(initial_line)

    while True:
        try:
            line = input("ja> ").strip()
            if not line:
                continue
            # 'exit' command will be handled by self.process -> self.handlers['exit']
            self.process(line)
        except EOFError:
            print("\nExiting...")
            sys.exit(0)  # Ensure clean exit
        except KeyboardInterrupt:
            print("\nInterrupted. Type 'exit' or Ctrl-D to quit.")

repl(parsed_cli_args)

Entry point for the ja repl command.

Initializes and runs the ReplCompiler.

Parameters:

Name Type Description Default
parsed_cli_args Namespace

The parsed command-line arguments, which may include an initial file to load.

required
Source code in ja/repl.py
def repl(parsed_cli_args):  # Receives the argparse.Namespace object
    """Entry point for the `ja repl` command.

    Initializes and runs the ReplCompiler.

    Args:
        parsed_cli_args (argparse.Namespace): The parsed command-line arguments,
                                              which may include an initial file
                                              to load.
    """
    compiler = ReplCompiler()
    # Get the list of initial arguments passed to `ja repl ...`
    # getattr defaults to an empty list if 'initial_args' is absent (it will be present due to nargs="*")
    initial_repl_args_list = getattr(parsed_cli_args, "initial_args", [])
    compiler.run(initial_command_list=initial_repl_args_list)

ja.commands

Command handlers for the JSONL algebra CLI.

This module connects the command-line interface to the core data processing functions. Each handle_* function is responsible for reading input data, calling the appropriate core function, and writing the results to stdout.

get_input_stream(file_path)

Yield a readable file-like object.

  • If file_path is None or '-', yield sys.stdin.
  • Otherwise open the given path for reading.
Source code in ja/commands.py
@contextmanager
def get_input_stream(file_path):
    """
    Yield a readable file-like object.

    - If file_path is None or '-', yield sys.stdin.
    - Otherwise open the given path for reading.
    """
    if file_path is not None and file_path != "-":
        f = open(file_path, "r")
        try:
            yield f
        finally:
            f.close()
    else:
        yield sys.stdin
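
A minimal usage sketch (the file name is hypothetical): because the context manager yields sys.stdin for None or "-", the same handler code serves both files and pipes.

import sys
from ja.commands import get_input_stream

# Reads from a path; get_input_stream(None) or get_input_stream("-")
# would yield sys.stdin instead.
with get_input_stream("users.jsonl") as f:
    for line in f:
        sys.stdout.write(line)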

handle_agg(args)

Handle agg command.

Source code in ja/commands.py
def handle_agg(args):
    """Handle agg command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    if not data:
        write_jsonl([])
        return

    # Check if input has group metadata - use new format
    if "_groups" in data[0]:
        # Process grouped data
        result = aggregate_grouped_data(data, args.agg)
    else:
        # Process ungrouped data
        result = [aggregate_single_group(data, args.agg)]

    write_jsonl(result)

handle_collect(args)

Handle collect command.

Source code in ja/commands.py
def handle_collect(args):
    """Handle collect command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    if not data:
        write_jsonl([])
        return

    # Check for streaming flag
    if hasattr(args, "streaming") and args.streaming:
        json_error(
            "StreamingError",
            "Collect operation requires seeing all data and cannot be performed in streaming mode. "
            "Remove --streaming flag or use window-based processing with --window-size",
        )
        return

    # Handle window-based collection
    if hasattr(args, "window_size") and args.window_size:
        # Process data in windows
        window_size = args.window_size
        for i in range(0, len(data), window_size):
            window = data[i : i + window_size]
            result = collect(window)
            write_jsonl(result)
    else:
        # Collect all data at once
        result = collect(data)
        write_jsonl(result)
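
The window-based branch above slices the data into fixed-size chunks and collects each chunk independently, trading cross-window completeness for bounded memory. A minimal sketch of the slicing pattern:

# Stand-in rows; real input would be parsed JSONL dictionaries.
data = list(range(7))
window_size = 3

for i in range(0, len(data), window_size):
    window = data[i : i + window_size]
    print(window)
# [0, 1, 2]
# [3, 4, 5]
# [6]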

handle_difference(args)

Handle difference command.

Source code in ja/commands.py
def handle_difference(args):
    """Handle difference command."""
    with get_input_stream(args.left) as f:
        left_data = read_jsonl(f)
    with get_input_stream(args.right) as f:
        right_data = read_jsonl(f)

    result = difference(left_data, right_data)
    write_jsonl(result)

handle_distinct(args)

Handle distinct command.

Source code in ja/commands.py
def handle_distinct(args):
    """Handle distinct command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    result = distinct(data)
    write_jsonl(result)

handle_explode(args)

Handle explode command.

Source code in ja/commands.py
def handle_explode(args):
    """Handle explode command."""
    input_filename_stem = "jsonl_output"  # Default stem
    if args.file and args.file != "-":
        input_filename_stem = Path(args.file).stem

    output_directory = args.output_dir if args.output_dir else input_filename_stem

    with get_input_stream(args.file) as input_stream:
        try:
            jsonl_to_dir(input_stream, output_directory, input_filename_stem)
        except Exception as e:
            print(f"Error during explode operation: {e}", file=sys.stderr)
            sys.exit(1)

handle_groupby(args)

Handle groupby command.

Source code in ja/commands.py
def handle_groupby(args):
    """Handle groupby command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    if hasattr(args, "agg") and args.agg:
        # Traditional groupby with aggregation
        result = groupby_agg(data, args.key, args.agg)
    else:
        # Check if input is already grouped - look for new format
        if data and "_groups" in data[0]:
            # This is a chained groupby
            result = groupby_chained(data, args.key)
        else:
            # First groupby
            result = groupby_with_metadata(data, args.key)

    write_jsonl(result)

handle_implode(args)

Handle implode command.

Source code in ja/commands.py
def handle_implode(args):
    """Handle implode command."""
    try:
        for line in dir_to_jsonl_lines(
            args.input_dir, args.add_filename_key, args.recursive
        ):
            print(line)
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"An unexpected error occurred during implode: {e}", file=sys.stderr)
        sys.exit(1)

handle_import_csv(args)

Handle import-csv command.

Source code in ja/commands.py
def handle_import_csv(args):
    """Handle import-csv command."""
    with get_input_stream(args.file) as input_stream:
        try:
            for line in csv_to_jsonl_lines(
                input_stream, has_header=args.has_header, infer_types=args.infer_types
            ):
                print(line)
        except Exception as e:
            print(
                f"An unexpected error occurred during CSV import: {e}", file=sys.stderr
            )
            sys.exit(1)

handle_intersection(args)

Handle intersection command.

Source code in ja/commands.py
def handle_intersection(args):
    """Handle intersection command."""
    with get_input_stream(args.left) as f:
        left_data = read_jsonl(f)
    with get_input_stream(args.right) as f:
        right_data = read_jsonl(f)

    result = intersection(left_data, right_data)
    write_jsonl(result)

handle_join(args)

Handle join command.

Source code in ja/commands.py
def handle_join(args):
    """Handle join command."""
    with get_input_stream(args.left) as f:
        left = read_jsonl(f)
    with get_input_stream(args.right) as f:
        right = read_jsonl(f)

    lcol_str, rcol_str = args.on.split("=", 1)
    lcol = lcol_str.strip()
    rcol = rcol_str.strip()

    result = join(left, right, [(lcol, rcol)])
    write_jsonl(result)
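
The --on value is split only once on "=", so dotted paths on either side survive intact. A tiny sketch:

on = "user.id=customer_id"
lcol_str, rcol_str = on.split("=", 1)
print(lcol_str.strip(), rcol_str.strip())  # user.id customer_id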

handle_product(args)

Handle product command.

Source code in ja/commands.py
def handle_product(args):
    """Handle product command."""
    with get_input_stream(args.left) as f:
        left_data = read_jsonl(f)
    with get_input_stream(args.right) as f:
        right_data = read_jsonl(f)

    result = product(left_data, right_data)
    write_jsonl(result)

handle_project(args)

Handle project command.

Source code in ja/commands.py
def handle_project(args):
    """Handle project command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    use_jmespath = hasattr(args, "jmespath") and args.jmespath

    try:
        result = project(data, args.expr, use_jmespath=use_jmespath)
        write_jsonl(result)
    except jmespath.exceptions.ParseError as e:
        json_error(
            "JMESPathParseError",
            f"Invalid JMESPath expression: {e}",
            {"expression": args.expr},
        )

handle_rename(args)

Handle rename command.

Source code in ja/commands.py
def handle_rename(args):
    """Handle rename command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    mapping_pairs = args.mapping.split(",")
    mapping = {}
    for pair_str in mapping_pairs:
        parts = pair_str.split("=", 1)
        if len(parts) == 2:
            old_name, new_name = parts
            mapping[old_name.strip()] = new_name.strip()
        else:
            print(
                f"Warning: Malformed rename pair '{pair_str.strip()}' ignored.",
                file=sys.stderr,
            )

    result = rename(data, mapping)
    write_jsonl(result)

handle_schema_infer(args)

Handle schema infer command.

Source code in ja/commands.py
def handle_schema_infer(args):
    """Handle schema infer command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    schema = infer_schema(data)
    write_json_object(schema)

handle_schema_validate(args)

Handle schema validate command.

Source code in ja/commands.py
def handle_schema_validate(args):
    """Handle schema validate command."""
    try:
        import jsonschema
    except ImportError:
        print(
            "jsonschema is not installed. Please install it with: pip install jsonschema",
            file=sys.stderr,
        )
        sys.exit(1)

    # Can't read both from stdin
    if args.schema == "-" and (not args.file or args.file == "-"):
        print(
            "Error: When reading schema from stdin, a file argument for the data to validate must be provided.",
            file=sys.stderr,
        )
        sys.exit(1)

    try:
        with get_input_stream(args.schema) as f:
            schema = json.load(f)
    except (IOError, json.JSONDecodeError) as e:
        print(
            f"Error reading or parsing schema file {args.schema}: {e}", file=sys.stderr
        )
        sys.exit(1)

    # If schema was from stdin, the file MUST be from a file, not stdin.
    data_source = args.file if args.schema == "-" else (args.file or "-")

    with get_input_stream(data_source) as lines:
        validation_failed = False
        for i, line in enumerate(lines, 1):
            try:
                instance = json.loads(line)
                jsonschema.validate(instance=instance, schema=schema)
                print(line.strip())
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON on line {i}: {e}", file=sys.stderr)
                validation_failed = True
            except jsonschema.exceptions.ValidationError as e:
                print(f"Validation error on line {i}: {e.message}", file=sys.stderr)
                validation_failed = True

    if validation_failed:
        sys.exit(1)

handle_select(args)

Handle select command.

Source code in ja/commands.py
def handle_select(args):
    """Handle select command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    use_jmespath = hasattr(args, "jmespath") and args.jmespath

    try:
        result = select(data, args.expr, use_jmespath=use_jmespath)
        write_jsonl(result)
    except jmespath.exceptions.ParseError as e:
        json_error(
            "JMESPathParseError",
            f"Invalid JMESPath expression: {e}",
            {"expression": args.expr},
        )

handle_sort(args)

Handle sort command.

Source code in ja/commands.py
def handle_sort(args):
    """Handle sort command."""
    with get_input_stream(args.file) as f:
        data = read_jsonl(f)

    result = sort_by(data, args.keys, descending=args.desc)
    write_jsonl(result)

handle_to_array(args)

Handle to-array command.

Source code in ja/commands.py
def handle_to_array(args):
    """Handle to-array command."""
    with get_input_stream(args.file) as input_stream:
        array_string = jsonl_to_json_array_string(input_stream)
        print(array_string)

handle_to_csv(args)

Handle to-csv command.

Source code in ja/commands.py
def handle_to_csv(args):
    """Handle to-csv command."""
    column_functions = {}
    if args.apply:
        for col, expr_str in args.apply:
            try:
                # WARNING: eval() is a security risk if the expression is not from a trusted source.
                func = eval(expr_str)
                if not callable(func):
                    raise ValueError(
                        f"Expression for column '{col}' did not evaluate to a callable function."
                    )
                column_functions[col] = func
            except Exception as e:
                print(
                    f"Error parsing --apply expression for column '{col}': {e}",
                    file=sys.stderr,
                )
                sys.exit(1)

    with get_input_stream(args.file) as input_stream:
        try:
            jsonl_to_csv_stream(
                input_stream,
                sys.stdout,
                flatten=args.flatten,
                flatten_sep=args.flatten_sep,
                column_functions=column_functions,
            )
        except Exception as e:
            print(
                f"An unexpected error occurred during CSV export: {e}", file=sys.stderr
            )
            sys.exit(1)

handle_to_jsonl(args)

Handle to-jsonl command.

Source code in ja/commands.py
def handle_to_jsonl(args):
    """Handle to-jsonl command."""
    with get_input_stream(args.file) as input_stream:
        try:
            for line in json_array_to_jsonl_lines(input_stream):
                print(line)
        except ValueError as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)

handle_union(args)

Handle union command.

Source code in ja/commands.py
def handle_union(args):
    """Handle union command."""
    with get_input_stream(args.left) as f:
        left_data = read_jsonl(f)
    with get_input_stream(args.right) as f:
        right_data = read_jsonl(f)

    result = union(left_data, right_data)
    write_jsonl(result)

json_error(error_type, message, details=None)

Print a JSON error message to stderr and exit.

Source code in ja/commands.py
def json_error(error_type: str, message: str, details: Dict[str, Any] = None) -> None:
    """Print a JSON error message to stderr and exit."""
    error_info = {
        "error": {
            "type": error_type,
            "message": message,
        }
    }
    if details:
        error_info["error"]["details"] = details
    print(json.dumps(error_info), file=sys.stderr)
    sys.exit(1)
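
A sketch of the envelope this produces on stderr (the error type and message here are illustrative):

import json
import sys

error_info = {
    "error": {
        "type": "JMESPathParseError",
        "message": "Invalid JMESPath expression: ...",
    }
}
print(json.dumps(error_info), file=sys.stderr)
# {"error": {"type": "JMESPathParseError", "message": "Invalid JMESPath expression: ..."}}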

read_jsonl(input_stream)

Read JSONL data from a file-like object.

Source code in ja/commands.py
def read_jsonl(input_stream) -> List[Dict[str, Any]]:
    """Read JSONL data from a file-like object."""
    return [json.loads(line) for line in input_stream]

write_json_object(obj)

Write a single object as pretty-printed JSON to stdout.

Source code in ja/commands.py
def write_json_object(obj: Any) -> None:
    """Write a single object as pretty-printed JSON to stdout."""
    print(json.dumps(obj, indent=2))

write_jsonl(rows)

Write a collection of objects as JSONL to stdout.

Source code in ja/commands.py
def write_jsonl(rows: List[Dict[str, Any]]) -> None:
    """Write a collection of objects as JSONL to stdout."""
    for row in rows:
        print(json.dumps(row))

ja.group

Grouping operations for JSONL algebra.

This module provides grouping functionality that supports both immediate aggregation and metadata-based chaining for multi-level grouping.

groupby_agg(data, group_key, agg_spec)

Group and aggregate in one operation.

This function is kept for backward compatibility and for the --agg flag. It's more efficient for simple cases but less flexible than chaining.

Parameters:

Name Type Description Default
data Relation

List of dictionaries to group and aggregate

required
group_key str

Field to group by

required
agg_spec str

Aggregation specification

required

Returns:

Type Description
Relation

List of aggregated results, one per group

Source code in ja/group.py
def groupby_agg(data: Relation, group_key: str, agg_spec: str) -> Relation:
    """Group and aggregate in one operation.

    This function is kept for backward compatibility and for the --agg flag.
    It's more efficient for simple cases but less flexible than chaining.

    Args:
        data: List of dictionaries to group and aggregate
        group_key: Field to group by
        agg_spec: Aggregation specification

    Returns:
        List of aggregated results, one per group
    """
    parser = ExprEval()

    # Group data
    groups = defaultdict(list)
    for row in data:
        key = parser.get_field_value(row, group_key)
        groups[key].append(row)

    # Apply aggregations
    result = []
    agg_specs = parse_agg_specs(agg_spec)

    for key, group_rows in groups.items():
        row_result = {group_key: key}
        for spec in agg_specs:
            row_result.update(apply_single_agg(spec, group_rows))
        result.append(row_result)

    return result

groupby_chained(grouped_data, new_group_key)

Apply groupby to already-grouped data.

This function handles multi-level grouping by building on existing group metadata.

Parameters:

Name Type Description Default
grouped_data Relation

Data with existing group metadata

required
new_group_key str

Field to group by

required

Returns:

Type Description
Relation

List with nested group metadata

Source code in ja/group.py
def groupby_chained(grouped_data: Relation, new_group_key: str) -> Relation:
    """Apply groupby to already-grouped data.

    This function handles multi-level grouping by building on existing
    group metadata.

    Args:
        grouped_data: Data with existing group metadata
        new_group_key: Field to group by

    Returns:
        List with nested group metadata
    """
    parser = ExprEval()

    # Group within existing groups
    nested_groups = defaultdict(list)

    for row in grouped_data:
        # Combine the existing group hierarchy with the new key to form a
        # composite grouping key.
        existing_groups = row.get("_groups", [])
        new_key_value = parser.get_field_value(row, new_group_key)

        group_tuple = tuple((g["field"], g["value"]) for g in existing_groups)
        group_tuple += ((new_group_key, new_key_value),)

        try:
            nested_groups[group_tuple].append(row)
        except TypeError:
            # Unhashable values (e.g., lists) cannot be dict keys, so fall
            # back to their string representations.
            group_tuple = tuple(map(str, group_tuple))
            nested_groups[group_tuple].append(row)

    # Add new metadata
    result = []
    for group_tuple, group_rows in nested_groups.items():
        group_size = len(group_rows)

        for index, row in enumerate(group_rows):
            new_row = row.copy()

            value = parser.get_field_value(row, new_group_key)

            # Extend the groups list
            new_row["_groups"] = row.get("_groups", []).copy()
            new_row["_groups"].append({
                "field": new_group_key,
                "value": value
            })

            new_row["_group_size"] = group_size
            new_row["_group_index"] = index
            result.append(new_row)

    return result
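
A sketch of the two-level metadata this produces, using groupby_with_metadata (documented below) for the first level; the sample rows are illustrative:

from ja.group import groupby_chained, groupby_with_metadata

rows = [
    {"region": "North", "tier": "gold"},
    {"region": "North", "tier": "gold"},
    {"region": "North", "tier": "free"},
]
first = groupby_with_metadata(rows, "region")
second = groupby_chained(first, "tier")

# Each output row now carries the full hierarchy, e.g. for a "gold" row:
# _groups      == [{"field": "region", "value": "North"},
#                  {"field": "tier", "value": "gold"}]
# _group_size  == 2   (rows in the (North, gold) group)
# _group_index == 0 or 1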

groupby_with_metadata(data, group_key)

Group data and add metadata fields.

This function enables chained groupby operations by adding special metadata fields to each row:

  • _groups: List of {field, value} objects representing the grouping hierarchy
  • _group_size: Total number of rows in this group
  • _group_index: This row's index within its group

Parameters:

Name Type Description Default
data Relation

List of dictionaries to group

required
group_key str

Field to group by (supports dot notation)

required

Returns:

Type Description
Relation

List with group metadata added to each row

Source code in ja/group.py
def groupby_with_metadata(data: Relation, group_key: str) -> Relation:
    """Group data and add metadata fields.

    This function enables chained groupby operations by adding special
    metadata fields to each row:
    - _groups: List of {field, value} objects representing the grouping hierarchy
    - _group_size: Total number of rows in this group
    - _group_index: This row's index within its group

    Args:
        data: List of dictionaries to group
        group_key: Field to group by (supports dot notation)

    Returns:
        List with group metadata added to each row
    """
    parser = ExprEval()

    # First pass: collect groups
    groups = defaultdict(list)
    for row in data:
        key_value = parser.get_field_value(row, group_key)
        try:
            groups[key_value].append(row)
        except TypeError:
            # Unhashable values (e.g., lists) cannot be dict keys, so fall
            # back to a canonical JSON string representation.
            key_value = json.dumps(key_value, ensure_ascii=False, sort_keys=True)
            groups[key_value].append(row)

    # Second pass: add metadata and flatten
    result = []

    for group_value, group_rows in groups.items():
        group_size = len(group_rows)
        for index, row in enumerate(group_rows):
            # Create new row with metadata
            new_row = row.copy()

            # If the group key was serialized to JSON above, deserialize it
            # so the metadata carries the original value.
            if isinstance(group_value, str):
                try:
                    group_value = json.loads(group_value)
                except json.JSONDecodeError:
                    pass

            new_row["_groups"] = [{"field": group_key, "value": group_value}]
            new_row["_group_size"] = group_size
            new_row["_group_index"] = index
            result.append(new_row)

    return result
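
A sketch of the metadata this attaches (the sample rows are illustrative):

from ja.group import groupby_with_metadata

rows = [
    {"id": 1, "region": "North"},
    {"id": 2, "region": "North"},
    {"id": 3, "region": "South"},
]
for row in groupby_with_metadata(rows, "region"):
    print(row)
# {'id': 1, 'region': 'North',
#  '_groups': [{'field': 'region', 'value': 'North'}],
#  '_group_size': 2, '_group_index': 0}
# ... and similarly for the remaining rows.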

ja.agg

Aggregation engine for JSONL algebra operations.

This module provides all aggregation functionality including parsing aggregation specifications, applying aggregations to data, and all built-in aggregation functions (sum, avg, min, max, etc.).

aggregate_grouped_data(grouped_data, agg_spec)

Aggregate data that has group metadata.

Parameters:

Name Type Description Default
grouped_data Relation

Data with group metadata

required
agg_spec str

Aggregation specification

required

Returns:

Type Description
Relation

List of aggregated results

Source code in ja/agg.py
def aggregate_grouped_data(grouped_data: Relation, agg_spec: str) -> Relation:
    """Aggregate data that has group metadata.

    Args:
        grouped_data: Data with group metadata
        agg_spec: Aggregation specification

    Returns:
        List of aggregated results
    """
    from collections import defaultdict

    # Group by the combination of all grouping fields
    groups = defaultdict(list)
    group_keys = {}

    for row in grouped_data:
        # Use the _groups list to create a grouping key
        groups_list = row.get("_groups", [])

        # Create a tuple key for internal grouping
        group_tuple = tuple((g["field"], g["value"]) for g in groups_list)

        # Store the groups for this tuple
        if group_tuple not in group_keys:
            group_keys[group_tuple] = groups_list

        # Remove metadata for aggregation
        clean_row = {k: v for k, v in row.items() if not k.startswith("_group")}
        groups[group_tuple].append(clean_row)

    # Apply aggregations
    result = []

    for group_tuple, group_rows in groups.items():
        # Start with all grouping fields
        agg_result = {}

        # Add all grouping fields from the metadata
        for group_info in group_keys[group_tuple]:
            agg_result[group_info["field"]] = group_info["value"]

        # Parse and apply aggregations
        agg_specs = parse_agg_specs(agg_spec)
        for spec in agg_specs:
            agg_result.update(apply_single_agg(spec, group_rows))

        result.append(agg_result)

    return result
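
A sketch of aggregating rows that already carry _groups metadata; the grouping fields reappear on each aggregated row:

from ja.agg import aggregate_grouped_data

grouped = [
    {"region": "North", "score": 10,
     "_groups": [{"field": "region", "value": "North"}]},
    {"region": "North", "score": 20,
     "_groups": [{"field": "region", "value": "North"}]},
    {"region": "South", "score": 5,
     "_groups": [{"field": "region", "value": "South"}]},
]
print(aggregate_grouped_data(grouped, "count"))
# [{'region': 'North', 'count': 2}, {'region': 'South', 'count': 1}]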

aggregate_single_group(data, agg_spec)

Aggregate ungrouped data as a single group.

Parameters:

Name Type Description Default
data Relation

List of dictionaries

required
agg_spec str

Aggregation specification

required

Returns:

Type Description
Dict[str, Any]

Dictionary with aggregation results

Source code in ja/agg.py
def aggregate_single_group(data: Relation, agg_spec: str) -> Dict[str, Any]:
    """Aggregate ungrouped data as a single group.

    Args:
        data: List of dictionaries
        agg_spec: Aggregation specification

    Returns:
        Dictionary with aggregation results
    """
    agg_specs = parse_agg_specs(agg_spec)
    result = {}

    for spec in agg_specs:
        result.update(apply_single_agg(spec, data))

    return result
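
For ungrouped input the whole relation is one group. A sketch, assuming the built-in sum aggregator returns the numeric total:

from ja.agg import aggregate_single_group

rows = [{"amount": 3}, {"amount": 4}]
print(aggregate_single_group(rows, "count, total=sum(amount)"))
# {'count': 2, 'total': 7}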

apply_single_agg(spec, data)

Apply a single aggregation to data.

Parameters:

Name Type Description Default
spec Tuple[str, str]

(name, expression) tuple

required
data Relation

List of dictionaries

required

Returns:

Type Description
Dict[str, Any]

Dictionary with aggregation result

Source code in ja/agg.py
def apply_single_agg(spec: Tuple[str, str], data: Relation) -> Dict[str, Any]:
    """Apply a single aggregation to data.

    Args:
        spec: (name, expression) tuple
        data: List of dictionaries

    Returns:
        Dictionary with aggregation result
    """
    name, expr = spec
    parser = ExprEval()

    # Parse the aggregation expression
    if "(" in expr and expr.endswith(")"):
        func_name = expr[:expr.index("(")]
        field_expr = expr[expr.index("(") + 1:-1].strip()
    else:
        func_name = expr
        field_expr = ""

    # Handle conditional aggregations
    if "_if" in func_name:
        # e.g., count_if(status == active) or sum_if(amount, status == paid)
        base_func = func_name.replace("_if", "")

        if "," in field_expr:
            # sum_if(amount, status == paid)
            field, condition = field_expr.split(",", 1)
            field = field.strip()
            condition = condition.strip()

            # Filter data based on condition
            filtered_data = [row for row in data if parser.evaluate(condition, row)]

            # Apply base aggregation to filtered data
            if base_func == "sum":
                values = [parser.get_field_value(row, field) for row in filtered_data]
                return {name: sum(v for v in values if v is not None)}
            elif base_func == "avg":
                values = [parser.get_field_value(row, field) for row in filtered_data]
                values = [v for v in values if v is not None]
                return {name: sum(values) / len(values) if values else 0}
            elif base_func == "count":
                return {name: len(filtered_data)}
        else:
            # count_if(status == active)
            filtered_data = [row for row in data if parser.evaluate(field_expr, row)]
            return {name: len(filtered_data)}

    # Regular aggregations
    if func_name == "count":
        return {name: len(data)}

    elif func_name in AGGREGATION_FUNCTIONS:
        if func_name in ["first", "last"]:
            # Special handling for first/last
            if not data:
                return {name: None}
            row = data[0] if func_name == "first" else data[-1]
            value = parser.get_field_value(row, field_expr) if field_expr else row
            return {name: value}
        else:
            # Collect values for aggregation
            values = []
            for row in data:
                if field_expr:
                    # Try arithmetic evaluation first
                    val = parser.evaluate_arithmetic(field_expr, row)
                    if val is None:
                        val = parser.get_field_value(row, field_expr)
                else:
                    val = row
                if val is not None:
                    values.append(val)

            # Apply the aggregation function to the collected values
            result = AGGREGATION_FUNCTIONS[func_name](values)
            return {name: result}

    return {name: None}
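
A sketch of the conditional form, borrowing the condition syntax from the docstring above; the exact comparison semantics are up to ExprEval:

from ja.agg import apply_single_agg

rows = [{"status": "active"}, {"status": "inactive"}, {"status": "active"}]
print(apply_single_agg(("active_n", "count_if(status == active)"), rows))
# {'active_n': 2}  -- assuming ExprEval matches the bare word as a string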

parse_agg_specs(agg_spec)

Parse aggregation specification string.

Parameters:

Name Type Description Default
agg_spec str

Aggregation specification (e.g., "count, avg_age=avg(age)")

required

Returns:

Type Description
List[Tuple[str, str]]

List of (name, expression) tuples

Source code in ja/agg.py
def parse_agg_specs(agg_spec: str) -> List[Tuple[str, str]]:
    """Parse aggregation specification string.

    Args:
        agg_spec: Aggregation specification (e.g., "count, avg_age=avg(age)")

    Returns:
        List of (name, expression) tuples
    """
    specs = []
    for part in agg_spec.split(","):
        part = part.strip()
        if "=" in part:
            # Named aggregation: avg_age=avg(age)
            name, expr = part.split("=", 1)
            specs.append((name.strip(), expr.strip()))
        else:
            # Simple aggregation: count
            specs.append((part, part))
    return specs
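
A quick sketch of the spec format; unnamed entries keep their own name:

from ja.agg import parse_agg_specs

print(parse_agg_specs("count, avg_age=avg(age)"))
# [('count', 'count'), ('avg_age', 'avg(age)')]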

ja.export

Utilities for exporting JSONL data to other formats.

This module provides a collection of functions for converting JSONL data into various other formats. It powers the ja export command group, enabling transformations like converting JSONL to a standard JSON array or "exploding" a JSONL file into a directory of individual JSON files.

dir_to_jsonl(input_dir_path_str, add_filename_key=None, recursive=False)

Converts JSON files in a directory to JSONL lines. Files are sorted by 'item-<index>.json' pattern if applicable, otherwise lexicographically. Optionally adds filename as a key to each JSON object.

Source code in ja/export.py
def dir_to_jsonl(
    input_dir_path_str: str, add_filename_key: str = None, recursive: bool = False
):
    """
    Converts JSON files in a directory to JSONL lines.
    Files are sorted by 'item-<index>.json' pattern if applicable, otherwise lexicographically.
    Optionally adds filename as a key to each JSON object.
    """
    input_dir = pathlib.Path(input_dir_path_str)
    if not input_dir.is_dir():
        raise ValueError(f"Input path is not a directory: {input_dir_path_str}")

    json_files_paths = []
    if recursive:
        for root, _, files in os.walk(input_dir):
            for file in files:
                if file.lower().endswith(".json"):
                    json_files_paths.append(pathlib.Path(root) / file)
    else:
        for item in input_dir.iterdir():
            if item.is_file() and item.name.lower().endswith(".json"):
                json_files_paths.append(item)

    sorted_file_paths = _sort_files_for_implode(json_files_paths)

    for file_path in sorted_file_paths:
        try:
            with open(file_path, "r") as f:
                data = json.load(f)

            if add_filename_key:
                # Use relative path from the input_dir to keep it cleaner
                relative_filename = str(file_path.relative_to(input_dir))
                actual_key = _ensure_unique_key(data, add_filename_key)
                data[actual_key] = relative_filename

            yield json.dumps(data)
        except json.JSONDecodeError as e:
            print(
                f"Skipping invalid JSON file: {file_path} - Error: {e}", file=sys.stderr
            )
            continue
        except Exception as e:
            print(f"Error processing file {file_path}: {e}", file=sys.stderr)
            continue

json_array_to_jsonl_lines(json_array_input_stream)

Read a JSON array from a stream and yield each element as a JSONL line.

Parameters:

Name Type Description Default
json_array_input_stream

Input stream containing a JSON array.

required

Yields:

Type Description

JSON strings representing each array element.

Raises:

Type Description
ValueError

If the input is not a valid JSON array.

Source code in ja/export.py
def json_array_to_jsonl_lines(json_array_input_stream):
    """Read a JSON array from a stream and yield each element as a JSONL line.

    Args:
        json_array_input_stream: Input stream containing a JSON array.

    Yields:
        JSON strings representing each array element.

    Raises:
        ValueError: If the input is not a valid JSON array.
    """
    try:
        json_string = "".join(json_array_input_stream)
        data = json.loads(json_string)
        if not isinstance(data, list):
            raise ValueError("Input is not a JSON array.")
        for record in data:
            yield json.dumps(record)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON array input: {e}")
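
A minimal sketch using an in-memory stream:

import io
from ja.export import json_array_to_jsonl_lines

stream = io.StringIO('[{"a": 1}, {"a": 2}]')
for line in json_array_to_jsonl_lines(stream):
    print(line)
# {"a": 1}
# {"a": 2}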

jsonl_to_dir(jsonl_input_stream, output_dir_path_str, input_filename_stem='data')

Exports JSONL lines to individual JSON files in a directory. The output directory is named after input_filename_stem if output_dir_path_str is not specific. Files are named item-<index>.json.

Source code in ja/export.py
def jsonl_to_dir(
    jsonl_input_stream, output_dir_path_str: str, input_filename_stem: str = "data"
):
    """
    Exports JSONL lines to individual JSON files in a directory.
    The output directory is named after input_filename_stem if output_dir_path_str is not specific.
    Files are named item-<index>.json.
    """
    output_dir = pathlib.Path(output_dir_path_str)

    # Decide where the item files go. Note that `is_dir()` implies `exists()`,
    # so a branch like `is_dir() and not exists()` can never fire; the
    # effective rule is the one below.
    if not output_dir.name.endswith((".jsonl", ".json")) and not output_dir.exists():
        # A new, directory-like path: treat it as the target directory itself.
        pass
    else:  # Default behavior: create a subdirectory based on the input stem
        output_dir = output_dir / input_filename_stem

    output_dir.mkdir(parents=True, exist_ok=True)

    count = 0
    for i, line in enumerate(jsonl_input_stream):
        try:
            record = json.loads(line)
            file_path = output_dir / f"item-{i}.json"
            with open(file_path, "w") as f:
                json.dump(record, f, indent=2)
            count += 1
        except json.JSONDecodeError as e:
            print(
                f"Skipping invalid JSON line during export: {line.strip()} - Error: {e}",
                file=sys.stderr,
            )
            continue
    print(f"Exported {count} items to {output_dir.resolve()}", file=sys.stderr)

jsonl_to_json_array_string(jsonl_input_stream)

Read JSONL from a stream and return a JSON array string.

Parameters:

Name Type Description Default
jsonl_input_stream

Input stream containing JSONL data.

required

Returns:

Type Description
str

A JSON array string containing all records.

Source code in ja/export.py
def jsonl_to_json_array_string(jsonl_input_stream) -> str:
    """Read JSONL from a stream and return a JSON array string.

    Args:
        jsonl_input_stream: Input stream containing JSONL data.

    Returns:
        A JSON array string containing all records.
    """
    records = []
    for line in jsonl_input_stream:
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as e:
            print(
                f"Skipping invalid JSON line: {line.strip()} - Error: {e}",
                file=sys.stderr,
            )
            continue
    return json.dumps(records, indent=2)
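
A minimal sketch using an in-memory stream; invalid lines would be skipped with a warning on stderr:

import io
from ja.export import jsonl_to_json_array_string

stream = io.StringIO('{"a": 1}\n{"a": 2}\n')
print(jsonl_to_json_array_string(stream))
# [
#   {
#     "a": 1
#   },
#   {
#     "a": 2
#   }
# ]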

ja.exporter

Export your JSONL data to other popular formats like CSV.

This module is your gateway to the wider data ecosystem. It provides powerful and flexible tools to convert your JSONL data into formats that are easy to use with spreadsheets, traditional databases, or other data analysis tools.

The key feature is its intelligent handling of nested JSON, which can be "flattened" into separate columns, making complex data accessible in a simple CSV format.

jsonl_to_csv_stream(jsonl_stream, output_stream, flatten=True, flatten_sep='.', column_functions=None)

Convert a stream of JSONL data into a CSV stream.

This is a highly flexible function for exporting your data. It reads JSONL records, intelligently discovers all possible headers (even if they vary between lines), and writes to a CSV format.

It shines when dealing with nested data. By default, it will flatten structures like {"user": {"name": "X"}} into a user.name column. You can also provide custom functions to transform data on the fly.

Parameters:

Name Type Description Default
jsonl_stream

An input stream (like a file handle) yielding JSONL strings.

required
output_stream

An output stream (like sys.stdout or a file handle) where the CSV data will be written.

required
flatten bool

If True, nested dictionaries are flattened into columns with dot-separated keys. Defaults to True.

True
flatten_sep str

The separator to use when flattening keys. Defaults to ".".

'.'
column_functions dict

A dictionary mapping column names to functions that will be applied to that column's data before writing to CSV. For example, {"price": float}.

None
Source code in ja/exporter.py
def jsonl_to_csv_stream(
    jsonl_stream,
    output_stream,
    flatten: bool = True,
    flatten_sep: str = ".",
    column_functions: dict = None,
):
    """Convert a stream of JSONL data into a CSV stream.

    This is a highly flexible function for exporting your data. It reads JSONL
    records, intelligently discovers all possible headers (even if they vary
    between lines), and writes to a CSV format.

    It shines when dealing with nested data. By default, it will flatten
    structures like `{"user": {"name": "X"}}` into a `user.name` column.
    You can also provide custom functions to transform data on the fly.

    Args:
        jsonl_stream: An input stream (like a file handle) yielding JSONL strings.
        output_stream: An output stream (like `sys.stdout` or a file handle)
                       where the CSV data will be written.
        flatten (bool): If `True`, nested dictionaries are flattened into columns
                        with dot-separated keys. Defaults to `True`.
        flatten_sep (str): The separator to use when flattening keys.
                           Defaults to ".".
        column_functions (dict): A dictionary mapping column names to functions
                                 that will be applied to that column's data
                                 before writing to CSV. For example,
                                 `{"price": float}`.
    """
    if column_functions is None:
        column_functions = {}

    # First pass: Discover all possible headers from the entire stream
    records = [json.loads(line) for line in jsonl_stream if line.strip()]
    if not records:
        return

    # Apply column functions before flattening
    for rec in records:
        for col, func in column_functions.items():
            if col in rec:
                try:
                    rec[col] = func(rec[col])
                except Exception as e:
                    # Optionally, log this error or handle it as needed
                    print(
                        f"Error applying function to column '{col}' for a record: {e}",
                        file=sys.stderr,
                    )

    if flatten:
        processed_records = [(_flatten_dict(rec, sep=flatten_sep)) for rec in records]
    else:
        processed_records = []
        for rec in records:
            processed_rec = {}
            for k, v in rec.items():
                if isinstance(v, (dict, list)):
                    processed_rec[k] = json.dumps(v)
                else:
                    processed_rec[k] = v
            processed_records.append(processed_rec)

    # Discover all unique keys to form the CSV header
    headers = []
    header_set = set()
    for rec in processed_records:
        for key in rec.keys():
            if key not in header_set:
                header_set.add(key)
                headers.append(key)

    # Second pass: Write to the output stream
    writer = csv.DictWriter(output_stream, fieldnames=headers, lineterminator="\n")
    writer.writeheader()
    writer.writerows(processed_records)
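
A sketch showing the default flattening of nested keys into dotted columns (the sample records are illustrative):

import io
import sys
from ja.exporter import jsonl_to_csv_stream

src = io.StringIO(
    '{"id": 1, "user": {"name": "Ada"}}\n'
    '{"id": 2, "user": {"name": "Bob"}}\n'
)
jsonl_to_csv_stream(src, sys.stdout)
# id,user.name
# 1,Ada
# 2,Bob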

ja.importer

Import data from other formats like CSV into the world of JSONL.

This module is the bridge that brings your existing data into the JSONL Algebra ecosystem. It provides a collection of powerful functions for converting various data formats—such as CSV or directories of individual JSON files—into the clean, line-oriented JSONL format that ja is built to handle.

csv_to_jsonl_lines(csv_input_stream, has_header, infer_types=False)

Convert a stream of CSV data into a stream of JSONL lines.

This function reads CSV data and transforms each row into a JSON object. It can automatically handle headers to use as keys and can even infer the data types of your values, converting them from strings to numbers or booleans where appropriate.

Parameters:

Name Type Description Default
csv_input_stream

An input stream (like a file handle) containing CSV data.

required
has_header bool

Set to True if the first row of the CSV is a header that should be used for JSON keys.

required
infer_types bool

If True, automatically convert values to int, float, bool, or None. Defaults to False.

False

Yields:

Type Description

A JSON-formatted string for each row in the CSV data.

Source code in ja/importer.py
def csv_to_jsonl_lines(csv_input_stream, has_header: bool, infer_types: bool = False):
    """Convert a stream of CSV data into a stream of JSONL lines.

    This function reads CSV data and transforms each row into a JSON object.
    It can automatically handle headers to use as keys and can even infer the
    data types of your values, converting them from strings to numbers or
    booleans where appropriate.

    Args:
        csv_input_stream: An input stream (like a file handle) containing CSV data.
        has_header (bool): Set to `True` if the first row of the CSV is a header
                           that should be used for JSON keys.
        infer_types (bool): If `True`, automatically convert values to `int`,
                            `float`, `bool`, or `None`. Defaults to `False`.

    Yields:
        A JSON-formatted string for each row in the CSV data.
    """

    def process_row(row):
        if not infer_types:
            return row
        return {k: _infer_value(v) for k, v in row.items()}

    if has_header:
        # Use DictReader which handles headers automatically
        reader = csv.DictReader(csv_input_stream)
        for row in reader:
            yield json.dumps(process_row(row))
    else:
        # Use the standard reader and manually create dictionaries
        reader = csv.reader(csv_input_stream)
        headers = []
        try:
            first_row = next(reader)
            # Generate headers based on the number of columns in the first row
            headers = [f"col_{i}" for i in range(len(first_row))]
            # Yield the first row which we've already consumed
            row_dict = dict(zip(headers, first_row))
            yield json.dumps(process_row(row_dict))
        except StopIteration:
            return  # Handle empty file

        # Yield the rest of the rows
        for row in reader:
            row_dict = dict(zip(headers, row))
            yield json.dumps(process_row(row_dict))
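
A sketch with type inference turned on; assuming _infer_value converts numeric strings, "36" and "41" come back as integers:

import io
from ja.importer import csv_to_jsonl_lines

csv_data = io.StringIO("name,age\nAda,36\nBob,41\n")
for line in csv_to_jsonl_lines(csv_data, has_header=True, infer_types=True):
    print(line)
# {"name": "Ada", "age": 36}
# {"name": "Bob", "age": 41}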

dir_to_jsonl_lines(dir_path)

Stream a directory of .json or .jsonl files as a single JSONL stream.

A handy utility for consolidating data. It reads all files ending in .json or .jsonl from a specified directory and yields each JSON object as a separate line. This is perfect for preparing a dataset that has been stored as many small files.

  • For .json files, the entire file is treated as a single JSON object.
  • For .jsonl files, each line is treated as a separate JSON object.

Parameters:

    dir_path (str): The path to the directory to read. Required.

Yields:

    A string for each JSON object found, ready for processing.

Source code in ja/importer.py
def dir_to_jsonl_lines(dir_path):
    """Stream a directory of .json or .jsonl files as a single JSONL stream.

    A handy utility for consolidating data. It reads all files ending in `.json`
    or `.jsonl` from a specified directory and yields each JSON object as a
    separate line. This is perfect for preparing a dataset that has been
    stored as many small files.

    - For `.json` files, the entire file is treated as a single JSON object.
    - For `.jsonl` files, each line is treated as a separate JSON object.

    Args:
        dir_path (str): The path to the directory to read.

    Yields:
        A string for each JSON object found, ready for processing.
    """
    for filename in sorted(os.listdir(dir_path)):
        file_path = os.path.join(dir_path, filename)
        if filename.endswith(".json"):
            try:
                with open(file_path, "r") as f:
                    # Parse and re-serialize so a pretty-printed (multi-line)
                    # JSON file still yields a single JSONL line.
                    yield json.dumps(json.load(f))
            except (IOError, json.JSONDecodeError) as e:
                print(f"Error reading or parsing {file_path}: {e}", file=sys.stderr)
        elif filename.endswith(".jsonl"):
            try:
                with open(file_path, "r") as f:
                    for line in f:
                        yield line.strip()
            except IOError as e:
                print(f"Error reading {file_path}: {e}", file=sys.stderr)

ja.schema

Discover the structure of your data automatically.

This module provides powerful tools to infer a JSON Schema from your JSONL files. A schema acts as a blueprint for your data, describing its fields, types, and which fields are required. This is incredibly useful for validation, documentation, and ensuring data quality.

add_required_fields(schema, data_samples)

Refine a schema by identifying which fields are always present.

This function analyzes a list of data samples and updates the schema to mark fields as 'required' if they appear in every single sample. This process is applied recursively to nested objects, making the resulting schema more precise.

Parameters:

    schema: The schema dictionary to modify in place. Required.
    data_samples: A list of data samples to analyze for required fields. Required.
Example

If all samples in data_samples have 'name' and 'age' fields, this function adds {"required": ["age", "name"]} to the schema.

Source code in ja/schema.py
def add_required_fields(schema, data_samples):
    """Refine a schema by identifying which fields are always present.

    This function analyzes a list of data samples and updates the schema to mark
    fields as 'required' if they appear in every single sample. This process is
    applied recursively to nested objects, making the resulting schema more precise.

    Args:
        schema: The schema dictionary to modify in place.
        data_samples: A list of data samples to analyze for required fields.

    Example:
        If all samples in `data_samples` have 'name' and 'age' fields, this
        function adds `{"required": ["age", "name"]}` to the schema.
    """
    if schema.get("type") == "object" and "properties" in schema:
        # For object schemas, find fields present in all samples
        dict_samples = [s for s in data_samples if isinstance(s, dict)]
        if dict_samples:
            required_keys = set(dict_samples[0].keys())
            for sample in dict_samples[1:]:
                required_keys.intersection_update(sample.keys())
            if required_keys:
                schema["required"] = sorted(list(required_keys))

        # Recursively add required fields to nested object properties
        for prop_name, prop_schema in schema["properties"].items():
            prop_samples = [s.get(prop_name) for s in dict_samples if prop_name in s]
            if prop_samples:
                add_required_fields(prop_schema, prop_samples)

    elif schema.get("type") == "array" and "items" in schema:
        # For array schemas, collect all array items and add required fields
        array_items = []
        for sample in data_samples:
            if isinstance(sample, list):
                array_items.extend(sample)
        if array_items:
            add_required_fields(schema["items"], array_items)
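
A quick sketch of the in-place update (assuming both helpers are importable from ja.schema):

from ja.schema import add_required_fields, infer_value_schema

samples = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
schema = infer_value_schema(samples[0])  # object schema with name/age properties
add_required_fields(schema, samples)     # mutates schema in place
print(schema["required"])
# ['age', 'name']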

get_json_type(value)

Determine the appropriate JSON Schema type for a given Python value.

Maps Python types to their corresponding JSON Schema type names.

Parameters:

    value: Any Python value. Required.

Returns:

    The JSON Schema type name as a string.

Example

get_json_type("hello") 'string' get_json_type(42) 'integer'

Source code in ja/schema.py
def get_json_type(value):
    """Determine the appropriate JSON Schema type for a given Python value.

    Maps Python types to their corresponding JSON Schema type names.

    Args:
        value: Any Python value.

    Returns:
        The JSON Schema type name as a string.

    Example:
        >>> get_json_type("hello")
        'string'
        >>> get_json_type(42)
        'integer'
    """
    if isinstance(value, str):
        return "string"
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "number"
    if value is None:
        return "null"
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        return "object"
    return "unknown"

infer_schema(data)

Infer a complete JSON schema from a collection of data records.

This is the main entry point for schema inference. Give it an iterable of JSON objects (like a list of dictionaries), and it will return a complete JSON Schema that describes the entire dataset. It automatically handles varying fields, mixed types, nested structures, and identifies required fields.

Parameters:

    data: An iterable of data records (typically dictionaries). Required.

Returns:

    A JSON schema dictionary with $schema, type, properties, and required fields.

Example

data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}] schema = infer_schema(data) schema["properties"]["name"] {'type': 'string'} schema["required"]['age', 'name']

Source code in ja/schema.py
def infer_schema(data):
    """Infer a complete JSON schema from a collection of data records.

    This is the main entry point for schema inference. Give it an iterable of
    JSON objects (like a list of dictionaries), and it will return a complete
    JSON Schema that describes the entire dataset. It automatically handles
    varying fields, mixed types, nested structures, and identifies required fields.

    Args:
        data: An iterable of data records (typically dictionaries).

    Returns:
        A JSON schema dictionary with `$schema`, type, properties, and required fields.

    Example:
        >>> data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
        >>> schema = infer_schema(data)
        >>> schema["properties"]["name"]
        {'type': 'string'}
        >>> schema["required"]
        ['age', 'name']
    """
    records = list(data)
    if not records:
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {},
        }

    # Infer schema for each record
    inferred_schemas = [infer_value_schema(rec) for rec in records]

    # Merge all inferred schemas
    merged_schema = None
    for s in inferred_schemas:
        merged_schema = merge_schemas(merged_schema, s)

    # Add required fields recursively
    if merged_schema:
        add_required_fields(merged_schema, records)

    # Add the meta-schema URL
    final_schema = {"$schema": "http://json-schema.org/draft-07/schema#"}
    if merged_schema:
        final_schema.update(merged_schema)

    return final_schema
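
As a sketch, records that disagree on a field's type produce a type union (import assumed from ja.schema; the data is invented):

from ja.schema import infer_schema

rows = [{"id": 1, "tags": ["a"]}, {"id": "x7", "tags": ["b", "c"]}]
schema = infer_schema(rows)
print(schema["properties"]["id"]["type"])
# ['integer', 'string']
print(schema["required"])
# ['id', 'tags']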

infer_value_schema(value)

Infer a JSON Schema for a single Python value.

Creates a schema that describes the structure and type of the given value, handling nested objects and arrays recursively.

Parameters:

    value: Any JSON-serializable Python value. Required.

Returns:

    A JSON schema dictionary describing the value.

Example

infer_value_schema({"name": "Alice", "age": 30}) {'type': 'object', 'properties': {'name': {'type': 'string'}, 'age': {'type': 'integer'}}}

Source code in ja/schema.py
def infer_value_schema(value):
    """Infer a JSON Schema for a single Python value.

    Creates a schema that describes the structure and type of the given value,
    handling nested objects and arrays recursively.

    Args:
        value: Any JSON-serializable Python value.

    Returns:
        A JSON schema dictionary describing the value.

    Example:
        >>> infer_value_schema({"name": "Alice", "age": 30})
        {'type': 'object', 'properties': {'name': {'type': 'string'}, 'age': {'type': 'integer'}}}
    """
    type_name = get_json_type(value)
    schema = {"type": type_name}
    if type_name == "object":
        schema["properties"] = {k: infer_value_schema(v) for k, v in value.items()}
    elif type_name == "array":
        if value:
            item_schema = None
            for item in value:
                item_schema = merge_schemas(item_schema, infer_value_schema(item))
            if item_schema:
                schema["items"] = item_schema
    return schema
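
A short sketch of the array handling (import assumed from ja.schema): item schemas are merged one by one, so a list mixing int and float collapses to "number":

from ja.schema import infer_value_schema

print(infer_value_schema([1, 2.5]))
# {'type': 'array', 'items': {'type': 'number'}}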

merge_schemas(s1, s2)

Intelligently merge two JSON schemas into one.

This is the secret sauce that allows schema inference to work across many different JSON objects, even if they have different fields or types. It handles type unions (e.g., a field that is sometimes a string, sometimes an integer) and recursively merges nested object properties and array item schemas.

Parameters:

    s1: First JSON schema dictionary or None. Required.
    s2: Second JSON schema dictionary or None. Required.

Returns:

    A merged schema dictionary combining both inputs.

Example

s1 = {"type": "string"} s2 = {"type": "integer"} merge_schemas(s1, s2) {'type': ['integer', 'string']}

Source code in ja/schema.py
def merge_schemas(s1, s2):
    """Intelligently merge two JSON schemas into one.

    This is the secret sauce that allows schema inference to work across many
    different JSON objects, even if they have different fields or types. It handles
    type unions (e.g., a field that is sometimes a string, sometimes an integer)
    and recursively merges nested object properties and array item schemas.

    Args:
        s1: First JSON schema dictionary or None.
        s2: Second JSON schema dictionary or None.

    Returns:
        A merged schema dictionary combining both inputs.

    Example:
        >>> s1 = {"type": "string"}
        >>> s2 = {"type": "integer"}
        >>> merge_schemas(s1, s2)
        {'type': ['integer', 'string']}
    """
    if s1 is None:
        return s2
    if s2 is None:
        return s1
    if s1 == s2:
        return s1

    # Merge types
    type1 = s1.get("type", [])
    if not isinstance(type1, list):
        type1 = [type1]
    type2 = s2.get("type", [])
    if not isinstance(type2, list):
        type2 = [type2]

    merged_types = sorted(list(set(type1) | set(type2)))
    if "integer" in merged_types and "number" in merged_types:
        merged_types.remove("integer")

    merged_schema = {}
    if len(merged_types) == 1:
        merged_schema["type"] = merged_types[0]
    else:
        merged_schema["type"] = merged_types

    # If both schemas could be objects, merge properties
    if "object" in type1 and "object" in type2:
        props1 = s1.get("properties", {})
        props2 = s2.get("properties", {})
        all_keys = set(props1.keys()) | set(props2.keys())
        merged_props = {
            key: merge_schemas(props1.get(key), props2.get(key)) for key in all_keys
        }
        if merged_props:
            merged_schema["properties"] = merged_props

    # If both schemas could be arrays, merge items
    if "array" in type1 and "array" in type2:
        items1 = s1.get("items")
        items2 = s2.get("items")
        merged_items = merge_schemas(items1, items2)
        if merged_items:
            merged_schema["items"] = merged_items

    return merged_schema
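
One merging rule worth calling out: when both "integer" and "number" appear, the narrower "integer" is dropped. A sketch (import assumed from ja.schema):

from ja.schema import merge_schemas

print(merge_schemas({"type": "integer"}, {"type": "number"}))
# {'type': 'number'}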

ja.expr

Expression parser for ja commands.

This module provides a lightweight expression parser that supports an intuitive, quote-free syntax for most common cases.

ExprEval

Parse and evaluate expressions for filtering, comparison, and arithmetic.

Source code in ja/expr.py
class ExprEval:
    """Parse and evaluate expressions for filtering, comparison, and arithmetic."""

    def __init__(self):
        # Operators in precedence order (longest first to handle >= before >)
        self.operators = [
            ("==", operator.eq),
            ("!=", operator.ne),
            (">=", operator.ge),
            ("<=", operator.le),
            (">", operator.gt),
            ("<", operator.lt),
        ]

    def parse_value(self, value_str: str) -> Any:
        """Parse a value string into appropriate Python type.

        Examples:
            "123" -> 123
            "12.5" -> 12.5
            "true" -> True
            "false" -> False
            "null" -> None
            "active" -> "active" (string)
        """
        value_str = value_str.strip()

        # Empty string
        if not value_str:
            return ""

        # Boolean literals (case-insensitive)
        if value_str.lower() == "true":
            return True
        if value_str.lower() == "false":
            return False

        # Null literal
        if value_str.lower() in ("null", "none"):
            return None

        # Numbers
        try:
            if "." in value_str:
                return float(value_str)
            return int(value_str)
        except ValueError:
            pass

        # Quoted strings (remove quotes)
        if (value_str.startswith('"') and value_str.endswith('"')) or (
            value_str.startswith("'") and value_str.endswith("'")
        ):
            return value_str[1:-1]

        # Unquoted strings (the nice default!)
        return value_str

    def get_field_value(self, obj: Dict[str, Any], field_path: str) -> Any:
        """Get value from nested object using dot notation.

        Examples:
            get_field_value({"user": {"name": "Alice"}}, "user.name") -> "Alice"
            get_field_value({"items": [{"id": 1}]}, "items[0].id") -> 1
        """
        if not field_path:
            return obj

        current = obj

        # Handle array indexing and dots
        parts = re.split(r"\.|\[|\]", field_path)
        parts = [p for p in parts if p]  # Remove empty strings

        for part in parts:
            if current is None:
                return None

            # Try as dict key
            if isinstance(current, dict):
                current = current.get(part)
            # Try as array index
            elif isinstance(current, list):
                try:
                    idx = int(part)
                    current = current[idx] if 0 <= idx < len(current) else None
                except (ValueError, IndexError):
                    return None
            else:
                return None

        return current

    def set_field_value(self, obj: Dict[str, Any], field_path: str, value: Any) -> None:
        """Set value in nested object using dot notation."""
        if not field_path:
            return

        parts = field_path.split(".")
        current = obj

        # Navigate to the parent of the target field
        for part in parts[:-1]:
            if part not in current:
                current[part] = {}
            current = current[part]

        # Set the value
        current[parts[-1]] = value

    def evaluate_comparison(self, left: Any, op_str: str, right: Any) -> bool:
        """Evaluate a comparison operation."""
        op_func = None
        for op, func in self.operators:
            if op == op_str:
                op_func = func
                break

        if op_func is None:
            raise ValueError(f"Unknown operator: {op_str}")

        # Special handling for null comparisons
        if left is None or right is None:
            if op_str == "==":
                return left == right
            elif op_str == "!=":
                return left != right
            else:
                return False

        # Type coercion for comparison
        try:
            return op_func(left, right)
        except (TypeError, ValueError):
            # If comparison fails, try string comparison
            try:
                return op_func(str(left), str(right))
            except Exception:
                return False

    def evaluate(self, expr: str, context: Dict[str, Any]) -> bool:
        """Parse and evaluate an expression.

        Examples:
            "status == active"
            "age > 30"
            "user.type == premium"
        """
        expr = expr.strip()

        # Empty expression is false
        if not expr:
            return False

        # Check for operators
        for op_str, op_func in self.operators:
            if op_str in expr:
                # Split on the FIRST occurrence of the operator
                parts = expr.split(op_str, 1)
                if len(parts) == 2:
                    left_expr = parts[0].strip()
                    right_expr = parts[1].strip()

                    # Left side is always a field path
                    left_val = self.get_field_value(context, left_expr)

                    # Right side: check if it's a field or a literal
                    # A token is a field if it exists as a key in the context
                    # and is not a boolean/null keyword.
                    if right_expr in context and right_expr.lower() not in ['true', 'false', 'null', 'none']:
                        # It's a field reference
                        right_val = self.get_field_value(context, right_expr)
                    else:
                        # It's a literal value
                        right_val = self.parse_value(right_expr)

                    return self.evaluate_comparison(left_val, op_str, right_val)

        # No operator found - treat as existence/truthiness check
        value = self.get_field_value(context, expr)
        return bool(value)

    def evaluate_arithmetic(
        self, expr: str, context: Dict[str, Any]
    ) -> Optional[float]:
        """Evaluate simple arithmetic expressions.

        Examples:
            "amount * 1.1"
            "score + bonus"
        """
        # Simple arithmetic support
        for op, func in [
            ("*", operator.mul),
            ("+", operator.add),
            ("-", operator.sub),
            ("/", operator.truediv),
        ]:
            if op in expr:
                parts = expr.split(op, 1)
                if len(parts) == 2:
                    left_str = parts[0].strip()
                    right_str = parts[1].strip()

                    # Get left value (field or literal)
                    left_val = self.get_field_value(context, left_str)
                    if left_val is None:
                        left_val = self.parse_value(left_str)

                    # Get right value (field or literal)
                    right_val = self.get_field_value(context, right_str)
                    if right_val is None:
                        right_val = self.parse_value(right_str)

                    try:
                        return func(float(left_val), float(right_val))
                    except (TypeError, ValueError):
                        return None

        # No operator - try as field or literal
        val = self.get_field_value(context, expr)
        if val is None:
            val = self.parse_value(expr)

        try:
            return float(val)
        except (TypeError, ValueError):
            return None
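
A brief sketch of the class in action (it assumes ExprEval is importable from ja.expr, as the source path suggests; the row data is invented):

from ja.expr import ExprEval

ev = ExprEval()
row = {"status": "active", "user": {"age": 42}}

print(ev.evaluate("status == active", row))           # True  (unquoted string literal)
print(ev.evaluate("user.age > 30", row))              # True  (dot-notation field path)
print(ev.evaluate_arithmetic("user.age * 1.5", row))  # 63.0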

evaluate(expr, context)

Parse and evaluate an expression.

Examples:

"status == active" "age > 30" "user.type == premium"

Source code in ja/expr.py
def evaluate(self, expr: str, context: Dict[str, Any]) -> bool:
    """Parse and evaluate an expression.

    Examples:
        "status == active"
        "age > 30"
        "user.type == premium"
    """
    expr = expr.strip()

    # Empty expression is false
    if not expr:
        return False

    # Check for operators
    for op_str, op_func in self.operators:
        if op_str in expr:
            # Split on the FIRST occurrence of the operator
            parts = expr.split(op_str, 1)
            if len(parts) == 2:
                left_expr = parts[0].strip()
                right_expr = parts[1].strip()

                # Left side is always a field path
                left_val = self.get_field_value(context, left_expr)

                # Right side: check if it's a field or a literal
                # A token is a field if it exists as a key in the context
                # and is not a boolean/null keyword.
                if right_expr in context and right_expr.lower() not in ['true', 'false', 'null', 'none']:
                    # It's a field reference
                    right_val = self.get_field_value(context, right_expr)
                else:
                    # It's a literal value
                    right_val = self.parse_value(right_expr)

                return self.evaluate_comparison(left_val, op_str, right_val)

    # No operator found - treat as existence/truthiness check
    value = self.get_field_value(context, expr)
    return bool(value)

evaluate_arithmetic(expr, context)

Evaluate simple arithmetic expressions.

Examples:

"amount * 1.1" "score + bonus"

Source code in ja/expr.py
def evaluate_arithmetic(
    self, expr: str, context: Dict[str, Any]
) -> Optional[float]:
    """Evaluate simple arithmetic expressions.

    Examples:
        "amount * 1.1"
        "score + bonus"
    """
    # Simple arithmetic support
    for op, func in [
        ("*", operator.mul),
        ("+", operator.add),
        ("-", operator.sub),
        ("/", operator.truediv),
    ]:
        if op in expr:
            parts = expr.split(op, 1)
            if len(parts) == 2:
                left_str = parts[0].strip()
                right_str = parts[1].strip()

                # Get left value (field or literal)
                left_val = self.get_field_value(context, left_str)
                if left_val is None:
                    left_val = self.parse_value(left_str)

                # Get right value (field or literal)
                right_val = self.get_field_value(context, right_str)
                if right_val is None:
                    right_val = self.parse_value(right_str)

                try:
                    return func(float(left_val), float(right_val))
                except (TypeError, ValueError):
                    return None

    # No operator - try as field or literal
    val = self.get_field_value(context, expr)
    if val is None:
        val = self.parse_value(expr)

    try:
        return float(val)
    except (TypeError, ValueError):
        return None

evaluate_comparison(left, op_str, right)

Evaluate a comparison operation.

Source code in ja/expr.py
def evaluate_comparison(self, left: Any, op_str: str, right: Any) -> bool:
    """Evaluate a comparison operation."""
    op_func = None
    for op, func in self.operators:
        if op == op_str:
            op_func = func
            break

    if op_func is None:
        raise ValueError(f"Unknown operator: {op_str}")

    # Special handling for null comparisons
    if left is None or right is None:
        if op_str == "==":
            return left == right
        elif op_str == "!=":
            return left != right
        else:
            return False

    # Type coercion for comparison
    try:
        return op_func(left, right)
    except (TypeError, ValueError):
        # If comparison fails, try string comparison
        try:
            return op_func(str(left), str(right))
        except Exception:
            return False

get_field_value(obj, field_path)

Get value from nested object using dot notation.

Examples:

get_field_value({"user": {"name": "Alice"}}, "user.name") -> "Alice" get_field_value({"items": [{"id": 1}]}, "items[0].id") -> 1

Source code in ja/expr.py
def get_field_value(self, obj: Dict[str, Any], field_path: str) -> Any:
    """Get value from nested object using dot notation.

    Examples:
        get_field_value({"user": {"name": "Alice"}}, "user.name") -> "Alice"
        get_field_value({"items": [{"id": 1}]}, "items[0].id") -> 1
    """
    if not field_path:
        return obj

    current = obj

    # Handle array indexing and dots
    parts = re.split(r"\.|\[|\]", field_path)
    parts = [p for p in parts if p]  # Remove empty strings

    for part in parts:
        if current is None:
            return None

        # Try as dict key
        if isinstance(current, dict):
            current = current.get(part)
        # Try as array index
        elif isinstance(current, list):
            try:
                idx = int(part)
                current = current[idx] if 0 <= idx < len(current) else None
            except (ValueError, IndexError):
                return None
        else:
            return None

    return current
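
Note that lookups degrade gracefully: missing keys and out-of-range indexes resolve to None instead of raising. A tiny sketch:

from ja.expr import ExprEval

ev = ExprEval()
print(ev.get_field_value({"items": [{"id": 1}]}, "items[5].id"))
# None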

parse_value(value_str)

Parse a value string into appropriate Python type.

Examples:

"123" -> 123 "12.5" -> 12.5 "true" -> True "false" -> False "null" -> None "active" -> "active" (string)

Source code in ja/expr.py
def parse_value(self, value_str: str) -> Any:
    """Parse a value string into appropriate Python type.

    Examples:
        "123" -> 123
        "12.5" -> 12.5
        "true" -> True
        "false" -> False
        "null" -> None
        "active" -> "active" (string)
    """
    value_str = value_str.strip()

    # Empty string
    if not value_str:
        return ""

    # Boolean literals (case-insensitive)
    if value_str.lower() == "true":
        return True
    if value_str.lower() == "false":
        return False

    # Null literal
    if value_str.lower() in ("null", "none"):
        return None

    # Numbers
    try:
        if "." in value_str:
            return float(value_str)
        return int(value_str)
    except ValueError:
        pass

    # Quoted strings (remove quotes)
    if (value_str.startswith('"') and value_str.endswith('"')) or (
        value_str.startswith("'") and value_str.endswith("'")
    ):
        return value_str[1:-1]

    # Unquoted strings (the nice default!)
    return value_str

set_field_value(obj, field_path, value)

Set value in nested object using dot notation.

Source code in ja/expr.py
def set_field_value(self, obj: Dict[str, Any], field_path: str, value: Any) -> None:
    """Set value in nested object using dot notation."""
    if not field_path:
        return

    parts = field_path.split(".")
    current = obj

    # Navigate to the parent of the target field
    for part in parts[:-1]:
        if part not in current:
            current[part] = {}
        current = current[part]

    # Set the value
    current[parts[-1]] = value
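
A closing sketch: intermediate objects are created on demand, so you can set a deeply nested path on an empty dict:

from ja.expr import ExprEval

ev = ExprEval()
obj = {}
ev.set_field_value(obj, "user.name", "Alice")
print(obj)
# {'user': {'name': 'Alice'}}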