package eu.dnetlib.data.mapreduce.hbase.statsExport.mapreduce;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
import eu.dnetlib.data.proto.KindProtos;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
import eu.dnetlib.data.proto.TypeProtos;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
31
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
32
 * export
33
 */
34
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {
35
    private Logger log = Logger.getLogger(this.getClass());
36
37 42575 eri.katsar
    //Index configuratios for OAFs
38 41142 eri.katsar
    private EntityConfigTable entityConfigTable;
39
40 42575 eri.katsar
    //Counters for produced entities. Visible in the JobTracker monitoring page.
41 41499 eri.katsar
    private enum STATS_COUNTERS {
42
        datasource,
43
        organization,
44
        result,
45
        person,
46
        project
47
    }
48
49 42520 eri.katsar
    private enum REL_COUNTERS {
50
        resultProject,
51
        datasourceOrganization,
52
        organizationOrganization,
53
        resultResult,
54
        personPerson,
55
        personResult,
56
        projectOrganization,
57
        projectPerson,
58
        resultOrganization
59
    }
60 41499 eri.katsar
61 42575 eri.katsar
    //Counter for corrupted records.
62
    private enum ERROR_COUNTERS {
63
        rottenRecords,
64
        rottenRelations
65
    }
66 42520 eri.katsar
67 56504 antonis.le
    private Serializer serializer;
68 57089 antonis.le
    private int id;
69
    private int increment;
70 56504 antonis.le
71 42575 eri.katsar
    //Init class: Load Index config and mapping for DB tables.
72 41142 eri.katsar
    @Override
73 55644 antonis.le
    protected void setup(Context context) {
74 41142 eri.katsar
        loadEntityConfig(context);
75 56504 antonis.le
76
        this.serializer = new Serializer(context.getConfiguration().get("stats.delim"), context.getConfiguration().get("stats.enclChar"));
77 57089 antonis.le
78
        id = context.getTaskAttemptID().getTaskID().getId();
79
        increment = context.getConfiguration().getInt("mapred.map.tasks", 0);
80
        if (increment == 0) {
81
            throw new IllegalArgumentException("mapred.map.tasks is zero");
82
        }
83 41876 eri.katsar
    }
84 41142 eri.katsar
85 41499 eri.katsar
86 42575 eri.katsar
    //Read HBASE table and decode Protos to OAF entities.
87 41142 eri.katsar
    @Override
88 55644 antonis.le
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) {
89 41142 eri.katsar
90 57089 antonis.le
        // generating unique integer if for the entity.
91
        id += increment;
92 56483 antonis.le
93 42561 eri.katsar
        try {
94 56483 antonis.le
            OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
95
            Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(keyDecoder.getType().toString())));
96 41142 eri.katsar
97 56483 antonis.le
            if (isValid(oaf))
98
                emitProtos(context, result, oaf);
99
100 42561 eri.katsar
        } catch (Exception e) {
101 56483 antonis.le
            log.error("Unable to parse proto in row: " + new String(keyIn.copyBytes()), e);
102 42127 eri.katsar
103 42579 eri.katsar
            context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1);
104 42561 eri.katsar
        }
105 41142 eri.katsar
    }
106
107 42127 eri.katsar
108 41142 eri.katsar
    private boolean isValid(Oaf oaf) {
109 56483 antonis.le
        return oaf != null && oaf.isInitialized() && !oaf.getDataInfo().getDeletedbyinference() && !oaf.getDataInfo().getInvisible();
110 41142 eri.katsar
    }
111
112 42561 eri.katsar
113 41142 eri.katsar
    private void emitProtos(Context context, Result result, Oaf oaf) {
114 42127 eri.katsar
115 42561 eri.katsar
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).
116
                setDataInfo(oaf.getDataInfo()).setLastupdatetimestamp(oaf.getLastupdatetimestamp());
117
118 41142 eri.katsar
        oafBuilder.setEntity(oaf.getEntity());
119
120
        // emit relation first so we can cache them to entity protos
121 57089 antonis.le
        emitRelation(context, result, oaf);
122 42561 eri.katsar
        emitEntity(context, oafBuilder.build());
123 41876 eri.katsar
124 41142 eri.katsar
    }
125
126 42561 eri.katsar
    private void emitEntity(Context context, Oaf oaf) {
127 41142 eri.katsar
128 56504 antonis.le
        String serialized = serializer.serialize(oaf);
129 42730 eri.katsar
        if (serialized != null) {
130 41876 eri.katsar
                try {
131 57521 antonis.le
                    Text TextKeyOut;
132 57089 antonis.le
133 57521 antonis.le
                    switch (oaf.getEntity().getType()) {
134
                        case project:
135
                        case datasource:
136
                        case organization:
137
                            TextKeyOut = new Text(oaf.getEntity().getType().toString() + "," + serializer.getId(oaf));
138
                            break;
139
                        case result:
140
                            default:
141
                            TextKeyOut = new Text(oaf.getEntity().getType().toString() + "," + id);
142
                            break;
143
                    }
144 57089 antonis.le
145
                    context.write(TextKeyOut, new ImmutableBytesWritable(serialized.getBytes()));
146 42561 eri.katsar
                    context.getCounter(STATS_COUNTERS.valueOf(oaf.getEntity().getType().toString())).increment(1);
147 41876 eri.katsar
148
                } catch (Exception e) {
149 41142 eri.katsar
                    log.error("Error writing entity to M/R output", e);
150
                }
151
            }
152 42730 eri.katsar
153 41142 eri.katsar
    }
154
155
    // may have multiple relations per each field
156 57089 antonis.le
    private void emitRelation(Context context, Result result, Oaf oaf) {
157 41142 eri.katsar
158
        try {
159 41876 eri.katsar
160
            // writing out : personResult,ID,data
161 41142 eri.katsar
            // derived relations; ResultLanguages, resultConcepts etc are
162
            // created here
163
164 41876 eri.katsar
            Multimap<String, String> relMap = ArrayListMultimap.create();
165 42561 eri.katsar
166 57089 antonis.le
            serializer.extractRelations(oaf, relMap);
167 41142 eri.katsar
168 41876 eri.katsar
            if (!relMap.isEmpty()) {
169
                for (Entry<String, String> rel : relMap.entries()) {
170 57521 antonis.le
                    Text TextKeyOut;
171 57089 antonis.le
172 57521 antonis.le
                    switch (oaf.getEntity().getType()) {
173
                        case project:
174
                        case datasource:
175
                        case organization:
176
                            TextKeyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));
177
                            break;
178
                        case result:
179
                        default:
180
                            TextKeyOut = new Text(rel.getKey() + "," + id);
181
                            break;
182
                    }
183
184 41976 eri.katsar
                    context.write((TextKeyOut), new ImmutableBytesWritable(rel.getValue().getBytes()));
185 41142 eri.katsar
                }
186
            }
187 42561 eri.katsar
        } catch (Throwable e) {
188 41142 eri.katsar
            log.error("Error writing relation to M/R output", e);
189
        }
190
191
        // Existing Hbase relations are generated here
192 42564 eri.katsar
        if (entityConfigTable.getDescriptors(oaf.getEntity().getType()) != null) {
193 41142 eri.katsar
194 42561 eri.katsar
            for (LinkDescriptor ld : entityConfigTable.getDescriptors(oaf.getEntity().getType())) {
195 41142 eri.katsar
                try {
196 42575 eri.katsar
                    ArrayList<OafRel> oafRels = new ArrayList<OafRel>();
197
                    decodeRelation(oaf, context, result, ld, oafRels);
198 41142 eri.katsar
199 42575 eri.katsar
                    for (OafRel rel : oafRels) {
200 42561 eri.katsar
                        {
201 56504 antonis.le
                            Text TextKeyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
202
                            String buff = serializer.serialize(rel);
203 42561 eri.katsar
204 41876 eri.katsar
                            context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
205 42520 eri.katsar
                            context.getCounter(REL_COUNTERS.valueOf(rel.getRelType().toString())).increment(1);
206 41876 eri.katsar
                        }
207 41976 eri.katsar
                    }
208 42561 eri.katsar
                } catch (Throwable e) {
209
                    log.error("Error for record ", e);
210 42579 eri.katsar
                    context.getCounter(ERROR_COUNTERS.rottenRelations).increment(1);
211 41142 eri.katsar
                }
212
            }
213
        }
214
    }
215
216 42561 eri.katsar
    private void decodeRelation(final Oaf body, final Context context, Result result, final LinkDescriptor ld, List<OafRel> rels) {
217 42550 eri.katsar
218 42561 eri.katsar
        try {
219 42564 eri.katsar
            final Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(KindProtos.Kind.relation);
220 42561 eri.katsar
            final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
221 41142 eri.katsar
222 42561 eri.katsar
            if (hasData(columnMap)) {
223 42550 eri.katsar
224 42561 eri.katsar
                for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
225 42550 eri.katsar
226 42579 eri.katsar
                    Oaf decodedOaf = decodeProto(e.getValue(), context);
227 56504 antonis.le
                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
228 42550 eri.katsar
229 56504 antonis.le
                    if (ld.isSymmetric()) {
230
                        RelDescriptor rd = ld.getRelDescriptor();
231
                        relBuilder.setCachedTarget(body.getEntity()).setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType());
232
                    }
233 42550 eri.katsar
234 56504 antonis.le
                    OafRel oafRel = relBuilder.setChild(ld.isChild()).build();
235
                    rels.add(oafBuilder.setDataInfo(decodedOaf.getDataInfo()).setRel(oafRel).build().getRel());
236 42561 eri.katsar
                }
237 41142 eri.katsar
            }
238 42550 eri.katsar
239 42561 eri.katsar
        } catch (Throwable throwable) {
240
            log.error("Error Decoding relation for: " + body.getRel().getRelType() + " " + body.getEntity().getId() + " ", throwable);
241 42579 eri.katsar
            context.getCounter(ERROR_COUNTERS.rottenRelations).increment(1);
242 42550 eri.katsar
        }
243
    }
244
245 42579 eri.katsar
    private Oaf decodeProto(final byte[] body, Context context) {
246 41142 eri.katsar
        try {
247
            return Oaf.parseFrom(body);
248 42520 eri.katsar
        } catch (Exception e) {
249 42127 eri.katsar
            log.error(e);
250 42579 eri.katsar
            context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1);
251 41142 eri.katsar
        }
252
        return null;
253
    }
254
255
    private void loadEntityConfig(Context context) {
256
        String indexConf = context.getConfiguration().get("stats.indexConf");
257
258
        if (indexConf == null || indexConf.isEmpty()) {
259
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
260
261
        }
262
263
        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
264
265
    }
266
267
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
268
        return rel.getSource().contains(rel.getTarget());
269
    }
270
271
    private boolean hasData(final Map<byte[], byte[]> columnMap) {
272
        return columnMap != null && !columnMap.isEmpty();
273
    }
274
275
    public EntityConfigTable getEntityConfigTable() {
276
        return entityConfigTable;
277
    }
278
279
    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
280
        this.entityConfigTable = entityConfigTable;
281
    }
282
}