728x90
build.gradle kafka-streams 추가
dependencies {
implementation 'org.apache.kafka:kafka-streams'
}
Kafka Streams
strams_log 토픽 → stream() 소스프로세서 → to() 싱크프로세서 → stream_log_copy 토픽
@SpringBootApplication
public class KafkaStreamsApplication {
private static String APPLICATION_NAME = "streams-filter-application";
private static String BOOTSTRAP_SERVERS = "localhost:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_FILTER = "stream_log_filter";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> streamLog = builder.stream(STREAM_LOG);
streamLog.filter((key, value) -> value.length() > 5).to(STREAM_LOG_FILTER);
KafkaStreams streams;
streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
kafka-topics.sh --list --bootstrap-server localhost:9092
# 토픽 생성
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --topic stream_log
# 프로듀서, 컨슈머
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stream_log
kafka-console-consumer.sh --topic stream_log_copy --bootstrap-server localhost:9092 --from-beginning
스트림즈dsl filter()
strams_log 토픽 → stream() 소스프로세서 → filter() 스트림프로세서 → to() 싱크프로세서 → stream_log_copy 토픽
kafka-console-consumer.sh --topic stream_log_filter --bootstrap-server localhost:9092 --from-beginning
728x90