๋Œ€์šฉ๋Ÿ‰ ์ฒ˜๋ฆฌ ํ…Œ์ŠคํŠธ ๊ตฌํ˜„ ํ”„๋กœ์ ํŠธ - 2 (์ฝ”๋“œ ์ž‘์„ฑ)

2024. 6. 5. 15:47ใ†Project

๋ฐ˜์‘ํ˜•

๐Ÿ“‚ Redis ๊ด€๋ จ ํŒŒ์ผ

1. RedisConfig.java

์—ญํ•  RedisTemplate์„ ์„ค์ •ํ•˜์—ฌ Redis ์„œ๋ฒ„์™€ ์ƒํ˜ธ์ž‘์šฉ
์„ค๋ช… Redis์™€์˜ ์—ฐ๊ฒฐ์„ ์„ค์ •ํ•˜๊ณ , ๋ฐ์ดํ„ฐ๋ฅผ ๋ฌธ์ž์—ด๋กœ ์ง๋ ฌํ™” ๋ฐ ์—ญ์ง๋ ฌํ™”ํ•˜๋Š” ๋ฐ ํ•„์š”ํ•œ ์„ค์ •์„ ํฌํ•จ
package com.realTime.Insight.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer()); // ํ‚ค๋ฅผ ๋ฌธ์ž์—ด๋กœ ์ง๋ ฌํ™”
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); // ๊ฐ’์„ JSON์œผ๋กœ ์ง๋ ฌํ™”
        return redisTemplate;
    }

    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("webSocketTopic")); // ํŠน์ • ์ฃผ์ œ์˜ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ 
        return container;
    }

}

2. DataService.java

์—ญํ•  Redis๋ฅผ ์ด์šฉํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•˜๊ณ  ์กฐํšŒํ•˜๋Š” ์„œ๋น„์Šค ํด๋ž˜์Šค
์„ค๋ช… Redis์— ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”๊ฐ€ํ•˜๊ณ  ์กฐํšŒํ•˜๋Š” ๋กœ์ง ํฌํ•จ
package com.realTime.Insight;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;

@Service
public class DataService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private ValueOperations<String, Object> valueOperations;

    @PostConstruct
    public void init() {
        valueOperations = redisTemplate.opsForValue();
    }

    public List getData() {
        return redisTemplate.opsForHash().values("TimeSeriesData").stream()
                .map(data -> (Data) data)
                .collect(Collectors.toList());
    }

    public void addData(Data data) {
        String key = "webSocketTopic:" + System.currentTimeMillis();
        valueOperations.set(key, data);  // ๋‹จ์ผ ๊ฐ’ ์ €์žฅ
        redisTemplate.opsForHash().put("TimeSeriesData", key, data);  // ํ•ด์‹œ ๊ฐ’ ์ €์žฅ
    }

    public List getAllData() {
        List dataList = new ArrayList<>();
        for (String key : redisTemplate.keys("webSocketTopic:*")) {
            dataList.add(valueOperations.get(key));
        }
        return dataList;
    }
}

๐Ÿ“‚ Kafka ๊ด€๋ จ ํŒŒ์ผ

1. KafkaConsumerConfig.java

์—ญํ•  Kafka Consumer ์„ค์ •์„ ๋‹ด๋‹น
์„ค๋ช… Kafka์˜ Consumer๋ฅผ ์„ค์ •ํ•˜๊ณ , ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•  Listener๋ฅผ ์„ค์ •
package com.realTime.Insight.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

2. KafkaProducerConfig.java

์—ญํ•  Kafka Producer ์„ค์ •์„ ๋‹ด๋‹น
์„ค๋ช… Kafka์˜ Producer๋ฅผ ์„ค์ •ํ•˜๊ณ , KafkaTemplate์„ ์ œ๊ณต
package com.realTime.Insight.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

3. KafkaProducerService.java

