Python API
PyIceberg is based around catalogs to load tables. The first step is to instantiate a catalog that loads tables. Let's use the following configuration:
catalog:
prod:
uri: http://rest-catalog/ws/
credential: t-1234:secret
Then load the prod
catalog:
from pyiceberg.catalog import load_catalog
catalog = load_catalog("prod")
catalog.list_namespaces()
Returns two namespaces:
[("default",), ("nyc",)]
Listing the tables in the nyc
namespace:
catalog.list_tables("nyc")
Returns a list of tuples, containing a single table taxis:
[("nyc", "taxis")]
Load a table
Loading the taxis
table:
catalog.load_table("nyc.taxis")
# Equivalent to:
catalog.load_table(("nyc", "taxis"))
# The tuple syntax can be used if the namespace or table contains a dot.
This returns a Table
that represents an Iceberg table:
Table(
identifier=('nyc', 'taxis'),
metadata_location='s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json',
metadata=TableMetadataV2(
location='s3a://warehouse/wh/nyc.db/taxis',
table_uuid=UUID('ebd5d172-2162-453d-b586-1cdce52c1116'),
last_updated_ms=1662633437826,
last_column_id=19,
schemas=[Schema(
NestedField(field_id=1, name='VendorID', field_type=LongType(), required=False),
NestedField(field_id=2, name='tpep_pickup_datetime', field_type=TimestamptzType(), required=False),
NestedField(field_id=3, name='tpep_dropoff_datetime', field_type=TimestamptzType(), required=False),
NestedField(field_id=4, name='passenger_count', field_type=DoubleType(), required=False),
NestedField(field_id=5, name='trip_distance', field_type=DoubleType(), required=False),
NestedField(field_id=6, name='RatecodeID', field_type=DoubleType(), required=False),
NestedField(field_id=7, name='store_and_fwd_flag', field_type=StringType(), required=False),
NestedField(field_id=8, name='PULocationID', field_type=LongType(), required=False),
NestedField(field_id=9, name='DOLocationID', field_type=LongType(), required=False),
NestedField(field_id=10, name='payment_type', field_type=LongType(), required=False),
NestedField(field_id=11, name='fare_amount', field_type=DoubleType(), required=False),
NestedField(field_id=12, name='extra', field_type=DoubleType(), required=False),
NestedField(field_id=13, name='mta_tax', field_type=DoubleType(), required=False),
NestedField(field_id=14, name='tip_amount', field_type=DoubleType(), required=False),
NestedField(field_id=15, name='tolls_amount', field_type=DoubleType(), required=False),
NestedField(field_id=16, name='improvement_surcharge', field_type=DoubleType(), required=False),
NestedField(field_id=17, name='total_amount', field_type=DoubleType(), required=False),
NestedField(field_id=18, name='congestion_surcharge', field_type=DoubleType(), required=False),
NestedField(field_id=19, name='airport_fee', field_type=DoubleType(), required=False)
),
schema_id=0,
identifier_field_ids=[]
)],
current_schema_id=0,
partition_specs=[PartitionSpec(spec_id=0)],
default_spec_id=0,
last_partition_id=999,
properties={
'owner': 'root',
'write.format.default': 'parquet'
},
current_snapshot_id=8334458494559715805,
snapshots=[
Snapshot(
snapshot_id=7910949481055846233,
parent_snapshot_id=None,
sequence_number=None,
timestamp_ms=1662489306555,
manifest_list='s3a://warehouse/wh/nyc.db/taxis/metadata/snap-7910949481055846233-1-3eb7a2e1-5b7a-4e76-a29a-3e29c176eea4.avro',
summary=Summary(
Operation.APPEND,
**{
'spark.app.id': 'local-1662489289173',
'added-data-files': '1',
'added-records': '2979431',
'added-files-size': '46600777',
'changed-partition-count': '1',
'total-records': '2979431',
'total-files-size': '46600777',
'total-data-files': '1',
'total-delete-files': '0',
'total-position-deletes': '0',
'total-equality-deletes': '0'
}
),
schema_id=0
),
],
snapshot_log=[
SnapshotLogEntry(
snapshot_id='7910949481055846233',
timestamp_ms=1662489306555
)
],
metadata_log=[
MetadataLogEntry(
metadata_file='s3a://warehouse/wh/nyc.db/taxis/metadata/00000-b58341ba-6a63-4eea-9b2f-e85e47c7d09f.metadata.json',
timestamp_ms=1662489306555
)
],
sort_orders=[SortOrder(order_id=0)],
default_sort_order_id=0,
refs={
'main': SnapshotRef(
snapshot_id=8334458494559715805,
snapshot_ref_type=SnapshotRefType.BRANCH,
min_snapshots_to_keep=None,
max_snapshot_age_ms=None,
max_ref_age_ms=None
)
},
format_version=2,
last_sequence_number=1
)
)
Create a table
To create a table from a catalog:
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import TimestampType, DoubleType, StringType, NestedField
schema = Schema(
NestedField(
field_id=1, name="datetime", field_type=TimestampType(), required=False
),
NestedField(field_id=2, name="bid", field_type=DoubleType(), required=False),
NestedField(field_id=3, name="ask", field_type=DoubleType(), required=False),
NestedField(field_id=4, name="symbol", field_type=StringType(), required=False),
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
partition_spec = PartitionSpec(
PartitionField(
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
)
)
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform
sort_order = SortOrder(SortField(source_id=4, transform=IdentityTransform()))
catalog = load_catalog("prod")
catalog.create_table(
identifier="default.bids",
location="/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids/",
schema=schema,
partition_spec=partition_spec,
sort_order=sort_order,
)
Which returns a newly created table:
Table(
identifier=('default', 'bids'),
metadata_location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids//metadata/00000-c8cd93ab-f784-474d-a167-b1a86b05195f.metadata.json',
metadata=TableMetadataV2(
location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids/',
table_uuid=UUID('38d4cb39-4945-4bf2-b374-984b5c4984d2'),
last_updated_ms=1661847562069,
last_column_id=4,
schemas=[
Schema(
NestedField(field_id=1, name='datetime', field_type=TimestampType(), required=False),
NestedField(field_id=2, name='bid', field_type=DoubleType(), required=False),
NestedField(field_id=3, name='ask', field_type=DoubleType(), required=False),
NestedField(field_id=4, name='symbol', field_type=StringType(), required=False)),
schema_id=1,
identifier_field_ids=[])
],
current_schema_id=1,
partition_specs=[
PartitionSpec(
PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name='datetime_day'),))
],
default_spec_id=0,
last_partition_id=1000,
properties={},
current_snapshot_id=None,
snapshots=[],
snapshot_log=[],
metadata_log=[],
sort_orders=[
SortOrder(order_id=1, fields=[SortField(source_id=4, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST)])
],
default_sort_order_id=1,
refs={},
format_version=2,
last_sequence_number=0
)
)
Query a table
To query a table, a table scan is needed. A table scan accepts a filter, columns and optionally a snapshot ID:
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual
catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")
scan = table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
)
[task.file.file_path for task in scan.plan_files()]
The low-level API plan_files
method returns a set of tasks that provide the files that might contain matching rows:
['s3a://warehouse/wh/nyc/taxis/data/00003-4-42464649-92dd-41ad-b83b-dea1a2fe4b58-00001.parquet']
In this case it is up to the engine itself to filter the rows in each file. Below, to_arrow()
and to_duckdb()
are described, which already do this for you.
Apache Arrow
Requirements
This requires PyArrow to be installed.
Using PyIceberg it is possible to filter out data from a huge table and pull it into a PyArrow table:
table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow()
This will return a PyArrow table:
pyarrow.Table
VendorID: int64
tpep_pickup_datetime: timestamp[us, tz=+00:00]
tpep_dropoff_datetime: timestamp[us, tz=+00:00]
----
VendorID: [[2,1,2,1,1,...,2,2,2,2,2],[2,1,1,1,2,...,1,1,2,1,2],...,[2,2,2,2,2,...,2,6,6,2,2],[2,2,2,2,2,...,2,2,2,2,2]]
tpep_pickup_datetime: [[2021-04-01 00:28:05.000000,...,2021-04-30 23:44:25.000000]]
tpep_dropoff_datetime: [[2021-04-01 00:47:59.000000,...,2021-05-01 00:14:47.000000]]
This will only pull in the files that might contain matching rows.
DuckDB
Requirements
This requires DuckDB to be installed.
A table scan can also be converted into an in-memory DuckDB table:
con = table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")
Using the cursor, we can run queries on the DuckDB table:
print(
con.execute(
"SELECT tpep_dropoff_datetime - tpep_pickup_datetime AS duration FROM distant_taxi_trips LIMIT 4"
).fetchall()
)
[
(datetime.timedelta(seconds=1194),),
(datetime.timedelta(seconds=1118),),
(datetime.timedelta(seconds=1697),),
(datetime.timedelta(seconds=1581),),
]