Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.mapreduce;
2

    
3
import com.google.common.collect.ArrayListMultimap;
4
import com.google.common.collect.Multimap;
5
import com.google.protobuf.InvalidProtocolBufferException;
6
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
7
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
8
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
9
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.ContextTransformer;
10
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
13
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
14
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
15
import eu.dnetlib.data.proto.KindProtos;
16
import eu.dnetlib.data.proto.OafProtos.Oaf;
17
import eu.dnetlib.data.proto.OafProtos.OafRel;
18
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
19
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
20
import eu.dnetlib.data.proto.TypeProtos.Type;
21
import org.apache.hadoop.hbase.client.Result;
22
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23
import org.apache.hadoop.hbase.mapreduce.TableMapper;
24
import org.apache.hadoop.hbase.util.Bytes;
25
import org.apache.hadoop.io.Text;
26
import org.apache.log4j.Logger;
27

    
28
import java.io.ByteArrayInputStream;
29
import java.io.IOException;
30
import java.io.InputStream;
31
import java.util.ArrayList;
32
import java.util.List;
33
import java.util.Map;
34
import java.util.Map.Entry;
35
import java.util.Properties;
36

    
37
/**
38
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
39
 * export
40
 */
41
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {
42
    private Logger log = Logger.getLogger(this.getClass());
43

    
44
    //Index configuratios for OAFs
45
    private EntityConfigTable entityConfigTable;
46

    
47
    //Counters for produced entities. Visible in the JobTracker monitoring page.
48
    private enum STATS_COUNTERS {
49
        datasource,
50
        organization,
51
        result,
52
        person,
53
        project
54
    }
55

    
56

    
57
    private enum REL_COUNTERS {
58
        resultProject,
59
        datasourceOrganization,
60
        organizationOrganization,
61
        resultResult,
62
        personPerson,
63
        personResult,
64
        projectOrganization,
65
        projectPerson,
66
        resultOrganization
67
    }
68

    
69
    //Counter for corrupted records.
70
    private enum ERROR_COUNTERS {
71
        rottenRecords,
72
        rottenRelations
73
    }
74

    
75
    //Init class: Load Index config and mapping for DB tables.
76
    @Override
77
    protected void setup(Context context) throws IOException, InterruptedException {
78
        loadEntityConfig(context);
79
    }
80

    
81

    
82
    //Read HBASE table and decode Protos to OAF entities.
83
    @Override
84
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
85

    
86
        Oaf oaf = null;
87
        OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
88
        try {
89
            oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(keyDecoder.getType().toString())));
90

    
91
        } catch (Exception e) {
92
            /*
93
            System.err.println("Unable to parse proto in row: " + oaf.getEntity().getType().toString() + keyDecoder.getKey());
94
            log.error("Unable to parse proto in row: " + oaf.getEntity().getType().toString() + keyDecoder.getKey(), e);
95
            */
96

    
97
            System.err.println("Unable to parse proto in row: " + keyDecoder.getKey());
98
            log.error("Unable to parse proto in row: " + keyDecoder.getKey(), e);
99

    
100
            context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1);
101
        }
102

    
103
        if (isValid(oaf)) {
104
            emitProtos(context, result, oaf);
105

    
106
        }
107

    
108

    
109
    }
110

    
111

    
112
    private boolean isValid(Oaf oaf) {
113
        if (oaf != null && oaf.isInitialized() && !deletedByInference(oaf)) return true;
114
        return false;
115
    }
116

    
117

    
118
    private void emitProtos(Context context, Result result, Oaf oaf) {
119

    
120
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).
121
                setDataInfo(oaf.getDataInfo()).setLastupdatetimestamp(oaf.getLastupdatetimestamp());
122

    
123
        oafBuilder.setEntity(oaf.getEntity());
124

    
125
        // emit relation first so we can cache them to entity protos
126
        emitRelation(context, result, oaf, oafBuilder);
127
        emitEntity(context, oafBuilder.build());
128

    
129
    }
