Cannot receive data from AWS Kinesis

wmtdaxz3  posted on 2021-06-24  in Flink

I build a Flink Kinesis consumer with the following code:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class DemoKinesisCA {
    public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(1);

        Properties consumerConfig = new Properties();
        consumerConfig.put(AWSConfigConstants.AWS_REGION, "cn-north-1");
        consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "${my_key}");
        consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "${my_secret_key}");
        consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

        DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
                "${my_stream}",
                new SimpleStringSchema(),
                consumerConfig));

        kinesis.print();

        see.execute();
    }
}

The code runs without errors and no exceptions show up in the logs. I have set the initial position to TRIM_HORIZON, which means consuming from the earliest available data, yet I receive 0 bytes from Kinesis. I have been told the stream is not empty, so something must be wrong...
Thanks for any help.
The logs look like this:

2019-12-20 09:21:22,814 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Receive slot request 00b96cfdd8c98b09635926e3643210f3 for job c5449594afa59fde3826b82d6034a71b from resource manager with leader id 00000000000000000000000000000000.
2019-12-20 09:21:22,815 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Allocated slot for 00b96cfdd8c98b09635926e3643210f3.
2019-12-20 09:21:22,815 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Add job c5449594afa59fde3826b82d6034a71b for job leader monitoring.
2019-12-20 09:21:22,815 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Try to register at job manager akka.tcp://flink@localhost:6123/user/jobmanager_6 with leader id 00000000-0000-0000-0000-000000000000.
2019-12-20 09:21:22,819 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Resolved JobManager address, beginning registration
2019-12-20 09:21:22,819 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Registration at JobManager attempt 1 (timeout=100ms)
2019-12-20 09:21:22,826 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Successful registration at job manager akka.tcp://flink@localhost:6123/user/jobmanager_6 for job c5449594afa59fde3826b82d6034a71b.
2019-12-20 09:21:22,826 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Establish JobManager connection for job c5449594afa59fde3826b82d6034a71b.
2019-12-20 09:21:22,826 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Offer reserved slots to the leader of job c5449594afa59fde3826b82d6034a71b.
2019-12-20 09:21:22,830 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Activate slot 00b96cfdd8c98b09635926e3643210f3.
2019-12-20 09:21:22,837 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received task Source: Custom Source -> Sink: Print to Std. Out (1/1).
2019-12-20 09:21:22,837 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) switched from CREATED to DEPLOYING.
2019-12-20 09:21:22,837 INFO  org.apache.flink.runtime.taskmanager.Task                     - Creating FileSystem stream leak safety net for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING]
2019-12-20 09:21:22,837 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING].
2019-12-20 09:21:22,838 INFO  org.apache.flink.runtime.blob.BlobClient                      - Downloading c5449594afa59fde3826b82d6034a71b/p-c6e596cf391265b05e5e166e9448a1b36bc8fb74-756c152250698cb8c1c63a809f05d4b3 from localhost/127.0.0.1:44568
2019-12-20 09:21:23,011 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) [DEPLOYING].
2019-12-20 09:21:23,012 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Sink: Print to Std. Out (1/1) (6febbaa53f94b70650cfce8281831aaf) switched from DEPLOYING to RUNNING.
2019-12-20 09:21:23,013 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2019-12-20 09:21:23,037 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber does not contain a setter for field sequenceNumber
2019-12-20 09:21:23,037 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2019-12-20 09:21:23,038 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - No restore state for FlinkKinesisConsumer.
2019-12-20 09:21:23,779 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='my_stream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49599347585804022687811337727581452022704930306761162754,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
2019-12-20 09:21:23,780 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher  - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='my_stream', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49599347585804022687811337727581452022704930306761162754,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
ylamdve6:

Welcome to the world of Java: the string "${my_stream}" is not evaluated and most likely points to a stream that does not exist.
If my_stream is meant to be a program argument, you need to read it with pt.get("my_stream").
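For illustration, a minimal sketch of how the stream name and credentials could be read from the program arguments instead of being passed as literal placeholders; the argument names my_stream, aws_key and aws_secret are assumptions for this example and reuse the imports already shown in the question:

    // Assumed invocation (argument names are illustrative):
    //   flink run demo.jar --my_stream my-stream-name --aws_key AKIA... --aws_secret ...
    ParameterTool pt = ParameterTool.fromArgs(args);

    Properties consumerConfig = new Properties();
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "cn-north-1");
    // getRequired() fails fast if the argument was not supplied
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("aws_key"));
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("aws_secret"));
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

    // Pass the resolved stream name, not the literal string "${my_stream}"
    DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
            pt.getRequired("my_stream"),
            new SimpleStringSchema(),
            consumerConfig));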
