Índice
Estoy iniciándome en el mundo de spark
, ojeando su librería MLlib
decidí implementar una simple Regresión lineal. Aunque el ejemplo de su documentación parece sencillo, cuando intentaba leer mi propio csv con datos obtenía el siguiente error:
Column features must be of type org.apache.spark.ml.linalg.VectorUDT
Tras varias horas buscando sin encontrar ninguna solución, al fin conseguí convertir la columna features
a VectorUDT
, así es como lo hice:
Convertir una columna a VectorUDT en Spark
Primero preparamos el entorno:
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
val df = spark.read.format("csv").
option("header", "true").
load(this.getClass.getResource("/generated_data.csv").getPath)
df.printSchema()
El código de arriba simplemente configura una sesión Spark
y lee el fichero generated_data.csv
. Por último muestra el schema
de los datos, el cual es:
root
|-- ind_var_a: string (nullable = true)
|-- ind_var_b: string (nullable = true)
|-- ind_var_c: string (nullable = true)
|-- ind_var_d: string (nullable = true)
|-- ind_var_e: string (nullable = true)
|-- ind_var_f: string (nullable = true)
|-- ind_var_g: string (nullable = true)
|-- ind_var_h: string (nullable = true)
|-- dependent_var: string (nullable = true)
Como puede verse, dependent_var
es de tipo String
, pero debería ser de tipo VectorUDT
, para convertirla se debe usar VectorAssembler
:
val schema = new StructType()
.add("features", new VectorUDT())
val toDouble = udf[Double, String]( _.toDouble)
val df2 = df.withColumn("dependent_var", toDouble(df("dependent_var")))
val assembler = new VectorAssembler().
setInputCols(Array("dependent_var")).
setOutputCol("features")
val out = assembler.transform(df2)
df2.printSchema()
out.printSchema()
El código de arriba declara un schema
para la columna que vamos a convertir, luego creamos una udf
(User Defined Function) para convertir los valores de la columna de String
a Double
. Por último, se crea un VectorAssembler
y se transforma el dataset al nuevo esquema. Para comprobar que de hecho hemos convertido el tipo, se imprime el esquema del dataset out
:
root
|-- ind_var_a: string (nullable = true)
|-- ind_var_b: string (nullable = true)
|-- ind_var_c: string (nullable = true)
|-- ind_var_d: string (nullable = true)
|-- ind_var_e: string (nullable = true)
|-- ind_var_f: string (nullable = true)
|-- ind_var_g: string (nullable = true)
|-- ind_var_h: string (nullable = true)
|-- dependent_var: double (nullable = true)
|-- features: vector (nullable = true)
Código completo
El código completo es el siguiente:
package elbauldelprogramador
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.VectorUDT
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.StructType
object SimpleApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
val df = spark.read.format("csv").
option("header", "true").
load(this.getClass.getResource("/generated_data.csv").getPath)
df.printSchema()
val schema = new StructType()
.add("features", new VectorUDT())
val toDouble = udf[Double, String]( _.toDouble)
val df2 = df.withColumn("dependent_var", toDouble(df("dependent_var")))
val assembler = new VectorAssembler().
setInputCols(Array("dependent_var")).
setOutputCol("features")
val out = assembler.transform(df2)
df2.printSchema()
out.printSchema()
val lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
// Fit the model
val lrModel = lr.fit(out)
spark.close()
}
}
Otro método
Acabo de descubrir otra forma de conseguir el mismo objetivo:
val df = spark.read.format("csv").
option("header", "true").
option("inferSchema", "true").
load(getClass.getResource("/generated_data.csv").getPath)
val df1 = df.select("dependent_var", "ind_var_d")
val formula = new RFormula().
setFormula("dependent_var ~ ind_var_d").
setFeaturesCol("features").
setLabelCol("label")
val train = formula.fit(df1).transform(df1)
Ahora train
tendrá dos columnas adicionales, features
y label
, siendo features
de tipo Vector
.
Conclusión
Estoy seguro de que existen formas mejores de lograr esta conversión, pero soy nuevo en spark
y esta fue la única que conseguí hacer funcionar. Si sabes de alguna mejor, escríbela en los comentarios!
Referencias
- VectorAssembler | spark.apache.org
- El dataset es el sacado del libro Test Driven Machine Learning
¿Has visto algún error?: Por favor, ayúdame a corregirlo contactando conmigo o comentando abajo.