@Log4j2
@Data
@Configuration
@IntegrationComponentScan
public class Subscriber {
// MQTT settings injected via @Resource; this class reads connectionOptions(),
// getTopics() and getQos() from it.
@Resource
MqttConfiguration configuration;
/**
 * Exposes the MQTT connect options as a bean.
 *
 * @return whatever {@code configuration.connectionOptions()} returns
 */
@Bean
public MqttConnectOptions connectOptions() {
    final MqttConnectOptions options = this.configuration.connectionOptions();
    return options;
}
/**
 * Builds the Paho client factory bean.
 *
 * @return a {@link DefaultMqttPahoClientFactory} whose connection options
 *         are taken from {@link #connectOptions()}
 */
@Bean
public MqttPahoClientFactory mqttPahoClientFactory() {
    final DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
    final MqttConnectOptions options = connectOptions();
    clientFactory.setConnectionOptions(options);
    return clientFactory;
}
/**
 * Creates the input channel bean named {@code mqttInboundChannel} and
 * subscribes {@link #handler()} to it.
 *
 * @return a {@link DirectChannel} with the handler bean already subscribed
 */
@Bean("mqttInboundChannel")
public MessageChannel mqttInboundChannel() {
    // Use a plain DirectChannel rather than double-brace initialization: the
    // latter creates an anonymous subclass that keeps a hidden reference to
    // this configuration instance and hides the subscription in an
    // instance initializer.
    final DirectChannel channel = new DirectChannel();
    channel.subscribe(handler());
    return channel;
}
/**
 * Builds the client id used by the inbound adapter.
 *
 * <p>The id is {@code "subscriber-"} followed by the local host address.
 * If the local host cannot be resolved, a warning is logged and the value
 * of {@link System#currentTimeMillis()} is used as the suffix instead.
 *
 * @return the client id string
 */
private static String clusterClientId() {
    final String prefix = "subscriber" + "-";
    String suffix;
    try {
        suffix = InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException he) {
        log.warn("unknown ip address, cause: {}", he.getMessage());
        suffix = String.valueOf(System.currentTimeMillis());
    }
    return prefix + suffix;
}
/**
 * Creates the message-driven MQTT adapter bound to the configured topics.
 *
 * <p>The adapter uses the id from {@code clusterClientId()}, the client
 * factory bean, and the topics from {@code configuration.getTopics()}.
 * It is given a completion timeout of 5000, a
 * {@link DefaultPahoMessageConverter}, the configured QoS, and
 * {@link #mqttInboundChannel()} as its output channel.
 *
 * @return the configured inbound adapter
 */
@Bean
public MessageProducer inbound() {
    // Resolve the constructor arguments in the order they are passed.
    final String clientId = clusterClientId();
    final MqttPahoClientFactory clientFactory = mqttPahoClientFactory();
    final String[] topics = configuration.getTopics().toArray(new String[0]);

    final MqttPahoMessageDrivenChannelAdapter inboundAdapter =
            new MqttPahoMessageDrivenChannelAdapter(clientId, clientFactory, topics);
    inboundAdapter.setCompletionTimeout(5000);
    inboundAdapter.setConverter(new DefaultPahoMessageConverter());
    inboundAdapter.setQos(configuration.getQos());
    inboundAdapter.setOutputChannel(mqttInboundChannel());
    return inboundAdapter;
}
/**
 * Defines the message handler bean.
 *
 * <p>The handler logs the headers and then the payload of each message it
 * receives, both at INFO level.
 *
 * @return the logging {@link MessageHandler}
 */
@Bean
public MessageHandler handler() {
    return incoming -> {
        log.info("HandleMessage headers: {}", incoming.getHeaders());
        log.info("HandleMessage payload: {}", incoming.getPayload());
    };
}