Project

General

Profile

« Previous | Next » 

Revision 39861

Added by Eri Katsari over 8 years ago

first changes for lod csv export

View differences:

StatsMapper.java
17 17
import org.apache.hadoop.hbase.mapreduce.TableMapper;
18 18
import org.apache.hadoop.hbase.util.Bytes;
19 19
import org.apache.hadoop.io.Text;
20
import org.apache.hadoop.mapreduce.Mapper.Context;
21 20
import org.apache.log4j.Logger;
22 21

  
23 22
import java.io.ByteArrayInputStream;
24 23
import java.io.IOException;
25 24
import java.io.InputStream;
26
import java.util.*;
25
import java.util.HashMap;
26
import java.util.List;
27
import java.util.Map;
27 28
import java.util.Map.Entry;
29
import java.util.Properties;
28 30

  
29 31
/**
30 32
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
......
130 132
    private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
131 133
        // ONLY EMIT ENTITIES THAT WE WANT TO IMPORT IN STATS DB
132 134

  
133
        if (tableMappings.containsKey(type.toString())) {
134
            String serialized = serializer.serialize(oafBuilder.build());
135
            if (serialized != null) {
136 135

  
137
                try {
138
                    Text TextKeyOut = new Text(type + "," + serializer.getId(oaf));
139
                    context.getCounter(STATS_COUNTER.WRITTEN_ENTITIES).increment(1);
140
                    context.getCounter(PROTOS_COUNTER.valueOf(type.toString().toUpperCase())).increment(1);
136
        String serialized = serializer.serialize(oafBuilder.build());
137
        if (serialized != null) {
141 138

  
142
                    context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
139
            try {
140
                Text TextKeyOut = new Text("entity" + "," + serializer.getId(oaf));
141
                context.getCounter(STATS_COUNTER.WRITTEN_ENTITIES).increment(1);
142
                context.getCounter(PROTOS_COUNTER.valueOf(type.toString().toUpperCase())).increment(1);
143 143

  
144
                } catch (IOException e) {
145
                    log.error("Error writing entity to M/R output", e);
146
                } catch (InterruptedException e) {
147
                    log.error("Error writing entity to M/R output: Job Interrupted.", e);
148
                }
144
                context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
145

  
146
            } catch (IOException e) {
147
                log.error("Error writing entity to M/R output", e);
148
            } catch (InterruptedException e) {
149
                log.error("Error writing entity to M/R output: Job Interrupted.", e);
149 150
            }
150 151
        }
152

  
151 153
    }
152 154

  
153 155
    // may have multiple relations per each field
......
161 163

  
162 164
                for (Entry<String, List<String>> rel : relMap.entrySet()) {
163 165

  
164
                    Text TextKeyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));
166
                    Text TextKeyOut = new Text("entity" + "," + serializer.getId(oaf));
165 167

  
166 168
                    for (String relVals : rel.getValue()) {
167 169

  
......
189 191

  
190 192
                    final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
191 193

  
192
                    ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
194
                    HashMap<String, OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
193 195

  
194 196
                    context.getCounter(STATS_COUNTER.INPUT_RELATIONS).increment(1);
195
                    for (OafRel rel : relOaf) {
197
                    for (Entry<String, OafRel> rel : relOaf.entrySet()) {
196 198

  
197
                        builder.getEntityBuilder().addCachedRel(rel);
199
                        builder.getEntityBuilder().addCachedRel(rel.getValue());
198 200

  
199 201
                        try {
200 202

  
201
                            // TODO skip tables what we dont want to import (
202
                            // like personresults)
203
                            if (tableMappings.containsKey(rel.getRelType().toString()))
204 203

  
205
                            {
206
                                Text TextKeyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
207
                                String buff = serializer.serialize(rel);
208
                                context.getCounter(STATS_COUNTER.WRITTEN_RELATIONS).increment(1);
209
                                context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
204
                            Text TextKeyOut = new Text(rel.getKey() + "," + serializer.getId(rel.getValue()));
205
                            String buff = serializer.serialize(rel.getValue());
206
                            context.getCounter(STATS_COUNTER.WRITTEN_RELATIONS).increment(1);
207
                            context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
210 208

  
211
                            }
212 209

  
213 210
                        } catch (Exception e) {
214 211
                            log.error("Error while writing Relation Proto to M/R output", e);
......
225 222

  
226 223
    }
227 224

  
228
    private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
225
    private HashMap<String, OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
229 226

  
230
        ArrayList<OafRel> rels = new ArrayList<OafRel>();
227
        HashMap<String, OafRel> rels = new HashMap<String, OafRel>();
231 228

  
232 229
        if (hasData(columnMap)) {
233 230

  
......
242 239
                    // skip dedups
243 240

  
244 241
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
245
                        // log.info("avoid to emit dedup self: " +
246
                        // relBuilder.getSource());
242
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
243
                        relBuilder.setCachedTarget(body.getEntity());
244
                        rels.put("dedup", oafRel);
247 245

  
248 246
                    } else {
249 247

  
250 248
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
251 249
                        relBuilder.setCachedTarget(body.getEntity());
252
                        rels.add(oafRel);
250
                        rels.put("entity", oafRel);
253 251

  
254 252
                    }
255 253
                }

Also available in: Unified diff