Revision 57580
Added by Miriam Baglioni over 4 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/orcidthroughproducts/PropagationOrcidToResultFileReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.orcidthroughproducts; |
|
2 |
|
|
3 |
import com.googlecode.protobuf.format.JsonFormat; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult.PropagationProjectToResultReducer; |
|
7 |
import eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult.ResultProjectIterator; |
|
8 |
import eu.dnetlib.data.proto.OafProtos; |
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
12 |
import org.apache.hadoop.hbase.util.Bytes; |
|
13 |
import org.apache.hadoop.io.Text; |
|
14 |
import org.apache.hadoop.mapreduce.Reducer; |
|
15 |
|
|
16 |
import java.io.IOException; |
|
17 |
import java.util.List; |
|
18 |
|
|
19 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION; |
|
20 |
|
|
21 |
public class PropagationOrcidToResultFileReducer extends Reducer<ImmutableBytesWritable, Text, Text, Text> { |
|
22 |
private static final Log log = LogFactory.getLog(PropagationOrcidToResultFileReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
23 |
|
|
24 |
private Text keyOut; |
|
25 |
private Text outValue; |
|
26 |
|
|
27 |
|
|
28 |
@Override |
|
29 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
30 |
super.setup(context); |
|
31 |
keyOut = new Text(""); |
|
32 |
outValue = new Text(); |
|
33 |
} |
|
34 |
|
|
35 |
|
|
36 |
@Override |
|
37 |
protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { |
|
38 |
ResultIterator rh = null; |
|
39 |
try { |
|
40 |
rh = new ResultOrcidIterator(values, Bytes.toString(key.copyBytes())); |
|
41 |
} catch (NotValidResultSequenceException e) { |
|
42 |
context.getCounter(COUNTER_PROPAGATION, e.getMessage()).increment(1); |
|
43 |
return; |
|
44 |
} |
|
45 |
while (rh.hasNext()) { |
|
46 |
List<OafProtos.Oaf> oaf_list = rh.next(); |
|
47 |
if(oaf_list != null){ |
|
48 |
for (OafProtos.Oaf oaf : oaf_list) { |
|
49 |
keyOut.set(oaf.getEntity().getId()); |
|
50 |
outValue.set(JsonFormat.printToString(oaf).getBytes()); |
|
51 |
context.write(keyOut, outValue); |
|
52 |
context.getCounter(COUNTER_PROPAGATION, "Added orcid to result").increment(1); |
|
53 |
} |
|
54 |
|
|
55 |
} |
|
56 |
|
|
57 |
} |
|
58 |
|
|
59 |
} |
|
60 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/orcidthroughproducts/PropagationOrcidToResultReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.orcidthroughproducts; |
|
2 |
|
|
3 |
import com.googlecode.protobuf.format.JsonFormat; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult.PropagationProjectToResultReducer; |
|
7 |
import eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult.ResultProjectIterator; |
|
8 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
9 |
import eu.dnetlib.data.proto.OafProtos; |
|
10 |
import org.apache.commons.logging.Log; |
|
11 |
import org.apache.commons.logging.LogFactory; |
|
12 |
import org.apache.hadoop.hbase.client.Put; |
|
13 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
14 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
15 |
import org.apache.hadoop.hbase.util.Bytes; |
|
16 |
import org.apache.hadoop.io.Text; |
|
17 |
|
|
18 |
import java.io.IOException; |
|
19 |
import java.util.List; |
|
20 |
|
|
21 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION; |
|
22 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.RELATION; |
|
23 |
|
|
24 |
public class PropagationOrcidToResultReducer extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> { |
|
25 |
private static final Log log = LogFactory.getLog(PropagationOrcidToResultReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
26 |
private ImmutableBytesWritable keyOut; |
|
27 |
|
|
28 |
|
|
29 |
|
|
30 |
@Override |
|
31 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
32 |
super.setup(context); |
|
33 |
keyOut = new ImmutableBytesWritable(); |
|
34 |
} |
|
35 |
|
|
36 |
|
|
37 |
@Override |
|
38 |
protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { |
|
39 |
ResultIterator rh = null; |
|
40 |
try { |
|
41 |
rh = new ResultOrcidIterator(values, Bytes.toString(key.copyBytes())); |
|
42 |
} catch (NotValidResultSequenceException e) { |
|
43 |
context.getCounter(COUNTER_PROPAGATION, e.getMessage()).increment(1); |
|
44 |
return; |
|
45 |
} |
|
46 |
|
|
47 |
while (rh.hasNext()) { |
|
48 |
List<OafProtos.Oaf> oaf_list = rh.next(); |
|
49 |
if(oaf_list != null){ |
|
50 |
for (OafProtos.Oaf oaf : oaf_list) { |
|
51 |
byte[] targetRowKey = Bytes.toBytes(oaf.getEntity().getId()); |
|
52 |
final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oaf.toByteArray()); |
|
53 |
keyOut.set(targetRowKey); |
|
54 |
context.write(keyOut, put); |
|
55 |
context.getCounter(COUNTER_PROPAGATION, "added orcid to product").increment(1); |
|
56 |
|
|
57 |
} |
|
58 |
|
|
59 |
} |
|
60 |
|
|
61 |
} |
|
62 |
|
|
63 |
|
|
64 |
|
|
65 |
} |
|
66 |
|
|
67 |
|
|
68 |
|
|
69 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/orcidthroughproducts/ResultOrcidIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.orcidthroughproducts; |
|
2 |
|
|
3 |
import com.googlecode.protobuf.format.JsonFormat; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.*; |
|
5 |
import eu.dnetlib.data.proto.*; |
|
6 |
import org.apache.hadoop.io.Text; |
|
7 |
|
|
8 |
import java.util.ArrayList; |
|
9 |
import java.util.Arrays; |
|
10 |
import java.util.Iterator; |
|
11 |
import java.util.List; |
|
12 |
import java.util.stream.Collectors; |
|
13 |
|
|
14 |
public class ResultOrcidIterator extends ResultIterator { |
|
15 |
|
|
16 |
private Iterator<String> author_iterator; |
|
17 |
private List<FieldTypeProtos.Author> autoritative_authors ; |
|
18 |
private List<String> relatedResult ; |
|
19 |
|
|
20 |
|
|
21 |
public ResultOrcidIterator(final Iterable<Text> values, final String key) throws NotValidResultSequenceException { |
|
22 |
super(values,key); |
|
23 |
} |
|
24 |
|
|
25 |
@Override |
|
26 |
protected void checkSequence() throws NotValidResultSequenceException { |
|
27 |
if(!it.hasNext()){ |
|
28 |
throw new NotValidResultSequenceException("Empty information for key"); |
|
29 |
} |
|
30 |
|
|
31 |
try { |
|
32 |
autoritative_authors = new ArrayList<>(); |
|
33 |
relatedResult = new ArrayList<>(); |
|
34 |
analizeValueList(); |
|
35 |
|
|
36 |
}catch(JsonFormat.ParseException e){ |
|
37 |
throw new NotValidResultSequenceException("Problems recreating the author list from serialization"); |
|
38 |
} |
|
39 |
|
|
40 |
List<FieldTypeProtos.Author> authors_with_orcid = autoritative_authors.stream() |
|
41 |
.map(a -> { |
|
42 |
if (a.getPidList() == null || a.getPidList().isEmpty()) |
|
43 |
return null; |
|
44 |
return a; |
|
45 |
}) |
|
46 |
.filter(a -> a!= null) |
|
47 |
.filter(a -> containsOrcid(a.getPidList())) |
|
48 |
.collect(Collectors.toList()); |
|
49 |
|
|
50 |
|
|
51 |
if(authors_with_orcid.size() == 0 || relatedResult.size() == 0){ |
|
52 |
resultId = TERMINATOR; |
|
53 |
return; |
|
54 |
} |
|
55 |
|
|
56 |
|
|
57 |
author_iterator = relatedResult.iterator(); |
|
58 |
autoritative_authors = authors_with_orcid; |
|
59 |
getNext(); |
|
60 |
|
|
61 |
} |
|
62 |
|
|
63 |
private boolean containsOrcid(List<FieldTypeProtos.KeyValue> pidList){ |
|
64 |
if(pidList == null) |
|
65 |
return false; |
|
66 |
return pidList |
|
67 |
.stream() |
|
68 |
.filter(kv -> kv.getKey().equals(PropagationConstants.AUTHOR_PID)) |
|
69 |
.collect(Collectors.toList()).size() > 0; |
|
70 |
} |
|
71 |
|
|
72 |
private void getNext(){ |
|
73 |
if (author_iterator.hasNext()) |
|
74 |
resultId = author_iterator.next(); |
|
75 |
else |
|
76 |
resultId = TERMINATOR; |
|
77 |
} |
|
78 |
|
|
79 |
@Override |
|
80 |
public List<OafProtos.Oaf> next() { |
|
81 |
//get the next merged author list |
|
82 |
try { |
|
83 |
//list of authors in the related result |
|
84 |
Emit e = Emit.fromJson(resultId); |
|
85 |
List<FieldTypeProtos.Author> author_list = getAuthorList(e); |
|
86 |
|
|
87 |
ResultProtos.Result.Metadata.Builder metadata = searchMatch(author_list); |
|
88 |
|
|
89 |
if (metadata != null){ |
|
90 |
ArrayList<OafProtos.Oaf> ret = new ArrayList<OafProtos.Oaf>(Arrays.asList(getUpdate(metadata, e.getId()))); |
|
91 |
getNext(); |
|
92 |
return ret; |
|
93 |
} |
|
94 |
|
|
95 |
|
|
96 |
}catch(JsonFormat.ParseException e){ |
|
97 |
|
|
98 |
} |
|
99 |
getNext(); |
|
100 |
return null; |
|
101 |
} |
|
102 |
|
|
103 |
private ResultProtos.Result.Metadata.Builder searchMatch(List<FieldTypeProtos.Author> author_list){ |
|
104 |
ResultProtos.Result.Metadata.Builder metadataBuilder = ResultProtos.Result.Metadata.newBuilder(); |
|
105 |
boolean updated = false; |
|
106 |
// for (FieldTypeProtos.Author a: autoritative_authors){ |
|
107 |
// searchAuthor(a,author_list); |
|
108 |
// } |
|
109 |
|
|
110 |
for (FieldTypeProtos.Author a: author_list){ |
|
111 |
FieldTypeProtos.Author.Builder author = searchAuthor(a, autoritative_authors); |
|
112 |
if(author != null){ |
|
113 |
updated = true; |
|
114 |
metadataBuilder.addAuthor(author); |
|
115 |
}else{ |
|
116 |
metadataBuilder.addAuthor(FieldTypeProtos.Author.newBuilder(a)); |
|
117 |
} |
|
118 |
} |
|
119 |
if(updated) |
|
120 |
return metadataBuilder; |
|
121 |
return null; |
|
122 |
} |
|
123 |
|
|
124 |
|
|
125 |
private boolean equals(FieldTypeProtos.Author a1, FieldTypeProtos.Author a2){ |
|
126 |
if(a1.hasSurname()){ |
|
127 |
if(a2.hasSurname()){ |
|
128 |
if(!a1.getSurname().trim().equalsIgnoreCase(a2.getSurname().trim())){ |
|
129 |
return false; |
|
130 |
} |
|
131 |
//have the same surname. Check the name |
|
132 |
if(a1.hasName()){ |
|
133 |
if (a2.hasName()){ |
|
134 |
if (a1.getName().trim().equalsIgnoreCase(a2.getName().trim())){ |
|
135 |
return true; //same name and same surname in a related research result |
|
136 |
} |
|
137 |
//they could be differently written (i.e. only the initials of the name in one of the two |
|
138 |
return (a1.getName().trim().substring(0,0).equalsIgnoreCase(a2.getName().trim().substring(0,0))); |
|
139 |
} |
|
140 |
} |
|
141 |
} |
|
142 |
} |
|
143 |
// if(a1.hasFullname()){ |
|
144 |
// if (a2.hasFullname()){ |
|
145 |
// if (a1.getFullname().trim().equalsIgnoreCase(a2.getFullname().trim())){ |
|
146 |
// return true; |
|
147 |
// } |
|
148 |
// //split string containing name and surname |
|
149 |
// String[] ns_a1 = a1.getFullname().trim().split(" "); |
|
150 |
// String[] ns_a2 = a2.getFullname().trim().split(" "); |
|
151 |
// |
|
152 |
// |
|
153 |
// if (ns_a1[0].endsWith(".") || ns_a1[0].endsWith(",")){ |
|
154 |
// ns_a1[0] = ns_a1[0].substring(0,ns_a1[0].length()-1); |
|
155 |
// } |
|
156 |
// if (ns_a1[1].endsWith(".") || ns_a1[1].endsWith(",")){ |
|
157 |
// ns_a1[1] = ns_a1[1].substring(0,ns_a1[1].length()-1); |
|
158 |
// } |
|
159 |
// |
|
160 |
// if (ns_a2[0].endsWith(".") || ns_a2[0].endsWith(",")){ |
|
161 |
// ns_a2[0] = ns_a2[0].substring(0,ns_a2[0].length()-1); |
|
162 |
// } |
|
163 |
// if (ns_a2[1].endsWith(".") || ns_a2[1].endsWith(",")){ |
|
164 |
// ns_a2[1] = ns_a2[1].substring(0,ns_a2[1].length()-1); |
|
165 |
// } |
|
166 |
// |
|
167 |
// if(ns_a1[0].compareTo(ns_a1[1]) < 0){ |
|
168 |
// String tmp = ns_a1[0]; |
|
169 |
// ns_a1[0] = ns_a1[1]; |
|
170 |
// ns_a1[1] = tmp; |
|
171 |
// } |
|
172 |
// |
|
173 |
// if(ns_a2[0].compareTo(ns_a2[1]) < 0){ |
|
174 |
// String tmp = ns_a2[0]; |
|
175 |
// ns_a2[0] = ns_a2[1]; |
|
176 |
// ns_a2[1] = tmp; |
|
177 |
// |
|
178 |
// } |
|
179 |
// |
|
180 |
// if(ns_a1[0].equalsIgnoreCase(ns_a2[0])){ |
|
181 |
// if(ns_a1[1].equalsIgnoreCase(ns_a2[1])){//same name and surname |
|
182 |
// return true; |
|
183 |
// } |
|
184 |
// if(ns_a1[1].length() == 1 || ns_a2[1].length() == 1){ |
|
185 |
// return ns_a1[1].charAt(0) == ns_a2[1].charAt(0);//same surname and initial of the name |
|
186 |
// } |
|
187 |
// return false; |
|
188 |
// |
|
189 |
// }else{ |
|
190 |
// if(ns_a1[1].equalsIgnoreCase(ns_a2[1])){ |
|
191 |
// if(ns_a1[0].length() == 1 || ns_a2[0].length()==1) |
|
192 |
// return ns_a1[0].charAt(0) == ns_a2[0].charAt(0); |
|
193 |
// else |
|
194 |
// return false; |
|
195 |
// } |
|
196 |
// } |
|
197 |
// |
|
198 |
// |
|
199 |
// |
|
200 |
// } |
|
201 |
// return false; |
|
202 |
// } |
|
203 |
return false; |
|
204 |
|
|
205 |
} |
|
206 |
|
|
207 |
private FieldTypeProtos.Author.Builder searchAuthor(FieldTypeProtos.Author a, List<FieldTypeProtos.Author> author_list){ |
|
208 |
if(containsOrcid(a.getPidList())) |
|
209 |
return null; |
|
210 |
for(FieldTypeProtos.Author autoritative_author : author_list) { |
|
211 |
if (equals(autoritative_author, a)) { |
|
212 |
if(!containsOrcid(a.getPidList())) |
|
213 |
return update(a, autoritative_author); |
|
214 |
} |
|
215 |
} |
|
216 |
return null; |
|
217 |
|
|
218 |
} |
|
219 |
|
|
220 |
private void analizeValueList() throws JsonFormat.ParseException { |
|
221 |
while(it.hasNext()){ |
|
222 |
Value v = Value.fromJson(it.next().toString()); |
|
223 |
|
|
224 |
if(v.getType().equals(PropagationConstants.Type.fromresult)){ |
|
225 |
autoritative_authors.addAll(getAuthorList(Emit.fromJson(v.getValue ()))); |
|
226 |
} |
|
227 |
if(v.getType().equals(PropagationConstants.Type.fromsemrel)){ |
|
228 |
relatedResult.add(v.getValue()); |
|
229 |
} |
|
230 |
} |
|
231 |
|
|
232 |
} |
|
233 |
private FieldTypeProtos.Author.Builder update(FieldTypeProtos.Author related_author, FieldTypeProtos.Author autoritative_autor ){ |
|
234 |
|
|
235 |
FieldTypeProtos.Author.Builder res = FieldTypeProtos.Author.newBuilder(related_author); |
|
236 |
List<FieldTypeProtos.KeyValue> apid_list = autoritative_autor.getPidList(); |
|
237 |
FieldTypeProtos.KeyValue akv = apid_list.stream().filter(kv -> kv.getKey().equals(PropagationConstants.AUTHOR_PID)).collect(Collectors.toList()).get(0); |
|
238 |
FieldTypeProtos.KeyValue.Builder kvb = FieldTypeProtos.KeyValue.newBuilder(); |
|
239 |
kvb.setKey(akv.getKey()).setValue(akv.getValue()); |
|
240 |
kvb.setDataInfo(Utils.getDataInfo( |
|
241 |
PropagationConstants.ORCID_RESULT_TRUST, |
|
242 |
PropagationConstants.CLASS_ORCID_ID, |
|
243 |
PropagationConstants.SCHEMA_ID, |
|
244 |
PropagationConstants.SCHEMA_NAME, |
|
245 |
PropagationConstants.DATA_INFO_TYPE, |
|
246 |
PropagationConstants.CLASS_ORCID_NAME) |
|
247 |
); |
|
248 |
return res.addPid(kvb); |
|
249 |
|
|
250 |
|
|
251 |
} |
|
252 |
|
|
253 |
private List<FieldTypeProtos.Author> getAuthorList(Emit e) throws JsonFormat.ParseException { |
|
254 |
|
|
255 |
List<FieldTypeProtos.Author> authors = new ArrayList<>(); |
|
256 |
for (String author : e.getAuthor_list()) { |
|
257 |
FieldTypeProtos.Author.Builder author_builder = FieldTypeProtos.Author.newBuilder(); |
|
258 |
JsonFormat.merge(author, author_builder); |
|
259 |
authors.add(author_builder.build()); |
|
260 |
} |
|
261 |
|
|
262 |
return authors; |
|
263 |
|
|
264 |
} |
|
265 |
|
|
266 |
|
|
267 |
public static OafProtos.Oaf getUpdate(ResultProtos.Result.Metadata.Builder metadata, String resultId) { |
|
268 |
final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata); |
|
269 |
final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder() |
|
270 |
.setType(TypeProtos.Type.result) |
|
271 |
.setId(resultId) |
|
272 |
.setResult(result); |
|
273 |
|
|
274 |
return OafProtos.Oaf.newBuilder() |
|
275 |
.setKind(KindProtos.Kind.entity) |
|
276 |
.setEntity(entity) |
|
277 |
.build(); |
|
278 |
} |
|
279 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/orcidthroughproducts/PropagationOrcidToResultMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.orcidthroughproducts; |
|
2 |
|
|
3 |
import com.google.gson.Gson; |
|
4 |
import com.googlecode.protobuf.format.JsonFormat; |
|
5 |
import eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation.Key; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
|
7 |
import eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization.DedupedList; |
|
8 |
import eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization.OrganizationMap; |
|
9 |
import eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult.PropagationProjectToResultReducer; |
|
10 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
11 |
import eu.dnetlib.data.proto.FieldTypeProtos; |
|
12 |
import eu.dnetlib.data.proto.OafProtos; |
|
13 |
import eu.dnetlib.data.proto.TypeProtos; |
|
14 |
import org.apache.avro.generic.GenericData; |
|
15 |
import org.apache.commons.lang3.StringUtils; |
|
16 |
import org.apache.commons.logging.Log; |
|
17 |
import org.apache.commons.logging.LogFactory; |
|
18 |
import org.apache.hadoop.hbase.client.Result; |
|
19 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
20 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
21 |
import org.apache.hadoop.hbase.util.Bytes; |
|
22 |
import org.apache.hadoop.io.Text; |
|
23 |
|
|
24 |
import java.io.IOException; |
|
25 |
import java.util.ArrayList; |
|
26 |
import java.util.HashSet; |
|
27 |
import java.util.List; |
|
28 |
import java.util.Set; |
|
29 |
import java.util.stream.Collectors; |
|
30 |
|
|
31 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
32 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION; |
|
33 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity; |
|
34 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget; |
|
35 |
|
|
36 |
public class PropagationOrcidToResultMapper extends TableMapper<ImmutableBytesWritable, Text> { |
|
37 |
private static final Log log = LogFactory.getLog(PropagationOrcidToResultMapper.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
38 |
private Text valueOut; |
|
39 |
private ImmutableBytesWritable keyOut; |
|
40 |
private String[] sem_rels; |
|
41 |
private String trust; |
|
42 |
|
|
43 |
@Override |
|
44 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
45 |
super.setup(context); |
|
46 |
valueOut = new Text(); |
|
47 |
keyOut = new ImmutableBytesWritable(); |
|
48 |
|
|
49 |
sem_rels = context.getConfiguration().getStrings("propagatetoorcid.semanticrelations", DEFAULT_RESULT_RELATION_SET); |
|
50 |
trust = context.getConfiguration().get("propagatetoorcid.trust","0.85"); |
|
51 |
|
|
52 |
} |
|
53 |
|
|
54 |
@Override |
|
55 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
|
56 |
final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType(); |
|
57 |
final OafProtos.OafEntity entity = getEntity(value, type);//getEntity already verified that it is not delByInference |
|
58 |
|
|
59 |
|
|
60 |
if (entity != null) { |
|
61 |
|
|
62 |
if (type == TypeProtos.Type.result){ |
|
63 |
Set<String> result_result = new HashSet<>(); |
|
64 |
//verifico se il risultato ha una relazione semantica verso uno o piu' risultati. |
|
65 |
//per ogni risultato linkato con issupplementto o issupplementedby emetto: |
|
66 |
// id risultato linkato come chiave, |
|
67 |
// id risultato oggetto del mapping e lista degli autori del risultato oggetto del mapper come value |
|
68 |
for(String sem : sem_rels){ |
|
69 |
result_result.addAll(getRelationTarget(value, sem, context, COUNTER_PROPAGATION)); |
|
70 |
} |
|
71 |
if(!result_result.isEmpty()){ |
|
72 |
List<String> authorlist = getAuthorList(entity.getResult().getMetadata().getAuthorList()); |
|
73 |
Emit e = new Emit(); |
|
74 |
e.setId(Bytes.toString(keyIn.get())); |
|
75 |
e.setAuthor_list(authorlist); |
|
76 |
valueOut.set(Value.newInstance(new Gson().toJson(e, Emit.class), |
|
77 |
trust, |
|
78 |
Type.fromsemrel).toJson()); |
|
79 |
for (String result: result_result){ |
|
80 |
keyOut.set(Bytes.toBytes(result)); |
|
81 |
context.write(keyOut,valueOut); |
|
82 |
context.getCounter(COUNTER_PROPAGATION,"emit for sem_rel").increment(1); |
|
83 |
} |
|
84 |
|
|
85 |
//emetto anche id dell'oggetto del mapper come chiave e lista degli autori come valore |
|
86 |
e.setId(keyIn.toString()); |
|
87 |
e.setAuthor_list(authorlist); |
|
88 |
valueOut.set(Value.newInstance(new Gson().toJson(e, Emit.class), trust, Type.fromresult).toJson()); |
|
89 |
context.write(keyIn, valueOut); |
|
90 |
context.getCounter(COUNTER_PROPAGATION,"emit for result with orcid").increment(1); |
|
91 |
|
|
92 |
} |
|
93 |
} |
|
94 |
|
|
95 |
} |
|
96 |
} |
|
97 |
|
|
98 |
private List<String> getAuthorList(List<FieldTypeProtos.Author> author_list){ |
|
99 |
|
|
100 |
return author_list.stream().map(a -> new JsonFormat().printToString(a)).collect(Collectors.toList()); |
|
101 |
|
|
102 |
} |
|
103 |
|
|
104 |
|
|
105 |
|
|
106 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/orcidthroughproducts/Emit.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.orcidthroughproducts; |
|
2 |
|
|
3 |
import com.google.gson.Gson; |
|
4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization.DedupedList; |
|
5 |
import eu.dnetlib.data.proto.FieldTypeProtos; |
|
6 |
|
|
7 |
import java.io.Serializable; |
|
8 |
import java.util.List; |
|
9 |
|
|
10 |
public class Emit implements Serializable { |
|
11 |
|
|
12 |
private String id; |
|
13 |
private List<String> author_list; |
|
14 |
|
|
15 |
public String getId() { |
|
16 |
return id; |
|
17 |
} |
|
18 |
|
|
19 |
public void setId(String id) { |
|
20 |
this.id = id; |
|
21 |
} |
|
22 |
|
|
23 |
public List<String> getAuthor_list() { |
|
24 |
return author_list; |
|
25 |
} |
|
26 |
|
|
27 |
public void setAuthor_list(List<String> author_list) { |
|
28 |
this.author_list = author_list; |
|
29 |
} |
|
30 |
|
|
31 |
public static Emit fromJson(String value) { |
|
32 |
return new Gson().fromJson(value, Emit.class); |
|
33 |
} |
|
34 |
} |
Also available in: Unified diff
MapReduce job for the propagation of ORCID identifiers through results. Follows only the isSupplementedBy and isSupplementTo semantic relations.