본문 바로가기
IT/Server

[Server] gRPC를 이용한 요청 분석 시스템

by kyu-nahc 2025. 3. 16.
반응형

 

 

gRPC

지난 포스트에서 REST API와 비교하여 gRPC의 특징 및 장점을 살펴보았다.

gRPC외부 사용자가 액세스 할 수 없는 내부 시스템 또는 구조를 만들 때 주로 사용하며,

짧은 대시 시간과 빠른 대역폭 통신 등 메시지 전달의 효율성이 중요한 경우에 유용하게 사용할 수 있다.

따라서 외부 클라이언트에게 데이터를 반환하기보다는 내부 시스템에서

클라이언트 요청에 대한 분석이나 추적 시스템을 구현할 때 유용한 프로토콜이 될 수 있다.

해당 포스트에서는 gRPC를 이용한 간단한 요청 분석 시스템을 보여준다.

REST API와 gRPC에 대한 비교는 아래 포스트를 통해 자세히 살펴볼 수 있다.

 

[Server] REST API & gRPC

REST API / gRPCREST API와 gRPC는 서버와 클라이언트 간의 통신을 위해 사용되는 두 가지 대표적인 프로토콜이다.REST API를 사용하는 경우 gRPC와 비교 시 좀 더 광범위하게 사용되고 선호되는 것을 볼 수

kyu-nahc.tistory.com

 

gRPC System Flow Chart

[ Application flow chart ]

 

gRPC 요청 분석 시스템의 흐름도는 위의 그림과 같고,

Java gRPC Client와 Java gRPC Server는 클라이언트 스트리밍을 통해 

요청 및 응답 데이터를 전달하고, Java gRPC Server가 수집한 데이터를 

Python gRPC Server로 스트리밍 하여 분석하는 구조로 이루어져 있다. 

 

gRPC는 서로 다른 언어 간의 통신이 가능하고 요청을 지속적으로 추적하고 

분석해야 하는 내부 시스템에 적합하기 때문에 해당 애플리케이션에서 핵심 역할을 하며, 

Python은 Java보다 데이터 분석과 그래프 시각화에 유리하다는 장점을 살려 

분석 작업을 Python gRPC Server에서 진행한다. 

분석된 데이터는 Flask Server로 전달되어 최신 상태로 갱신되며, 사용자에게 시각적으로 제공된다.

흐름도를 순서에 맞게 정리하면 다음과 같다.

 

  1. Java gRPC Client가 Request Trace Data를 생성한다.
  2. Java gRPC Client가 Java gRPC Server에게
    클라이언트 스트리밍으로 생성된 Trace Data를 모두 전달한다.
  3. Java gRPC Server는 Client Streaming으로 전달되는 모든 데이터를 받고,
    Java gRPC Client에게 하나의 응답 메시지를 전달한다.
  4. Java gRPC Server에서는 별도의 스레드가 존재하고 해당 스레드는 20초마다
    Python gRPC Server에게 Client Streaming으로 수집된 데이터를 모두 전달한다.
  5. Python gRPC Server는 Client Streaming으로 전달되는 모든 데이터를 받고,
    Java gRPC Server에게 하나의 응답 메시지를 전달한다.
  6. Python gRPC Server는 전달받은 데이터를 기반으로 데이터 분석을 진행한다.
    전달받은 데이터의 길이가 1 이상이라면, POST 요청을 통해 Flask Server에게
    지금까지 누적된 모든 Trace Data를 요청 body에 포함하여 전달한다.
  7. POST 요청을 받은 Flask Server는 요청 body에 존재하는 데이터로 갱신 작업을 진행하고,
    http://localhost:8080/dashboard의 GET 요청을 받으면 
    그래프를 생성하여 View를 렌더링 해서 보여준다.

 

Java gRPC Server / Client

Java의 gRPC Client / Server의 코드와 .proto 파일부터 살펴보면 다음과 같다.

아래 코드는 Java gRPC ClientJava gRPC Server와의 스트리밍을 위한 proto 파일이다.

syntax = "proto3";

option java_multiple_files = false;
option java_package = "PACKAGE NAME";

service TraceService {
  rpc StreamTrace (stream TraceRequest) returns (TraceResponse);
}

