Pregel分布式计算框架

x33g5p2x  于2021-03-14 发布在 Java  
字(2.3k)|赞(0)|评价(0)|浏览(541)
  • Pregel是Google提出的用于大规模分布式图计算框架。可用于:
    • 图遍历(BFS)。
    • 单源最短路径(SSSP)。
    • PageRank计算。
  • Pregel的计算由一系列迭代组成,称为supersteps。
  • Pregel迭代过程。
    (1)每个顶点从上一个superstep接收入站消息。
    (2)计算顶点新的属性值。
    (3)在下一个superstep中向相邻的顶点发送消息。
    (4)当没有剩余消息时,迭代结束。

求下图中顶点的最小值。

Pregel的计算过程如下:

Graphx提供的Pregel API如下:

 class GraphOps[VD, ED](graph : org.apache.spark.graphx.Graph[VD, ED]){

 	def pregel[A](
 				  initialMsg : A, // 在superstep 0之前发送至顶点的初始消息
 				  maxIterations : Int, // 将要执行的最大迭代次数
 				  activeDirection : EdgeDirection // 发送消息方向(默认是出边方向:EdgeDirection.Out)
 				 )
 				 (
 				  vprog : Function3, // 用户定义函数,用于顶点接收消息
 				  sendMsg : Function1, // 用户定义的函数,用于确定下一个迭代发送的消息及发往何处
 				  mergeMsg : Function2[A, A, A] // 用户定义的函数,在vprog前,合并到达顶点的多个消息
 				 ){ }
}

Pregel求下图的最小值。

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Pregel {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getName)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    import spark.implicits._

    // 1. 构建顶点的RDD
    val verts: RDD[(Long, (Int, Int))] = sc.parallelize(Array(
      (1L, (7, -1)),
      (2L, (3, -1)),
      (3L, (2, -1)),
      (4L, (6, -1))
    ))

    // 2. 构建边的RDD
    val edges: RDD[Edge[Boolean]] = sc.parallelize(Array(
      Edge(1L, 2L, true),
      Edge(1L, 4L, true),
      Edge(2L, 4L, true),
      Edge(3L, 1L, true),
      Edge(3L, 4L, true)
    ))

    // 3. 构建图
    val graph: Graph[(Int, Int), Boolean] = Graph(verts, edges)
    graph.triplets.foreach(println)
    // ((1,(7,-1)),(4,(6,-1)),true)
    // ((3,(2,-1)),(1,(7,-1)),true)
    // ((1,(7,-1)),(2,(3,-1)),true)
    // ((2,(3,-1)),(4,(6,-1)),true)
    // ((3,(2,-1)),(4,(6,-1)),true)

    // initialMsg 在superstep 0之前发送至顶点的初始消息
    // maxIterations 将要执行的最大迭代次数
    // activeDirection 发送消息方向(默认是出边方向:EdgeDirection.Out)
    // vprog 用户定义函数,用于顶点接收消息
    // sendMsg 用户定义的函数,用于确定下一个迭代发送的消息及发往何处
    // mergeMsg 用户定义的函数,在vprog前,合并到达顶点的多个消息
    val initialMsg: Int = 9999
    val maxIterations: Int = Int.MaxValue
    
    // vertexId: 当前顶点的Id
    // value: 当前顶点的attr
    // message: 当前顶点要接收的消息
    def vprog(vertexId: VertexId, value: (Int, Int), message: Int) = {
      if (message == initialMsg) value else (message min value._1, value._1)
    }

    def sendMsg(triple: EdgeTriplet[(Int, Int), Boolean]) = {
      val sourceVertex: (Int, Int) = triple.srcAttr
      if (sourceVertex._1 == sourceVertex._2) Iterator.empty
      else Iterator((triple.dstId, sourceVertex._1))
    }

    def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2

    graph.pregel(initialMsg, maxIterations, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
      .vertices.foreach(println)
    // (1,(2,7))
    // (2,(2,3))
    // (3,(2,-1))
    // (4,(2,2))
  }
}

相关文章

微信公众号

最新文章

更多