일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- test
- unit
- container image #docker #layer #filesystem #content addressable
- 설계
- reactive
- Java
- docker
- spring cloud netflix
- spring cloud netflix zuul
- 단위테스트
- java #jvm #reference #gc #strong reference
- netflix eureka
- microservice architecture
- BFS
- springcloud
- forkandjoinpool #threadpool #jvm #async #non-blocking
- code refactoring
- 서비스스펙
- spring cloud
- Eureka
- zuul
- 탐색
- Spring Data Redis
- spring cloud netflix eureka
- api-gateway
- netflix
- Dynamic Routing
- dfs
- unittest
- Today
- Total
phantasmicmeans 기술 블로그
MongoDB CDC, Change Streams 본문
CDC(change data capture)는 데이터베이스 내 데이터에 대한 변경을 식별해 후속 처리를 자동화 하는 기술이자 개념이고, 단어 그대로 데이터의 변경본을 캡쳐한다
로 이해하면 더 직관적입니다.
보통 변경본 캡처 후 데이터 가공 및 타 데이터베이스로의 적재가 필요한 환경에서 주로 사용되고, 이벤트 기반의 아키텍처를 구성할때에도 자주 사용되는 것 같습니다.
이번 글에서는 MongoDB를 기준으로 변경본 캡쳐 이벤트를 subscribe 하는 방법 중 아래 2가지를 간단하게 공유 드리고자 합니다.
- Debezium connector for MongoDB
- MongoDB Change Streams
Debezium Kafka Connector
CDC 오픈소스 중 가장 많이 사용 되는 것이 Debezium 이고 보통 Kafka Connect와 함께 사용되며 아래 DB들을 지원합니다.
- MongoDB
- MySQL
- PostgreSQL
- SQL Server
- Oracle
- Db2
- Cassandra (Incubating)
- Vitess (Incubating)
출처 - https://debezium.io/documentation/reference/architecture.html
위 그림은 mysql / postgresql 에서의 데이터 변경을 kafka 메시지로 밀어 넣고 이후 타 db로 밀어 넣는 과정을 나타냅니다.
참고 - mysql은 binlog로 데이터 변경본을 추출합니다.
1. Debezium connector for MongoDB
몽고DB 에서도 데이터 변경 기록을 kafka events로 밀어 넣을 수 있는데요, 좀 더 디테일한 사항들은 아래를 참고하시면 됩니다.
특징
- Replica set / 샤딩 지원 O
- oplog tailing
캡쳐되어 kafka로 발행 되는 이벤트
아래 update events를 예시로 보면 patch
부분이 데이터의 변경 내역입니다. (replace 요청은 모든 필드가 들어옵니다)
- mysql의 update events는 before/after를 지원하지만, mongo는 아쉽게도 지원하지 않습니다.
2. MongoDB Change Streams
MongoDB의 Change Stream을 이용하면 update 이후의 full document
까지 받아볼 수 있습니다.
특징
- Replica set / 샤딩 지원 O
- 내부적으로
aggregation framework
사용 - mongo 4.0 이전의 버전은 read concern
majority
옵션이 필요
java/spring의 경우 change streams
를 활용하는 방법이 2가지이며 아래 문서를 참고하시면 됩니다
- spring data mongodb change streams
- Change Streams with MessageListener
- Reactive Change Streams
Reactive Change Streams 사용 예시
this.reactiveMongoTemplate.changeStream(Movie.class)
.withOptions(changeStreamOptionsBuilder -> changeStreamOptionsBuilder
.fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
.build())
.watchCollection("movie")
.filter(Criteria.where("title").is("test"))
.listen()
.doOnNext(this::print)
.blockLast();
캡쳐되어 발행 되는 이벤트 raw data
create events
{
"raw" : {
... 중략 ...
"destinationNamespaceDocument" : null,
"fullDocument" : {
"_id" : {
"timestamp" : 1628409207,
"date" : "2021-08-08T07:53:27.000+00:00"
},
"title" : "tttt",
"director" : "test",
"since" : "test",
"audienceCnt" : 10
},
"documentKey" : {
"_id" : {
"value" : {
"timestamp" : 1628409207,
"date" : "2021-08-08T07:53:27.000+00:00"
},
"bsonType" : "OBJECT_ID",
"double" : false,
"boolean" : false,
"binary" : false,
"number" : false,
"array" : false,
"null" : false,
"document" : false,
"string" : false,
"int32" : false,
"int64" : false,
"decimal128" : false,
"objectId" : true,
"dbpointer" : false,
"timestamp" : false,
"dateTime" : false,
"symbol" : false,
"regularExpression" : false,
"javaScript" : false,
"javaScriptWithScope" : false
}
},
"operationType" : "INSERT",
"updateDescription" : null,
"txnNumber" : null,
"lsid" : null,
"namespace" : {
"databaseName" : "test",
"collectionName" : "movie",
"fullName" : "test.movie"
},
"databaseName" : "test",
"destinationNamespace" : null
},
... 중략 ...
"operationType" : "INSERT",
"databaseName" : "test",
"collectionName" : "movie",
"timestamp" : "2021-08-08T07:53:27Z",
"body" : {
"id" : "610f8d77096ba332fc8d307a",
"title" : "tttt",
"director" : "test",
"since" : "test",
"audienceCnt" : 10
}
}
update events
{
"raw" : {
... 중략 ...
"destinationNamespaceDocument" : null,
"fullDocument" : {
"_id" : {
"timestamp" : 1628409207,
"date" : "2021-08-08T07:53:27.000+00:00"
},
"title" : "tttt",
"director" : "test",
"since" : "test",
"audienceCnt" : 50.0
},
... 중략 ...
"operationType" : "UPDATE",
"updateDescription" : {
"removedFields" : [ ],
"updatedFields" : {
"audienceCnt" : {
"value" : 50.0,
"bsonType" : "DOUBLE",
"double" : true,
"boolean" : false,
"binary" : false,
"number" : true,
"array" : false,
"null" : false,
"document" : false,
"string" : false,
"int32" : false,
"int64" : false,
"decimal128" : false,
"objectId" : false,
"dbpointer" : false,
"timestamp" : false,
"dateTime" : false,
"symbol" : false,
"regularExpression" : false,
"javaScript" : false,
"javaScriptWithScope" : false
}
}
},
"txnNumber" : null,
"lsid" : null,
"namespace" : {
"databaseName" : "test",
"collectionName" : "movie",
"fullName" : "test.movie"
},
"databaseName" : "test",
"destinationNamespace" : null
},
... 중략 ...
"operationType" : "UPDATE",
"databaseName" : "test",
"collectionName" : "movie",
"timestamp" : "2021-08-08T07:56:42Z",
"body" : {
"id" : "610f8d77096ba332fc8d307a",
"title" : "tttt",
"director" : "test",
"since" : "test",
"audienceCnt" : 50
}
}
remove events
{
"raw" : {
... 중략 ...
"destinationNamespaceDocument" : null,
"fullDocument" : null,
"documentKey" : {
"_id" : {
"value" : {
"timestamp" : 1628409207,
"date" : "2021-08-08T07:53:27.000+00:00"
},
"bsonType" : "OBJECT_ID",
"double" : false,
"boolean" : false,
"binary" : false,
"number" : false,
"array" : false,
"null" : false,
"document" : false,
"string" : false,
"int32" : false,
"int64" : false,
"decimal128" : false,
"objectId" : true,
"dbpointer" : false,
"timestamp" : false,
"dateTime" : false,
"symbol" : false,
"regularExpression" : false,
"javaScript" : false,
"javaScriptWithScope" : false
}
},
... 중략 ...
"operationType" : "DELETE",
"updateDescription" : null,
"txnNumber" : null,
"lsid" : null,
"namespace" : {
"databaseName" : "test",
"collectionName" : "movie",
"fullName" : "test.movie"
},
"databaseName" : "test",
"destinationNamespace" : null
},
... 중략 ...
"operationType" : "DELETE",
"databaseName" : "test",
"collectionName" : "movie",
"timestamp" : "2021-08-08T07:58:09Z",
"body" : null
}
직접 로직을 작성해서 kafka로 밀어 넣어주고 있는 부분들이 있기에 cdc/change streams를 고려하고 있고, 이를 잘 활용하면 이벤트 기반 작업을 좀 더 수월하게 진행할 수 있을 것 같습니다.
cdc 활용 사례
'Tech' 카테고리의 다른 글
BFS로 서비스 스펙 나름 재밌게 친 썰 (0) | 2022.07.05 |
---|---|
약 500건의 unit test를 작성하고 느낀점 (0) | 2022.07.05 |
Neo4j memory 구성 (0) | 2020.07.01 |
Kafka (0) | 2020.06.03 |
goroutine (0) | 2020.06.03 |