Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.utils;
2

    
3
import com.google.common.base.Functions;
4
import com.google.common.collect.ImmutableSortedMap;
5
import com.google.common.collect.Ordering;
6
import com.lambdaworks.redis.RedisClient;
7
import com.lambdaworks.redis.RedisConnection;
8
import com.lambdaworks.redis.RedisStringsConnection;
9
import org.apache.hadoop.conf.Configuration;
10
import org.apache.hadoop.fs.FileSystem;
11
import org.apache.hadoop.fs.LocatedFileStatus;
12
import org.apache.hadoop.fs.Path;
13
import org.apache.hadoop.fs.RemoteIterator;
14
import org.apache.hadoop.io.SequenceFile;
15
import org.eclipse.jdt.internal.core.SourceType;
16
import org.junit.Test;
17
import sun.reflect.generics.tree.Tree;
18

    
19
import java.io.*;
20
import java.math.BigInteger;
21
import java.util.*;
22
import java.io.BufferedReader;
23
import java.io.BufferedWriter;
24
import java.io.IOException;
25
import java.io.InputStreamReader;
26
import java.io.OutputStreamWriter;
27
import java.util.ArrayList;
28
import java.util.HashMap;
29
import java.util.List;
30
import java.util.Map;
31

    
32

    
33
public class FrequencyCounter {
34

    
35
    private enum ORDERING_TYPE {
36
        ASC, DESC
37
    }
38

    
39
    public static void main(String[] args) throws Exception {
40
        String inputPath = "/tmp/lod_blocks/stats";
41
        String outputPath = "/tmp/lod_blocks/stats/frequencyMap";
42
        String redisHost = "194.177.192.118";
43

    
44

    
45
        try {
46

    
47
            FileSystem fs = FileSystem.get(new Configuration());
48
            Path path = new Path(inputPath);
49
            List filenames = getFiles(fs, path);
50
            TreeMap<Integer, Integer> frequencyMap = getFrequencyMap(fs, filenames);
51
            System.out.println("Sorted Map is " + frequencyMap.entrySet());
52
            TreeMap<BigInteger, Double> statistics = getStatistics(frequencyMap);
53
            int optimalBlockSize = getOptimalBlockSize(statistics);
54

    
55
            //WRITE optimalBlockSize to REDIS? somewhere .. Goes here!!!
56

    
57
            System.out.println("Statistics Map" + statistics.entrySet());
58

    
59
            System.out.println("Optimal Size " + optimalBlockSize);
60

    
61
            writeToRedis("optimalBlockSize", String.valueOf(optimalBlockSize));
62
            fs.close();
63
        } catch (Exception e) {
64

    
65
            throw new Exception("Error : ", e);
66
        }
67
    }
68

    
69
    private static List getFiles(FileSystem fs, Path path) throws IOException {
70
        RemoteIterator<LocatedFileStatus> Files;
71
        List<String> fileNames = new ArrayList<String>();
72

    
73
        Files = fs.listFiles(path, false);
74

    
75
        while (Files.hasNext()) {
76
            String fileName = Files.next().getPath().toString();
77
            fileNames.add(fileName);
78

    
79
        }
80
        System.out.println("Filenames" + fileNames);
81
        return fileNames;
82
    }
83

    
84
    private static TreeMap<Integer, Integer> getFrequencyMap(FileSystem fs, List<String> fileNames) throws Exception {
85
        String line = null;
86

    
87
        try {
88

    
89
            TreeMap<Integer, Integer> frequencyMap = new TreeMap<>();
90
            for (String file : fileNames) {
91
                System.out.println("opening " + new Path(file));
92

    
93
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(file))));
94
                line = br.readLine();
95

    
96
                while (line != null) {
97

    
98
                    String[] split = line.split("\t");
99

    
100
                    if (frequencyMap.containsKey(Integer.parseInt(split[1]))) {
101
                        frequencyMap.put(Integer.parseInt(split[1]), frequencyMap.get(Integer.parseInt(split[1])) + 1);
102

    
103
                    } else {
104
                        frequencyMap.put(Integer.parseInt(split[1]), 1);
105

    
106
                    }
107
                    line = br.readLine();
108
                }
109
            }
