package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;

import java.io.IOException;
import java.util.List;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata;
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject;
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome;
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult;
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import eu.dnetlib.utils.ontologies.OntologyLoader;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;

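/*
 * Workflow node that applies the relationships claimed by users (read from the claims database)
 * to the HBase information space: for every claim it writes the direct relation and its inverse
 * as Base64-encoded Oaf protobuf values.
 */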
/**
 * Created by alessia on 23/10/15.
 */
public class ApplyClaimRelsJobNode extends SimpleJobNode {

	private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class);

	private final String SEPARATOR = "_";

	@Autowired
	private UniqueServiceLocator serviceLocator;

	@Autowired
	private ClaimDatabaseUtils claimDatabaseUtils;

	private String sql;

	private String clusterName;

	private String tableName;

	private String fetchSqlAsText(final String path) throws IOException {
		return IOUtils.toString(getClass().getResourceAsStream(path));
	}

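	/*
	 * Reads the claimed relationships from the claims database via the configured SQL query,
	 * encodes each claim and its inverse as an Oaf relation and writes both to the HBase
	 * information space table; the number of processed claims and of HBase write operations
	 * is stored in the workflow environment.
	 */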
	@Override
	protected String execute(NodeToken token) throws Exception {
		//TODO: use claim.claim_date from the claim db
		long timestamp = System.currentTimeMillis();

		List<Claim> claimRels = this.claimDatabaseUtils.query(sql);
		int totalClaims = 0;
		int totalWrites = 0;

		HadoopService hadoopService = serviceLocator.getService(HadoopService.class);

		for (Claim claim : claimRels) {
			log.debug(claim);
			totalClaims++;
			String sourceId = fullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
			String targetId = fullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
			String value = getValue(sourceId, targetId, claim.getSemantics(), timestamp);
			/*
			public void addHBaseColumn(final String clusterName,
			final String tableName,
			final String rowKey,
			final String columnFamily,
			final String qualifier,
			final String value)
			 */
			hadoopService.addHBaseColumn(clusterName, tableName, sourceId, claim.getSemantics(), targetId, value);
			totalWrites++;

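			// write the inverse relation under the target row, using the inverse semantics
			// resolved from the ontology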
			String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
			String inverseValue = getValue(targetId, sourceId, inverseSemantics, timestamp);
			hadoopService.addHBaseColumn(clusterName, tableName, targetId, inverseSemantics, sourceId, inverseValue);
			totalWrites++;
		}

		log.info("totalClaims: " + totalClaims);
		token.getEnv().setAttribute("claimSize", totalClaims);
		log.info("writeOps: " + totalWrites);
		token.getEnv().setAttribute("writeOps", totalWrites);

		return Arc.DEFAULT_ARC;
	}

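	/*
	 * Publications and datasets are both stored as "result" entities in the information space,
	 * so the claim-specific types are mapped to the generic OpenAIRE entity type.
	 */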
	protected String getOpenAIREType(final String type) {
		switch (type) {
		case "publication":
		case "dataset":
			return "result";
		default:
			return type;
		}
	}

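	/*
	 * Builds the Base64-encoded byte representation of the Oaf relation between sourceId and
	 * targetId. The semantics string is expected in the form relType_subRelType_relClass
	 * (e.g. resultProject_outcome_produces) and is split on SEPARATOR below.
	 */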
	private String getValue(final String sourceId, final String targetId, final String semantics, final long timestamp) throws MSROException {
		log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId));
		String[] relInfo = semantics.split(SEPARATOR);
		Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]);

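		// claimed relations get a fixed trust level and the "user:claim" provenance action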
		Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.relation).setLastupdatetimestamp(timestamp);
		builder.setDataInfo(DataInfo.newBuilder().setTrust("0.91").setInferred(false)
				.setProvenanceaction(
						Qualifier.newBuilder()
								.setClassid("user:claim")
								.setClassname("user:claim")
								.setSchemeid("dnet:provenanceActions")
								.setSchemename("dnet:provenanceActions")
				));

		final SubRelType subRelType = SubRelType.valueOf(relInfo[1]);
		final OafRel.Builder relBuilder = OafRel.newBuilder()
				.setSubRelType(subRelType)
				.setRelClass(relInfo[2])
				.setRelType(RelType.valueOf(relInfo[0]))
				.setSource(sourceId).setTarget(targetId).setChild(false);

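		// the relation payload depends on the relation type: result-project outcome relations
		// and result-result publication-dataset relations are supported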
		switch (relInfo[0]) {
		case "resultProject":
			relBuilder.setResultProject(ResultProject.newBuilder()
					.setOutcome(Outcome.newBuilder().setRelMetadata(
							RelMetadata.newBuilder().setSemantics(
									semanticsBuilder
											.setSchemeid("dnet:result_project_relations")
											.setSchemename("dnet:result_project_relations")
											.build()
							))));
			break;
		case "resultResult":
			relBuilder.setResultResult(ResultResult.newBuilder()
					.setPublicationDataset(PublicationDataset.newBuilder().setRelMetadata(
							RelMetadata.newBuilder().setSemantics(
									semanticsBuilder
											.setSchemeid("dnet:result_result_relations")
											.setSchemename("dnet:result_result_relations")
											.build()
							))));
			break;
		default:
			throw new MSROException("Semantics " + relInfo[0] + " not supported");
		}

		builder.setRel(relBuilder);
		return Base64.encodeBase64String(builder.build().toByteArray());
	}

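	/*
	 * Builds the full OpenAIRE identifier for the given entity type and normalizes it into
	 * the row key used by the HBase table.
	 */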
	private String fullId(final String type, final String id) {
		final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id);
		return OafRowKeyDecoder.decode(fullId).getKey();
	}

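	// bean-style getters and setters used to configure the node (cluster name, table name, SQL query)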
	public String getClusterName() {
		return clusterName;
	}

	public void setClusterName(final String clusterName) {
		this.clusterName = clusterName;
	}

	public String getTableName() {
		return tableName;
	}

	public void setTableName(final String tableName) {
		this.tableName = tableName;
	}

	public UniqueServiceLocator getServiceLocator() {
		return serviceLocator;
	}

	public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
		this.serviceLocator = serviceLocator;
	}

	public ClaimDatabaseUtils getClaimDatabaseUtils() {
		return claimDatabaseUtils;
	}

	public void setClaimDatabaseUtils(final ClaimDatabaseUtils claimDatabaseUtils) {
		this.claimDatabaseUtils = claimDatabaseUtils;
	}

	public String getSql() {
		return sql;
	}

	public void setSql(final String sql) {
		this.sql = sql;
	}
}