Skip to content

xarray-sql

xarray_sql

__all__ = ['cftime', 'XarrayContext', 'read_xarray_table', 'read_xarray', 'from_map'] module-attribute

XarrayContext

Bases: SessionContext

A datafusion SessionContext that also supports xarray.Datasets.

from_dataset(table_name, input_table, chunks=None)

Register an xarray Dataset as a queryable SQL table.

For datasets with non-Gregorian cftime coordinates (e.g. 360_day, julian), a cftime() scalar UDF is automatically registered so you can write ergonomic SQL filters::

ctx.from_dataset("ds360", ds, chunks={"time": 6})
ctx.sql("SELECT * FROM ds360 WHERE time >= cftime('2000-07-01')")

The UDF converts a date string to the int64 offset used to store that calendar's time axis.

.. note::

Only one ``cftime()`` UDF is registered per context, using the
units and calendar of the *first* non-Gregorian coordinate
encountered. If you register multiple datasets with *different*
non-Gregorian calendars (e.g. one 360_day and one julian), the
UDF from the first registration will be used for all subsequent
``cftime()`` calls and may produce incorrect offsets for the
other dataset. In that case, create a separate ``XarrayContext``
for each calendar.

Parameters:

Name Type Description Default
table_name str

The SQL table name to register the dataset under.

required
input_table Dataset

An xarray Dataset. All data_vars must share the same dimensions.

required
chunks Chunks

Xarray-like chunks specification. If not provided, uses the Dataset's existing chunks.

None

Returns:

Type Description

self, to allow chaining.

from_map(func, *iterables, args=None, **kwargs)

Create a PyArrow Table by mapping a function over iterables.

This is equivalent to dask's from_map but returns a PyArrow Table that can be used with DataFusion instead of a Dask DataFrame.

Parameters:

Name Type Description Default
func Callable

Function to apply to each element of the iterables.

required
*iterables tuple[Any, ...]

Iterable objects to map the function over.

()
args tuple | None

Additional positional arguments to pass to func.

None
**kwargs dict[str, Any]

Additional keyword arguments to pass to func.

{}

Returns:

Type Description
Table

A PyArrow Table containing the concatenated results.

read_xarray(ds, chunks=None)

Pivots an Xarray Dataset into a PyArrow Table, partitioned by chunks.

Parameters:

Name Type Description Default
ds Dataset

An Xarray Dataset. All data_vars must share the same dimensions.

required
chunks Chunks

Xarray-like chunks. If not provided, will default to the Dataset's chunks. The product of the chunk sizes becomes the standard length of each dataframe partition.

None

Returns:

Type Description
RecordBatchReader

A PyArrow RecordBatchReader, which is a table representation of the input

RecordBatchReader

Dataset.

read_xarray_table(ds, chunks=None, *, batch_size=DEFAULT_BATCH_SIZE, _iteration_callback=None)

Create a lazy DataFusion table from an xarray Dataset.

This is the simplest way to register xarray data with DataFusion. Data is only read when queries are executed, not during registration. The table can be queried multiple times.

Each chunk becomes a separate partition, enabling DataFusion's parallel execution across multiple cores.

Note

SQL queries with WHERE clauses on dimension columns (time, lat, lon, etc.) automatically prune partitions that can't contain matching rows — this is called filter pushdown. For example:

# This query will skip loading partitions with time < '2020-02-01'
result = ctx.sql('SELECT * FROM air WHERE time > "2020-02-01"').collect()

Supported operators: =, <, >, <=, >=, BETWEEN, IN, AND, OR.

Parameters:

Name Type Description Default
ds Dataset

An xarray Dataset. All data_vars must share the same dimensions.

required
chunks Chunks

Xarray-like chunks specification. If not provided, uses the Dataset's existing chunks.

None
batch_size int

Maximum rows per Arrow RecordBatch emitted per partition. Smaller values let DataFusion start processing earlier; the default (65 536) works well for most datasets.

DEFAULT_BATCH_SIZE
_iteration_callback Callable[[Block, list[str] | None], None] | None

Internal callback for testing. Called with each block dict just before it's converted to Arrow.

None

Returns:

Type Description
'LazyArrowStreamTable'

A LazyArrowStreamTable ready for registration with DataFusion.

Example

from datafusion import SessionContext import xarray as xr from xarray_sql import read_xarray_table

ds = xr.tutorial.open_dataset('air_temperature') table = read_xarray_table(ds, chunks={'time': 240})

ctx = SessionContext() ctx.register_table('air', table)

Data is only read here, during query execution

Filters on 'time' will prune partitions automatically!

