org.apache.hadoop.yarn.event.AsyncDispatcher类的使用及代码示例

x33g5p2x  于2022-01-16 转载在 其他  
字(9.0k)|赞(0)|评价(0)|浏览(137)

本文整理了Java中org.apache.hadoop.yarn.event.AsyncDispatcher类的一些代码示例,展示了AsyncDispatcher类的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。AsyncDispatcher类的具体详情如下:
包路径:org.apache.hadoop.yarn.event.AsyncDispatcher
类名称:AsyncDispatcher

AsyncDispatcher介绍

[英]Dispatches Events in a separate thread. Currently only single thread does that. Potentially there could be multiple channels for each event type class and a thread pool can be used to dispatch the events.
[中]在单独的线程中分派事件。目前仅由单个线程完成这项工作。将来可以为每个事件类型类设置多个通道,并使用线程池来分派事件。

代码示例

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Sets up the store's event dispatching and then runs the store-specific
 * initialization.
 *
 * <p>A new {@link AsyncDispatcher} is assigned to {@code dispatcher},
 * initialized with {@code conf}, given a {@code ForwardingEventHandler} for
 * {@code RMStateStoreEventType} events and switched to drain-on-stop before
 * {@code initInternal(conf)} is invoked.
 *
 * @param conf configuration handed to the dispatcher and to
 *             {@code initInternal}
 * @throws Exception whatever the dispatcher setup or {@code initInternal}
 *                   throws
 */
@Override
protected void serviceInit(Configuration conf) throws Exception{
 dispatcher = new AsyncDispatcher();
 dispatcher.init(conf);

 // The handler is created only after the dispatcher has been initialized.
 ForwardingEventHandler forwarder = new ForwardingEventHandler();
 dispatcher.register(RMStateStoreEventType.class, forwarder);
 dispatcher.setDrainEventsOnStop();

 initInternal(conf);
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Exposes the event handler of the underlying {@code dispatcher}.
 *
 * @return the handler obtained from {@code dispatcher.getEventHandler()}
 */
@SuppressWarnings("rawtypes")
 protected EventHandler getRMStateStoreEventHandler() {
  EventHandler storeEventHandler = dispatcher.getEventHandler();
  return storeEventHandler;
 }
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-common

/**
 * Starts the dispatcher held in {@code dispatcher}, which is cast to
 * {@link AsyncDispatcher} first.
 */
protected void startDispatcher() {
 ((AsyncDispatcher) dispatcher).start();
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Creates the {@link AsyncDispatcher} named
 * "AttributeNodeLabelsManager dispatcher", stores it in {@code dispatcher},
 * initializes it with {@code conf} and enables drain-on-stop.
 *
 * @param conf configuration passed to the dispatcher's {@code init}
 */
protected void initDispatcher(Configuration conf) {
 AsyncDispatcher created =
   new AsyncDispatcher("AttributeNodeLabelsManager dispatcher");
 // Publish to the field before init, as callers may read it afterwards.
 dispatcher = created;
 created.init(conf);
 created.setDrainEventsOnStop();
}

代码示例来源:origin: org.apache.hadoop/hadoop-mapreduce-client-app

/**
 * Checks that a job sitting in {@code COMMITTING} moves to {@code FAILED}
 * once the committer fails.
 *
 * <p>The dispatcher and the committer event handler are stopped in a
 * {@code finally} block so that a failing assertion (or a broken barrier)
 * does not leave their threads running for the rest of the test run.
 *
 * @throws Exception if service setup, the barrier wait or an assertion fails
 */
@Test(timeout=20000)
public void testCommitJobFailsJob() throws Exception {
 Configuration conf = new Configuration();
 conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
 AsyncDispatcher dispatcher = new AsyncDispatcher();
 CommitterEventHandler commitHandler = null;
 try {
  dispatcher.init(conf);
  dispatcher.start();
  // Two parties: this test thread and (presumably) the committer — the
  // barrier is handed to TestingOutputCommitter below.
  CyclicBarrier syncBarrier = new CyclicBarrier(2);
  OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false);
  commitHandler = createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();
  JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
  completeJobTasks(job);
  assertJobState(job, JobStateInternal.COMMITTING);
  // let the committer fail and verify the job fails
  syncBarrier.await();
  assertJobState(job, JobStateInternal.FAILED);
 } finally {
  // Same stop order as before (dispatcher first); the nested finally makes
  // sure the handler is stopped even if stopping the dispatcher throws.
  try {
   dispatcher.stop();
  } finally {
   if (commitHandler != null) {
    commitHandler.stop();
   }
  }
 }
}

代码示例来源:origin: apache/tajo

/**
 * Creates and registers the dispatcher and the history cache, then delegates
 * to the superclass initialization.
 *
 * <p>Any exception raised while building the dispatcher or the cache is
 * logged and not rethrown; {@code super.serviceInit(conf)} runs regardless.
 *
 * @param conf configuration; checked to be a {@code TajoConf} via
 *             {@code TUtil.checkTypeAndGet}
 * @throws Exception if {@code super.serviceInit(conf)} throws
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
 try {
  dispatcher = new AsyncDispatcher();
  addService(dispatcher);
  dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());

  TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
  int historyCacheSize = tajoConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_CACHE_SIZE);
  historyCache = new LRUMap(historyCacheSize);
 } catch (Exception initFailure) {
  String message = "Failed to init service " + getName() + " by exception " + initFailure;
  LOG.error(message, initFailure);
 }
 super.serviceInit(conf);
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-nodemanager

/**
 * Factory method for the dispatcher; kept overridable so unit tests can
 * substitute their own.
 *
 * @return a new {@link AsyncDispatcher} named "NM Event dispatcher"
 */
protected AsyncDispatcher createNMDispatcher() {
 AsyncDispatcher nmDispatcher = new AsyncDispatcher("NM Event dispatcher");
 return nmDispatcher;
}

代码示例来源:origin: apache/tajo

/**
 * One-time fixture for the test class.
 *
 * <p>Starts the catalog of a {@code TajoTestingCluster}, creates the default
 * tablespace and database, registers one catalog table per TPCH table (with
 * its byte volume taken from {@code TPCH.tableVolumes}), and finally builds
 * the analyzer, logical planner, optimizer, a started
 * {@link AsyncDispatcher} and the global planner.
 *
 * @throws Exception if the cluster, the catalog or any planner component
 *                   fails to start
 */
@BeforeClass
public static void setUp() throws Exception {
 util = new TajoTestingCluster();
 util.startCatalogCluster();
 conf = util.getConfiguration();
 conf.set(TajoConf.ConfVars.$TEST_BROADCAST_JOIN_ENABLED.varname, "false");
 catalog = util.getCatalogService();
 // The port part of a URI authority must be numeric; the previous value
 // "localhost:!234" was a shift-1 typo for 1234.
 catalog.createTablespace(DEFAULT_TABLESPACE_NAME, "hdfs://localhost:1234/warehouse");
 catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
 TPCH tpch = new TPCH();
 tpch.loadSchemas();
 tpch.loadOutSchema();
 // Register every TPCH table in the catalog together with its size stats.
 for (String table : tpch.getTableNames()) {
  TableMeta m = CatalogUtil.newTableMeta(BuiltinStorages.TEXT, util.getConfiguration());
  TableDesc d = CatalogUtil.newTableDesc(
    IdentifierUtil.buildFQName(DEFAULT_DATABASE_NAME, table), tpch.getSchema(table), m, CommonTestingUtil.getTestDir());
  TableStats stats = new TableStats();
  stats.setNumBytes(TPCH.tableVolumes.get(table));
  d.setStats(stats);
  catalog.createTable(d);
 }
 analyzer = new SQLAnalyzer();
 logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 optimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance());
 dispatcher = new AsyncDispatcher();
 dispatcher.init(conf);
 dispatcher.start();
 planner = new GlobalPlanner(conf, catalog);
}

代码示例来源:origin: org.apache.hadoop/hadoop-mapreduce-client-app

// Handler that receives the events registered on atsEventDispatcher below.
EventHandler<JobHistoryEvent> timelineEventHandler =
  new ForwardingEventHandler();
