package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;

import java.util.concurrent.atomic.AtomicInteger;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata;
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject;
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome;
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult;
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.utils.ontologies.OntologyLoader;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;

/**
 * Applies the claimed relationships stored in the claim database to the HBase information space:
 * for each claim, both the direct and the inverse relationship are written as Base64-encoded Oaf values.
 *
 * Created by alessia on 23/10/15.
 */
public class ApplyClaimRelsJobNode extends AbstractClaimsToHBASE {

	private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class);

	// claim semantics are expected in the form <relType>_<subRelType>_<relClass> (see getValue)
	private static final String SEPARATOR = "_";

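	/**
	 * Reads the claims from the claim database and, for each one, writes the direct and inverse
	 * relationship columns to the configured HBase table, exposing counters via the workflow env.
	 */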
	@Override
	protected String execute(NodeToken token) throws Exception {
		// TODO: use claim.claim_date from the claim db
		long timestamp = System.currentTimeMillis();
		setTotal(getClaimDatabaseUtils().count(getCountQuery()));

		final AtomicInteger totalWrites = new AtomicInteger(0);
		final AtomicInteger discardedClaims = new AtomicInteger(0);

		final HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);

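		// each claim produces two HBase writes: the direct relationship and its inverse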
		getClaimDatabaseUtils().query(getSql()).forEach(claim -> {
			log.debug(claim);
			try {
				String sourceId = getFullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
				String targetId = getFullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
				String value = getValue(sourceId, claim.getSemantics(), targetId, timestamp);
				// adding relationship
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), sourceId, claim.getSemantics(), targetId, value);
				totalWrites.incrementAndGet();

				String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
				String inverseValue = getValue(targetId, inverseSemantics, sourceId, timestamp);
				// adding inverse relationship
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), targetId, inverseSemantics, sourceId, inverseValue);
				totalWrites.incrementAndGet();
				incrementProcessed();
			} catch (Exception e) {
				log.error("Discarding claim " + claim + ". Cause: " + e.getMessage());
				discardedClaims.incrementAndGet();
			}
		});
		log.info("totalClaimRels: " + getTotal());
		token.getEnv().setAttribute("claimRelsSize", getTotal());
		log.info("claim rels writeOps: " + totalWrites.intValue());
		token.getEnv().setAttribute("claimRelsWriteOps", totalWrites.intValue());
		log.info("discardedClaimRels: " + discardedClaims.intValue());
		token.getEnv().setAttribute("discardedClaimRels", discardedClaims.intValue());

		return Arc.DEFAULT_ARC;
	}
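
	/**
	 * Builds the value to store in the HBase cell: a Base64-encoded Oaf protobuf describing the
	 * relationship between sourceId and targetId. The semantics string must follow the pattern
	 * relType_subRelType_relClass.
	 */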
	protected String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException {
		log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId));
		String[] relInfo = semantics.split(SEPARATOR);
		if (relInfo.length < 3) {
			throw new MSROException("Semantics " + semantics + " not supported: expected the format relType_subRelType_relClass");
		}
		Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]);

		Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.relation).setLastupdatetimestamp(timestamp);
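		// provenance info: a user claim, not inferred, with fixed trust 0.91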
		builder.setDataInfo(DataInfo.newBuilder().setTrust("0.91").setInferred(false)
				.setProvenanceaction(
						Qualifier.newBuilder()
								.setClassid("user:claim")
								.setClassname("user:claim")
								.setSchemeid("dnet:provenanceActions")
								.setSchemename("dnet:provenanceActions")
				));

		final SubRelType subRelType = SubRelType.valueOf(relInfo[1]);
		final OafRel.Builder relBuilder = OafRel.newBuilder()
				.setSubRelType(subRelType)
				.setRelClass(relInfo[2])
				.setRelType(RelType.valueOf(relInfo[0]))
				.setSource(sourceId).setTarget(targetId).setChild(false);

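		// only resultProject (outcome) and resultResult (publicationDataset) relations can be claimed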
		switch (relInfo[0]) {
		case "resultProject":
			relBuilder.setResultProject(ResultProject.newBuilder()
					.setOutcome(Outcome.newBuilder().setRelMetadata(
							RelMetadata.newBuilder().setSemantics(
									semanticsBuilder
											.setSchemeid("dnet:result_project_relations")
											.setSchemename("dnet:result_project_relations")
											.build()
							))));
			break;
		case "resultResult":
			relBuilder.setResultResult(ResultResult.newBuilder()
					.setPublicationDataset(PublicationDataset.newBuilder().setRelMetadata(
							RelMetadata.newBuilder().setSemantics(
									semanticsBuilder
											.setSchemeid("dnet:result_result_relations")
											.setSchemename("dnet:result_result_relations")
											.build()
							))));
			break;
		default:
			throw new MSROException("Relationship type " + relInfo[0] + " not supported");
		}
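
		// attach the typed relationship to the Oaf envelope and serialize it for the HBase cell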
		builder.setRel(relBuilder);
		return Base64.encodeBase64String(builder.build().toByteArray());
	}
}