result = ctx.sql('SELECT AVG(air) FROM air').collect()

cftime

Bridge between cftime calendars and Arrow/DataFusion types.

cftime (https://unidata.github.io/cftime/) provides datetime objects for calendars used in climate science — noleap, 360-day, all-leap, julian, etc. Arrow and DataFusion have no native concept of non-Gregorian calendars, so this module handles the conversion in two tiers:

  • Gregorian-like calendars (standard, gregorian, proleptic_gregorian, noleap/365_day, all_leap/366_day): mapped to pa.timestamp('us') so that string-based SQL filters like WHERE time > '1980-01-01' work naturally. Microsecond resolution avoids the 1678–2262 overflow of nanoseconds while preserving sub-second precision.

  • Non-Gregorian calendars (360_day, julian): mapped to pa.int64() with xarray:units and xarray:calendar metadata on the Arrow field. This preserves the original CF-convention encoding losslessly. A cftime() DataFusion UDF (registered automatically by XarrayContext.from_dataset) provides ergonomic SQL filtering.

DEFAULT_UNITS = 'microseconds since 1970-01-01T00:00:00' module-attribute

GREGORIAN_LIKE_CALENDARS = frozenset({'standard', 'gregorian', 'proleptic_gregorian', 'noleap', '365_day', 'all_leap', '366_day'}) module-attribute

arrow_field(name, units, cal)

Build a pa.Field for a cftime coordinate.

Gregorian-like → pa.timestamp('us'); non-Gregorian → pa.int64(). Both carry xarray:calendar and xarray:units metadata for round-trip fidelity.

calendar(ds, coord_name)

Return the calendar name for a cftime coordinate, or None.

Checks the xarray index first (no data materialization), then falls back to inspecting element 0 of the coordinate values.

convert_for_field(values, field)

Convert cftime values to the numeric type dictated by field.

Reads xarray:calendar and xarray:units from the field's Arrow metadata to choose between the timestamp path and the integer-offset path.

encoding(ds, coord_name)

Return (units, calendar) for a cftime coordinate.

Reads xarray .encoding metadata (from the originating NetCDF file) first, falling back to :data:DEFAULT_UNITS.

is_cftime(values)

Check if a numpy array contains cftime datetime objects.

is_cftime_index(ds, coord_name)

Check if a coordinate uses a CFTimeIndex without materializing data.

is_gregorian_like(calendar)

Return True if calendar is close enough to Gregorian for pa.timestamp.

make_cftime_udf(units, calendar)

Create a DataFusion scalar UDF that converts date strings to int64 offsets.

This enables ergonomic SQL filtering on non-Gregorian cftime columns::

SELECT * FROM ds360 WHERE time > cftime('0500-01-01')

The UDF parses the input string as a cftime datetime in the given calendar system and returns the corresponding int64 offset in the specified units.

partition_bounds(values)

Return (min, max, dtype_tag) for a cftime coordinate slice.

Gregorian-like calendars return nanosecond bounds tagged "timestamp_ns" (compatible with ScalarBound::TimestampNanos in the Rust pruning layer). Non-Gregorian calendars return int64 offsets tagged "int64".

to_microseconds(values)

Convert cftime objects to int64 microseconds since Unix epoch.

Used for Gregorian-like calendars. Vectorised via cftime.date2num (implemented in C).

to_offsets(values, units, cal)

Convert cftime objects to int64 offsets in the given units/calendar.

Used for non-Gregorian calendars where data is stored as pa.int64().

core

Row = list[Any] module-attribute

get_columns(ds)

unbounded_unravel(ds)

Unravel with unbounded memory (as a NumPy Array).

unravel(ds)

df

Block = dict[Hashable, slice] module-attribute

Chunks = dict[str, int] | None module-attribute

DEFAULT_BATCH_SIZE = 65536 module-attribute

PartitionBounds = dict[str, tuple[Any, Any, str]] module-attribute

block_slices(ds, chunks=None)

Compute block slices for a chunked Dataset.

dataset_to_record_batch(ds, schema)

Convert an xarray Dataset partition to an Arrow RecordBatch.

Builds the RecordBatch directly from numpy arrays, bypassing the pandas round-trip (to_dataframe → reset_index → from_pandas) used by pivot(). For large partitions this reduces peak memory from ~5× to ~2× the partition size.

Dimension coordinates are broadcast to the full partition shape and ravelled. np.broadcast_to() is zero-copy; the ravel() forces one copy per coordinate (unavoidable, since broadcast arrays are non-contiguous). Data variable arrays are ravelled in-place — a zero-copy view when the underlying array is already C-contiguous (the common case for numpy-backed xarray datasets).

Parameters:

Name Type Description Default
ds Dataset

A partition-sized xarray Dataset (already sliced via isel).

required
schema Schema

The Arrow schema for the output, as produced by _parse_schema. Column order in the output matches schema field order.

required

Returns:

Type Description
RecordBatch

A RecordBatch with one column per dimension coordinate and data

RecordBatch

variable, in schema order.

explode(ds, chunks=None)

Explodes a dataset into its chunks.

from_map(func, *iterables, args=None, **kwargs)

Create a PyArrow Table by mapping a function over iterables.

This is equivalent to dask's from_map but returns a PyArrow Table that can be used with DataFusion instead of a Dask DataFrame.

Parameters:

Name Type Description Default
func Callable

Function to apply to each element of the iterables.

required
*iterables tuple[Any, ...]

Iterable objects to map the function over.

()
args tuple | None

Additional positional arguments to pass to func.

None
**kwargs dict[str, Any]

Additional keyword arguments to pass to func.

{}

Returns:

Type Description
Table

A PyArrow Table containing the concatenated results.

from_map_batched(func, *iterables, args=None, schema=None, **kwargs)

Create a PyArrow RecordBatchReader by mapping a function over iterables.

This is equivalent to dask's from_map but returns a PyArrow RecordBatchReader that can be used with DataFusion. It iterates over RecordBatches which are created via the func one-at-a-time.

Parameters:

Name Type Description Default
func Callable[..., DataFrame]

Function to apply to each element of the iterables. Currently, the function must return a Pandas DataFrame.

required
*iterables tuple[Any, ...]

Iterable objects to map the function over.

()
schema Schema

Optional schema needed for the RecordBatchReader.

None
args tuple | None

Additional positional arguments to pass to func.

None
**kwargs dict[str, Any]

Additional keyword arguments to pass to func.

{}

Returns:

Type Description
RecordBatchReader

A PyArrow RecordBatchReader containing the stream of RecordBatches.

iter_record_batches(ds, schema, batch_size=DEFAULT_BATCH_SIZE)

Yield RecordBatches of at most batch_size rows from a partition Dataset.

Unlike dataset_to_record_batch, which materialises the entire partition as one batch, this generator emits smaller batches so that DataFusion can begin filtering and aggregating before the full partition is loaded. Peak memory per batch is O(batch_size) for coordinate columns and O(partition_size) for data-variable columns (which must be loaded in full from storage).

Coordinate values are computed per batch via strided index arithmetic — no broadcast array spanning the whole partition is ever allocated. Data variable flat arrays are loaded once (triggering any remote I/O) and then sliced as zero-copy views for each batch.

Parameters:

Name Type Description Default
ds Dataset

A partition-sized xarray Dataset (already sliced via isel).

required
schema Schema

The Arrow schema for the output, as produced by _parse_schema.

required
batch_size int

Maximum number of rows per yielded RecordBatch.

DEFAULT_BATCH_SIZE

Yields:

Type Description
RecordBatch

RecordBatches in schema column order, covering all rows of the

RecordBatch

partition exactly once.

partition_metadata(ds, blocks)

Compute min/max coordinate values for each partition.

This metadata enables filter pushdown: SQL queries with WHERE clauses on dimension columns can prune partitions that can't contain matching rows.

Parameters:

Name Type Description Default
ds Dataset

The xarray Dataset containing coordinate values.

required
blocks list[Block]

List of block slices from block_slices().

required

Returns:

Type Description
list[PartitionBounds]

List of dicts mapping dimension name to

list[PartitionBounds]

(min_value, max_value, dtype_str) tuples.

  • For datetime64, values are nanoseconds since Unix epoch (int64), dtype_str is "timestamp_ns"
  • For numeric types, values are Python int or float, dtype_str is "int64" or "float64"
Note

If a partition has an empty slice for a dimension, that dimension is omitted from the partition's metadata. The Rust pruning logic treats missing dimensions conservatively (never prunes on them).

pivot(ds)

Converts an xarray Dataset to a pandas DataFrame.

reader

Lazy Arrow stream reader for xarray Datasets.

This module provides XarrayRecordBatchReader, which implements the Arrow PyCapsule Interface (arrow_c_stream) to enable zero-copy, lazy streaming of xarray data to DataFusion and other Arrow consumers.

The implementation delegates to PyArrow's RecordBatchReader for the actual stream implementation, wrapping xarray block iteration in a generator.

XarrayRecordBatchReader

A lazy Arrow stream reader for xarray Datasets.

Implements the Arrow PyCapsule Interface (arrow_c_stream) to enable zero-copy, lazy streaming of xarray data to DataFusion and other Arrow consumers.

The key property is that xarray blocks are only converted to Arrow RecordBatches when the consumer calls get_next (e.g., during DataFusion's collect()), NOT when the reader is created or registered.

Attributes:

Name Type Description
schema Schema

The Arrow schema for the stream.

Example

import xarray as xr from xarray_sql import XarrayRecordBatchReader ds = xr.tutorial.open_dataset('air_temperature') reader = XarrayRecordBatchReader(ds, chunks={'time': 240})

At this point, NO data has been read from xarray
Data is only read when consumed:

import pyarrow as pa pa_reader = pa.RecordBatchReader.from_stream(reader) for batch in pa_reader: ... print(batch.num_rows) # Data read here

schema property

The Arrow schema for this stream.

read_xarray(ds, chunks=None)

Pivots an Xarray Dataset into a PyArrow Table, partitioned by chunks.

Parameters:

Name Type Description Default
ds Dataset

An Xarray Dataset. All data_vars must share the same dimensions.

required
chunks Chunks

Xarray-like chunks. If not provided, will default to the Dataset's chunks. The product of the chunk sizes becomes the standard length of each dataframe partition.

None

Returns:

Type Description
RecordBatchReader

A PyArrow RecordBatchReader, which is a table representation of the input

RecordBatchReader

Dataset.

read_xarray_table(ds, chunks=None, *, batch_size=DEFAULT_BATCH_SIZE, _iteration_callback=None)

Create a lazy DataFusion table from an xarray Dataset.

This is the simplest way to register xarray data with DataFusion. Data is only read when queries are executed, not during registration. The table can be queried multiple times.

Each chunk becomes a separate partition, enabling DataFusion's parallel execution across multiple cores.

Note

SQL queries with WHERE clauses on dimension columns (time, lat, lon, etc.) automatically prune partitions that can't contain matching rows — this is called filter pushdown. For example:

# This query will skip loading partitions with time < '2020-02-01'
result = ctx.sql('SELECT * FROM air WHERE time > "2020-02-01"').collect()

Supported operators: =, <, >, <=, >=, BETWEEN, IN, AND, OR.

Parameters:

Name Type Description Default
ds Dataset

An xarray Dataset. All data_vars must share the same dimensions.

required
chunks Chunks

Xarray-like chunks specification. If not provided, uses the Dataset's existing chunks.

None
batch_size int

Maximum rows per Arrow RecordBatch emitted per partition. Smaller values let DataFusion start processing earlier; the default (65 536) works well for most datasets.

DEFAULT_BATCH_SIZE
_iteration_callback Callable[[Block, list[str] | None], None] | None

Internal callback for testing. Called with each block dict just before it's converted to Arrow.

None

Returns:

Type Description
'LazyArrowStreamTable'

A LazyArrowStreamTable ready for registration with DataFusion.

Example

from datafusion import SessionContext import xarray as xr from xarray_sql import read_xarray_table

ds = xr.tutorial.open_dataset('air_temperature') table = read_xarray_table(ds, chunks={'time': 240})

ctx = SessionContext() ctx.register_table('air', table)

Data is only read here, during query execution
Filters on 'time' will prune partitions automatically!

result = ctx.sql('SELECT AVG(air) FROM air').collect()

sql

XarrayContext

Bases: SessionContext

A datafusion SessionContext that also supports xarray.Datasets.

from_dataset(table_name, input_table, chunks=None)

Register an xarray Dataset as a queryable SQL table.

For datasets with non-Gregorian cftime coordinates (e.g. 360_day, julian), a cftime() scalar UDF is automatically registered so you can write ergonomic SQL filters::

ctx.from_dataset("ds360", ds, chunks={"time": 6})
ctx.sql("SELECT * FROM ds360 WHERE time >= cftime('2000-07-01')")

The UDF converts a date string to the int64 offset used to store that calendar's time axis.

.. note::

Only one ``cftime()`` UDF is registered per context, using the
units and calendar of the *first* non-Gregorian coordinate
encountered. If you register multiple datasets with *different*
non-Gregorian calendars (e.g. one 360_day and one julian), the
UDF from the first registration will be used for all subsequent
``cftime()`` calls and may produce incorrect offsets for the
other dataset. In that case, create a separate ``XarrayContext``
for each calendar.

Parameters:

Name Type Description Default
table_name str

The SQL table name to register the dataset under.

required
input_table Dataset

An xarray Dataset. All data_vars must share the same dimensions.

required
chunks Chunks

Xarray-like chunks specification. If not provided, uses the Dataset's existing chunks.

None

Returns:

Type Description

self, to allow chaining.