// Route EventType events arriving on atsEventDispatcher to the handler.
atsEventDispatcher.register(EventType.class, timelineEventHandler);
// Requested before init(conf).
// NOTE(review): presumably makes stop() wait for queued events — confirm
// against AsyncDispatcher.
atsEventDispatcher.setDrainEventsOnStop();
atsEventDispatcher.init(conf);

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Registers {@code handler} for {@code eventType} on each dispatcher in
 * {@code dispatchers}.
 *
 * @param eventType enum class of the events to route
 * @param handler   handler registered for those events on every dispatcher
 */
@Override
public void register(Class<? extends Enum> eventType, EventHandler handler) {
 for (AsyncDispatcher child : dispatchers) {
  child.register(eventType, handler);
 }
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-common

/**
 * Stops the dispatcher held in {@code dispatcher}; does nothing when the
 * field is {@code null}.
 */
protected void stopDispatcher() {
 AsyncDispatcher target = (AsyncDispatcher) dispatcher;
 if (target == null) {
  return;
 }
 target.stop();
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-common

/**
 * Dispatch loop: repeatedly takes an event from {@code eventQueue} and passes
 * it to {@code dispatch} until {@code stopped} is set or the current thread
 * is interrupted.
 */
@Override
 public void run() {
  while (!stopped && !Thread.currentThread().isInterrupted()) {
   // Record whether the queue was empty at the top of this iteration.
   drained = eventQueue.isEmpty();
   // blockNewEvents is only set when dispatcher is draining to stop,
   // adding this check is to avoid the overhead of acquiring the lock
   // and calling notify every time in the normal run of the loop.
   if (blockNewEvents) {
    synchronized (waitForDrained) {
     if (drained) {
      // Wake a thread waiting on the waitForDrained monitor.
      waitForDrained.notify();
     }
    }
   }
   Event event;
   try {
    // NOTE(review): presumably a blocking take on a BlockingQueue — confirm.
    event = eventQueue.take();
   } catch(InterruptedException ie) {
    // Only warn when the interrupt did not come with stopped being set;
    // in both cases the loop ends here.
    if (!stopped) {
     LOG.warn("AsyncDispatcher thread interrupted", ie);
    }
    return;
   }
   if (event != null) {
    dispatch(event);
   }
  }
 }
};

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Calls {@code setDrainEventsOnStop()} on each dispatcher in
 * {@code dispatchers}.
 */
public void setDrainEventsOnStop() {
 for (AsyncDispatcher child : dispatchers) {
  child.setDrainEventsOnStop();
 }
}

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-server-resourcemanager

@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testSchedulerEventDispatcherForPreemptionEvents() {
 AsyncDispatcher rmDispatcher = new AsyncDispatcher();
 CapacityScheduler sched = spy(new CapacityScheduler());
 YarnConfiguration conf = new YarnConfiguration();
 SchedulerEventDispatcher schedulerDispatcher =
   new SchedulerEventDispatcher(sched);
 rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
 rmDispatcher.init(conf);
 rmDispatcher.start();
 schedulerDispatcher.init(conf);
 schedulerDispatcher.start();
  ContainerPreemptEvent event1 = new ContainerPreemptEvent(
    appAttemptId, container, SchedulerEventType.DROP_RESERVATION);
  rmDispatcher.getEventHandler().handle(event1);
  ContainerPreemptEvent event2 = new ContainerPreemptEvent(
     appAttemptId, container, SchedulerEventType.KILL_CONTAINER);
  rmDispatcher.getEventHandler().handle(event2);
  ContainerPreemptEvent event3 = new ContainerPreemptEvent(
    appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER);
  rmDispatcher.getEventHandler().handle(event3);
 } finally {
  schedulerDispatcher.stop();
  rmDispatcher.stop();

代码示例来源:origin: com.github.jiayuhan-it/hadoop-yarn-common

/**
 * Creates a new {@link AsyncDispatcher}, stores it in {@code dispatcher},
 * initializes it with {@code conf} and enables drain-on-stop.
 *
 * @param conf configuration passed to the dispatcher's {@code init}
 */
protected void initDispatcher(Configuration conf) {
 AsyncDispatcher created = new AsyncDispatcher();
 // Assign the field before init, matching the original ordering.
 dispatcher = created;
 created.init(conf);
 created.setDrainEventsOnStop();
}

代码示例来源:origin: org.apache.tajo/tajo-core

/**
 * Builds the dispatcher and the history cache, then runs the superclass
 * initialization.
 *
 * <p>A failure while building either one is logged and not propagated, so
 * {@code super.serviceInit(conf)} is always reached.
 *
 * @param conf configuration; checked to be a {@code TajoConf} via
 *             {@code TUtil.checkTypeAndGet}
 * @throws Exception if {@code super.serviceInit(conf)} throws
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
 try {
  dispatcher = new AsyncDispatcher();
  addService(dispatcher);
  dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());

  TajoConf checkedConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
  int cacheCapacity = checkedConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_CACHE_SIZE);
  historyCache = new LRUMap(cacheCapacity);
 } catch (Exception cause) {
  String text = "Failed to init service " + getName() + " by exception " + cause;
  LOG.error(text, cause);
 }
 super.serviceInit(conf);
}

代码示例来源:origin: org.apache.tajo/tajo-core

/**
 * Convenience constructor: forwards all arguments unchanged to the
 * seven-argument constructor and supplies a newly created
 * {@link AsyncDispatcher} as the final argument.
 *
 * @param queryMasterContext passed through unchanged
 * @param queryId            passed through unchanged
 * @param session            passed through unchanged
 * @param queryContext       passed through unchanged
 * @param jsonExpr           passed through unchanged
 * @param allocation         passed through unchanged
 */
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
            QueryId queryId, Session session, QueryContext queryContext,
            String jsonExpr,
            NodeResource allocation) {
 this(queryMasterContext, queryId, session, queryContext, jsonExpr, allocation, new AsyncDispatcher());
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Fans the registration out: {@code handler} is registered for
 * {@code eventType} on each dispatcher in {@code dispatchers}.
 *
 * @param eventType enum class of the events to route
 * @param handler   handler registered for those events on every dispatcher
 */
@Override
public void register(Class<? extends Enum> eventType,
  EventHandler handler) {
 for (AsyncDispatcher each : dispatchers) {
  each.register(eventType, handler);
 }
}

代码示例来源:origin: org.apache.hadoop/hadoop-yarn-server-resourcemanager

/**
 * Stops the dispatcher stored in {@code dispatcher}; a {@code null} field is
 * silently ignored.
 */
protected void stopDispatcher() {
 AsyncDispatcher current = (AsyncDispatcher) dispatcher;
 if (current == null) {
  return;
 }
 current.stop();
}

代码示例来源:origin: ch.cern.hadoop/hadoop-yarn-common

/**
 * Dispatch loop: repeatedly takes an event from {@code eventQueue} and passes
 * it to {@code dispatch} until {@code stopped} is set or the current thread
 * is interrupted.
 */
@Override
 public void run() {
  while (!stopped && !Thread.currentThread().isInterrupted()) {
   // Record whether the queue was empty at the top of this iteration.
   drained = eventQueue.isEmpty();
   // blockNewEvents is only set when dispatcher is draining to stop,
   // adding this check is to avoid the overhead of acquiring the lock
   // and calling notify every time in the normal run of the loop.
   if (blockNewEvents) {
    synchronized (waitForDrained) {
     if (drained) {
      // Wake a thread waiting on the waitForDrained monitor.
      waitForDrained.notify();
     }
    }
   }
   Event event;
   try {
    // NOTE(review): presumably a blocking take on a BlockingQueue — confirm.
    event = eventQueue.take();
   } catch(InterruptedException ie) {
    // Only warn when the interrupt did not come with stopped being set;
    // in both cases the loop ends here.
    if (!stopped) {
     LOG.warn("AsyncDispatcher thread interrupted", ie);
    }
    return;
   }
   if (event != null) {
    dispatch(event);
   }
  }
 }
};

相关文章