TLDR: As of version 2.2.0, Spark’s VectorAssembler implementation cannot be used with VectorUDT columns in streaming contexts. This is because VectorAssembler falls back to reading the first row of the dataset when a vector column’s metadata does not record its size, and fetching a row is an unsupported operation while streaming. I’ve included a sample UDF at the bottom of the post that can be used instead of VectorAssembler until the issue is fixed. For more information, see this Spark JIRA issue.


I recently encountered the following intimidating exception while hacking on Spectrum’s ML pipeline:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
textSocket
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
        // ...snip a bunch of superfluous lines...
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2160)
        at org.apache.spark.sql.Dataset.first(Dataset.scala:2167)
        at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
        at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
        at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
        at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
        at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
        at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
        at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
        at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
        at com.spectrumio.data.models.toxicity.VectorAssemblerBug$.main(VectorAssemblerBug.scala:34)
        at com.spectrumio.data.models.toxicity.VectorAssemblerBug.main(VectorAssemblerBug.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Note that the above stack trace is taken from a job created solely to reproduce the issue, not our production streaming job. See the appendix for its source.

Numerous people on Stack Overflow have posted similar exceptions. Surprisingly, none of the accepted solutions solved my particular problem, so I fired up my debugger and began painstakingly stepping through the Spark codebase to find the cause of the exception.

Big Endian Data has a great tutorial on configuring Spark for remote debugging in IntelliJ.

As the stack trace implies, the VectorAssembler stage of the pipeline was the culprit. VectorAssemblers, for the unfamiliar, perform column-wise concatenation of feature vectors prior to passing them into a classification algorithm. Interestingly, Spark’s VectorAssembler can end up calling first() on its input dataset, which is unsupported in streaming contexts. The following snippet shows why:

override def transform(dataset: Dataset[_]): DataFrame = {
  // Schema transformation.
  val schema = dataset.schema
  lazy val first = dataset.toDF.first()
  val attrs = $(inputCols).flatMap { c =>
    val field = schema(c)
    val index = schema.fieldIndex(c)
    field.dataType match {
      // ...unrelated cases elided
      case _: VectorUDT =>
        val group = AttributeGroup.fromStructField(field)
        if (group.attributes.isDefined) {
          // If attributes are defined, copy them with updated names.
          group.attributes.get.zipWithIndex.map { case (attr, i) =>
            if (attr.name.isDefined) {
              // TODO: Define a rigorous naming scheme.
              attr.withName(c + "_" + attr.name.get)
            } else {
              attr.withName(c + "_" + i)
            }
          }
        } else {
          // Otherwise, treat all attributes as numeric. If we cannot get the number of attributes
          // from metadata, check the first row.
          val numAttrs = group.numAttributes.getOrElse(first.getAs[Vector](index).size)
          Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName(c + "_" + i))
        }
      // unrelated cases elided
    }
  }

  // ...rest of the implementation
}

Here, we see Spark trying to rename the vector’s ML attributes based on their indices. If no attributes are defined, Spark assigns each element a NumericAttribute named after its index in the vector. Those attributes are generated with Scala’s Array.tabulate, which needs the array length up front. When the column’s metadata does not record the number of attributes, that length is taken from the size of the vector in the first row of the dataset, so the lazy val first = dataset.toDF.first() expression is evaluated and the exception is thrown.
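The reason first() only runs in some cases comes down to two Scala features: a lazy val is not computed until it is first read, and Option.getOrElse takes its default by name. Here is a minimal, Spark-free sketch of that interaction. The names LazyFirstDemo, resolveSize, metadataSize and firstRowSize are hypothetical stand-ins for illustration, not part of Spark.

```scala
object LazyFirstDemo {
  // Counts how many times the pretend first-row lookup actually runs.
  var lookups = 0

  // Hypothetical stand-in for the size resolution in VectorAssembler:
  // `metadataSize` plays the role of group.numAttributes, and
  // `firstRowSize` the role of first.getAs[Vector](index).size.
  def resolveSize(metadataSize: Option[Int]): Int = {
    lazy val firstRowSize: Int = {
      lookups += 1 // on a streaming Dataset, this is where first() would throw
      3
    }
    // getOrElse takes its default by name, so firstRowSize is only
    // forced when metadataSize is None.
    metadataSize.getOrElse(firstRowSize)
  }

  def main(args: Array[String]): Unit = {
    println(resolveSize(Some(5))) // metadata present: prints 5
    println(lookups)              // prints 0, the lazy val was never forced
    println(resolveSize(None))    // metadata missing: prints 3
    println(lookups)              // prints 1, the lookup ran once
  }
}
```

In other words, a vector column whose metadata already carries its size would never reach the first() call in the snippet above; only columns without that metadata do.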

This issue can be worked around. If you’re concatenating VectorUDT columns, which you likely are if you’re using any of Spark’s default feature extraction classes, you can extract the underlying UDF that VectorAssembler uses and use it instead:

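// Concatenates the input vectors into a single vector.
val assemblerUdf = udf((vv: Any*) => {
  val indices = ArrayBuilder.make[Int]    // positions of the non-zero entries in the output
  val values = ArrayBuilder.make[Double]  // the non-zero entries themselves
  var cur = 0                             // offset of the current input vector in the output

  vv.foreach {
    case vec: Vector =>
      // Copy each non-zero entry, shifted by this vector's offset.
      vec.foreachActive { case (i, v) =>
        if (v != 0.0) {
          indices += cur + i
          values += v
        }
      }
      cur += vec.size
    case null =>
      throw new SparkException("Values to assemble cannot be null.")
    case o =>
      throw new SparkException(s"$o of type ${o.getClass.getName} is not supported.")
  }

  // compressed picks whichever of the dense and sparse formats uses less storage.
  Vectors.sparse(cur, indices.result(), values.result()).compressed
})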

You shouldn’t be affected by this issue if you’re only concatenating non-VectorUDT columns, so the above UDF does not include the original logic that handles DoubleType columns. It should be easy enough to add back in if you need it.
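To make the UDF’s index bookkeeping concrete, here is a Spark-free sketch of the same concatenation on a toy sparse type. Sparse and AssembleSketch are hypothetical names invented for this illustration; they are not Spark’s Vector classes, and the sketch skips the null and type checks the UDF performs.

```scala
object AssembleSketch {
  // Hypothetical minimal sparse vector: logical size plus parallel lists of
  // non-zero positions and values. Not Spark's Vector type.
  final case class Sparse(size: Int, indices: List[Int], values: List[Double])

  // Concatenates the inputs, shifting each vector's indices by the total
  // size of the vectors that came before it.
  def assemble(vs: Seq[Sparse]): Sparse = {
    val indices = List.newBuilder[Int]
    val values = List.newBuilder[Double]
    var cur = 0 // offset of the current input vector in the output
    vs.foreach { v =>
      v.indices.zip(v.values).foreach { case (i, x) =>
        if (x != 0.0) {
          indices += cur + i
          values += x
        }
      }
      cur += v.size
    }
    Sparse(cur, indices.result(), values.result())
  }

  def main(args: Array[String]): Unit = {
    val a = Sparse(2, List(0), List(1.0)) // dense form: [1.0, 0.0]
    val b = Sparse(3, List(2), List(4.0)) // dense form: [0.0, 0.0, 4.0]
    // Result has size 5 with non-zeros at positions 0 and 4,
    // i.e. dense form [1.0, 0.0, 0.0, 0.0, 4.0].
    println(assemble(Seq(a, b)))
  }
}
```

The second vector’s entry at index 2 lands at index 4 in the output because the first vector contributes an offset of 2, which is exactly what the cur variable tracks in the UDF.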

The Spark team is aware of this issue. It took me several hours of searching, but eventually I found SPARK-21926, which tracks this issue as well as some others involving Transformers and streaming dataframes. When I have the chance, I’ll see if I can contribute a fix to Spark core.

As always, I’d love to hear your feedback on this post. Shoot me an e-mail at inquiries@matthewslipper.com and I’ll be sure to get back to you.

Appendix

If you’re interested, here’s a quick-and-dirty Spark job that reproduces the issue consistently:

import org.apache.spark.SparkException
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

import scala.collection.mutable.ArrayBuilder

object VectorAssemblerBug {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("VectorAssemblerBug")
      .getOrCreate

    import spark.implicits._

    val counter = udf((s: String) => Vectors.dense(s.split(" ").length))

    val assembler = new VectorAssembler()
      .setInputCols(Array("count"))
      .setOutputCol("assembled")

    // Fit the pipeline to empty data to get a PipelineModel; VectorAssembler doesn't need training.
    val pipeline = new Pipeline()
      .setStages(Array(assembler))
      .fit(Seq.empty[Int].toDF("count"))

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val wordCounts = pipeline.transform(lines.withColumn("count", counter(col("value"))))

    val query = wordCounts.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}