phantasmicmeans 기술 블로그

MongoDB CDC, Change Streams 본문

Tech

MongoDB CDC, Change Streams

phantasmicmeans 2021. 11. 12. 12:34

CDC(change data capture)는 데이터베이스 내 데이터에 대한 변경을 식별해 후속 처리를 자동화 하는 기술이자 개념이고, 단어 그대로 데이터의 변경본을 캡쳐한다 로 이해하면 더 직관적입니다.

 

보통 변경본 캡처 후 데이터 가공 및 타 데이터베이스로의 적재가 필요한 환경에서 주로 사용되고, 이벤트 기반의 아키텍처를 구성할때에도 자주 사용되는 것 같습니다.

 

이번 글에서는 MongoDB를 기준으로 변경본 캡쳐 이벤트를 subscribe 하는 방법 중 아래 2가지를 간단하게 공유 드리고자 합니다.

 

  • Debezium connector for MongoDB
  • MongoDB Change Streams

Debezium Kafka Connector

CDC 오픈소스 중 가장 많이 사용 되는 것이 Debezium 이고 보통 Kafka Connect와 함께 사용되며 아래 DB들을 지원합니다.

  • MongoDB
  • MySQL
  • PostgreSQL
  • SQL Server
  • Oracle
  • Db2
  • Cassandra (Incubating)
  • Vitess (Incubating)

image


출처 - https://debezium.io/documentation/reference/architecture.html

위 그림은 mysql / postgresql 에서의 데이터 변경을 kafka 메시지로 밀어 넣고 이후 타 db로 밀어 넣는 과정을 나타냅니다.

참고 - mysql은 binlog로 데이터 변경본을 추출합니다.

1. Debezium connector for MongoDB

몽고DB 에서도 데이터 변경 기록을 kafka events로 밀어 넣을 수 있는데요, 좀 더 디테일한 사항들은 아래를 참고하시면 됩니다.

특징

  • Replica set / 샤딩 지원 O
  • oplog tailing

캡쳐되어 kafka로 발행 되는 이벤트

아래 update events를 예시로 보면 patch 부분이 데이터의 변경 내역입니다. (replace 요청은 모든 필드가 들어옵니다)

  • mysql의 update events는 before/after를 지원하지만, mongo는 아쉽게도 지원하지 않습니다.

 

2. MongoDB Change Streams

MongoDB의 Change Stream을 이용하면 update 이후의 full document 까지 받아볼 수 있습니다.

특징

  • Replica set / 샤딩 지원 O
  • 내부적으로 aggregation framework 사용
  • mongo 4.0 이전의 버전은 read concern majority 옵션이 필요

java/spring의 경우 change streams를 활용하는 방법이 2가지이며 아래 문서를 참고하시면 됩니다

 

Reactive Change Streams 사용 예시

this.reactiveMongoTemplate.changeStream(Movie.class)
        .withOptions(changeStreamOptionsBuilder -> changeStreamOptionsBuilder
                .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
                .build())
        .watchCollection("movie")
        .filter(Criteria.where("title").is("test"))
        .listen()
        .doOnNext(this::print)
        .blockLast();

캡쳐되어 발행 되는 이벤트 raw data

 

create events
{
  "raw" : {
    ... 중략 ... 
    "destinationNamespaceDocument" : null,
    "fullDocument" : {
      "_id" : {
        "timestamp" : 1628409207,
        "date" : "2021-08-08T07:53:27.000+00:00"
      },
      "title" : "tttt",
      "director" : "test",
      "since" : "test",
      "audienceCnt" : 10
    },
    "documentKey" : {
      "_id" : {
        "value" : {
          "timestamp" : 1628409207,
          "date" : "2021-08-08T07:53:27.000+00:00"
        },
        "bsonType" : "OBJECT_ID",
        "double" : false,
        "boolean" : false,
        "binary" : false,
        "number" : false,
        "array" : false,
        "null" : false,
        "document" : false,
        "string" : false,
        "int32" : false,
        "int64" : false,
        "decimal128" : false,
        "objectId" : true,
        "dbpointer" : false,
        "timestamp" : false,
        "dateTime" : false,
        "symbol" : false,
        "regularExpression" : false,
        "javaScript" : false,
        "javaScriptWithScope" : false
      }
    },
    "operationType" : "INSERT",
    "updateDescription" : null,
    "txnNumber" : null,
    "lsid" : null,
    "namespace" : {
      "databaseName" : "test",
      "collectionName" : "movie",
      "fullName" : "test.movie"
    },
    "databaseName" : "test",
    "destinationNamespace" : null
  },
 ... 중략 ... 
  "operationType" : "INSERT",
  "databaseName" : "test",
  "collectionName" : "movie",
  "timestamp" : "2021-08-08T07:53:27Z",
  "body" : {
    "id" : "610f8d77096ba332fc8d307a",
    "title" : "tttt",
    "director" : "test",
    "since" : "test",
    "audienceCnt" : 10
  }
}

 

