Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.mapreduce;
2

    
3
import com.google.common.collect.ArrayListMultimap;
4
import com.google.common.collect.Multimap;
5
import com.google.protobuf.InvalidProtocolBufferException;
6
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
7
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
8
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
9
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.ContextTransformer;
10
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
13
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
14
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
15
import eu.dnetlib.data.proto.KindProtos;
16
import eu.dnetlib.data.proto.OafProtos.Oaf;
17
import eu.dnetlib.data.proto.OafProtos.OafRel;
18
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
19
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
20
import eu.dnetlib.data.proto.TypeProtos.Type;
21
import org.apache.hadoop.hbase.client.Result;
22
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23
import org.apache.hadoop.hbase.mapreduce.TableMapper;
24
import org.apache.hadoop.hbase.util.Bytes;
25
import org.apache.hadoop.io.Text;
26
import org.apache.log4j.Logger;
27

    
28
import java.io.ByteArrayInputStream;
29
import java.io.IOException;
30
import java.io.InputStream;
31
import java.util.ArrayList;
32
import java.util.List;
33
import java.util.Map;
34
import java.util.Map.Entry;
35
import java.util.Properties;
36

    
37
/**
38
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
39
 * export
40
 */
41
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {
42
    private Logger log = Logger.getLogger(this.getClass());
43

    
44
    //Index configuratios for OAFs
45
    private EntityConfigTable entityConfigTable;
46

    
47
    //Counters for produced entities. Visible in the JobTracker monitoring page.
48
    private enum STATS_COUNTERS {
49
        datasource,
50
        organization,
51
        result,
52
        person,
53
        project
54
    }
55

    
56

    
57
    private enum REL_COUNTERS {
58
        resultProject,
59
        datasourceOrganization,
60
        organizationOrganization,
61
        resultResult,
62
        personPerson,
63
        personResult,
64
        projectOrganization,
65
        projectPerson,
66
        resultOrganization
67
    }
68

    
69
    //Counter for corrupted records.
70
    private enum ERROR_COUNTERS {
71
        rottenRecords,
72
        rottenRelations
73
    }
74

    
75
    //Init class: Load Index config and mapping for DB tables.
76
    @Override
77
    protected void setup(Context context) throws IOException, InterruptedException {
78
        loadEntityConfig(context);
79
    }
80

    
81

    
82
    //Read HBASE table and decode Protos to OAF entities.
83
    @Override
84
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
85

    
86
        Oaf oaf = null;
87
        OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
88
        try {
89
            oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(keyDecoder.getType().toString())));
90

    
91
        } catch (Exception e) {
92
            /*
93
            System.err.println("Unable to parse proto in row: " + oaf.getEntity().getType().toString() + keyDecoder.getKey());
94
            log.error("Unable to parse proto in row: " + oaf.getEntity().getType().toString() + keyDecoder.getKey(), e);
95
            */
96

    
97
            System.err.println("Unable to parse proto in row: " + keyDecoder.getKey());
98
            log.error("Unable to parse proto in row: " + keyDecoder.getKey(), e);
99

    
100
            context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1);
101
        }
102

    
103
        if (isValid(oaf)) {
104
            emitProtos(context, result, oaf);
105

    
106
        }
107

    
108

    
109
    }
110

    
111

    
112
    private boolean isValid(Oaf oaf) {
113
        if (oaf != null && oaf.isInitialized() && !deletedByInference(oaf) && !invisible(oaf)) return true;
114
        //if (oaf != null && oaf.isInitialized() && !deletedByInference(oaf)) return true;
115
        return false;
116
    }
117

    
118

    
119
    private void emitProtos(Context context, Result result, Oaf oaf) {
120

    
121
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).
122
                setDataInfo(oaf.getDataInfo()).setLastupdatetimestamp(oaf.getLastupdatetimestamp());
123

    
124
        oafBuilder.setEntity(oaf.getEntity());
125

    
126
        // emit relation first so we can cache them to entity protos
127
        emitRelation(context, result, oaf, oafBuilder);
128
        emitEntity(context, oafBuilder.build());
129

    
130
    }
