Avro를 사용해 Kafka message를 송수신 해보자

IDL

Avro를 설명하기 전, 인터페이스 정의 언어(Interface Description Language, 이하 IDL)에 대해서 먼저 간략하게 말씀을 드리겠습니다.

IDL은 소프트웨어 컴포넌트간 통신시 사용되는 인터페이스를 정의 한 언어입니다.

동일한 언어를 사용하는 소프트웨어 (Java ↔ Java) 뿐만 아니라 다른 언어를 사용하는 소프트웨어 (Java ↔ C++, Python ↔ Java 등) 간에도 언어중립적인 방법으로 인터페이스를 표현해두고 통신을 하는 개념입니다.

대표적인 IDL 로는 아래들이 있습니다.

Google - protobuf facebook - thrift apache - avro


Avro

1

  • 아파치 하둡프로젝트에서 개발된 RPC / serialize 프레임워크
  • 바이너리 인코딩 / JSON 인코딩 모두 지원
  • JSON 으로 작성하는 스키마 파일
  • C, C++, C#, Go, 하스켈, 자바, 펄, PHP, 파이썬, 루비, 스칼라 언어에서 API 를 제공함
  • 하둡, 스파크, bigquery, Vertica, Logstash 등 많은 소프트웨어들에서 통신시 권장하는 방식으로 사용되며 쉽게 사용 할 수 있도록 plugin 을 지원하고 있음

Avro 스키마 (avsc)

Avro 에서 IDL로써 사용되는 Avro 스키마 파일은 Json 포맷으로 다음과 같은 형태로 정의됩니다.

// User.avsc
 
 
{
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]}
    ]
}

Java gradle Avro 예제코드

Spring gradle 환경에서 Avro를 쉽게 사용 할 수 있도록 avro-gradle-plugin이 제공되고 있습니다.

(https://github.com/davidmc24/gradle-avro-plugin)

위 플러그인 개발사이트에서 제공하는 예제 스키마 Cat.avsc를 실제 Spring project에 적용하여 코드를 작성해보도록 하겠습니다.

Avro code generate

Java 에서 Avro를 사용하기 위해서는 먼저 avro 스키마 파일을 Java class 파일로 생성해주는 작업이 필요합니다.
가장먼저 할 일은 Avro 스키마 파일 (avsc)를 ‘src/main/avro’ 폴더 아래 위치시키는 것 입니다.

2

Avro 스키마 파일 (Cat.avsc)

// Cat.avsc
 
{
    "name": "Cat",
    "namespace": "avro.com.wmp.catalog",
    "type": "record",
    "fields" : [
        {
            "name": "breed",
            "type": {
                "name": "Breed",
                "type": "enum",
                "symbols" : [
                    "ABYSSINIAN", "AMERICAN_SHORTHAIR", "BIRMAN", "MAINE_COON", "ORIENTAL", "PERSIAN", "RAGDOLL", "SIAMESE", "SPHYNX"
                ]
            }
        }
    ]
}

다음은 avro-gradle-plugin을 사용하기 위한 설정입니다.

gradle plugin 설정

// settings.gradle
 
pluginManagement {
    repositories {
        gradlePluginPortal()
        jcenter()
        maven {
            name "JCenter Gradle Plugins"
            url  "https://dl.bintray.com/gradle/gradle-plugins"
        }
    }
}
// build.gradle
 
plugins {
    id "com.commercehub.gradle.plugin.avro" version "0.22.0"
}
 
dependencies {
    implementation "org.apache.avro:avro:1.10.1"
}

자, 이제 plugin만 등록해주고 gradle build를 해주게되면

avro gradle plugin이 동작해 generate-avro task를 거쳐 build 파일내에 클래스파일이 생성됩니다.

gradle build

3

build 과정을 통해 Cat 클래스파일이 생성되어 프로젝트내에서 Cat 객체를 선언하여 사용할 수 있는 상태가 되었습니다.


Kafka에서 Avro 사용하기

Avro를 사용하는 이유중 한가지는, 정의된 IDL로 Serialize 할 수 있기 때문입니다.
Kafka에서 Avro Serialize를 사용하기위해서는 메세지송수신시 Avro Serializer를 등록하여 사용 할 수 있습니다.

아래 코드를 통해 Kafka에 avro Serilizer/Deserializer를 등록하여 사용 할 수 있습니다.

// AvroSerializer.java


@Log4j2
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String topic, T payload) {
        byte[] bytes = null;
        try {
            if (payload != null) {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
                DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(payload.getSchema());
                datumWriter.write(payload, binaryEncoder);
                binaryEncoder.flush();
                byteArrayOutputStream.close();
                bytes = byteArrayOutputStream.toByteArray();
            }
        } catch (Exception e) {
            log.error("Unable to serialize", e);
        }
        return bytes;
    }

    @Override
    public void close() {

    }
}

// AvroDeserializer.java


@Log4j2
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer {

    protected final Class<T> targetType;

    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public void configure(Map configs, boolean isKey) {

    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        T returnObject = null;

        try {

            if (data != null) {
                DatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(targetType.newInstance().getSchema());
                Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                returnObject = (T) datumReader.read(null, decoder);
            }
        } catch (Exception e) {
            log.error("Unable to Deserialize", e);
        }

        return returnObject;
    }

    @Override
    public void close() {

    }
}
// KafkaConfig.java

@Configuration
public class KafkaConfig {

    @Value(value = "${kafka.bootstrap.address}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Cat> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Cat> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

사실, Serilizer/Deserializer를 직접 등록하는것 보다 더 간편한방법으로 Confluent사에서 제공하는 avro-kafka-connector 를 이용하면 좀 편하게 설정 가능하지만, 스키마 레지스트리를 사용하는 경우에만 connector를 이용 할 수 있습니다.


Avro 스키마 파일을 Git으로 관리하기

스키마 파일을 프로젝트마다 관리하게되면 정합성의 문제에 빠지게 될 수 있습니다.

이러한 점을 해결하기위해 Confluent사 에서는 Avro 스키마 레지스트리를 제공하고있으나 그보다 간단하게 스키마 파일을 git에 올려두고 build 할때 마다 Avro 스키마 파일을 최신화하여 사용 할 수 있도록 task 를 만들어 보았습니다.

// build.gradle
 
 
plugins {
    id "de.undercouch.download" version "4.1.1"
}
 
def avroResourceDir = "$rootDir/src/main/avro"
 
task makeAvroDir() {
    if (!new File(avroResourceDir).exists()) {
        mkdir avroResourceDir
    }
}
 
task downloadAvro(type: Download) {
    dependsOn makeAvroDir
    src 'https://git-registry/Cat.avsc'
    dest avroResourceDir
}
 
task buildWithAvro() {
    dependsOn clean, downloadAvro, build
    downloadAvro.mustRunAfter clean
    build.mustRunAfter downloadAvro
}

위의 groovy 코드 로직의 흐름은 다음과 같습니다

  • clean
  • src/main/avro 디렉토리 생성
  • avro 스키마 파일 다운로드
  • build

이제 위 태스크를 사용하여 프로젝트별로 스키마를 관리하는것이 아니라
build 할 때마다 git 에서 최신화된 Avro 스키마를 가져와서 사용 할 수 있습니다.

./gradlew buildWithAvro