Project

General

Profile

« Previous | Next » 

Revision 36298

[maven-release-plugin] copy for tag dnet-deduplication-1.0.1

View differences:

modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/pom.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3
	<parent>
4
		<groupId>eu.dnetlib</groupId>
5
		<artifactId>dnet-parent</artifactId>
6
		<version>1.0.0</version>
7
		<relativePath />
8
	</parent>
9
	<modelVersion>4.0.0</modelVersion>
10
	<groupId>eu.dnetlib</groupId>
11
	<artifactId>dnet-deduplication</artifactId>
12
	<packaging>jar</packaging>
13
	<version>1.0.1</version>
14
	<scm>
15
		<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet40/modules/dnet-deduplication/tags/dnet-deduplication-1.0.1</developerConnection>
16
	</scm>
17
	<dependencies>
18
		<dependency>
19
			<groupId>eu.dnetlib</groupId>
20
			<artifactId>dnet-msro-service</artifactId>
21
			<version>[3.0.0,4.0.0)</version>
22
		</dependency>
23
		<dependency>
24
			<groupId>eu.dnetlib</groupId>
25
			<artifactId>dnet-hadoop-service-rmi</artifactId>
26
			<version>[1.0.0,2.0.0)</version>
27
		</dependency>
28
		<dependency>
29
			<groupId>eu.dnetlib</groupId>
30
			<artifactId>dnet-actionmanager-api</artifactId>
31
			<version>[3.0.0,4.0.0)</version>
32
		</dependency>
33
		<dependency>
34
			<groupId>eu.dnetlib</groupId>
35
			<artifactId>dnet-modular-ui</artifactId>
36
			<version>[3.0.0,4.0.0)</version>
37
		</dependency>		
38

  
39
		<dependency>
40
			<groupId>eu.dnetlib</groupId>
41
			<artifactId>dnet-index-solr-client</artifactId>
42
			<version>[2.0.0,3.0.0)</version>
43
		</dependency>	
44

  
45
		<dependency>
46
			<groupId>eu.dnetlib</groupId>
47
			<artifactId>dnet-openaireplus-mapping-utils</artifactId>
48
			<version>[3.0.0,4.0.0)</version>
49
		</dependency>
50

  
51

  
52
		<dependency>
53
			<groupId>javax.servlet</groupId>
54
			<artifactId>javax.servlet-api</artifactId>
55
			<version>${javax.servlet.version}</version>
56
			<scope>provided</scope>
57
		</dependency>
58
		<dependency>
59
			<groupId>com.fasterxml.jackson.core</groupId>
60
			<artifactId>jackson-databind</artifactId>
61
			<version>2.4.3</version>
62
		</dependency>
63
		<dependency>
64
			<groupId>com.google.guava</groupId>
65
			<artifactId>guava</artifactId>
66
			<version>${google.guava.version}</version>
67
		</dependency>
68
			
69
	</dependencies>
70
</project>
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/test/java/eu/dnetlib/msro/workflows/dedup/SimilarityMeshBuilderTest.java
1
package eu.dnetlib.msro.workflows.dedup;

import java.util.List;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.Lists;

import eu.dnetlib.functionality.modular.ui.dedup.SimilarityMeshBuilder;
import eu.dnetlib.miscutils.collections.Pair;

/**
 * Smoke test for {@link SimilarityMeshBuilder}: builds the similarity mesh over a
 * small group of distinct ids and verifies that pairs are actually produced.
 */
public class SimilarityMeshBuilderTest {

	/** Group of ten distinct ids ("0".."9") used as test input. */
	private List<String> list;

	@Before
	public void setUp() throws Exception {
		list = Lists.newArrayList();
		for (int i = 0; i < 10; i++) {
			list.add(i + "");
		}
	}

	@Test
	public void test() {
		final List<Pair<String, String>> combinations = SimilarityMeshBuilder.build("50", list);

		// The test previously only printed the result and could never fail.
		// At minimum verify that a non-empty input group yields a non-empty mesh.
		Assert.assertNotNull(combinations);
		Assert.assertFalse(combinations.isEmpty());

		System.out.println(combinations);
		System.out.println(combinations.size());
	}

}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/hadoop/utils/SimilarityMeshBuilder.java
1
package eu.dnetlib.msro.workflows.hadoop.utils;

import java.util.List;

import com.google.common.collect.Lists;

import eu.dnetlib.miscutils.collections.Pair;

/**
 * Builds the full similarity mesh (all ordered pairs) over a group of entity ids.
 */
public class SimilarityMeshBuilder {

	/**
	 * Builds every ordered pair (source, target) over the given ids, skipping pairs whose
	 * underlying ids are equal (which also suppresses self-pairs). Each side of a pair is
	 * prefixed with {@code typePrefix + "|"}.
	 *
	 * @param typePrefix prefix identifying the entity type
	 * @param list the ids in the group
	 * @return the list of ordered pairs; empty when the list contains fewer than two distinct ids
	 */
	public static List<Pair<String, String>> build(final String typePrefix, final List<String> list) {
		final List<Pair<String, String>> res = Lists.newArrayList();
		final int size = list.size();
		for (int i = 0; i < size; i++) {
			// hoisted out of the inner loop: both values are invariant in j
			final String idI = list.get(i);
			final String source = typePrefix + "|" + idI;
			for (int j = 0; j < size; j++) {
				final String idJ = list.get(j);
				// skip pairs of identical ids (covers i == j and duplicate values)
				if (!idI.equals(idJ)) {
					res.add(new Pair<String, String>(source, typePrefix + "|" + idJ));
				}
			}
		}
		return res;
	}
}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/dedup/BuildSimilarityMeshJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;

import java.io.StringReader;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

import javax.annotation.Resource;
import javax.xml.ws.wsaddressing.W3CEndpointReference;

import org.antlr.stringtemplate.StringTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Required;

import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
import eu.dnetlib.miscutils.collections.Pair;
import eu.dnetlib.msro.workflows.hadoop.utils.SimilarityMeshBuilder;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;

/**
 * Async workflow node that reads group records (XML) from an input resultset, expands each
 * group into its full similarity mesh ({@link SimilarityMeshBuilder}), and exposes the
 * rendered similarity records through a new output resultset EPR.
 */
public class BuildSimilarityMeshJobNode extends AsyncJobNode {

	private static final Log log = LogFactory.getLog(BuildSimilarityMeshJobNode.class);

