Project

General

Profile

« Previous | Next » 

Revision 47679

Classes and configuration for writing claimed relationships into HBASE

View differences:

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;
2

  
3
import java.io.IOException;
4
import java.util.List;
5

  
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.NodeToken;
8
import eu.dnetlib.data.hadoop.rmi.HadoopService;
9
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
10
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
11
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
12
import eu.dnetlib.data.proto.KindProtos.Kind;
13
import eu.dnetlib.data.proto.OafProtos.Oaf;
14
import eu.dnetlib.data.proto.OafProtos.OafRel;
15
import eu.dnetlib.data.proto.RelMetadataProtos.RelMetadata;
16
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
17
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
18
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject;
19
import eu.dnetlib.data.proto.ResultProjectProtos.ResultProject.Outcome;
20
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult;
21
import eu.dnetlib.data.proto.ResultResultProtos.ResultResult.PublicationDataset;
22
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
23
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24
import eu.dnetlib.msro.rmi.MSROException;
25
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
26
import eu.dnetlib.utils.ontologies.OntologyLoader;
27
import org.apache.commons.codec.binary.Base64;
28
import org.apache.commons.io.IOUtils;
29
import org.apache.commons.logging.Log;
30
import org.apache.commons.logging.LogFactory;
31
import org.apache.hadoop.util.StringUtils;
32
import org.springframework.beans.factory.annotation.Autowired;
33

  
34
/**
35
 * Created by alessia on 23/10/15.
36
 */
37
public class ApplyClaimRelsJobNode extends SimpleJobNode {
38

  
39
	private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class);
40

  
41
	private final String SEPARATOR = "_";
42

  
43
	@Autowired
44
	private UniqueServiceLocator serviceLocator;
45

  
46
	@Autowired
47
	private ClaimDatabaseUtils claimDatabaseUtils;
48

  
49
	private String sql;
50

  
51
	private String clusterName;
52

  
53
	private String tableName;
54

  
55
	private String fetchSqlAsText(final String path) throws IOException {
56
		return IOUtils.toString(getClass().getResourceAsStream(path));
57
	}
58

  
59
	@Override
60
	protected String execute(NodeToken token) throws Exception {
61
		//TODO: use claim.claim_date from the claim db
62
		long timestamp = System.currentTimeMillis();
63

  
64
		List<ClaimRel> claimRels = this.claimDatabaseUtils.query(sql);
65
		int totalClaims = 0;
66
		int totalWrites = 0;
67

  
68
		HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
69

  
70
		for (ClaimRel claim : claimRels) {
71
			log.debug(claim);
72
			totalClaims++;
73
			String sourceId = fullId(claim.getSourceType(), claim.getSource());
74
			String targetId = fullId(claim.getTargetType(), claim.getTarget());
75
			String value = getValue(sourceId, targetId, claim.getSemantics(), timestamp);
76
			/*
77
			public void addHBaseColumn(final String clusterName,
78
			final String tableName,
79
			final String rowKey,
80
			final String columnFamily,
81
			final String qualifier,
82
			final String value)
83
			 */
84
			hadoopService.addHBaseColumn(clusterName, tableName, sourceId, claim.getSemantics(), targetId, value);
85
			totalWrites++;
86

  
87
			String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
88
			String inverseValue = getValue(targetId, sourceId, inverseSemantics, timestamp);
89
			hadoopService.addHBaseColumn(clusterName, tableName, targetId, inverseSemantics, sourceId, inverseValue);
90
			totalWrites++;
91

  
92
		}
93

  
94
		log.info("totalClaims: " + totalClaims);
95
		token.getEnv().setAttribute("claimSize", totalClaims);
96
		log.info("writeOps: " + totalWrites);
97
		token.getEnv().setAttribute("writeOps", totalWrites);
98

  
99
		return Arc.DEFAULT_ARC;
100
	}
101

  
102

  
103
	private String getValue(final String sourceId, final String semantics, final String targetId, final long timestamp) throws MSROException{
104
		log.debug(StringUtils.format("%s -- %s -- %s", sourceId, semantics, targetId));
105
		String[] relInfo = semantics.split(SEPARATOR);
106
		Qualifier.Builder semanticsBuilder = Qualifier.newBuilder().setClassid(relInfo[2]).setClassname(relInfo[2]);
107

  
108
		Oaf.Builder builder = Oaf.newBuilder().setKind(Kind.relation).setLastupdatetimestamp(timestamp);
109
		builder.setDataInfo(DataInfo.newBuilder().setTrust("0.91").setInferred(false)
110
				.setProvenanceaction(
111
						Qualifier.newBuilder()
112
								.setClassid("user:claim")
113
								.setClassname("user:claim")
114
								.setSchemeid("dnet:provenanceActions")
115
								.setSchemename("dnet:provenanceActions")
116
				));
117

  
118
		final SubRelType subRelType = SubRelType.valueOf(relInfo[1]);
119
		final OafRel.Builder relBuilder = OafRel.newBuilder()
120
				.setSubRelType(subRelType)
121
				.setRelClass(relInfo[2])
122
				.setRelType(RelType.valueOf(relInfo[0]))
123
				.setSource(sourceId).setTarget(targetId).setChild(false);
124

  
125
		switch (relInfo[0]) {
126
		case "resultProject":
127

  
128
			relBuilder.setResultProject(ResultProject.newBuilder()
129
					.setOutcome(Outcome.newBuilder().setRelMetadata(
130
							RelMetadata.newBuilder().setSemantics(
131
									semanticsBuilder
132
											.setSchemeid("dnet:result_project_relations")
133
											.setSchemename("dnet:result_project_relations")
134
											.build()
135
							))));
136
			break;
137
		case "resultResult_publicationDataset_isRelatedTo":
138
			relBuilder.setResultResult(ResultResult.newBuilder()
139
					.setPublicationDataset(PublicationDataset.newBuilder().setRelMetadata(
140
							RelMetadata.newBuilder().setSemantics(
141
									semanticsBuilder
142
											.setSchemeid("dnet:result_result_relations")
143
											.setSchemename("dnet:result_result_relations")
144
											.build()
145
							))));
146
			break;
147
		default:
148
			throw new MSROException("Semantics "+relInfo[0]+" not supported");
149
		}
150

  
151
		builder.setRel(relBuilder);
152
		return Base64.encodeBase64String(builder.build().toByteArray());
153
	}
154

  
155
	private String fullId(final String type, final String id) {
156
		final String fullId = AbstractDNetXsltFunctions.oafSimpleId(type, id);
157
		return OafRowKeyDecoder.decode(fullId).getKey();
158
	}
159

  
160
	public String getClusterName() {
161
		return clusterName;
162
	}
163

  
164
	public void setClusterName(final String clusterName) {
165
		this.clusterName = clusterName;
166
	}
167

  
168
	public String getTableName() {
169
		return tableName;
170
	}
171

  
172
	public void setTableName(final String tableName) {
173
		this.tableName = tableName;
174
	}
175

  
176
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ClaimRel.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;
2

  
3
/**
4
 * Created by Alessia Bardi on 23/06/2017.
5
 *
6
 * @author Alessia Bardi
7
 */
