Project

General

Profile

1
package eu.dnetlib.iis.mainworkflows.importer.mapred;
2

    
3
import java.io.DataInput;
4
import java.io.DataOutput;
5
import java.io.File;
6
import java.io.IOException;
7
import java.io.InputStream;
8
import java.util.ArrayList;
9
import java.util.Arrays;
10
import java.util.Iterator;
11
import java.util.List;
12
import java.util.Map;
13
import java.util.Properties;
14

    
15
import org.apache.commons.io.IOUtils;
16
import org.apache.commons.lang.StringUtils;
17
import org.apache.hadoop.hbase.KeyValue;
18
import org.apache.hadoop.hbase.client.Result;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.io.Writable;
21
import org.apache.hadoop.mapreduce.InputFormat;
22
import org.apache.hadoop.mapreduce.InputSplit;
23
import org.apache.hadoop.mapreduce.JobContext;
24
import org.apache.hadoop.mapreduce.RecordReader;
25
import org.apache.hadoop.mapreduce.TaskAttemptContext;
26

    
27
import com.googlecode.protobuf.format.JsonFormat;
28

    
29
import eu.dnetlib.data.proto.OafProtos.Oaf;
30
import eu.dnetlib.iis.core.javamapreduce.hack.SchemaSetter;
31

    
32

    
33
/**
34
 * Input format returning Results built from JSON objects.
35
 * @author mhorst
36
 *
37
 */
38
public class PredefinedTableInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
39

    
40
	public static final String IDX_PROPS_LOC = "import.input.format.idx.props.location";
41
	
42
	public static final String DEFAULT_CHARSET = "utf8";
43
	
44
	public static final String PART_SEPARATOR = "#";
45
	
46
	public static class FakeSplit extends InputSplit implements Writable {
47
	    public void write(DataOutput out) throws IOException { }
48
	    public void readFields(DataInput in) throws IOException { }
49
	    public long getLength() { return 0L; }
50
	    public String[] getLocations() { return new String[0]; }
51
	  }
52
	
53
	@Override
54
	public List<InputSplit> getSplits(JobContext context) throws IOException,
55
			InterruptedException {
56
		List<InputSplit> ret = new ArrayList<InputSplit>();
57
		ret.add(new FakeSplit());
58
		return ret;
59
	}
60

    
61
	@Override
62
	public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
63
			InputSplit split, TaskAttemptContext context) throws IOException,
64
			InterruptedException {
65
//		required by our hack related to multiple outputs
66
		SchemaSetter.set(context.getConfiguration());
67
		
68
		return new RecordReader<ImmutableBytesWritable, Result>() {
69
			
70
			private Iterator<Tuple> currentIt;
71
			private Tuple currentTuple;
72
			private float progress;
73
			
74
			class Tuple {
75
				final ImmutableBytesWritable key;
76
				final Result value;
77

    
78
				public Tuple (ImmutableBytesWritable key,
79
				Result value) {
80
					this.key = key;
81
					this.value = value;
82
				}
83
			}
84
			
85
			@Override
86
			public void initialize(InputSplit split, TaskAttemptContext context)
87
					throws IOException, InterruptedException {
88
				progress = 0;
89
				String idxLoc = context.getConfiguration().get(IDX_PROPS_LOC);
90
				if (idxLoc!=null) {
91
					List<Tuple> tuples = new ArrayList<Tuple>();
92
					Properties props = new Properties();
93
					InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(
94
			        		idxLoc);
95
					if (in==null) {
96
						throw new IOException("unable to load resource from path: " + idxLoc);
97
					}
98
					try {
99
						props.load(in);
100
				    	for (Map.Entry<Object, Object> entry : props.entrySet()) {
101
				    		System.err.println("key: " + entry.getKey() + ", value: " + entry.getValue());
102
				    		String id = loadId(entry.getKey() + File.separator + "id");
103
				    		System.err.println("id: " + id);
104
				    		byte[] idBytes = id.getBytes(DEFAULT_CHARSET);
105
				    		String[] parts = StringUtils.split(entry.getValue().toString(), ',');
106
				    		System.err.println("parts: " + Arrays.asList(parts));
107
				    		List<KeyValue> protoParts = new ArrayList<KeyValue>();
108
				    		for (String part : parts) {
109
				    			String[] familyWithQualifier = StringUtils.split(part, PART_SEPARATOR, 2);
110
				    			if (familyWithQualifier==null || familyWithQualifier.length!=2) {
111
				    				throw new RuntimeException("invalid part name value: '" + part + 
112
				    						"' not compliant with 'family" + 
113
				    						PART_SEPARATOR + "qualifier' template!");
114
				    			}
115
				    			Oaf.Builder oafBuilder = Oaf.newBuilder();
116
				    			String resourcePath = entry.getKey() + File.separator + part;
117
				    			String oafText = IOUtils.toString(
118
					    				Thread.currentThread().getContextClassLoader().getResourceAsStream(resourcePath));
119
					    		try {
120
					    			JsonFormat.merge(oafText, oafBuilder);	
121
					    		} catch (Exception e) {
122
					    			throw new RuntimeException("got exception when parsing text resource: " +
123
					    					resourcePath + ", text content: " + oafText, e);
124
					    		}
125
				    			
126
					    		Oaf oaf = oafBuilder.build();
127
					    		KeyValue keyValue = new KeyValue(
128
					    				idBytes, 
129
					    				familyWithQualifier[0].getBytes(DEFAULT_CHARSET),
130
					    				familyWithQualifier[1].getBytes(DEFAULT_CHARSET),
131
					    				oaf.toByteArray());
132
					    		protoParts.add(keyValue);
133
					    		System.err.println(JsonFormat.printToString(oaf));	
134
				    		}
135
				    		tuples.add(new Tuple(
136
				    				new ImmutableBytesWritable(idBytes), 
137
				    				new Result(protoParts)));
138
				    	}
139
				    	currentIt = tuples.iterator();
140
					} finally {
141
						in.close();
142
					}
143
				} else {
144
					throw new IOException("no " + IDX_PROPS_LOC + " property set!");
145
				}
146

    
147
			}
148

    
149
		    private String loadId(String loc) throws IOException {
150
		    	return IOUtils.readLines(
151
		    			Thread.currentThread().getContextClassLoader().getResourceAsStream(loc)).iterator().next().trim();
152
		    }
153
			
154
			@Override
155
			public boolean nextKeyValue() throws IOException,
156
					InterruptedException {
157
				if (currentIt.hasNext()) {
158
					currentTuple = currentIt.next();
159
					progress++;
160
					return true;
161
				} else {
162
					return false;
163
				}
164
			}
165

    
166
			@Override
167
			public ImmutableBytesWritable getCurrentKey() throws IOException,
168
					InterruptedException {
169
				return currentTuple!=null?currentTuple.key:null;
170
			}
171

    
172
			@Override
173
			public Result getCurrentValue() throws IOException,
174
					InterruptedException {
175
				return currentTuple!=null?currentTuple.value:null;
176
			}
177

    
178
			@Override
179
			public float getProgress() throws IOException, InterruptedException {
180
				return progress;
181
			}
182

    
183
			@Override
184
			public void close() throws IOException {
185
				currentIt = null;
186
				currentTuple = null;
187
			}
188
		};
189
	}
190

    
191
}
    (1-1/1)