Autor

Alejandro Alcalde

Data Scientist and Computer Scientist. Creator of this blog.

Más artículos de Alejandro Alcalde | Porfolio

Índice

Estoy iniciándome en el mundo de spark, ojeando su librería MLlib decidí implementar una simple Regresión lineal. Aunque el ejemplo de su documentación parece sencillo, cuando intentaba leer mi propio csv con datos obtenía el siguiente error:

Column features must be of type org.apache.spark.ml.linalg.VectorUDT

Tras varias horas buscando sin encontrar ninguna solución, al fin conseguí convertir la columna features a VectorUDT, así es como lo hice:

Convertir una columna a VectorUDT en Spark

Primero preparamos el entorno:

val spark = SparkSession.builder.
  master("local")
  .appName("spark session example")
  .getOrCreate()

val df = spark.read.format("csv").
  option("header", "true").
  load(this.getClass.getResource("/generated_data.csv").getPath)

df.printSchema()

El código de arriba simplemente configura una sesión Spark y lee el fichero generated_data.csv. Por último muestra el schema de los datos, el cual es:

root
 |-- ind_var_a: string (nullable = true)
 |-- ind_var_b: string (nullable = true)
 |-- ind_var_c: string (nullable = true)
 |-- ind_var_d: string (nullable = true)
 |-- ind_var_e: string (nullable = true)
 |-- ind_var_f: string (nullable = true)
 |-- ind_var_g: string (nullable = true)
 |-- ind_var_h: string (nullable = true)
 |-- dependent_var: string (nullable = true)

Como puede verse, dependent_var es de tipo String, pero debería ser de tipo VectorUDT, para convertirla se debe usar VectorAssembler:

val schema = new StructType()
 .add("features", new VectorUDT())

val toDouble = udf[Double, String]( _.toDouble)
val df2 = df.withColumn("dependent_var", toDouble(df("dependent_var")))

val assembler = new VectorAssembler().
  setInputCols(Array("dependent_var")).
  setOutputCol("features")

val out = assembler.transform(df2)

df2.printSchema()
out.printSchema()

El código de arriba declara un schema para la columna que vamos a convertir, luego creamos una udf (User Defined Function) para convertir los valores de la columna de String a Double. Por último, se crea un VectorAssembler y se transforma el dataset al nuevo esquema. Para comprobar que de hecho hemos convertido el tipo, se imprime el esquema del dataset out:

root
 |-- ind_var_a: string (nullable = true)
 |-- ind_var_b: string (nullable = true)
 |-- ind_var_c: string (nullable = true)
 |-- ind_var_d: string (nullable = true)
 |-- ind_var_e: string (nullable = true)
 |-- ind_var_f: string (nullable = true)
 |-- ind_var_g: string (nullable = true)
 |-- ind_var_h: string (nullable = true)
 |-- dependent_var: double (nullable = true)
 |-- features: vector (nullable = true)

Código completo

El código completo es el siguiente:

package elbauldelprogramador

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.VectorUDT
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.StructType

object SimpleApp {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.
      master("local")
      .appName("spark session example")
      .getOrCreate()

    val df = spark.read.format("csv").
      option("header", "true").
      load(this.getClass.getResource("/generated_data.csv").getPath)

    df.printSchema()

    val schema = new StructType()
     .add("features", new VectorUDT())

    val toDouble = udf[Double, String]( _.toDouble)
    val df2 = df.withColumn("dependent_var", toDouble(df("dependent_var")))

    val assembler = new VectorAssembler().
      setInputCols(Array("dependent_var")).
      setOutputCol("features")

    val out = assembler.transform(df2)

    df2.printSchema()
    out.printSchema()

    val lr = new LinearRegression()
      .setMaxIter(10)
      .setRegParam(0.3)
      .setElasticNetParam(0.8)

    // Fit the model
    val lrModel = lr.fit(out)

    spark.close()
  }
}

Otro método

Acabo de descubrir otra forma de conseguir el mismo objetivo:

val df = spark.read.format("csv").
  option("header", "true").
  option("inferSchema", "true").
  load(getClass.getResource("/generated_data.csv").getPath)
val df1 = df.select("dependent_var", "ind_var_d")
val formula = new RFormula().
  setFormula("dependent_var ~ ind_var_d").
  setFeaturesCol("features").
  setLabelCol("label")
val train = formula.fit(df1).transform(df1)

Ahora train tendrá dos columnas adicionales, features y label, siendo features de tipo Vector.

Conclusión

Estoy seguro de que existen formas mejores de lograr esta conversión, pero soy nuevo en spark y esta fue la única que conseguí hacer funcionar. Si sabes de alguna mejor, escríbela en los comentarios!

Referencias

¿Has visto algún error?: Por favor, ayúdame a corregirlo contactando conmigo o comentando abajo.

Categorías:Etiquetas: