Title: A 'sparklyr' Extension for Nested Data
Version: 0.0.4
Maintainer: Matt Pollock <mpollock@mitre.org>
Description: A 'sparklyr' extension adding the capability to work easily with nested data.
Depends: R (≥ 3.3)
Imports: sparklyr, jsonlite, listviewer, dplyr, rlang, purrr, tidyselect
Suggests: testthat, reactR
License: Apache License 2.0 | file LICENSE
SystemRequirements: Spark: 1.6.x or 2.x
Encoding: UTF-8
RoxygenNote: 7.2.3
BugReports: https://github.com/mitre/sparklyr.nested/issues
NeedsCompilation: no
Packaged: 2023-02-20 21:39:05 UTC; frodo
Author: Matt Pollock [aut, cre], The MITRE Corporation [cph]
Repository: CRAN
Date/Publication: 2023-02-20 22:00:03 UTC
Explode data along a column
Description
Exploding an array column of length N will replicate the top level record N times.
The i-th replicated record will contain a struct (not an array) corresponding to the i-th element
of the exploded array. Exploding will not promote any fields or otherwise change the schema of
the data.
Usage
sdf_explode(x, column, is_map = FALSE, keep_all = FALSE)
Arguments
x: An object (usually a spark_tbl) coercible to a Spark DataFrame.
column: The field to explode.
is_map: Logical. The (scala) explode method works for both array and map column types; set this to TRUE when the column being exploded is a map rather than an array.
keep_all: Logical. If TRUE then records where the exploded field is empty/null are kept (using explode_outer, Spark 2 only); if FALSE (the default) such records are dropped.
Details
Two types of exploding are possible. The default method calls the scala explode
method. This operation is supported in both Spark 1.6.x and 2.x; it will however drop records where the
exploding field is empty/null. Alternatively, keep_all=TRUE will use the explode_outer
scala method introduced in Spark 2 so that no records are dropped.
Examples
## Not run:
# first get some nested data
iris_tbl <- copy_to(sc, iris, name="iris")
iris_nst <- iris_tbl %>%
sdf_nest(Sepal_Length, Sepal_Width, Petal_Length, Petal_Width, .key="data") %>%
group_by(Species) %>%
summarize(data=collect_list(data))
# then explode it
iris_nst %>% sdf_explode(data)
## End(Not run)
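To illustrate keep_all, the sketch below (reusing iris_nst from above and assuming a live Spark 2+ connection sc) routes the operation through explode_outer so that records whose data array is empty or null would still appear in the output:
## Not run:
# keep records even when the exploded array is empty/null
iris_nst %>% sdf_explode(data, keep_all = TRUE)
## End(Not run)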
Nest data in a Spark Dataframe
Description
This function is like tidyr::nest. Calling this function will not
aggregate over other columns; rather, the output has the same number of
rows/records as the input. See the examples for how to achieve row reduction
by aggregating elements using collect_list, which is a Spark SQL function.
Usage
sdf_nest(x, ..., .key = "data")
Arguments
x: A Spark dataframe.
...: Columns to nest.
.key: Character. A name for the new column containing nested fields.
Examples
## Not run:
# produces a dataframe with an array of characteristics nested under
# each unique species identifier
iris_tbl <- copy_to(sc, iris, name="iris")
iris_tbl %>%
sdf_nest(Sepal_Length, Sepal_Width, Petal_Length, Petal_Width, .key="data") %>%
group_by(Species) %>%
summarize(data=collect_list(data))
## End(Not run)
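Because sdf_nest alone does not aggregate, a quick sanity check (a sketch assuming a live connection sc) is to confirm the row count is unchanged by the nesting step itself:
## Not run:
iris_nst <- iris_tbl %>%
  sdf_nest(Sepal_Length, Sepal_Width, Petal_Length, Petal_Width, .key="data")
# same number of records before and after nesting
sdf_nrow(iris_tbl) == sdf_nrow(iris_nst)
## End(Not run)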
Work with the schema
Description
These functions support flexible schema inspection both algorithmically and in human-friendly ways.
Usage
sdf_schema_json(
x,
parse_json = TRUE,
simplify = FALSE,
append_complex_type = TRUE
)
sdf_schema_viewer(
x,
simplify = TRUE,
append_complex_type = TRUE,
use_react = FALSE
)
Arguments
x: An R object wrapping, or containing, a Spark DataFrame.
parse_json: Logical. If TRUE then the JSON schema returned by Spark is parsed into an R list.
simplify: Logical. If TRUE then the schema is collapsed into a compact form in which each field maps directly to its type, e.g. a string array field f is rendered as "f (array)": ["[string]"] (as in the sample_json below).
append_complex_type: Logical. This only matters if parse_json=TRUE and simplify=TRUE. In that case it determines whether complex types such as "(array)" or "(struct)" are appended to field names in the simplified schema.
use_react: Logical. If TRUE the schema viewer widget is rendered with a react-based JSON viewer, which requires the suggested reactR package.
Examples
## Not run:
library(testthat)
library(jsonlite)
library(sparklyr)
library(sparklyr.nested)
sample_json <- paste0(
'{"aircraft_id":["string"],"phase_sequence":["string"],"phases (array)":{"start_point (struct)":',
'{"segment_phase":["string"],"agl":["double"],"elevation":["double"],"time":["long"],',
'"latitude":["double"],"longitude":["double"],"altitude":["double"],"course":["double"],',
'"speed":["double"],"source_point_keys (array)":["[string]"],"primary_key":["string"]},',
'"end_point (struct)":{"segment_phase":["string"],"agl":["double"],"elevation":["double"],',
'"time":["long"],"latitude":["double"],"longitude":["double"],"altitude":["double"],',
'"course":["double"],"speed":["double"],"source_point_keys (array)":["[string]"],',
'"primary_key":["string"]},"phase":["string"],"primary_key":["string"]},"primary_key":["string"]}'
)
with_mock(
# I am mocking functions so that the example works without a real spark connection
spark_read_parquet = function(x, ...){return("this is a spark dataframe")},
sdf_schema_json = function(x, ...){return(fromJSON(sample_json))},
spark_connect = function(...){return("this is a spark connection")},
# the meat of the example is here
sc <- spark_connect(),
spark_data <- spark_read_parquet(sc, path="path/to/data/*.parquet", name="some_name"),
sdf_schema_viewer(spark_data)
)
## End(Not run)
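With a live session the mocking above is unnecessary; a minimal sketch (assuming a connection sc already exists) is:
## Not run:
iris_tbl <- copy_to(sc, iris, name="iris")
# parsed, simplified schema as an R list
sdf_schema_json(iris_tbl, simplify = TRUE)
# interactive widget rendering of the same schema
sdf_schema_viewer(iris_tbl)
## End(Not run)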
Select nested items
Description
The select function works well for keeping/dropping top level fields. It does not
however support access to nested data. This function will accept complex field names
such as x.y.z where z is a field nested within y which is in turn
nested within x. Since R uses "$" to access nested elements and java/scala use ".",
sdf_select(data, x.y.z) and sdf_select(data, x$y$z) are equivalent.
Usage
sdf_select(x, ..., .aliases, .drop_parents = TRUE, .full_name = FALSE)
Arguments
x: An object (usually a spark_tbl) coercible to a Spark DataFrame.
...: Fields to select.
.aliases: Character. Optional. If provided these names will be matched positionally with the selected fields provided in .... This is most useful when calling from another function; it is less natural when calling sdf_select directly and does not mix well with dplyr selection helpers.
.drop_parents: Logical. If TRUE then any field from which nested fields are extracted will be dropped, even if it was included in the selection. This better supports dplyr selection helpers.
.full_name: Logical. If TRUE then promoted nested fields keep their parent name as a prefix (e.g., selecting x$y yields a field named x_y) to avoid name collisions.
Selection Helpers
dplyr allows the use of selection helpers (e.g., see everything).
These helpers only work for top level fields, however. For now all nested fields that should
be promoted need to be explicitly identified.
Examples
## Not run:
# produces a dataframe with an array of characteristics nested under
# each unique species identifier
iris_tbl <- copy_to(sc, iris, name="iris")
iris_nst <- iris_tbl %>%
sdf_nest(Sepal_Length, Sepal_Width, .key="Sepal")
# using java-like dot-notation
iris_nst %>%
sdf_select(Species, Petal_Width, Sepal.Sepal_Width)
# using R-like dollar-sign-notation
iris_nst %>%
sdf_select(Species, Petal_Width, Sepal$Sepal_Width)
# using dplyr selection helpers
iris_nst %>%
sdf_select(Species, matches("Petal"), Sepal$Sepal_Width)
## End(Not run)
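A sketch of positional aliasing with .aliases (per the argument description, the alias vector is matched one-to-one with the selected fields; the names used here are illustrative):
## Not run:
iris_nst %>%
  sdf_select(Species, Sepal$Sepal_Width, .aliases = c("species", "sepal_width"))
## End(Not run)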
Unnest data along a column
Description
Unnesting is an (optional) explode operation coupled with a nested select to promote the sub-fields of
the exploded top level array/map/struct to the top level. Hence, given a field a,
an array with fields a1, a2, a3, then sdf_unnest(df, a) will produce output with each record replicated
for every element in the a array and with the fields a1, a2, a3 (but not a)
at the top level. Similar to tidyr::unnest.
Usage
sdf_unnest(x, column, keep_all = FALSE)
Arguments
x: An object (usually a spark_tbl) coercible to a Spark DataFrame.
column: The field to explode.
keep_all: Logical. If TRUE then records where the exploded field is empty/null are kept; if FALSE (the default) such records are dropped.
Details
Note that this is a less precise tool than using sdf_explode and sdf_select
directly, because all fields of the exploded array will be kept and promoted. Direct calls to these
methods allow for more targeted use of sdf_select to promote only those fields that
are wanted at the top level of the data frame.
Additionally, though sdf_select allows users to reach arbitrarily far into a nested
structure, this function will only reach one layer deep. It may well be that the unnested fields
are themselves nested structures that need to be dealt with accordingly.
Note that map types are supported, but there is no is_map
argument. This is because the function interrogates the schema of the input data anyway to
determine whether an explode operation is required (it is for maps and arrays, but not for bare
structs). The result of that schema interrogation drives the value of is_map provided to
sdf_explode.
Examples
## Not run:
# first get some nested data
iris_tbl <- copy_to(sc, iris, name="iris")
iris_nst <- iris_tbl %>%
sdf_nest(Sepal_Length, Sepal_Width, Petal_Length, Petal_Width, .key="data") %>%
group_by(Species) %>%
summarize(data=collect_list(data))
# then explode it
iris_nst %>% sdf_unnest(data)
## End(Not run)
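As noted in the details, sdf_unnest is roughly an explode followed by a one-layer nested select. A sketch of the equivalent explicit pipeline (promoting every field by hand, reusing iris_nst from above):
## Not run:
iris_nst %>%
  sdf_explode(data) %>%
  sdf_select(Species, data$Sepal_Length, data$Sepal_Width,
             data$Petal_Length, data$Petal_Width)
## End(Not run)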
Spark Data Types
Description
These functions support supplying a Spark read schema. This is particularly useful when reading data with nested arrays when you are not interested in several of the nested fields.
Usage
struct_type(sc, struct_fields)
struct_field(sc, name, data_type, nullable = FALSE)
array_type(sc, data_type, nullable = FALSE)
binary_type(sc)
boolean_type(sc)
byte_type(sc)
date_type(sc)
double_type(sc)
float_type(sc)
integer_type(sc)
numeric_type(sc)
long_type(sc)
map_type(sc, key_type, value_type, nullable = FALSE)
string_type(sc)
character_type(sc)
timestamp_type(sc)
Arguments
sc: A spark_connection.
struct_fields: A vector of fields obtained from struct_field.
name: A field name to use in the output struct type.
data_type: A (java) data type (e.g., string_type(sc) or double_type(sc)).
nullable: Logical. Describes whether the field can be missing for some rows.
key_type: A (java) data type describing the map keys (usually string_type(sc)).
value_type: A (java) data type describing the map values.
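A sketch of composing these constructors into a read schema (the field names are illustrative, and how the schema is supplied to a reader depends on your sparklyr version):
## Not run:
# a struct with a scalar string, an array of strings, and a string->double map
schema <- struct_type(sc, list(
  struct_field(sc, "id", string_type(sc), nullable = FALSE),
  struct_field(sc, "tags", array_type(sc, string_type(sc)), nullable = TRUE),
  struct_field(sc, "attributes",
               map_type(sc, string_type(sc), double_type(sc)),
               nullable = TRUE)
))
## End(Not run)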