1
|
package eu.dnetlib.data.mapreduce.hbase.lodExport.utils;
|
2
|
|
3
|
import com.google.common.base.Functions;
|
4
|
import com.google.common.collect.ImmutableSortedMap;
|
5
|
import com.google.common.collect.Ordering;
|
6
|
import com.lambdaworks.redis.RedisClient;
|
7
|
import com.lambdaworks.redis.RedisConnection;
|
8
|
import com.lambdaworks.redis.RedisStringsConnection;
|
9
|
import org.apache.hadoop.conf.Configuration;
|
10
|
import org.apache.hadoop.fs.FileSystem;
|
11
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
12
|
import org.apache.hadoop.fs.Path;
|
13
|
import org.apache.hadoop.fs.RemoteIterator;
|
14
|
import org.apache.hadoop.io.SequenceFile;
|
15
|
import org.eclipse.jdt.internal.core.SourceType;
|
16
|
import org.junit.Test;
|
17
|
import sun.reflect.generics.tree.Tree;
|
18
|
|
19
|
import java.io.*;
|
20
|
import java.math.BigInteger;
|
21
|
import java.util.*;
|
22
|
import java.io.BufferedReader;
|
23
|
import java.io.BufferedWriter;
|
24
|
import java.io.IOException;
|
25
|
import java.io.InputStreamReader;
|
26
|
import java.io.OutputStreamWriter;
|
27
|
import java.util.ArrayList;
|
28
|
import java.util.HashMap;
|
29
|
import java.util.List;
|
30
|
import java.util.Map;
|
31
|
|
32
|
|
33
|
public class FrequencyCounter {
|
34
|
|
35
|
private enum ORDERING_TYPE {
|
36
|
ASC, DESC
|
37
|
}
|
38
|
|
39
|
public static void main(String[] args) throws Exception {
|
40
|
String inputPath = "/tmp/lod_blocks/stats";
|
41
|
String outputPath = "/tmp/lod_blocks/stats/frequencyMap";
|
42
|
String redisHost = "194.177.192.118";
|
43
|
|
44
|
|
45
|
try {
|
46
|
|
47
|
FileSystem fs = FileSystem.get(new Configuration());
|
48
|
Path path = new Path(inputPath);
|
49
|
List filenames = getFiles(fs, path);
|
50
|
TreeMap<Integer, Integer> frequencyMap = getFrequencyMap(fs, filenames);
|
51
|
System.out.println("Sorted Map is " + frequencyMap.entrySet());
|
52
|
TreeMap<BigInteger, Double> statistics = getStatistics(frequencyMap);
|
53
|
int optimalBlockSize = getOptimalBlockSize(statistics);
|
54
|
|
55
|
//WRITE optimalBlockSize to REDIS? somewhere .. Goes here!!!
|
56
|
|
57
|
System.out.println("Statistics Map" + statistics.entrySet());
|
58
|
|
59
|
System.out.println("Optimal Size " + optimalBlockSize);
|
60
|
|
61
|
writeToRedis("optimalBlockSize", String.valueOf(optimalBlockSize));
|
62
|
fs.close();
|
63
|
} catch (Exception e) {
|
64
|
|
65
|
throw new Exception("Error : ", e);
|
66
|
}
|
67
|
}
|
68
|
|
69
|
private static List getFiles(FileSystem fs, Path path) throws IOException {
|
70
|
RemoteIterator<LocatedFileStatus> Files;
|
71
|
List<String> fileNames = new ArrayList<String>();
|
72
|
|
73
|
Files = fs.listFiles(path, false);
|
74
|
|
75
|
while (Files.hasNext()) {
|
76
|
String fileName = Files.next().getPath().toString();
|
77
|
fileNames.add(fileName);
|
78
|
|
79
|
}
|
80
|
System.out.println("Filenames" + fileNames);
|
81
|
return fileNames;
|
82
|
}
|
83
|
|
84
|
private static TreeMap<Integer, Integer> getFrequencyMap(FileSystem fs, List<String> fileNames) throws Exception {
|
85
|
String line = null;
|
86
|
|
87
|
try {
|
88
|
|
89
|
TreeMap<Integer, Integer> frequencyMap = new TreeMap<>();
|
90
|
for (String file : fileNames) {
|
91
|
System.out.println("opening " + new Path(file));
|
92
|
|
93
|
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(file))));
|
94
|
line = br.readLine();
|
95
|
|
96
|
while (line != null) {
|
97
|
|
98
|
String[] split = line.split("\t");
|
99
|
|
100
|
if (frequencyMap.containsKey(Integer.parseInt(split[1]))) {
|
101
|
frequencyMap.put(Integer.parseInt(split[1]), frequencyMap.get(Integer.parseInt(split[1])) + 1);
|
102
|
|
103
|
} else {
|
104
|
frequencyMap.put(Integer.parseInt(split[1]), 1);
|
105
|
|
106
|
}
|
107
|
line = br.readLine();
|
108
|
}
|
109
|
}
|
110
|
|
111
|
|
112
|
return frequencyMap;
|
113
|
} catch (Exception e) {
|
114
|
throw new Exception("Error : " + line, e);
|
115
|
}
|
116
|
|
117
|
}
|
118
|
|
119
|
private static TreeMap<BigInteger, Double> getStatistics(TreeMap<Integer, Integer> sortedMap) {
|
120
|
TreeMap<BigInteger, Double> statistics = new TreeMap<BigInteger, Double>();
|
121
|
double CC = 0d;
|
122
|
CC = 0d;
|
123
|
int f;
|
124
|
BigInteger totalSizeOfBlocks = BigInteger.ZERO;
|
125
|
BigInteger numberOfComparisons = BigInteger.ZERO;
|
126
|
|
127
|
Set<Integer> keys = sortedMap.keySet();
|
128
|
for (Integer key : keys) {
|
129
|
BigInteger blockSize = BigInteger.valueOf(key.intValue());
|
130
|
f = sortedMap.get(key);
|
131
|
totalSizeOfBlocks = totalSizeOfBlocks.add(BigInteger.valueOf(f).multiply(blockSize));
|
132
|
numberOfComparisons = numberOfComparisons.add(BigInteger.valueOf(f).multiply(blockSize.multiply(blockSize.subtract(BigInteger.ONE)).shiftLeft(1)));
|
133
|
CC = totalSizeOfBlocks.doubleValue() / numberOfComparisons.doubleValue();
|
134
|
statistics.put(blockSize, CC);
|
135
|
}
|
136
|
|
137
|
|
138
|
return statistics;
|
139
|
}
|
140
|
|
141
|
|
142
|
private static int getOptimalBlockSize(TreeMap<BigInteger, Double> statistics) {
|
143
|
int optimalBlockSize = Integer.valueOf(statistics.lastEntry().getKey().intValue());
|
144
|
double eps = 1d;
|
145
|
NavigableSet<BigInteger> keys = statistics.descendingKeySet();
|
146
|
for (BigInteger key : keys) {
|
147
|
double diff = Math.abs(statistics.get(key) - statistics.get(key.subtract(BigInteger.ONE)));
|
148
|
if (diff < eps) {
|
149
|
eps = diff;
|
150
|
optimalBlockSize = Integer.valueOf(key.intValue());
|
151
|
}
|
152
|
}
|
153
|
|
154
|
return optimalBlockSize;
|
155
|
}
|
156
|
|
157
|
|
158
|
public static void writeToRedis(String key, String value) {
|
159
|
RedisClient client = RedisClient.create("redis://194.177.192.118");
|
160
|
RedisStringsConnection<String, String> connection = client.connect();
|
161
|
connection.set(key, value);
|
162
|
System.out.println("Set to redis " + " " + key + " to " + connection.get(key));
|
163
|
|
164
|
|
165
|
}
|
166
|
|
167
|
public static SortedMap<Integer, Integer> sortMap(Map map, ORDERING_TYPE ordering) {
|
168
|
Ordering<Integer> valueComparator;
|
169
|
if (ordering == ORDERING_TYPE.DESC) {
|
170
|
valueComparator = Ordering.natural().onResultOf(Functions.forMap(map)).compound(Ordering.natural()).reverse();
|
171
|
} else {
|
172
|
valueComparator = Ordering.natural().onResultOf(Functions.forMap(map)).compound(Ordering.natural());
|
173
|
}
|
174
|
return ImmutableSortedMap.copyOf(map, valueComparator);
|
175
|
|
176
|
|
177
|
}
|
178
|
|
179
|
private static void writeMap(FileSystem fs, TreeMap<Integer, Integer> map, String output) throws IOException {
|
180
|
Path outputPath = new Path(output);
|
181
|
BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(outputPath, true)));
|
182
|
// TO append data to a file, use fs.append(Path f)
|
183
|
|
184
|
for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
|
185
|
br.append(entry.getKey() + ":" + entry.getValue() + ",");
|
186
|
}
|
187
|
|
188
|
br.close();
|
189
|
}
|
190
|
|
191
|
/* @Test
|
192
|
public void test() {
|
193
|
|
194
|
TreeMap map = new TreeMap();
|
195
|
|
196
|
map.put(6, 23);
|
197
|
map.put(3, 9);
|
198
|
map.put(1, 2);
|
199
|
System.out.println(map.entrySet());
|
200
|
|
201
|
System.out.println(map.descendingMap());
|
202
|
|
203
|
|
204
|
}*/
|
205
|
}
|
206
|
|
207
|
|
208
|
|