	/** The result set factory. */
	@Resource(name = "iterableResultSetFactory")
	private IterableResultSetFactory resultSetFactory;

	/** The result set client factory. */
	@Resource(name = "resultSetClientFactory")
	private ResultSetClientFactory resultSetClientFactory;

	/** Template rendering one similarity record from a (source, target) pair. */
	private StringTemplate similarity;

	/** Env attribute name holding the input resultset EPR. */
	private String inputEprParam;

	/** Env attribute name where the output resultset EPR is stored. */
	private String outputEprParam;

	/**
	 * Wires the input resultset to a lazily-populated queue of pairs and registers an output
	 * resultset that drains the queue, rendering each pair through the similarity template.
	 *
	 * @param token the workflow token carrying the environment
	 * @return the default arc name
	 * @throws Exception propagated from resultset creation
	 */
	@Override
	protected String execute(final NodeToken token) throws Exception {

		final String inputEpr = token.getEnv().getAttribute(getInputEprParam());

		final Iterator<String> rsClient = resultSetClientFactory.getClient(inputEpr).iterator();
		final Queue<Object> queue = Queues.newLinkedBlockingQueue();
		final SAXReader reader = new SAXReader();

		// prime the queue with the mesh of the first group, if any
		if (rsClient.hasNext()) {
			populateQueue(queue, reader, rsClient.next());
		}

		final W3CEndpointReference eprOut = resultSetFactory.createIterableResultSet(new Iterable<String>() {

			@Override
			public Iterator<String> iterator() {
				// NOTE(review): hasNext() and next() synchronize on the queue separately,
				// which assumes a single consumer thread — confirm with the resultset framework.
				return new Iterator<String>() {

					@Override
					public boolean hasNext() {
						synchronized (queue) {
							return !queue.isEmpty();
						}
					}

					@Override
					@SuppressWarnings("unchecked")
					public String next() {
						synchronized (queue) {
							final Object o = queue.poll();
							// refill from the upstream resultset before returning, so hasNext()
							// stays accurate while more groups are available
							while (queue.isEmpty() && rsClient.hasNext()) {
								populateQueue(queue, reader, rsClient.next());
							}
							return buildSimilarity((Pair<String, String>) o);
						}
					}

					@Override
					public void remove() {
						throw new UnsupportedOperationException();
					}
				};
			}
		});

		token.getEnv().setAttribute(getOutputEprParam(), eprOut.toString());

		return Arc.DEFAULT_ARC;
	}

	/**
	 * Parses one group record and enqueues the full similarity mesh of its members.
	 * Invalid documents are logged and skipped (best-effort: the stream keeps going).
	 *
	 * @param q the target queue of {@link Pair}s
	 * @param r the (reused) SAX reader
	 * @param xml the group record, expected to carry FIELD elements 'id', 'group', 'entitytype'
	 */
	private void populateQueue(final Queue<Object> q, final SAXReader r, final String xml) {
		try {
			final Document d = r.read(new StringReader(xml));
			final String groupid = d.valueOf("//FIELD[@name='id']");
			final List<?> items = d.selectNodes("//FIELD[@name='group']/ITEM");
			final String entitytype = d.valueOf("//FIELD[@name='entitytype']");
			final List<String> group = Lists.newArrayList();
			for (final Object id : items) {
				group.add(((Node) id).getText());
			}
			// compute the full mesh, prefixing ids with the numeric type code
			final String type = String.valueOf(Type.valueOf(entitytype).getNumber());
			final List<Pair<String, String>> mesh = SimilarityMeshBuilder.build(type, group);
			if (log.isDebugEnabled()) {
				log.debug(String.format("built mesh for group '%s', size %d", groupid, mesh.size()));
			}
			for (final Pair<String, String> p : mesh) {
				if (log.isDebugEnabled()) {
					log.debug(String.format("adding to queue: %s", p.toString()));
				}
				q.add(p);
			}
		} catch (final DocumentException e) {
			// include the cause: previously the stack trace was silently dropped
			log.error("invalid document: " + xml, e);
		}
	}

	/**
	 * Renders one similarity record by instantiating a fresh copy of the configured
	 * template (StringTemplate instances are stateful, hence the copy).
	 */
	private String buildSimilarity(final Pair<String, String> p) {
		final StringTemplate template = new StringTemplate(getSimilarity().getTemplate());

		template.setAttribute("source", p.getKey());
		template.setAttribute("target", p.getValue());

		final String res = template.toString();
		return res;
	}

	public String getInputEprParam() {
		return inputEprParam;
	}

	public void setInputEprParam(final String inputEprParam) {
		this.inputEprParam = inputEprParam;
	}

	public String getOutputEprParam() {
		return outputEprParam;
	}

	public void setOutputEprParam(final String outputEprParam) {
		this.outputEprParam = outputEprParam;
	}

	public StringTemplate getSimilarity() {
		return similarity;
	}

	@Required
	public void setSimilarity(final StringTemplate similarity) {
		this.similarity = similarity;
	}

}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/DeleteHBaseRecordsJobNode.java
1
package eu.dnetlib.msro.workflows.hadoop.hbase;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;
import com.googlecode.sarasvati.env.Env;

import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;

/**
 * Blackboard job node that asks the HadoopService to delete the HBase records listed by an
 * input resultset, optionally transforming them first with a classpath XSLT. Implements
 * {@link ProgressJobNode} so the UI can track how many records have been consumed.
 */
public class DeleteHBaseRecordsJobNode extends BlackboardJobNode implements ProgressJobNode {

	private static final Log log = LogFactory.getLog(DeleteHBaseRecordsJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

	/** Env attribute name holding the input resultset EPR. */
	private String inputEprParam;
	/** Name of the property resolving to the target HBase table. */
	private String hbaseTableProperty;
	/** Target hadoop cluster name. */
	private String cluster;
	/** Classpath location of the optional XSLT; null/empty means no transformation. */
	private String xslt;

	/** When true the service only simulates the deletion. */
	private boolean simulation = false;

	private ProgressProvider progressProvider;

	private ProcessCountingResultSetFactory processCountingResultSetFactory;

	@Override
	protected String obtainServiceId(final NodeToken token) {
		return getServiceLocator().getServiceId(HadoopService.class);
	}

	/**
	 * Populates the blackboard job: action, base64-encoded input EPR and XSLT, table,
	 * cluster and simulation flag.
	 */
	@Override
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
		log.info("Invoking blackboard method");

		job.setAction(HadoopBlackboardActions.DELETE_EPR_HBASE.toString());
		job.getParameters().put("input_epr", DnetXsltFunctions.encodeBase64(prepareEpr(token)));
		job.getParameters().put("xslt", DnetXsltFunctions.encodeBase64(prepareXslt()));
		job.getParameters().put("table", getPropertyFetcher().getProperty(getHbaseTableProperty()));
		job.getParameters().put("cluster", cluster);
		job.getParameters().put("simulation", String.valueOf(isSimulation()));
	}

