이제 Infinispan에는 어떤 흥미로운 Data 저장되어 있다. 우리는 그것을 이용하여 어떤 computation 수행할 것이다. Every country별로 평균 온도를 계산해보자. 한가지 단순한 접근법은 하나의 node에서 전체 dataset 대해 반복하여 computation 수행하는 것이다. 하지만 여기에는 두가지 불리한 면이 있다. 첫번째는 각각의 owner로부터 data fetch 필요하다는 것이다. (빠른 메모리 대신에 느린 network 통해서 데이터를 가져와야 ). 두번째는 전체 cluster 배치된 node 아닌 하나의 node 마력만 이용한다는 것이다. 얼마나 낭비인가!


Infinispan 두가지 문제에 대한 해결책을 제시한다. 분산 처리와 map/reduce 대한 강력한 개념을 사용하여 모든 node에서 locally computation 수행할 있다. 그리고  작업 수행 결과를 master node 보내서 작업 수행 결과를 aggregation한다. Average country temperature 계산하기 위해서 어떻게 map/reduce 사용할 있는지 살펴보자.


첫째로 every entry 호출하고, computation 수행하고, 결과를 돌려주는 method mapper 필요하다. 우리의 경우에는 "computation" entry country temperature 추출하고 정보들을 collector 보낸다.


public class CountryTemperatureMapper implements Mapper<String, LocationWeather, String, Float> {

@Override

public void map(String key, LocationWeather value, Collector<String, Float> collector) {

collector.emit(value.country, value.temperature);

}

}


위의 코드는 entry 대해서 local에서 실행될 것이다. Mapper 선언의 Generic type input key/value output key/value pair 나타낸다. 특히, 우리는 location/weather pair 처리하여 country/temperature pair return할 것이다. Collector 동일한 output key 속한 모든 값을 aggregate하고, reducer 그것을 전달할 것이다.


public class CountryTemperatureReducer implements Reducer<String, Float> {

@Override

public Float reduce(String reducedKey, Iterator<Float> i) {

int count = 0;

float sum = 0f;

while(i.hasNext()) {

++count;

sum += i.next();

}

return sum / count;

}

}


Reducer output key 대해서 수집된 value 호출할 것이다. 우리의 경우에는 모든 값을 더해서 ("temperatures") 평균을 내기 위해 count 나눴다. 끝났다!(that's it!) 한번 실행해 보자.


git checkout -f step-10

mvn clean package exec:exec # from terminal 1

mvn exec:exec # from terminal 2


[Coordinator Output]


---- Average country temperatures ----
Average temperature in Canada is -11.0° C
Average temperature in USA is 5.8° C
Average temperature in Romania is 7.0° C
Average temperature in UK is 3.0° C
Average temperature in Italy is 5.5° C
Average temperature in Portugal is 13.6° C
Average temperature in Switzerland is -0.1° C



이제 우리는 우리가 제품에서 실행하고 싶은 실제 작업 수행에 유용한 기능을 가진 전체적으로 동작하는 application 가지게 되었다. 이제 조금 간단하게 environment에서 configuration 변경하는 방법에 대해서 다음 단계에서 알아보자.

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/677

우리의 application에서 clustering 소개할 , key value 대한 serializable 대한 필요성에 초점을 맞췄다. 그러나 Infinispan에서는 표준 Java Serialization 사용하지 않는다. 훨씬 높은 performance library Jboss Marshalling 사용한다. 일반적으로 여러분의 entity java.io.Serializable 구현하기 위해서 아무것도 필요가 없다. Jboss Marshalling 여러분을 위해서 모든 것이 고려되어 있다. 그러나 프로젝트의 lifetime 동안 변경되는 entry 대해서 backward / forward 호환성을 지원하기 위한 예제를 위해 경우에 따라 serialization format 대해서 모든 제어를 수행하기를 원하는 경우가 있을 있다.


이런 경우에는 custom Externalizer 제공해야 한다. Externalizer stream 통해 여러분의 entity 어떻게 읽고 쓰는지를 아는 단순한 class이다. 우리의 LocationWeather class 위해서 간단한 externalizer 만들어 보자.


public static class LWExternalizer implements Externalizer<LocationWeather> {

@Override

public void writeObject(ObjectOutput output, LocationWeather object) throws IOException {

output.writeFloat(object.temperature);

output.writeUTF(object.conditions);

output.writeUTF(object.country);

}

@Override

public LocationWeather readObject(ObjectInput input) throws IOException, ClassNotFoundException {

float temperature = input.readFloat();

String conditions = input.readUTF();

String country = input.readUTF();

return new LocationWeather(temperature, conditions, country);

}

}



또한 externalizer 그것을 사용하도록 선언하기 위해서 우리 entity 특별한 annotation 추가해야 한다.


@SerializeWith(LocationWeather.LWExternalizer.class)

public class LocationWeather implements Serializable {



우리가 특별히 어떤 것을 것은 아니기 때문에 application 실행해보면 다른 차이점을 없다. 그러나, 이제 여러분은 엔진룸 (under the hood)에서 발생한 것에 대한 좋은 아이디어를 가지게 되었다.


git checkout -f step-9

mvn clean package exec:exec # from terminal 1

mvn exec:exec # from terminal 2


이제 우리는 흥미로운 어떤 것과 씨름할 준비가 되었다. 우리는 단지 데이터를 저장하고 조회하는 대신에 data 가지고 어떤 것을 수행하는데 infinispan 사용해볼 것이다.

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/676

Infinispan에서 cluster내에 entries들을 분산하는 것은 consistent hashing algorithm 근거하여 이루어진다. algorithm entry key 사용하여 hash 계산하고, 어느 node primary owner인지, 나머지 node들이 backup owner 동작할 것인지 결정하는데 hash 사용한다. 분산에 대해 제어하는 것은 가능하다. 특히, performance 때문에 grouping algorithm 사용하여 연관된 entry들을 동일 node 같은 장소에 배치하는 것은 아주 유용하다. 이것 때문에 우리는 key 기반 group name 계산할 있는 Grouper class 생성할 필요가 있다. 예제는 다음과 같다.


public static class LocationGrouper implements Grouper<String> {

@Override

public String computeGroup(String key, String group) {

return key.split(",")[1].trim();

}

@Override public Class<String> getKeyType() {

return String.class;

}

}



위의 코드는 아주 간단하다. 단지 key comma delimiter 사용하여 분리하고, 두번째 항목 ("country") group으로 사용한다. Hashing algorithm key 대신에 group 사용하여 entry hash 계산할 것이다. 특정 grouper 사용하기 위해서는 configuration 추가해야 한다.


config.clustering().hash().groups().enabled().addGrouper(new LocationWeather.LocationGrouper());



이제 application 실행해 보자.


git checkout -f step-8

mvn clean package exec:exec # from terminal 1

mvn exec:exec # from terminal 2


이번 실행에서 node별로 output 조사해보면 동일한 country 속한 모든 entry 대한 event log "paired"되어 있음을 있을 것이다. 경우에는 "Romania" group 두번째 Node "hashed"되었다.


[Coordinator Output]


-- Entry for Bucharest, Romania modified by another node in the cluster
-- Entry for Cluj-Napoca, Romania modified by another node in the cluster



다음 단계에서는 node간에 전송된 entry들을 serialized form으로 어떻게 제어하는지를 살펴볼 것이다.

받은 트랙백이 없고, 댓글이 없습니다.

댓글+트랙백 RSS :: http://www.yongbi.net/rss/response/675