通过aws lambda使用python代码的emr spark作业

5us2dqdw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(376)

我想在触发s3事件后,通过aws lambda用python代码触发emr spark作业。如果有人能共享配置/命令,从aws lambda函数调用emr spark作业,我将不胜感激。

tsm1rwdh

tsm1rwdh1#

因为这个问题非常一般,所以我将尝试给出一个示例代码。您必须根据实际值更改某些参数。
通常的做法是将主处理函数放在一个名为 lambda_handler.py 以及emr的所有配置和步骤 emr_configuration_and_steps.py .
请检查下面的代码片段 lambda_handler.py ```
import boto3
import emr_configuration_and_steps
import logging
import traceback

logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(levelname)s:%(name)s:%(message)s')

def create_emr(name):
try:
emr = boto3.client('emr')
cluster_id = emr.run_job_flow(
Name=name,
VisibleToAllUsers=emr_configuration_and_steps.visible_to_all_users,
LogUri=emr_configuration_and_steps.log_uri,
ReleaseLabel=emr_configuration_and_steps.release_label,
Applications=emr_configuration_and_steps.applications,
Tags=emr_configuration_and_steps.tags,
Instances=emr_configuration_and_steps.instances,
Steps=emr_configuration_and_steps.steps,
Configurations=emr_configuration_and_steps.configurations,
ScaleDownBehavior=emr_configuration_and_steps.scale_down_behavior,
ServiceRole=emr_configuration_and_steps.service_role,
JobFlowRole=emr_configuration_and_steps.job_flow_role
)
logger.info("EMR is created successfully")
return cluster_id['JobFlowId']
except Exception as e:
traceback.print_exc()
raise Exception(e)

def lambda_handler(event, context):
logger.info("starting the lambda function for spawning EMR")
try:
emr_cluster_id = create_emr('Name of Your EMR')
logger.info("emr_cluster_id is = " + emr_cluster_id)
except Exception as e:
logger.error("Exception at some step in the process " + str(e))

现在是第二个文件( `emr_configuration_and_steps.py` )所有的配置都是这样的。

visible_to_all_users = True
log_uri = 's3://your-s3-log-path-here/'
release_label = 'emr-5.29.0'
applications = [{'Name': 'Spark'}, {'Name': 'Hadoop'}]
tags = [
{'Key': 'Project', 'Value': 'Your-Project Name'},
{'Key': 'Service', 'Value': 'Your-Service Name'},
{'Key': 'Environment', 'Value': 'Development'}
]

instances = {
'Ec2KeyName': 'Your-key-name',
'Ec2SubnetId': 'your-subnet-name',
'InstanceFleets': [
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 1,
"TargetSpotCapacity": 0,
"InstanceTypeConfigs": [
{
"WeightedCapacity": 1,
"BidPriceAsPercentageOfOnDemandPrice": 100,
"InstanceType": "m3.xlarge"
}
],
"Name": "Master Node"
},
{
"InstanceFleetType": "CORE",
"TargetSpotCapacity": 8,
"InstanceTypeConfigs": [
{
"WeightedCapacity": 8,
"BidPriceAsPercentageOfOnDemandPrice": 50,
"InstanceType": "m3.xlarge"
}
],
"Name": "Core Node"
},

],
'KeepJobFlowAliveWhenNoSteps': False

}
steps = [
{
'Name': 'Setup Hadoop Debugging',
'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['state-pusher-script']
}
},
{
"Name": "Active Marker for digital panel",
"ActionOnFailure": 'TERMINATE_CLUSTER',
'HadoopJarStep': {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--deploy-mode",
"cluster",
"--driver-memory", "4g",
"--executor-memory", "4g",
"--executor-cores", "2",
"--class", "your-main-class-full-path-name",
"s3://your-jar-path-SNAPSHOT-jar-with-dependencies.jar"
]
}

}

]

configurations = [
{
"Classification": "spark-log4j",
"Properties": {
"log4j.logger.root": "INFO",
"log4j.logger.org": "INFO",
"log4j.logger.com": "INFO"
}
}
]
scale_down_behavior = 'TERMINATE_AT_TASK_COMPLETION'
service_role = 'EMR_DefaultRole'
job_flow_role = 'EMR_EC2_DefaultRole'

请根据您的用例调整特定的路径和名称。要部署它,您需要安装boto3并将这两个文件打包/zip到一个zip文件中,然后将其上载到lambda函数。这样你就可以产生电子病历了。

相关问题