130

    
131
    private void emitEntity(Context context, Oaf oaf) {
132

    
133
        String serialized = Serializer.serialize(oaf, context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar"));
134
        if (serialized != null) {
135
                try {
136
                    Text TextKeyOut = new Text(oaf.getEntity().getType().toString() + "," + Serializer.getId(oaf, context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar")));
137
                    context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
138
                    context.getCounter(STATS_COUNTERS.valueOf(oaf.getEntity().getType().toString())).increment(1);
139

    
140
                } catch (Exception e) {
141
                    log.error("Error writing entity to M/R output", e);
142
                }
143
            }
144

    
145
    }
146

    
147
    // may have multiple relations per each field
148
    private void emitRelation(Context context, Result result, Oaf oaf, Oaf.Builder builder) {
149

    
150
        try {
151

    
152
            // writing out : personResult,ID,data
153
            // derived relations; ResultLanguages, resultConcepts etc are
154
            // created here
155

    
156
            Multimap<String, String> relMap = ArrayListMultimap.create();
157

    
158
            Serializer.extractRelations(oaf, context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar"), relMap);
159

    
160
            if (!relMap.isEmpty()) {
161
                for (Entry<String, String> rel : relMap.entries()) {
162
                    Text TextKeyOut = new Text(rel.getKey() + "," + Serializer.getId(oaf, context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar")));
163
                    //TODO here output
164
                    context.write((TextKeyOut), new ImmutableBytesWritable(rel.getValue().getBytes()));
165

    
166
                }
167
            }
168

    
169

    
170
        } catch (Throwable e) {
171
            log.error("Error writing relation to M/R output", e);
172
        }
173

    
174

    
175
        // Existing Hbase relations are generated here
176
        if (entityConfigTable.getDescriptors(oaf.getEntity().getType()) != null) {
177

    
178
            for (LinkDescriptor ld : entityConfigTable.getDescriptors(oaf.getEntity().getType())) {
179

    
180
                try {
181

    
182
                    ArrayList<OafRel> oafRels = new ArrayList<OafRel>();
183
                    decodeRelation(oaf, context, result, ld, oafRels);
184

    
185
                    for (OafRel rel : oafRels) {
186
                        {
187

    
188
                        //  builder.getEntityBuilder().addCachedRel(rel);
189

    
190
                            Text TextKeyOut = new Text(ld.getRelDescriptor().getRelType().toString()
191
                                    + "," + Serializer.getId(rel, context.getConfiguration().get("stats.delim"),
192
                                    context.getConfiguration().get("stats.enclChar")));
193

    
194
                            String buff = Serializer.serialize(rel, context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar"));
195

    
196
                            context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
197

    
198
                            context.getCounter(REL_COUNTERS.valueOf(rel.getRelType().toString())).increment(1);
199
                        }
200

    
201
                    }
202
                } catch (Throwable e) {
203
                    log.error("Error for record ", e);
204
                    context.getCounter(ERROR_COUNTERS.rottenRelations).increment(1);
205

    
206
                }
207

    
208
            }
209
        }
210

    
211
    }
212

    
213
    private void decodeRelation(final Oaf body, final Context context, Result result, final LinkDescriptor ld, List<OafRel> rels) {
214

    
215
        try {
216
            final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(KindProtos.Kind.relation);
217
            final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
218

    
219
            if (hasData(columnMap)) {
220

    
221
                for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
222

    
223
                    Oaf decodedOaf = decodeProto(e.getValue(), context);
224

    
225
                        OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
226

    
227
                        if (ld.isSymmetric()) {
228
                            RelDescriptor rd = ld.getRelDescriptor();
229
                            relBuilder.setCachedTarget(body.getEntity()).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
230
                        }
231

    
232

    
233
                        OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
234
                        rels.add(oafBuilder.setDataInfo(decodedOaf.getDataInfo()).setRel(oafRel).build().getRel());
235

    
236

    
237

    
238
                }
239
            }
240

    
241
        } catch (Throwable throwable) {
242
            log.error("Error Decoding relation for: " + body.getRel().getRelType() + " " + body.getEntity().getId() + " ", throwable);
243
            context.getCounter(ERROR_COUNTERS.rottenRelations).increment(1);
244

    
245
        }
246
    }
247

    
248

    
249
    private Oaf decodeProto(final byte[] body, Context context) {
250
        try {
251
            return Oaf.parseFrom(body);
252
        } catch (Exception e) {
253
            log.error(e);
254
            context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1);
255

    
256

    
257
        }
258
        return null;
259
    }
260

    
261
    private void loadEntityConfig(Context context) {
262
        String indexConf = context.getConfiguration().get("stats.indexConf");
263

    
264
        if (indexConf == null || indexConf.isEmpty()) {
265
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
266

    
267
        }
268

    
269
        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
270

    
271
    }
272

    
273
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
274
        return rel.getSource().contains(rel.getTarget());
275
    }
276

    
277
    private boolean hasData(final Map<byte[], byte[]> columnMap) {
278
        return columnMap != null && !columnMap.isEmpty();
279
    }
280

    
281
    private boolean deletedByInference(final Oaf oaf) {
282
        return oaf.getDataInfo().getDeletedbyinference();
283
    }
284

    
285
    @Override
286
    protected void cleanup(Context context) throws IOException, InterruptedException {
287

    
288
        super.cleanup(context);
289
    }
290

    
291
    public EntityConfigTable getEntityConfigTable() {
292
        return entityConfigTable;
293
    }
294

    
295
    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
296
        this.entityConfigTable = entityConfigTable;
297
    }
298

    
299
}
(1-1/2)