Python Data Sources
The Python data source API lets you extend the SparkSession.read and DataFrame.write APIs to support custom formats and integrations with external systems. Arrow is optionally supported for zero-copy data exchange between the Python process and the Sail execution engine, giving you flexibility in data source implementations without incurring performance penalties.
To create a custom data source usable through the standard PySpark API, define a Python class that inherits from the pyspark.sql.datasource.DataSource abstract class and register it with the Spark session. The DataSource class provides methods for defining the name and schema of the data source, as well as methods for creating readers and writers.
Currently, Sail supports Python data sources for batch reading and writing.
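Before the full PySpark examples below, the reader contract can be illustrated with plain Python. Note that RangePartition and RangeReader here are hypothetical stand-ins for the pyspark.sql.datasource classes, used only to show the call sequence: the engine calls partitions() once to plan the scan, then calls read(partition) once per partition, and the rows from all partitions together form the resulting DataFrame.

```python
# Plain-Python stand-ins for the pyspark.sql.datasource classes, used here
# only to illustrate the call sequence; no Spark session is involved.

class RangePartition:
    """Hypothetical partition descriptor covering a half-open row range."""

    def __init__(self, start: int, end: int):
        self.start = start
        self.end = end

class RangeReader:
    """Hypothetical reader: the engine calls partitions() once, then
    read(partition) once per partition (typically in parallel)."""

    def partitions(self):
        # Plan the scan: split the range [0, 6) into two partitions.
        return [RangePartition(0, 3), RangePartition(3, 6)]

    def read(self, partition):
        # Produce the rows for one partition as tuples.
        for i in range(partition.start, partition.end):
            yield (i, f"row-{i}")

# Simulate what the engine does with a registered reader.
reader = RangeReader()
rows = [row for p in reader.partitions() for row in reader.read(p)]
print(rows[:2])  # [(0, 'row-0'), (1, 'row-1')]
```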
Examples
INFO
In the code below, spark refers to a Spark client session connected to the Sail server. See the Getting Started guide for how to set this up.
Batch Reader
>>> from typing import Iterator, Tuple
>>>
>>> from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
>>> from pyspark.sql.types import IntegerType, StringType, StructField, StructType
>>>
>>> class SimpleDataSource(DataSource):
...     @classmethod
...     def name(cls) -> str:
...         return "simple"
...
...     def schema(self) -> StructType:
...         return StructType([
...             StructField("name", StringType()),
...             StructField("age", IntegerType())
...         ])
...
...     def reader(self, schema: StructType) -> DataSourceReader:
...         return SimpleDataSourceReader()
>>>
>>> class SimpleDataSourceReader(DataSourceReader):
...     def read(self, partition: InputPartition) -> Iterator[Tuple]:
...         yield ("Alice", 20)
...         yield ("Bob", 30)
>>>
>>> spark.dataSource.register(SimpleDataSource)
>>> spark.read.format("simple").load().show()
+-----+---+
| name|age|
+-----+---+
|Alice| 20|
| Bob| 30|
+-----+---+

Batch Arrow Reader
>>> from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
>>> import pyarrow as pa
>>>
>>> class SimpleArrowDataSource(DataSource):
...     @classmethod
...     def name(cls):
...         return "simplearrow"
...
...     def schema(self):
...         return "key int, value string"
...
...     def reader(self, schema: str):
...         return SimpleArrowDataSourceReader(schema, self.options)
>>>
>>> class SimpleArrowDataSourceReader(DataSourceReader):
...     def __init__(self, schema, options):
...         self.schema: str = schema
...         self.options = options
...
...     def read(self, partition):
...         keys = pa.array([1, 2, 3], type=pa.int32())
...         values = pa.array(["one", "two", "three"], type=pa.string())
...         schema = pa.schema([("key", pa.int32()), ("value", pa.string())])
...         batch = pa.RecordBatch.from_arrays([keys, values], schema=schema)
...         yield batch
...
...     def partitions(self):
...         return [InputPartition(0)]
>>>
>>> spark.dataSource.register(SimpleArrowDataSource)
>>> df = spark.read.format("simplearrow").load()
>>> df.show()
+---+-----+
|key|value|
+---+-----+
| 1| one|
| 2| two|
| 3|three|
+---+-----+

More Examples
Please refer to the Spark documentation for more Python data source examples, including how to define a batch writer. We will also add more examples to this guide in the future. Stay tuned!