131

    
132
    private void emitEntity(Context context, Oaf oaf) {
133

    
134
        String serialized = Serializer.serialize(oaf, context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar"));
135
        if (serialized != null) {
136
                try {
137
                    Text TextKeyOut = new Text(oaf.getEntity().getType().toString() + "," + Serializer.getId(oaf, context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar")));
138
                    context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
139
                    context.getCounter(STATS_COUNTERS.valueOf(oaf.getEntity().getType().toString())).increment(1);
140

    
141
                } catch (Exception e) {
142
                    log.error("Error writing entity to M/R output", e);
143
                }
144
            }
145

    
146
    }
147

    
148
    // may have multiple relations per each field
149
    private void emitRelation(Context context, Result result, Oaf oaf, Oaf.Builder builder) {
150

    
151
        try {
152

    
153
            // writing out : personResult,ID,data
154
            // derived relations; ResultLanguages, resultConcepts etc are
155
            // created here
156

    
157
            Multimap<String, String> relMap = ArrayListMultimap.create();
158

    
159
            Serializer.extractRelations(oaf, context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar"), relMap);
160

    
161
            if (!relMap.isEmpty()) {
162
                for (Entry<String, String> rel : relMap.entries()) {
163
                    Text TextKeyOut = new Text(rel.getKey() + "," + Serializer.getId(oaf, context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar")));
164
                    //TODO here output
165
                    context.write((TextKeyOut), new ImmutableBytesWritable(rel.getValue().getBytes()));
166

    
167
                }
168
            }
169

    
170

    
171
        } catch (Throwable e) {
172
            log.error("Error writing relation to M/R output", e);
173
        }
174

    
175

    
176
        // Existing Hbase relations are generated here
177
        if (entityConfigTable.getDescriptors(oaf.getEntity().getType()) != null) {
178

    
179
            for (LinkDescriptor ld : entityConfigTable.getDescriptors(oaf.getEntity().getType())) {
180

    
181
                try {
182

    
183
                    ArrayList<OafRel> oafRels = new ArrayList<OafRel>();
184
                    decodeRelation(oaf, context, result, ld, oafRels);
185

    
186
                    for (OafRel rel : oafRels) {
187
                        {
188

    
189
                        //  builder.getEntityBuilder().addCachedRel(rel);
190

    
191
                            Text TextKeyOut = new Text(ld.getRelDescriptor().getRelType().toString()
192
                                    + "," + Serializer.getId(rel, context.getConfiguration().get("stats.delim"),
193
                                    context.getConfiguration().get("stats.enclChar")));
194

    
195
                            String buff = Serializer.serialize(rel, context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar"));
196

    
197
                            context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
198

    
199
                            context.getCounter(REL_COUNTERS.valueOf(rel.getRelType().toString())).increment(1);
200
                        }
201

    
202
                    }
203
                } catch (Throwable e) {
204
                    log.error("Error for record ", e);
205
                    context.getCounter(ERROR_COUNTERS.rottenRelations).increment(1);
206

    
207
                }
208

    
209
            }
210
        }
211

    
212
    }
213

    
214
    private void decodeRelation(final Oaf body, final Context context, Result result, final LinkDescriptor ld, List<OafRel> rels) {
215

    
216
        try {
217
            final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(KindProtos.Kind.relation);
218
            final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
219

    
220
            if (hasData(columnMap)) {
221

    
222
                for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
223

    
224
                    Oaf decodedOaf = decodeProto(e.getValue(), context);
225

    
226
                        OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
227

    
228
                        if (ld.isSymmetric()) {
229
                            RelDescriptor rd = ld.getRelDescriptor();
230
                            relBuilder.setCachedTarget(body.getEntity()).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
231
                        }
232

    
233

    
234
                        OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
235
                        rels.add(oafBuilder.setDataInfo(decodedOaf.getDataInfo()).setRel(oafRel).build().getRel());
236

    
237

    
238

    
239
                }
240
            }
241

    
242
        } catch (Throwable throwable) {
243
            log.error("Error Decoding relation for: " + body.getRel().getRelType() + " " + body.getEntity().getId() + " ", throwable);
244
            context.getCounter(ERROR_COUNTERS.rottenRelations).increment(1);
245

    
246
        }
247
    }
248

    
249

    
250
    private Oaf decodeProto(final byte[] body, Context context) {
251
        try {
252
            return Oaf.parseFrom(body);
253
        } catch (Exception e) {
254
            log.error(e);
255
            context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1);
256

    
257

    
258
        }
259
        return null;
260
    }
261

    
262
    private void loadEntityConfig(Context context) {
263
        String indexConf = context.getConfiguration().get("stats.indexConf");
264

    
265
        if (indexConf == null || indexConf.isEmpty()) {
266
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
267

    
268
        }
269

    
270
        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
271

    
272
    }
273

    
274
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
275
        return rel.getSource().contains(rel.getTarget());
276
    }
277

    
278
    private boolean hasData(final Map<byte[], byte[]> columnMap) {
279
        return columnMap != null && !columnMap.isEmpty();
280
    }
281

    
282
    private boolean deletedByInference(final Oaf oaf) {
283
        return oaf.getDataInfo().getDeletedbyinference();
284
    }
285

    
286
    private boolean invisible(final Oaf oaf) {
287
        return oaf.getDataInfo().getInvisible();
288
    }
289

    
290
    @Override
291
    protected void cleanup(Context context) throws IOException, InterruptedException {
292

    
293
        super.cleanup(context);
294
    }
295

    
296
    public EntityConfigTable getEntityConfigTable() {
297
        return entityConfigTable;
298
    }
299

    
300
    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
301
        this.entityConfigTable = entityConfigTable;
302
    }
303

    
304
}
(1-1/2)