message TraceRequest {
  int64 request_id = 1;
  string service_name = 2;
  int64 timestamp = 3;
  string method = 4;
  int64 status = 5;
  string message = 6;
}

message TraceResponse {
  int64 status = 1;
  string message = 2;
}

 

TraceService

  • 서비스 이름 : TraceService
  • 메서드 : StreamTrace
    • 클라이언트에서 스트리밍 방식으로 요청을 보내고, 서버단일 응답을 반환
    • 클라이언트가 다수의 로그 데이터를 서버로 전송
    • 서버는 전체 데이터를 처리한 후 요약된 응답을 반환

TraceRequest

  • 클라이언트가 서버로 전송하는 요청 메시지의 데이터 구조이다.
  • 필드
    1. request_id : 요청의 고유 ID
    2. service_name : 요청을 받은 웹 애플리케이션 이름
    3. timestamp : 요청이 생성된 타임스탬프
    4. method : 호출된 HTTP 메서드
    5. status :  HTTP 상태 코드
    6. message : 관련된 설명이나 에러 메시지

TraceResponse

  • 서버가 클라이언트에 반환하는 응답 메시지의 데이터 구조이다.
  • 필드
    1. status : 요청 처리 결과를 나타내는 상태 코드
    2. message : 요청 처리에 대한 요약 메시지

 

다음으로 Java gRPC ServerPython gRPC Server와의 스트리밍을 위한 proto 파일이다.

syntax = "proto3";

option java_multiple_files = false;
option java_package = "PACKAGE NAME";

service DashboardService {
  rpc SendTraceInfo (stream DashboardRequest) returns (DashboardResponse);
}

message DashboardRequest {
  string service_name = 1;
  int64 timestamp = 2;
  string method = 3;
  int64 status = 4;
  string message = 5;
}

message DashboardResponse {
  int64 status = 1;
  string message = 2;
}

DashboardService

  • 서비스 이름 : DashboardService
  • 메서드 : SendTraceInfo
    • Java gRPC 서버 측에서 스트리밍 방식으로 요청을 보내고, 
      Python gRPC 서버단일 응답을 반환
    • Java gRPC 서버가 클라이언트로부터 받은
      다수의 로그 데이터를 Python gRPC 서버로 전송
    • 서버는 전체 데이터를 처리 및 분석을 진행한 후 응답을 반환

DashboardRequest

  • Python gRPC 서버로 전송하는 요청 메시지의 데이터 구조이다.
  • 필드
    1. service_name : 요청 분석을 진행하는 웹 애플리케이션 이름
    2. timestamp : 요청이 생성된 타임스탬프
    3. method : 호출된 HTTP 메서드
    4. status :  HTTP 상태 코드
    5. message : 관련된 설명이나 에러 메시지

DashboardResponse

  • Java gRPC 서버에 반환하는 응답 메시지의 데이터 구조이다.
  • 필드
    1. status : 요청 처리 결과를 나타내는 상태 코드
    2. message : 요청 처리에 대한 요약 메시지

 

CentralTraceServer

CentralTraceServer는 gRPC를 기반으로 클라이언트로부터

로그 데이터를 스트리밍으로 수신하고, 이를 Python 서버로 전송하는 Java 서버이다.

Python 서버와의 송수신 또한 gRPC 스트리밍으로 진행된다. 전체 코드를 먼저 살펴보면 다음과 같다.

public class CentralTraceServer {

    private static final List<Trace.TraceRequest> traceData = new ArrayList<>();
    private static final int PYTHON_SERVER_PORT = 9091;


    public static void main(String[] args) throws IOException, InterruptedException {

        Server server = ServerBuilder.forPort(9090)
                .addService(new CentralTraceServer.TraceServiceImpl())
                .build();

        System.out.println("Central Trace Server is running on port 9090");
        server.start();
        Timer timer = new Timer(true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    sendAllDataToPythonServer();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }, 0, 20 * 1000);
        server.awaitTermination();
    }
    private static void sendAllDataToPythonServer() throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", PYTHON_SERVER_PORT)
                .usePlaintext()
                .build();
        DashboardServiceGrpc.DashboardServiceStub asyncStub = DashboardServiceGrpc.newStub(channel);

        final CountDownLatch finishLatch = new CountDownLatch(1);
        StreamObserver<Dashboard.DashboardResponse> responseObserver = new StreamObserver<>() {

            @Override
            public void onNext(Dashboard.DashboardResponse response) {
                System.out.println(">>> Python Server Response : " + response);
            }

            @Override
            public void onError(Throwable t) {
                Status status = Status.fromThrowable(t);
                System.err.println(">>> Warning => [" + status.toString() + "]");
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println(">>> Finished.");
                finishLatch.countDown();
            }
        };

        StreamObserver<Dashboard.DashboardRequest> requestObserver = asyncStub.sendTraceInfo(responseObserver);
        for(Trace.TraceRequest trace : traceData){
            try {
                Dashboard.DashboardRequest request = Dashboard.DashboardRequest.newBuilder()
                        .setServiceName(trace.getServiceName())
                        .setTimestamp(trace.getTimestamp())
                        .setMethod(trace.getMethod())
                        .setStatus(trace.getStatus())
                        .setMessage(trace.getMessage())
                        .build();
                requestObserver.onNext(request);
            } catch (Exception e) {
                System.err.println("Failed to send data to Python Server");
                requestObserver.onError(e);
            }
        }
        traceData.clear();
        requestObserver.onCompleted();
        if (finishLatch.await(1, TimeUnit.MINUTES)){
            System.out.println(">>> End Successful.");
        }
        channel.shutdown();
    }

    static class TraceServiceImpl extends TraceServiceGrpc.TraceServiceImplBase {

        @Override
        public StreamObserver<Trace.TraceRequest> streamTrace(StreamObserver<Trace.TraceResponse> responseObserver) {
            return new StreamObserver<>() {
                @Override
                public void onNext(Trace.TraceRequest request) {

                    String traceInfo = String.format("Service: %s | Timestamp: %d | Method : %s | Status: %s | Message: %s\n",
                            request.getServiceName(), request.getTimestamp(),
                            request.getMethod(), request.getStatus(), request.getMessage());

                    traceData.add(request);
                    System.out.println(traceInfo);
                }

                @Override
                public void onError(Throwable t) {
                    System.err.println("Error during streamTrace: " + t);
                    responseObserver.onError(t);
                }

                @Override
                public void onCompleted() {
                    Trace.TraceResponse response = Trace.TraceResponse.newBuilder()
                            .setStatus(200)
                            .setMessage("All traces received successfully")
                            .build();
                    responseObserver.onNext(response);
                    responseObserver.onCompleted();
                }
            };
        }
    }

}

 

Java 서버는 9090 포트에서 클라이언트 요청을 수신하고,

Python 서버는 9091 포트로 데이터를 전송한다. 로그 데이터는 traceData 리스트에 저장되며,

20초 간격으로 sendAllDataToPythonServer()를 실행해 Python 서버로 전송한 후 리스트를 비운다.

 

main 메서드에서는 ServerBuilder.forPort(9090)을 통해 gRPC 서버를 생성하고,

addService(new TraceServiceImpl())로 gRPC 서비스를 추가하며,

Timer를 사용해 주기적으로 데이터를 전송하는 타이머를 설정한다.

서버는 server.start()로 실행되며, server.awaitTermination()을 통해 종료될 때까지 대기한다.

 

sendAllDataToPythonServer()는 ManagedChannel을 생성해

Python 서버(localhost:9091)와 연결한 후, 비동기 방식으로 gRPC 호출을 수행하며,

응답이 도착하면 onNext, 오류 발생onError, 작업 완료onCompleted를 실행한다.

요청 전송은 traceData 리스트의 데이터를 DashboardRequest로 변환하여 전송하며,

전송 후 traceData.clear()로 리스트를 초기화한다.

CountDownLatch를 사용해 1분 내에 데이터 전송을 완료하며,

channel.shutdown()으로 Python 서버와의 연결을 종료한다.

 

TraceServiceImpl은 클라이언트로부터

데이터를 스트리밍 방식으로 수신하는 gRPC 서비스로,

streamTrace 메서드를 통해 클라이언트 요청을 처리하고,

수신한 데이터를 traceData 리스트에 저장하며, 요청 데이터를 콘솔에 출력한다.

스트리밍 중 오류가 발생하면 onError를 호출하여 클라이언트에 에러를 전달하며,

