Skip to content

API Reference

JSONL Algebra - Relational algebra operations for JSONL data.

A Python package for performing relational algebra operations on lists of JSON objects (JSONL data). Provides both a CLI and a library interface for data manipulation, schema inference, and format conversion.

This package allows you to: - Perform relational operations like select, project, join, union, etc. - Infer and validate JSON schemas from data - Convert between various data formats (CSV, JSON arrays, directories) - Work with data interactively via REPL or programmatically via library functions

Example

from ja import select, project data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}] young_people = select(data, "age < 30") names_only = project(young_people, ["name"])

difference(a, b)

Return rows present in the first relation but not in the second.

Performs set difference operation, removing from the first relation any rows that also appear in the second relation.

Parameters:

Name Type Description Default
a Relation

The first relation (list of dictionaries).

required
b Relation

The second relation (list of dictionaries), whose rows will be excluded from 'a'.

required

Returns:

Type Description
Relation

A new relation containing rows from 'a' that are not in 'b'.

Example

a = [{"name": "Alice"}, {"name": "Bob"}] b = [{"name": "Alice"}] difference(a, b) [{"name": "Bob"}]

Source code in ja/core.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def difference(a: Relation, b: Relation) -> Relation:
    """Return rows present in the first relation but not in the second.

    Performs set difference operation, removing from the first relation
    any rows that also appear in the second relation.

    Args:
        a: The first relation (list of dictionaries).
        b: The second relation (list of dictionaries), whose rows will be excluded from 'a'.

    Returns:
        A new relation containing rows from 'a' that are not in 'b'.

    Example:
        >>> a = [{"name": "Alice"}, {"name": "Bob"}]
        >>> b = [{"name": "Alice"}]
        >>> difference(a, b)
        [{"name": "Bob"}]
    """
    b_set = {_row_to_hashable_key(r) for r in b}
    return [r for r in a if _row_to_hashable_key(r) not in b_set]

distinct(relation)

Remove duplicate rows from a relation.

Creates a new relation with duplicate rows removed, preserving the first occurrence of each unique row.

Parameters:

Name Type Description Default
relation Relation

The input relation (list of dictionaries).

required

Returns:

Type Description
Relation

A new relation with duplicate rows removed.

Example

data = [{"name": "Alice"}, {"name": "Alice"}, {"name": "Bob"}] distinct(data) [{"name": "Alice"}, {"name": "Bob"}]

Source code in ja/core.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def distinct(relation: Relation) -> Relation:
    """Remove duplicate rows from a relation.

    Creates a new relation with duplicate rows removed, preserving the
    first occurrence of each unique row.

    Args:
        relation: The input relation (list of dictionaries).

    Returns:
        A new relation with duplicate rows removed.

    Example:
        >>> data = [{"name": "Alice"}, {"name": "Alice"}, {"name": "Bob"}]
        >>> distinct(data)
        [{"name": "Alice"}, {"name": "Bob"}]
    """
    seen = set()
    out = []
    for row in relation:
        key = _row_to_hashable_key(row)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out

groupby_agg(relation, group_by_key, aggregations)

Group rows by a key and perform specified aggregations on other columns.

This function works in two main passes: 1. Data Collection Pass: Groups rows by the group_by_key and collects the necessary data for each specified aggregation. 2. Aggregation Processing Pass: Applies aggregation functions to compute the final aggregated values for each group.

Parameters:

Name Type Description Default
relation Relation

The input relation (list of dictionaries).

required
group_by_key str

The column name to group by.

required
aggregations List[Tuple[str, ...]]

A list of tuples specifying aggregations to perform. Each tuple format: (agg_func_name, agg_col_name, *extra_args) - agg_func_name: Name of aggregation function (e.g., "sum", "count") - agg_col_name: Column name to aggregate (ignored for "count") - extra_args: Additional arguments for the aggregation function

required

Returns:

Type Description
Relation

A new relation with one row per unique group key value, containing

Relation

the group key and all requested aggregations.

Example

data = [{"category": "A", "value": 10}, {"category": "A", "value": 20}, {"category": "B", "value": 30}] groupby_agg(data, "category", [("sum", "value"), ("count", "")]) [{"category": "A", "sum_value": 30, "count": 2}, {"category": "B", "sum_value": 30, "count": 1}]

