package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;

import java.util.List;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata;
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject;
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome;
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult;
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.utils.ontologies.OntologyLoader;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;

/**
 * Workflow node that applies claimed relationships from the claims database
 * to the target HBase table.
 *
 * Created by alessia on 23/10/15.
 */
public class ApplyClaimRelsJobNode extends AbstractClaimsToHBASE {

	private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class);

	private static final String SEPARATOR = "_";

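	/**
	 * For every claim read from the claims database, writes the claimed
	 * relationship and its inverse (resolved via OntologyLoader) to HBase,
	 * then reports the resulting counters into the workflow environment.
	 */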
	@Override
	protected String execute(NodeToken token) throws Exception {
		// TODO: use claim.claim_date from the claim db
		long timestamp = System.currentTimeMillis();
		setTotal(getClaimDatabaseUtils().count(getCountQuery()));
		List<Claim> claimRels = getClaimDatabaseUtils().query(getSql());
		int totalWrites = 0;
		int valid = 0;
		int discardedClaims = 0;

		HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);

		for (Claim claim : claimRels) {
			log.debug(claim);
			try {
				String sourceId = getFullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
				String targetId = getFullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
				String value = getValue(sourceId, claim.getSemantics(), targetId, timestamp);
				// add the direct relationship
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), sourceId, claim.getSemantics(), targetId, value);
				totalWrites++;

				String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
				String inverseValue = getValue(targetId, inverseSemantics, sourceId, timestamp);
				// add the inverse relationship
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), targetId, inverseSemantics, sourceId, inverseValue);
				totalWrites++;
				// both directions written: count the claim as valid
				valid++;
				incrementProcessed();
			} catch (IllegalArgumentException e) {
				log.error("Discarding claim " + claim + ". Cause: " + e.getMessage());
				discardedClaims++;
			}
		}

		// report the counters both to the log and to the workflow env
		log.info("totalClaimRels: " + getTotal());
		token.getEnv().setAttribute("claimRelsSize", getTotal());
		log.info("claim rels writeOps: " + totalWrites);
		token.getEnv().setAttribute("claimRelsWriteOps", totalWrites);
		log.info("validClaimRels: " + valid);
		token.getEnv().setAttribute("validClaimRels", valid);
		log.info("discardedClaimRels: " + discardedClaims);
		token.getEnv().setAttribute("discardedClaimRels", discardedClaims);

		return Arc.DEFAULT_ARC;
	}

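	/**
	 * Builds the Base64-encoded, serialized Oaf protobuf for a single relation.
	 * The semantics string is expected in the form relType_subRelType_relClass,
	 * for instance a value like "resultProject_outcome_produces".
	 */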
	protected String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException {
		log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId));
		String[] relInfo = semantics.split(SEPARATOR);
		if (relInfo.length < 3) {
			throw new MSROException("Semantics " + semantics + " not supported: it must consist of 3 tokens separated by '" + SEPARATOR + "'");
		}
		Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]);

		Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.relation).setLastupdatetimestamp(timestamp);
		builder.setDataInfo(DataInfo.newBuilder().setTrust("0.91").setInferred(false)
				.setProvenanceaction(
						Qualifier.newBuilder()
								.setClassid("user:claim")
								.setClassname("user:claim")
								.setSchemeid("dnet:provenanceActions")
								.setSchemename("dnet:provenanceActions")
				));

		final SubRelType subRelType = SubRelType.valueOf(relInfo[1]);
		final OafRel.Builder relBuilder = OafRel.newBuilder()
				.setSubRelType(subRelType)
				.setRelClass(relInfo[2])
				.setRelType(RelType.valueOf(relInfo[0]))
				.setSource(sourceId).setTarget(targetId).setChild(false);

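		// relInfo[0] (the RelType) selects the relation payload and its semantics scheme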
		switch (relInfo[0]) {
		case "resultProject":
			relBuilder.setResultProject(ResultProject.newBuilder()
					.setOutcome(Outcome.newBuilder().setRelMetadata(
							RelMetadata.newBuilder().setSemantics(
									semanticsBuilder
											.setSchemeid("dnet:result_project_relations")
											.setSchemename("dnet:result_project_relations")
											.build()
							))));
			break;
		case "resultResult":
			relBuilder.setResultResult(ResultResult.newBuilder()
					.setPublicationDataset(PublicationDataset.newBuilder().setRelMetadata(
							RelMetadata.newBuilder().setSemantics(
									semanticsBuilder
											.setSchemeid("dnet:result_result_relations")
											.setSchemename("dnet:result_result_relations")
											.build()
							))));
			break;
		default:
			throw new MSROException("Relation type " + relInfo[0] + " not supported");
		}

		builder.setRel(relBuilder);
		return Base64.encodeBase64String(builder.build().toByteArray());
	}
}