모든 요청 데이터를 성공적으로 수신한 후 상태 코드 200과 메시지를 포함한 응답을 반환한다.

 

전체 동작 흐름은 클라이언트가 streamTrace 메서드를 호출해

로그 데이터를 스트리밍 방식으로 전송하면,

서버가 이를 traceData 리스트에 저장하고, 타이머를 사용해 20초 간격으로

Python 서버로 데이터를 전송하는 구조이다. Python 서버는 데이터를 처리한 후 응답을 반환하며,

서버는 응답을 출력하거나 에러 발생 시 상태 메시지를 출력한다.

주요 필드 및 메서드 설명을 표로 정리하면 다음과 같다.

Field / Method Description
traceData 클라이언트로부터 수신한 로그 데이터를 저장하는 리스트.
Timer 주기적으로 Python 서버로 데이터를 전송하기 위한 타이머.
TraceServiceImpl.
streamTrace
클라이언트가 스트리밍 방식으로 요청 데이터를
서버로 전송할 때 호출되는 메서드.
sendAllDataTo
PythonServer
traceData 리스트의 데이터를 Python 서버로 전송.
ManagedChannel Python 서버와의 연결을 관리하는 객체.
CountDownLatch 비동기 데이터 전송이 완료될 때까지 대기하기 위한 동기화 도구.
onNext 서버가 클라이언트로부터 데이터를 수신하거나
Python 서버로부터 응답을 받을 때 호출.
onError 데이터 전송 중 에러 발생 시 호출.
onCompleted 데이터 수신 또는 전송 작업이 완료되었을 때 호출.

 

 

TraceClientCaller

public class TraceClientCaller {

    private final TraceServiceGrpc.TraceServiceStub asyncStub;
    private Long requestId = 0L;
    private static final Map<Integer, String> statusMap = new HashMap<>();

    static {
        statusMap.put(200, "Request completed successfully.");
        statusMap.put(404, "Resource not found.");
        statusMap.put(403, "Access denied: Unauthorized action.");
        statusMap.put(500, "Internal server error encountered.");
        statusMap.put(401, "Unauthorized: Please provide valid credentials.");
        statusMap.put(400, "Bad request: Check input parameters.");
        statusMap.put(201, "Resource created successfully.");
        statusMap.put(503, "Service unavailable: Please try again later.");
    }

    private final String[] methods = {
            "GET",
            "POST",
            "PUT",
            "PATCH",
            "DELETE"
    };
    public TraceClientCaller(ManagedChannel managedChannel) {
        asyncStub = TraceServiceGrpc.newStub(managedChannel);
    }

    public void sendClientStreamingAsync() throws InterruptedException {

        System.out.println(">>> Send Call");
        List<Trace.TraceRequest> traceRequestList = new ArrayList<>();
        for (int i=0;i<50;i++){
            Random random = new Random();
            int index = random.nextInt(methods.length);
            String selectedMethod = methods[index];
            long timestamp = System.currentTimeMillis();
            Object[] keys = statusMap.keySet().toArray();
            int randomStatus = (int) keys[random.nextInt(keys.length)];
            String randomMessage = statusMap.get(randomStatus);
            traceRequestList.add(
                    Trace.TraceRequest.newBuilder()
                            .setRequestId(++requestId)
                            .setServiceName("Web Application")
                            .setTimestamp(timestamp)
                            .setMethod(selectedMethod)
                            .setStatus(randomStatus)
                            .setMessage(randomMessage)
                            .build()
            );
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                System.err.println(e.getMessage());
            }
        }
        final CountDownLatch finishLatch = new CountDownLatch(1);
        StreamObserver<Trace.TraceResponse> responseObserver = new StreamObserver<>() {
            @Override
            public void onNext(Trace.TraceResponse traceResponse) {
                System.out.println(">>> Response Data : "+traceResponse);
            }

            @Override
            public void onError(Throwable t) {
                Status status = Status.fromThrowable(t);
                System.err.println(">>> Warning => [" + status.toString() + "]");
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {
                System.out.println(">>> Finished.");
                finishLatch.countDown();
            }
        };

        StreamObserver<Trace.TraceRequest> requestObserver = asyncStub.streamTrace(responseObserver);
        try {
            for (Trace.TraceRequest request: traceRequestList) {
                requestObserver.onNext(request);
                System.out.println(">>> Request Info : "
                        + request.getMethod() + " / " + request.getStatus() + " / " + request.getMessage());

                Thread.sleep(1000);
                if (finishLatch.getCount() == 0) {
                    System.out.println(">>> Stop the next request");
                    return;
                }
            }
        } catch (RuntimeException e) {
            System.err.println("Failed to send data to Java Server");
            requestObserver.onError(e);
        }
        requestObserver.onCompleted();
        if (finishLatch.await(1, TimeUnit.MINUTES)){
            System.out.println(">>> End Successful.");
        }
    }
}

 