Source code in ja/groupby.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def groupby_agg(
    relation: Relation, group_by_key: str, aggregations: List[Tuple[str, ...]]
) -> Relation:
    """Group rows by a key and perform specified aggregations on other columns.

    This function works in two main passes:
    1. Data Collection Pass: Groups rows by the `group_by_key` and collects
       the necessary data for each specified aggregation.
    2. Aggregation Processing Pass: Applies aggregation functions to compute
       the final aggregated values for each group.

    Args:
        relation: The input relation (list of dictionaries).
        group_by_key: The column name to group by.
        aggregations: A list of tuples specifying aggregations to perform.
                      Each tuple format: (agg_func_name, agg_col_name, *extra_args)
                      - agg_func_name: Name of aggregation function (e.g., "sum", "count")
                      - agg_col_name: Column name to aggregate (ignored for "count")
                      - extra_args: Additional arguments for the aggregation function

    Returns:
        A new relation with one row per unique group key value, containing
        the group key and all requested aggregations.

    Example:
        >>> data = [{"category": "A", "value": 10}, {"category": "A", "value": 20}, {"category": "B", "value": 30}]
        >>> groupby_agg(data, "category", [("sum", "value"), ("count", "")])
        [{"category": "A", "sum_value": 30, "count": 2}, {"category": "B", "sum_value": 30, "count": 1}]
    """
    grouped_data: Dict[Any, Dict[str, Any]] = {}

    # Pass 1: Collect data for aggregation
    for row in relation:
        key_value = row.get(group_by_key)
        group = grouped_data.setdefault(key_value, {group_by_key: key_value})
        # _values stores the raw data needed for each aggregation within the group
        group_values = group.setdefault("_values", {})

        # Always maintain a count for the group
        group_values.setdefault("_count", 0)
        group_values["_count"] += 1

        for agg_spec in aggregations:
            agg_func = agg_spec[0]
            # Ensure agg_col is present, default to empty string if not (e.g. for count)
            agg_col = agg_spec[1] if len(agg_spec) > 1 else ""

            if agg_func == "count":
                continue  # Count is handled by _count increment above

            val = row.get(agg_col)
            # storage_key_for_agg is used to store the collected data for a specific (agg_func, agg_col) pair
            storage_key_for_agg = f"{agg_func}_{agg_col}"

            if agg_func in ["sum", "avg", "min", "max", "list"]:
                # These aggregations collect all values from agg_col into a list
                group_values.setdefault(storage_key_for_agg, []).append(val)
            elif agg_func == "first":
                # Store only the first encountered value for this agg_col in the group
                if storage_key_for_agg not in group_values:
                    group_values[storage_key_for_agg] = val
            elif agg_func == "last":
                # Always store/overwrite with the latest value for this agg_col in the group
                group_values[storage_key_for_agg] = val
            elif (
                agg_func not in AGGREGATION_DISPATCHER
            ):  # Check for unknown agg functions early
                raise ValueError(
                    f"Unsupported aggregation function during collection: {agg_func}"
                )
            # Else: If agg_func is in dispatcher but not explicitly handled above,
            # it implies it doesn't need special data collection beyond what other
            # similar functions might do, or it's an error in dispatcher setup.

    # Pass 2: Process collected data to produce final aggregations
    result_relation = []
    for key_value, group_data_content in grouped_data.items():
        processed_row: Row = {group_by_key: key_value}
        collected_group_values = group_data_content.get("_values", {})

        for agg_spec in aggregations:
            agg_func_name = agg_spec[0]
            agg_col_name = agg_spec[1] if len(agg_spec) > 1 else ""
            # extra_args = agg_spec[2:] # For future use, e.g., a general reduce

            output_col_name = (
                f"{agg_func_name}_{agg_col_name}" if agg_col_name else agg_func_name
            )

            if agg_func_name == "count":
                processed_row[output_col_name] = collected_group_values.get("_count", 0)
            elif agg_func_name in AGGREGATION_DISPATCHER:
                aggregator_func = AGGREGATION_DISPATCHER[agg_func_name]
                # Key used to retrieve the raw data collected in Pass 1
                raw_data_storage_key = f"{agg_func_name}_{agg_col_name}"

                if agg_func_name in ["first", "last"]:
                    # For 'first'/'last', the stored data is the single value itself
                    data_to_aggregate = collected_group_values.get(
                        raw_data_storage_key
                    )  # Defaults to None
                else:
                    # For list-based aggregations ('sum', 'avg', 'min', 'max', 'list')
                    data_to_aggregate = collected_group_values.get(
                        raw_data_storage_key, []
                    )

                # If aggregator_func needed extra_args (e.g. for a future 'reduce'),
                # they would be passed here:
                # processed_row[output_col_name] = aggregator_func(data_to_aggregate, *extra_args)
                processed_row[output_col_name] = aggregator_func(data_to_aggregate)
            else:
                # This case should ideally not be reached if the collection phase
                # and dispatcher are correctly set up.
                raise ValueError(
                    f"Unsupported aggregation function during processing: {agg_func_name}"
                )

        result_relation.append(processed_row)

    return result_relation