	@Override
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
		return new BlackboardWorkflowJobListener(engine, token) {

			/** Copies the reported deletion count into the workflow log env. */
			@Override
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
				final String count = responseParams.get("count");
				log.info(String.format("Deleted %s objects from HBase table %s, cluster %s", count, getPropertyFetcher().getProperty(getHbaseTableProperty()),
						getCluster()));
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getName() + ":count", count);
			}
		};
	}

	/**
	 * Wraps the input EPR in a counting resultset so progress can be reported.
	 */
	private String prepareEpr(final NodeToken token) throws ResultSetException {
		final String epr = token.getEnv().getAttribute(inputEprParam);
		final ResultsetProgressProvider resultsetProgressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);

		setProgressProvider(resultsetProgressProvider);

		return resultsetProgressProvider.getEpr().toString();
	}

	/**
	 * Loads the configured XSLT from the classpath, or returns "" when none is configured.
	 *
	 * @throws IOException when the resource is missing or cannot be read
	 */
	private String prepareXslt() throws IOException {
		if ((xslt == null) || xslt.isEmpty()) { return ""; }
		final InputStream in = getClass().getResourceAsStream(xslt);
		// fail with a descriptive message instead of an anonymous NPE inside IOUtils
		if (in == null) { throw new IOException("cannot find xslt resource on classpath: " + xslt); }
		try {
			return IOUtils.toString(in);
		} finally {
			// previously the stream was never closed
			IOUtils.closeQuietly(in);
		}
	}

	public String getInputEprParam() {
		return inputEprParam;
	}

	public void setInputEprParam(final String inputEprParam) {
		this.inputEprParam = inputEprParam;
	}

	public String getHbaseTableProperty() {
		return hbaseTableProperty;
	}

	public void setHbaseTableProperty(final String hbaseTableProperty) {
		this.hbaseTableProperty = hbaseTableProperty;
	}

	@Required
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
		this.processCountingResultSetFactory = processCountingResultSetFactory;
	}

	@Override
	public ProgressProvider getProgressProvider() {
		return progressProvider;
	}

	public void setProgressProvider(final ProgressProvider progressProvider) {
		this.progressProvider = progressProvider;
	}

	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
		return processCountingResultSetFactory;
	}

	public String getXslt() {
		return xslt;
	}

	public void setXslt(final String xslt) {
		this.xslt = xslt;
	}

	public String getCluster() {
		return cluster;
	}

	public void setCluster(final String cluster) {
		this.cluster = cluster;
	}

	public boolean isSimulation() {
		return simulation;
	}

	public void setSimulation(final boolean simulation) {
		this.simulation = simulation;
	}

}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/DropHBaseTableJobNode.java
1
package eu.dnetlib.msro.workflows.hadoop.hbase;
2

  
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5

  
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.NodeToken;
8

  
9
import eu.dnetlib.data.hadoop.rmi.HadoopService;
10

  
11
/**
12
 * The Class DropHBaseTableJobNode.
13
 */
14
public class DropHBaseTableJobNode extends AbstractHBaseAdminJobNode {
15

  
16
	/** The Constant log. */
17
	private static final Log log = LogFactory.getLog(DropHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
18

  
19
	/*
20
	 * (non-Javadoc)
21
	 * 
22
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
23
	 */
24
	@Override
25
	protected String execute(final NodeToken token) throws Exception {
26

  
27
		final String tableName = tableName(token);
28
		final String cluster = cluster(token);
29

  
30
		log.info("Dropping hbase table '" + tableName + "' on cluster: '" + cluster + "'");
31

  
32
		getServiceLocator().getService(HadoopService.class).dropHbaseTable(cluster, tableName);
33

  
34
		return Arc.DEFAULT_ARC;
35
	}
36

  
37
}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/ExistHBaseTableJobNode.java
1
package eu.dnetlib.msro.workflows.hadoop.hbase;
2

  
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5

  
6
import com.googlecode.sarasvati.NodeToken;
7

  
8
import eu.dnetlib.data.hadoop.rmi.HadoopService;
9

  
10
public class ExistHBaseTableJobNode extends AbstractHBaseAdminJobNode {
11

  
12
	private static final Log log = LogFactory.getLog(ExistHBaseTableJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
13

  
14
	private String existOutNode;
15

  
16
	private String dontExistOutNode;
17

  
18
	@Override
19
	protected String execute(final NodeToken token) throws Exception {
20
		final String tableName = tableName(token);
21
		final String cluster = cluster(token);
22

  
23
		log.info("checking table existance: '" + tableName + "' on cluster: '" + cluster + "'");
24

  
25
		boolean exists = getServiceLocator().getService(HadoopService.class).existHbaseTable(cluster, tableName);
26

  
27
		log.info("table '" + tableName + "' exists: " + exists);
28

  
29
		return exists ? getExistOutNode() : getDontExistOutNode();
30
	}
31

  
32
	public String getExistOutNode() {
33
		return existOutNode;
34
	}
35

  
36
	public void setExistOutNode(final String existOutNode) {
37
		this.existOutNode = existOutNode;
38
	}
39

  
40
	public String getDontExistOutNode() {
41
		return dontExistOutNode;
42
	}
43

  
44
	public void setDontExistOutNode(final String dontExistOutNode) {
45
		this.dontExistOutNode = dontExistOutNode;
46
	}
47

  
48
}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/hadoop/DeleteHdfsPathJobNode.java
1
package eu.dnetlib.msro.workflows.hadoop;
2

  
3
import com.googlecode.sarasvati.NodeToken;
4

  
5
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
6
import eu.dnetlib.data.hadoop.rmi.HadoopService;
7
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
8
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
9

  
10
public class DeleteHdfsPathJobNode extends BlackboardJobNode {
11

  
12
	private String cluster;
13

  
14
	@Override
15
	protected String obtainServiceId(final NodeToken token) {
16
		return getServiceLocator().getServiceId(HadoopService.class);
17
	}
18

  
19
	@Override
20
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
21

  
22
		job.setAction(HadoopBlackboardActions.DELETE_HDFS_PATH.toString());
23
		job.getParameters().put("cluster", getCluster());
24

  
25
		// The "path" parameter is set by the following call
26
		job.getParameters().putAll(parseJsonParameters(token));
27
	}
28

  
29
	public String getCluster() {
30
		return cluster;
31
	}
32

  
33
	public void setCluster(final String cluster) {
34
		this.cluster = cluster;
35
	}
36

  
37
}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/hadoop/FindIndexJobNode.java
1
package eu.dnetlib.msro.workflows.hadoop;

import java.util.List;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.google.common.collect.Iterables;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

/**
 * Workflow node that locates the IndexDS profile matching a (format, layout, interpretation)
 * triple and stores its id in the env attribute 'index_id'. Routes to "found"/"notFound".
 */
public class FindIndexJobNode extends SimpleJobNode {

	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(FindIndexJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

	@Resource
	private UniqueServiceLocator serviceLocator;

	/**
	 * if non null, overrides format from env.
	 */
	private String mdFormat;

	/**
	 * if non null, overrides format from env.
	 */
	private String layout;

	/**
	 * if non null, overrides format from env.
	 */
	private String interpretation;

	/**
	 * {@inheritDoc}
	 * 
	 * @see com.googlecode.sarasvati.mem.MemNode#execute(com.googlecode.sarasvati.Engine, com.googlecode.sarasvati.NodeToken)
	 */
	@Override
	public String execute(final NodeToken token) {
		final String envFormat = token.getFullEnv().getAttribute("format");
		final String envLayout = token.getFullEnv().getAttribute("layout");
		final String envInterpretation = token.getFullEnv().getAttribute("interpretation");

		// bean-configured values win over env values, and are written back to the env
		final String format = handleOverride(token, "format", envFormat, getMdFormat());
		final String layout = handleOverride(token, "layout", envLayout, getLayout());
		final String interp = handleOverride(token, "interpretation", envInterpretation, getInterpretation());

		final String mdRef = format + "-" + layout + "-" + interp;
		log.info("searching index for [" + mdRef + "]");

		final String indexId = findIndex(format, layout, interp);
		token.getEnv().setAttribute("index_id", indexId);

		if (indexId == null || indexId.isEmpty()) {
			log.info("no index was found for [" + mdRef + "]");
			return "notFound";
		} else {
			log.info("index found for [" + mdRef + "]: " + indexId);
			return "found";
		}
	}

	/**
	 * Queries the IS lookup for the IndexDS id matching the given triple.
	 *
	 * @return the single matching index id, or null when none is found or the lookup fails
	 */
	private String findIndex(final String format, final String layout, final String interp) {
		final String xquery =
				"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexDSResourceType']" + "return $x[.//METADATA_FORMAT = '" + format
						+ "' and .//METADATA_FORMAT_LAYOUT='" + layout + "' and .//METADATA_FORMAT_INTERPRETATION = '" + interp
						+ "']//RESOURCE_IDENTIFIER/@value/string()";
		try {
			log.info("xquery: " + xquery);
			final List<String> ids = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
			log.info("found indexDS ids: " + ids);
			if (ids == null || ids.isEmpty()) { return null; }
			if (ids.size() > 1) { throw new IllegalStateException("found more than one index of given format: " + format + ", layout: " + layout
					+ ", interpretation: " + interp); }
			return Iterables.getOnlyElement(ids);
		} catch (final Exception e) {
			// previously the failure was silently swallowed; log it before falling back
			// to the "notFound" path so operators can diagnose lookup errors
			log.error("error searching index profile", e);
			return null;
		}
	}

	/**
	 * When an override value is configured, stores it in the env and returns it;
	 * otherwise returns the env value unchanged.
	 */
	private String handleOverride(final NodeToken token, final String name, final String env, final String override) {
		if (override != null) {
			token.getEnv().setAttribute(name, override);
		}
		return override != null ? override : env;
	}

	public String getMdFormat() {
		return mdFormat;
	}

	public void setMdFormat(final String mdFormat) {
		this.mdFormat = mdFormat;
	}

	public String getLayout() {
		return layout;
	}

	public void setLayout(final String layout) {
		this.layout = layout;
	}

	public String getInterpretation() {
		return interpretation;
	}

	public void setInterpretation(final String interpretation) {
		this.interpretation = interpretation;
	}

}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/hadoop/SubmitHadoopJobNode.java
1
package eu.dnetlib.msro.workflows.hadoop;

import java.util.List;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.google.common.collect.Iterables;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
import eu.dnetlib.data.hadoop.rmi.HadoopJobType;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;

/**
 * Blackboard job node that submits a named hadoop job to the HadoopService, resolving the
 * blackboard action from the job's configured type (mapreduce, admin or oozie).
 */
public class SubmitHadoopJobNode extends BlackboardJobNode {

	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(SubmitHadoopJobNode.class);

	@Resource
	private UniqueServiceLocator serviceLocator;

	/** Name of the hadoop job profile to submit. */
	private String hadoopJob;

	/** Default cluster, used when the workflow env carries no override. */
	private String cluster;

	/** When true the service only simulates the submission. */
	private boolean simulation = false;

	@Override
	protected String obtainServiceId(final NodeToken token) {
		return getServiceLocator().getServiceId(HadoopService.class);
	}

	/**
	 * Populates the blackboard job with the resolved action, job name, cluster,
	 * simulation flag and the node's JSON parameters.
	 */
	@Override
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
		final String action = getJobType(getHadoopJob());

		log.info("submitting job " + getHadoopJob() + " type: " + action);

		job.setAction(action);
		job.getParameters().put("job.name", getHadoopJob());
		job.getParameters().put("cluster", cluster(token));
		job.getParameters().put("simulation", String.valueOf(isSimulation()));

		job.getParameters().putAll(parseJsonParameters(token));
	}

	/** Resolves the target cluster, preferring an override from the workflow env. */
	private String cluster(final NodeToken token) {
		if (!token.getEnv().hasAttribute("cluster")) { return getCluster(); }

		final String override = token.getEnv().getAttribute("cluster");
		log.info("found override value in wfEnv for 'cluster' param: " + override);
		return override;
	}

