前两篇,一篇说在 spark 里用 SQL 方便,一篇说 updatestateByKey 可以保留状态做推算。那么怎么综合起来呢?目前看到的 spark streaming 和 spark SQL 的示例全都是在 output 阶段的 foreachRDD 里才调用 SQL。实际在 output 之前,也是可以对 DStream 里的 RDD 做复杂的转换操作的,这就是 transform 方法。

通过 transform 方法,可以做到 SQL 请求的结果依然是 DStream 数据,这样就可以使用 updateStateByKey 方法了。下面是示例:

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object LogStash {

  case class Status(avg:Double = 0.0, count:Int = 0) {
    var countTrend = 0.0
    var avgTrend = 0.0
    def %(prev:Status): Status = {
      if (prev.count > 0) {
        this.countTrend = (this.count - prev.count).toDouble / prev.count
      }
      if (prev.avg > 0) {
        this.avgTrend = (this.avg - prev.avg) / prev.avg
      }
      this
    }
    override def toString = {
      s"Trend($avg, $count, $avgTrend, $countTrend)"
    }
  }

  def updatestatefunc(newValue: Seq[Status], oldValue: Option[Status]): Option[Status] = {
    val prev = oldValue.getOrElse(Status())
    val current = if (newValue.size > 0) newValue.last % prev else Status()
    Some(current)
  }

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("LogStash")
    val sc  = new SparkContext(sparkConf)

    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("/tmp/spark-streaming-logstash")

    val sqc = new SQLContext(sc)
    import sqc._

    val lines = ssc.socketTextStream("localhost", 8888)

    lines.transform( rdd => {
      if (rdd.count > 0) {
        sqc.jsonRDD(rdd).registerTempTable("logstash")
        val sqlreport = sqc.sql("SELECT message, COUNT(message) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY message ORDER BY host_c DESC LIMIT 100")
        sqlreport.map(r => (r(0).toString -> Status(r(2).toString.toDouble, r(1).toString.toInt)))
      } else {
        rdd.map(l => ("" -> Status()))
      }
    }).updateStateByKey(updatestatefunc).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

这里有一点需要注意,也是耽误我时间最多的地方:transform 方法的参数和返回,代码里的定义是 RDD[T]RDD[U]。我不懂 Java/Scala,以为是只要是 RDD 对象即可。实践证明,其实要任意场合下返回的 RDD 里的数据类型也保持一致。

在上例中,就是 if 条件下返回的是 RDD[(String, Status)],那么 else 条件下,也必须返回一个 RDD[(String, Status)],如果直接返回原始的 rdd(也就是 RDD[String]),就会报错。