Revision 48027
Added by Alessia Bardi over 6 years ago
### ApplyClaimUpdatesJobNode.java (diff)
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;

import java.util.List;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafEntity;
import eu.dnetlib.data.proto.ResultProtos.Result;
import eu.dnetlib.data.proto.ResultProtos.Result.Context;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.msro.rmi.MSROException;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;

/**
 * Workflow job node that applies "claim updates" to HBase: for each claim read
 * from the claim database, it builds an Oaf entity update (a Result carrying the
 * claimed Context) and writes it as a Base64-encoded protobuf into the target
 * result's row, under the "result" column family.
 *
 * <p>Database access, id/type helpers and HBase connection details
 * (cluster/table names, service locator, SQL) are inherited from
 * {@code AbstractClaimsToHBASE}.</p>
 *
 * Created by alessia on 23/10/15.
 */
public class ApplyClaimUpdatesJobNode extends AbstractClaimsToHBASE {

	private static final Log log = LogFactory.getLog(ApplyClaimUpdatesJobNode.class);

	/**
	 * Reads the claim updates via the inherited SQL query and writes one HBase
	 * column per claim. Claims whose target cannot be resolved to a valid row key
	 * (signalled by {@link IllegalArgumentException}) are counted and discarded,
	 * not fatal.
	 *
	 * @param token the workflow token; receives the counters
	 *              {@code claimUpdatesSize}, {@code claimUpdatesWriteOps} and
	 *              {@code discardedClaimUpdates} in its environment
	 * @return {@link Arc#DEFAULT_ARC} on completion
	 * @throws Exception on unrecoverable service/database failures
	 */
	@Override
	protected String execute(final NodeToken token) throws Exception {
		// TODO: use claim.claim_date from the claim db instead of "now"
		final long timestamp = System.currentTimeMillis();

		final List<Claim> claimUpdates = this.getClaimDatabaseUtils().query(getSql());
		int totalClaims = 0;
		int totalWrites = 0;
		int discardedClaims = 0;

		final HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);

		for (final Claim claim : claimUpdates) {
			try {
				log.debug(claim);
				totalClaims++;
				final String contextId = claim.getSource();
				final String rowKey = getFullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());

				// NOTE(review): argument order fixed to match the declared signature
				// getValue(sourceId, semantics, targetId, timestamp). The revision diff
				// passed (contextId, rowKey, semantics), which would have stored the
				// semantics string as the entity id — confirm against the repository.
				final String value = getValue(contextId, claim.getSemantics(), rowKey, timestamp);
				// Qualifier is made unique per write so successive updates to the same
				// row do not overwrite each other.
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), rowKey, "result", "update_" + System.nanoTime(), value);
				totalWrites++;
				incrementProcessed();
			} catch (IllegalArgumentException e) {
				// Best-effort semantics: a malformed claim must not abort the batch.
				log.error("Discarding claim " + claim + ". Cause: " + e.getMessage(), e);
				discardedClaims++;
			}
		}

		log.info("Total Claim Updates: " + totalClaims);
		token.getEnv().setAttribute("claimUpdatesSize", totalClaims);
		log.info("Claim updates writeOps: " + totalWrites);
		token.getEnv().setAttribute("claimUpdatesWriteOps", totalWrites);
		log.info("Discarded Claim Updates: " + discardedClaims);
		token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims);

		return Arc.DEFAULT_ARC;
	}

	/**
	 * Builds the serialized update payload: an Oaf message of kind
	 * {@code entity} whose Result metadata carries the claimed context.
	 *
	 * @param sourceId  the context id being claimed (becomes the Context id)
	 * @param semantics the claim semantics (logged only; the update itself is a
	 *                  context attachment, not a relation)
	 * @param targetId  the full OpenAIRE id of the result row being updated
	 * @param timestamp last-update timestamp stamped on the Oaf message
	 * @return the protobuf serialization of the update, Base64-encoded for HBase
	 * @throws MSROException declared for subclass overrides; not thrown here
	 */
	protected String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException {
		log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId));

		final Result.Builder resultBuilder = Result.newBuilder().setMetadata(Result.Metadata.newBuilder().addContext(getContext(sourceId)));
		final OafEntity.Builder entityBuilder = OafEntity.newBuilder().setId(targetId).setType(Type.result).setResult(resultBuilder);
		final Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.entity).setLastupdatetimestamp(timestamp).setEntity(entityBuilder);

		return Base64.encodeBase64String(builder.build().toByteArray());
	}

	/** Builds the Context message for the claimed context id, with the inherited provenance DataInfo. */
	private Context getContext(final String sourceId) {
		return Context.newBuilder().setDataInfo(getDataInfo()).setId(sourceId).build();
	}
}
Also available in: Unified diff
Commit message: workflow node for writing context updates to HBase, plus refactoring.