package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;

import java.util.List;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata;
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject;
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome;
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult;
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.utils.ontologies.OntologyLoader;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;

/**
 * Created by alessia on 23/10/15.
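 *
 * Workflow node that applies claimed relationships from the claims database to the HBase
 * information space: each claim is written as a pair of Base64-encoded Oaf relation
 * protobufs, one for the claimed semantics and one for its ontology-defined inverse.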
 */
public class ApplyClaimRelsJobNode extends AbstractClaimsToHBASE {

	private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class);

	private static final String SEPARATOR = "_";

	@Override
	protected String execute(NodeToken token) throws Exception {
		// TODO: use claim.claim_date from the claim db
		long timestamp = System.currentTimeMillis();
		setTotal(getClaimDatabaseUtils().count(getCountQuery()));
		List<Claim> claimRels = getClaimDatabaseUtils().query(getSql());
		int totalWrites = 0;
		int valid = 0;
		int discardedClaims = 0;
		HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
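
		// Every claim yields two HBase writes: one for the claimed relation, one for its inverse.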
		for (Claim claim : claimRels) {
			log.debug(claim);
			try {
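				// Build the type-qualified OpenAIRE identifiers for the claim's source and target.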
				String sourceId = getFullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
				String targetId = getFullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
				String value = getValue(sourceId, claim.getSemantics(), targetId, timestamp);
				// adding relationship
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), sourceId, claim.getSemantics(), targetId, value);
				totalWrites++;

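				// OntologyLoader supplies the inverse of the claimed semantics, so the relation is stored in both directions.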
				String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
				String inverseValue = getValue(targetId, inverseSemantics, sourceId, timestamp);
				// adding the inverse relationship
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), targetId, inverseSemantics, sourceId, inverseValue);
				totalWrites++;
				valid++; // both directions of the claim were written successfully
				incrementProcessed();
			} catch (IllegalArgumentException e) {
				log.error("Discarding claim " + claim + ". Cause: " + e.getMessage());
				discardedClaims++;
			}
		}

		log.info("totalClaimRels: " + getTotal());
		token.getEnv().setAttribute("claimRelsSize", getTotal());
		log.info("claim rels writeOps: " + totalWrites);
		token.getEnv().setAttribute("claimRelsWriteOps", totalWrites);
		log.info("validClaimRels: " + valid);
		token.getEnv().setAttribute("validClaimRels", valid);
		log.info("discardedClaimRels: " + discardedClaims);
		token.getEnv().setAttribute("discardedClaimRels", discardedClaims);

		return Arc.DEFAULT_ARC;
	}

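	/**
	 * Serializes a claim as a Base64-encoded Oaf relation protobuf. The semantics string
	 * must follow the form relType_subRelType_relClass: relType and subRelType must match
	 * the corresponding protobuf enums, while relClass is used as both class id and class
	 * name of the relation semantics.
	 */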
	protected String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException {
		log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId));
		String[] relInfo = semantics.split(SEPARATOR);
		if (relInfo.length < 3) {
			throw new MSROException("Semantics " + semantics + " not supported: expected format relType_subRelType_relClass");
		}
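		// relInfo = [relType, subRelType, relClass]; relClass doubles as the semantics class id and name.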
		Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]);
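
		// Claims are user-asserted: the relation carries fixed trust 0.91, is flagged as not
		// inferred, and uses the "user:claim" provenance action.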
		Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.relation).setLastupdatetimestamp(timestamp);
		builder.setDataInfo(DataInfo.newBuilder().setTrust("0.91").setInferred(false)
				.setProvenanceaction(
						Qualifier.newBuilder()
								.setClassid("user:claim")
								.setClassname("user:claim")
								.setSchemeid("dnet:provenanceActions")
								.setSchemename("dnet:provenanceActions")
				));

		final SubRelType subRelType = SubRelType.valueOf(relInfo[1]);
		final OafRel.Builder relBuilder = OafRel.newBuilder()
				.setSubRelType(subRelType)
				.setRelClass(relInfo[2])
				.setRelType(RelType.valueOf(relInfo[0]))
				.setSource(sourceId).setTarget(targetId).setChild(false);
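
		// Attach the payload matching the relation type; only resultProject and resultResult claims are supported.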
		switch (relInfo[0]) {
		case "resultProject":
			relBuilder.setResultProject(ResultProject.newBuilder()
					.setOutcome(Outcome.newBuilder().setRelMetadata(
							RelMetadata.newBuilder().setSemantics(
									semanticsBuilder
											.setSchemeid("dnet:result_project_relations")
											.setSchemename("dnet:result_project_relations")
											.build()
							))));
			break;
		case "resultResult":
			relBuilder.setResultResult(ResultResult.newBuilder()
					.setPublicationDataset(PublicationDataset.newBuilder().setRelMetadata(
							RelMetadata.newBuilder().setSemantics(
									semanticsBuilder
											.setSchemeid("dnet:result_result_relations")
											.setSchemename("dnet:result_result_relations")
											.build()
							))));
			break;
		default:
			throw new MSROException("Relation type " + relInfo[0] + " not supported");
		}

		builder.setRel(relBuilder);
		return Base64.encodeBase64String(builder.build().toByteArray());
	}
}