110

    
111

    
112
            return frequencyMap;
113
        } catch (Exception e) {
114
            throw new Exception("Error : " + line, e);
115
        }
116

    
117
    }
118

    
119
    private static TreeMap<BigInteger, Double> getStatistics(TreeMap<Integer, Integer> sortedMap) {
120
        TreeMap<BigInteger, Double> statistics = new TreeMap<BigInteger, Double>();
121
        double CC = 0d;
122
        CC = 0d;
123
        int f;
124
        BigInteger totalSizeOfBlocks = BigInteger.ZERO;
125
        BigInteger numberOfComparisons = BigInteger.ZERO;
126

    
127
        Set<Integer> keys = sortedMap.keySet();
128
        for (Integer key : keys) {
129
            BigInteger blockSize = BigInteger.valueOf(key.intValue());
130
            f = sortedMap.get(key);
131
            totalSizeOfBlocks = totalSizeOfBlocks.add(BigInteger.valueOf(f).multiply(blockSize));
132
            numberOfComparisons = numberOfComparisons.add(BigInteger.valueOf(f).multiply(blockSize.multiply(blockSize.subtract(BigInteger.ONE)).shiftLeft(1)));
133
            CC = totalSizeOfBlocks.doubleValue() / numberOfComparisons.doubleValue();
134
            statistics.put(blockSize, CC);
135
        }
136

    
137

    
138
        return statistics;
139
    }
140

    
141

    
142
    private static int getOptimalBlockSize(TreeMap<BigInteger, Double> statistics) {
143
        int optimalBlockSize = Integer.valueOf(statistics.lastEntry().getKey().intValue());
144
        double eps = 1d;
145
        NavigableSet<BigInteger> keys = statistics.descendingKeySet();
146
        for (BigInteger key : keys) {
147
        	double diff = Math.abs(statistics.get(key) - statistics.get(key.subtract(BigInteger.ONE)));
148
            if (diff < eps) {
149
                eps = diff;
150
                optimalBlockSize = Integer.valueOf(key.intValue());
151
            }
152
        }
153

    
154
        return optimalBlockSize;
155
    }
156

    
157

    
158
    public static void writeToRedis(String key, String value) {
159
        RedisClient client = RedisClient.create("redis://194.177.192.118");
160
        RedisStringsConnection<String, String> connection = client.connect();
161
        connection.set(key, value);
162
        System.out.println("Set to redis " + " " + key + " to " + connection.get(key));
163

    
164

    
165
    }
166

    
167
    public static SortedMap<Integer, Integer> sortMap(Map map, ORDERING_TYPE ordering) {
168
        Ordering<Integer> valueComparator;
169
        if (ordering == ORDERING_TYPE.DESC) {
170
            valueComparator = Ordering.natural().onResultOf(Functions.forMap(map)).compound(Ordering.natural()).reverse();
171
        } else {
172
            valueComparator = Ordering.natural().onResultOf(Functions.forMap(map)).compound(Ordering.natural());
173
        }
174
        return ImmutableSortedMap.copyOf(map, valueComparator);
175

    
176

    
177
    }
178

    
179
    private static void writeMap(FileSystem fs, TreeMap<Integer, Integer> map, String output) throws IOException {
180
        Path outputPath = new Path(output);
181
        BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(outputPath, true)));
182
        // TO append data to a file, use fs.append(Path f)
183

    
184
        for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
185
            br.append(entry.getKey() + ":" + entry.getValue() + ",");
186
        }
187

    
188
        br.close();
189
    }
190

    
191
/*   @Test
192
    public void test() {
193

    
194
        TreeMap map = new TreeMap();
195

    
196
        map.put(6, 23);
197
        map.put(3, 9);
198
        map.put(1, 2);
199
        System.out.println(map.entrySet());
200

    
201
       System.out.println(map.descendingMap());
202

    
203

    
204
    }*/
205
}
206

    
207

    
208

    
(2-2/3)