xarray-sql¶
xarray_sql
¶
__all__ = ['cftime', 'XarrayContext', 'read_xarray_table', 'read_xarray', 'from_map']
module-attribute
¶
XarrayContext
¶
Bases: SessionContext
A datafusion SessionContext that also supports xarray.Datasets.
from_dataset(table_name, input_table, chunks=None)
¶
Register an xarray Dataset as a queryable SQL table.
For datasets with non-Gregorian cftime coordinates (e.g. 360_day, julian), a cftime() scalar UDF is automatically registered so you can write ergonomic SQL filters:

```python
ctx.from_dataset("ds360", ds, chunks={"time": 6})
ctx.sql("SELECT * FROM ds360 WHERE time >= cftime('2000-07-01')")
```

The UDF converts a date string to the int64 offset used to store that calendar's time axis.

Note

Only one `cftime()` UDF is registered per context, using the units and calendar of the *first* non-Gregorian coordinate encountered. If you register multiple datasets with *different* non-Gregorian calendars (e.g. one 360_day and one julian), the UDF from the first registration will be used for all subsequent `cftime()` calls and may produce incorrect offsets for the other dataset. In that case, create a separate `XarrayContext` for each calendar.
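A minimal sketch of that workaround, assuming two hypothetical datasets ds_360day and ds_julian (names and chunk sizes are illustrative):

```python
from xarray_sql import XarrayContext

# One context per non-Gregorian calendar, so each context registers a
# cftime() UDF parameterised for its own calendar and units.
ctx_360 = XarrayContext()
ctx_360.from_dataset('ds360', ds_360day, chunks={'time': 6})

ctx_jul = XarrayContext()
ctx_jul.from_dataset('dsjul', ds_julian, chunks={'time': 6})
```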
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | The SQL table name to register the dataset under. | required |
| input_table | Dataset | An xarray Dataset. All data_vars must share the same dimensions. | required |
| chunks | Chunks | Xarray-like chunks specification. If not provided, uses the Dataset's existing chunks. | None |

Returns:

| Type | Description |
|---|---|
| | self, to allow chaining. |
from_map(func, *iterables, args=None, **kwargs)
¶
Create a PyArrow Table by mapping a function over iterables.
This is equivalent to dask's from_map but returns a PyArrow Table that can be used with DataFusion instead of a Dask DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable | Function to apply to each element of the iterables. | required |
| *iterables | tuple[Any, ...] | Iterable objects to map the function over. | () |
| args | tuple \| None | Additional positional arguments to pass to func. | None |
| **kwargs | dict[str, Any] | Additional keyword arguments to pass to func. | {} |

Returns:

| Type | Description |
|---|---|
| Table | A PyArrow Table containing the concatenated results. |
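A minimal usage sketch, assuming (as from_map_batched below requires) that func returns pandas DataFrames, which are concatenated into the result Table:

```python
import pandas as pd
from xarray_sql import from_map

def load_partition(i: int) -> pd.DataFrame:
    # Hypothetical per-element producer; each call yields one partition.
    return pd.DataFrame({'part': [i, i], 'value': [0.0, 1.0]})

table = from_map(load_partition, range(4))  # pyarrow.Table with 8 rows
```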
read_xarray(ds, chunks=None)
¶
Pivots an Xarray Dataset into a PyArrow Table, partitioned by chunks.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ds | Dataset | An Xarray Dataset. All data_vars must share the same dimensions. | required |
| chunks | Chunks | Xarray-like chunks. If not provided, will default to the Dataset's chunks. The product of the chunk sizes becomes the standard length of each dataframe partition. | None |

Returns:

| Type | Description |
|---|---|
| RecordBatchReader | A PyArrow RecordBatchReader, which is a table representation of the input Dataset. |
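A short sketch of typical use (the tutorial dataset and chunk size are illustrative):

```python
import xarray as xr
from xarray_sql import read_xarray

ds = xr.tutorial.open_dataset('air_temperature')
reader = read_xarray(ds, chunks={'time': 240})
table = reader.read_all()  # materialise the stream as a pyarrow.Table
```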
read_xarray_table(ds, chunks=None, *, batch_size=DEFAULT_BATCH_SIZE, _iteration_callback=None)
¶
Create a lazy DataFusion table from an xarray Dataset.
This is the simplest way to register xarray data with DataFusion. Data is only read when queries are executed, not during registration. The table can be queried multiple times.
Each chunk becomes a separate partition, enabling DataFusion's parallel execution across multiple cores.
Note
SQL queries with WHERE clauses on dimension columns (time, lat, lon, etc.) automatically prune partitions that can't contain matching rows; this is called filter pushdown. For example:

```python
# This query will skip loading partitions with time < '2020-02-01'
result = ctx.sql('SELECT * FROM air WHERE time > "2020-02-01"').collect()
```

Supported operators: =, <, >, <=, >=, BETWEEN, IN, AND, OR.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ds | Dataset | An xarray Dataset. All data_vars must share the same dimensions. | required |
| chunks | Chunks | Xarray-like chunks specification. If not provided, uses the Dataset's existing chunks. | None |
| batch_size | int | Maximum rows per Arrow RecordBatch emitted per partition. Smaller values let DataFusion start processing earlier; the default (65,536) works well for most datasets. | DEFAULT_BATCH_SIZE |
| _iteration_callback | Callable[[Block, list[str] \| None], None] \| None | Internal callback for testing. Called with each block dict just before it's converted to Arrow. | None |

Returns:

| Type | Description |
|---|---|
| 'LazyArrowStreamTable' | A LazyArrowStreamTable ready for registration with DataFusion. |
Example

```python
from datafusion import SessionContext
import xarray as xr
from xarray_sql import read_xarray_table

ds = xr.tutorial.open_dataset('air_temperature')
table = read_xarray_table(ds, chunks={'time': 240})

ctx = SessionContext()
ctx.register_table('air', table)

# Data is only read here, during query execution.
# Filters on 'time' will prune partitions automatically!
result = ctx.sql('SELECT AVG(air) FROM air').collect()
```
cftime
¶
Bridge between cftime calendars and Arrow/DataFusion types.
cftime (https://unidata.github.io/cftime/) provides datetime objects for calendars used in climate science — noleap, 360-day, all-leap, julian, etc. Arrow and DataFusion have no native concept of non-Gregorian calendars, so this module handles the conversion in two tiers:
- Gregorian-like calendars (standard, gregorian, proleptic_gregorian, noleap/365_day, all_leap/366_day): mapped to `pa.timestamp('us')` so that string-based SQL filters like `WHERE time > '1980-01-01'` work naturally. Microsecond resolution avoids the 1678–2262 overflow of nanoseconds while preserving sub-second precision.
- Non-Gregorian calendars (360_day, julian): mapped to `pa.int64()` with `xarray:units` and `xarray:calendar` metadata on the Arrow field. This preserves the original CF-convention encoding losslessly. A `cftime()` DataFusion UDF (registered automatically by `XarrayContext.from_dataset`) provides ergonomic SQL filtering.
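In PyArrow terms, the two tiers look like this sketch (the units string is illustrative):

```python
import pyarrow as pa

# Tier 1: Gregorian-like calendars become a plain microsecond timestamp.
greg_field = pa.field('time', pa.timestamp('us'))

# Tier 2: non-Gregorian calendars keep CF-encoded int64 offsets, with the
# units and calendar recorded as Arrow field metadata.
cf_field = pa.field('time', pa.int64(), metadata={
    'xarray:units': 'days since 2000-01-01',  # illustrative units
    'xarray:calendar': '360_day',
})
```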
DEFAULT_UNITS = 'microseconds since 1970-01-01T00:00:00'
module-attribute
¶
GREGORIAN_LIKE_CALENDARS = frozenset({'standard', 'gregorian', 'proleptic_gregorian', 'noleap', '365_day', 'all_leap', '366_day'})
module-attribute
¶
arrow_field(name, units, cal)
¶
Build a pa.Field for a cftime coordinate.
Gregorian-like → pa.timestamp('us'); non-Gregorian → pa.int64().
Both carry xarray:calendar and xarray:units metadata for
round-trip fidelity.
calendar(ds, coord_name)
¶
Return the calendar name for a cftime coordinate, or None.
Checks the xarray index first (no data materialization), then falls back to inspecting element 0 of the coordinate values.
convert_for_field(values, field)
¶
Convert cftime values to the numeric type dictated by field.
Reads xarray:calendar and xarray:units from the field's Arrow
metadata to choose between the timestamp path and the integer-offset path.
encoding(ds, coord_name)
¶
Return (units, calendar) for a cftime coordinate.
Reads xarray .encoding metadata (from the originating NetCDF file) first, falling back to DEFAULT_UNITS.
is_cftime(values)
¶
Check if a numpy array contains cftime datetime objects.
is_cftime_index(ds, coord_name)
¶
Check if a coordinate uses a CFTimeIndex without materializing data.
is_gregorian_like(calendar)
¶
Return True if calendar is close enough to Gregorian for pa.timestamp.
make_cftime_udf(units, calendar)
¶
Create a DataFusion scalar UDF that converts date strings to int64 offsets.
This enables ergonomic SQL filtering on non-Gregorian cftime columns:

```sql
SELECT * FROM ds360 WHERE time > cftime('0500-01-01')
```

The UDF parses the input string as a cftime datetime in the given calendar system and returns the corresponding int64 offset in the specified units.
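Manual registration might look like the following sketch (the import path and units string are assumptions; `XarrayContext.from_dataset` does this automatically):

```python
from datafusion import SessionContext
from xarray_sql.cftime import make_cftime_udf  # assumed import path

ctx = SessionContext()
udf = make_cftime_udf('days since 2000-01-01', '360_day')  # illustrative units
ctx.register_udf(udf)
```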
partition_bounds(values)
¶
Return (min, max, dtype_tag) for a cftime coordinate slice.
Gregorian-like calendars return nanosecond bounds tagged
"timestamp_ns" (compatible with ScalarBound::TimestampNanos
in the Rust pruning layer). Non-Gregorian calendars return int64
offsets tagged "int64".
to_microseconds(values)
¶
Convert cftime objects to int64 microseconds since Unix epoch.
Used for Gregorian-like calendars. Vectorised via cftime.date2num
(implemented in C).
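For reference, the underlying conversion can be reproduced directly with cftime (a sketch, not this module's exact internals):

```python
import cftime

# date2num is vectorised in C; 'microseconds since 1970-01-01' yields
# offsets from the Unix epoch, which are cast to int64 for Arrow.
dates = [cftime.DatetimeGregorian(2000, 1, 1), cftime.DatetimeGregorian(2000, 1, 2)]
micros = cftime.date2num(dates, 'microseconds since 1970-01-01T00:00:00')
```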
to_offsets(values, units, cal)
¶
Convert cftime objects to int64 offsets in the given units/calendar.
Used for non-Gregorian calendars where data is stored as pa.int64().
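For example, under a 360_day calendar with hypothetical units of 'days since 2000-01-01', July 1st sits exactly 180 days (six 30-day months) into the year:

```python
import cftime

d = cftime.Datetime360Day(2000, 7, 1)
offset = cftime.date2num(d, 'days since 2000-01-01', calendar='360_day')  # 180.0
```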
core
¶
df
¶
Block = dict[Hashable, slice]
module-attribute
¶
Chunks = dict[str, int] | None
module-attribute
¶
DEFAULT_BATCH_SIZE = 65536
module-attribute
¶
PartitionBounds = dict[str, tuple[Any, Any, str]]
module-attribute
¶
block_slices(ds, chunks=None)
¶
Compute block slices for a chunked Dataset.
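As a sketch of the output, each Block maps dimension names to slices (the import path and data are illustrative):

```python
import numpy as np
import xarray as xr
from xarray_sql.core.df import block_slices  # assumed import path

ds = xr.Dataset({'air': (('time', 'lat'), np.zeros((8, 4)))})
for block in block_slices(ds, chunks={'time': 4}):
    print(block)  # e.g. {'time': slice(0, 4), 'lat': slice(0, 4)}
```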
dataset_to_record_batch(ds, schema)
¶
Convert an xarray Dataset partition to an Arrow RecordBatch.
Builds the RecordBatch directly from numpy arrays, bypassing the pandas round-trip (to_dataframe → reset_index → from_pandas) used by pivot(). For large partitions this reduces peak memory from ~5× to ~2× the partition size.
Dimension coordinates are broadcast to the full partition shape and ravelled. np.broadcast_to() is zero-copy; the ravel() forces one copy per coordinate (unavoidable, since broadcast arrays are non-contiguous). Data variable arrays are ravelled in-place — a zero-copy view when the underlying array is already C-contiguous (the common case for numpy-backed xarray datasets).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ds | Dataset | A partition-sized xarray Dataset (already sliced via isel). | required |
| schema | Schema | The Arrow schema for the output, as produced by _parse_schema. Column order in the output matches schema field order. | required |

Returns:

| Type | Description |
|---|---|
| RecordBatch | A RecordBatch with one column per dimension coordinate and data variable, in schema order. |
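The memory behaviour described above follows directly from numpy semantics, as this small sketch illustrates:

```python
import numpy as np

lat = np.arange(4.0)
# broadcast_to returns a zero-copy, non-contiguous view...
wide = np.broadcast_to(lat, (8, 4))
# ...so ravel() must copy once to produce a contiguous column.
lat_col = wide.ravel()

data = np.zeros((8, 4))
flat = data.ravel()  # zero-copy view: data is already C-contiguous
```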
explode(ds, chunks=None)
¶
Explodes a dataset into its chunks.
from_map(func, *iterables, args=None, **kwargs)
¶
Create a PyArrow Table by mapping a function over iterables. Documented in full under the package-level from_map above.
from_map_batched(func, *iterables, args=None, schema=None, **kwargs)
¶
Create a PyArrow RecordBatchReader by mapping a function over iterables.
This is equivalent to dask's from_map but returns a PyArrow RecordBatchReader that can be used with DataFusion. RecordBatches are created via func one at a time as the stream is consumed.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable[..., DataFrame] | Function to apply to each element of the iterables. Currently, the function must return a Pandas DataFrame. | required |
| *iterables | tuple[Any, ...] | Iterable objects to map the function over. | () |
| schema | Schema | Optional schema needed for the RecordBatchReader. | None |
| args | tuple \| None | Additional positional arguments to pass to func. | None |
| **kwargs | dict[str, Any] | Additional keyword arguments to pass to func. | {} |

Returns:

| Type | Description |
|---|---|
| RecordBatchReader | A PyArrow RecordBatchReader containing the stream of RecordBatches. |
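A usage sketch (the import path is an assumption, since only from_map appears in `__all__`):

```python
import pandas as pd
import pyarrow as pa
from xarray_sql.core.df import from_map_batched  # assumed import path

def load_partition(i: int) -> pd.DataFrame:
    return pd.DataFrame({'part': [i, i], 'value': [0.0, 1.0]})

schema = pa.schema([('part', pa.int64()), ('value', pa.float64())])
reader = from_map_batched(load_partition, range(4), schema=schema)
for batch in reader:  # one RecordBatch per element, created lazily
    print(batch.num_rows)
```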
iter_record_batches(ds, schema, batch_size=DEFAULT_BATCH_SIZE)
¶
Yield RecordBatches of at most batch_size rows from a partition Dataset.
Unlike dataset_to_record_batch, which materialises the entire
partition as one batch, this generator emits smaller batches so that
DataFusion can begin filtering and aggregating before the full partition
is loaded. Peak memory per batch is O(batch_size) for coordinate columns
and O(partition_size) for data-variable columns (which must be loaded in
full from storage).
Coordinate values are computed per batch via strided index arithmetic — no broadcast array spanning the whole partition is ever allocated. Data variable flat arrays are loaded once (triggering any remote I/O) and then sliced as zero-copy views for each batch.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ds | Dataset | A partition-sized xarray Dataset (already sliced via isel). | required |
| schema | Schema | The Arrow schema for the output, as produced by _parse_schema. | required |
| batch_size | int | Maximum number of rows per yielded RecordBatch. | DEFAULT_BATCH_SIZE |

Yields:

| Type | Description |
|---|---|
| RecordBatch | RecordBatches in schema column order, covering all rows of the partition exactly once. |
partition_metadata(ds, blocks)
¶
Compute min/max coordinate values for each partition.
This metadata enables filter pushdown: SQL queries with WHERE clauses on dimension columns can prune partitions that can't contain matching rows.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ds | Dataset | The xarray Dataset containing coordinate values. | required |
| blocks | list[Block] | List of block slices from block_slices(). | required |

Returns:

| Type | Description |
|---|---|
| list[PartitionBounds] | List of dicts mapping dimension name to (min_value, max_value, dtype_str) tuples. |
Note
If a partition has an empty slice for a dimension, that dimension is omitted from the partition's metadata. The Rust pruning logic treats missing dimensions conservatively (never prunes on them).
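A sketch of computing this metadata by hand (import paths assumed, dataset illustrative):

```python
import xarray as xr
from xarray_sql.core.df import block_slices, partition_metadata  # assumed paths

ds = xr.tutorial.open_dataset('air_temperature')
blocks = block_slices(ds, chunks={'time': 240})
bounds = partition_metadata(ds, blocks)
# bounds[0] maps each dimension name to a (min_value, max_value, dtype_str) tuple
```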
pivot(ds)
¶
Converts an xarray Dataset to a pandas DataFrame.
reader
¶
Lazy Arrow stream reader for xarray Datasets.
This module provides XarrayRecordBatchReader, which implements the Arrow PyCapsule Interface (`__arrow_c_stream__`) to enable zero-copy, lazy streaming of xarray data to DataFusion and other Arrow consumers.
The implementation delegates to PyArrow's RecordBatchReader for the actual streaming, wrapping xarray block iteration in a generator.
XarrayRecordBatchReader
¶
A lazy Arrow stream reader for xarray Datasets.
Implements the Arrow PyCapsule Interface (`__arrow_c_stream__`) to enable zero-copy, lazy streaming of xarray data to DataFusion and other Arrow consumers.
The key property is that xarray blocks are only converted to Arrow RecordBatches when the consumer calls get_next (e.g., during DataFusion's collect()), NOT when the reader is created or registered.
Attributes:

| Name | Type | Description |
|---|---|---|
| schema | Schema | The Arrow schema for the stream. |
Example

```python
import xarray as xr
from xarray_sql import XarrayRecordBatchReader

ds = xr.tutorial.open_dataset('air_temperature')
reader = XarrayRecordBatchReader(ds, chunks={'time': 240})
# At this point, NO data has been read from xarray.

# Data is only read when consumed:
import pyarrow as pa

pa_reader = pa.RecordBatchReader.from_stream(reader)
for batch in pa_reader:
    print(batch.num_rows)  # Data read here
```
schema
property
¶
The Arrow schema for this stream.
read_xarray(ds, chunks=None)
¶
Pivots an Xarray Dataset into a PyArrow Table, partitioned by chunks. Documented in full under the package-level read_xarray above.
read_xarray_table(ds, chunks=None, *, batch_size=DEFAULT_BATCH_SIZE, _iteration_callback=None)
¶
Create a lazy DataFusion table from an xarray Dataset. Documented in full under the package-level read_xarray_table above.
sql
¶
XarrayContext
¶
Bases: SessionContext
A datafusion SessionContext that also supports xarray.Datasets. Documented in full, including from_dataset, under the package-level XarrayContext above.