์—ญํ•  Kafka๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•˜๋Š” ์„œ๋น„์Šค ํด๋ž˜์Šค
์„ค๋ช… Kafka์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋Š” ๋กœ์ง์„ ํฌํ•จ, sendMessage ๋ฉ”์„œ๋“œ๋Š” ์ง€์ •๋œ ํ† ํ”ฝ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†ก
package com.realTime.Insight;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Kafka๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•˜๋Š” ๋ฉ”์„œ๋“œ
     * @param topic ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•  Kafka ํ† ํ”ฝ
     * @param message ์ „์†กํ•  ๋ฉ”์‹œ์ง€
     */
    public void sendMessage(String topic, Object message) {
        kafkaTemplate.send(topic, message);
    }
}

4. KafkaConsumerService.java

์—ญํ•  Kafka๋กœ๋ถ€ํ„ฐ ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•˜๋Š” ์„œ๋น„์Šค ํด๋ž˜์Šค
์„ค๋ช… Kafka ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•˜๊ณ , ์ด๋ฅผ Redis ๋ฐ WebSocket์œผ๋กœ ์ „์†กํ•˜๋Š” ๋กœ์ง ํฌํ•จ
package com.realTime.Insight;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @KafkaListener(topics = "webSocketTopic", groupId = "group_id")
    public void consume(String message) {
        redisTemplate.opsForList().rightPush("real_time_data", message);
        System.out.println("message => " + message);
        messagingTemplate.convertAndSend("/topic/data", message);
        System.out.println("Consumed message: " + message);
    }
}

๐Ÿ’ป Front ํŒŒ์ผ (React)

DataDisplay.js ํŒŒ์ผ์„ ์•„๋ž˜์™€ ๊ฐ™์ด ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

import React, { useEffect, useState } from 'react';
import axios from 'axios';
import SockJS from 'sockjs-client';
import { Stomp } from '@stomp/stompjs';

const DataDisplay = () => {
    const [data, setData] = useState([]);

    useEffect(() => {
        // Fetch initial data
        axios.get('http://localhost:8080/api/data')
            .then(response => {
                setData(response.data);
            })
            .catch(error => {
                console.error('There was an error fetching the data!', error);
            });

        // Setup WebSocket connection
        const socket = new SockJS('http://localhost:8080/ws');
        const stompClient = Stomp.over(socket);

        stompClient.connect({}, frame => {
            console.log('Connected: ' + frame);
            stompClient.subscribe('/topic/data', message => {
                const newData = JSON.parse(message.body);
                setData(prevData => [...prevData, newData]);
            });
        });

        return () => {
            if (stompClient) {
                stompClient.disconnect();
            }
        };
    }, []);

    return (Data from Backend
{data.map((item) => (
ID: {item.id}Title: {item.title}
Description: {item.description}
Timestamp: {item.timestamp}
))}
    );
};

export default DataDisplay;
 

์ค‘์š” ์ฝ”๋“œ๋Š” ์œ„์™€ ๊ฐ™์ด ์ž‘์„ฑํ•˜๊ณ , ๋‚˜๋จธ์ง€ ์ฝ”๋“œ๋Š” ์•„๋ž˜ GitHub๋ฅผ ์ฐธ๊ณ ํ•ฉ๋‹ˆ๋‹ค.

GitHub - sonhoil/Largevolume: ๋Œ€์šฉ๋Ÿ‰ ์ฒ˜๋ฆฌ ์‹œ์Šคํ…œ ํ…Œ์ŠคํŠธ ํ™˜๊ฒฝ ๊ตฌํ˜„ ํ”„๋กœ์ ํŠธ

GitHub - sonhoil/Largevolume: ๋Œ€์šฉ๋Ÿ‰ ์ฒ˜๋ฆฌ ์‹œ์Šคํ…œ ํ…Œ์ŠคํŠธ ํ™˜๊ฒฝ ๊ตฌํ˜„ ํ”„๋กœ์ ํŠธ
GitHub - sonhoil/Largevolume: ๋Œ€์šฉ๋Ÿ‰ ์ฒ˜๋ฆฌ ์‹œ์Šคํ…œ ํ…Œ์ŠคํŠธ ํ™˜๊ฒฝ ๊ตฌํ˜„ ํ”„๋กœ์ ํŠธ
๋ฐ˜์‘ํ˜•