1
|
package eu.dnetlib.iis.mainworkflows.importer.mapred;
|
2
|
|
3
|
import java.io.DataInput;
|
4
|
import java.io.DataOutput;
|
5
|
import java.io.File;
|
6
|
import java.io.IOException;
|
7
|
import java.io.InputStream;
|
8
|
import java.util.ArrayList;
|
9
|
import java.util.Arrays;
|
10
|
import java.util.Iterator;
|
11
|
import java.util.List;
|
12
|
import java.util.Map;
|
13
|
import java.util.Properties;
|
14
|
|
15
|
import org.apache.commons.io.IOUtils;
|
16
|
import org.apache.commons.lang.StringUtils;
|
17
|
import org.apache.hadoop.hbase.KeyValue;
|
18
|
import org.apache.hadoop.hbase.client.Result;
|
19
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
20
|
import org.apache.hadoop.io.Writable;
|
21
|
import org.apache.hadoop.mapreduce.InputFormat;
|
22
|
import org.apache.hadoop.mapreduce.InputSplit;
|
23
|
import org.apache.hadoop.mapreduce.JobContext;
|
24
|
import org.apache.hadoop.mapreduce.RecordReader;
|
25
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
26
|
|
27
|
import com.googlecode.protobuf.format.JsonFormat;
|
28
|
|
29
|
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
30
|
import eu.dnetlib.iis.core.javamapreduce.hack.SchemaSetter;
|
31
|
|
32
|
|
33
|
/**
|
34
|
* Input format returning Results built from JSON objects.
|
35
|
* @author mhorst
|
36
|
*
|
37
|
*/
|
38
|
public class PredefinedTableInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
|
39
|
|
40
|
public static final String IDX_PROPS_LOC = "import.input.format.idx.props.location";
|
41
|
|
42
|
public static final String DEFAULT_CHARSET = "utf8";
|
43
|
|
44
|
public static final String PART_SEPARATOR = "#";
|
45
|
|
46
|
public static class FakeSplit extends InputSplit implements Writable {
|
47
|
public void write(DataOutput out) throws IOException { }
|
48
|
public void readFields(DataInput in) throws IOException { }
|
49
|
public long getLength() { return 0L; }
|
50
|
public String[] getLocations() { return new String[0]; }
|
51
|
}
|
52
|
|
53
|
@Override
|
54
|
public List<InputSplit> getSplits(JobContext context) throws IOException,
|
55
|
InterruptedException {
|
56
|
List<InputSplit> ret = new ArrayList<InputSplit>();
|
57
|
ret.add(new FakeSplit());
|
58
|
return ret;
|
59
|
}
|
60
|
|
61
|
@Override
|
62
|
public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
|
63
|
InputSplit split, TaskAttemptContext context) throws IOException,
|
64
|
InterruptedException {
|
65
|
// required by our hack related to multiple outputs
|
66
|
SchemaSetter.set(context.getConfiguration());
|
67
|
|
68
|
return new RecordReader<ImmutableBytesWritable, Result>() {
|
69
|
|
70
|
private Iterator<Tuple> currentIt;
|
71
|
private Tuple currentTuple;
|
72
|
private float progress;
|
73
|
|
74
|
class Tuple {
|
75
|
final ImmutableBytesWritable key;
|
76
|
final Result value;
|
77
|
|
78
|
public Tuple (ImmutableBytesWritable key,
|
79
|
Result value) {
|
80
|
this.key = key;
|
81
|
this.value = value;
|
82
|
}
|
83
|
}
|
84
|
|
85
|
@Override
|
86
|
public void initialize(InputSplit split, TaskAttemptContext context)
|
87
|
throws IOException, InterruptedException {
|
88
|
progress = 0;
|
89
|
String idxLoc = context.getConfiguration().get(IDX_PROPS_LOC);
|
90
|
if (idxLoc!=null) {
|
91
|
List<Tuple> tuples = new ArrayList<Tuple>();
|
92
|
Properties props = new Properties();
|
93
|
InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(
|
94
|
idxLoc);
|
95
|
if (in==null) {
|
96
|
throw new IOException("unable to load resource from path: " + idxLoc);
|
97
|
}
|
98
|
try {
|
99
|
props.load(in);
|
100
|
for (Map.Entry<Object, Object> entry : props.entrySet()) {
|
101
|
System.err.println("key: " + entry.getKey() + ", value: " + entry.getValue());
|
102
|
String id = loadId(entry.getKey() + File.separator + "id");
|
103
|
System.err.println("id: " + id);
|
104
|
byte[] idBytes = id.getBytes(DEFAULT_CHARSET);
|
105
|
String[] parts = StringUtils.split(entry.getValue().toString(), ',');
|
106
|
System.err.println("parts: " + Arrays.asList(parts));
|
107
|
List<KeyValue> protoParts = new ArrayList<KeyValue>();
|
108
|
for (String part : parts) {
|
109
|
String[] familyWithQualifier = StringUtils.split(part, PART_SEPARATOR, 2);
|
110
|
if (familyWithQualifier==null || familyWithQualifier.length!=2) {
|
111
|
throw new RuntimeException("invalid part name value: '" + part +
|
112
|
"' not compliant with 'family" +
|
113
|
PART_SEPARATOR + "qualifier' template!");
|
114
|
}
|
115
|
Oaf.Builder oafBuilder = Oaf.newBuilder();
|
116
|
String resourcePath = entry.getKey() + File.separator + part;
|
117
|
String oafText = IOUtils.toString(
|
118
|
Thread.currentThread().getContextClassLoader().getResourceAsStream(resourcePath));
|
119
|
try {
|
120
|
JsonFormat.merge(oafText, oafBuilder);
|
121
|
} catch (Exception e) {
|
122
|
throw new RuntimeException("got exception when parsing text resource: " +
|
123
|
resourcePath + ", text content: " + oafText, e);
|
124
|
}
|
125
|
|
126
|
Oaf oaf = oafBuilder.build();
|
127
|
KeyValue keyValue = new KeyValue(
|
128
|
idBytes,
|
129
|
familyWithQualifier[0].getBytes(DEFAULT_CHARSET),
|
130
|
familyWithQualifier[1].getBytes(DEFAULT_CHARSET),
|
131
|
oaf.toByteArray());
|
132
|
protoParts.add(keyValue);
|
133
|
System.err.println(JsonFormat.printToString(oaf));
|
134
|
}
|
135
|
tuples.add(new Tuple(
|
136
|
new ImmutableBytesWritable(idBytes),
|
137
|
new Result(protoParts)));
|
138
|
}
|
139
|
currentIt = tuples.iterator();
|
140
|
} finally {
|
141
|
in.close();
|
142
|
}
|
143
|
} else {
|
144
|
throw new IOException("no " + IDX_PROPS_LOC + " property set!");
|
145
|
}
|
146
|
|
147
|
}
|
148
|
|
149
|
private String loadId(String loc) throws IOException {
|
150
|
return IOUtils.readLines(
|
151
|
Thread.currentThread().getContextClassLoader().getResourceAsStream(loc)).iterator().next().trim();
|
152
|
}
|
153
|
|
154
|
@Override
|
155
|
public boolean nextKeyValue() throws IOException,
|
156
|
InterruptedException {
|
157
|
if (currentIt.hasNext()) {
|
158
|
currentTuple = currentIt.next();
|
159
|
progress++;
|
160
|
return true;
|
161
|
} else {
|
162
|
return false;
|
163
|
}
|
164
|
}
|
165
|
|
166
|
@Override
|
167
|
public ImmutableBytesWritable getCurrentKey() throws IOException,
|
168
|
InterruptedException {
|
169
|
return currentTuple!=null?currentTuple.key:null;
|
170
|
}
|
171
|
|
172
|
@Override
|
173
|
public Result getCurrentValue() throws IOException,
|
174
|
InterruptedException {
|
175
|
return currentTuple!=null?currentTuple.value:null;
|
176
|
}
|
177
|
|
178
|
@Override
|
179
|
public float getProgress() throws IOException, InterruptedException {
|
180
|
return progress;
|
181
|
}
|
182
|
|
183
|
@Override
|
184
|
public void close() throws IOException {
|
185
|
currentIt = null;
|
186
|
currentTuple = null;
|
187
|
}
|
188
|
};
|
189
|
}
|
190
|
|
191
|
}
|