[NiFi] DistributedMapCache 프로세서
by Jaesang Lim
NiFi 관련 사내 세미나도 마무리되었고, 이제 NiFi로 유연하게 문제를 해결해야할 때인 것 같다
그래서 이번에는 NiFi에서 처리한 데이터를 Enrich하는 방법으로 DistributedMapCache Processor를 사용해서 해결하고자한다
매번 DB가서 해당 메타정보를 가져온다던지, 아니면 배치로 테이블로 Join해서 enrich하는 방법도 있겠지만, Data Flow을 한 곳에서 처리하고 싶어서 !
DistributedMapCache Processor
DistributedMapCache를 사용하기 위해 필요한 Processor 및 Controller Services
Processor
- PutDistributedMapCache
- FetchDistributedMapCache
Controller Service
- DistributedMapCacheClientService
- DistributedMapCacheServer
PutDistributedMapCache Processor
- 캐시할 key/value을 넣는 Processor
- key는 flowfile의 Attribute값
- Processor의 ‘Cache Entry Identifier’에 명시한 Attribute
-
value는 flowfile의 Content값
- github
FetchDistributedMapCache Processor
- key,flowfile의 Attribute값으로 값을 가져와서 새로운 Attribute로 붙임
- key는 Processor의 ‘Cache Entry Identifier’에 명시한 Attribute
- cache된 정보를 가져와 추가할 Attribute 이름 ‘Put Cache Value In Attribute’
- github
DistributedMapCacheClientService (Controller Service)
- DistributedMapCacheServer와 통신하는 Client
- Cluster끼리 Cache된 Map 공유하기 위함
- 설정값은 DistributedMapCacheServer의 hostname과 port만 설정함
- Cluster 환경에서는 hostname을 localhost로 설정
- github
DistributedMapCacheServer (Controller Service)
- Socket으로 캐시된 맵에 접근할 수 있게함
- DistributedMapCacheClientService과 상호작용
- 실행하고 꼭 확인해보자
- netstat -plnt 로 해당 port가 열렸는지.. (default : 4557)
- Persistence Directory
- 해당 경로를 지정하면 캐시된 정보를 디스크에 저장함
- 설정하지 않으면, 메모리에만 저장되고 관리
- Maximum Cache Entries 설정 가능
- Cache에 최대 몇개의 데이터를 가지고 있을 것인가
- 소스코드를 보니, 여기서 설정한 값으로 Map 만들더라..
당연히 Eviction 전략도 있음
- Eviction Strategy
- FIFO
- Least Recently Used (LRU)
- Least Frequently Used (LFU)
- github
모 여기까지는 사용법이고, 내가 재밌게 봤던 것은 Cache하는 자료구조는 무엇이고, Eviction 전략을 어떻게 구현했는가 궁금했음
캐시에 사용되는 Map은 HashMap을 사용하고, Eviction을 처리하기 위해서는 ConcurrentSkipListMap 사용함
- ConcurrentSkipListMap에 Eviction 전략에 따라서 각 Comparator를 구현해놓았음!
간단하게 Map 만들고, Evict하는 함수만 보면 다음과 같다
- 생각보다 소스코드가 간결해서 좋았음!
public class SimpleMapCache implements MapCache {
private static final Logger logger = LoggerFactory.getLogger(SimpleMapCache.class);
private final Map<ByteBuffer, MapCacheRecord> cache = new HashMap<>();
private final SortedMap<MapCacheRecord, ByteBuffer> inverseCacheMap;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private final String serviceIdentifier;
private final int maxSize;
public SimpleMapCache(final String serviceIdentifier, final int maxSize, final EvictionPolicy evictionPolicy) {
// need to change to ConcurrentMap as this is modified when only the readLock is held
inverseCacheMap = new ConcurrentSkipListMap<>(evictionPolicy.getComparator());
this.serviceIdentifier = serviceIdentifier;
this.maxSize = maxSize;
}
@Override
public String toString() {
return "SimpleMapCache[service id=" + serviceIdentifier + "]";
}
// don't need synchronized because this method is only called when the writeLock is held, and all
// public methods obtain either the read or write lock
private MapCacheRecord evict() {
if (cache.size() < maxSize) {
return null;
}
final MapCacheRecord recordToEvict = inverseCacheMap.firstKey();
final ByteBuffer valueToEvict = inverseCacheMap.remove(recordToEvict);
cache.remove(valueToEvict);
if (logger.isDebugEnabled()) {
logger.debug("Evicting value {} from cache", new String(valueToEvict.array(), StandardCharsets.UTF_8));
}
return recordToEvict;
}
//.....
//....
}
Subscribe via RSS