I’m starting to use Spark and was reading the documentation for its MLlib library. At first it seemed simple enough to use, but when reading a CSV file with the training data I kept getting the following error:

Column features must be of type org.apache.spark.ml.linalg.VectorUDT
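For context, here is the kind of call that can trigger it. This is only a hypothetical sketch (it assumes a SparkSession named spark and a CSV that already contains a column literally named features): the CSV reader loads every column as a string, so features ends up as StringType instead of a Vector.

// Hypothetical reproduction: every CSV column is read as a string,
// so the "features" column is StringType rather than a Vector.
val raw = spark.read.option("header", "true").csv("training_data.csv")
new org.apache.spark.ml.regression.LinearRegression().fit(raw)
// throws: Column features must be of type org.apache.spark.ml.linalg.VectorUDT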

After hours of searching for a way to convert my features column into VectorUDT, I finally found the solution. Here is how I did it.

Convert a column to VectorUDT in Spark

First, let's prepare the environment:

val spark = SparkSession.builder
  .master("local")
  .appName("spark session example")
  .getOrCreate()

// Read the CSV; without inferSchema every column is loaded as a string
val df = spark.read.format("csv")
  .option("header", "true")
  .load(this.getClass.getResource("/generated_data.csv").getPath)

df.printSchema()



The code above just sets up a SparkSession and loads the data from the file generated_data.csv. Finally, it prints the schema of that data, which is:

root
 |-- ind_var_a: string (nullable = true)
 |-- ind_var_b: string (nullable = true)
 |-- ind_var_c: string (nullable = true)
 |-- ind_var_d: string (nullable = true)
 |-- ind_var_e: string (nullable = true)
 |-- ind_var_f: string (nullable = true)
 |-- ind_var_g: string (nullable = true)
 |-- ind_var_h: string (nullable = true)
 |-- dependent_var: string (nullable = true)
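Note that every column comes back as string: the CSV data source does not infer column types unless asked to. If numeric columns are wanted from the start, the inferSchema option can be passed when reading (a small sketch over the same file; the dfTyped name is just for illustration):

// Ask the CSV reader to infer column types instead of defaulting to string.
val dfTyped = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(this.getClass.getResource("/generated_data.csv").getPath)

dfTyped.printSchema() // numeric columns are now read as double/integer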

As can be seen, dependent_var's type is string, but it must be of type VectorUDT. To convert it, we can use a VectorAssembler:

// Target schema for the new features column (defined here for reference)
val schema = new StructType()
  .add("features", new VectorUDT())

// Convert dependent_var from String to Double with a small udf
val toDouble = udf[Double, String](_.toDouble)
val df2 = df.withColumn("dependent_var", toDouble(df("dependent_var")))

// Assemble the numeric column into a Vector column named "features"
val assembler = new VectorAssembler()
  .setInputCols(Array("dependent_var"))
  .setOutputCol("features")

val out = assembler.transform(df2)

df2.printSchema()
out.printSchema()

Above, a schema for the new column is defined (of type VectorUDT), then a udf (user-defined function) is created to convert the values of dependent_var from String to Double. Finally, a VectorAssembler is created and the DataFrame is transformed to the new schema. Printing the schema of out shows that the type is now correct:

root
 |-- ind_var_a: string (nullable = true)
 |-- ind_var_b: string (nullable = true)
 |-- ind_var_c: string (nullable = true)
 |-- ind_var_d: string (nullable = true)
 |-- ind_var_e: string (nullable = true)
 |-- ind_var_f: string (nullable = true)
 |-- ind_var_g: string (nullable = true)
 |-- ind_var_h: string (nullable = true)
 |-- dependent_var: double (nullable = true)
 |-- features: vector (nullable = true)
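As a side note, the string-to-double conversion does not strictly require a udf; the built-in Column.cast does the same job. A small sketch using the same df (the df2Cast name is just for illustration):

import org.apache.spark.sql.types.DoubleType

// Equivalent to the toDouble udf above: cast the column in place.
val df2Cast = df.withColumn("dependent_var", df("dependent_var").cast(DoubleType))
df2Cast.printSchema() // dependent_var: double (nullable = true)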

Full code

The entire code is below:

package elbauldelprogramador

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.VectorUDT
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.StructType

object SimpleApp {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    // Read the CSV; without inferSchema every column is loaded as a string
    val df = spark.read.format("csv")
      .option("header", "true")
      .load(this.getClass.getResource("/generated_data.csv").getPath)

    df.printSchema()

    // Target schema for the new features column (defined here for reference)
    val schema = new StructType()
      .add("features", new VectorUDT())

    // Convert dependent_var from String to Double with a small udf
    val toDouble = udf[Double, String](_.toDouble)
    val df2 = df.withColumn("dependent_var", toDouble(df("dependent_var")))

    // Assemble the numeric column into a Vector column named "features"
    val assembler = new VectorAssembler()
      .setInputCols(Array("dependent_var"))
      .setOutputCol("features")

    val out = assembler.transform(df2)

    df2.printSchema()
    out.printSchema()

    val lr = new LinearRegression()
      .setLabelCol("dependent_var") // the DataFrame has no column named "label"
      .setMaxIter(10)
      .setRegParam(0.3)
      .setElasticNetParam(0.8)

    // Fit the model
    val lrModel = lr.fit(out)

    spark.close()
  }
}
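Once lrModel has been fitted it can be inspected; for example (a small sketch, assuming the fit above succeeded):

// Print the learned parameters and a couple of training metrics.
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
val trainingSummary = lrModel.summary
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")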

Another method

I’ve just discovered another way of achieving the same result:

val df = spark.read.format("csv").
  option("header", "true").
  option("inferSchema", "true").
  load(getClass.getResource("/generated_data.csv").getPath)
val df1 = df.select("dependent_var", "ind_var_d")
val formula = new RFormula().
  setFormula("dependent_var ~ ind_var_d").
  setFeaturesCol("features").
  setLabelCol("label")
val train = formula.fit(df1).transform(df1)

Now, train will have two additional columns, features and label, and the type of features will be Vector.
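A quick way to verify that (a sketch over the train DataFrame from above):

// RFormula keeps the original columns and adds features (vector) and label (double).
train.printSchema()
train.select("features", "label").show(5)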

End notes

I am sure there is a better and cleaner way of doing this, but as I am just a beginner with Spark, this did the trick for me. If you know a better solution, write it in the comments!