	/**
	 * reads the job type for the given job name
	 *
	 * @param jobName the hadoop job name
	 * @return the blackboard action matching the job's type
	 * @throws ISLookUpException when the profile lookup fails
	 */
	private String getJobType(final String jobName) throws ISLookUpException {
		final List<String> res =
				serviceLocator.getService(ISLookUpService.class).quickSearchProfile(
						"/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType']//HADOOP_JOB[./@name='" + jobName
						+ "']/@type/string()");
		if (res.isEmpty()) { throw new IllegalStateException("unable to find job type for job: " + jobName); }

		final HadoopJobType type = HadoopJobType.valueOf(Iterables.getOnlyElement(res));

		// guard chain instead of switch; same mapping, same failure mode
		if (type == HadoopJobType.mapreduce) { return HadoopBlackboardActions.SUBMIT_MAPREDUCE_JOB.toString(); }
		if (type == HadoopJobType.admin) { return HadoopBlackboardActions.SUBMIT_ADMIN_JOB.toString(); }
		if (type == HadoopJobType.oozie) { return HadoopBlackboardActions.SUBMIT_OOZIE_JOB.toString(); }
		throw new IllegalStateException("undefined job type: " + type.toString());
	}

	public String getHadoopJob() {
		return hadoopJob;
	}

	public void setHadoopJob(final String hadoopJob) {
		this.hadoopJob = hadoopJob;
	}

	public String getCluster() {
		return cluster;
	}

	public void setCluster(final String cluster) {
		this.cluster = cluster;
	}

	public boolean isSimulation() {
		return simulation;
	}

	public void setSimulation(final boolean simulation) {
		this.simulation = simulation;
	}
}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/resources/eu/dnetlib/msro/workflows/applicationContext-workflow.properties
1
# temporary name
2
dnet.dedup.db.name = dnet_openaireplus
3

  
4
dnet.dedup.index.format = STDL
5
dnet.dedup.index.collection = STDL-index-dedup
6

  
7
dnet.openaire.model.relclasses.xquery = distinct-values(for $x in /RESOURCE_PROFILE[./HEADER/RESOURCE_TYPE/@value='OntologyDSResourceType']//TERM return concat($x/@code/string(), ":", $x//RELATION[./@type = 'inverseOf']/@code/string()))
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/resources/eu/dnetlib/test/profiles/meta/workflows/reset.base.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2
<RESOURCE_PROFILE xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
3
    <HEADER>
4
        <RESOURCE_IDENTIFIER value="ce304c65-5836-4cf0-9a48-53472b9f6f35_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
5
        <RESOURCE_TYPE value="WorkflowDSResourceType"/>
6
        <RESOURCE_KIND value="WorkflowDSResources"/>
7
        <RESOURCE_URI value=""/>
8
        <DATE_OF_CREATION value="2006-05-04T18:13:51.0Z"/>
9
    </HEADER>
10
    <BODY>
11
        <WORKFLOW_NAME>Reset HBase</WORKFLOW_NAME>
12
        <WORKFLOW_TYPE>Data Load</WORKFLOW_TYPE>
13
        <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
14
        <CONFIGURATION start="manual">
15
			<NODE name="checkTable" type="CheckHBaseTable" isStart="true">
16
				<DESCRIPTION>check hbase table</DESCRIPTION>
17
				<PARAMETERS>
18
					<PARAM name="hbaseTableProperty" type="string" managedBy="system" required="true">hbase.mapred.datatable</PARAM>
19
					<PARAM name="cluster" type="string" managedBy="system" required="true">DM</PARAM>
20
					<PARAM name="tableColumnsParamName" type="string" managedBy="system" required="true">hTableColumns</PARAM>
21
					<PARAM name="existOutNode" type="string" managedBy="system" required="true">drop</PARAM>
22
					<PARAM name="dontExistOutNode" type="string" required="true" managedBy="system">define</PARAM>					
23
				</PARAMETERS>
24
				<ARCS>
25
					<ARC to="drop" name="drop" />
26
					<ARC to="define" name ="define" />
27
				</ARCS>
28
			</NODE>
29
			<NODE name="drop" type="DropHBaseTable">
30
				<DESCRIPTION>drop hbase table</DESCRIPTION>
31
				<PARAMETERS>
32
					<PARAM name="hbaseTableProperty" type="string" managedBy="system" required="true">hbase.mapred.datatable</PARAM>
33
					<PARAM name="cluster" type="string" managedBy="system" required="true">DM</PARAM>
34
				</PARAMETERS>
35
				<ARCS>
36
					<ARC to="define" />
37
				</ARCS>
38
			</NODE>        
39
			<NODE name="define" type="DefineHBaseOpenaireSchema">
40
				<DESCRIPTION>define OpenAIRE hbase table</DESCRIPTION>
41
				<PARAMETERS>
42
					<PARAM name="tableColumnsParamName" type="string" managedBy="system" required="true">hTableColumns</PARAM>
43
					<PARAM name="hbaseTableProperty" type="string" managedBy="system" required="true">hbase.mapred.datatable</PARAM>
44
					<PARAM name="cluster" type="string" managedBy="system" required="true">DM</PARAM>
45
				</PARAMETERS>
46
				<ARCS>
47
					<ARC to="create" />
48
				</ARCS>
49
			</NODE>
50
			<NODE name="create" type="CreateHBaseTable">
51
				<DESCRIPTION>create hbase table</DESCRIPTION>
52
				<PARAMETERS>
53
					<PARAM name="hbaseTableProperty" type="string" managedBy="system" required="true">hbase.mapred.datatable</PARAM>
54
					<PARAM name="cluster" type="string" managedBy="system" required="true">DM</PARAM>
55
					<PARAM name="tableColumnsParamName" type="string" managedBy="system" required="true">hTableColumns</PARAM>
56
				</PARAMETERS>
57
				<ARCS>
58
					<ARC to="success" />
59
				</ARCS>
60
			</NODE>
61
        </CONFIGURATION>
62
        <STATUS />
63
    </BODY>
64
</RESOURCE_PROFILE>
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/resources/eu/dnetlib/msro/workflows/dedup/dedupConfPace.xquery.st
1
/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'DedupConfigurationDSResourceType']//CONFIGURATION[@id='$configCode$']//ENTITY[@name = '$entityType$']/PACE/text()
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/resources/patch.dedup.sql
1
--DEDUP SPECIFIC

-- NOTE: creation order matters: "similarity_groups" and "dissimilarities" declare
-- foreign keys on entities(id), so "entities" must be created before them
-- (the original script created it last and failed on a fresh database).

CREATE TABLE entities (
    id character varying(255) NOT NULL,
    entitytype character varying(255) NOT NULL,
    _dnet_resource_identifier_ character varying(2048) DEFAULT ((('temp_'::text || md5((clock_timestamp())::text)) || '_'::text) || md5((random())::text)),
    PRIMARY KEY(id)
);

CREATE TABLE groups (
    id character varying(255) NOT NULL,
    entitytype character varying(255) NOT NULL,
    date timestamp without time zone default now(),
    _dnet_resource_identifier_ character varying(2048) DEFAULT ((('temp_'::text || md5((clock_timestamp())::text)) || '_'::text) || md5((random())::text)),
    PRIMARY KEY(id)
);

CREATE TABLE similarity_groups (
    groupid character varying(255) NOT NULL REFERENCES groups(id) ON DELETE CASCADE,
    objidentifier character varying(255) NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
    _dnet_resource_identifier_ character varying(2048) DEFAULT ((('temp_'::text || md5((clock_timestamp())::text)) || '_'::text) || md5((random())::text)),
    PRIMARY KEY(groupid, objidentifier)
);

CREATE TABLE dissimilarities (
    id1 character varying(255) NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
    id2 character varying(255) NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
    _dnet_resource_identifier_ character varying(2048) DEFAULT ((('temp_'::text || md5((clock_timestamp())::text)) || '_'::text) || md5((random())::text)),
    PRIMARY KEY(id1, id2)
);
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/resources/eu/dnetlib/msro/workflows/dedup/dedupConfWf.xquery.st
1
/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'DedupConfigurationDSResourceType']//CONFIGURATION[@id='$configCode$']//ENTITY[@name = '$entityType$']/WORKFLOW/text()
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/resources/eu/dnetlib/msro/workflows/dedup/queryDissimilarities.sql
1
select e.entitytype, d.id1, d.id2 from dissimilarities d join entities e on (d.id1 = e.id)
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/hadoop/ReuseHdfsRecordsJobNode.java
1
package eu.dnetlib.msro.workflows.hadoop;
2

  
3
import com.googlecode.sarasvati.NodeToken;
4

  
5
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
6

  
7
public class ReuseHdfsRecordsJobNode extends SimpleJobNode {
8

  
9
	private boolean reuseMdRecords;
10

  
11
	@Override
12
	protected String execute(final NodeToken token) throws Exception {
13
		return String.valueOf(isReuseMdRecords());
14
	}
15

  
16
	public boolean isReuseMdRecords() {
17
		return reuseMdRecords;
18
	}
19

  
20
	public void setReuseMdRecords(final boolean reuseMdRecords) {
21
		this.reuseMdRecords = reuseMdRecords;
22
	}
23

  
24
}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/resources/eu/dnetlib/msro/workflows/dedup/dissimilarity_2_hbase.xsl
1
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maps one dissimilarity DB row (fields: entitytype, id1, id2) to a pair of symmetric
  HBase rows in the resultResult_dedupSimilarity_isSimilarTo column family.
  Row keys are built via the dnet:oafSimpleId() extension function; rows are emitted
  only when BOTH ids resolve to a non-empty key.
-->
<xsl:stylesheet version="1.0"
	xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:dc="http://purl.org/dc/elements/1.1/"
	xmlns:dr="http://www.driver-repository.eu/namespace/dr" xmlns:dri="http://www.driver-repository.eu/namespace/dri" 
	xmlns:oaa="http://namespace.openaire.eu/oaa" xmlns:oaf="http://namespace.openaire.eu/oaf"
	xmlns:dnet="eu.dnetlib.data.transform.xml.DNetDbToHbaseXsltFunctions">
	
	<xsl:output omit-xml-declaration="yes" indent="yes"/>

	<xsl:template match="/">

		<xsl:variable name="type" select="/ROW/FIELD[@name = 'entitytype']" />
		<xsl:variable name="source" select="dnet:oafSimpleId($type, /ROW/FIELD[@name = 'id1'])" />
		<xsl:variable name="target" select="dnet:oafSimpleId($type, /ROW/FIELD[@name = 'id2'])" />

		<ROWS>
			<!-- skip the pair entirely when either id could not be resolved -->
			<xsl:if test="string-length($source) &gt; 0 and string-length($target) &gt; 0">
				<!-- similarity is symmetric: write the relation in both directions -->
				<ROW key="{$source}" columnFamily="resultResult_dedupSimilarity_isSimilarTo">
					<QUALIFIER name="{$target}" type="base64"></QUALIFIER>
				</ROW>
				<ROW key="{$target}" columnFamily="resultResult_dedupSimilarity_isSimilarTo">
					<QUALIFIER name="{$source}" type="base64"></QUALIFIER>
				</ROW>				
			</xsl:if>
		</ROWS>
	
	</xsl:template>

</xsl:stylesheet>
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupGrouperJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5

  
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.Engine;
8
import com.googlecode.sarasvati.NodeToken;
9

  
10
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
11
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
12

  
13
public class DedupGrouperJobNode extends DedupConfigurationAwareJobNode {
14

  
15
	// TODO factor out this constant, it should be a configuration parameter
16
	public static final int DEDUP_GROUPER_MAX_LOOPS = 10;
17

  
18
	public static final String DEDUP_GROUPER_LOOPER = "dedup.grouper.looper";
19
	public static final String DEDUP_GROUPER_CURR_WRITTEN_RELS = "dedup.grouper.written.rels";
20
	public static final String DEDUP_GROUPER_PREV_WRITTEN_RELS = "dedup.grouper.prev.written.rels";
21

  
22
	private static final Log log = LogFactory.getLog(DedupGrouperJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
23

  
24
	private class DedupBlackboardWorkflowJobListener extends BlackboardWorkflowJobListener {
25

  
26
		public DedupBlackboardWorkflowJobListener(final Engine engine, final NodeToken token) {
27
			super(engine, token);
28
		}
29

  
30
		@Override
31
		protected void onDone(final BlackboardJob job) {
32

  
33
			final int times = currentIteration(getToken());
34
			final String curr = job.getParameters().get(DEDUP_GROUPER_CURR_WRITTEN_RELS);
35

  
36
			if (times == 0) {
37
				getToken().getFullEnv().setAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS, -1);
38
			}
39

  
40
			if ((times >= DEDUP_GROUPER_MAX_LOOPS) || isStable(getToken(), curr)) {
41
				super.complete(job, "done");
42
			} else {
43
				log.info("incrementing dedup.grouper.looper to " + (times + 1));
44
				getToken().getFullEnv().setAttribute(DEDUP_GROUPER_LOOPER, times + 1);
45
				getToken().getFullEnv().setAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS, curr);
46
				super.complete(job, Arc.DEFAULT_ARC);
47
			}
48
		}
49
	}
50

  
51
	private int currentIteration(final NodeToken token) {
52
		try {
53
			final String sTimes = token.getFullEnv().getAttribute(DEDUP_GROUPER_LOOPER);
54
			log.info("read dedup.grouper.looper from fullEnv: '" + sTimes + "'");
55
			return Integer.parseInt(sTimes);
56
		} catch (final NumberFormatException e) {
57
			log.info("got empty dedup.grouper.looper, initializing to 0");
58
			return 0;
59
		}
60
	}
61

  
62
	private boolean isStable(final NodeToken token, final String sCurr) {
63
		final String sPrev = token.getFullEnv().getAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS);
64

  
65
		log.info("Comparing written rels, prev=" + sPrev + ", curr=" + sCurr);
66
		try {
67
			final boolean b = Integer.parseInt(sCurr) == Integer.parseInt(sPrev);
68
			if (b) {
69
				log.info("  --- The number of written rels is STABLE");
70
			}
71
			return b;
72
		} catch (final Exception e) {
73
			log.error("Invalid parsing of written rels counters - curr: " + sCurr + ", prev: " + sPrev);
74
			return false;
75
		}
76
	}
77

  
78
	@Override
79
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
80
		return new DedupBlackboardWorkflowJobListener(engine, token);
81
	}
82

  
83
}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupSimilarityToActionsJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import org.apache.commons.lang.StringUtils;
4
import org.apache.commons.logging.Log;
5
import org.apache.commons.logging.LogFactory;
6

  
7
import com.googlecode.sarasvati.NodeToken;
8

  
9
import eu.dnetlib.data.proto.DedupSimilarityProtos.DedupSimilarity;
10
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
11
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
12
import eu.dnetlib.data.proto.TypeProtos.Type;
13
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
14
import eu.dnetlib.msro.rmi.MSROException;
15

  
16
public class DedupSimilarityToActionsJobNode extends DedupConfigurationAwareJobNode {
17

  
18
	private static final Log log = LogFactory.getLog(DedupSimilarityToActionsJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
19

  
20
	@Override
21
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
22
		super.prepareJob(job, token);
23

  
24
		final String entityType = token.getEnv().getAttribute("entityType");
25
		if (StringUtils.isBlank(entityType))
26
			throw new MSROException("unable to find wf param: entityType");
27

  
28
		String cf = "_" + SubRelType.dedupSimilarity + "_" + DedupSimilarity.RelName.isSimilarTo;
29
		switch (Type.valueOf(entityType)) {
30

  
31
		case organization:
32
			cf = RelType.organizationOrganization + cf;
33
			break;
34
		case person:
35
			cf = RelType.personPerson + cf;
36
			break;
37
		case result:
38
			cf = RelType.resultResult + cf;
39
			break;
40
		default:
41
			throw new MSROException("invalid parameter entityType: " + entityType);
42
		}
43

  
44
		log.info("using similarity CF: " + cf);
45
		job.getParameters().put("similarityCF", cf);
46
		token.getEnv().setAttribute("similarityCF", cf);
47
	}
48

  
49
}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/hadoop/StoreHdfsRecordsJobNode.java
1
package eu.dnetlib.msro.workflows.hadoop;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;
import com.googlecode.sarasvati.env.Env;

import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;

/**
 * Blackboard node asking the HadoopService to import the records streamed by an
 * EPR into HDFS (IMPORT_EPR_HDFS action). The input resultset is wrapped in a
 * counting decorator so the workflow UI can report import progress, and the
 * resulting record count is published back into the workflow env.
 */
public class StoreHdfsRecordsJobNode extends BlackboardJobNode implements ProgressJobNode {

	private static final Log log = LogFactory.getLog(StoreHdfsRecordsJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM

	// Env attribute name containing the input EPR.
	private String inputEprParam;
	// Env attribute name containing the target HDFS path.
	private String hdfsPathParam;
	// Name of the target hadoop cluster.
	private String cluster;

	private ProgressProvider progressProvider;

	private ProcessCountingResultSetFactory processCountingResultSetFactory;

	@Override
	protected String obtainServiceId(final NodeToken token) {
		return getServiceLocator().getServiceId(HadoopService.class);
	}

	@Override
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
		log.info("Invoking blackboard method");

		job.setAction(HadoopBlackboardActions.IMPORT_EPR_HDFS.toString());
		job.getParameters().put("input_epr", DnetXsltFunctions.encodeBase64(prepareEpr(token)));
		job.getParameters().put("path", token.getEnv().getAttribute(getHdfsPathParam()));
		job.getParameters().put("cluster", getCluster());
	}

	@Override
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
		return new BlackboardWorkflowJobListener(engine, token) {

			// Expose the number of imported records ("count") in the workflow env.
			@Override
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getName() + ":count", responseParams.get("count"));
			}
		};
	}

	/**
	 * Wraps the input EPR in a progress-counting resultset, registers the resulting
	 * progress provider on this node, and returns the wrapped EPR as a string.
	 *
	 * @throws ResultSetException when the counting resultset cannot be created
	 */
	private String prepareEpr(final NodeToken token) throws ResultSetException {
		final String inputEpr = token.getEnv().getAttribute(inputEprParam);
		final ResultsetProgressProvider countingProvider =
				processCountingResultSetFactory.createProgressProvider(token.getProcess(), inputEpr);

		setProgressProvider(countingProvider);

		return countingProvider.getEpr().toString();
	}

	public String getInputEprParam() {
		return inputEprParam;
	}

	public void setInputEprParam(final String inputEprParam) {
		this.inputEprParam = inputEprParam;
	}

	@Required
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
		this.processCountingResultSetFactory = processCountingResultSetFactory;
	}

	@Override
	public ProgressProvider getProgressProvider() {
		return progressProvider;
	}

	public void setProgressProvider(final ProgressProvider progressProvider) {
		this.progressProvider = progressProvider;
	}

	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
		return processCountingResultSetFactory;
	}

	public String getCluster() {
		return cluster;
	}

	public void setCluster(final String cluster) {
		this.cluster = cluster;
	}

	public String getHdfsPathParam() {
		return hdfsPathParam;
	}

	public void setHdfsPathParam(final String hdfsPathParam) {
		this.hdfsPathParam = hdfsPathParam;
	}

}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/actions/UpdateSetsJobNode.java
1
package eu.dnetlib.msro.workflows.actions;

import java.io.StringReader;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dom4j.Attribute;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

import com.google.gson.Gson;
import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

/**
 * Workflow node updating the ActionManagerSet profiles for the sets listed in the
 * "sets" env attribute (a JSON list of maps). For each enabled set, the current
 * LATEST raw set is archived as an EXPIRED element and LATEST is repointed to the
 * new raw set.
 */
public class UpdateSetsJobNode extends SimpleJobNode {

	/**
	 * logger.
	 */
	private static final Log log = LogFactory.getLog(UpdateSetsJobNode.class);

	@Resource
	private UniqueServiceLocator serviceLocator;

	@Override
	protected String execute(final NodeToken token) throws Exception {

		@SuppressWarnings("unchecked")
		final List<Map<String, String>> sets = new Gson().fromJson(token.getEnv().getAttribute("sets"), List.class);

		// Single timestamp so all sets updated in this run share the same lastUpdate.
		final String lastUpdate = DateUtils.now_ISO8601();
		for (Map<String, String> set : sets) {

			// update only the enabled sets.
			if (isEnabled(set)) {
				log.info("updating set: " + set.toString());
				addLatestRawSet(set, lastUpdate);
			} else {
				log.info("skip set update: " + set.toString());
			}
		}

		return Arc.DEFAULT_ARC;
	}

	/**
	 * @return true when the set carries enabled=true.
	 *         Fix: null-safe comparison — the original called equals() on the map value,
	 *         which threw NPE when "enabled" was present but mapped to null.
	 */
	private boolean isEnabled(final Map<String, String> set) {
		return "true".equals(set.get("enabled"));
	}

	/**
	 * Repoints the LATEST raw set of the ActionManagerSet profile identified by
	 * set.get("set"): the previous LATEST attributes are copied to a new EXPIRED
	 * element, then LATEST is overwritten with the new rawset id and dates.
	 *
	 * @param set        map with keys "set", "rawset", "creationDate"
	 * @param lastUpdate ISO8601 timestamp recorded on the updated LATEST element
	 * @throws MSROException wrapping any lookup/parse/registry failure
	 */
	public void addLatestRawSet(final Map<String, String> set, final String lastUpdate) throws MSROException {
		final String q = "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') where $x//SET/@id = '" + set.get("set")
				+ "' return $x";
		try {
			final String profile = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(q);
			final Document doc = new SAXReader().read(new StringReader(profile));
			final String profId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
			final Element latest = (Element) doc.selectSingleNode("//RAW_SETS/LATEST");
			// Fail with a clear message instead of a bare NPE when the profile is malformed.
			if (latest == null) {
				throw new IllegalStateException("missing //RAW_SETS/LATEST in profile: " + profId);
			}
			final Element expired = ((Element) doc.selectSingleNode("//RAW_SETS")).addElement("EXPIRED");

			// Archive the previous LATEST attributes on the new EXPIRED element.
			for (Object o : latest.attributes()) {
				final Attribute a = (Attribute) o;
				expired.addAttribute(a.getName(), a.getValue());
			}

			latest.addAttribute("id", set.get("rawset"));
			latest.addAttribute("creationDate", set.get("creationDate"));
			latest.addAttribute("lastUpdate", lastUpdate);

			serviceLocator.getService(ISRegistryService.class).updateProfile(profId, doc.asXML(), "ActionManagerSetDSResourceType");
		} catch (Exception e) {
			String msg = "Error updating profile of set: " + set;
			// Fix: log the exception itself; the original dropped the cause from the log.
			log.error(msg, e);
			throw new MSROException(msg, e);
		}
	}

}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupConfigurationSetterJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import org.apache.commons.lang.StringUtils;
4

  
5
import com.googlecode.sarasvati.Arc;
6
import com.googlecode.sarasvati.NodeToken;
7

  
8
import eu.dnetlib.data.proto.TypeProtos.Type;
9
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
10

  
11
public class DedupConfigurationSetterJobNode extends AsyncJobNode {
12

  
13
	public static final String CONF_SEPARATOR = ":";
14

  
15
	private String entityType;
16

  
17
	private String dedupConfigSequence;
18

  
19
	private String dedupConfigSequenceParam;
20

  
21
	@Override
22
	protected String execute(final NodeToken token) throws Exception {
23

  
24
		if (StringUtils.isBlank(getEntityType())) throw new IllegalArgumentException("missing entity type parameter");
25
		if (StringUtils.isBlank(getDedupConfigSequence())) throw new IllegalArgumentException("missing configuration sequence");
26

  
27
		token.getEnv().setAttribute("entityType", getEntityType());
28
		token.getEnv().setAttribute("entityTypeId", Type.valueOf(getEntityType()).getNumber());
29

  
30
		token.getEnv().setAttribute(getDedupConfigSequenceParam(), getDedupConfigSequence());
31

  
32
		return Arc.DEFAULT_ARC;
33
	}
34

  
35
	public String getEntityType() {
36
		return entityType;
37
	}
38

  
39
	public void setEntityType(final String entityType) {
40
		this.entityType = entityType;
41
	}
42

  
43
	public String getDedupConfigSequence() {
44
		return dedupConfigSequence;
45
	}
46

  
47
	public void setDedupConfigSequence(final String dedupConfigSequence) {
48
		this.dedupConfigSequence = dedupConfigSequence;
49
	}
50

  
51
	public String getDedupConfigSequenceParam() {
52
		return dedupConfigSequenceParam;
53
	}
54

  
55
	public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) {
56
		this.dedupConfigSequenceParam = dedupConfigSequenceParam;
57
	}
58

  
59
}
modules/dnet-deduplication/tags/dnet-deduplication-1.0.1/src/main/java/eu/dnetlib/msro/workflows/dedup/FinalizeDedupIndexJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import static java.lang.String.format;
4

  
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

  
9
import com.googlecode.sarasvati.NodeToken;
10

  
11
import eu.dnetlib.data.provision.index.rmi.IndexService;
12
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
13
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
14
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
16
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff