package eu.dnetlib.data.mapreduce.hbase.statsExport.mapreduce;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
import eu.dnetlib.data.proto.KindProtos;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
import eu.dnetlib.data.proto.TypeProtos;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Mapper class that reads HBase contents and prepares them for the StatsDB export.
 */
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {
    private Logger log = Logger.getLogger(this.getClass());

    // Index configurations for OAF entities.
    private EntityConfigTable entityConfigTable;

    // Counters for produced entities. Visible in the JobTracker monitoring page.
    private enum STATS_COUNTERS {
        datasource,
        organization,
        result,
        person,
        project
    }

    // Counters for produced relations. Visible in the JobTracker monitoring page.
    private enum REL_COUNTERS {
        resultProject,
        datasourceOrganization,
        organizationOrganization,
        resultResult,
        personPerson,
        personResult,
        projectOrganization,
        projectPerson,
        resultOrganization
    }

    // Counters for corrupted records and relations.
    private enum ERROR_COUNTERS {
        rottenRecords,
        rottenRelations
    }
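
    // The serializer turns OAF protos into delimited rows. The numeric id starts from the
    // map task id and advances by the total number of map tasks (see setup() and map()),
    // so generated ids stay unique across mappers.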
    private Serializer serializer;
    private int id;
    private int increment;

    // Initialization: load the index configuration and set up the serializer and id counters.
    @Override
    protected void setup(Context context) {
        loadEntityConfig(context);

        this.serializer = new Serializer(context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar"));

        id = context.getTaskAttemptID().getTaskID().getId();
        increment = context.getConfiguration().getInt("mapred.map.tasks", 0);
        if (increment == 0) {
            throw new IllegalArgumentException("mapred.map.tasks is zero");
        }
    }

    // Read the HBase row and decode its protos into OAF entities.
    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) {

        // Generate a unique integer id for the entity.
        id += increment;

        try {
            OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
            Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(keyDecoder.getType().toString())));

            if (isValid(oaf)) {
                emitProtos(context, result, oaf);
            }
        } catch (Exception e) {
            log.error("Unable to parse proto in row: " + new String(keyIn.copyBytes()), e);
            context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1);
        }
    }
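
    // A record is exported only if it is initialised and neither deleted by inference nor invisible.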
    private boolean isValid(Oaf oaf) {
        return oaf != null && oaf.isInitialized() && !oaf.getDataInfo().getDeletedbyinference() && !oaf.getDataInfo().getInvisible();
    }
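
    // Emit the relation rows and the entity row derived from a single merged OAF record.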
    private void emitProtos(Context context, Result result, Oaf oaf) {
        Oaf.Builder oafBuilder = Oaf.newBuilder()
                .setKind(oaf.getKind())
                .setDataInfo(oaf.getDataInfo())
                .setLastupdatetimestamp(oaf.getLastupdatetimestamp())
                .setEntity(oaf.getEntity());

        // Emit the relations first so they can be cached into the entity protos.
        emitRelation(context, result, oaf);
        emitEntity(context, oafBuilder.build());
    }
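
    // Write the serialized entity under a "type,id" key: datasource, organization and project rows
    // are keyed by the id taken from the record (serializer.getId), while result rows use the
    // generated numeric id.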
    private void emitEntity(Context context, Oaf oaf) {
        String serialized = serializer.serialize(oaf);
        if (serialized != null) {
            try {
                Text textKeyOut;

                switch (oaf.getEntity().getType()) {
                    case project:
                    case datasource:
                    case organization:
                        textKeyOut = new Text(oaf.getEntity().getType().toString() + "," + serializer.getId(oaf));
                        break;
                    case result:
                    default:
                        textKeyOut = new Text(oaf.getEntity().getType().toString() + "," + id);
                        break;
                }

                context.write(textKeyOut, new ImmutableBytesWritable(serialized.getBytes()));
                context.getCounter(STATS_COUNTERS.valueOf(oaf.getEntity().getType().toString())).increment(1);
            } catch (Exception e) {
                log.error("Error writing entity to M/R output", e);
            }
        }
    }

    // An entity may yield multiple relations per field.
    private void emitRelation(Context context, Result result, Oaf oaf) {
        try {
            // Derived relations (ResultLanguages, resultConcepts, etc.) are created here and
            // written out as "relationType,id,data" (e.g. personResult,ID,data).
            Multimap<String, String> relMap = ArrayListMultimap.create();
            serializer.extractRelations(oaf, relMap);

            if (!relMap.isEmpty()) {
                for (Entry<String, String> rel : relMap.entries()) {
                    Text textKeyOut;

                    switch (oaf.getEntity().getType()) {
                        case project:
                        case datasource:
                        case organization:
                            textKeyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));
                            break;
                        case result:
                        default:
                            textKeyOut = new Text(rel.getKey() + "," + id);
                            break;
                    }

                    context.write(textKeyOut, new ImmutableBytesWritable(rel.getValue().getBytes()));
                }
            }
        } catch (Throwable e) {
            log.error("Error writing relation to M/R output", e);
        }

        // Relations already stored in HBase are generated here.
        if (entityConfigTable.getDescriptors(oaf.getEntity().getType()) != null) {
            for (LinkDescriptor ld : entityConfigTable.getDescriptors(oaf.getEntity().getType())) {
                try {
                    List<OafRel> oafRels = new ArrayList<OafRel>();
                    decodeRelation(oaf, context, result, ld, oafRels);

                    for (OafRel rel : oafRels) {
                        Text textKeyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
                        String buff = serializer.serialize(rel);

                        context.write(textKeyOut, new ImmutableBytesWritable(buff.getBytes()));
                        context.getCounter(REL_COUNTERS.valueOf(rel.getRelType().toString())).increment(1);
                    }
                } catch (Throwable e) {
                    log.error("Error emitting relation for record", e);
                    context.getCounter(ERROR_COUNTERS.rottenRelations).increment(1);
                }
            }
        }
    }
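
    // Decode the HBase column family referenced by the link descriptor into OafRel protos,
    // caching the current entity as the target of symmetric relations.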
    private void decodeRelation(final Oaf body, final Context context, Result result, final LinkDescriptor ld, List<OafRel> rels) {
        try {
            final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(KindProtos.Kind.relation);
            final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));

            if (hasData(columnMap)) {
                for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
                    Oaf decodedOaf = decodeProto(e.getValue(), context);
                    if (decodedOaf == null) {
                        // Skip columns whose proto could not be parsed; decodeProto has already counted them.
                        continue;
                    }
                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());

                    if (ld.isSymmetric()) {
                        RelDescriptor rd = ld.getRelDescriptor();
                        relBuilder.setCachedTarget(body.getEntity()).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
                    }

                    OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
                    rels.add(oafBuilder.setDataInfo(decodedOaf.getDataInfo()).setRel(oafRel).build().getRel());
                }
            }
        } catch (Throwable throwable) {
            log.error("Error decoding relation for: " + body.getRel().getRelType() + " " + body.getEntity().getId(), throwable);
            context.getCounter(ERROR_COUNTERS.rottenRelations).increment(1);
        }
    }
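
    // Parse a raw column value into an Oaf proto; returns null and counts a rotten record on failure.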
    private Oaf decodeProto(final byte[] body, Context context) {
        try {
            return Oaf.parseFrom(body);
        } catch (Exception e) {
            log.error("Unable to parse proto", e);
            context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1);
        }
        return null;
    }
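
    // Load the entity/relation index configuration from the job configuration (stats.indexConf).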
    private void loadEntityConfig(Context context) {
        String indexConf = context.getConfiguration().get("stats.indexConf");

        if (indexConf == null || indexConf.isEmpty()) {
            log.error("Null or empty entity configuration table in mapper");
            throw new IllegalArgumentException("stats.indexConf is not set");
        }

        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
    }
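
    // True when the relation's source id contains its target id (a dedup self-reference); not referenced in this class.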
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
        return rel.getSource().contains(rel.getTarget());
    }

    private boolean hasData(final Map<byte[], byte[]> columnMap) {
        return columnMap != null && !columnMap.isEmpty();
    }

    public EntityConfigTable getEntityConfigTable() {
        return entityConfigTable;
    }

    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
        this.entityConfigTable = entityConfigTable;
    }
}