In this new era of information, geospatial data is becoming more and more relevant in Data Engineering and Data Analytics work… but what do we mean when we talk about geospatial data?
In this post, I would like to introduce some basic concepts about Geospatial Processing using Spark, one of the most popular data processing frameworks of 2020.
Let’s start with the basics, what is geospatial data?
Geospatial data is about objects, events or phenomena that have a location on the surface of the Earth (let’s not worry about other planets yet… Sorry, Mars Rover)
And what does this mean in terms of data? Well… We can represent Geospatial data with just a String, and the two most popular formats for doing so are:
- GeoJSON: { "geometry": { "type": "Polygon", "coordinates": [ [ [100.0, 0.0], [101.0, 0.0], [101.0, 1.0], [100.0, 1.0], [100.0, 0.0] ] ] } }
- Well-Known Text (WKT): POLYGON ((100.0 0.0, 101.0 0.0, 101.0 1.0, 100.0 1.0, 100.0 0.0))
Both examples above represent the same polygon:
But data as a String is not very useful because we would like to use spatial operators on it, and doing that with a String can be really hard… which is why we use something called a Geometry Type. This data type, which is not available in every system, allows us to use spatial operators like Contains, Intersects, and many more:
Of course, we could do all of that without a Geometry Type, but it’s much easier to call Intersection(Polygon A, Polygon B) than to code all the math behind that operation ourselves.
So… How can I do all of this in Spark?
Spark doesn’t have a Geometry Type built in, but luckily there are people out there who have done, and continue to do, that work for us (Thanks, open source community!)… So there are a few choices on how to do this:
- Use a third-party library: there are a few options available, like GeoSpark or GeoMesa… this is the right choice if the transformations you need are already available there… So don’t reinvent the wheel: use what other people have already developed and tested.
- Wrap an existing core library: if the available Spark libraries don’t fit your requirements, you can go one level deeper and wrap one of the existing geospatial implementations in Spark. JTS is usually the lowest-level library you will find; all the others use it as a starting point. It only provides the data types and some basic operations between geometries. It’s a Java library, so if you want to use it with Spark you need to do that adaptation yourself (see the short sketch after this list).
- Implement everything from scratch: this will probably require years of work…
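To give you a feel for that lower level, here is a minimal sketch of using JTS directly. An assumption on my side: in recent JTS releases the package is org.locationtech.jts, while older versions lived under com.vividsolutions.jts, so adjust the import to whichever version you have on your classpath.

import org.locationtech.jts.io.WKTReader

// Parse two WKT strings into JTS Geometry objects
val reader = new WKTReader()
val polygonA = reader.read("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))")
val polygonB = reader.read("POLYGON ((5 5, 15 5, 15 15, 5 15, 5 5))")

// Spatial predicate: do the two polygons overlap?
println(polygonA.intersects(polygonB))   // true

// Spatial operation: the overlapping area as a new geometry (printed as WKT)
println(polygonA.intersection(polygonB))

This is exactly the kind of plumbing that libraries like GeoSpark wrap for you and expose as Spark SQL functions.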
In this introduction I’m going to show the basics of GeoSpark… and we might go deeper in an upcoming post :)
Assuming that you’re working in a Databricks environment, using GeoSpark is straightforward. You just need to add two Maven libraries to your cluster and that’s it:
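For reference, the two libraries are the GeoSpark core and GeoSpark SQL artifacts. The exact coordinates depend on your Spark and GeoSpark versions, so double-check against the GeoSpark documentation, but at the time of writing they looked roughly like this:

org.datasyslab:geospark:1.3.1
org.datasyslab:geospark-sql_2.3:1.3.1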
Once you have the libraries installed, you can open a notebook and start using those geospatial functions! It’s seriously that simple.
Let’s start by defining a table with some geospatial data: (I will be copying the code of the cells here and adding a screenshot of the result)
// Create a new DataFrame with a column representing geospatial data as strings
val data = Seq(
  ("ID 1", "POLYGON ((0.0 0.0, 10.0 0.0, 10.0 10.0, 0.0 10.0, 0.0 0.0))"),
  ("ID 2", "POLYGON ((5.0 5.0, 15.0 5.0, 15.0 15.0, 5.0 15.0, 5.0 5.0))"),
  ("ID 3", "POLYGON ((16.0 0.0, 20.0 0.0, 20.0 5.0, 16.0 5.0, 16.0 0.0))")
).toDF("ID", "geometry")
display(data)
These rows represent some polygons:
In order to use the UDFs that GeoSpark provides, we can run the following:
import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
GeoSparkSQLRegistrator.registerAll(spark)
We need to convert our strings into a proper geometry type. To do that, we run the following:
val data2 = data.selectExpr("ID", "ST_GeomFromWKT(geometry) as geometry")
// Checking the schema
data2.printSchema
So now our geometry column is of type geometry:
We are ready to run some spatial operations… But to make this easier, let’s create a temporary view so we can run SQL queries.
data2.createOrReplaceTempView("GeoTable")
Let’s get the area of each polygon:
SELECT *, ST_Area(geometry) AS area FROM GeoTable
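These values are easy to sanity-check by hand: the first two polygons are 10×10 squares, so their area is 100, and the third is a 4×5 rectangle with an area of 20. Note that ST_Area works in the units of the coordinates themselves, so these are plain squared coordinate units, not square meters.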
Let’s get the Centroid of each polygon:
SELECT *, ST_Centroid(geometry) AS centroid FROM GeoTable
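Again, easy to verify by hand: the centroid of the first square is (5, 5), the second is (10, 10), and the third rectangle’s is (18, 2.5).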
Functions like these, which you apply directly to a geometry column, are simply called Functions.
There are other types, like Predicates, that allow you to check conditions between different geometries.
To use them, you need to have at least two geometries, so let’s get to that situation by creating a new table:
val data3 = Seq(("ID 1", "POLYGON ((0.0 0.0, 10.0 0.0, 10.0 10.0, 0.0 10.0, 0.0 0.0))", "ID 2", "POLYGON ((5.0 5.0, 15.0 5.0, 15.0 15.0, 5.0 15.0, 5.0 5.0))"),
("ID 1", "POLYGON ((0.0 0.0, 10.0 0.0, 10.0 10.0, 0.0 10.0, 0.0 0.0))", "ID 3", "POLYGON ((16.0 0.0, 20.0 0.0, 20.0 5.0, 16.0 5.0, 16.0 0.0))"),
("ID 2", "POLYGON ((5.0 5.0, 15.0 5.0, 15.0 15.0, 5.0 15.0, 5.0 5.0))", "ID 1", "POLYGON ((0.0 0.0, 10.0 0.0, 10.0 10.0, 0.0 10.0, 0.0 0.0))"),
("ID 2", "POLYGON ((5.0 5.0, 15.0 5.0, 15.0 15.0, 5.0 15.0, 5.0 5.0))", "ID 3", "POLYGON ((16.0 0.0, 20.0 0.0, 20.0 5.0, 16.0 5.0, 16.0 0.0))")
).toDF("ID1", "geometry1", "ID2", "geometry2")
val data4 = data3.selectExpr("ID1", "ST_GeomFromWKT(geometry1) as geometry1", "ID2", "ST_GeomFromWKT(geometry2) as geometry2")
data4.createOrReplaceTempView("GeoTable2")
With this new DataFrame we can run a few Spatial Queries to check for some conditions, like for example:
- Checking if two geometries intersect each other.
- Checking if one geometry is within another.
SELECT ID1,
       ID2,
       ST_Intersects(geometry1, geometry2) AS Intersects,
       ST_Within(geometry1, geometry2) AS Within
FROM GeoTable2
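With these sample polygons, only “ID 1” and “ID 2” intersect (in both directions), and neither polygon is fully contained in another, so the Within column is false for every row.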
You can see that using GeoSpark can be really easy thanks to its SQL API.
In future posts, I will go deeper into more geospatial processing.
Thanks for reading!