web123456

Spring Boot-based implementation of MQTT message sending and receiving

/**
 * Spring configuration for the subscribing (inbound) side of MQTT.
 *
 * <p>Declares the Paho connection options and client factory, an inbound
 * {@link DirectChannel}, the message-driven adapter that subscribes to the
 * topics listed in {@link MqttConfiguration}, and a handler that logs every
 * message delivered on the inbound channel.
 */
@Log4j2
@Data
@Configuration
@IntegrationComponentScan
public class Subscriber {

    /** Prefix shared by every client id generated by {@link #clusterClientId()}. */
    private static final String CLIENT_ID_PREFIX = "subscriber";

    /**
     * Value handed to {@code setCompletionTimeout} on the inbound adapter
     * (milliseconds according to the Spring Integration MQTT documentation).
     */
    private static final int COMPLETION_TIMEOUT_MILLIS = 5000;

    @Resource
    MqttConfiguration configuration;

    /**
     * Exposes the connection options built by {@link MqttConfiguration} as a bean.
     *
     * @return the options returned by {@code configuration.connectionOptions()}
     */
    @Bean
    public MqttConnectOptions connectOptions() {
        return configuration.connectionOptions();
    }

    /**
     * Initializes the connection factory.
     *
     * @return a {@link DefaultMqttPahoClientFactory} configured with {@link #connectOptions()}
     */
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(connectOptions());
        return factory;
    }

    /**
     * Creates the input channel and subscribes {@link #handler()} to it.
     *
     * @return the inbound channel, registered under the bean name {@code mqttInboundChannel}
     */
    @Bean("mqttInboundChannel")
    public MessageChannel mqttInboundChannel() {
        // Plain construction instead of double-brace initialization: no anonymous
        // DirectChannel subclass is created just to run one subscribe() call.
        DirectChannel channel = new DirectChannel();
        channel.subscribe(handler());
        return channel;
    }

    /**
     * Builds the MQTT client id for this subscriber.
     *
     * <p>The id is {@code subscriber-<local host address>}; when the local host
     * cannot be resolved it falls back to {@code subscriber-<current time millis>}.
     *
     * <p>NOTE(review): the host-address form yields the same id for every
     * instance started on the same host; MQTT brokers normally allow only one
     * connection per client id -- confirm this cannot happen in deployment.
     *
     * @return the client id to pass to the inbound adapter
     */
    private static String clusterClientId() {
        try {
            return CLIENT_ID_PREFIX + "-" + InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException he) {
            // Trailing throwable keeps the stack trace in the log; message text is unchanged.
            log.warn("unknown ip address, cause: {}", he.getMessage(), he);
            return CLIENT_ID_PREFIX + "-" + System.currentTimeMillis();
        }
    }

    /**
     * Binds the configured topics to the inbound channel.
     *
     * @return the message-driven adapter that subscribes to {@code configuration.getTopics()}
     *         with QoS {@code configuration.getQos()} and forwards to {@link #mqttInboundChannel()}
     */
    @Bean
    public MessageProducer inbound() {
        String[] topics = configuration.getTopics().toArray(new String[0]);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clusterClientId(), mqttPahoClientFactory(), topics);
        adapter.setCompletionTimeout(COMPLETION_TIMEOUT_MILLIS);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(configuration.getQos());
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }

    /**
     * Message handling definition: logs the headers and payload of each message.
     *
     * @return the handler subscribed to the inbound channel
     */
    @Bean
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                log.info("HandleMessage headers: {}", message.getHeaders());
                log.info("HandleMessage payload: {}", message.getPayload());
            }
        };
    }