Revision 48027

Node for writing context updates on hbase and refactoring
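
The gist of the change: approved claims whose source is a context are now handled by a dedicated node, ApplyClaimUpdatesJobNode, which writes them to HBase as entity updates on the claimed result (row key = the result's OAF key, column family "result", qualifier "update_" + System.nanoTime()) instead of as relation pairs. Below is a minimal illustrative sketch, not part of the revision, of the value that node encodes; it is assembled only from classes visible in the diff, and the ContextUpdateSketch wrapper and method name are hypothetical.

import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafEntity;
import eu.dnetlib.data.proto.ResultProtos.Result;
import eu.dnetlib.data.proto.ResultProtos.Result.Context;
import eu.dnetlib.data.proto.TypeProtos.Type;
import org.apache.commons.codec.binary.Base64;

public class ContextUpdateSketch {

	public static String contextUpdateValue(final String contextId, final String resultRowKey, final long timestamp) {
		// Claims carry a fixed trust of 0.91 and provenance "user:claim",
		// as in AbstractClaimsToHBASE.getDataInfo() below.
		final DataInfo dataInfo = DataInfo.newBuilder().setTrust("0.91").setInferred(false)
				.setProvenanceaction(Qualifier.newBuilder()
						.setClassid("user:claim").setClassname("user:claim")
						.setSchemeid("dnet:provenanceActions").setSchemename("dnet:provenanceActions"))
				.build();
		// The claimed context is attached to a Result entity, not to a relation.
		final Context context = Context.newBuilder().setDataInfo(dataInfo).setId(contextId).build();
		final Result.Builder result = Result.newBuilder()
				.setMetadata(Result.Metadata.newBuilder().addContext(context));
		final OafEntity.Builder entity = OafEntity.newBuilder()
				.setId(resultRowKey).setType(Type.result).setResult(result);
		// Kind.entity marks this as an update to the result row; the node stores it
		// under column family "result" with qualifier "update_" + System.nanoTime().
		final Oaf oaf = Oaf.newBuilder().setKind(Kind.entity)
				.setLastupdatetimestamp(timestamp).setEntity(entity).build();
		return Base64.encodeBase64String(oaf.toByteArray());
	}
}

Decoding the returned Base64 string with Oaf.newBuilder().mergeFrom(...), as the new unit tests do, recovers the update for inspection.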

View differences:

modules/dnet-openaireplus-workflows/trunk/src/test/java/leu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNodeTest.java (removed: this copy sat under the misspelled "leu" package)

package leu.dnetlib.msro.openaireplus.workflows.nodes.claims;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ApplyClaimRelsJobNode;
import eu.dnetlib.msro.rmi.MSROException;
import org.junit.Test;
import org.postgresql.util.Base64;

/**
 * Created by Alessia Bardi on 26/06/2017.
 *
 * @author Alessia Bardi
 */
public class ApplyClaimRelsJobNodeTest {

	private ApplyClaimRelsJobNode applyClaims = new ApplyClaimRelsJobNode();

	@Test
	public void testGetValue() throws MSROException, InvalidProtocolBufferException {
		String sourceId = "40|corda_______::9f752db0b5ec9ca23673ca7f4cb0808e";
		String semantics = "resultProject_outcome_produces";
		String targetId = "50|userclaim___::6a8f649d968e734a6733c23a351c1859";
		long time = System.currentTimeMillis();
		//final String sourceId, final String semantics, final String targetId, final long timestamp
		String res = applyClaims.getValue(sourceId, semantics, targetId, time);
		Oaf.Builder builder = Oaf.newBuilder().mergeFrom(Base64.decode(res));
		System.out.println(builder.build().toString());
	}

	@Test(expected = MSROException.class)
	public void testGetValueErrSem() throws MSROException {
		String sourceId = "40|corda_______::9f752db0b5ec9ca23673ca7f4cb0808e";
		String semantics = "produces";
		String targetId = "50|userclaim___::6a8f649d968e734a6733c23a351c1859";
		long time = System.currentTimeMillis();
		//final String sourceId, final String semantics, final String targetId, final long timestamp
		String res = applyClaims.getValue(sourceId, semantics, targetId, time);
		System.out.println(res);
	}
}

modules/dnet-openaireplus-workflows/trunk/src/test/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNodeTest.java (new file: the test moved under the correct "eu" package)

package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.msro.rmi.MSROException;
import org.junit.Test;
import org.postgresql.util.Base64;

/**
 * Created by Alessia Bardi on 26/06/2017.
 *
 * @author Alessia Bardi
 */
public class ApplyClaimRelsJobNodeTest {

	private ApplyClaimRelsJobNode applyClaims = new ApplyClaimRelsJobNode();

	@Test
	public void testGetValue() throws MSROException, InvalidProtocolBufferException {
		String sourceId = "40|corda_______::9f752db0b5ec9ca23673ca7f4cb0808e";
		String semantics = "resultProject_outcome_produces";
		String targetId = "50|userclaim___::6a8f649d968e734a6733c23a351c1859";
		System.out.println(getValue(sourceId, semantics, targetId));
	}

	@Test(expected = MSROException.class)
	public void testGetValueErrSem() throws MSROException, InvalidProtocolBufferException {
		String sourceId = "40|corda_______::9f752db0b5ec9ca23673ca7f4cb0808e";
		String semantics = "produces";
		String targetId = "50|userclaim___::6a8f649d968e734a6733c23a351c1859";
		getValue(sourceId, semantics, targetId);
	}

	private String getValue(final String rowKey, final String semantics, final String targetId) throws MSROException, InvalidProtocolBufferException {
		long time = System.currentTimeMillis();
		//final String sourceId, final String semantics, final String targetId, final long timestamp
		String res = applyClaims.getValue(rowKey, semantics, targetId, time);
		Oaf.Builder builder = Oaf.newBuilder().mergeFrom(Base64.decode(res));
		return builder.build().toString();
	}
}

modules/dnet-openaireplus-workflows/trunk/src/test/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimUpdatesJobNodeTest.java (new file)

package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;

import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.msro.rmi.MSROException;
import org.junit.Test;
import org.postgresql.util.Base64;

/**
 * Created by Alessia Bardi on 26/06/2017.
 *
 * @author Alessia Bardi
 */
public class ApplyClaimUpdatesJobNodeTest {

	private ApplyClaimUpdatesJobNode applyClaims = new ApplyClaimUpdatesJobNode();

	@Test
	public void testGetValueEGI() throws MSROException, InvalidProtocolBufferException {
		String context = "egi";
		String rowkey = "50|userclaim___::6a8f649d968e734a6733c23a351c1859";
		System.out.println(getValue(context, rowkey));
	}

	@Test
	public void testGetValueEGILongId() throws MSROException, InvalidProtocolBufferException {
		String context = "egi::classification::natsc::earths::other";
		String rowkey = "50|userclaim___::6a8f649d968e734a6733c23a351c1859";
		System.out.println(getValue(context, rowkey));
	}

	private String getValue(final String context, final String rowkey) throws MSROException, InvalidProtocolBufferException {
		String semantics = "relatedTo";
		long time = System.currentTimeMillis();
		//final String sourceId, final String semantics, final String targetId, final long timestamp
		String res = applyClaims.getValue(context, semantics, rowkey, time);
		Oaf.Builder builder = Oaf.newBuilder().mergeFrom(Base64.decode(res));
		return builder.build().toString();
	}
}

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/AbstractClaimsToHBASE.java (new file)

package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;

import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Created by Alessia Bardi on 28/06/2017.
 *
 * @author Alessia Bardi
 */
public abstract class AbstractClaimsToHBASE extends SimpleJobNode implements ProgressJobNode {

	private static final Log log = LogFactory.getLog(AbstractClaimsToHBASE.class);

	private String sql;
	private String countQuery;

	private int total = 0;
	private int processed = 0;

	private String clusterName;
	private String tableName;

	@Autowired
	private UniqueServiceLocator serviceLocator;

	@Autowired
	private ClaimDatabaseUtils claimDatabaseUtils;

	@Override
	public ProgressProvider getProgressProvider() {
		return new ProgressProvider() {

			@Override
			public boolean isInaccurate() {
				return false;
			}

			@Override
			public int getTotalValue() {
				return total;
			}

			@Override
			public int getCurrentValue() {
				return processed;
			}
		};
	}

	protected String getOpenAIREType(final String type) {
		switch (type) {
		case "publication":
		case "dataset":
			return "result";
		default:
			return type;
		}
	}

	protected String getFullId(final String type, final String id) {
		final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id);
		return OafRowKeyDecoder.decode(fullId).getKey();
	}

	protected static DataInfo getDataInfo() {
		return DataInfo.newBuilder().setTrust("0.91").setInferred(false)
				.setProvenanceaction(
						Qualifier.newBuilder()
								.setClassid("user:claim")
								.setClassname("user:claim")
								.setSchemeid("dnet:provenanceActions")
								.setSchemename("dnet:provenanceActions")).build();
	}

	public String getSql() {
		return sql;
	}

	public void setSql(final String sql) {
		this.sql = sql;
	}

	public String getCountQuery() {
		return countQuery;
	}

	public void setCountQuery(final String countQuery) {
		this.countQuery = countQuery;
	}

	public int getTotal() {
		return total;
	}

	public void setTotal(final int total) {
		this.total = total;
	}

	public int getProcessed() {
		return processed;
	}

	public void setProcessed(final int processed) {
		this.processed = processed;
	}

	public void incrementProcessed() {
		processed++;
	}

	public String getClusterName() {
		return clusterName;
	}

	public void setClusterName(final String clusterName) {
		this.clusterName = clusterName;
	}

	public String getTableName() {
		return tableName;
	}

	public void setTableName(final String tableName) {
		this.tableName = tableName;
	}

	public UniqueServiceLocator getServiceLocator() {
		return serviceLocator;
	}

	public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
		this.serviceLocator = serviceLocator;
	}

	public ClaimDatabaseUtils getClaimDatabaseUtils() {
		return claimDatabaseUtils;
	}

	public void setClaimDatabaseUtils(final ClaimDatabaseUtils claimDatabaseUtils) {
		this.claimDatabaseUtils = claimDatabaseUtils;
	}
}

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNode.java

 import com.googlecode.sarasvati.Arc;
 import com.googlecode.sarasvati.NodeToken;
 import eu.dnetlib.data.hadoop.rmi.HadoopService;
-import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
 import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
 import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
 import eu.dnetlib.data.proto.KindProtos.Kind;
......
 import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome;
 import eu.dnetlib.data.proto.ResultResultProtos.ResultResult;
 import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset;
-import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
-import eu.dnetlib.enabling.locators.UniqueServiceLocator;
 import eu.dnetlib.msro.rmi.MSROException;
-import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
-import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
-import eu.dnetlib.msro.workflows.util.ProgressProvider;
 import eu.dnetlib.utils.ontologies.OntologyLoader;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
 
 /**
  * Created by alessia on 23/10/15.
  */
-public class ApplyClaimRelsJobNode extends SimpleJobNode implements ProgressJobNode {
+public class ApplyClaimRelsJobNode extends AbstractClaimsToHBASE {
 
 	private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class);
 
 	private final String SEPARATOR = "_";
 
-	@Autowired
-	private UniqueServiceLocator serviceLocator;
 
-	@Autowired
-	private ClaimDatabaseUtils claimDatabaseUtils;
-
-	private String sql;
-	private String countQuery;
-	private int total = 0;
-	private int processed = 0;
-
-	private String clusterName;
-
-	private String tableName;
-
-//	private String fetchSqlAsText(final String path) throws IOException {
-//		return IOUtils.toString(getClass().getResourceAsStream(path));
-//	}
-
 	@Override
 	protected String execute(NodeToken token) throws Exception {
 		//TODO: use claim.claim_date from the claim db
 		long timestamp = System.currentTimeMillis();
-		total = this.claimDatabaseUtils.count(countQuery);
-		List<Claim> claimRels = this.claimDatabaseUtils.query(sql);
+		setTotal(getClaimDatabaseUtils().count(getCountQuery()));
+		List<Claim> claimRels = getClaimDatabaseUtils().query(getSql());
 		int totalWrites = 0;
 		int valid = 0;
 		int discardedClaims = 0;
 
-		HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
+		HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
 
 		for (Claim claim : claimRels) {
 			log.debug(claim);
 			try {
-				String sourceId = fullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
-				String targetId = fullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
+				String sourceId = getFullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
+				String targetId = getFullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
 				String value = getValue(sourceId, claim.getSemantics(), targetId, timestamp);
-
-			/*
-			public void addHBaseColumn(final String clusterName,
-			final String tableName,
-			final String rowKey,
-			final String columnFamily,
-			final String qualifier,
-			final String value)
-			 */
-				hadoopService.addHBaseColumn(clusterName, tableName, sourceId, claim.getSemantics(), targetId, value);
-				processed++;
+				// adding relationship
+				hadoopService.addHBaseColumn(getClusterName(), getTableName(), sourceId, claim.getSemantics(), targetId, value);
 				totalWrites++;
 
 				String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
 				String inverseValue = getValue(targetId, inverseSemantics, sourceId, timestamp);
-				hadoopService.addHBaseColumn(clusterName, tableName, targetId, inverseSemantics, sourceId, inverseValue);
+				//adding inverse relationship
+				hadoopService.addHBaseColumn(getClusterName(), getTableName(), targetId, inverseSemantics, sourceId, inverseValue);
 				totalWrites++;
-			}catch(IllegalArgumentException e){
-				log.error("Discarding claim "+claim+". Cause: "+e.getMessage());
+				incrementProcessed();
+			} catch (IllegalArgumentException e) {
+				log.error("Discarding claim " + claim + ". Cause: " + e.getMessage());
 				discardedClaims++;
 			}
 		}
 
-		log.info("totalClaims: " + total);
-		token.getEnv().setAttribute("claimSize", total);
-		log.info("writeOps: " + totalWrites);
-		token.getEnv().setAttribute("writeOps", totalWrites);
-		log.info("validClaims: " + valid);
-		token.getEnv().setAttribute("validClaims", valid);
-		log.info("discardedClaims: " + discardedClaims);
-		token.getEnv().setAttribute("discardedClaims", discardedClaims);
+		log.info("totalClaimRels: " + getTotal());
+		token.getEnv().setAttribute("claimRelsSize", getTotal());
+		log.info("claim rels writeOps: " + totalWrites);
+		token.getEnv().setAttribute("claimRelsWriteOps", totalWrites);
+		log.info("validClaimRels: " + valid);
+		token.getEnv().setAttribute("validClaimRels", valid);
+		log.info("discardedClaimRels: " + discardedClaims);
+		token.getEnv().setAttribute("discardedClaimRels", discardedClaims);
 
 		return Arc.DEFAULT_ARC;
 	}
 
-	protected String getOpenAIREType(final String type){
-		switch(type){
-		case "publication":
-		case "dataset":
-			return "result";
-		default:
-			return type;
-		}
-	}
-
-
-	public String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException{
+	protected String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException {
 		log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId));
 		String[] relInfo = semantics.split(SEPARATOR);
-		if(relInfo.length < 3){
-			throw new MSROException("Semantics "+semantics+" not supported: must be splittable in 3 by '_'");
+		if (relInfo.length < 3) {
+			throw new MSROException("Semantics " + semantics + " not supported: must be splittable in 3 by '_'");
 		}
 		Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]);
 
......
 							))));
 			break;
 		default:
-			throw new MSROException("Semantics "+relInfo[0]+" not supported");
+			throw new MSROException("Semantics " + relInfo[0] + " not supported");
 		}
 
 		builder.setRel(relBuilder);
 		return Base64.encodeBase64String(builder.build().toByteArray());
 	}
-
-	private String fullId(final String type, final String id) {
-		final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id);
-		return OafRowKeyDecoder.decode(fullId).getKey();
-	}
-
-	public String getClusterName() {
-		return clusterName;
-	}
-
-	public void setClusterName(final String clusterName) {
-		this.clusterName = clusterName;
-	}
-
-	public String getTableName() {
-		return tableName;
-	}
-
-	public void setTableName(final String tableName) {
-		this.tableName = tableName;
-	}
-
-	public UniqueServiceLocator getServiceLocator() {
-		return serviceLocator;
-	}
-
-	public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
-		this.serviceLocator = serviceLocator;
-	}
-
-	public ClaimDatabaseUtils getClaimDatabaseUtils() {
-		return claimDatabaseUtils;
-	}
-
-	public void setClaimDatabaseUtils(final ClaimDatabaseUtils claimDatabaseUtils) {
-		this.claimDatabaseUtils = claimDatabaseUtils;
-	}
-
-	public String getSql() {
-		return sql;
-	}
-
-	public void setSql(final String sql) {
-		this.sql = sql;
-	}
-
-	public String getCountQuery() {
-		return countQuery;
-	}
-
-	public void setCountQuery(final String countQuery) {
-		this.countQuery = countQuery;
-	}
-
-	@Override
-	public ProgressProvider getProgressProvider() {
-		return new ProgressProvider() {
-
-			@Override
-			public boolean isInaccurate() {
-				return false;
-			}
-
-			@Override
-			public int getTotalValue() {
-				return total;
-			}
-
-			@Override
-			public int getCurrentValue() {
-				return processed;
-			}
-		};
-	}
 }

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimUpdatesJobNode.java

 package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;
 
-import java.io.IOException;
 import java.util.List;
 
 import com.googlecode.sarasvati.Arc;
 import com.googlecode.sarasvati.NodeToken;
 import eu.dnetlib.data.hadoop.rmi.HadoopService;
-import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
-import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
-import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
 import eu.dnetlib.data.proto.KindProtos.Kind;
 import eu.dnetlib.data.proto.OafProtos.Oaf;
-import eu.dnetlib.data.proto.OafProtos.OafRel;
-import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata;
-import eu.dnetlib.data.proto.RelTypeProtos.RelType;
-import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
-import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject;
-import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome;
-import eu.dnetlib.data.proto.ResultResultProtos.ResultResult;
-import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset;
-import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
-import eu.dnetlib.enabling.locators.UniqueServiceLocator;
+import eu.dnetlib.data.proto.OafProtos.OafEntity;
+import eu.dnetlib.data.proto.ResultProtos.Result;
+import eu.dnetlib.data.proto.ResultProtos.Result.Context;
+import eu.dnetlib.data.proto.TypeProtos.Type;
 import eu.dnetlib.msro.rmi.MSROException;
-import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
-import eu.dnetlib.utils.ontologies.OntologyLoader;
 import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
 
 /**
  * Created by alessia on 23/10/15.
  */
-public class ApplyClaimUpdatesJobNode extends SimpleJobNode {
+public class ApplyClaimUpdatesJobNode extends AbstractClaimsToHBASE {
 
 	private static final Log log = LogFactory.getLog(ApplyClaimUpdatesJobNode.class);
 
-	private final String SEPARATOR = "_";
-
-	@Autowired
-	private UniqueServiceLocator serviceLocator;
-
-	@Autowired
-	private ClaimDatabaseUtils claimDatabaseUtils;
-
-	private String sql;
-
-	private String clusterName;
-
-	private String tableName;
-
-	private String fetchSqlAsText(final String path) throws IOException {
-		return IOUtils.toString(getClass().getResourceAsStream(path));
-	}
-
 	@Override
 	protected String execute(NodeToken token) throws Exception {
 		//TODO: use claim.claim_date from the claim db
 		long timestamp = System.currentTimeMillis();
 
-		List<Claim> claimRels = this.claimDatabaseUtils.query(sql);
+		List<Claim> claimUpdates = this.getClaimDatabaseUtils().query(getSql());
 		int totalClaims = 0;
 		int totalWrites = 0;
+		int discardedClaims = 0;
 
-		HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
+		HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
 
-		for (Claim claim : claimRels) {
+		for (Claim claim : claimUpdates) {
+			try{
 			log.debug(claim);
 			totalClaims++;
 			String contextId = claim.getSource();
-			String targetId = fullId(claim.getTargetType(), claim.getTarget());
+			String rowKey = getFullId(getOpenAIREType(claim.getTargetType()), claim.getTarget());
 
-			String value = getValue(contextId, targetId, claim.getSemantics(), timestamp);
-			/*
-			public void addHBaseColumn(final String clusterName,
-			final String tableName,
-			final String rowKey,
-			final String columnFamily,
-			final String qualifier,
-			final String value)
-			 */
-			hadoopService.addHBaseColumn(clusterName, tableName, contextId, claim.getSemantics(), targetId, value);
+			String value = getValue(contextId, rowKey, claim.getSemantics(), timestamp);
+			hadoopService.addHBaseColumn(getClusterName(), getTableName(), rowKey, "result", "update_" + System.nanoTime(), value);
 			totalWrites++;
-
-			String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
-			String inverseValue = getValue(targetId, contextId, inverseSemantics, timestamp);
-			hadoopService.addHBaseColumn(clusterName, tableName, targetId, inverseSemantics, contextId, inverseValue);
-			totalWrites++;
-
+			incrementProcessed();
+			} catch (IllegalArgumentException e) {
+				log.error("Discarding claim " + claim + ". Cause: " + e.getMessage());
+				discardedClaims++;
+			}
 		}
 
-		log.info("totalClaims: " + totalClaims);
-		token.getEnv().setAttribute("claimSize", totalClaims);
-		log.info("writeOps: " + totalWrites);
-		token.getEnv().setAttribute("writeOps", totalWrites);
+		log.info("Total Claim Updates: " + totalClaims);
+		token.getEnv().setAttribute("claimUpdatesSize", totalClaims);
+		log.info("Claim updates writeOps: " + totalWrites);
+		token.getEnv().setAttribute("claimUpdatesWriteOps", totalWrites);
+		log.info("Discarded Claim Updates: " + discardedClaims);
+		token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims);
 
 		return Arc.DEFAULT_ARC;
 	}
 
-
-	private String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException{
+	protected String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException {
 		log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId));
-		String[] relInfo = semantics.split(SEPARATOR);
-		Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]);
 
-		Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.relation).setLastupdatetimestamp(timestamp);
-		builder.setDataInfo(DataInfo.newBuilder().setTrust("0.91").setInferred(false)
-				.setProvenanceaction(
-						Qualifier.newBuilder()
-								.setClassid("user:claim")
-								.setClassname("user:claim")
-								.setSchemeid("dnet:provenanceActions")
-								.setSchemename("dnet:provenanceActions")
-				));
+		Result.Builder resultBuilder = Result.newBuilder().setMetadata(Result.Metadata.newBuilder().addContext(getContext(sourceId)));
+		OafEntity.Builder entityBuilder = OafEntity.newBuilder().setId(targetId).setType(Type.result).setResult(resultBuilder);
+		Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.entity).setLastupdatetimestamp(timestamp).setEntity(entityBuilder);
 
-		final SubRelType subRelType = SubRelType.valueOf(relInfo[1]);
-		final OafRel.Builder relBuilder = OafRel.newBuilder()
-				.setSubRelType(subRelType)
-				.setRelClass(relInfo[2])
-				.setRelType(RelType.valueOf(relInfo[0]))
-				.setSource(sourceId).setTarget(targetId).setChild(false);
-
-		switch (relInfo[0]) {
-		case "resultProject":
-
-			relBuilder.setResultProject(ResultProject.newBuilder()
-					.setOutcome(Outcome.newBuilder().setRelMetadata(
-							RelMetadata.newBuilder().setSemantics(
-									semanticsBuilder
-											.setSchemeid("dnet:result_project_relations")
-											.setSchemename("dnet:result_project_relations")
-											.build()
-							))));
-			break;
-		case "resultResult_publicationDataset_isRelatedTo":
-			relBuilder.setResultResult(ResultResult.newBuilder()
-					.setPublicationDataset(PublicationDataset.newBuilder().setRelMetadata(
-							RelMetadata.newBuilder().setSemantics(
-									semanticsBuilder
-											.setSchemeid("dnet:result_result_relations")
-											.setSchemename("dnet:result_result_relations")
-											.build()
-							))));
-			break;
-		default:
-			throw new MSROException("Semantics "+relInfo[0]+" not supported");
-		}
-
-		builder.setRel(relBuilder);
 		return Base64.encodeBase64String(builder.build().toByteArray());
 	}
 
-	private String fullId(final String type, final String id) {
-		final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id);
-		return OafRowKeyDecoder.decode(fullId).getKey();
+	private Context getContext(final String sourceId) {
+		return Context.newBuilder().setDataInfo(getDataInfo()).setId(sourceId).build();
 	}
-
-	public String getClusterName() {
-		return clusterName;
-	}
-
-	public void setClusterName(final String clusterName) {
-		this.clusterName = clusterName;
-	}
-
-	public String getTableName() {
-		return tableName;
-	}
-
-	public void setTableName(final String tableName) {
-		this.tableName = tableName;
-	}
-
-	public UniqueServiceLocator getServiceLocator() {
-		return serviceLocator;
-	}
-
-	public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
-		this.serviceLocator = serviceLocator;
-	}
-
-	public ClaimDatabaseUtils getClaimDatabaseUtils() {
-		return claimDatabaseUtils;
-	}
-
-	public void setClaimDatabaseUtils(final ClaimDatabaseUtils claimDatabaseUtils) {
-		this.claimDatabaseUtils = claimDatabaseUtils;
-	}
-
-	public String getSql() {
-		return sql;
-	}
-
-	public void setSql(final String sql) {
-		this.sql = sql;
-	}
 }

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ClaimDatabaseUtils.java

 
 import java.util.List;
 
+import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
+import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
+import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
+import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
 import eu.dnetlib.enabling.database.DataSourceFactory;
 import eu.dnetlib.enabling.database.utils.JdbcTemplateFactory;
 import org.springframework.beans.factory.annotation.Autowired;
......
 		return this.claimsJdbcTemplateFactory.createJdbcTemplate(dbName).queryForObject(sqlCountQuery, Integer.class);
 	}
 
-
 }

modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/hbase/tmp_claims2hbase.xml

         <WORKFLOW_TYPE>Data Load</WORKFLOW_TYPE>
         <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
         <CONFIGURATION start="manual">
-            <NODE isStart="true" name="start">
-                <DESCRIPTION>start</DESCRIPTION>
-                <PARAMETERS/>
-                <ARCS>
-                    <ARC to="queryClaimRels"/>
-                </ARCS>
-            </NODE>
-            <NODE name="applyClaimRels" type="ApplyClaimRels">
+            <NODE name="applyClaimRels" type="ApplyClaimRels" isStart="true">
                 <DESCRIPTION>Apply Claim Rels</DESCRIPTION>
                 <PARAMETERS>
                     <PARAM managedBy="system" name="clusterName" required="true" type="string">DM</PARAM>
                     <PARAM managedBy="user" name="tableName" required="true" type="string"></PARAM>
                     <PARAM managedBy="user" name="sql" required="true" type="string">SELECT source_type, source_id, target_type, target_id, semantics FROM claim WHERE approved=TRUE AND source_type !='context'</PARAM>
-                    <PARAM managedBy="user" name="countSql" required="true" type="string">SELECT count(*) FROM claim WHERE approved=TRUE AND source_type !='context'</PARAM>
+                    <PARAM managedBy="user" name="countQuery" required="true" type="string">SELECT count(*) FROM claim WHERE approved=TRUE AND source_type !='context'</PARAM>
                 </PARAMETERS>
                 <ARCS>
                     <ARC to="applyClaimUpdates"/>
......
 	                <PARAM managedBy="system" name="clusterName" required="true" type="string">DM</PARAM>
 	                <PARAM managedBy="user" name="tableName" required="true" type="string"></PARAM>
                     <PARAM managedBy="user" name="sql" required="true" type="string">SELECT source_type, source_id, target_type, target_id, semantics FROM claim WHERE approved=TRUE AND source_type ='context'</PARAM>
-                    <PARAM managedBy="user" name="countSql" required="true" type="string">SELECT count(*) FROM claim WHERE approved=TRUE AND source_type ='context'</PARAM>
+                    <PARAM managedBy="user" name="countQuery" required="true" type="string">SELECT count(*) FROM claim WHERE approved=TRUE AND source_type ='context'</PARAM>
                 </PARAMETERS>
                 <ARCS>
                     <ARC to="success"/>

modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/applicationContext-claims.xml

 <?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
-	xmlns:p="http://www.springframework.org/schema/p" xmlns:util="http://www.springframework.org/schema/util"
-	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
-		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
-		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
-
-	<!--<bean id="claimsJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"-->
-		<!--p:dataSource-ref="claimsDatasource"/>-->
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xmlns:p="http://www.springframework.org/schema/p"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+
 	<bean id="claimsJdbcTemplateFactory"
 	      class="eu.dnetlib.enabling.database.utils.JdbcTemplateFactory"
 	      p:dataSourceFactory-ref="claimsDataSourceFactory" />
 
-	<!--<bean id="claimsDatasource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">-->
-		<!--<property name="driverClassName" value="org.postgresql.Driver"/>-->
-		<!--<property name="url" value="${dnet.openaire.claims.db.url}"/>-->
-		<!--<property name="username" value="${dnet.openaire.claims.db.username}"/>-->
-		<!--<property name="password" value="${dnet.openaire.claims.db.password}"/>-->
-	<!--</bean>-->
 	<bean id="claimsDataSourceFactory"
 	      class="eu.dnetlib.enabling.database.DataSourceFactoryImpl"
 	      p:driverClassName="org.postgresql.Driver"
......
 	      p:username="${dnet.openaire.claims.db.username}"
 	      p:password="${dnet.openaire.claims.db.password}" />
 
+	<bean id="claimDatabaseUtils" class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ClaimDatabaseUtils" />
 
-	<bean id="claimDatabaseUtils" class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ClaimDatabaseUtils" />
 </beans>

modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml

           class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ApplyClaimRelsJobNode"
           scope="prototype"/>
 
+    <bean id="wfNodeApplyClaimUpdates"
+          class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ApplyClaimUpdatesJobNode"
+          scope="prototype"/>
+
 </beans>
