com.hazelcast.jet.pipeline.Sinks.list()方法的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(8.3k)|赞(0)|评价(0)|浏览(122)

本文整理了Java中com.hazelcast.jet.pipeline.Sinks.list()方法的一些代码示例,展示了Sinks.list()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Sinks.list()方法的具体详情如下:
包路径:com.hazelcast.jet.pipeline.Sinks
类名称:Sinks
方法名:list

Sinks.list介绍

[英]Returns a sink that adds the items it receives to a Hazelcast IList with the specified name.

NOTE: Jet only remembers the name of the list you supply and acquires a list with that name on the local cluster. If you supply a list instance from another cluster, no error will be thrown to indicate this.

No state is saved to snapshot for this sink. After the job is restarted, the items will likely be duplicated, providing an at-least-once guarantee.

The default local parallelism for this sink is 1.
[中]返回一个接收器,该接收器将收到的项添加到具有指定名称的Hazelcast IList。
注意:Jet只记住您提供的列表的名称,并在本地集群上获取具有该名称的列表。如果您从另一个集群提供一个列表实例,则不会抛出任何错误来指示这一点。
没有将状态保存到此接收器的快照。重新启动作业后,这些项目可能会被复制,从而提供至少一次的保证。
此接收器的默认本地并行度为1。

代码示例

代码示例来源:origin: hazelcast/hazelcast-jet

/**
 * Returns a sink that adds the items it receives to a Hazelcast {@code
 * IList} with the specified name.
 * <p>
 * <strong>NOTE:</strong> Jet only remembers the name of the list you
 * supply and acquires a list with that name on the local cluster. If you
 * supply a list instance from another cluster, no error will be thrown to
 * indicate this.
 * <p>
 * No state is saved to snapshot for this sink. After the job is restarted,
 * the items will likely be duplicated, providing an <i>at-least-once</i>
 * guarantee.
 * <p>
 * The default local parallelism for this sink is 1.
 */
@Nonnull
public static <T> Sink<T> list(@Nonnull IList<? super T> list) {
  return list(list.getName());
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
    System.setProperty("hazelcast.logging.type", "log4j");

    NettyServer nettyServer = new NettyServer(PORT, channel -> {
      for (int i; (i = COUNTER.getAndDecrement()) > 0; ) {
        channel.writeAndFlush(i + "\n");
      }
      channel.close();
    }, DistributedConsumer.noop());
    nettyServer.start();

    JetInstance jet = Jet.newJetInstance();
    Jet.newJetInstance();

    try {
      Pipeline p = Pipeline.create();
      p.drawFrom(Sources.socket(HOST, PORT, UTF_8))
       .drainTo(Sinks.list(SINK_NAME));

      jet.newJob(p).join();

      System.out.println("Jet received " + jet.getList(SINK_NAME).size() + " items from the socket");
    } finally {
      nettyServer.stop();
      Jet.shutdownAll();
    }

  }
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
    System.setProperty("hazelcast.logging.type", "log4j");
    JetInstance jet = Jet.newJetInstance();

    try {
      IList<Integer> inputList = jet.getList(INPUT_LIST);
      for (int i = 0; i < ITEM_COUNT; i++) {
        inputList.add(i);
      }

      Pipeline p = Pipeline.create();
      p.drawFrom(Sources.<Integer>list(INPUT_LIST))
       .map(i -> "item" + i)
       .drainTo(Sinks.list(RESULT_LIST));

      jet.newJob(p).join();

      IList<String> outputList = jet.getList(RESULT_LIST);
      System.out.println("Result list items: " + new ArrayList<>(outputList));
    } finally {
      Jet.shutdownAll();
    }
  }
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");
  JetConfig jetConfig = getJetConfig();
  JetInstance jet = Jet.newJetInstance(jetConfig);
  Jet.newJetInstance(jetConfig);
  try {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<Integer, Integer>cacheJournal(CACHE_NAME, START_FROM_OLDEST))
     .map(Entry::getValue)
     .drainTo(Sinks.list(SINK_NAME));
    jet.newJob(p);
    ICache<Integer, Integer> cache = jet.getCacheManager().getCache(CACHE_NAME);
    for (int i = 0; i < 1000; i++) {
      cache.put(i, i);
    }
    TimeUnit.SECONDS.sleep(3);
    System.out.println("Read " + jet.getList(SINK_NAME).size() + " entries from cache journal.");
  } finally {
    Jet.shutdownAll();
  }
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");
  JetConfig jetConfig = getJetConfig();
  JetInstance jet = Jet.newJetInstance(jetConfig);
  Jet.newJetInstance(jetConfig);
  try {
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<Integer, Integer>mapJournal(MAP_NAME, START_FROM_OLDEST))
     .map(Entry::getValue)
     .drainTo(Sinks.list(SINK_NAME));
    jet.newJob(p);
    IMapJet<Integer, Integer> map = jet.getMap(MAP_NAME);
    for (int i = 0; i < 1000; i++) {
      map.set(i, i);
    }
    TimeUnit.SECONDS.sleep(3);
    System.out.println("Read " + jet.getList(SINK_NAME).size() + " entries from map journal.");
  } finally {
    Jet.shutdownAll();
  }
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