update events
{
  "raw" : {
     ... 중략 ... 
    "destinationNamespaceDocument" : null,
    "fullDocument" : {
      "_id" : {
        "timestamp" : 1628409207,
        "date" : "2021-08-08T07:53:27.000+00:00"
      },
      "title" : "tttt",
      "director" : "test",
      "since" : "test",
      "audienceCnt" : 50.0
    },
    ... 중략 ... 
    "operationType" : "UPDATE",
    "updateDescription" : {
      "removedFields" : [ ],
      "updatedFields" : {
        "audienceCnt" : {
          "value" : 50.0,
          "bsonType" : "DOUBLE",
          "double" : true,
          "boolean" : false,
          "binary" : false,
          "number" : true,
          "array" : false,
          "null" : false,
          "document" : false,
          "string" : false,
          "int32" : false,
          "int64" : false,
          "decimal128" : false,
          "objectId" : false,
          "dbpointer" : false,
          "timestamp" : false,
          "dateTime" : false,
          "symbol" : false,
          "regularExpression" : false,
          "javaScript" : false,
          "javaScriptWithScope" : false
        }
      }
    },
    "txnNumber" : null,
    "lsid" : null,
    "namespace" : {
      "databaseName" : "test",
      "collectionName" : "movie",
      "fullName" : "test.movie"
    },
    "databaseName" : "test",
    "destinationNamespace" : null
  },
  ... 중략 ... 
  "operationType" : "UPDATE",
  "databaseName" : "test",
  "collectionName" : "movie",
  "timestamp" : "2021-08-08T07:56:42Z",
  "body" : {
    "id" : "610f8d77096ba332fc8d307a",
    "title" : "tttt",
    "director" : "test",
    "since" : "test",
    "audienceCnt" : 50
  }
}

 

remove events
{
  "raw" : {
    ... 중략 ... 
    "destinationNamespaceDocument" : null,
    "fullDocument" : null,
    "documentKey" : {
      "_id" : {
        "value" : {
          "timestamp" : 1628409207,
          "date" : "2021-08-08T07:53:27.000+00:00"
        },
        "bsonType" : "OBJECT_ID",
        "double" : false,
        "boolean" : false,
        "binary" : false,
        "number" : false,
        "array" : false,
        "null" : false,
        "document" : false,
        "string" : false,
        "int32" : false,
        "int64" : false,
        "decimal128" : false,
        "objectId" : true,
        "dbpointer" : false,
        "timestamp" : false,
        "dateTime" : false,
        "symbol" : false,
        "regularExpression" : false,
        "javaScript" : false,
        "javaScriptWithScope" : false
      }
    },  
    ... 중략 ... 
    "operationType" : "DELETE",
    "updateDescription" : null,
    "txnNumber" : null,
    "lsid" : null,
    "namespace" : {
      "databaseName" : "test",
      "collectionName" : "movie",
      "fullName" : "test.movie"
    },
    "databaseName" : "test",
    "destinationNamespace" : null
  },
  ... 중략 ... 
  "operationType" : "DELETE",
  "databaseName" : "test",
  "collectionName" : "movie",
  "timestamp" : "2021-08-08T07:58:09Z",
  "body" : null
}

 

직접 로직을 작성해서 kafka로 밀어 넣어주고 있는 부분들이 있기에 cdc/change streams를 고려하고 있고, 이를 잘 활용하면 이벤트 기반 작업을 좀 더 수월하게 진행할 수 있을 것 같습니다.

 

cdc 활용 사례

'Tech' 카테고리의 다른 글

BFS로 서비스 스펙 나름 재밌게 친 썰  (0) 2022.07.05
약 500건의 unit test를 작성하고 느낀점  (0) 2022.07.05
Neo4j memory 구성  (0) 2020.07.01
Kafka  (0) 2020.06.03
goroutine  (0) 2020.06.03
Comments