Project

General

Profile

« Previous | Next » 

Revision 57508

fixed export procedure: include also relationships

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper2DHP.java
2 2

  
3 3
import com.google.gson.Gson;
4 4
import com.google.protobuf.InvalidProtocolBufferException;
5
import eu.dnetlib.data.mapreduce.util.DedupUtils;
5 6
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6 7
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
7 8
import eu.dnetlib.data.proto.OafProtos;
......
50 51

  
51 52
			if (oaf == null) {
52 53
				return;
54
			} else {
55
				emit(context, oaf);
53 56
			}
54 57

  
55 58
			final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
......
57 60
			for (byte[] cf : row.keySet()) {
58 61

  
59 62
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
60

  
61
					if(Bytes.toString(q.getKey()).startsWith("update")) {
63
					if(Bytes.toString(q.getKey()).startsWith("update") || q.getKey().equals(DedupUtils.BODY_S)) {
62 64
						continue;
63 65
					}
64

  
65
					Oaf result = null;
66
					try {
67
						result = ProtoConverter.convert(oaf);
68
					} catch (Throwable e) {
69
						context.getCounter("Convert", "error" ).increment(1);
66
					if (new String(q.getValue()).equals("")) {
67
						context.getCounter("export", "skipped relation " + new String(cf)).increment(1);
68
					} else {
69
						emit(context, OafProtos.Oaf.parseFrom(q.getValue()));
70 70
					}
71
					if (result != null) {
72
						emit(result.getClass().getName(), result, context);
73
					}
74 71
				}
75 72
			}
76 73
		} catch (final Throwable e) {
......
78 75
			throw new RuntimeException(e);
79 76
		}
80 77
	}
81
	private void emit(final String type,  final Oaf result, final Context context) throws IOException, InterruptedException {
82
		keyOut.set(type);
83
		valueOut.set(g.toJson(result));
84
		context.write(keyOut, valueOut);
85 78

  
86
		context.getCounter("export", type).increment(1);
79
	private void emit(Context context, OafProtos.Oaf oaf) throws IOException, InterruptedException {
80
		Oaf result = null;
81
		try {
82
			result = ProtoConverter.convert(oaf);
83
		} catch (Throwable e) {
84
			context.getCounter("export", "error:" + e.getClass().getName()).increment(1);
85
		}
86
		if (result != null) {
87
			keyOut.set(result.getClass().getName());
88
			valueOut.set(g.toJson(result));
89
			context.write(keyOut, valueOut);
90

  
91
			context.getCounter("export", result.getClass().getName()).increment(1);
92
		}
87 93
	}
88 94

  
89 95
	private OafProtos.Oaf mergeUpdates(final Result value, final Context context, final TypeProtos.Type type)

Also available in: Unified diff