intersection(a, b)

Return rows common to both relations.

Creates a new relation containing only rows that are present in both input relations.

Parameters:

Name Type Description Default
a Relation

The first relation (list of dictionaries).

required
b Relation

The second relation (list of dictionaries).

required

Returns:

Type Description
Relation

A new relation containing only rows that are present in both 'a' and 'b'.

Example

a = [{"name": "Alice"}, {"name": "Bob"}] b = [{"name": "Alice"}, {"name": "Carol"}] intersection(a, b) [{"name": "Alice"}]

Source code in ja/core.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def intersection(a: Relation, b: Relation) -> Relation:
    """Return rows common to both relations.

    Creates a new relation containing only rows that are present in both
    input relations.

    Args:
        a: The first relation (list of dictionaries).
        b: The second relation (list of dictionaries).

    Returns:
        A new relation containing only rows that are present in both 'a' and 'b'.

    Example:
        >>> a = [{"name": "Alice"}, {"name": "Bob"}]
        >>> b = [{"name": "Alice"}, {"name": "Carol"}]
        >>> intersection(a, b)
        [{"name": "Alice"}]
    """
    b_set = {_row_to_hashable_key(r) for r in b}
    return [r for r in a if _row_to_hashable_key(r) in b_set]

join(left, right, on)

Combine rows from two relations based on specified join conditions.

Performs an inner join between two relations, matching rows where the specified columns have equal values.

Parameters:

Name Type Description Default
left Relation

The left relation (list of dictionaries).

required
right Relation

The right relation (list of dictionaries).

required
on List[Tuple[str, str]]

A list of tuples, where each tuple (left_col, right_col) specifies the columns to join on.

required

Returns:

Type Description
Relation

A new relation containing the merged rows that satisfy the join conditions.

Relation

The resulting rows contain all columns from the left row, plus columns

Relation

from the matching right row (excluding right columns used in join conditions).

Example

left = [{"id": 1, "name": "Alice"}] right = [{"user_id": 1, "score": 95}] join(left, right, [("id", "user_id")]) [{"id": 1, "name": "Alice", "score": 95}]

Source code in ja/core.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def join(left: Relation, right: Relation, on: List[Tuple[str, str]]) -> Relation:
    """Combine rows from two relations based on specified join conditions.

    Performs an inner join between two relations, matching rows where the
    specified columns have equal values.

    Args:
        left: The left relation (list of dictionaries).
        right: The right relation (list of dictionaries).
        on: A list of tuples, where each tuple (left_col, right_col)
            specifies the columns to join on.

    Returns:
        A new relation containing the merged rows that satisfy the join conditions.
        The resulting rows contain all columns from the left row, plus columns
        from the matching right row (excluding right columns used in join conditions).

    Example:
        >>> left = [{"id": 1, "name": "Alice"}]
        >>> right = [{"user_id": 1, "score": 95}]
        >>> join(left, right, [("id", "user_id")])
        [{"id": 1, "name": "Alice", "score": 95}]
    """
    right_index = {}
    for r_row_build_idx in right:
        key_tuple = tuple(r_row_build_idx[r_col] for _, r_col in on)
        right_index.setdefault(key_tuple, []).append(r_row_build_idx)

    result = []
    # Pre-calculate the set of right column names that are part of the join condition
    right_join_key_names = {r_col for _, r_col in on}

    for l_row in left:
        key_tuple = tuple(l_row[l_col] for l_col, _ in on)
        for r_row in right_index.get(key_tuple, []):
            merged_row = dict(l_row)  # Start with a copy of the left row

            # Add columns from the right row if they don't collide with left row's columns
            # and are not themselves right-side join keys.
            for r_key, r_val in r_row.items():
                if r_key not in merged_row and r_key not in right_join_key_names:
                    merged_row[r_key] = r_val
            result.append(merged_row)
    return result

product(a, b)

Compute the Cartesian product of two relations.

Creates all possible combinations of rows from the two input relations.

Parameters:

Name Type Description Default
a Relation

The first relation (list of dictionaries).

required
b Relation

The second relation (list of dictionaries).

required

Returns:

Type Description
Relation

A new relation containing all combinations of rows from 'a' and 'b'.

Example

a = [{"x": 1}] b = [{"y": 2}, {"y": 3}] product(a, b) [{"x": 1, "y": 2}, {"x": 1, "y": 3}]

Source code in ja/core.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def product(a: Relation, b: Relation) -> Relation:
    """Compute the Cartesian product of two relations.

    Creates all possible combinations of rows from the two input relations.

    Args:
        a: The first relation (list of dictionaries).
        b: The second relation (list of dictionaries).

    Returns:
        A new relation containing all combinations of rows from 'a' and 'b'.

    Example:
        >>> a = [{"x": 1}]
        >>> b = [{"y": 2}, {"y": 3}]
        >>> product(a, b)
        [{"x": 1, "y": 2}, {"x": 1, "y": 3}]
    """
    out = []
    for r1 in a:
        for r2 in b:
            merged = dict(r1)
            for k, v in r2.items():
                # avoid key collision by prefixing
                merged[f"b_{k}" if k in r1 else k] = v
            out.append(merged)
    return out

project(relation, columns)

Select specific columns from a relation.

Creates a new relation containing only the specified columns from each row. If a row doesn't contain a specified column, it's omitted for that row.

Parameters:

Name Type Description Default
relation Relation

The input relation (list of dictionaries).

required
columns List[str]

A list of column names to include in the result.

required

Returns:

Type Description
Relation

A new relation containing only the specified columns for each row.

Example

data = [{"name": "Alice", "age": 30, "city": "NYC"}] project(data, ["name", "age"]) [{"name": "Alice", "age": 30}]

Source code in ja/core.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def project(relation: Relation, columns: List[str]) -> Relation:
    """Select specific columns from a relation.

    Creates a new relation containing only the specified columns from each row.
    If a row doesn't contain a specified column, it's omitted for that row.

    Args:
        relation: The input relation (list of dictionaries).
        columns: A list of column names to include in the result.

    Returns:
        A new relation containing only the specified columns for each row.

    Example:
        >>> data = [{"name": "Alice", "age": 30, "city": "NYC"}]
        >>> project(data, ["name", "age"])
        [{"name": "Alice", "age": 30}]
    """
    return [{col: row[col] for col in columns if col in row} for row in relation]

read_jsonl(input_stream)

Read JSONL data from a file-like object.

Parses each line as JSON and returns a list of the parsed objects.

Parameters:

Name Type Description Default
input_stream

A file-like object containing JSONL data.

required

Returns:

Type Description

A list of parsed JSON objects.

Raises:

Type Description
JSONDecodeError

If any line contains invalid JSON.

Source code in ja/commands.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def read_jsonl(input_stream):
    """Read JSONL data from a file-like object.

    Parses each line as JSON and returns a list of the parsed objects.

    Args:
        input_stream: A file-like object containing JSONL data.

    Returns:
        A list of parsed JSON objects.

    Raises:
        json.JSONDecodeError: If any line contains invalid JSON.
    """
    return [json.loads(line) for line in input_stream]

rename(relation, renames)

Rename columns in a relation.

Creates a new relation with specified columns renamed according to the provided mapping.

Parameters:

Name Type Description Default
relation Relation

The input relation (list of dictionaries).

required
renames Dict[str, str]

A dictionary mapping old column names to new column names.

required

Returns:

Type Description
Relation

A new relation with specified columns renamed.

Example

data = [{"old_name": "Alice", "age": 30}] rename(data, {"old_name": "name"}) [{"name": "Alice", "age": 30}]

