如何在aws athena中自动执行msck修复表

f3temu5u  于 2021-06-26  发布在  Hive
关注(0)|答案(2)|浏览(397)

我有一个Spark批工作是每小时执行。每次运行都会在中生成和存储新数据 S3 使用目录命名模式 DATA/YEAR=?/MONTH=?/DATE=?/datafile .
将数据上载到 S3 ,我想用 Athena . 另外,我想把它们形象化 QuickSight 通过连接雅典娜作为数据源。
问题是,每次运行spark批处理之后,新生成的数据都存储在 S3 不会被雅典娜发现,除非我手动运行查询 MSCK REPAIR TABLE .
有没有办法让雅典娜自动更新数据,这样我就可以创建一个全自动的数据可视化管道?

sczxawaw

sczxawaw1#

有许多方法可以安排此任务。如何安排工作流程?您使用的系统是airflow、luigi、azkaban、cron还是aws数据管道?
从这些命令中,您应该能够启动以下cli命令。 $ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE" 另一个选择是aws lambda。你可以有一个函数 MSCK REPAIR TABLE some_database.some_table 作为对s3新上传的回应。
示例lambda函数可以这样编写:

import boto3

def lambda_handler(event, context):
    bucket_name = 'some_bucket'

    client = boto3.client('athena')

    config = {
        'OutputLocation': 's3://' + bucket_name + '/',
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}

    }

    # Query Execution Parameters
    sql = 'MSCK REPAIR TABLE some_database.some_table'
    context = {'Database': 'some_database'}

    client.start_query_execution(QueryString = sql, 
                                 QueryExecutionContext = context,
                                 ResultConfiguration = config)

然后,您将配置一个触发器,以便在将新数据添加到 DATA/ 在你的桶里。
最终,在使用作业调度器运行spark作业之后显式地重建分区具有自文档化的优势。另一方面,aws lambda对这样的工作很方便。

guicsvcw

guicsvcw2#

你应该跑步 ADD PARTITION 取而代之的是:

aws athena start-query-execution --query-string "ALTER TABLE ADD PARTITION..."

它在新创建的分区中添加一个 S3 位置athena利用hive对数据进行分区。要创建带有分区的表,必须在 CREATE TABLE 声明。使用 PARTITIONED BY 定义用于划分数据的键。

相关问题