scala 从数据库中的所有执行器访问Azure KeyVault,可以吗?

kadbb459  于 8个月前  发布在  Scala
关注(0)|答案(1)|浏览(87)

我怀疑这不能开箱即用,想知道一种方法来做到这一点。我正在努力做这件事,但没有成功。到目前为止,我已经尝试过:
基于这个链接,我已经创建了一个Class和一个object(companion和not,两种方式),用于在集群的每个worker中加密文本。它有一些第三方库隐含,所以这就是为什么我不能使用序列化。为了初始化它的值,我从Azure Keyvault中获取了一些值(这似乎适用于驱动程序,但不适用于执行器)。
当我执行声明类和对象的单元格时,它运行正常,我认为这是因为它是这里的驱动程序,它从KeyVault中删除了它的值。但是当用它来加密一个表时,我得到了一个例外。
在我的代码中,我有一个Spark UDF,它使用该类进行加密,如下所示:

val encrypt_long = udf[Long, Long](plain_long => {
  val plain_texted = plain_long.toString
  val plain_bytes = AESFPE.instance.numericAlphabetMapper.convertToIndexes(plain_texted.toCharArray)
  val cipher_chars = AESFPE.instance.numericAlphabetMapper.convertToChars(AESFPE.instance.encrypt(AESFPE.instance.cipher, AESFPE.instance.sKey, AESFPE.instance.tweak, AESFPE.instance.numericRadix, plain_bytes))
  new String(cipher_chars).toLong
}

调用这个UDF,我得到了这个异常:

Job aborted due to stage failure: [FAILED_EXECUTE_UDF] Failed to execute user defined function ($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$9794/149626391: (bigint) => bigint).
Caused by: [FAILED_EXECUTE_UDF] Failed to execute user defined function ($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$9794/149626391: (bigint) => bigint).
Caused by: NullPointerException:

有了这个异常的原因,在我看来,问题在于类的sKey成员的初始化(这是从Azure KeyVault中获取值的成员):

Caused by: java.lang.NullPointerException
    at $line1b09d7456fac420b8b404a7387bae2f8123.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$AESFPE.sKey$lzycompute(command-4335102701464202:16)
    at $line1b09d7456fac420b8b404a7387bae2f8123.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$AESFPE.sKey(command-4335102701464202:16)
    at $line1b09d7456fac420b8b404a7387bae2f8127.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$encrypt_long$1(command-4335102701464206:22)
    at scala.runtime.java8.JFunction1$mcJJ$sp.apply(JFunction1$mcJJ$sp.java:23)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:464)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$3(FileFormatWriter.scala:316)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:142)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:142)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:97)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:904)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1713)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:907)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:761)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

这里需要在cipher类的每个worker中都有一些相同的值,这样它们就可以使用相同的配置进行加密和解密。
作为另一种方法,但不是一个有效的解决方案,我让它在cipher类中硬编码这些配置值。但这是无效的,因为在代码中使用这种配置是不安全的,它必须在像KeyVault这样的地方。使用KeyVault和序列化密码类不是一个选择,因为它有第三方代码,并抛出不可序列化的异常。

vsikbqxv

vsikbqxv1#

我找到了一个解决问题的方法,可以在所有的执行程序中使用KeyVault中的秘密。到目前为止,我只在笔记本上测试了这个,我想在以后的工作中尝试一下。
首先,这里是一个指向官方文档的链接,该文档强调了dbutils API的一些限制,而这正是Databricks推荐用于读取secret的API。
现在的解决方案:
由于不能从execturos中使用dbutils,我遵循以下步骤将KeyVault secret的值带入每个执行器中的对象初始化:
1.在事件变量中引用一个秘密,就像这样。
1.在任何其他单元执行之前,在驱动程序中设置相同变量名称中的值,但在执行器的环境中,如下所示:
spark.conf.set(“spark.executorEnv.anon_pwd”,sys.env(“anon_pwd”))spark.conf.set(“spark.executorEnv.anon_salt”,sys.env(“anon_salt”))spark.conf.set(“spark.executorEnv.anon_tweak”,sys.env(“anon_tweak”))
1.使用UDF函数在每个执行器中使用单例对象。

相关问题