Revision 49218
Added by Claudio Atzori over 6 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/MindistSearchMapper.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.io.IOException; |
4 | 4 |
|
5 |
import org.apache.commons.logging.Log; |
|
6 |
import org.apache.commons.logging.LogFactory; |
|
5 | 7 |
import org.apache.hadoop.io.Text; |
6 | 8 |
import org.apache.hadoop.mapreduce.Mapper; |
7 | 9 |
|
... | ... | |
10 | 12 |
*/ |
11 | 13 |
public class MindistSearchMapper extends Mapper<Text, VertexWritable, Text, VertexWritable> { |
12 | 14 |
|
15 |
private static final Log log = LogFactory.getLog(MindistSearchMapper.class); |
|
16 |
|
|
17 |
private boolean debug = false; |
|
18 |
|
|
13 | 19 |
@Override |
20 |
protected void setup(Mapper.Context context) throws IOException, InterruptedException { |
|
21 |
super.setup(context); |
|
22 |
|
|
23 |
debug = context.getConfiguration().getBoolean("mindist_DEBUG", false); |
|
24 |
log.info("debug mode: " + debug); |
|
25 |
} |
|
26 |
|
|
27 |
@Override |
|
14 | 28 |
protected void map(Text key, VertexWritable value, Context context) throws IOException, InterruptedException { |
15 | 29 |
|
16 |
context.write(key, value);
|
|
30 |
emit(key, value, context);
|
|
17 | 31 |
if (value.isActivated()) { |
18 |
VertexWritable writable = new VertexWritable();
|
|
19 |
for (Text neighborVertex : value.getEdges()) {
|
|
20 |
if (!neighborVertex.toString().equals(value.getVertexId().toString())) {
|
|
21 |
writable.setVertexId(value.getVertexId());
|
|
22 |
writable.setEdges(null);
|
|
23 |
context.write(neighborVertex, writable);
|
|
32 |
VertexWritable vertex = new VertexWritable();
|
|
33 |
for (Text edge : value.getEdges()) {
|
|
34 |
if (!edge.toString().equals(value.getVertexId().toString())) {
|
|
35 |
vertex.setVertexId(value.getVertexId());
|
|
36 |
vertex.setEdges(null);
|
|
37 |
emit(edge, vertex, context);
|
|
24 | 38 |
} |
25 | 39 |
} |
26 | 40 |
} |
27 | 41 |
} |
28 | 42 |
|
43 |
private void emit(final Text key, final VertexWritable vertex, final Context context) throws IOException, InterruptedException { |
|
44 |
context.write(key, vertex); |
|
45 |
if (debug) { |
|
46 |
log.info(vertex.toJSON()); |
|
47 |
} |
|
48 |
} |
|
49 |
|
|
29 | 50 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/MindistSearchReducer.java | ||
---|---|---|
15 | 15 |
|
16 | 16 |
private static final Log log = LogFactory.getLog(MindistSearchReducer.class); |
17 | 17 |
|
18 |
public static final String UPDATE_COUNTER = "UpdateCounter"; |
|
19 |
public static final String SKIPPED = "SKIPPED"; |
|
20 |
public static final String UPDATED = "UPDATED"; |
|
21 |
|
|
18 | 22 |
private boolean depthOne; |
19 | 23 |
|
20 | 24 |
private boolean debug = false; |
... | ... | |
28 | 32 |
depthOne = true; |
29 | 33 |
} |
30 | 34 |
|
31 |
try { |
|
32 |
debug = Boolean.valueOf(context.getConfiguration().get("mindist_DEBUG")); |
|
33 |
} catch(Throwable e) { |
|
34 |
debug = false; |
|
35 |
} |
|
35 |
debug = context.getConfiguration().getBoolean("mindist_DEBUG", false); |
|
36 | 36 |
log.info("debug mode: " + debug); |
37 | 37 |
} |
38 | 38 |
|
... | ... | |
41 | 41 |
|
42 | 42 |
VertexWritable realVertex = null; |
43 | 43 |
Text currentMinimalKey = null; |
44 |
boolean foundEdges = false; |
|
44 |
//boolean foundEdges = false;
|
|
45 | 45 |
|
46 | 46 |
if (depthOne) { |
47 | 47 |
for (VertexWritable vertex : values) { |
48 | 48 |
if (!vertex.isMessage()) { |
49 | 49 |
//log.info(String.format("found vertex with edges: %s", key.toString())); |
50 | 50 |
realVertex = vertex.clone(); |
51 |
foundEdges = true; |
|
52 | 51 |
} |
53 | 52 |
} |
54 | 53 |
|
55 | 54 |
if (realVertex == null) { |
56 |
throw new IllegalStateException(String.format("foundEdges: %s, invalid input, key: '%s'", foundEdges, key.toString())); |
|
55 |
context.getCounter(UPDATE_COUNTER, SKIPPED).increment(1); |
|
56 |
return; |
|
57 | 57 |
} |
58 | 58 |
|
59 | 59 |
realVertex.setActivated(true); |
... | ... | |
63 | 63 |
realVertex.setVertexId(key); |
64 | 64 |
} |
65 | 65 |
|
66 |
context.getCounter("UpdateCounter", "UPDATED").increment(1);
|
|
66 |
context.getCounter(UPDATE_COUNTER, UPDATED).increment(1);
|
|
67 | 67 |
} else { |
68 | 68 |
for (VertexWritable vertex : values) { |
69 | 69 |
if (!vertex.isMessage()) { |
... | ... | |
82 | 82 |
} |
83 | 83 |
} |
84 | 84 |
|
85 |
if (realVertex == null) { |
|
86 |
context.getCounter(UPDATE_COUNTER, SKIPPED).increment(1); |
|
87 |
return; |
|
88 |
} |
|
89 |
|
|
85 | 90 |
if (currentMinimalKey != null && currentMinimalKey.compareTo(realVertex.getVertexId()) < 0) { |
86 | 91 |
realVertex.setVertexId(currentMinimalKey); |
87 | 92 |
realVertex.setActivated(true); |
88 |
context.getCounter("UpdateCounter", "UPDATED").increment(1);
|
|
93 |
context.getCounter(UPDATE_COUNTER, UPDATED).increment(1);
|
|
89 | 94 |
} else { |
90 | 95 |
realVertex.setActivated(false); |
91 | 96 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/VertexWritable.java | ||
---|---|---|
3 | 3 |
import java.io.DataInput; |
4 | 4 |
import java.io.DataOutput; |
5 | 5 |
import java.io.IOException; |
6 |
import java.lang.reflect.Type; |
|
7 | 6 |
import java.util.TreeSet; |
8 | 7 |
|
9 | 8 |
import com.google.gson.*; |
... | ... | |
53 | 52 |
|
54 | 53 |
public void addVertex(Text id) { |
55 | 54 |
if (edges == null) { |
56 |
edges = new TreeSet<Text>();
|
|
55 |
edges = new TreeSet<>(); |
|
57 | 56 |
} |
58 | 57 |
edges.add(id); |
59 | 58 |
} |
... | ... | |
78 | 77 |
vertexId.readFields(in); |
79 | 78 |
int length = in.readInt(); |
80 | 79 |
if (length > -1) { |
81 |
edges = new TreeSet<Text>();
|
|
80 |
edges = new TreeSet<>(); |
|
82 | 81 |
for (int i = 0; i < length; i++) { |
83 | 82 |
Text temp = new Text(); |
84 | 83 |
temp.readFields(in); |
... | ... | |
103 | 102 |
|
104 | 103 |
public static VertexWritable fromJSON(String json) { |
105 | 104 |
final GsonBuilder builder = new GsonBuilder(); |
106 |
builder.registerTypeAdapter(VertexWritable.class, new JsonDeserializer<VertexWritable>() {
|
|
105 |
builder.registerTypeAdapter(VertexWritable.class, (JsonDeserializer<VertexWritable>) (json1, typeOfT, context) -> {
|
|
107 | 106 |
|
108 |
@Override |
|
109 |
public VertexWritable deserialize(final JsonElement json, final Type typeOfT, final JsonDeserializationContext context) throws JsonParseException { |
|
107 |
JsonObject jo = json1.getAsJsonObject(); |
|
110 | 108 |
|
111 |
JsonObject jo = json.getAsJsonObject(); |
|
109 |
VertexWritable vertexWritable = new VertexWritable(); |
|
110 |
vertexWritable.setVertexId(new Text(jo.get("vertexId").getAsString())); |
|
111 |
vertexWritable.setActivated(jo.get("activated").getAsBoolean()); |
|
112 | 112 |
|
113 |
VertexWritable vertexWritable = new VertexWritable(); |
|
114 |
vertexWritable.setVertexId(new Text(jo.get("vertexId").getAsString())); |
|
115 |
vertexWritable.setActivated(jo.get("activated").getAsBoolean()); |
|
113 |
TreeSet<Text> edges = new TreeSet<Text>(); |
|
116 | 114 |
|
117 |
TreeSet<Text> edges = new TreeSet<Text>(); |
|
118 |
|
|
119 |
for(JsonElement e : jo.get("edges").getAsJsonArray()) { |
|
120 |
edges.add(new Text(e.getAsString())); |
|
121 |
} |
|
122 |
vertexWritable.setEdges(edges); |
|
123 |
return vertexWritable; |
|
115 |
for(JsonElement e : jo.get("edges").getAsJsonArray()) { |
|
116 |
edges.add(new Text(e.getAsString())); |
|
124 | 117 |
} |
118 |
vertexWritable.setEdges(edges); |
|
119 |
return vertexWritable; |
|
125 | 120 |
}); |
126 | 121 |
return builder.create().fromJson(json, VertexWritable.class); |
127 | 122 |
|
... | ... | |
130 | 125 |
public String toJSON() { |
131 | 126 |
|
132 | 127 |
final GsonBuilder builder = new GsonBuilder(); |
133 |
builder.registerTypeAdapter(VertexWritable.class, new JsonSerializer<VertexWritable>() { |
|
134 |
@Override |
|
135 |
public JsonElement serialize(final VertexWritable src, final Type typeOfSrc, final JsonSerializationContext context) { |
|
136 |
JsonObject res = new JsonObject(); |
|
137 |
res.addProperty("vertexId", getVertexId().toString()); |
|
138 |
res.addProperty("activated", isActivated()); |
|
128 |
builder.registerTypeAdapter(VertexWritable.class, (JsonSerializer<VertexWritable>) (src, typeOfSrc, context) -> { |
|
129 |
JsonObject res = new JsonObject(); |
|
130 |
res.addProperty("vertexId", getVertexId().toString()); |
|
131 |
res.addProperty("activated", isActivated()); |
|
139 | 132 |
|
140 |
JsonArray edges = new JsonArray(); |
|
141 |
if (!isMessage()) { |
|
142 |
for (Text edge : getEdges()) { |
|
143 |
edges.add(new JsonPrimitive(edge.toString())); |
|
144 |
} |
|
133 |
JsonArray edges = new JsonArray(); |
|
134 |
if (!isMessage()) { |
|
135 |
for (Text edge : getEdges()) { |
|
136 |
edges.add(new JsonPrimitive(edge.toString())); |
|
145 | 137 |
} |
146 |
res.add("edges", edges); |
|
147 |
return res; |
|
148 | 138 |
} |
139 |
res.add("edges", edges); |
|
140 |
return res; |
|
149 | 141 |
}); |
150 | 142 |
|
151 | 143 |
return builder.create().toJson(this); |
... | ... | |
155 | 147 |
protected VertexWritable clone() { |
156 | 148 |
VertexWritable toReturn = new VertexWritable(new Text(vertexId.toString())); |
157 | 149 |
if (edges != null) { |
158 |
toReturn.edges = new TreeSet<Text>();
|
|
150 |
toReturn.edges = new TreeSet<>(); |
|
159 | 151 |
for (Text l : edges) { |
160 | 152 |
toReturn.edges.add(new Text(l.toString())); |
161 | 153 |
} |
Also available in: Unified diff
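Skip weird cases in the CC algorithm: when the reducer finds no real (non-message) vertex for a key, it now increments the SKIPPED counter and returns instead of throwing an IllegalStateException.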