本文整理了Java中org.springframework.messaging.MessageChannel
类的一些代码示例,展示了MessageChannel
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。MessageChannel
类的具体详情如下:
包路径:org.springframework.messaging.MessageChannel
类名称:MessageChannel
[英]Defines methods for sending messages.
[中]定义发送消息的方法。
代码示例来源:origin: spring-projects/spring-framework
protected void handleInboundMessage(Message<?> message) {
if (this.isRemoteClientSession) {
this.outboundChannel.send(message);
}
}
代码示例来源:origin: spring-cloud-incubator/spring-cloud-alibaba
public <T> void sendWithTags(T msg, String tag) throws Exception {
Message message = MessageBuilder.createMessage(msg,
new MessageHeaders(Stream.of(tag).collect(Collectors
.toMap(str -> MessageConst.PROPERTY_TAGS, String::toString))));
source.output1().send(message);
}
代码示例来源:origin: spring-cloud-incubator/spring-cloud-alibaba
public <T> void sendObject(T msg, String tag) throws Exception {
Message message = MessageBuilder.withPayload(msg)
.setHeader(MessageConst.PROPERTY_TAGS, tag)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();
source.output1().send(message);
}
代码示例来源:origin: apache/nifi
/**
*
*/
@Override
public <T> boolean send(T payload, Map<String, ?> messageHeaders, long timeout) {
if (this.toSpringChannel != null){
return this.toSpringChannel.send(MessageBuilder.withPayload(payload).copyHeaders(messageHeaders).build(), timeout);
} else {
throw new IllegalStateException("Failed to send message to '" + this.configName
+ "'. There are no 'fromNiFi' channels configured which means the Application Conetxt is not set up to receive messages from NiFi");
}
}
代码示例来源:origin: spring-cloud-incubator/spring-cloud-alibaba
public void send(String msg) throws Exception {
source.output1().send(MessageBuilder.withPayload(msg).build());
}
代码示例来源:origin: spring-projects/spring-framework
protected final void doSend(MessageChannel channel, Message<?> message, long timeout) {
Assert.notNull(channel, "MessageChannel is required");
Message<?> messageToSend = message;
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
if (accessor != null && accessor.isMutable()) {
accessor.removeHeader(this.sendTimeoutHeader);
accessor.removeHeader(this.receiveTimeoutHeader);
accessor.setImmutable();
}
else if (message.getHeaders().containsKey(this.sendTimeoutHeader)
|| message.getHeaders().containsKey(this.receiveTimeoutHeader)) {
messageToSend = MessageBuilder.fromMessage(message)
.setHeader(this.sendTimeoutHeader, null)
.setHeader(this.receiveTimeoutHeader, null)
.build();
}
boolean sent = (timeout >= 0 ? channel.send(messageToSend, timeout) : channel.send(messageToSend));
if (!sent) {
throw new MessageDeliveryException(message,
"Failed to send message to channel '" + channel + "' within timeout: " + timeout);
}
}
代码示例来源:origin: spring-projects/spring-framework
private void handleDisconnect(String sessionId, @Nullable Principal user, @Nullable Message<?> origMessage) {
this.sessions.remove(sessionId);
this.subscriptionRegistry.unregisterAllSubscriptions(sessionId);
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
accessor.setSessionId(sessionId);
if (user != null) {
accessor.setUser(user);
}
if (origMessage != null) {
accessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, origMessage);
}
initHeaders(accessor);
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
getClientOutboundChannel().send(message);
}
代码示例来源:origin: spring-cloud-incubator/spring-cloud-alibaba
public <T> void sendTransactionalMsg(T msg, boolean error) throws Exception {
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
if (error) {
builder.setHeader("test", "1");
}
Message message = builder.build();
source.output2().send(message);
}
代码示例来源:origin: spring-projects/spring-framework
/**
* Send a {@link Message} to this channel. If the message is sent successfully,
* the method returns {@code true}. If the message cannot be sent due to a
* non-fatal reason, the method returns {@code false}. The method may also
* throw a RuntimeException in case of non-recoverable errors.
* <p>This method may block indefinitely, depending on the implementation.
* To provide a maximum wait time, use {@link #send(Message, long)}.
* @param message the message to send
* @return whether or not the message was sent
*/
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
代码示例来源:origin: spring-projects/spring-framework
headerAccessor.setLeaveMutable(true);
Object payload = message.getPayload();
Message<?> reply = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders());
SessionInfo info = this.sessions.get(sessionId);
if (info != null) {
try {
info.getClientOutboundChannel().send(reply);
代码示例来源:origin: spring-projects/spring-framework
private void sendInternal(Message<?> message) {
String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
Assert.notNull(destination, "Destination header required");
long timeout = this.sendTimeout;
boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));
if (!sent) {
throw new MessageDeliveryException(message,
"Failed to send message to destination '" + destination + "' within timeout: " + timeout);
}
}
代码示例来源:origin: spring-projects/spring-framework
@Override
public void run() {
long now = System.currentTimeMillis();
for (SessionInfo info : sessions.values()) {
if (info.getReadInterval() > 0 && (now - info.getLastReadTime()) > info.getReadInterval()) {
handleDisconnect(info.getSessionId(), info.getUser(), null);
}
if (info.getWriteInterval() > 0 && (now - info.getLastWriteTime()) > info.getWriteInterval()) {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);
accessor.setSessionId(info.getSessionId());
Principal user = info.getUser();
if (user != null) {
accessor.setUser(user);
}
initHeaders(accessor);
accessor.setLeaveMutable(true);
MessageHeaders headers = accessor.getMessageHeaders();
info.getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers));
}
}
}
}
代码示例来源:origin: spring-projects/spring-framework
private void sendNextMessage() {
for (;;) {
Message<?> message = this.messages.poll();
if (message != null) {
try {
addCompletionCallback(message);
if (this.channel.send(message)) {
return;
}
}
catch (Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error("Failed to send " + message, ex);
}
}
}
else {
// We ran out of messages..
this.sendInProgress.set(false);
trySend();
break;
}
}
}
代码示例来源:origin: spring-projects/spring-framework
MessageChannel brokerChannel = context.getBean("brokerChannel", MessageChannel.class);
inChannel.send(createConnectMessage("sess1", new long[] {0,0}));
headers.setSubscriptionId("subs1");
headers.setDestination("/user/queue.q1");
Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
inChannel.send(message);
headers.setSessionId("sess1");
headers.setDestination("/user/sess1/queue.q1");
message = MessageBuilder.createMessage("123".getBytes(), headers.getMessageHeaders());
inChannel.send(message);
代码示例来源:origin: spring-projects/spring-security
private ThrowableAssert.ThrowingCallable send(Message<?> message) {
return () -> this.clientInboundChannel.send(message);
}
代码示例来源:origin: spring-projects/spring-framework
@Test // SPR-12170
public void sendToWithDestinationPlaceholders() throws Exception {
given(this.messageChannel.send(any(Message.class))).willReturn(true);
Map<String, String> vars = new LinkedHashMap<>(1);
vars.put("roomName", "roomA");
String sessionId = "sess1";
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
accessor.setSessionId(sessionId);
accessor.setSubscriptionId("sub1");
accessor.setHeader(DestinationVariableMethodArgumentResolver.DESTINATION_TEMPLATE_VARIABLES_HEADER, vars);
Message<?> message = MessageBuilder.createMessage(PAYLOAD, accessor.getMessageHeaders());
this.handler.handleReturnValue(PAYLOAD, this.sendToWithPlaceholdersReturnType, message);
verify(this.messageChannel, times(1)).send(this.messageCaptor.capture());
SimpMessageHeaderAccessor actual = getCapturedAccessor(0);
assertEquals(sessionId, actual.getSessionId());
assertEquals("/topic/chat.message.filtered.roomA", actual.getDestination());
}
代码示例来源:origin: dyc87112/SpringCloud-Learning
/**
* 消息生产接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void eventPublicationWithExceptions() {
ApplicationEventPublisher publisher = mock(ApplicationEventPublisher.class);
this.protocolHandler.setApplicationEventPublisher(publisher);
this.protocolHandler.afterSessionStarted(this.session, this.channel);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
TextMessage textMessage = new TextMessage(new StompEncoder().encode(message));
this.protocolHandler.handleMessageFromClient(this.session, textMessage, this.channel);
verify(this.channel).send(this.messageCaptor.capture());
Message<?> actual = this.messageCaptor.getValue();
assertNotNull(actual);
assertEquals(StompCommand.CONNECT, StompHeaderAccessor.wrap(actual).getCommand());
reset(this.channel);
headers = StompHeaderAccessor.create(StompCommand.CONNECTED);
message = MessageBuilder.createMessage(EMPTY_PAYLOAD, headers.getMessageHeaders());
this.protocolHandler.handleMessageToClient(this.session, message);
assertEquals(1, this.session.getSentMessages().size());
textMessage = (TextMessage) this.session.getSentMessages().get(0);
assertEquals("CONNECTED\n" + "user-name:joe\n" + "\n" + "\u0000", textMessage.getPayload());
this.protocolHandler.afterSessionEnded(this.session, CloseStatus.BAD_DATA, this.channel);
verify(this.channel).send(this.messageCaptor.capture());
actual = this.messageCaptor.getValue();
assertNotNull(actual);
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(actual);
assertEquals(StompCommand.DISCONNECT, accessor.getCommand());
assertEquals("s1", accessor.getSessionId());
assertEquals("joe", accessor.getUser().getName());
}
代码示例来源:origin: dyc87112/SpringCloud-Learning
/**
* 消息生产接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
代码示例来源:origin: spring-projects/spring-framework
@Test
public void subscribeDisconnectPublish() {
String sess1 = "sess1";
String sess2 = "sess2";
startSession(sess1);
startSession(sess2);
this.messageHandler.handleMessage(createSubscriptionMessage(sess1, "sub1", "/foo"));
this.messageHandler.handleMessage(createSubscriptionMessage(sess1, "sub2", "/foo"));
this.messageHandler.handleMessage(createSubscriptionMessage(sess1, "sub3", "/bar"));
this.messageHandler.handleMessage(createSubscriptionMessage(sess2, "sub1", "/foo"));
this.messageHandler.handleMessage(createSubscriptionMessage(sess2, "sub2", "/foo"));
this.messageHandler.handleMessage(createSubscriptionMessage(sess2, "sub3", "/bar"));
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT);
headers.setSessionId(sess1);
headers.setUser(new TestPrincipal("joe"));
Message<byte[]> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
this.messageHandler.handleMessage(message);
this.messageHandler.handleMessage(createMessage("/foo", "message1"));
this.messageHandler.handleMessage(createMessage("/bar", "message2"));
verify(this.clientOutChannel, times(4)).send(this.messageCaptor.capture());
Message<?> captured = this.messageCaptor.getAllValues().get(2);
assertEquals(SimpMessageType.DISCONNECT_ACK, SimpMessageHeaderAccessor.getMessageType(captured.getHeaders()));
assertSame(message, captured.getHeaders().get(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER));
assertEquals(sess1, SimpMessageHeaderAccessor.getSessionId(captured.getHeaders()));
assertEquals("joe", SimpMessageHeaderAccessor.getUser(captured.getHeaders()).getName());
assertTrue(messageCaptured(sess2, "sub1", "/foo"));
assertTrue(messageCaptured(sess2, "sub2", "/foo"));
assertTrue(messageCaptured(sess2, "sub3", "/bar"));
}
内容来源于网络,如有侵权,请联系作者删除!