Source code in ja/core.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def rename(relation: Relation, renames: Dict[str, str]) -> Relation:
    """Rename columns in a relation.

    Creates a new relation with specified columns renamed according to
    the provided mapping.

    Args:
        relation: The input relation (list of dictionaries).
        renames: A dictionary mapping old column names to new column names.

    Returns:
        A new relation with specified columns renamed.

    Example:
        >>> data = [{"old_name": "Alice", "age": 30}]
        >>> rename(data, {"old_name": "name"})
        [{"name": "Alice", "age": 30}]
    """
    return [{renames.get(k, k): v for k, v in row.items()} for row in relation]

select(relation, expression)

Filter rows from a relation based on a JMESPath expression.

Uses JMESPath to filter the relation, keeping only rows that match the given expression criteria.

Parameters:

Name Type Description Default
relation Relation

The input relation (list of dictionaries).

required
expression

A JMESPath expression string or compiled JMESPath expression.

required

Returns:

Type Description
Relation

A new relation containing only the rows that match the expression.

Example

data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}] select(data, "age > 27") [{"name": "Alice", "age": 30}]

Source code in ja/core.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def select(relation: Relation, expression) -> Relation:
    """Filter rows from a relation based on a JMESPath expression.

    Uses JMESPath to filter the relation, keeping only rows that match
    the given expression criteria.

    Args:
        relation: The input relation (list of dictionaries).
        expression: A JMESPath expression string or compiled JMESPath expression.

    Returns:
        A new relation containing only the rows that match the expression.

    Example:
        >>> data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
        >>> select(data, "age > `27`")
        [{"name": "Alice", "age": 30}]
    """
    if isinstance(expression, str):
        expression = jmespath.compile(f"[?{expression}]")

    # JMESPath works on the entire relation (JSON document)
    return expression.search(relation)

sort_by(relation, keys, reverse=False)

Sort a relation by specified keys.

Sorts the relation by the specified column names in order. Missing values (None) are sorted before non-None values.

Parameters:

Name Type Description Default
relation Relation

The input relation (list of dictionaries).

required
keys List[str]

A list of column names to sort by. The sort is performed in the order of the columns specified.

required
reverse bool

If True, sort in descending order.

False

Returns:

Type Description
Relation

A new relation sorted by the specified keys.

Example

data = [{"name": "Bob", "age": 30}, {"name": "Alice", "age": 25}] sort_by(data, ["name"]) [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]

Source code in ja/core.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def sort_by(relation: Relation, keys: List[str], reverse: bool = False) -> Relation:
    """Sort a relation by specified keys.

    Sorts the relation by the specified column names in order. Missing values
    (None) are sorted before non-None values.

    Args:
        relation: The input relation (list of dictionaries).
        keys: A list of column names to sort by. The sort is performed in
              the order of the columns specified.
        reverse: If True, sort in descending order.

    Returns:
        A new relation sorted by the specified keys.

    Example:
        >>> data = [{"name": "Bob", "age": 30}, {"name": "Alice", "age": 25}]
        >>> sort_by(data, ["name"])
        [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
    """

    def sort_key_func(row: Row) -> tuple:
        key_parts = []
        for k in keys:
            value = row.get(k)
            if value is None:
                # Sort None values first by using a lower first element in the tuple part
                key_parts.append((0, None))
            else:
                key_parts.append((1, value))
        return tuple(key_parts)

    return sorted(relation, key=sort_key_func, reverse=reverse)

union(a, b)

Return all rows from two relations.

Concatenates two relations, preserving all rows including duplicates. For distinct union, pipe the result through distinct.

Parameters:

Name Type Description Default
a Relation

The first relation (list of dictionaries).

required
b Relation

The second relation (list of dictionaries).

required

Returns:

Type Description
Relation

A new relation containing all rows from both input relations.

Example

a = [{"name": "Alice"}] b = [{"name": "Bob"}] union(a, b) [{"name": "Alice"}, {"name": "Bob"}]

Source code in ja/core.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def union(a: Relation, b: Relation) -> Relation:
    """Return all rows from two relations.

    Concatenates two relations, preserving all rows including duplicates.
    For distinct union, pipe the result through `distinct`.

    Args:
        a: The first relation (list of dictionaries).
        b: The second relation (list of dictionaries).

    Returns:
        A new relation containing all rows from both input relations.

    Example:
        >>> a = [{"name": "Alice"}]
        >>> b = [{"name": "Bob"}]
        >>> union(a, b)
        [{"name": "Alice"}, {"name": "Bob"}]
    """
    return a + b