
[SEDONA-660] Add GeoArrow export from Spark DataFrame #1767

Merged
merged 20 commits into apache:master on Jan 27, 2025

Conversation

paleolimbot
Member

@paleolimbot paleolimbot commented Jan 22, 2025

Did you read the Contributor Guide?

Is this PR related to a JIRA ticket?

Closes #1756.

What changes were proposed in this PR?

Added Adapter.toArrow() to collect an RDD or DataFrame to an Arrow Table where Geometry columns are represented as geoarrow.wkb extension types.

How was this patch tested?

Tests will be added to python/tests/utils/test_geoarrow.py.

Did this PR include necessary documentation updates?

  • A new API is added to the Python bindings and will be documented

@paleolimbot
Member Author

paleolimbot commented Jan 22, 2025

For future me:

import os
import pyspark
from sedona.spark import SedonaContext

# Unset SPARK_HOME so the pip-installed pyspark is used
if "SPARK_HOME" in os.environ:
    del os.environ["SPARK_HOME"]

# e.g. "3.5.1" -> "3.5", to select the matching Sedona artifact
pyspark_version = pyspark.__version__[: pyspark.__version__.rfind(".")]

config = (
    SedonaContext.builder()
    .config(
        "spark.jars.packages",
        f"org.apache.sedona:sedona-spark-{pyspark_version}_2.12:1.7.0,"
        "org.datasyslab:geotools-wrapper:1.7.0-28.5",
    )
    .config(
        "spark.jars.repositories",
        "https://artifacts.unidata.ucar.edu/repository/unidata-all",
    )
    .getOrCreate()
)
sedona = SedonaContext.create(config)
import pyarrow as pa
from pyspark.sql.types import StringType, StructType

from sedona.utils.geoarrow import dataframe_to_arrow

test_wkt = ["POINT (0 1)", "LINESTRING (0 1, 2 3)"]

schema = StructType().add("wkt", StringType())
wkt_df = sedona.createDataFrame(zip(test_wkt), schema)

# No geometry
dataframe_to_arrow(wkt_df)
#> pyarrow.Table
#> wkt: string
#> ----
#> wkt: [["POINT (0 1)"],["LINESTRING (0 1, 2 3)"]]

# With geometry (not yet implemented)
geo_df = wkt_df.selectExpr("ST_GeomFromText(wkt) AS geom")
dataframe_to_arrow(geo_df)

@james-willis
Contributor

Why not do all this in scala?

Comment on lines 30 to 32
col_is_geometry = [
    isinstance(f.dataType, GeometryType) for f in spark_schema.fields
]
Contributor

This doesn't seem to handle geometries in complex types (arrays, structs).

Member Author

The tools for dealing with this on the pyarrow side are not great and might be quite a lot of work. Is this an important use case or is there a way to structure a select() (or push something into Scala) that I'm missing?

Contributor

I think it is important. I use geometries in complex types frequently. At least arrays of geometries is something that sedona itself outputs.

Member Author

Got it! I may defer to a follow-up since that's a bit of a learning curve for me. I think I can figure out how to walk the PySpark schema to get the nested paths corresponding to the Geometry nodes, and I think I can figure out how to do the node search/replace from the pyarrow side. For conversion, we can either:

  • Go back to the version here where we transform the result on the Python side in C
  • Figure out how to replace the FieldSerializer/ArrowSerializer on the Scala side to do what we need it to do there (basically: write WKB and add field metadata)
  • Figure out how to issue a select() call in pyspark that does the transformation using the pyspark dataframe API
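A possible starting point for the schema walk mentioned above, using hypothetical stand-in classes in place of the real pyspark.sql.types nodes (GeometryType, ArrayType, StructField, StructType) just to illustrate the recursion over nested paths:

```python
from dataclasses import dataclass
from typing import Iterator, List, Tuple

# Hypothetical stand-ins for pyspark.sql.types nodes; the real
# implementation would walk StructType/ArrayType/GeometryType instead.
@dataclass
class Geometry:
    pass

@dataclass
class Array:
    element: object

@dataclass
class Field:
    name: str
    dtype: object

@dataclass
class Struct:
    fields: List[Field]

def geometry_paths(dtype, prefix=()) -> Iterator[Tuple[str, ...]]:
    """Yield the nested path of every Geometry node in a schema tree."""
    if isinstance(dtype, Geometry):
        yield prefix
    elif isinstance(dtype, Struct):
        for f in dtype.fields:
            yield from geometry_paths(f.dtype, prefix + (f.name,))
    elif isinstance(dtype, Array):
        yield from geometry_paths(dtype.element, prefix + ("element",))

schema = Struct([
    Field("geom", Geometry()),
    Field("props", Struct([Field("geoms", Array(Geometry()))])),
])
print(list(geometry_paths(schema)))
# [('geom',), ('props', 'geoms', 'element')]
```

The collected paths could then drive a node search/replace on the pyarrow side.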

@paleolimbot
Member Author

Why not do all this in scala?

Great point! It's pyspark, but I pushed a version that does the WKB in Spark, which is way better than what I had in mind 🙂

import pyarrow as pa
from pyspark.sql.types import StringType, StructType

from sedona.utils.geoarrow import dataframe_to_arrow

test_wkt = ["POINT (0 1)", "LINESTRING (0 1, 2 3)"]

schema = StructType().add("wkt", StringType())
wkt_df = sedona.createDataFrame(zip(test_wkt), schema)

geo_df = wkt_df.selectExpr("ST_GeomFromText(wkt) AS geom")
dataframe_to_arrow(geo_df)
#> pyarrow.Table
#> geom: extension<geoarrow.wkb<WkbType>>
#> ----
#> geom: [[01010000000000000000000000000000000000F03F],[0102000000020000000000000000000000000000000000F03F00000000000000400000000000000840]]
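As a sanity check on the hex output above: the storage is standard WKB, so the point encoding can be decoded with nothing but the stdlib struct module (assumes the basic WKB layout: one endianness byte, a uint32 geometry type where 1 = Point, then two doubles):

```python
import struct

def parse_wkb_point(hex_wkb: str):
    """Decode a hex-encoded WKB Point into an (x, y) tuple."""
    buf = bytes.fromhex(hex_wkb)
    endian = "<" if buf[0] == 1 else ">"  # 1 = little-endian, 0 = big-endian
    (geom_type,) = struct.unpack_from(endian + "I", buf, 1)
    if geom_type != 1:  # 1 = Point in the WKB spec
        raise ValueError(f"not a Point: geometry type {geom_type}")
    return struct.unpack_from(endian + "dd", buf, 5)

print(parse_wkb_point("01010000000000000000000000000000000000F03F"))
# (0.0, 1.0)  -- i.e. POINT (0 1), matching the first input geometry
```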

@jiayuasu
Member

Maybe relevant: can this solve the toPandas() issue of PySpark DataFrame (https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html#enabling-for-conversion-to-from-pandas)?

In Sedona, when you call toPandas(), it gives you a GeoPandas DF, but this will throw an exception if spark.sql.execution.arrow.pyspark.enabled is set to true because Sedona Python does not work with GeoArrow.

CC @Kontinuation

@paleolimbot
Member Author

can this solve the toPandas() issue of PySpark DataFrame

That probably needs something in Java land to define the Arrow serialization of a user-defined type?

In Python-land it won't quite get us to a GeoSeries today but it's a lot closer. I think that needs the GeoPandasDtype to define a method (and possibly something in Pandas to get a GeoSeries instead of a Series) but I think everybody from GeoPandas is on board to make that happen.

@paleolimbot paleolimbot marked this pull request as ready for review January 23, 2025 20:29
@paleolimbot paleolimbot requested a review from jiayuasu as a code owner January 23, 2025 20:29
@paleolimbot
Member Author

paleolimbot commented Jan 23, 2025

I still have some questions here, but it does work!

Edit (more questions + example):

  • We need to propagate the CRS. We could add an argument so that a caller who already knows it can pass crs=... to avoid calculating it, but otherwise we should probably calculate it to ensure it gets propagated. We could update the AsBinary() call to be AsEWKB() and handle that from Python, or issue a separate Spark call?
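For reference, EWKB differs from the plain WKB above only by a flag bit in the type word plus an embedded SRID. A minimal sketch of sniffing the SRID out of an EWKB header (assumes the standard PostGIS EWKB layout; the hex string is a hand-built example of an SRID-4326 point, not Sedona output):

```python
import struct

EWKB_SRID_FLAG = 0x20000000  # PostGIS EWKB: "SRID present" bit in the type word

def read_ewkb_srid(hex_ewkb: str):
    """Return the embedded SRID of an EWKB geometry, or None for plain WKB."""
    buf = bytes.fromhex(hex_ewkb)
    endian = "<" if buf[0] == 1 else ">"
    (raw_type,) = struct.unpack_from(endian + "I", buf, 1)
    if raw_type & EWKB_SRID_FLAG:
        # The 4-byte SRID immediately follows the type word
        (srid,) = struct.unpack_from(endian + "I", buf, 5)
        return srid
    return None

print(read_ewkb_srid("0101000020E61000000000000000000000000000000000F03F"))
# 4326
```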

Example converting to GeoPandas:

import geopandas

from sedona.utils.geoarrow import dataframe_to_arrow

# 200 million points
df = sedona.read.format("geoparquet").load("microsoft-buildings-point.parquet")

# 17s
table = dataframe_to_arrow(df.limit(10_000_000))

# 8s
geopandas.GeoDataFrame.from_arrow(table)

I think this is possibly a substantial speedup over following the existing documentation for Pandas conversion ( https://sedona.apache.org/1.7.0/tutorial/geopandas-shapely/#from-sedona-dataframe-to-geopandas ).

# 47 s
pdf = df.limit(10_000_000).toPandas()
geopandas.GeoDataFrame(pdf, geometry="geom")

@jiayuasu
Member

@paleolimbot do you have a benchmark number to show the performance gain? Will this benefit our integration with lonboard?

@paleolimbot
Member Author

do you have a benchmark number to show the performance gain?

My example above (end-to-end 10 million points from Sedona to GeoPandas) takes 25s after this PR (and 47s before, starting from a fresh session each time), although I am too new to Sedona to know if I am doing something obviously wrong or whether I'm benchmarking an unrepresentative workload.

Will this benefit our integration with lonboard?

I'll double check, but this should work out of the box (viz() accepts anything that implements __arrow_c_stream__, like a pyarrow.Table).

from lonboard import viz

table = dataframe_to_arrow(df.limit(10_000_000))
viz(table)
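For context, the duck typing viz() relies on here is the Arrow PyCapsule interface: any object exposing an __arrow_c_stream__ method is accepted. A minimal sketch of a wrapper that forwards the export (StreamWrapper and Dummy are illustrative names; a real table returns a PyCapsule, not a string):

```python
class StreamWrapper:
    """Forwards the Arrow PyCapsule stream export of a wrapped table-like object."""

    def __init__(self, table):
        self._table = table  # e.g. a pyarrow.Table

    def __arrow_c_stream__(self, requested_schema=None):
        # Consumers like viz() call this to obtain an ArrowArrayStream capsule.
        return self._table.__arrow_c_stream__(requested_schema)


class Dummy:
    """Hypothetical stand-in for a pyarrow.Table's capsule export."""

    def __arrow_c_stream__(self, requested_schema=None):
        return "capsule"  # a real table would return a PyCapsule here


print(StreamWrapper(Dummy()).__arrow_c_stream__())
```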

@jiayuasu
Member

@paleolimbot

This is awesome.

Let me know if this works. Then I will merge this PR.

I'll double check, but this should work out of the box (viz() accepts anything that implements __arrow_c_stream__, like a pyarrow.Table).

@paleolimbot
Member Author

It works! The two TODOs here are:

  • Document the function and make sure it can be imported with from sedona.spark import to_arrow
  • Ensure the CRS on the output is set properly

@paleolimbot
Member Author

I'd forgotten about CRS handling (which works in theory, but needs some explicit tests for some of the corner cases)!

Member

@jiayuasu jiayuasu left a comment

can you also add a short paragraph at the bottom of this page? https://sedona.apache.org/latest/tutorial/sql/#convert-between-dataframe-and-spatialrdd

@jiayuasu jiayuasu added this to the sedona-1.7.1 milestone Jan 27, 2025
@github-actions github-actions bot added the docs label Jan 27, 2025
@paleolimbot
Member Author

can you also add a short paragraph at the bottom of this page?

I added some documentation to the GeoPandas section (I'll add to the section you linked here for the partitioning PR!)

I'm comfortable with the tests and CRS propagation here! The EWKB bit was a bit of a ride but I think I was able to test the implementation adequately here.

@jiayuasu jiayuasu merged commit 7d32fe0 into apache:master Jan 27, 2025
25 checks passed
@paleolimbot paleolimbot deleted the python-geoarrow-serde branch February 3, 2025 16:39
Development

Successfully merging this pull request may close these issues.

Add GeoArrow IO to Sedona/Python
3 participants