Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.NodeToken;
8
import eu.dnetlib.data.hadoop.rmi.HadoopService;
9
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
10
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
11
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
12
import eu.dnetlib.data.proto.KindProtos.Kind;
13
import eu.dnetlib.data.proto.OafProtos.Oaf;
14
import eu.dnetlib.data.proto.OafProtos.OafRel;
15
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata;
16
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
17
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
18
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject;
19
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome;
20
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult;
21
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset;
22
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
23
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24
import eu.dnetlib.msro.rmi.MSROException;
25
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
26
import eu.dnetlib.utils.ontologies.OntologyLoader;
27
import org.apache.commons.codec.binary.Base64;
28
import org.apache.commons.io.IOUtils;
29
import org.apache.commons.logging.Log;
30
import org.apache.commons.logging.LogFactory;
31
import org.apache.hadoop.util.StringUtils;
32
import org.springframework.beans.factory.annotation.Autowired;
33

    
34
/**
35
 * Created by alessia on 23/10/15.
36
 */
37
public class ApplyClaimRelsJobNode extends SimpleJobNode {
38

    
39
	private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class);
40

    
41
	private final String SEPARATOR = "_";
42

    
43
	@Autowired
44
	private UniqueServiceLocator serviceLocator;
45

    
46
	@Autowired
47
	private ClaimDatabaseUtils claimDatabaseUtils;
48

    
49
	private String sql;
50

    
51
	private String clusterName;
52

    
53
	private String tableName;
54

    
55
	private String fetchSqlAsText(final String path) throws IOException {
56
		return IOUtils.toString(getClass().getResourceAsStream(path));
57
	}
58

    
59
	@Override
60
	protected String execute(NodeToken token) throws Exception {
61
		//TODO: use claim.claim_date from the claim db
62
		long timestamp = System.currentTimeMillis();
63

    
64
		List<ClaimRel> claimRels = this.claimDatabaseUtils.query(sql);
65
		int totalClaims = 0;
66
		int totalWrites = 0;
67

    
68
		HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
69

    
70
		for (ClaimRel claim : claimRels) {
71
			log.debug(claim);
72
			totalClaims++;
73
			String sourceId = fullId(claim.getSourceType(), claim.getSource());
74
			String targetId = fullId(claim.getTargetType(), claim.getTarget());
75
			String value = getValue(sourceId, targetId, claim.getSemantics(), timestamp);
76
			/*
77
			public void addHBaseColumn(final String clusterName,
78
			final String tableName,
79
			final String rowKey,
80
			final String columnFamily,
81
			final String qualifier,
82
			final String value)
83
			 */
84
			hadoopService.addHBaseColumn(clusterName, tableName, sourceId, claim.getSemantics(), targetId, value);
85
			totalWrites++;
86

    
87
			String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
88
			String inverseValue = getValue(targetId, sourceId, inverseSemantics, timestamp);
89
			hadoopService.addHBaseColumn(clusterName, tableName, targetId, inverseSemantics, sourceId, inverseValue);
90
			totalWrites++;
91

    
92
		}
93

    
94
		log.info("totalClaims: " + totalClaims);
95
		token.getEnv().setAttribute("claimSize", totalClaims);
96
		log.info("writeOps: " + totalWrites);
97
		token.getEnv().setAttribute("writeOps", totalWrites);
98

    
99
		return Arc.DEFAULT_ARC;
100
	}
101

    
102

    
103
	private String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException{
104
		log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId));
105
		String[] relInfo = semantics.split(SEPARATOR);
106
		Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]);
107

    
108
		Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.relation).setLastupdatetimestamp(timestamp);
109
		builder.setDataInfo(DataInfo.newBuilder().setTrust("0.91").setInferred(false)
110
				.setProvenanceaction(
111
						Qualifier.newBuilder()
112
								.setClassid("user:claim")
113
								.setClassname("user:claim")
114
								.setSchemeid("dnet:provenanceActions")
115
								.setSchemename("dnet:provenanceActions")
116
				));
117

    
118
		final SubRelType subRelType = SubRelType.valueOf(relInfo[1]);
119
		final OafRel.Builder relBuilder = OafRel.newBuilder()
120
				.setSubRelType(subRelType)
121
				.setRelClass(relInfo[2])
122
				.setRelType(RelType.valueOf(relInfo[0]))
123
				.setSource(sourceId).setTarget(targetId).setChild(false);
124

    
125
		switch (relInfo[0]) {
126
		case "resultProject":
127

    
128
			relBuilder.setResultProject(ResultProject.newBuilder()
129
					.setOutcome(Outcome.newBuilder().setRelMetadata(
130
							RelMetadata.newBuilder().setSemantics(
131
									semanticsBuilder
132
											.setSchemeid("dnet:result_project_relations")
133
											.setSchemename("dnet:result_project_relations")
134
											.build()
135
							))));
136
			break;
137
		case "resultResult_publicationDataset_isRelatedTo":
138
			relBuilder.setResultResult(ResultResult.newBuilder()
139
					.setPublicationDataset(PublicationDataset.newBuilder().setRelMetadata(
140
							RelMetadata.newBuilder().setSemantics(
141
									semanticsBuilder
142
											.setSchemeid("dnet:result_result_relations")
143
											.setSchemename("dnet:result_result_relations")
144
											.build()
145
							))));
146
			break;
147
		default:
148
			throw new MSROException("Semantics "+relInfo[0]+" not supported");
149
		}
150

    
151
		builder.setRel(relBuilder);
152
		return Base64.encodeBase64String(builder.build().toByteArray());
153
	}
154

    
155
	private String fullId(final String type, final String id) {
156
		final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id);
157
		return OafRowKeyDecoder.decode(fullId).getKey();
158
	}
159

    
160
	public String getClusterName() {
161
		return clusterName;
162
	}
163

    
164
	public void setClusterName(final String clusterName) {
165
		this.clusterName = clusterName;
166
	}
167

    
168
	public String getTableName() {
169
		return tableName;
170
	}
171

    
172
	public void setTableName(final String tableName) {
173
		this.tableName = tableName;
174
	}
175

    
176
}
(1-1/4)