TraceClientCaller 클래스는 gRPC 클라이언트 구현체로,

CentralTraceServer와 상호작용하여 여러 TraceRequest 객체를 비동기로 전송하고

서버의 응답을 처리하는 역할을 한다. Java gRPC 클라이언트 애플리케이션은 클라이언트가

실시간으로 추적 데이터를 서버로 전송하는 구조를 가진다.

ManagedChannel을 통해 비동기 gRPC 스텁을 생성하도록 한다.

HTTP 상태 코드와 메시지를 매핑한 statusMap과 GET, POST 등의 HTTP 메서드 배열을 이용해

랜덤한 요청 데이터를 생성한다. sendClientStreamingAsync() 메서드는 50개의 요청을 생성하여

gRPC 서버로 스트리밍 하며, StreamObserver를 사용해 서버 응답을 비동기적으로 처리한다.

요청 전송 중 예외 발생 시 오류 메시지를 출력하고,

CountDownLatch를 활용해 요청이 완료될 때까지 대기한다.

 

 

Python gRPC Server

다음으로 Java gRPC ServerPython gRPC Server와의 스트리밍을 위한 proto 파일이다.

아래 proto 파일은 위에서 사용한 proto 파일과 동일한 형태로,

파이썬 플러그인으로 컴파일하여 생성한다는 차이점만 존재한다.

syntax = "proto3";

message DashboardRequest {
    string service_name = 1;
    int64 timestamp = 2;
    string method = 3;
    int32 status = 4;
    string message = 5;
}

message DashboardResponse {
    int32 status = 1;
    string message = 2;
}

service DashboardService {
    rpc SendTraceInfo (stream DashboardRequest) returns (DashboardResponse);
}

 

아래는 Java gRPC Server로부터 데이터를 받아서 분석하는 Python gRPC Server 코드이다.

import pandas as pd
import grpc
import dashboard_pb2
import dashboard_pb2_grpc
from concurrent import futures
import threading
import requests


class DashboardServicer(dashboard_pb2_grpc.DashboardServiceServicer):

    def __init__(self):
        self.trace_data = []

    def SendTraceInfo(self, request_iterator, context):

        for request in request_iterator:
            try:
                trace_info = {
                    'service_name': request.service_name,
                    'timestamp': request.timestamp,
                    'method': request.method,
                    'status': request.status,
                    'message': request.message
                }
                self.trace_data.append(trace_info)
            except Exception as e:
                print(f"Error processing request: {e}")
                context.set_details('Error processing trace data')
                context.set_code(grpc.StatusCode.INTERNAL)
                return dashboard_pb2.DashboardResponse(status=500, message="Error processing trace data")

        response = dashboard_pb2.DashboardResponse(status=200, message="Trace data received successfully")
        threading.Thread(target=self.process_and_plot_data).start()

        return response

    def process_and_plot_data(self):
        if not self.trace_data:
            print("No trace data available to process and plot.")
            return

        df = pd.DataFrame(self.trace_data)
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')

        print("\n===================")
        print("Trace Data Update!")
        print("===================")

        status_counts = df['status'].value_counts(normalize=True) * 100
        print("\n[ Status Code Proportion (%) ]")
        print("--------------------------------------------------")
        for status, percentage in status_counts.items():
            print(f"Status {status}: {percentage:.2f}%")

        method_counts = df['method'].value_counts()
        print("\n[ Request Method Frequency ] ")
        print("--------------------------------------------------")
        for method, count in method_counts.items():
            print(f"Method {method}: {count} requests")

        df = df.sort_values(by='timestamp')
        df['response_time_diff'] = df['timestamp'].diff().dt.total_seconds()
        average_response_time = df['response_time_diff'].mean()
        print("\n[ Average Response Time (seconds) ] ")
        print("--------------------------------------------------")
        print(f"Average Response Time: {average_response_time:.2f} seconds")

        def categorize_status(status):
            if 200 <= status < 300:
                return "2xx (Success)"
            elif 400 <= status < 500:
                return "4xx (Client Error)"
            elif 500 <= status < 600:
                return "5xx (Server Error)"
            else:
                return "Other"

        df['status_category'] = df['status'].apply(categorize_status)
        status_category_counts = df['status_category'].value_counts()
        print("\n[ Status Code Grouping ] ")
        print("--------------------------------------------------")
        for category, count in status_category_counts.items():
            print(f"{category}: {count} occurrences")

        message_counts = df['message'].value_counts()
        print("\n[ Message Frequency Analysis ] ")
        print("--------------------------------------------------")
        for message, count in message_counts.items():
            print(f"Message: '{message}' - {count} occurrences")

        response = requests.post('http://localhost:8080/dashboard', json=self.trace_data)
        print(f"\nPython Api server : {response.text}\n")


def server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
    dashboard_pb2_grpc.add_DashboardServiceServicer_to_server(DashboardServicer(), server)
    server.add_insecure_port('0.0.0.0:9091')
    print("Python Server started on port 9091")
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    server()

 

해당 서버는 gRPC 서버로서 동작하며, 클라이언트로부터 스트리밍 방식으로 로그 데이터를 수신하고,

이를 처리하여 분석한 뒤 특정 정보를 출력하거나 다른 서버로 데이터를 전송한다.

 

DashboardServicer 클래스는 gRPC 서비스 인터페이스를 구현하여 클라이언트의 요청을 처리하며,

수신된 로그 데이터를 리스트에 저장하고 이를 분석하여 통계를 출력한 후 외부 API로 전송한다.

gRPC 서버는 DashboardServicer 객체를 등록하고 9091 포트에서 요청을 수신하도록 설정되며,

클라이언트로부터 스트리밍 방식으로 로그 데이터를 수신하여

service_name, timestamp, method, status, message 등의 정보를 추출해 리스트에 추가한 뒤,

데이터 분석을 위한 process_and_plot_data 메서드를 별도의 스레드에서 실행한다.

 

해당 메서드는 trace_data를 Pandas DataFrame으로 변환한 후 상태 코드 비율,

HTTP 메서드 빈도, 평균 응답 시간 등을 계산하여 출력하며,

이를 JSON 형식으로 변환해 로컬 API 서버

http://localhost:8080/dashboard로 POST 요청을 전송한다. 

gRPC 서버는 ThreadPoolExecutor를 사용하여 동시 요청을 처리하며,

실행된 후 wait_for_termination()을 통해 지속적으로 요청을 수신한다.

 

전체적인 실행 흐름은 클라이언트가 여러 DashboardRequest 메시지를 스트리밍 방식으로 전송하면

서버가 이를 수신 및 저장하고, 데이터 분석 후 결과를 출력 및 외부 API로 전송하는 방식으로 동작하며,

이는 Java 클라이언트가 여러 서비스의 로그 데이터를 수집하여 Python 서버로 전송하고,

Python 서버가 이를 분석 및 시각화하여 상태 보고에 활용하는 구조로 적용될 수 있다.

 

 

Python Flask Server

from flask import Flask, request, render_template
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import io
import base64
import matplotlib

matplotlib.use('Agg')

app = Flask(__name__)
df = pd.DataFrame()

def update_data(new_data):
    global df
    new_df = pd.DataFrame(new_data)
    new_df['timestamp'] = pd.to_datetime(new_df['timestamp'], unit='ms')
    df = new_df

def plot_status_distribution():
    global df
    if df.empty:
        return None
    status_counts = df['status'].value_counts(normalize=True) * 100
    plt.figure(figsize=(8, 6))
    sns.barplot(x=status_counts.index, y=status_counts.values, hue=status_counts.index, palette="Blues_d", legend=False)
    plt.title('Status Code Proportion', fontsize=16)
    plt.xlabel('Status Code', fontsize=12)
    plt.ylabel('Percentage (%)', fontsize=12)
    plt.grid(axis='y', linestyle='--', alpha=0.7)

    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    img = base64.b64encode(buf.read()).decode('utf-8')
    buf.close()
    return img


