Kafka/Producer

BuiltIn Partitioner

재심 2023. 3. 5. 11:48

목차

    [BuiltIn Partitioner?]

    문제 제기 : https://issues.apache.org/jira/browse/KAFKA-10888?focusedCommentId=17285383&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17285383

    업데이트 patch : https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner

     

    Apache Kafka 3.3 버전에 추가된 것으로 기존 StickyPartitioner를 사용할 때 Broker Network 상태에 따라 메시지가 특정 파티션으로 몰리는 현상이 있을 수 있다고 한다. 이에 대한 이슈 제기가 있었고, 이 현상을 완화하기 위해 나왔다.

     

    BuiltIn Partitioner는 배치에 이미 유입된 메시지 현황을 파악해서 완화하는 것이므로 Partitioner Interface를 구현하지 않았다.

    이는 기존 Partitioner의 구현방식과 다르며 Builtin Partitioner는 Accumulator 수준에서 동작하게 된다.

     

    이슈 리포팅 사례

    • UniformStickyPartitioner 사용시 0에 가까운 linger.ms 옵션 사용시 파티션별 메시지 size 불균형 관찰

    문제점

    UniformStickyPartitioner의 naming과는 다르게 파티셔닝이 균일하게 되지 않은 것으로 나타남

     

    원인

    slower broker에 오히려 더 많은 레코드를 전송하고, 이것으로 인해 더 느려지고 레코드 파티셔닝이 더 왜곡됨

     

    stickyness time(지정된 파티션에 배치를 쌓아 전송하는 시간)은 broker latency의 역수(반비례)

    : (처리 속도가) 느린 broker는 배치 소비 속도가 느려서 더 "sticky"한데, latency를 고려하여 메시지 전송량을 조절하지는 않고 있음

     

    ex) 배치사이즈를 매우 작게 하겠다는 설정하의 예시

    linger.ms=0 설정(실제로는 delay 등으로 인해 1개 이상의 레코드로 배치가 형성될 수 있음) 하에

    어떤 파티션이 속한 브로커가 느려진다고 가정하면(리더 변경 또는 네트워크 문제)

    producer는 해당 파티션이 available 할 때 까지 batch에 record를 축적함

    나머지 정상 컨디션의 파티션의 경우에는, linger.ms=0 설정으로 인해 비교적 적은 수의 record로 생성된 batch 가 전송됨 

    → record 불균형

     

    0번 파티션이 느린 상황에서, 0번 파티션의 배치 size > 나머지 파티션의 배치 size 

    파티션 TotalBatches TotalBytes TotalRecords BytesPerBatch RecordsPerBatch
    0    1683          25953200   25228         15420.80     14.99        
    1    1713          7836878    4622          4574.94      2.70
    2    1711          7546212    4381          4410.41      2.56
     

    시나리오

    파티션이 3개가 있다고 가정하자. 이 때 파티션 배치는 아래와 같고, linger.ms=0, batch.size=16384를 사용한다고 가정한다.

    1.     메시지 "A"가 파티션0의 배치에 추가되었다.
    2.     linger.ms=0이라서 즉시 전송하려고 하였으나 브로커0의 네트워크 상황이 좋지않아 바로 보내지 못하고 좀 더 대기하게 되었다.
    3.     메시지 "B"가 유입되었다.
    4.     OnNewBatch가 호출된 상황이 아니므로 파티션0의 배치에 추가되었다.
    5.     메시지 "C"가 유입되었다.
    6.     마찬가지로 OnNewBatch가 호출된 상황이 아니므로 파티션0의 배치에 추가되었다.
    7.     1초가 지난 후 브로커0이 해당 배치를 받아주었다.
    8.     파티션0에는 3개의 메시지가 전송되었다.
    9.     메시지 "D"가 유입되었다.
    10.     이번에는 OnNewBatch가 호출된 상황 (배치가 방금 전송된 경우) 이므로 파티션1의 배치에 메시지 "D"가 추가되었다.
    11.     linger.ms=0이므로 즉시 전송시도를 하였고, 브로커1은 상태가 좋아서 바로 받아주었다.
    12.     파티션1에는 1개의 메시지가 유입되었다.
    13.     메시지 "E"가 유입되었다.
    14.     마찬가지로 OnNewBatch가 호출되었고, 파티션2의 배치에 쌓이며, 브로커2의 상태도 좋아서 바로 전송되었다.
    15.     파티션2에도 1개의 메시지가 유입되었다.
    16.     메시지 "F"가 유입되었다.
    17.     OnNewBatch가 호출되었고, 이번에는 파티션0의 배치에 쌓였다.
    18.     linger.ms=0 이지만 브로커0이 여전히 느려서 바로 전송하지 못하였다.
    19.     메시지 "G"가 유입되었다.
    20.     OnNewBatch 호출상황이 아니므로 파티션0의 배치에 쌓였다.
    21.     브로커0이 배치를 수신하였다.

    => 결과적으로 가장 느린 브로커0 에는 메시지가 5개있고, 빠른 브로커인 1,2는 메시지가 각각 1개씩 밖에 없는 상황이 연출되었다.

     

    [Version Compatibility]

    Custom Partitioner 

    • 기존의 Partitioner 인터페이스를 구현하는 방식으로는 3.3+ 에서 개선된 sticky + adaptive partitioning 로직을 반영하는 것은 불가능
    • BuiltInPartitioner를 customizing 하는 것도 거의 불가능하다고 판단(RecordAccumulator 를 수정하더라도 배포하여 사용할 수 없다고 판단)

     

    Confluent Version

    • Apache Kafka Client 3.3에 대응되는 Confluent Kafka 7.3 버전을 사용하더라도 3.3 이하의 Apache Kafka client를 사용하는 것에 문제가 없음
    • 추후 deprecated 인 Partitioner는 삭제될 것 같다는 내용 전달 받음

     

    [Configuration 변경사항 (기본값, custom partitioner를 지정한 경우엔 아래 설정은 무시됨)]

    UniformStickyPartitioner를 대체하는 기본 구성

    • partitioner.class = null
    • paritioner.ignore.keys = true

     

    partitioner.adaptive.partitioning.enable = true

    → producer가 broker 성능에 adaptive하게 동작. 더 빠른 broker에서 호스팅되는 paritition에 더 많은 메시지 전송. false인 경우, partition을 임의로 지정

     

     

    partitioner.availability.timeout.ms= 0

    → 파티션이 가용한지 판단하는 시간.

     partitioner.adaptive.partitioning.enable=true && partitioner.availability.timeout.ms > 0 인 경우, partitioner.availability.timeout.ms 동안 어떤 partition이 unavailable하면 사용할 수 없는 partition이 됨(리더 broker의 latency로 계산)

    → partitioner.adaptive.partitioning.enable = false이면 이 설정은 무시됨

     

    [변경사항]

    key가 없는 레코드에 대한 기본적인 partitioning임.

    partitioner.ignore.keys = true이면, key가 있는 레코드도 균등하게 분배됨.

     

    UniformSticky 방식 리뉴얼

    배치를 생성할 때 마다 파티션을 전환하는 대신(기존 방식:onNewBatch()), batch.size 만큼 파티션에 생성될 때마다 파티션을 전환한다.

     linger.ms가 작을 때는 batch.size에 못미치는 batch도 즉시 전송하고 partition switch를 했는데, batch.size만큼 생성되고나서 파티션 전환

    → sticky하면서도 레코드 불균형을 방지

    strictly uniform partitioning의 잠재적 단점

    • 어떤 broker가 throughput 감당을 못하면 records가 Accumulator에 쌓이고
    • 결국 buffer pool memory가 고갈됨
    • production 속도가 가장 느린 broker의 capacity로 조정됨

    Adaptive Partitioning

    RecordAccumulator에서 전송 대기중인 배치의 대기열 크기(broker load)를 알 수 있으므로 partitioner가 지정되지 않으면 RecordAccumulator에서 partitioning이 수행됨

    • 대기열 크기로 파티션 선정
      • 다음 partition 선정에 대기열 크기를 고려함(파티션 선정 확률은 대기열 크기의 역수에 비례함 : 대기열이 작은 파티션이 선정될 확률이 큼)
      • 대기열 이란 - ready 상태의 partition에 적재된 batch의 개수가 적을 수 록 대기열이 짧음
    • partitioner.availability.timeout.ms 로 파티션 선정
      • 전송할 배치가 있는 파티션이 위 값내에 준비가 안되면 unavailable 파티션으로 표시되고
      • 전송 가능할 때까지 선정이 안됨
    • Partition 인터페이스를 구현한 기존 partitioner는 queue size나 broker readiness를 모름
      • custom partitioner가 지정되지 않으면(BuiltinPartitioner), KafkaProducer class는 RecordAccumulator에서 BuiltInpartitioner로 partitioning logic을 수행함

    *Adaptive Partitioning은 key가 없는 메시지에 효과가 있고, key가 있으면 효과가 없음

     

    [호환성, 지원 중단 및 마이그레이션 계획]

    • DefaultPartitioner와 UniformStickyPartitioner는 deprecated됨
    • 사용자 지정 파티셔너를 지정하지 않은 사용자는 새 동작을 자동으로 얻는다.
      • DefaultPartitioner , UniformStickyPartitioner를 명시한 경우 사용 중단 경고가 표시되지만 동작에는 이상 없음
    • 업데이트된 동작을 가져오려면 그에 따라 구성을 업데이트해야 한다(UniformStickyPartitioner를 대체하려는 경우, partitioner.class 설정을 제거하고 선택적으로 partitioner.ignore.keys 를 'true'로 설정 )
    • Partitioner.onNewBatch()는 더 이상 사용되지 않음

     

    [Partitioner 수준에서 구현 시도]

    Partitioner 수준에서 BuiltInPartitioner 구현을 시도하였으나 아래 사유로 Accumulator 수준에서 구현으로 변경하였다고 한다.

     

    • partitioner 단에서는 partition에 대기중인 batch size를 알지 못함
    • 해당 파티션에 produce된 recordSize를 get하는 getRecordSize(), partition load에 따라서 next partition을 지정하는 nextPartition() callback을 구상(producer에서 다시 호출하도록)
    • producer와 partitioner를 분리하려는 시도였으나, 인터페이스의 복잡성 증가 + 이해하기 어려움
    • Partition 인터페이스의 캡슐화가 깨짐을 알았고 partitioning 논리의 일부를 producer에게도 위임
    public interface Partitioner extends Configurable, Closeable {
      
        /**
         * Callbacks from the producer
         */
        interface Callbacks {
           /**
             * Get record size in bytes.  The keyBytes and valueBytes may present skewed view of the number
             * of bytes produced to the partition.  In addition, the callback takes into account the following:
             *  1. Headers
             *  2. Record overhead
             *  3. Batch overhead
             *  4. Compression
             *
             * @param partition The partition we need the record size for
             * @return The record size in bytes
             */
           int getRecordSize(int partition);
      
            /**
             * Calculate the partition number.  The producer keeps stats on partition load
             * and can use it as a signal for picking up the next partition.
             *
             * @return The partition number, or -1 if not implemented or not known
             */
           default int nextPartition() {
               return -1;
           }
        }
      
       // ... <skip> ...
      
       /**
         * Compute the partition for the given record.
         *
         * @param topic The topic name
         * @param key The key to partition on (or null if no key)
         * @param keyBytes The serialized key to partition on( or null if no key)
         * @param value The value to partition on or null
         * @param valueBytes The serialized value to partition on or null
         * @param callbacks The record size and partition callbacks (see {@link Partitioner#Callbacks})
         * @param cluster The current cluster metadata
         */
        default int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                              Callbacks callbacks, Cluster cluster) {
            return partition(topic, key, keyBytes, value, valueBytes, cluster);
        }
      
       // ... <skip> ...
    }

     

    [코드 분석]

    Partition 지정하지 않으면 partitioner=null → RecordAccumulator내부에서 생성하는 BuiltInPartitioner 인스턴스가 사용됨

    sticky partition이 있다면(topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster)) 해당 partition에 대기중인 batch에 append 시도

     

    public RecordAppendResult append(String topic, 
int partition, 
long timestamp, 
byte[] key, 
byte[] value, 
Header[] headers, 
AppendCallbacks callbacks, 
long maxTimeToBlock, 
boolean abortOnNewBatch, 
long nowMs, 
Cluster cluster) throws InterruptedException {

        TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k - > new TopicInfo(logContext, k, batchSize));

 // We keep track of the number of appending thread to make sure we do not miss batches in
        
 // abortIncompleteBatches().
        

        appendsInProgress.incrementAndGet();

        ByteBuffer buffer = null;

        if (headers == null) headers = Record.EMPTY_HEADERS;

        try {
 // Loop to retry in case we encounter partitioner's race conditions.
            

            while (true) {
 // If the message doesn't have any partition affinity, so we pick a partition based on the broker
                
 // availability and performance.  Note, that here we peek current partition before we hold the
                
 // deque lock, so we'll need to make sure that it's not changed while we were waiting for the
                
 // deque lock.
                

                final BuiltInPartitioner.StickyPartitionInfo partitionInfo;

                final int effectivePartition;

                if (partition == RecordMetadata.UNKNOWN_PARTITION) {

                    partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);

                    effectivePartition = partitionInfo.partition();

                } else {

                    partitionInfo = null;

                    effectivePartition = partition;

                }

 // Now that we know the effective partition, let the caller know.
                

                setPartition(callbacks, effectivePartition);

 // check if we have an in-progress batch
                

                Deque < ProducerBatch > dq = topicInfo.batches.computeIfAbsent(effectivePartition, k - > new ArrayDeque < > ());

                synchronized(dq) {
 // After taking the lock, validate that the partition hasn't changed and retry.
                    

                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
 continue;


                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);

                    if (appendResult != null) {
 // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                        

                        boolean enableSwitch = allBatchesFull(dq);

                        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);

                        return appendResult;

                    }

                }

 // we don't have an in-progress record batch try to allocate a new batch
                

                if (abortOnNewBatch) {
 // Return a result that will cause another call to append.
                    

                    return new RecordAppendResult(null, false, false, true, 0);

                }


                if (buffer == null) {

                    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();

                    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));

                    log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
 // This call may block if we exhausted buffer space.
                    

                    buffer = free.allocate(size, maxTimeToBlock);
 // Update the current time in case the buffer allocation blocked above.
                    
 // NOTE: getting time may be expensive, so calling it under a lock
                    
 // should be avoided.
                    

                    nowMs = time.milliseconds();

                }


                synchronized(dq) {
 // After taking the lock, validate that the partition hasn't changed and retry.
                    

                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
 continue;


                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
 // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
                    

                    if (appendResult.newBatchCreated)
 buffer = null;
 // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                    

                    boolean enableSwitch = allBatchesFull(dq);

                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);

                    return appendResult;

                }

            }

        } finally {

            free.deallocate(buffer);

            appendsInProgress.decrementAndGet();

        }

    }

    → batch가 꽉 찼다면 builtInPartitioner.updatePartitionInfo() 에서 partition을 바꿀지 결정하는데

    • BuiltInPartitioner에서 StickyPartitionInfo라는 인스턴스에서 현재 sticky partition 정보를 가지고 있음
    • partition 번호와 partition에 produce된 messageByte
    void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, Cluster cluster, boolean enableSwitch) {
 // partitionInfo may be null if the caller didn't use built-in partitioner.
        

        if (partitionInfo == null)
 return;


        assert partitionInfo == stickyPartitionInfo.get();

        int producedBytes = partitionInfo.producedBytes.addAndGet(appendedBytes);

 // We're trying to switch partition once we produce stickyBatchSize bytes to a partition
     
        if (producedBytes >= stickyBatchSize * 2) {

            log.trace("Produced {} bytes, exceeding twice the batch size of {} bytes, with switching set to {}", 
producedBytes, stickyBatchSize, enableSwitch);

        }


        if (producedBytes >= stickyBatchSize && enableSwitch || producedBytes >= stickyBatchSize * 2) {
 // We've produced enough to this partition, switch to next.
            

            StickyPartitionInfo newPartitionInfo = new StickyPartitionInfo(nextPartition(cluster));

            stickyPartitionInfo.set(newPartitionInfo);

        }

    }

    PartitionLoadStats

    • 해당 토픽내의 partition에 대기중인 batch 정보를 가지고 있음(대기열)
    • 대기중인 batch가 적을 수 록 partition이 nextPartition이 될 확률이 가장 크게하는 로직
    private int nextPartition(Cluster cluster) {

        int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt());

 // Cache volatile variable in local variable.
        

        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;

        int partition;


        if (partitionLoadStats == null) {
 // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
            
 // partition based on uniform distribution.
            

            List < PartitionInfo > availablePartitions = cluster.availablePartitionsForTopic(topic);

            if (availablePartitions.size() > 0) {

                partition = availablePartitions.get(random % availablePartitions.size()).partition();

            } else {
 // We don't have available partitions, just pick one among all partitions.
                

                List < PartitionInfo > partitions = cluster.partitionsForTopic(topic);

                partition = random % partitions.size();

            }

        } else {
 // Calculate next partition based on load distribution.
            
 // Note that partitions without leader are excluded from the partitionLoadStats.
            

            assert partitionLoadStats.length > 0;


            int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;

            int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];

 // By construction, the cumulative frequency table is sorted, so we can use binary
            
 // search to find the desired index.
            

            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);

 // binarySearch results the index of the found element, or -(insertion_point) - 1
            
 // (where insertion_point is the index of the first element greater than the key).
            
 // We need to get the index of the first value that is strictly greater, which
            
 // would be the insertion point, except if we found the element that's equal to
            
 // the searched value (in this case we need to get next).  For example, if we have
            
 //  4 5 8
            
 // and we're looking for 3, then we'd get the insertion_point = 0, and the function
            
 // would return -0 - 1 = -1, by adding 1 we'd get 0.  If we're looking for 4, we'd
            
 // get 0, and we need the next one, so adding 1 works here as well.
            

            int partitionIndex = Math.abs(searchResult + 1);

            assert partitionIndex < partitionLoadStats.length;

            partition = partitionLoadStats.partitionIds[partitionIndex];

        }


        log.trace("Switching to partition {} in topic {}", partition, topic);

        return partition;

    }

     

    batch를 전송할 다음 partition 번호를 계산하는 logic

    현재 batch size : 각 파티션에 에 담겨있는 batch size(파티션 위치는 랜덤이고, 이 배열에 대응되는 partition 번호를 관리하는 배열이 따로 있음)

    • 0,  3,  1,  6,  2

    찾아야 하는 것 : 다음 데이터가 어느 partition의 batch 에 들어가야할지 찾아야함 . 

    현재 batch size 중 가장 큰 값은 6 이고, 파티션들의 batch size를 고르게 하려면 모두 최소 7 의 batch size를 가지면 됨  (Max value (6) + 들어갈 위치 (1) )

    가용가능한 batch size : (파티션들의 batch size를 고르게 하는 최소 batch size - 각 파티션의 batch size )  

    • 7,  4,  6,  1,  5    ( 데이터가 들어갈 수 있는 batch 크기 ) 

    위 배열을 왼쪽부터 누적하여 더해 하나의 배열로 만들면(batch 크기 개념은 유지하되, 정렬된 상태를 만들기 위함) idx는 아래와 같이 됨  ( 0 + 7 / 7 + 4 / 11 + 6 ... 이전까지의 Idx + 현재 batch size )

    • 7, 11, 17, 18, 23 

     

    Random % 23 하였을때, 이번 batch가 cumulativeFrequencyTables의 첫번째 파티션 에 들어갈 확률 = 7/23 ( 0 ~ 6 idx)
    두번째 파티션에 들어갈 확률 = 4/23 ( 7 ~ 10 Idx)

     

    따라서 위 연산을 이용하여 batch를 가장 적게 가진 파티션에 다음 batch가 들어갈 확률이 높다.

    → 더 많은 레코드를 생산함에 따라서 uniform한 batch가 생성된다

    private int nextPartition(Cluster cluster) {

        int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt());

 // Cache volatile variable in local variable.
        

        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;

        int partition;


        if (partitionLoadStats == null) {
 // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
            
 // partition based on uniform distribution.
            

            List < PartitionInfo > availablePartitions = cluster.availablePartitionsForTopic(topic);

            if (availablePartitions.size() > 0) {

                partition = availablePartitions.get(random % availablePartitions.size()).partition();

            } else {
 // We don't have available partitions, just pick one among all partitions.
                

                List < PartitionInfo > partitions = cluster.partitionsForTopic(topic);

                partition = random % partitions.size();

            }

        } else {
 // Calculate next partition based on load distribution.
            
 // Note that partitions without leader are excluded from the partitionLoadStats.
            

            assert partitionLoadStats.length > 0;


            int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;

            int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];

 // By construction, the cumulative frequency table is sorted, so we can use binary
            
 // search to find the desired index.
            

            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);

 // binarySearch results the index of the found element, or -(insertion_point) - 1
            
 // (where insertion_point is the index of the first element greater than the key).
            
 // We need to get the index of the first value that is strictly greater, which
            
 // would be the insertion point, except if we found the element that's equal to
            
 // the searched value (in this case we need to get next).  For example, if we have
            
 //  4 5 8
            
 // and we're looking for 3, then we'd get the insertion_point = 0, and the function
            
 // would return -0 - 1 = -1, by adding 1 we'd get 0.  If we're looking for 4, we'd
            
 // get 0, and we need the next one, so adding 1 works here as well.
            

            int partitionIndex = Math.abs(searchResult + 1);

            assert partitionIndex < partitionLoadStats.length;

            partition = partitionLoadStats.partitionIds[partitionIndex];

        }


        log.trace("Switching to partition {} in topic {}", partition, topic);

        return partition;

    }

     

    위의 과정까지가 메시지를 전송할 partition 선정, batch 생성 과정이고

    위 과정이 끝나고 실제로 전송하는 Sender 스레드의 run()이 호출하는 메서드에서 ready 상태인 partition을 구하는 단계가 있음

     : NodeLatency(Broker latency) >  partitioner.availability.timeout.ms 인 node에 속한 partition에 대기중인 batch는 전송에서 제외하는 로직이 있음 

    (3.3 미만 버전에는 해당 로직이 없음)