Revision 48027
Added by Alessia Bardi almost 7 years ago
modules/dnet-openaireplus-workflows/trunk/src/test/java/leu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNodeTest.java
1 |
package leu.dnetlib.msro.openaireplus.workflows.nodes.claims; |
|
2 |
|
|
3 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
4 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
5 |
import eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ApplyClaimRelsJobNode; |
|
6 |
import eu.dnetlib.msro.rmi.MSROException; |
|
7 |
import org.junit.Test; |
|
8 |
import org.postgresql.util.Base64; |
|
9 |
|
|
10 |
/** |
|
11 |
* Created by Alessia Bardi on 26/06/2017. |
|
12 |
* |
|
13 |
* @author Alessia Bardi |
|
14 |
*/ |
|
15 |
public class ApplyClaimRelsJobNodeTest { |
|
16 |
|
|
17 |
private ApplyClaimRelsJobNode applyClaims = new ApplyClaimRelsJobNode(); |
|
18 |
|
|
19 |
|
|
20 |
@Test |
|
21 |
public void testGetValue() throws MSROException, InvalidProtocolBufferException { |
|
22 |
|
|
23 |
String sourceId = "40|corda_______::9f752db0b5ec9ca23673ca7f4cb0808e"; |
|
24 |
String semantics = "resultProject_outcome_produces"; |
|
25 |
String targetId = "50|userclaim___::6a8f649d968e734a6733c23a351c1859"; |
|
26 |
long time = System.currentTimeMillis(); |
|
27 |
//final String sourceId, final String semantics, final String targetId, final long timestamp |
|
28 |
String res = applyClaims.getValue(sourceId, semantics, targetId, time); |
|
29 |
Oaf.Builder builder = Oaf.newBuilder().mergeFrom(Base64.decode(res)); |
|
30 |
System.out.println(builder.build().toString()); |
|
31 |
} |
|
32 |
|
|
33 |
@Test(expected = MSROException.class) |
|
34 |
public void testGetValueErrSem() throws MSROException { |
|
35 |
|
|
36 |
String sourceId = "40|corda_______::9f752db0b5ec9ca23673ca7f4cb0808e"; |
|
37 |
String semantics = "produces"; |
|
38 |
String targetId = "50|userclaim___::6a8f649d968e734a6733c23a351c1859"; |
|
39 |
long time = System.currentTimeMillis(); |
|
40 |
//final String sourceId, final String semantics, final String targetId, final long timestamp |
|
41 |
String res = applyClaims.getValue(sourceId, semantics, targetId, time); |
|
42 |
System.out.println(res); |
|
43 |
} |
|
44 |
} |
modules/dnet-openaireplus-workflows/trunk/src/test/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNodeTest.java
1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims; |
|
2 |
|
|
3 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
4 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
5 |
import eu.dnetlib.msro.rmi.MSROException; |
|
6 |
import org.junit.Test; |
|
7 |
import org.postgresql.util.Base64; |
|
8 |
|
|
9 |
/** |
|
10 |
* Created by Alessia Bardi on 26/06/2017. |
|
11 |
* |
|
12 |
* @author Alessia Bardi |
|
13 |
*/ |
|
14 |
public class ApplyClaimRelsJobNodeTest { |
|
15 |
|
|
16 |
private ApplyClaimRelsJobNode applyClaims = new ApplyClaimRelsJobNode(); |
|
17 |
|
|
18 |
@Test |
|
19 |
public void testGetValue() throws MSROException, InvalidProtocolBufferException { |
|
20 |
|
|
21 |
String sourceId = "40|corda_______::9f752db0b5ec9ca23673ca7f4cb0808e"; |
|
22 |
String semantics = "resultProject_outcome_produces"; |
|
23 |
String targetId = "50|userclaim___::6a8f649d968e734a6733c23a351c1859"; |
|
24 |
System.out.println(getValue(sourceId, semantics, targetId)); |
|
25 |
} |
|
26 |
|
|
27 |
@Test(expected = MSROException.class) |
|
28 |
public void testGetValueErrSem() throws MSROException, InvalidProtocolBufferException { |
|
29 |
|
|
30 |
String sourceId = "40|corda_______::9f752db0b5ec9ca23673ca7f4cb0808e"; |
|
31 |
String semantics = "produces"; |
|
32 |
String targetId = "50|userclaim___::6a8f649d968e734a6733c23a351c1859"; |
|
33 |
getValue(sourceId, semantics, targetId); |
|
34 |
} |
|
35 |
|
|
36 |
private String getValue(final String rowKey, final String semantics, final String targetId) throws MSROException, InvalidProtocolBufferException { |
|
37 |
long time = System.currentTimeMillis(); |
|
38 |
//final String sourceId, final String semantics, final String targetId, final long timestamp |
|
39 |
String res = applyClaims.getValue(rowKey, semantics, targetId, time); |
|
40 |
Oaf.Builder builder = Oaf.newBuilder().mergeFrom(Base64.decode(res)); |
|
41 |
return builder.build().toString(); |
|
42 |
} |
|
43 |
} |
modules/dnet-openaireplus-workflows/trunk/src/test/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimUpdatesJobNodeTest.java
1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims; |
|
2 |
|
|
3 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
4 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
5 |
import eu.dnetlib.msro.rmi.MSROException; |
|
6 |
import org.junit.Test; |
|
7 |
import org.postgresql.util.Base64; |
|
8 |
|
|
9 |
/** |
|
10 |
* Created by Alessia Bardi on 26/06/2017. |
|
11 |
* |
|
12 |
* @author Alessia Bardi |
|
13 |
*/ |
|
14 |
public class ApplyClaimUpdatesJobNodeTest { |
|
15 |
|
|
16 |
private ApplyClaimUpdatesJobNode applyClaims = new ApplyClaimUpdatesJobNode(); |
|
17 |
|
|
18 |
|
|
19 |
@Test |
|
20 |
public void testGetValueEGI() throws MSROException, InvalidProtocolBufferException { |
|
21 |
String context = "egi"; |
|
22 |
String rowkey = "50|userclaim___::6a8f649d968e734a6733c23a351c1859"; |
|
23 |
System.out.println(getValue(context, rowkey)); |
|
24 |
} |
|
25 |
|
|
26 |
@Test |
|
27 |
public void testGetValueEGILongId() throws MSROException, InvalidProtocolBufferException { |
|
28 |
String context = "egi::classification::natsc::earths::other"; |
|
29 |
String rowkey = "50|userclaim___::6a8f649d968e734a6733c23a351c1859"; |
|
30 |
System.out.println(getValue(context, rowkey)); |
|
31 |
} |
|
32 |
|
|
33 |
private String getValue(final String context, final String rowkey) throws MSROException, InvalidProtocolBufferException { |
|
34 |
String semantics = "relatedTo"; |
|
35 |
long time = System.currentTimeMillis(); |
|
36 |
//final String sourceId, final String semantics, final String targetId, final long timestamp |
|
37 |
String res = applyClaims.getValue(context, semantics, rowkey, time); |
|
38 |
Oaf.Builder builder = Oaf.newBuilder().mergeFrom(Base64.decode(res)); |
|
39 |
return builder.build().toString(); |
|
40 |
} |
|
41 |
} |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/AbstractClaimsToHBASE.java
1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
4 |
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo; |
|
5 |
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier; |
|
6 |
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions; |
|
7 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
8 |
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode; |
|
9 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
10 |
import eu.dnetlib.msro.workflows.util.ProgressProvider; |
|
11 |
import org.apache.commons.logging.Log; |
|
12 |
import org.apache.commons.logging.LogFactory; |
|
13 |
import org.springframework.beans.factory.annotation.Autowired; |
|
14 |
|
|
15 |
/** |
|
16 |
* Created by Alessia Bardi on 28/06/2017. |
|
17 |
* |
|
18 |
* @author Alessia Bardi |
|
19 |
*/ |
|
20 |
public abstract class AbstractClaimsToHBASE extends SimpleJobNode implements ProgressJobNode { |
|
21 |
|
|
22 |
private static final Log log = LogFactory.getLog(AbstractClaimsToHBASE.class); |
|
23 |
|
|
24 |
private String sql; |
|
25 |
private String countQuery; |
|
26 |
|
|
27 |
private int total = 0; |
|
28 |
private int processed = 0; |
|
29 |
|
|
30 |
private String clusterName; |
|
31 |
private String tableName; |
|
32 |
|
|
33 |
@Autowired |
|
34 |
private UniqueServiceLocator serviceLocator; |
|
35 |
|
|
36 |
@Autowired |
|
37 |
private ClaimDatabaseUtils claimDatabaseUtils; |
|
38 |
|
|
39 |
@Override |
|
40 |
public ProgressProvider getProgressProvider() { |
|
41 |
return new ProgressProvider() { |
|
42 |
|
|
43 |
@Override |
|
44 |
public boolean isInaccurate() { |
|
45 |
return false; |
|
46 |
} |
|
47 |
|
|
48 |
@Override |
|
49 |
public int getTotalValue() { |
|
50 |
return total; |
|
51 |
} |
|
52 |
|
|
53 |
@Override |
|
54 |
public int getCurrentValue() { |
|
55 |
return processed; |
|
56 |
} |
|
57 |
}; |
|
58 |
} |
|
59 |
|
|
60 |
protected String getOpenAIREType(final String type){ |
|
61 |
switch(type){ |
|
62 |
case "publication": |
|
63 |
case "dataset": |
|
64 |
return "result"; |
|
65 |
default: |
|
66 |
return type; |
|
67 |
} |
|
68 |
} |
|
69 |
|
|
70 |
protected String getFullId(final String type, final String id) { |
|
71 |
final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id); |
|
72 |
return OafRowKeyDecoder.decode(fullId).getKey(); |
|
73 |
} |
|
74 |
|
|
75 |
protected static DataInfo getDataInfo(){ |
|
76 |
return DataInfo.newBuilder().setTrust("0.91").setInferred(false) |
|
77 |
.setProvenanceaction( |
|
78 |
Qualifier.newBuilder() |
|
79 |
.setClassid("user:claim") |
|
80 |
.setClassname("user:claim") |
|
81 |
.setSchemeid("dnet:provenanceActions") |
|
82 |
.setSchemename("dnet:provenanceActions")).build(); |
|
83 |
|
|
84 |
} |
|
85 |
public String getSql() { |
|
86 |
return sql; |
|
87 |
} |
|
88 |
|
|
89 |
public void setSql(final String sql) { |
|
90 |
this.sql = sql; |
|
91 |
} |
|
92 |
|
|
93 |
public String getCountQuery() { |
|
94 |
return countQuery; |
|
95 |
} |
|
96 |
|
|
97 |
public void setCountQuery(final String countQuery) { |
|
98 |
this.countQuery = countQuery; |
|
99 |
} |
|
100 |
|
|
101 |
public int getTotal() { |
|
102 |
return total; |
|
103 |
} |
|
104 |
|
|
105 |
public void setTotal(final int total) { |
|
106 |
this.total = total; |
|
107 |
} |
|
108 |
|
|
109 |
public int getProcessed() { |
|
110 |
return processed; |
|
111 |
} |
|
112 |
|
|
113 |
public void setProcessed(final int processed) { |
|
114 |
this.processed = processed; |
|
115 |
} |
|
116 |
|
|
117 |
public void incrementProcessed(){ |
|
118 |
processed++; |
|
119 |
} |
|
120 |
|
|
121 |
public String getClusterName() { |
|
122 |
return clusterName; |
|
123 |
} |
|
124 |
|
|
125 |
public void setClusterName(final String clusterName) { |
|
126 |
this.clusterName = clusterName; |
|
127 |
} |
|
128 |
|
|
129 |
public String getTableName() { |
|
130 |
return tableName; |
|
131 |
} |
|
132 |
|
|
133 |
public void setTableName(final String tableName) { |
|
134 |
this.tableName = tableName; |
|
135 |
} |
|
136 |
|
|
137 |
public UniqueServiceLocator getServiceLocator() { |
|
138 |
return serviceLocator; |
|
139 |
} |
|
140 |
|
|
141 |
public void setServiceLocator(final UniqueServiceLocator serviceLocator) { |
|
142 |
this.serviceLocator = serviceLocator; |
|
143 |
} |
|
144 |
|
|
145 |
public ClaimDatabaseUtils getClaimDatabaseUtils() { |
|
146 |
return claimDatabaseUtils; |
|
147 |
} |
|
148 |
|
|
149 |
public void setClaimDatabaseUtils(final ClaimDatabaseUtils claimDatabaseUtils) { |
|
150 |
this.claimDatabaseUtils = claimDatabaseUtils; |
|
151 |
} |
|
152 |
} |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNode.java
5 | 5 |
import com.googlecode.sarasvati.Arc; |
6 | 6 |
import com.googlecode.sarasvati.NodeToken; |
7 | 7 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
8 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
9 | 8 |
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo; |
10 | 9 |
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier; |
11 | 10 |
import eu.dnetlib.data.proto.KindProtos.Kind; |
... | ... | |
18 | 17 |
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome; |
19 | 18 |
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult; |
20 | 19 |
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset; |
21 |
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions; |
|
22 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
23 | 20 |
import eu.dnetlib.msro.rmi.MSROException; |
24 |
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode; |
|
25 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
26 |
import eu.dnetlib.msro.workflows.util.ProgressProvider; |
|
27 | 21 |
import eu.dnetlib.utils.ontologies.OntologyLoader; |
28 | 22 |
import org.apache.commons.codec.binary.Base64; |
29 | 23 |
import org.apache.commons.logging.Log; |
30 | 24 |
import org.apache.commons.logging.LogFactory; |
31 | 25 |
import org.apache.hadoop.util.StringUtils; |
32 |
import org.springframework.beans.factory.annotation.Autowired; |
|
33 | 26 |
|
34 | 27 |
/** |
35 | 28 |
* Created by alessia on 23/10/15. |
36 | 29 |
*/ |
37 |
public class ApplyClaimRelsJobNode extends SimpleJobNode implements ProgressJobNode {
|
|
30 |
public class ApplyClaimRelsJobNode extends AbstractClaimsToHBASE {
|
|
38 | 31 |
|
39 | 32 |
private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class); |
40 | 33 |
|
41 | 34 |
private final String SEPARATOR = "_"; |
42 | 35 |
|
43 |
@Autowired |
|
44 |
private UniqueServiceLocator serviceLocator; |
|
45 | 36 |
|
46 |
@Autowired |
|
47 |
private ClaimDatabaseUtils claimDatabaseUtils; |
|
48 |
|
|
49 |
private String sql; |
|
50 |
private String countQuery; |
|
51 |
private int total = 0; |
|
52 |
private int processed = 0; |
|
53 |
|
|
54 |
private String clusterName; |
|
55 |
|
|
56 |
private String tableName; |
|
57 |
|
|
58 |
// private String fetchSqlAsText(final String path) throws IOException { |
|
59 |
// return IOUtils.toString(getClass().getResourceAsStream(path)); |
|
60 |
// } |
|
61 |
|
|
62 | 37 |
@Override |
63 | 38 |
protected String execute(NodeToken token) throws Exception { |
64 | 39 |
//TODO: use claim.claim_date from the claim db |
65 | 40 |
long timestamp = System.currentTimeMillis(); |
66 |
total = this.claimDatabaseUtils.count(countQuery);
|
|
67 |
List<Claim> claimRels = this.claimDatabaseUtils.query(sql);
|
|
41 |
setTotal(getClaimDatabaseUtils().count(getCountQuery()));
|
|
42 |
List<Claim> claimRels = getClaimDatabaseUtils().query(getSql());
|
|
68 | 43 |
int totalWrites = 0; |
69 | 44 |
int valid = 0; |
70 | 45 |
int discardedClaims = 0; |
71 | 46 |
|
72 |
HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
|
|
47 |
HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
|
|
73 | 48 |
|
74 | 49 |
for (Claim claim : claimRels) { |
75 | 50 |
log.debug(claim); |
76 | 51 |
try { |
77 |
String sourceId = fullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
|
|
78 |
String targetId = fullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
|
|
52 |
String sourceId = getFullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
|
|
53 |
String targetId = getFullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
|
|
79 | 54 |
String value = getValue(sourceId, claim.getSemantics(), targetId, timestamp); |
80 |
|
|
81 |
/* |
|
82 |
public void addHBaseColumn(final String clusterName, |
|
83 |
final String tableName, |
|
84 |
final String rowKey, |
|
85 |
final String columnFamily, |
|
86 |
final String qualifier, |
|
87 |
final String value) |
|
88 |
*/ |
|
89 |
hadoopService.addHBaseColumn(clusterName, tableName, sourceId, claim.getSemantics(), targetId, value); |
|
90 |
processed++; |
|
55 |
// adding relationship |
|
56 |
hadoopService.addHBaseColumn(getClusterName(), getTableName(), sourceId, claim.getSemantics(), targetId, value); |
|
91 | 57 |
totalWrites++; |
92 | 58 |
|
93 | 59 |
String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics()); |
94 | 60 |
String inverseValue = getValue(targetId, inverseSemantics, sourceId, timestamp); |
95 |
hadoopService.addHBaseColumn(clusterName, tableName, targetId, inverseSemantics, sourceId, inverseValue); |
|
61 |
//adding inverse relationship |
|
62 |
hadoopService.addHBaseColumn(getClusterName(), getTableName(), targetId, inverseSemantics, sourceId, inverseValue); |
|
96 | 63 |
totalWrites++; |
97 |
}catch(IllegalArgumentException e){ |
|
98 |
log.error("Discarding claim "+claim+". Cause: "+e.getMessage()); |
|
64 |
incrementProcessed(); |
|
65 |
} catch (IllegalArgumentException e) { |
|
66 |
log.error("Discarding claim " + claim + ". Cause: " + e.getMessage()); |
|
99 | 67 |
discardedClaims++; |
100 | 68 |
} |
101 | 69 |
} |
102 | 70 |
|
103 |
log.info("totalClaims: " + total);
|
|
104 |
token.getEnv().setAttribute("claimSize", total);
|
|
105 |
log.info("writeOps: " + totalWrites); |
|
106 |
token.getEnv().setAttribute("writeOps", totalWrites);
|
|
107 |
log.info("validClaims: " + valid); |
|
108 |
token.getEnv().setAttribute("validClaims", valid); |
|
109 |
log.info("discardedClaims: " + discardedClaims); |
|
110 |
token.getEnv().setAttribute("discardedClaims", discardedClaims); |
|
71 |
log.info("totalClaimRels: " + getTotal());
|
|
72 |
token.getEnv().setAttribute("claimRelsSize", getTotal());
|
|
73 |
log.info("claim rels writeOps: " + totalWrites);
|
|
74 |
token.getEnv().setAttribute("claimRelsWriteOps", totalWrites);
|
|
75 |
log.info("validClaimRels: " + valid);
|
|
76 |
token.getEnv().setAttribute("validClaimRels", valid);
|
|
77 |
log.info("discardedClaimRels: " + discardedClaims);
|
|
78 |
token.getEnv().setAttribute("discardedClaimRels", discardedClaims);
|
|
111 | 79 |
|
112 | 80 |
return Arc.DEFAULT_ARC; |
113 | 81 |
} |
114 | 82 |
|
115 |
protected String getOpenAIREType(final String type){ |
|
116 |
switch(type){ |
|
117 |
case "publication": |
|
118 |
case "dataset": |
|
119 |
return "result"; |
|
120 |
default: |
|
121 |
return type; |
|
122 |
} |
|
123 |
} |
|
124 |
|
|
125 |
|
|
126 |
public String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException{ |
|
83 |
protected String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException { |
|
127 | 84 |
log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId)); |
128 | 85 |
String[] relInfo = semantics.split(SEPARATOR); |
129 |
if(relInfo.length < 3){
|
|
130 |
throw new MSROException("Semantics "+semantics+" not supported: must be splittable in 3 by '_'");
|
|
86 |
if (relInfo.length < 3) {
|
|
87 |
throw new MSROException("Semantics " + semantics + " not supported: must be splittable in 3 by '_'");
|
|
131 | 88 |
} |
132 | 89 |
Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]); |
133 | 90 |
|
... | ... | |
171 | 128 |
)))); |
172 | 129 |
break; |
173 | 130 |
default: |
174 |
throw new MSROException("Semantics "+relInfo[0]+" not supported");
|
|
131 |
throw new MSROException("Semantics " + relInfo[0] + " not supported");
|
|
175 | 132 |
} |
176 | 133 |
|
177 | 134 |
builder.setRel(relBuilder); |
178 | 135 |
return Base64.encodeBase64String(builder.build().toByteArray()); |
179 | 136 |
} |
180 |
|
|
181 |
private String fullId(final String type, final String id) { |
|
182 |
final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id); |
|
183 |
return OafRowKeyDecoder.decode(fullId).getKey(); |
|
184 |
} |
|
185 |
|
|
186 |
public String getClusterName() { |
|
187 |
return clusterName; |
|
188 |
} |
|
189 |
|
|
190 |
public void setClusterName(final String clusterName) { |
|
191 |
this.clusterName = clusterName; |
|
192 |
} |
|
193 |
|
|
194 |
public String getTableName() { |
|
195 |
return tableName; |
|
196 |
} |
|
197 |
|
|
198 |
public void setTableName(final String tableName) { |
|
199 |
this.tableName = tableName; |
|
200 |
} |
|
201 |
|
|
202 |
public UniqueServiceLocator getServiceLocator() { |
|
203 |
return serviceLocator; |
|
204 |
} |
|
205 |
|
|
206 |
public void setServiceLocator(final UniqueServiceLocator serviceLocator) { |
|
207 |
this.serviceLocator = serviceLocator; |
|
208 |
} |
|
209 |
|
|
210 |
public ClaimDatabaseUtils getClaimDatabaseUtils() { |
|
211 |
return claimDatabaseUtils; |
|
212 |
} |
|
213 |
|
|
214 |
public void setClaimDatabaseUtils(final ClaimDatabaseUtils claimDatabaseUtils) { |
|
215 |
this.claimDatabaseUtils = claimDatabaseUtils; |
|
216 |
} |
|
217 |
|
|
218 |
public String getSql() { |
|
219 |
return sql; |
|
220 |
} |
|
221 |
|
|
222 |
public void setSql(final String sql) { |
|
223 |
this.sql = sql; |
|
224 |
} |
|
225 |
|
|
226 |
public String getCountQuery() { |
|
227 |
return countQuery; |
|
228 |
} |
|
229 |
|
|
230 |
public void setCountQuery(final String countQuery) { |
|
231 |
this.countQuery = countQuery; |
|
232 |
} |
|
233 |
|
|
234 |
@Override |
|
235 |
public ProgressProvider getProgressProvider() { |
|
236 |
return new ProgressProvider() { |
|
237 |
|
|
238 |
@Override |
|
239 |
public boolean isInaccurate() { |
|
240 |
return false; |
|
241 |
} |
|
242 |
|
|
243 |
@Override |
|
244 |
public int getTotalValue() { |
|
245 |
return total; |
|
246 |
} |
|
247 |
|
|
248 |
@Override |
|
249 |
public int getCurrentValue() { |
|
250 |
return processed; |
|
251 |
} |
|
252 |
}; |
|
253 |
} |
|
254 | 137 |
} |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimUpdatesJobNode.java
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
4 | 3 |
import java.util.List; |
5 | 4 |
|
6 | 5 |
import com.googlecode.sarasvati.Arc; |
7 | 6 |
import com.googlecode.sarasvati.NodeToken; |
8 | 7 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
9 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
10 |
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo; |
|
11 |
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier; |
|
12 | 8 |
import eu.dnetlib.data.proto.KindProtos.Kind; |
13 | 9 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
14 |
import eu.dnetlib.data.proto.OafProtos.OafRel; |
|
15 |
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata; |
|
16 |
import eu.dnetlib.data.proto.RelTypeProtos.RelType; |
|
17 |
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType; |
|
18 |
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject; |
|
19 |
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome; |
|
20 |
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult; |
|
21 |
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset; |
|
22 |
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions; |
|
23 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
10 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
|
11 |
import eu.dnetlib.data.proto.ResultProtos.Result; |
|
12 |
import eu.dnetlib.data.proto.ResultProtos.Result.Context; |
|
13 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
24 | 14 |
import eu.dnetlib.msro.rmi.MSROException; |
25 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
26 |
import eu.dnetlib.utils.ontologies.OntologyLoader; |
|
27 | 15 |
import org.apache.commons.codec.binary.Base64; |
28 |
import org.apache.commons.io.IOUtils; |
|
29 | 16 |
import org.apache.commons.logging.Log; |
30 | 17 |
import org.apache.commons.logging.LogFactory; |
31 | 18 |
import org.apache.hadoop.util.StringUtils; |
32 |
import org.springframework.beans.factory.annotation.Autowired; |
|
33 | 19 |
|
34 | 20 |
/** |
35 | 21 |
* Created by alessia on 23/10/15. |
36 | 22 |
*/ |
37 |
public class ApplyClaimUpdatesJobNode extends SimpleJobNode {
|
|
23 |
public class ApplyClaimUpdatesJobNode extends AbstractClaimsToHBASE {
|
|
38 | 24 |
|
39 | 25 |
private static final Log log = LogFactory.getLog(ApplyClaimUpdatesJobNode.class); |
40 | 26 |
|
41 |
private final String SEPARATOR = "_"; |
|
42 |
|
|
43 |
@Autowired |
|
44 |
private UniqueServiceLocator serviceLocator; |
|
45 |
|
|
46 |
@Autowired |
|
47 |
private ClaimDatabaseUtils claimDatabaseUtils; |
|
48 |
|
|
49 |
private String sql; |
|
50 |
|
|
51 |
private String clusterName; |
|
52 |
|
|
53 |
private String tableName; |
|
54 |
|
|
55 |
private String fetchSqlAsText(final String path) throws IOException { |
|
56 |
return IOUtils.toString(getClass().getResourceAsStream(path)); |
|
57 |
} |
|
58 |
|
|
59 | 27 |
@Override |
60 | 28 |
protected String execute(NodeToken token) throws Exception { |
61 | 29 |
//TODO: use claim.claim_date from the claim db |
62 | 30 |
long timestamp = System.currentTimeMillis(); |
63 | 31 |
|
64 |
List<Claim> claimRels = this.claimDatabaseUtils.query(sql);
|
|
32 |
List<Claim> claimUpdates = this.getClaimDatabaseUtils().query(getSql());
|
|
65 | 33 |
int totalClaims = 0; |
66 | 34 |
int totalWrites = 0; |
35 |
int discardedClaims = 0; |
|
67 | 36 |
|
68 |
HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
|
|
37 |
HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
|
|
69 | 38 |
|
70 |
for (Claim claim : claimRels) { |
|
39 |
for (Claim claim : claimUpdates) { |
|
40 |
try{ |
|
71 | 41 |
log.debug(claim); |
72 | 42 |
totalClaims++; |
73 | 43 |
String contextId = claim.getSource(); |
74 |
String targetId = fullId(claim.getTargetType(), claim.getTarget());
|
|
44 |
String rowKey = getFullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
|
|
75 | 45 |
|
76 |
String value = getValue(contextId, targetId, claim.getSemantics(), timestamp); |
|
77 |
/* |
|
78 |
public void addHBaseColumn(final String clusterName, |
|
79 |
final String tableName, |
|
80 |
final String rowKey, |
|
81 |
final String columnFamily, |
|
82 |
final String qualifier, |
|
83 |
final String value) |
|
84 |
*/ |
|
85 |
hadoopService.addHBaseColumn(clusterName, tableName, contextId, claim.getSemantics(), targetId, value); |
|
46 |
String value = getValue(contextId, rowKey, claim.getSemantics(), timestamp); |
|
47 |
hadoopService.addHBaseColumn(getClusterName(), getTableName(), rowKey, "result", "update_" + System.nanoTime(), value); |
|
86 | 48 |
totalWrites++; |
87 |
|
|
88 |
String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics()); |
|
89 |
String inverseValue = getValue(targetId, contextId, inverseSemantics, timestamp); |
|
90 |
hadoopService.addHBaseColumn(clusterName, tableName, targetId, inverseSemantics, contextId, inverseValue); |
|
91 |
totalWrites++; |
|
92 |
|
|
49 |
incrementProcessed(); |
|
50 |
} catch (IllegalArgumentException e) { |
|
51 |
log.error("Discarding claim " + claim + ". Cause: " + e.getMessage()); |
|
52 |
discardedClaims++; |
|
53 |
} |
|
93 | 54 |
} |
94 | 55 |
|
95 |
log.info("totalClaims: " + totalClaims); |
|
96 |
token.getEnv().setAttribute("claimSize", totalClaims); |
|
97 |
log.info("writeOps: " + totalWrites); |
|
98 |
token.getEnv().setAttribute("writeOps", totalWrites); |
|
56 |
log.info("Total Claim Updates: " + totalClaims); |
|
57 |
token.getEnv().setAttribute("claimUpdatesSize", totalClaims); |
|
58 |
log.info("Claim updates writeOps: " + totalWrites); |
|
59 |
token.getEnv().setAttribute("claimUpdatesWriteOps", totalWrites); |
|
60 |
log.info("Discarded Claim Updates: " + discardedClaims); |
|
61 |
token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims); |
|
99 | 62 |
|
100 | 63 |
return Arc.DEFAULT_ARC; |
101 | 64 |
} |
102 | 65 |
|
103 |
|
|
104 |
private String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException{ |
|
66 |
protected String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException { |
|
105 | 67 |
log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId)); |
106 |
String[] relInfo = semantics.split(SEPARATOR); |
|
107 |
Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]); |
|
108 | 68 |
|
109 |
Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.relation).setLastupdatetimestamp(timestamp); |
|
110 |
builder.setDataInfo(DataInfo.newBuilder().setTrust("0.91").setInferred(false) |
|
111 |
.setProvenanceaction( |
|
112 |
Qualifier.newBuilder() |
|
113 |
.setClassid("user:claim") |
|
114 |
.setClassname("user:claim") |
|
115 |
.setSchemeid("dnet:provenanceActions") |
|
116 |
.setSchemename("dnet:provenanceActions") |
|
117 |
)); |
|
69 |
Result.Builder resultBuilder = Result.newBuilder().setMetadata(Result.Metadata.newBuilder().addContext(getContext(sourceId))); |
|
70 |
OafEntity.Builder entityBuilder = OafEntity.newBuilder().setId(targetId).setType(Type.result).setResult(resultBuilder); |
|
71 |
Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.entity).setLastupdatetimestamp(timestamp).setEntity(entityBuilder); |
|
118 | 72 |
|
119 |
final SubRelType subRelType = SubRelType.valueOf(relInfo[1]); |
|
120 |
final OafRel.Builder relBuilder = OafRel.newBuilder() |
|
121 |
.setSubRelType(subRelType) |
|
122 |
.setRelClass(relInfo[2]) |
|
123 |
.setRelType(RelType.valueOf(relInfo[0])) |
|
124 |
.setSource(sourceId).setTarget(targetId).setChild(false); |
|
125 |
|
|
126 |
switch (relInfo[0]) { |
|
127 |
case "resultProject": |
|
128 |
|
|
129 |
relBuilder.setResultProject(ResultProject.newBuilder() |
|
130 |
.setOutcome(Outcome.newBuilder().setRelMetadata( |
|
131 |
RelMetadata.newBuilder().setSemantics( |
|
132 |
semanticsBuilder |
|
133 |
.setSchemeid("dnet:result_project_relations") |
|
134 |
.setSchemename("dnet:result_project_relations") |
|
135 |
.build() |
|
136 |
)))); |
|
137 |
break; |
|
138 |
case "resultResult_publicationDataset_isRelatedTo": |
|
139 |
relBuilder.setResultResult(ResultResult.newBuilder() |
|
140 |
.setPublicationDataset(PublicationDataset.newBuilder().setRelMetadata( |
|
141 |
RelMetadata.newBuilder().setSemantics( |
|
142 |
semanticsBuilder |
|
143 |
.setSchemeid("dnet:result_result_relations") |
|
144 |
.setSchemename("dnet:result_result_relations") |
|
145 |
.build() |
|
146 |
)))); |
|
147 |
break; |
|
148 |
default: |
|
149 |
throw new MSROException("Semantics "+relInfo[0]+" not supported"); |
|
150 |
} |
|
151 |
|
|
152 |
builder.setRel(relBuilder); |
|
153 | 73 |
return Base64.encodeBase64String(builder.build().toByteArray()); |
154 | 74 |
} |
155 | 75 |
|
156 |
private String fullId(final String type, final String id) { |
|
157 |
final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id); |
|
158 |
return OafRowKeyDecoder.decode(fullId).getKey(); |
|
76 |
private Context getContext(final String sourceId) { |
|
77 |
return Context.newBuilder().setDataInfo(getDataInfo()).setId(sourceId).build(); |
|
159 | 78 |
} |
160 |
|
|
161 |
public String getClusterName() { |
|
162 |
return clusterName; |
|
163 |
} |
|
164 |
|
|
165 |
public void setClusterName(final String clusterName) { |
|
166 |
this.clusterName = clusterName; |
|
167 |
} |
|
168 |
|
|
169 |
public String getTableName() { |
|
170 |
return tableName; |
|
171 |
} |
|
172 |
|
|
173 |
public void setTableName(final String tableName) { |
|
174 |
this.tableName = tableName; |
|
175 |
} |
|
176 |
|
|
177 |
public UniqueServiceLocator getServiceLocator() { |
|
178 |
return serviceLocator; |
|
179 |
} |
|
180 |
|
|
181 |
public void setServiceLocator(final UniqueServiceLocator serviceLocator) { |
|
182 |
this.serviceLocator = serviceLocator; |
|
183 |
} |
|
184 |
|
|
185 |
public ClaimDatabaseUtils getClaimDatabaseUtils() { |
|
186 |
return claimDatabaseUtils; |
|
187 |
} |
|
188 |
|
|
189 |
public void setClaimDatabaseUtils(final ClaimDatabaseUtils claimDatabaseUtils) { |
|
190 |
this.claimDatabaseUtils = claimDatabaseUtils; |
|
191 |
} |
|
192 |
|
|
193 |
public String getSql() { |
|
194 |
return sql; |
|
195 |
} |
|
196 |
|
|
197 |
public void setSql(final String sql) { |
|
198 |
this.sql = sql; |
|
199 |
} |
|
200 | 79 |
} |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ClaimDatabaseUtils.java
2 | 2 |
|
3 | 3 |
import java.util.List; |
4 | 4 |
|
5 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
6 |
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo; |
|
7 |
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier; |
|
8 |
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions; |
|
5 | 9 |
import eu.dnetlib.enabling.database.DataSourceFactory; |
6 | 10 |
import eu.dnetlib.enabling.database.utils.JdbcTemplateFactory; |
7 | 11 |
import org.springframework.beans.factory.annotation.Autowired; |
... | ... | |
38 | 42 |
return this.claimsJdbcTemplateFactory.createJdbcTemplate(dbName).queryForObject(sqlCountQuery, Integer.class); |
39 | 43 |
} |
40 | 44 |
|
41 |
|
|
42 | 45 |
} |
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/hbase/tmp_claims2hbase.xml | ||
---|---|---|
11 | 11 |
<WORKFLOW_TYPE>Data Load</WORKFLOW_TYPE> |
12 | 12 |
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY> |
13 | 13 |
<CONFIGURATION start="manual"> |
14 |
<NODE isStart="true" name="start"> |
|
15 |
<DESCRIPTION>start</DESCRIPTION> |
|
16 |
<PARAMETERS/> |
|
17 |
<ARCS> |
|
18 |
<ARC to="queryClaimRels"/> |
|
19 |
</ARCS> |
|
20 |
</NODE> |
|
21 |
<NODE name="applyClaimRels" type="ApplyClaimRels"> |
|
14 |
<NODE name="applyClaimRels" type="ApplyClaimRels" isStart="true"> |
|
22 | 15 |
<DESCRIPTION>Apply Claim Rels</DESCRIPTION> |
23 | 16 |
<PARAMETERS> |
24 | 17 |
<PARAM managedBy="system" name="clusterName" required="true" type="string">DM</PARAM> |
25 | 18 |
<PARAM managedBy="user" name="tableName" required="true" type="string"></PARAM> |
26 | 19 |
<PARAM managedBy="user" name="sql" required="true" type="string">SELECT source_type, source_id, target_type, target_id, semantics FROM claim WHERE approved=TRUE AND source_type !='context'</PARAM> |
27 |
<PARAM managedBy="user" name="countSql" required="true" type="string">SELECT count(*) FROM claim WHERE approved=TRUE AND source_type !='context'</PARAM>
|
|
20 |
<PARAM managedBy="user" name="countQuery" required="true" type="string">SELECT count(*) FROM claim WHERE approved=TRUE AND source_type !='context'</PARAM>
|
|
28 | 21 |
</PARAMETERS> |
29 | 22 |
<ARCS> |
30 | 23 |
<ARC to="applyClaimUpdates"/> |
... | ... | |
36 | 29 |
<PARAM managedBy="system" name="clusterName" required="true" type="string">DM</PARAM> |
37 | 30 |
<PARAM managedBy="user" name="tableName" required="true" type="string"></PARAM> |
38 | 31 |
<PARAM managedBy="user" name="sql" required="true" type="string">SELECT source_type, source_id, target_type, target_id, semantics FROM claim WHERE approved=TRUE AND source_type ='context'</PARAM> |
39 |
<PARAM managedBy="user" name="countSql" required="true" type="string">SELECT count(*) FROM claim WHERE approved=TRUE AND source_type ='context'</PARAM>
|
|
32 |
<PARAM managedBy="user" name="countQuery" required="true" type="string">SELECT count(*) FROM claim WHERE approved=TRUE AND source_type ='context'</PARAM>
|
|
40 | 33 |
</PARAMETERS> |
41 | 34 |
<ARCS> |
42 | 35 |
<ARC to="success"/> |
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/applicationContext-claims.xml | ||
---|---|---|
1 | 1 |
<?xml version="1.0" encoding="UTF-8"?> |
2 | 2 |
<beans xmlns="http://www.springframework.org/schema/beans" |
3 |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" |
|
4 |
xmlns:p="http://www.springframework.org/schema/p" xmlns:util="http://www.springframework.org/schema/util" |
|
5 |
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd |
|
6 |
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd |
|
7 |
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> |
|
8 |
|
|
9 |
<!--<bean id="claimsJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"--> |
|
10 |
<!--p:dataSource-ref="claimsDatasource"/>--> |
|
3 |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|
4 |
xmlns:p="http://www.springframework.org/schema/p" |
|
5 |
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> |
|
6 |
|
|
11 | 7 |
<bean id="claimsJdbcTemplateFactory" |
12 | 8 |
class="eu.dnetlib.enabling.database.utils.JdbcTemplateFactory" |
13 | 9 |
p:dataSourceFactory-ref="claimsDataSourceFactory" /> |
14 | 10 |
|
15 |
<!--<bean id="claimsDatasource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">--> |
|
16 |
<!--<property name="driverClassName" value="org.postgresql.Driver"/>--> |
|
17 |
<!--<property name="url" value="${dnet.openaire.claims.db.url}"/>--> |
|
18 |
<!--<property name="username" value="${dnet.openaire.claims.db.username}"/>--> |
|
19 |
<!--<property name="password" value="${dnet.openaire.claims.db.password}"/>--> |
|
20 |
<!--</bean>--> |
|
21 | 11 |
<bean id="claimsDataSourceFactory" |
22 | 12 |
class="eu.dnetlib.enabling.database.DataSourceFactoryImpl" |
23 | 13 |
p:driverClassName="org.postgresql.Driver" |
... | ... | |
25 | 15 |
p:username="${dnet.openaire.claims.db.username}" |
26 | 16 |
p:password="${dnet.openaire.claims.db.password}" /> |
27 | 17 |
|
18 |
<bean id="claimDatabaseUtils" class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ClaimDatabaseUtils" /> |
|
28 | 19 |
|
29 |
<bean id="claimDatabaseUtils" class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ClaimDatabaseUtils" /> |
|
30 | 20 |
</beans> |
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml | ||
---|---|---|
313 | 313 |
class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ApplyClaimRelsJobNode" |
314 | 314 |
scope="prototype"/> |
315 | 315 |
|
316 |
<bean id="wfNodeApplyClaimUpdates" |
|
317 |
class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ApplyClaimUpdatesJobNode" |
|
318 |
scope="prototype"/> |
|
319 |
|
|
316 | 320 |
</beans> |
Also available in: Unified diff
Adds a workflow node for writing context updates to HBase, plus related refactoring.