def plot_method_frequency():
    global df
    if df.empty:
        return None
    method_counts = df['method'].value_counts()
    plt.figure(figsize=(8, 6))
    sns.barplot(x=method_counts.index, y=method_counts.values, hue=method_counts.index, palette="Oranges_d", legend=False)
    plt.title('Request Method Frequency', fontsize=16)
    plt.xlabel('Method', fontsize=12)
    plt.ylabel('Frequency', fontsize=12)
    plt.grid(axis='y', linestyle='--', alpha=0.7)

    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    img = base64.b64encode(buf.read()).decode('utf-8')
    buf.close()
    return img


def plot_avg_response_time():
    global df
    if df.empty:
        return None
    df = df.sort_values(by='timestamp')
    df['response_time_diff'] = df['timestamp'].diff().dt.total_seconds()
    average_response_time = round(df['response_time_diff'].mean(), 2)
    return average_response_time


def plot_status_category():
    global df
    if df.empty:
        return None
    def categorize_status(status):
        if 200 <= status < 300:
            return "2xx (Success)"
        elif 400 <= status < 500:
            return "4xx (Client Error)"
        elif 500 <= status < 600:
            return "5xx (Server Error)"
        else:
            return "Other"

    df['status_category'] = df['status'].apply(categorize_status)
    status_category_counts = df['status_category'].value_counts()

    plt.figure(figsize=(8, 6))
    sns.barplot(x=status_category_counts.index, y=status_category_counts.values, hue=status_category_counts.index, palette="Purples_d", legend=False)
    plt.title('Status Code Grouping', fontsize=16)
    plt.xlabel('Status Category', fontsize=12)
    plt.ylabel('Count', fontsize=12)
    plt.grid(axis='y', linestyle='--', alpha=0.7)

    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    img = base64.b64encode(buf.read()).decode('utf-8')
    buf.close()
    return img


def plot_message_frequency():
    global df
    if df.empty:
        return None
    message_counts = df['message'].value_counts()
    return message_counts.to_dict()



@app.route('/dashboard', methods=['POST'])
def update_data_route():
    global df
    new_data = request.json
    update_data(new_data)
    return "Data updated successfully!"


@app.route('/dashboard')
def dashboard():
    status_img = plot_status_distribution()
    method_img = plot_method_frequency()
    average_response_time = plot_avg_response_time()
    status_category_img = plot_status_category()
    message_frequency_data = plot_message_frequency()

    return render_template('dashboard.html',
                           status_img=status_img,
                           method_img=method_img,
                           average_response_time=average_response_time,
                           status_category_img=status_category_img,
                           message_frequency_data=message_frequency_data)


if __name__ == '__main__':
    app.run(debug=True, host="0.0.0.0", port=8080)

 

이 Flask 애플리케이션은 HTTP 요청을 처리하여 데이터를 업데이트하고,

시각화된 대시보드를 제공한다. 주요 구성 요소와 작동 방식은 다음과 같다.

 

Flask 라우트

  1. /dashboard (POST 요청)
    JSON 형식의 데이터를 받아 update_data()를 호출하여 데이터를 업데이트
    반환 메시지: "Data updated successfully!"
  2. /dashboard (GET 요청)
    대시보드 HTML 페이지를 렌더링 한다.

    각 시각화 함수에서 생성한 이미지를 Base64로 인코딩하여 HTML로 전달하고,
    평균 응답 시간과 메시지 빈도 데이터도 포함

 

시각화 결과를 이미지로 반환

  • 이미지 생성 및 변환
    • 모든 그래프는 Matplotlib를 통해 생성
    • io.BytesIO를 사용하여 그래프를 PNG 이미지로 변환하고,
      Base64로 인코딩하여 HTML에서 표시할 수 있는 데이터 URI 형식으로 전달

 