.drainTo(Sinks.list("sink"));

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");
  Config hzConfig = getConfig();
  HazelcastInstance remoteHz = startRemoteHzCluster(hzConfig);
  JetInstance localJet = startLocalJetCluster();
  try {
    ClientConfig clientConfig = new ClientConfig();
    clientConfig.getNetworkConfig().addAddress(getAddress(remoteHz));
    clientConfig.setGroupConfig(hzConfig.getGroupConfig());
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<Integer, Integer>remoteMapJournal(
        MAP_NAME, clientConfig, START_FROM_OLDEST)
    ).map(Entry::getValue)
     .drainTo(Sinks.list(SINK_NAME));
    localJet.newJob(p);
    IMap<Integer, Integer> map = remoteHz.getMap(MAP_NAME);
    for (int i = 0; i < 1000; i++) {
      map.set(i, i);
    }
    TimeUnit.SECONDS.sleep(3);
    System.out.println("Read " + localJet.getList(SINK_NAME).size() + " entries from remote map journal.");
  } finally {
    Hazelcast.shutdownAll();
    Jet.shutdownAll();
  }
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

.drainTo(Sinks.list("sink"));

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

GenericPredicates.lessThan("price", 10),
    Projections.singleAttribute("ticker"))
).drainTo(Sinks.list(SINK_NAME));
System.out.println("\n\nExecuting job 1...\n");
jet.newJob(p1).join();
    e -> e.getValue().getPrice() < 10,
    e -> e.getValue().getTicker())
).drainTo(Sinks.list(SINK_NAME));
System.out.println("\n\nExecuting job 2...\n");
jet.newJob(p2).join();

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");
  JetInstance localJet = Jet.newJetInstance();
  try {
    HazelcastInstance externalHz = startExternalHazelcast();
    IList<Integer> sourceList = externalHz.getList(LIST_1);
    for (int i = 0; i < ITEM_COUNT; i++) {
      sourceList.add(i);
    }
    ClientConfig clientConfig = clientConfigForExternalHazelcast();
    // pipeline that copies the remote list to a local with the same name
    Pipeline p1 = Pipeline.create();
    p1.drawFrom(Sources.remoteList(LIST_1, clientConfig))
     .drainTo(Sinks.list(LIST_1));
    // pipeline that copies the local list to a remote with a different name
    Pipeline p2 = Pipeline.create();
    p2.drawFrom(Sources.list(LIST_1))
     .drainTo(Sinks.remoteList(LIST_2, clientConfig));
    localJet.newJob(p1).join();
    System.out.println("Local list-1 contents: " + new ArrayList<>(localJet.getList(LIST_1)));
    localJet.newJob(p2).join();
    System.out.println("Remote list-2 contents: " + new ArrayList<>(externalHz.getList(LIST_2)));
  } finally {
    Jet.shutdownAll();
    Hazelcast.shutdownAll();
  }
}

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

.drainTo(list("sink"));

代码示例来源:origin: hazelcast/hazelcast-jet-code-samples

public static void main(String[] args) throws Exception {
  System.setProperty("hazelcast.logging.type", "log4j");
  Config hzConfig = getConfig();
  HazelcastInstance remoteHz = startRemoteHzCluster(hzConfig);
  JetInstance localJet = startLocalJetCluster();
  try {
    ClientConfig clientConfig = new ClientConfig();
    clientConfig.getNetworkConfig().addAddress(getAddress(remoteHz));
    clientConfig.setGroupConfig(hzConfig.getGroupConfig());
    Pipeline p = Pipeline.create();
    p.drawFrom(Sources.<Integer, Integer>remoteCacheJournal(
        CACHE_NAME, clientConfig, START_FROM_OLDEST)
    ).map(Entry::getValue)
     .drainTo(Sinks.list(SINK_NAME));
    localJet.newJob(p);
    ICache<Integer, Integer> cache = remoteHz.getCacheManager().getCache(CACHE_NAME);
    for (int i = 0; i < 1000; i++) {
      cache.put(i, i);
    }
    TimeUnit.SECONDS.sleep(3);
    System.out.println("Read " + localJet.getList(SINK_NAME).size() + " entries from remote cache journal.");
  } finally {
    Hazelcast.shutdownAll();
    Jet.shutdownAll();
  }
}

代码示例来源:origin: hazelcast/hazelcast-jet

private Job newJob() {
  Pipeline p = Pipeline.create();
  p.drawFrom(Sources.mapJournal(SOURCE_NAME, START_FROM_OLDEST))
      .withoutTimestamps()
      .drainTo(Sinks.list(SINK_NAME));
  return jet.newJob(p, new JobConfig().setName("job-infinite-pipeline"));
}

相关文章