8
public class ClaimRel {
9

  
10
	private String source, sourceType, target, targetType, semantics;
11

  
12
	public String getSource() {
13
		return source;
14
	}
15

  
16
	public ClaimRel setSource(final String source) {
17
		this.source = source;
18
		return this;
19
	}
20

  
21
	public String getSourceType() {
22
		return sourceType;
23
	}
24

  
25
	public ClaimRel setSourceType(final String sourceType) {
26
		this.sourceType = sourceType;
27
		return this;
28
	}
29

  
30
	public String getTarget() {
31
		return target;
32
	}
33

  
34
	public ClaimRel setTarget(final String target) {
35
		this.target = target;
36
		return this;
37
	}
38

  
39
	public String getTargetType() {
40
		return targetType;
41
	}
42

  
43
	public ClaimRel setTargetType(final String targetType) {
44
		this.targetType = targetType;
45
		return this;
46
	}
47

  
48
	public String getSemantics() {
49
		return semantics;
50
	}
51

  
52
	public ClaimRel setSemantics(final String semantics) {
53
		this.semantics = semantics;
54
		return this;
55
	}
56

  
57
	@Override
58
	public String toString() {
59
		return "ClaimRel{" +
60
				"source='" + source + '\'' +
61
				", sourceType='" + sourceType + '\'' +
62
				", target='" + target + '\'' +
63
				", targetType='" + targetType + '\'' +
64
				", semantics='" + semantics + '\'' +
65
				'}';
66
	}
67
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ClaimDatabaseUtils.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;
2

  
3
import java.util.List;
4

  
5
import org.springframework.beans.factory.annotation.Autowired;
6
import org.springframework.jdbc.core.JdbcTemplate;
7
import org.springframework.stereotype.Component;
8

  
9
/**
10
 * Created by Alessia Bardi on 23/06/2017.
11
 *
12
 * @author Alessia Bardi
13
 */
14
@Component
15
public class ClaimDatabaseUtils {
16

  
17
	@Autowired
18
	JdbcTemplate claimsJdbcTemplate;
19

  
20
	public List<ClaimRel> query(String sqlQuery){
21
		return this.claimsJdbcTemplate.query(sqlQuery,
22
				(rs, rowNum) -> new ClaimRel().setSemantics(rs.getString("semantics"))
23
						.setSource(rs.getString("source_id")).setSourceType(rs.getString("source_type"))
24
						.setTarget(rs.getString("target_id")).setTargetType(rs.getString("target_type")));
25
	}
26

  
27

  
28
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/utils/ontologies/OntologyLoader.java
1
package eu.dnetlib.utils.ontologies;
2

  
3
import javax.annotation.PostConstruct;
4

  
5
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
6
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
7
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
8
import org.springframework.beans.factory.annotation.Autowired;
9
import org.springframework.stereotype.Component;
10

  
11
/**
12
 * Created by claudio on 12/12/2016.
13
 */
14
@Component
15
public class OntologyLoader {
16

  
17
	private static UniqueServiceLocator staticServiceLocator;
18

  
19
	@Autowired
20
	private UniqueServiceLocator serviceLocator;
21

  
22
// <TERM code="merges" encoding="organizationOrganization_dedup_merges" english_name="merges" native_name="merges">
23
	public static String fetchInverse(final String relType) throws ISLookUpException {
24
		final String xquery = "let $x:= /RESOURCE_PROFILE["
25
				+ " .//RESOURCE_TYPE/@value = 'OntologyDSResourceType' and "
26
				+ " .//TERM/@encoding='"+relType+"']"
27
				+ "let $y:= $x//TERM[./@encoding='"+relType+"']//RELATION[./@type='inverseOf']/@code/string() "
28
				+ "return $x//TERM[./@code = $y]/@encoding/string()";
29
		return staticServiceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
30
	}
31

  
32
	@PostConstruct
33
	public void init() {
34
		OntologyLoader.staticServiceLocator = serviceLocator;
35
	}
36

  
37
}
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/hbase/tmp_claims2hbase.xml
1
<RESOURCE_PROFILE>
2
    <HEADER>
3
        <RESOURCE_IDENTIFIER value="9bd3c6d5-4be1-4209-967c-02371827b119_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
4
        <RESOURCE_TYPE value="WorkflowDSResourceType"/>
5
        <RESOURCE_KIND value="WorkflowDSResources"/>
6
        <RESOURCE_URI value=""/>
7
        <DATE_OF_CREATION value="2006-05-04T18:13:51.0Z"/>
8
    </HEADER>
9
    <BODY>
10
        <WORKFLOW_NAME>[TMP] Claim to HBase -- merge the nodes in db2hbase</WORKFLOW_NAME>
11
        <WORKFLOW_TYPE>Data Load</WORKFLOW_TYPE>
12
        <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
13
        <CONFIGURATION start="manual">
14
            <NODE isStart="true" name="start">
15
                <DESCRIPTION>start</DESCRIPTION>
16
                <PARAMETERS/>
17
                <ARCS>
18
                    <ARC to="queryClaimRels"/>
19
                </ARCS>
20
            </NODE>
21
            <NODE name="applyClaimRels" type="ApplyClaimRels">
22
                <DESCRIPTION>Apply Claim Rels</DESCRIPTION>
23
                <PARAMETERS>
24
                    <PARAM managedBy="system" name="clusterName" required="true" type="string">DM</PARAM>
25
                    <PARAM managedBy="system" name="tableName" required="true" type="string">db_openaireplus_services_beta</PARAM>
26
                    <PARAM managedBy="system" name="sql" required="true" type="string">/eu/dnetlib/msro/openaireplus/workflows/hbase/queryClaimsRel.sql</PARAM>
27
                </PARAMETERS>
28
                <ARCS>
29
                    <ARC to="applyClaimUpdates"/>
30
                </ARCS>
31
            </NODE>
32
            <NODE name="applyClaimUpdates" type="ApplyClaimUpdates">
33
                <DESCRIPTION>query Claim updates</DESCRIPTION>
34
                <PARAMETERS>
35
	                <PARAM managedBy="system" name="clusterName" required="true" type="string">DM</PARAM>
36
	                <PARAM managedBy="system" name="tableName" required="true" type="string">db_openaireplus_services_beta</PARAM>
37
                    <PARAM managedBy="system" name="sql" required="true" type="string">/eu/dnetlib/msro/openaireplus/workflows/hbase/queryClaimsUpdate.sql</PARAM>
38
                </PARAMETERS>
39
                <ARCS>
40
                    <ARC to="success"/>
41
                </ARCS>
42
            </NODE>
43
        </CONFIGURATION>
44
        <STATUS>
45
        </STATUS>
46
    </BODY>
47
</RESOURCE_PROFILE>
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/hbase/queryClaimsRel.sql
1
SELECT xml, provenance FROM claims WHERE type = 'rels2actions' and set = 'userclaim_result_project'	or set = 'userclaim_result_result'
1
--- SELECT xml, provenance FROM claims WHERE type = 'rels2actions' and set = 'userclaim_result_project'	or set = 'userclaim_result_result'
2

  
3
SELECT source_type, source_id, target_type, target_id, semantics from claim where approved=true and source_type !='context';
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/hbase/queryClaimsUpdate.sql
1
SELECT
2
	regexp_replace(xml, '<\?xml version="1\.0" encoding="UTF-8"\?>', '', 'i') AS xml,
3
	provenance
4
FROM claims
5
WHERE type = 'updates2actions' AND set = 'userclaim_dmf'
1
-- SELECT
2
-- 	regexp_replace(xml, '<\?xml version="1\.0" encoding="UTF-8"\?>', '', 'i') AS xml,
3
-- 	provenance
4
-- FROM claims
5
-- WHERE type = 'updates2actions' AND set = 'userclaim_dmf'
6 6

  
7
SELECT source_type, source_id, target_type, target_id, semantics from claim where approved=true and source_type ='context';
7 8

  
9

  
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/applicationContext-dnet-openaire-claims.properties
1
dnet.openaire.claims.db.url=jdbc:postgresql://localhost:5432/dnet_openaireplus
2
dnet.openaire.claims.db.username=dnet
3
dnet.openaire.claims.db.password=dnetPwd
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/applicationContext-claims.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2
<beans xmlns="http://www.springframework.org/schema/beans"
3
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
4
	xmlns:p="http://www.springframework.org/schema/p" xmlns:util="http://www.springframework.org/schema/util"
5
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
6
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
7
		http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
8
		
9
	<bean id="claimsJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"
10
		p:dataSource-ref="claimsDatasource"/>
11

  
12
	<bean id="claimsDatasource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
13
		<property name="driverClassName" value="org.postgresql.Driver"/>
14
		<property name="url" value="${dnet.openaire.claims.db.url}"/>
15
		<property name="username" value="${dnet.openaire.claims.db.username}"/>
16
		<property name="password" value="${dnet.openaire.claims.db.password}"/>
17
	</bean>
18
</beans>
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml
309 309
          class="eu.dnetlib.functionality.modular.ui.workflows.values.ListActionManagerSetsValues"
310 310
          p:name="actionSets"/>
311 311

  
312
    <bean id="wfNodeApplyClaimRels"
313
          class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ApplyClaimRelsJobNode"
314
          scope="prototype"/>
315

  
312 316
</beans>
modules/dnet-openaireplus-workflows/trunk/pom.xml
159 159
            <artifactId>mockito-core</artifactId>
160 160
            <version>1.9.5</version>
161 161
        </dependency>
162
        <dependency>
163
            <groupId>org.springframework</groupId>
164
            <artifactId>spring-jdbc</artifactId>
165
            <version>${spring.version}</version>
166
        </dependency>
162 167
	</dependencies>
163 168

  
164 169
</project>

Also available in: Unified diff