작동 원리

  1. POST 요청으로 데이터 업데이트
    gRPC 서버 또는 다른 소스에서 데이터를 수집하여
    /dashboard 엔드포인트로 POST 요청을 보낸다.
    수신된 데이터는 전역 데이터프레임에 저장된다.
  2. 대시보드 렌더링
    클라이언트가 /dashboard GET 요청을 보내면 Flask 앱은 시각화 결과를 생성한다.
    생성된 이미지는 Base64로 인코딩 되어 HTML에서 렌더링 되며,
    메시지 빈도는 JSON 형식으로 표시된다.

이 Flask 애플리케이션을 통해 /dashboard URL로 접속 시 데이터 대시보드를 렌더링 하며,

이 화면에서 실시간으로 수집된 트레이스 데이터를 시각적으로 확인할 수 있다.

대시보드는 여러 시각화된 그래프와 함께 주요 통계 정보를 제공하며 주요 쟁점은 다음과 같다.

  • 실시간 상태 모니터링: 상태 코드, 요청 메서드, 응답 시간 등을 대시보드에서 확인
  • 트래픽 분석: 메시지 빈도 및 상태 코드를 그룹화하여 트래픽 패턴을 파악
  • 시각화 업데이트: 새로운 데이터가 들어오면 대시보드가 자동으로 최신 데이터를 반영

 

초기화면 - 데이터 수집 전

  • 만약 애플리케이션이 처음 실행되었거나, 데이터가 아직 수집되지 않은 경우,
    대시보드 화면은 "No data available" 또는 "No trace data available"라는 메시지를 표시하여
    사용자에게 데이터가 없음을 명시한다.
  • 이 메시지는 화면 상단에 표시되며, 이후 데이터가 수집되면 자동으로
    대시보드가 갱신되어 수집된 데이터에 대한 시각화를 진행한다.

 

초기화면 - 데이터 수집 후

 

다음으로 대시보드의 화면 구성을 살펴보면 다음과 같다.

총 3개의 그래프와, 1개의 표, 응답 시간을 렌더링 된 화면에서 볼 수 있다.

 

  1. 상태 코드 분포 (Status Code Proportion)
    상태 코드별 비율을 나타내는 막대그래프
    200번대 성공 상태 코드의 비율, 400번대 클라이언트 오류,
    500번대 서버 오류 등의 분포를 볼 수 있다.
  2. 요청 메서드 빈도 (Request Method Frequency)
    HTTP 요청 메서드(예: GET, POST 등)의 빈도를 나타내는 막대그래프
  3. 평균 응답 시간 (Average Response Time)
    모든 요청의 응답 시간 차이를 계산하여 평균 응답 시간을 초 단위로 보여준다.
  4. 상태 코드 그룹화 (Status Code Grouping)
    상태 코드들을 성공(2xx), 클라이언트 오류(4xx), 서버 오류(5xx) 등의
    그룹으로 나누어 보여주는 그래프
  5. 메시지 빈도 분석 (Message Frequency Analysis)
    각 메시지의 빈도수가 표시된다.
    이를 통해 특정 오류 메시지나 로그 메시지가 얼마나 자주 발생했는지 파악

 

이 시스템은 gRPC 스트리밍을 사용하여

클라이언트의 요청 데이터를 실시간으로 수집하고 분석하는 구조로,

두 가지 독립적인 언어에서 플러그인 방식으로 쉽게 구현할 수 있다.

 

현재는 간단한 데모 클라이언트를 사용하고 있지만,

이를 확장한다면 Spring Boot를 활용해 실제 클라이언트 요청을 수집하는

중앙 서버를 배치하고, 이를 gRPC 서버로 전달하여 요청 데이터를 분석 및 처리한 후,

시각화하는 구조로 변경하여 프로덕션 환경에서의 데이터를 실시간으로 수집하고 처리할 수 있다.

이처럼 gRPC의 특징 및 장점짧은 대시 시간과 빠른 대역폭 통신을 활용하여

HTTP 프로토콜을 사용하지 않고 실시간 요청 추적 시스템을 더욱 효율적으로 구축할 수 있다.

 

반응형

'IT > Server' 카테고리의 다른 글

[Spring boot] MSA 환경 구성  (0) 2025.03.25
[Server] MSA 아키텍처  (2) 2025.03.18
[Server] REST API & gRPC  (2) 2024.12.26
[Server] Spring vs Node.js  (5) 2024.10.02
[Server] Nginx vs Apache  (2) 2024.09.30