多线程—以原子方式读取、更新和保存缓存值

toiithl6  于 2021-06-09  发布在  Hbase
关注(0)|答案(0)|浏览(172)

我有一个多流(n)应该更新相同的缓存。所以,假设至少有n个线程。每个线程可以处理具有相似键的值。问题是,如果我更新如下:

1. Read old value from cache (multiple threads get the same old value)
2. Merge new value with old value (each thread update old value)
3. Save updated value back to the cache (only the last update was saved, another one is lost)

如果多个线程同时尝试更新同一条记录,我可能会丢失一些更新。乍一看,有一种解决方案可以使所有更新都原子化:例如,使用 Increment 糖化血红蛋白或 add 在aerospike(目前,我正在考虑为我的案件这些缓存)。如果value只包含数字基元类型,那么就可以了,因为两个缓存实现都支持原子inc/dec。

1. Inc/dec each value (cache will resolve sequence of this ops by it's self)

但是如果值不仅仅由原语组成呢?然后我必须读取值并在代码中更新它。在这种情况下,我仍然可以丢失一些更新。
如我所写,目前我正在考虑hbase和aerospeike,但两者都不完全适合我的情况。据我所知,在hbase中,无法从客户端锁定行(>~0.98),因此必须使用 checkAndPut 每个复杂类型的操作。在aerospike中,我可以使用lua udfs实现类似于基于行的锁的功能,但我希望避免使用它们。redis允许 watch 记录,如果有来自另一个线程的更新,事务将失败,我可以捕获此错误并重试。
所以,我的问题是如何实现像基于行的锁这样的更新,基于行的锁是一种正确的方法吗?也许还有别的办法?

def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sample")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Duration(500))

    val source = Source()
    val stream = source.stream(ssc)

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(partition => {
          if (partition.nonEmpty) {
            val cache = Cache()

            partition.foreach(entity=> {
// in this block if 2 distributed workers (in case of apache spark, for example) 
//will process entities with the same keys i can lose one of this update
// worker1 and worker2 will get the same value
               val value = cache.get(entity.key)
// both workers will update this value but may get different results
               val updatedValue = ??? // some non-trivial update depends on entity
// for example, worker1 put new value, then worker2 put new value. In this case only updates from worker2 are visible and updates from worker1 are lost
               cache.put(entity.key, updatedValue)
            })
          }
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

所以,如果我使用kafka作为源代码,如果消息是按键分区的,我可以解决这个问题。在这种情况下,我可以依赖这样一个事实:在任何时间点上,只有一个工人将处理特定的记录。但是当消息被随机分区时(密钥在消息体中),如何处理同样的情况呢?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题