NiFi 관련 사내 세미나도 마무리되었고, 이제 NiFi로 유연하게 문제를 해결해야할 때인 것 같다

그래서 이번에는 NiFi에서 처리한 데이터를 Enrich하는 방법으로 DistributedMapCache Processor를 사용해서 해결하고자한다

매번 DB가서 해당 메타정보를 가져온다던지, 아니면 배치로 테이블로 Join해서 enrich하는 방법도 있겠지만, Data Flow을 한 곳에서 처리하고 싶어서 !


DistributedMapCache Processor

DistributedMapCache를 사용하기 위해 필요한 Processor 및 Controller Services

Processor

  • PutDistributedMapCache
  • FetchDistributedMapCache

Controller Service

  • DistributedMapCacheClientService
  • DistributedMapCacheServer

PutDistributedMapCache Processor

  • 캐시할 key/value을 넣는 Processor
  • key는 flowfile의 Attribute값
    • Processor의 ‘Cache Entry Identifier’에 명시한 Attribute
  • value는 flowfile의 Content값

  • github

FetchDistributedMapCache Processor

  • key,flowfile의 Attribute값으로 값을 가져와서 새로운 Attribute로 붙임
    • key는 Processor의 ‘Cache Entry Identifier’에 명시한 Attribute
    • cache된 정보를 가져와 추가할 Attribute 이름 ‘Put Cache Value In Attribute’
  • github

DistributedMapCacheClientService (Controller Service)

  • DistributedMapCacheServer와 통신하는 Client
  • Cluster끼리 Cache된 Map 공유하기 위함
  • 설정값은 DistributedMapCacheServer의 hostname과 port만 설정함
    • Cluster 환경에서는 hostname을 localhost로 설정
  • github

DistributedMapCacheServer (Controller Service)

  • Socket으로 캐시된 맵에 접근할 수 있게함
  • DistributedMapCacheClientService과 상호작용
  • 실행하고 꼭 확인해보자
    • netstat -plnt 로 해당 port가 열렸는지.. (default : 4557)
  • Persistence Directory
    • 해당 경로를 지정하면 캐시된 정보를 디스크에 저장함
    • 설정하지 않으면, 메모리에만 저장되고 관리
  • Maximum Cache Entries 설정 가능
    • Cache에 최대 몇개의 데이터를 가지고 있을 것인가
    • 소스코드를 보니, 여기서 설정한 값으로 Map 만들더라..

당연히 Eviction 전략도 있음

  • Eviction Strategy
    • FIFO
    • Least Recently Used (LRU)
    • Least Frequently Used (LFU)
  • github

모 여기까지는 사용법이고, 내가 재밌게 봤던 것은 Cache하는 자료구조는 무엇이고, Eviction 전략을 어떻게 구현했는가 궁금했음

캐시에 사용되는 Map은 HashMap을 사용하고, Eviction을 처리하기 위해서는 ConcurrentSkipListMap 사용함

  • ConcurrentSkipListMap에 Eviction 전략에 따라서 각 Comparator를 구현해놓았음!

간단하게 Map 만들고, Evict하는 함수만 보면 다음과 같다

  • 생각보다 소스코드가 간결해서 좋았음!
public class SimpleMapCache implements MapCache {

    private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class);

    private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>();
    private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap;

    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();

    private final String serviceIdentifier;

    private final int maxSize;

    public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) {
        // need to change to ConcurrentMap as this is modified when only the readLock is held
        inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator());
        this.serviceIdentifier = serviceIdentifier;
        this.maxSize = maxSize;
    }

    @Override
    public String toString() {
        return "SimpleMapCache[service id=" + serviceIdentifier + "]";
    }

    // don't need synchronized because this method is only called when the writeLock is held, and all
    // public methods obtain either the read or write lock
    private MapCacheRecord evict() {
        if (cache.size() < maxSize) {
            return null;
        }

        final MapCacheRecord recordToEvict = inverseCacheMap.firstKey();
        final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict);
        cache.remove(valueToEvict);

        if (logger.isDebugEnabled()) {
            logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8));
        }

        return recordToEvict;
    }
    //.....
    //....
    }