Revision 48075
Added by Claudio Atzori over 7 years ago
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/pom.xml | ||
---|---|---|
1 |
<?xml version="1.0" encoding="UTF-8"?> |
|
2 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> |
|
3 |
<parent> |
|
4 |
<groupId>eu.dnetlib</groupId> |
|
5 |
<artifactId>dnet45-parent</artifactId> |
|
6 |
<version>1.0.0</version> |
|
7 |
<relativePath /> |
|
8 |
</parent> |
|
9 |
<modelVersion>4.0.0</modelVersion> |
|
10 |
<groupId>eu.dnetlib</groupId> |
|
11 |
<artifactId>dnet-deduplication</artifactId> |
|
12 |
<packaging>jar</packaging> |
|
13 |
<version>1.5.8</version> |
|
14 |
<scm> |
|
15 |
<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-deduplication/tags/dnet-deduplication-1.5.8</developerConnection> |
|
16 |
</scm> |
|
17 |
<dependencies> |
|
18 |
<dependency> |
|
19 |
<groupId>eu.dnetlib</groupId> |
|
20 |
<artifactId>dnet-msro-service</artifactId> |
|
21 |
<version>[3.0.0,4.0.0)</version> |
|
22 |
</dependency> |
|
23 |
<dependency> |
|
24 |
<groupId>eu.dnetlib</groupId> |
|
25 |
<artifactId>dnet-hadoop-service-rmi</artifactId> |
|
26 |
<version>[1.0.0,2.0.0)</version> |
|
27 |
</dependency> |
|
28 |
<dependency> |
|
29 |
<groupId>eu.dnetlib</groupId> |
|
30 |
<artifactId>dnet-actionmanager-api</artifactId> |
|
31 |
<version>[4.0.0,5.0.0)</version> |
|
32 |
</dependency> |
|
33 |
<dependency> |
|
34 |
<groupId>eu.dnetlib</groupId> |
|
35 |
<artifactId>dnet-modular-ui</artifactId> |
|
36 |
<version>[3.0.0,4.0.0)</version> |
|
37 |
</dependency> |
|
38 |
|
|
39 |
<dependency> |
|
40 |
<groupId>eu.dnetlib</groupId> |
|
41 |
<artifactId>dnet-index-solr-client</artifactId> |
|
42 |
<version>[2.0.0,3.0.0)</version> |
|
43 |
</dependency> |
|
44 |
|
|
45 |
<dependency> |
|
46 |
<groupId>eu.dnetlib</groupId> |
|
47 |
<artifactId>dnet-openaireplus-mapping-utils</artifactId> |
|
48 |
<version>[6.0.0,7.0.0)</version> |
|
49 |
</dependency> |
|
50 |
|
|
51 |
|
|
52 |
<dependency> |
|
53 |
<groupId>javax.servlet</groupId> |
|
54 |
<artifactId>javax.servlet-api</artifactId> |
|
55 |
<version>${javax.servlet.version}</version> |
|
56 |
<scope>provided</scope> |
|
57 |
</dependency> |
|
58 |
<dependency> |
|
59 |
<groupId>com.fasterxml.jackson.core</groupId> |
|
60 |
<artifactId>jackson-databind</artifactId> |
|
61 |
<version>${jackson.version}</version> |
|
62 |
</dependency> |
|
63 |
<dependency> |
|
64 |
<groupId>com.google.guava</groupId> |
|
65 |
<artifactId>guava</artifactId> |
|
66 |
<version>${google.guava.version}</version> |
|
67 |
</dependency> |
|
68 |
|
|
69 |
<dependency> |
|
70 |
<groupId>junit</groupId> |
|
71 |
<artifactId>junit</artifactId> |
|
72 |
<version>${junit.version}</version> |
|
73 |
<scope>test</scope> |
|
74 |
</dependency> |
|
75 |
|
|
76 |
</dependencies> |
|
77 |
</project> |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/MinDistSearchHadoopJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import java.nio.file.FileSystems; |
|
4 |
import java.nio.file.Path; |
|
5 |
|
|
6 |
import com.googlecode.sarasvati.Arc; |
|
7 |
import com.googlecode.sarasvati.Engine; |
|
8 |
import com.googlecode.sarasvati.NodeToken; |
|
9 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
|
10 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
11 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
12 |
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener; |
|
13 |
import org.apache.commons.lang.StringUtils; |
|
14 |
import org.apache.commons.logging.Log; |
|
15 |
import org.apache.commons.logging.LogFactory; |
|
16 |
import org.springframework.beans.factory.annotation.Autowired; |
|
17 |
|
|
18 |
/** |
|
19 |
* Created by claudio on 14/10/15. |
|
20 |
*/ |
|
21 |
public class MinDistSearchHadoopJobNode extends DedupConfigurationAwareJobNode { |
|
22 |
|
|
23 |
private static final Log log = LogFactory.getLog(MinDistSearchHadoopJobNode.class); |
|
24 |
private final static String StatusParam = "MinDistSearchHadoopJobNode.status"; |
|
25 |
private final static String DepthParam = "mindist_recursion_depth"; |
|
26 |
private final static String UpdateCounterParam = "UpdateCounter.UPDATED"; |
|
27 |
private final static String DebugParam = "mindist_DEBUG"; |
|
28 |
@Autowired |
|
29 |
private UniqueServiceLocator serviceLocator; |
|
30 |
private boolean debug = false; |
|
31 |
private String outPathParam; |
|
32 |
|
|
33 |
@Override |
|
34 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { |
|
35 |
|
|
36 |
String depthString = token.getFullEnv().getAttribute(DepthParam); |
|
37 |
log.debug(String.format("found depthParam: '%s'", depthString)); |
|
38 |
if (StringUtils.isBlank(depthString)) { |
|
39 |
depthString = "0"; |
|
40 |
} |
|
41 |
|
|
42 |
int depth = Integer.valueOf(depthString); |
|
43 |
|
|
44 |
final String cluster = token.getEnv().getAttribute("cluster"); |
|
45 |
final String outputPath = getPath(token.getEnv().getAttribute("workDir"), depth); |
|
46 |
|
|
47 |
final HadoopService hadoopService = serviceLocator.getService(HadoopService.class); |
|
48 |
switch (getStatusFromEnv(token)) { |
|
49 |
|
|
50 |
case DATALOAD: |
|
51 |
|
|
52 |
setHadoopJob("dedupSimilarity2GraphJob"); |
|
53 |
|
|
54 |
job.getParameters().put("mapred.output.dir", getPath(token.getEnv().getAttribute("workDir"), depth) + "/out"); |
|
55 |
|
|
56 |
hadoopService.createHdfsDirectory(cluster, outputPath, true); |
|
57 |
|
|
58 |
break; |
|
59 |
case DEPTH_N: |
|
60 |
|
|
61 |
setHadoopJob("dedupMinDistGraphJob"); |
|
62 |
|
|
63 |
final String newOutputPath = getPath(token.getEnv().getAttribute("workDir"), depth + 1); |
|
64 |
hadoopService.createHdfsDirectory(cluster, newOutputPath, true); |
|
65 |
|
|
66 |
job.getParameters().put(DepthParam, String.valueOf(depth)); |
|
67 |
job.getParameters().put(DebugParam, String.valueOf(isDebug())); |
|
68 |
|
|
69 |
job.getParameters().put("mapred.input.dir", outputPath + "/out"); |
|
70 |
job.getParameters().put("mapred.output.dir", newOutputPath + "/out"); |
|
71 |
|
|
72 |
if (log.isDebugEnabled()) { |
|
73 |
log.debug(String.format("input job parameters: %s", job.getParameters())); |
|
74 |
} |
|
75 |
|
|
76 |
token.getFullEnv().setAttribute(DepthParam, String.valueOf(depth + 1)); |
|
77 |
token.getFullEnv().setAttribute(getOutPathParam(), newOutputPath + "/out"); |
|
78 |
|
|
79 |
break; |
|
80 |
} |
|
81 |
|
|
82 |
super.prepareJob(job, token); |
|
83 |
} |
|
84 |
|
|
85 |
private String getPath(final String basePath, final int depth) { |
|
86 |
Path path = FileSystems.getDefault().getPath(basePath, "depth_" + depth); |
|
87 |
return path.toAbsolutePath().toString(); |
|
88 |
} |
|
89 |
|
|
90 |
private STATUS getStatusFromEnv(final NodeToken token) { |
|
91 |
if(StringUtils.isBlank(token.getEnv().getAttribute(StatusParam))) { |
|
92 |
return STATUS.DATALOAD; |
|
93 |
} |
|
94 |
STATUS current = STATUS.DATALOAD; |
|
95 |
try { |
|
96 |
current = STATUS.valueOf(token.getEnv().getAttribute(StatusParam)); |
|
97 |
log.debug("found status: " + current.toString()); |
|
98 |
} catch (IllegalArgumentException e) {} |
|
99 |
return current; |
|
100 |
} |
|
101 |
|
|
102 |
@Override |
|
103 |
protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) { |
|
104 |
return new BlackboardWorkflowJobListener(engine, token) { |
|
105 |
|
|
106 |
@Override |
|
107 |
protected void onDone(final BlackboardJob job) { |
|
108 |
|
|
109 |
final STATUS status = getStatusFromEnv(token); |
|
110 |
log.debug("complete phase: " + status); |
|
111 |
switch (status) { |
|
112 |
case DATALOAD: |
|
113 |
token.getFullEnv().setAttribute(StatusParam, STATUS.DEPTH_N.toString()); |
|
114 |
token.getFullEnv().setAttribute(DepthParam, "0"); |
|
115 |
complete("depth_n"); |
|
116 |
break; |
|
117 |
case DEPTH_N: |
|
118 |
|
|
119 |
if (log.isDebugEnabled()) { |
|
120 |
log.debug(String.format("return job parameters: %s=%s, %s=%s", DepthParam, job.getParameters().get(DepthParam), UpdateCounterParam, job.getParameters().get(UpdateCounterParam))); |
|
121 |
} |
|
122 |
|
|
123 |
final String counter = job.getParameters().get(UpdateCounterParam); |
|
124 |
if (StringUtils.isBlank(counter)) { |
|
125 |
token.getFullEnv().removeAttribute(StatusParam); |
|
126 |
token.getFullEnv().removeAttribute(DepthParam); |
|
127 |
log.info(String.format("done iteration %s:%s", UpdateCounterParam, 0)); |
|
128 |
complete(Arc.DEFAULT_ARC); |
|
129 |
} else { |
|
130 |
log.info(String.format("continue with next iteration %s:%s", UpdateCounterParam, counter)); |
|
131 |
complete("depth_n"); |
|
132 |
} |
|
133 |
|
|
134 |
break; |
|
135 |
} |
|
136 |
} |
|
137 |
|
|
138 |
private void complete(final String arc) { |
|
139 |
engine.complete(token, arc); |
|
140 |
engine.executeQueuedArcTokens(token.getProcess()); |
|
141 |
} |
|
142 |
}; |
|
143 |
} |
|
144 |
|
|
145 |
public boolean isDebug() { |
|
146 |
return debug; |
|
147 |
} |
|
148 |
|
|
149 |
public void setDebug(boolean debug) { |
|
150 |
this.debug = debug; |
|
151 |
} |
|
152 |
|
|
153 |
public String getOutPathParam() { |
|
154 |
return outPathParam; |
|
155 |
} |
|
156 |
|
|
157 |
public void setOutPathParam(String outPathParam) { |
|
158 |
this.outPathParam = outPathParam; |
|
159 |
} |
|
160 |
|
|
161 |
enum STATUS {DATALOAD, DEPTH_N} |
|
162 |
|
|
163 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/hadoop/SubmitHadoopJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.hadoop; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
|
|
5 |
import javax.annotation.Resource; |
|
6 |
|
|
7 |
import org.apache.commons.lang.StringUtils; |
|
8 |
import org.apache.commons.logging.Log; |
|
9 |
import org.apache.commons.logging.LogFactory; |
|
10 |
|
|
11 |
import com.google.common.collect.Iterables; |
|
12 |
import com.googlecode.sarasvati.NodeToken; |
|
13 |
|
|
14 |
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions; |
|
15 |
import eu.dnetlib.data.hadoop.rmi.HadoopJobType; |
|
16 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
|
17 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
18 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
19 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
20 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
21 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
|
22 |
|
|
23 |
public class SubmitHadoopJobNode extends BlackboardJobNode { |
|
24 |
|
|
25 |
/** |
|
26 |
* logger. |
|
27 |
*/ |
|
28 |
private static final Log log = LogFactory.getLog(SubmitHadoopJobNode.class); |
|
29 |
|
|
30 |
public static final String OOZIE_REPORT_ACTIONS = "oozie.report.actions.csv"; |
|
31 |
|
|
32 |
@Resource |
|
33 |
private UniqueServiceLocator serviceLocator; |
|
34 |
|
|
35 |
private String hadoopJob; |
|
36 |
|
|
37 |
private String cluster; |
|
38 |
|
|
39 |
private boolean simulation = false; |
|
40 |
|
|
41 |
private String oozieReportActionsCsv; |
|
42 |
|
|
43 |
@Override |
|
44 |
protected String obtainServiceId(final NodeToken token) { |
|
45 |
return getServiceLocator().getServiceId(HadoopService.class); |
|
46 |
} |
|
47 |
|
|
48 |
@Override |
|
49 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { |
|
50 |
String type = getJobType(getHadoopJob()); |
|
51 |
|
|
52 |
log.info("submitting job " + getHadoopJob() + " type: " + type); |
|
53 |
|
|
54 |
job.setAction(type); |
|
55 |
job.getParameters().put("job.name", getHadoopJob()); |
|
56 |
job.getParameters().put("cluster", cluster(token)); |
|
57 |
job.getParameters().put("simulation", String.valueOf(isSimulation())); |
|
58 |
|
|
59 |
if (StringUtils.isNotBlank(getOozieReportActionsCsv())) { |
|
60 |
job.getParameters().put(OOZIE_REPORT_ACTIONS, getOozieReportActionsCsv()); |
|
61 |
} |
|
62 |
|
|
63 |
job.getParameters().putAll(parseJsonParameters(token)); |
|
64 |
} |
|
65 |
|
|
66 |
private String cluster(final NodeToken token) { |
|
67 |
if (token.getEnv().hasAttribute("cluster")) { |
|
68 |
String cluster = token.getEnv().getAttribute("cluster"); |
|
69 |
log.info("found override value in wfEnv for 'cluster' param: " + cluster); |
|
70 |
return cluster; |
|
71 |
} |
|
72 |
return getCluster(); |
|
73 |
} |
|
74 |
|
|
75 |
/** |
|
76 |
* reads the job type for the given job name |
|
77 |
* |
|
78 |
* @param jobName |
|
79 |
* @return |
|
80 |
* @throws ISLookUpException |
|
81 |
*/ |
|
82 |
private String getJobType(final String jobName) throws ISLookUpException { |
|
83 |
List<String> res = |
|
84 |
serviceLocator.getService(ISLookUpService.class).quickSearchProfile( |
|
85 |
"/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType']//HADOOP_JOB[./@name='" + jobName |
|
86 |
+ "']/@type/string()"); |
|
87 |
if (res.isEmpty()) { throw new IllegalStateException("unable to find job type for job: " + jobName); } |
|
88 |
|
|
89 |
final HadoopJobType type = HadoopJobType.valueOf(Iterables.getOnlyElement(res)); |
|
90 |
|
|
91 |
switch (type) { |
|
92 |
case mapreduce: |
|
93 |
return HadoopBlackboardActions.SUBMIT_MAPREDUCE_JOB.toString(); |
|
94 |
case admin: |
|
95 |
return HadoopBlackboardActions.SUBMIT_ADMIN_JOB.toString(); |
|
96 |
case oozie: |
|
97 |
return HadoopBlackboardActions.SUBMIT_OOZIE_JOB.toString(); |
|
98 |
default: |
|
99 |
throw new IllegalStateException("undefined job type: " + type.toString()); |
|
100 |
} |
|
101 |
} |
|
102 |
|
|
103 |
public String getHadoopJob() { |
|
104 |
return hadoopJob; |
|
105 |
} |
|
106 |
|
|
107 |
public void setHadoopJob(final String hadoopJob) { |
|
108 |
this.hadoopJob = hadoopJob; |
|
109 |
} |
|
110 |
|
|
111 |
public String getCluster() { |
|
112 |
return cluster; |
|
113 |
} |
|
114 |
|
|
115 |
public void setCluster(final String cluster) { |
|
116 |
this.cluster = cluster; |
|
117 |
} |
|
118 |
|
|
119 |
public boolean isSimulation() { |
|
120 |
return simulation; |
|
121 |
} |
|
122 |
|
|
123 |
public void setSimulation(final boolean simulation) { |
|
124 |
this.simulation = simulation; |
|
125 |
} |
|
126 |
|
|
127 |
public String getOozieReportActionsCsv() { |
|
128 |
return oozieReportActionsCsv; |
|
129 |
} |
|
130 |
|
|
131 |
public void setOozieReportActionsCsv(final String oozieReportActionsCsv) { |
|
132 |
this.oozieReportActionsCsv = oozieReportActionsCsv; |
|
133 |
} |
|
134 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/conf/Entity.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup.conf; |
|
2 |
|
|
3 |
import com.google.gson.GsonBuilder; |
|
4 |
|
|
5 |
/** |
|
6 |
* The Class Entity. |
|
7 |
*/ |
|
8 |
public class Entity { |
|
9 |
|
|
10 |
/** The name. */ |
|
11 |
private String name; |
|
12 |
|
|
13 |
/** The code. */ |
|
14 |
private String code; |
|
15 |
|
|
16 |
/** The label. */ |
|
17 |
private String label; |
|
18 |
|
|
19 |
public Entity() {} |
|
20 |
|
|
21 |
/** |
|
22 |
* Instantiates a new entity. |
|
23 |
* |
|
24 |
* @param name |
|
25 |
* the name |
|
26 |
* @param code |
|
27 |
* the code |
|
28 |
* @param label |
|
29 |
* the label |
|
30 |
*/ |
|
31 |
public Entity(final String name, final String code, final String label) { |
|
32 |
super(); |
|
33 |
this.setName(name); |
|
34 |
this.setCode(code); |
|
35 |
this.setLabel(label); |
|
36 |
} |
|
37 |
|
|
38 |
/** |
|
39 |
* Gets the name. |
|
40 |
* |
|
41 |
* @return the name |
|
42 |
*/ |
|
43 |
public String getName() { |
|
44 |
return name; |
|
45 |
} |
|
46 |
|
|
47 |
/** |
|
48 |
* Gets the code. |
|
49 |
* |
|
50 |
* @return the code |
|
51 |
*/ |
|
52 |
public String getCode() { |
|
53 |
return code; |
|
54 |
} |
|
55 |
|
|
56 |
/** |
|
57 |
* Gets the label. |
|
58 |
* |
|
59 |
* @return the label |
|
60 |
*/ |
|
61 |
public String getLabel() { |
|
62 |
return label; |
|
63 |
} |
|
64 |
|
|
65 |
public void setName(final String name) { |
|
66 |
this.name = name; |
|
67 |
} |
|
68 |
|
|
69 |
public void setCode(final String code) { |
|
70 |
this.code = code; |
|
71 |
} |
|
72 |
|
|
73 |
public void setLabel(final String label) { |
|
74 |
this.label = label; |
|
75 |
} |
|
76 |
|
|
77 |
/* |
|
78 |
* (non-Javadoc) |
|
79 |
* |
|
80 |
* @see java.lang.Object#toString() |
|
81 |
*/ |
|
82 |
@Override |
|
83 |
public String toString() { |
|
84 |
return new GsonBuilder().setPrettyPrinting().create().toJson(this); |
|
85 |
} |
|
86 |
|
|
87 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/BuildSimilarityMeshJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import java.io.StringReader; |
|
4 |
import java.util.Iterator; |
|
5 |
import java.util.List; |
|
6 |
import java.util.Queue; |
|
7 |
|
|
8 |
import javax.annotation.Resource; |
|
9 |
import javax.xml.ws.wsaddressing.W3CEndpointReference; |
|
10 |
|
|
11 |
import org.antlr.stringtemplate.StringTemplate; |
|
12 |
import org.apache.commons.logging.Log; |
|
13 |
import org.apache.commons.logging.LogFactory; |
|
14 |
import org.dom4j.Document; |
|
15 |
import org.dom4j.DocumentException; |
|
16 |
import org.dom4j.Node; |
|
17 |
import org.dom4j.io.SAXReader; |
|
18 |
import org.springframework.beans.factory.annotation.Required; |
|
19 |
|
|
20 |
import com.google.common.collect.Lists; |
|
21 |
import com.google.common.collect.Queues; |
|
22 |
import com.googlecode.sarasvati.Arc; |
|
23 |
import com.googlecode.sarasvati.NodeToken; |
|
24 |
|
|
25 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
26 |
import eu.dnetlib.enabling.resultset.IterableResultSetFactory; |
|
27 |
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory; |
|
28 |
import eu.dnetlib.msro.workflows.hadoop.utils.Similarity; |
|
29 |
import eu.dnetlib.msro.workflows.hadoop.utils.SimilarityMeshBuilder; |
|
30 |
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; |
|
31 |
|
|
32 |
public class BuildSimilarityMeshJobNode extends AsyncJobNode { |
|
33 |
|
|
34 |
private static final Log log = LogFactory.getLog(BuildSimilarityMeshJobNode.class); |
|
35 |
|
|
36 |
/** The result set factory. */ |
|
37 |
@Resource(name = "iterableResultSetFactory") |
|
38 |
private IterableResultSetFactory resultSetFactory; |
|
39 |
|
|
40 |
/** The result set client factory. */ |
|
41 |
@Resource(name = "resultSetClientFactory") |
|
42 |
private ResultSetClientFactory resultSetClientFactory; |
|
43 |
|
|
44 |
private StringTemplate similarity; |
|
45 |
|
|
46 |
private String inputEprParam; |
|
47 |
|
|
48 |
private String outputEprParam; |
|
49 |
|
|
50 |
@Override |
|
51 |
protected String execute(final NodeToken token) throws Exception { |
|
52 |
|
|
53 |
final String inputEpr = token.getEnv().getAttribute(getInputEprParam()); |
|
54 |
|
|
55 |
final Iterator<String> rsClient = resultSetClientFactory.getClient(inputEpr).iterator(); |
|
56 |
final Queue<Object> queue = Queues.newLinkedBlockingQueue(); |
|
57 |
final SAXReader reader = new SAXReader(); |
|
58 |
|
|
59 |
if (rsClient.hasNext()) { |
|
60 |
populateQueue(queue, reader, rsClient.next()); |
|
61 |
} |
|
62 |
|
|
63 |
final W3CEndpointReference eprOut = resultSetFactory.createIterableResultSet(new Iterable<String>() { |
|
64 |
|
|
65 |
@Override |
|
66 |
public Iterator<String> iterator() { |
|
67 |
return new Iterator<String>() { |
|
68 |
|
|
69 |
@Override |
|
70 |
public boolean hasNext() { |
|
71 |
synchronized (queue) { |
|
72 |
return !queue.isEmpty(); |
|
73 |
} |
|
74 |
} |
|
75 |
|
|
76 |
@Override |
|
77 |
public String next() { |
|
78 |
synchronized (queue) { |
|
79 |
final Object o = queue.poll(); |
|
80 |
while (queue.isEmpty() && rsClient.hasNext()) { |
|
81 |
populateQueue(queue, reader, rsClient.next()); |
|
82 |
} |
|
83 |
return buildSimilarity((Similarity) o); |
|
84 |
} |
|
85 |
} |
|
86 |
|
|
87 |
@Override |
|
88 |
public void remove() { |
|
89 |
throw new UnsupportedOperationException(); |
|
90 |
} |
|
91 |
}; |
|
92 |
} |
|
93 |
}); |
|
94 |
|
|
95 |
token.getEnv().setAttribute(getOutputEprParam(), eprOut.toString()); |
|
96 |
|
|
97 |
return Arc.DEFAULT_ARC; |
|
98 |
} |
|
99 |
|
|
100 |
private void populateQueue(final Queue<Object> q, final SAXReader r, final String xml) { |
|
101 |
try { |
|
102 |
final Document d = r.read(new StringReader(xml)); |
|
103 |
final String groupid = d.valueOf("//FIELD[@name='id']"); |
|
104 |
final List<?> items = d.selectNodes("//FIELD[@name='group']/ITEM"); |
|
105 |
final String entitytype = d.valueOf("//FIELD[@name='entitytype']"); |
|
106 |
final List<String> group = Lists.newArrayList(); |
|
107 |
for (final Object id : items) { |
|
108 |
group.add(((Node) id).getText()); |
|
109 |
} |
|
110 |
// compute the full mesh |
|
111 |
final Type type = Type.valueOf(entitytype); |
|
112 |
|
|
113 |
final List<Similarity> mesh = SimilarityMeshBuilder.build(type, group); |
|
114 |
// total += mesh.size(); |
|
115 |
if (log.isDebugEnabled()) { |
|
116 |
log.debug(String.format("built mesh for group '%s', size %d", groupid, mesh.size())); |
|
117 |
} |
|
118 |
for (final Similarity s : mesh) { |
|
119 |
if (log.isDebugEnabled()) { |
|
120 |
log.debug(String.format("adding to queue: %s", s.toString())); |
|
121 |
} |
|
122 |
q.add(s); |
|
123 |
} |
|
124 |
} catch (final DocumentException e) { |
|
125 |
log.error("invalid document: " + xml); |
|
126 |
} |
|
127 |
} |
|
128 |
|
|
129 |
private String buildSimilarity(final Similarity s) { |
|
130 |
final StringTemplate template = new StringTemplate(getSimilarity().getTemplate()); |
|
131 |
|
|
132 |
template.setAttribute("source", s.getPair().getKey()); |
|
133 |
template.setAttribute("target", s.getPair().getValue()); |
|
134 |
template.setAttribute("type", s.getType().toString()); |
|
135 |
|
|
136 |
final String res = template.toString(); |
|
137 |
return res; |
|
138 |
} |
|
139 |
|
|
140 |
public String getInputEprParam() { |
|
141 |
return inputEprParam; |
|
142 |
} |
|
143 |
|
|
144 |
public void setInputEprParam(final String inputEprParam) { |
|
145 |
this.inputEprParam = inputEprParam; |
|
146 |
} |
|
147 |
|
|
148 |
public String getOutputEprParam() { |
|
149 |
return outputEprParam; |
|
150 |
} |
|
151 |
|
|
152 |
public void setOutputEprParam(final String outputEprParam) { |
|
153 |
this.outputEprParam = outputEprParam; |
|
154 |
} |
|
155 |
|
|
156 |
public StringTemplate getSimilarity() { |
|
157 |
return similarity; |
|
158 |
} |
|
159 |
|
|
160 |
@Required |
|
161 |
public void setSimilarity(final StringTemplate similarity) { |
|
162 |
this.similarity = similarity; |
|
163 |
} |
|
164 |
|
|
165 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupConfigurationAwareJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import com.googlecode.sarasvati.NodeToken; |
|
4 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
5 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
6 |
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode; |
|
7 |
import eu.dnetlib.pace.config.DedupConfig; |
|
8 |
import org.apache.commons.lang.StringUtils; |
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
|
|
12 |
public class DedupConfigurationAwareJobNode extends SubmitHadoopJobNode { |
|
13 |
|
|
14 |
private static final Log log = LogFactory.getLog(DedupConfigurationAwareJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
15 |
|
|
16 |
private String dedupConfigSequenceParam; |
|
17 |
|
|
18 |
@Override |
|
19 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { |
|
20 |
super.prepareJob(job, token); |
|
21 |
|
|
22 |
final DedupConfigurationOrchestration dedupConfigurations = dedupConfigurations(token); |
|
23 |
final DedupConfig currentConf = dedupConfigurations.getConfigurations().peek(); |
|
24 |
|
|
25 |
log.debug("using dedup configuration: '" + currentConf + "'"); |
|
26 |
|
|
27 |
job.getParameters().put("dedup.conf", currentConf.toString()); |
|
28 |
|
|
29 |
token.getEnv().setAttribute("dedup.conf", currentConf.toString()); |
|
30 |
} |
|
31 |
|
|
32 |
protected DedupConfigurationOrchestration dedupConfigurations(final NodeToken token) { |
|
33 |
final String configs = token.getFullEnv().getAttribute(getDedupConfigSequenceParam()); |
|
34 |
if ((configs == null) || configs.trim().isEmpty()) |
|
35 |
throw new IllegalStateException("Cannot find dedup configurations in workflow env: '" + getDedupConfigSequenceParam() + "'"); |
|
36 |
|
|
37 |
return DedupConfigurationOrchestration.fromJSON(configs); |
|
38 |
} |
|
39 |
|
|
40 |
protected String getEntityType(final NodeToken token) { |
|
41 |
final String entityType = token.getEnv().getAttribute("entityType"); |
|
42 |
if (StringUtils.isBlank(entityType)) throw new IllegalStateException("Cannot find 'entityType' parameter in workflow env."); |
|
43 |
return entityType; |
|
44 |
} |
|
45 |
|
|
46 |
// ////////// |
|
47 |
|
|
48 |
public String getDedupConfigSequenceParam() { |
|
49 |
return dedupConfigSequenceParam; |
|
50 |
} |
|
51 |
|
|
52 |
public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) { |
|
53 |
this.dedupConfigSequenceParam = dedupConfigSequenceParam; |
|
54 |
} |
|
55 |
|
|
56 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupCheckEntitySequenceJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import java.util.Queue; |
|
4 |
|
|
5 |
import org.apache.commons.lang.StringUtils; |
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
import org.springframework.beans.factory.annotation.Autowired; |
|
9 |
|
|
10 |
import com.google.common.base.Function; |
|
11 |
import com.google.common.base.Splitter; |
|
12 |
import com.google.common.collect.Iterables; |
|
13 |
import com.google.common.collect.Lists; |
|
14 |
import com.googlecode.sarasvati.Arc; |
|
15 |
import com.googlecode.sarasvati.NodeToken; |
|
16 |
|
|
17 |
import eu.dnetlib.msro.rmi.MSROException; |
|
18 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
19 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader; |
|
20 |
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; |
|
21 |
|
|
22 |
public class DedupCheckEntitySequenceJobNode extends AsyncJobNode { |
|
23 |
|
|
24 |
private static final Log log = LogFactory.getLog(DedupCheckEntitySequenceJobNode.class); |
|
25 |
|
|
26 |
@Autowired |
|
27 |
private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader; |
|
28 |
|
|
29 |
private String dedupConfigSequenceParam; |
|
30 |
|
|
31 |
private String entitySequence; |
|
32 |
|
|
33 |
@Override |
|
34 |
protected String execute(final NodeToken token) throws Exception { |
|
35 |
|
|
36 |
if (StringUtils.isBlank(getEntitySequence())) throw new MSROException("missing entity sequence, e.g. a csv: organization,person,result"); |
|
37 |
|
|
38 |
if (token.getFullEnv().hasAttribute(DedupGrouperJobNode.DEDUP_GROUPER_LOOPER)) { |
|
39 |
log.info("reset env variable: " + DedupGrouperJobNode.DEDUP_GROUPER_LOOPER + " to zero"); |
|
40 |
token.getFullEnv().setAttribute(DedupGrouperJobNode.DEDUP_GROUPER_LOOPER, 0); |
|
41 |
} |
|
42 |
|
|
43 |
if (!token.getEnv().hasAttribute("entitySequence")) { |
|
44 |
|
|
45 |
log.info("parsing config sequence: " + getEntitySequence()); |
|
46 |
|
|
47 |
token.getEnv().setAttribute("entitySequence", getEntitySequence()); |
|
48 |
|
|
49 |
final Iterable<String> sequence = Splitter.on(",").omitEmptyStrings().split(getEntitySequence()); |
|
50 |
final Queue<DedupConfigurationOrchestration> q = |
|
51 |
Lists.newLinkedList(Iterables.transform(sequence, new Function<String, DedupConfigurationOrchestration>() { |
|
52 |
|
|
53 |
@Override |
|
54 |
public DedupConfigurationOrchestration apply(final String entityName) { |
|
55 |
try { |
|
56 |
final DedupConfigurationOrchestration dco = Iterables.getFirst(dedupOrchestrationLoader.loadByEntityName(entityName), null); |
|
57 |
if (dco == null) throw new RuntimeException("unable to find DedupOrchestration profile for entity type: " + entityName); |
|
58 |
return dco; |
|
59 |
} catch (final Throwable e) { |
|
60 |
throw new RuntimeException("", e); |
|
61 |
} |
|
62 |
} |
|
63 |
})); |
|
64 |
|
|
65 |
log.info("built sequence of dedup orchestration profiles, size: " + q.size()); |
|
66 |
final DedupConfigurationOrchestration dco = q.remove(); |
|
67 |
log.info("closing mesh for entity: " + dco.getEntity().getName()); |
|
68 |
setDedupConfParams(token, dco); |
|
69 |
token.getEnv().setTransientAttribute("entitySequenceQueue", q); |
|
70 |
|
|
71 |
return Arc.DEFAULT_ARC; |
|
72 |
} |
|
73 |
|
|
74 |
@SuppressWarnings("unchecked") |
|
75 |
final Queue<DedupConfigurationOrchestration> q = (Queue<DedupConfigurationOrchestration>) token.getEnv().getTransientAttribute("entitySequenceQueue"); |
|
76 |
|
|
77 |
if (!q.isEmpty()) { |
|
78 |
log.info("remaining dedup orchestration profiles: " + q.size()); |
|
79 |
final DedupConfigurationOrchestration dco = q.remove(); |
|
80 |
log.info("closing mesh for entity: " + dco.getEntity().getName()); |
|
81 |
|
|
82 |
setDedupConfParams(token, dco); |
|
83 |
return Arc.DEFAULT_ARC; |
|
84 |
} |
|
85 |
|
|
86 |
log.info("completed closing mesh for entities: " + getEntitySequence()); |
|
87 |
return "done"; |
|
88 |
|
|
89 |
} |
|
90 |
|
|
91 |
private void setDedupConfParams(final NodeToken token, final DedupConfigurationOrchestration dco) { |
|
92 |
token.getEnv().setAttribute("entityType", dco.getEntity().getName()); |
|
93 |
token.getEnv().setAttribute("entityTypeId", dco.getEntity().getCode()); |
|
94 |
token.getEnv().setAttribute(getDedupConfigSequenceParam(), dco.toString()); |
|
95 |
} |
|
96 |
|
|
97 |
public String getEntitySequence() { |
|
98 |
return entitySequence; |
|
99 |
} |
|
100 |
|
|
101 |
public void setEntitySequence(final String entitySequence) { |
|
102 |
this.entitySequence = entitySequence; |
|
103 |
} |
|
104 |
|
|
105 |
public String getDedupConfigSequenceParam() { |
|
106 |
return dedupConfigSequenceParam; |
|
107 |
} |
|
108 |
|
|
109 |
public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) { |
|
110 |
this.dedupConfigSequenceParam = dedupConfigSequenceParam; |
|
111 |
} |
|
112 |
|
|
113 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/ResetCountersJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
|
|
5 |
import org.apache.commons.lang.StringUtils; |
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
|
|
9 |
import com.google.common.base.Splitter; |
|
10 |
import com.googlecode.sarasvati.Arc; |
|
11 |
import com.googlecode.sarasvati.NodeToken; |
|
12 |
|
|
13 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
14 |
|
|
15 |
public class ResetCountersJobNode extends SimpleJobNode { |
|
16 |
|
|
17 |
private static final Log log = LogFactory.getLog(ResetCountersJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
18 |
|
|
19 |
private String attributesCSV; |
|
20 |
|
|
21 |
@Override |
|
22 |
protected String execute(final NodeToken token) throws Exception { |
|
23 |
|
|
24 |
if (StringUtils.isNotBlank(getAttributesCSV())) { |
|
25 |
|
|
26 |
log.info("got wf attributes CSV: " + getAttributesCSV()); |
|
27 |
|
|
28 |
final Splitter splitter = Splitter.on(",").trimResults().omitEmptyStrings(); |
|
29 |
final List<String> wfAttrs = splitter.splitToList(getAttributesCSV()); |
|
30 |
|
|
31 |
for (final String attr : wfAttrs) { |
|
32 |
resetWorkflowParam(token, attr); |
|
33 |
} |
|
34 |
|
|
35 |
} else { |
|
36 |
log.info("attribute list is empty, nothing to do here."); |
|
37 |
} |
|
38 |
|
|
39 |
return Arc.DEFAULT_ARC; |
|
40 |
} |
|
41 |
|
|
42 |
private void resetWorkflowParam(final NodeToken token, final String attribute) { |
|
43 |
final String count = token.getFullEnv().getAttribute(attribute); |
|
44 |
if (StringUtils.isNotBlank(count)) { |
|
45 |
log.info(String.format("found loop counter '%s', value '%s'", attribute, count)); |
|
46 |
token.getFullEnv().setAttribute(attribute, 0); |
|
47 |
|
|
48 |
log.info(String.format("set '%s', to 0", attribute)); |
|
49 |
} else { |
|
50 |
log.info("loop counter was not found in workflow env, nothing to do here."); |
|
51 |
} |
|
52 |
} |
|
53 |
|
|
54 |
public String getAttributesCSV() { |
|
55 |
return attributesCSV; |
|
56 |
} |
|
57 |
|
|
58 |
public void setAttributesCSV(final String attributesCSV) { |
|
59 |
this.attributesCSV = attributesCSV; |
|
60 |
} |
|
61 |
|
|
62 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupCheckConfigurationJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
|
|
5 |
import org.apache.commons.logging.Log; |
|
6 |
import org.apache.commons.logging.LogFactory; |
|
7 |
import org.springframework.beans.factory.annotation.Autowired; |
|
8 |
|
|
9 |
import com.googlecode.sarasvati.Arc; |
|
10 |
import com.googlecode.sarasvati.NodeToken; |
|
11 |
|
|
12 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
13 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
14 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
15 |
import eu.dnetlib.msro.rmi.MSROException; |
|
16 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
17 |
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; |
|
18 |
|
|
19 |
public class DedupCheckConfigurationJobNode extends AsyncJobNode { |
|
20 |
|
|
21 |
private static final Log log = LogFactory.getLog(DedupCheckConfigurationJobNode.class); |
|
22 |
|
|
23 |
@Autowired |
|
24 |
private UniqueServiceLocator serviceLocator; |
|
25 |
|
|
26 |
private String dedupConfigSequenceParam; |
|
27 |
|
|
28 |
@Override |
|
29 |
protected String execute(final NodeToken token) throws Exception { |
|
30 |
|
|
31 |
final String dcoJson = token.getEnv().getAttribute(getDedupConfigSequenceParam()); |
|
32 |
|
|
33 |
final DedupConfigurationOrchestration dco = DedupConfigurationOrchestration.fromJSON(dcoJson); |
|
34 |
|
|
35 |
if (!existActionSetProfile(dco)) throw new MSROException("missing action set profile: " + dco.getActionSetId()); |
|
36 |
|
|
37 |
log.info("found action set profile: " + dco.getActionSetId()); |
|
38 |
|
|
39 |
return Arc.DEFAULT_ARC; |
|
40 |
} |
|
41 |
|
|
42 |
private boolean existActionSetProfile(final DedupConfigurationOrchestration dco) throws ISLookUpException { |
|
43 |
final String xquery = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'DedupOrchestrationDSResourceType' and .//ACTION_SET/@id='%s'] return 1"; |
|
44 |
log.info("looking for action set profile, id: " + dco.getActionSetId()); |
|
45 |
final List<String> actionSets = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(xquery, dco.getActionSetId())); |
|
46 |
return !actionSets.isEmpty(); |
|
47 |
} |
|
48 |
|
|
49 |
public String getDedupConfigSequenceParam() { |
|
50 |
return dedupConfigSequenceParam; |
|
51 |
} |
|
52 |
|
|
53 |
public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) { |
|
54 |
this.dedupConfigSequenceParam = dedupConfigSequenceParam; |
|
55 |
} |
|
56 |
|
|
57 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/functionality/modular/ui/workflows/values/ListDedupOrchestrationValues.java | ||
---|---|---|
1 |
package eu.dnetlib.functionality.modular.ui.workflows.values; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import javax.annotation.Resource; |
|
7 |
|
|
8 |
import com.google.common.collect.Lists; |
|
9 |
|
|
10 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
11 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
12 |
import eu.dnetlib.msro.workflows.util.ValidNodeValuesFetcher; |
|
13 |
|
|
14 |
public class ListDedupOrchestrationValues extends ValidNodeValuesFetcher { |
|
15 |
|
|
16 |
@Resource |
|
17 |
private UniqueServiceLocator serviceLocator; |
|
18 |
|
|
19 |
@Override |
|
20 |
protected List<DnetParamValue> obtainValues(final Map<String, String> params) throws Exception { |
|
21 |
|
|
22 |
final String xquery = |
|
23 |
"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='DedupOrchestrationDSResourceType'] return $x//ACTION_SET/@id/string()"; |
|
24 |
|
|
25 |
final List<String> result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery); |
|
26 |
final List<DnetParamValue> values = Lists.newArrayList(); |
|
27 |
|
|
28 |
for (final String s : result) { |
|
29 |
values.add(new DnetParamValue(s, s)); |
|
30 |
} |
|
31 |
|
|
32 |
return values; |
|
33 |
} |
|
34 |
|
|
35 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/FinalizeDedupIndexJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import static java.lang.String.format; |
|
4 |
|
|
5 |
import org.apache.commons.lang.StringUtils; |
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
|
|
9 |
import com.googlecode.sarasvati.NodeToken; |
|
10 |
|
|
11 |
import eu.dnetlib.data.provision.index.rmi.IndexService; |
|
12 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException; |
|
13 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
14 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
15 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
16 |
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory; |
|
17 |
import eu.dnetlib.msro.rmi.MSROException; |
|
18 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
|
19 |
|
|
20 |
public class FinalizeDedupIndexJobNode extends BlackboardJobNode { |
|
21 |
|
|
22 |
private static final Log log = LogFactory.getLog(FinalizeDedupIndexJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
23 |
|
|
24 |
@Override |
|
25 |
protected String obtainServiceId(final NodeToken token) { |
|
26 |
return getServiceLocator().getServiceId(IndexService.class); |
|
27 |
} |
|
28 |
|
|
29 |
@Override |
|
30 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { |
|
31 |
final String indexDsId = getEnvParam(token, "index_id"); |
|
32 |
|
|
33 |
log.info("preparing blackboard job DELETE_BY_QUERY index: " + indexDsId); |
|
34 |
|
|
35 |
final String backendId = getBackendId(indexDsId); |
|
36 |
if (StringUtils.isBlank(backendId)) |
|
37 |
throw new MSROException("empty index backend Id"); |
|
38 |
|
|
39 |
job.setAction("DELETE_BY_QUERY"); |
|
40 |
job.getParameters().put("id", indexDsId); |
|
41 |
job.getParameters().put("backend_Id", backendId); |
|
42 |
job.getParameters().put("query", |
|
43 |
buildQuery(getEnvParam(token, "entityType"), getEnvParam(token, "index.feed.timestamp"), getEnvParam(token, "actionset"))); |
|
44 |
} |
|
45 |
|
|
46 |
private String buildQuery(final String entityType, final String version, final String actionset) { |
|
47 |
final String query = |
|
48 |
String.format("__dsversion:{* TO %s} AND oaftype:%s AND actionset:%s", InputDocumentFactory.getParsedDateField(version), entityType, actionset); |
|
49 |
|
|
50 |
log.info("delete by query: " + query); |
|
51 |
|
|
52 |
return query; |
|
53 |
} |
|
54 |
|
|
55 |
private String getEnvParam(final NodeToken token, final String name) throws MSROException { |
|
56 |
final String value = token.getEnv().getAttribute(name); |
|
57 |
|
|
58 |
if (StringUtils.isBlank(value)) |
|
59 |
throw new MSROException(format("unable to finalize index feeding, cannot find property '%s' in the workflow env.", name)); |
|
60 |
|
|
61 |
return value; |
|
62 |
} |
|
63 |
|
|
64 |
public String getBackendId(final String indexDsId) throws ISLookUpDocumentNotFoundException, ISLookUpException { |
|
65 |
return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery( |
|
66 |
"//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='" + indexDsId + "']//BACKEND/text()"); |
|
67 |
} |
|
68 |
|
|
69 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupConfigurationSetterJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import org.apache.commons.lang.StringUtils; |
|
4 |
import org.springframework.beans.factory.annotation.Autowired; |
|
5 |
|
|
6 |
import com.googlecode.sarasvati.Arc; |
|
7 |
import com.googlecode.sarasvati.NodeToken; |
|
8 |
|
|
9 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
10 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader; |
|
11 |
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; |
|
12 |
|
|
13 |
public class DedupConfigurationSetterJobNode extends AsyncJobNode { |
|
14 |
|
|
15 |
private String dedupConfigSequence; |
|
16 |
|
|
17 |
private String dedupConfigSequenceParam; |
|
18 |
|
|
19 |
@Autowired |
|
20 |
private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader; |
|
21 |
|
|
22 |
@Override |
|
23 |
protected String execute(final NodeToken token) throws Exception { |
|
24 |
|
|
25 |
if (StringUtils.isBlank(getDedupConfigSequence())) throw new IllegalArgumentException("missing configuration sequence"); |
|
26 |
|
|
27 |
final DedupConfigurationOrchestration dedupOrchestration = dedupOrchestrationLoader.loadByActionSetId(getDedupConfigSequence()); |
|
28 |
|
|
29 |
token.getEnv().setAttribute("entityType", dedupOrchestration.getEntity().getName()); |
|
30 |
token.getEnv().setAttribute("entityTypeId", dedupOrchestration.getEntity().getCode()); |
|
31 |
|
|
32 |
token.getEnv().setAttribute(getDedupConfigSequenceParam(), dedupOrchestration.toString()); |
|
33 |
|
|
34 |
return Arc.DEFAULT_ARC; |
|
35 |
} |
|
36 |
|
|
37 |
public String getDedupConfigSequence() { |
|
38 |
return dedupConfigSequence; |
|
39 |
} |
|
40 |
|
|
41 |
public void setDedupConfigSequence(final String dedupConfigSequence) { |
|
42 |
this.dedupConfigSequence = dedupConfigSequence; |
|
43 |
} |
|
44 |
|
|
45 |
public String getDedupConfigSequenceParam() { |
|
46 |
return dedupConfigSequenceParam; |
|
47 |
} |
|
48 |
|
|
49 |
public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) { |
|
50 |
this.dedupConfigSequenceParam = dedupConfigSequenceParam; |
|
51 |
} |
|
52 |
|
|
53 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/actions/PrepareConfiguredActionSetJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.actions; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import com.google.common.collect.Lists; |
|
7 |
import com.google.common.collect.Maps; |
|
8 |
import com.google.gson.Gson; |
|
9 |
import com.googlecode.sarasvati.Arc; |
|
10 |
import com.googlecode.sarasvati.NodeToken; |
|
11 |
import eu.dnetlib.actionmanager.set.RawSet; |
|
12 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
13 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
14 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
15 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
16 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
17 |
import org.apache.commons.lang.StringUtils; |
|
18 |
import org.apache.commons.logging.Log; |
|
19 |
import org.apache.commons.logging.LogFactory; |
|
20 |
import org.springframework.beans.factory.annotation.Autowired; |
|
21 |
|
|
22 |
/** |
|
23 |
* The Class PrepareConfiguredActionSetJobNode. |
|
24 |
*/ |
|
25 |
public class PrepareConfiguredActionSetJobNode extends SimpleJobNode { |
|
26 |
|
|
27 |
/** |
|
28 |
* logger. |
|
29 |
*/ |
|
30 |
private static final Log log = LogFactory.getLog(PrepareConfiguredActionSetJobNode.class); |
|
31 |
|
|
32 |
/** |
|
33 |
* The dedup config sequence param. |
|
34 |
*/ |
|
35 |
private String dedupConfigSequenceParam; |
|
36 |
|
|
37 |
/** |
|
38 |
* The job property. |
|
39 |
*/ |
|
40 |
private String jobProperty; |
|
41 |
|
|
42 |
/** |
|
43 |
* The action set path param name. |
|
44 |
*/ |
|
45 |
private String actionSetPathParam; |
|
46 |
|
|
47 |
/** |
|
48 |
* The service locator. |
|
49 |
*/ |
|
50 |
@Autowired |
|
51 |
private UniqueServiceLocator serviceLocator; |
|
52 |
|
|
53 |
/* |
|
54 |
* (non-Javadoc) |
|
55 |
* |
|
56 |
* @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken) |
|
57 |
*/ |
|
58 |
@Override |
|
59 |
protected String execute(final NodeToken token) throws Exception { |
|
60 |
|
|
61 |
final List<Map<String, String>> setList = Lists.newArrayList(); |
|
62 |
|
|
63 |
final Map<String, String> set = Maps.newHashMap(); |
|
64 |
|
|
65 |
final String actionSetId = getActionSetId(token); |
|
66 |
final ISLookUpService isLookUpService = serviceLocator.getService(ISLookUpService.class); |
|
67 |
final String basePath = isLookUpService.getResourceProfileByQuery( |
|
68 |
"/RESOURCE_PROFILE[./HEADER/RESOURCE_TYPE/@value='ActionManagerServiceResourceType']//SERVICE_PROPERTIES/PROPERTY[@key='basePath']/@value/string()"); |
|
69 |
if (StringUtils.isBlank(basePath)) { |
|
70 |
throw new IllegalStateException("missing basePath in ActionManagerService"); |
|
71 |
} |
|
72 |
|
|
73 |
final String actionSetDirectory = isLookUpService.getResourceProfileByQuery( |
|
74 |
"/RESOURCE_PROFILE[./HEADER/RESOURCE_TYPE/@value='ActionManagerSetDSResourceType' and .//SET/@id = '"+actionSetId+"']//SET/@ directory/string()"); |
|
75 |
|
|
76 |
if (StringUtils.isBlank(actionSetDirectory)) { |
|
77 |
throw new IllegalStateException("missing directory in ActionSet profile: " + actionSetId); |
|
78 |
} |
|
79 |
|
|
80 |
final String rawSetId = RawSet.newInstance().getId(); |
|
81 |
set.put("rawset", rawSetId); |
|
82 |
set.put("creationDate", DateUtils.now_ISO8601()); |
|
83 |
set.put("set", actionSetId); |
|
84 |
set.put("enabled", "true"); |
|
85 |
set.put("jobProperty", getJobProperty()); |
|
86 |
|
|
87 |
token.getEnv().setAttribute(set.get("jobProperty"), set.get("rawset")); |
|
88 |
|
|
89 |
final String path = basePath + "/" + actionSetDirectory + "/" + rawSetId; |
|
90 |
log.info("using action set path: " + path); |
|
91 |
token.getEnv().setAttribute(getActionSetPathParam(), path); |
|
92 |
|
|
93 |
setList.add(set); |
|
94 |
final String sets = new Gson().toJson(setList); |
|
95 |
log.debug("built set: " + sets); |
|
96 |
|
|
97 |
token.getEnv().setAttribute("sets", sets); |
|
98 |
|
|
99 |
return Arc.DEFAULT_ARC; |
|
100 |
} |
|
101 |
|
|
102 |
/** |
|
103 |
* Gets the action set id. |
|
104 |
* |
|
105 |
* @param token the token |
|
106 |
* @return the action set id |
|
107 |
*/ |
|
108 |
private String getActionSetId(final NodeToken token) { |
|
109 |
final String json = token.getEnv().getAttribute(getDedupConfigSequenceParam()); |
|
110 |
final DedupConfigurationOrchestration dco = DedupConfigurationOrchestration.fromJSON(json); |
|
111 |
final String actionSetId = dco.getActionSetId(); |
|
112 |
log.info("found actionSetId in workflow env: " + actionSetId); |
|
113 |
return actionSetId; |
|
114 |
} |
|
115 |
|
|
116 |
/** |
|
117 |
* Gets the dedup config sequence param. |
|
118 |
* |
|
119 |
* @return the dedup config sequence param |
|
120 |
*/ |
|
121 |
public String getDedupConfigSequenceParam() { |
|
122 |
return dedupConfigSequenceParam; |
|
123 |
} |
|
124 |
|
|
125 |
/** |
|
126 |
* Sets the dedup config sequence param. |
|
127 |
* |
|
128 |
* @param dedupConfigSequenceParam the new dedup config sequence param |
|
129 |
*/ |
|
130 |
public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) { |
|
131 |
this.dedupConfigSequenceParam = dedupConfigSequenceParam; |
|
132 |
} |
|
133 |
|
|
134 |
/** |
|
135 |
* Gets the job property. |
|
136 |
* |
|
137 |
* @return the job property |
|
138 |
*/ |
|
139 |
public String getJobProperty() { |
|
140 |
return jobProperty; |
|
141 |
} |
|
142 |
|
|
143 |
/** |
|
144 |
* Sets the job property. |
|
145 |
* |
|
146 |
* @param jobProperty the new job property |
|
147 |
*/ |
|
148 |
public void setJobProperty(final String jobProperty) { |
|
149 |
this.jobProperty = jobProperty; |
|
150 |
} |
|
151 |
|
|
152 |
public String getActionSetPathParam() { |
|
153 |
return actionSetPathParam; |
|
154 |
} |
|
155 |
|
|
156 |
public void setActionSetPathParam(final String actionSetPathParam) { |
|
157 |
this.actionSetPathParam = actionSetPathParam; |
|
158 |
} |
|
159 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/functionality/modular/ui/dedup/EntityType.java | ||
---|---|---|
1 |
package eu.dnetlib.functionality.modular.ui.dedup; |
|
2 |
|
|
3 |
/** |
|
4 |
* Utility Class EntityType, helps the user interface to display the different entity types. |
|
5 |
*/ |
|
6 |
public class EntityType { |
|
7 |
|
|
8 |
/** The id. */ |
|
9 |
private String id; |
|
10 |
|
|
11 |
/** The type. */ |
|
12 |
private String type; |
|
13 |
|
|
14 |
/** The label. */ |
|
15 |
private String label; |
|
16 |
|
|
17 |
/** |
|
18 |
* Instantiates a new entity type. |
|
19 |
*/ |
|
20 |
public EntityType() {}; |
|
21 |
|
|
22 |
/** |
|
23 |
* Instantiates a new entity type. |
|
24 |
* |
|
25 |
* @param id |
|
26 |
* the id |
|
27 |
* @param type |
|
28 |
* the type |
|
29 |
* @param label |
|
30 |
* the label |
|
31 |
*/ |
|
32 |
public EntityType(final String id, final String type, final String label) { |
|
33 |
super(); |
|
34 |
setId(id); |
|
35 |
setType(type); |
|
36 |
setLabel(label); |
|
37 |
} |
|
38 |
|
|
39 |
/** |
|
40 |
* Gets the id. |
|
41 |
* |
|
42 |
* @return the id |
|
43 |
*/ |
|
44 |
public String getId() { |
|
45 |
return id; |
|
46 |
} |
|
47 |
|
|
48 |
/** |
|
49 |
* Sets the id. |
|
50 |
* |
|
51 |
* @param id |
|
52 |
* the new id |
|
53 |
*/ |
|
54 |
public void setId(final String id) { |
|
55 |
this.id = id; |
|
56 |
} |
|
57 |
|
|
58 |
/** |
|
59 |
* Gets the type. |
|
60 |
* |
|
61 |
* @return the type |
|
62 |
*/ |
|
63 |
public String getType() { |
|
64 |
return type; |
|
65 |
} |
|
66 |
|
|
67 |
/** |
|
68 |
* Sets the type. |
|
69 |
* |
|
70 |
* @param type |
|
71 |
* the new type |
|
72 |
*/ |
|
73 |
public void setType(final String type) { |
|
74 |
this.type = type; |
|
75 |
} |
|
76 |
|
|
77 |
/** |
|
78 |
* Gets the label. |
|
79 |
* |
|
80 |
* @return the label |
|
81 |
*/ |
|
82 |
public String getLabel() { |
|
83 |
return label; |
|
84 |
} |
|
85 |
|
|
86 |
/** |
|
87 |
* Sets the label. |
|
88 |
* |
|
89 |
* @param label |
|
90 |
* the new label |
|
91 |
*/ |
|
92 |
public void setLabel(final String label) { |
|
93 |
this.label = label; |
|
94 |
} |
|
95 |
|
|
96 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupDuplicateScanJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import org.apache.commons.collections.CollectionUtils; |
|
4 |
import org.apache.commons.logging.Log; |
|
5 |
import org.apache.commons.logging.LogFactory; |
|
6 |
|
|
7 |
import com.googlecode.sarasvati.Arc; |
|
8 |
import com.googlecode.sarasvati.Engine; |
|
9 |
import com.googlecode.sarasvati.NodeToken; |
|
10 |
|
|
11 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
12 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
13 |
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener; |
|
14 |
|
|
15 |
public class DedupDuplicateScanJobNode extends DedupConfigurationAwareJobNode { |
|
16 |
|
|
17 |
private static final Log log = LogFactory.getLog(DedupDuplicateScanJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
18 |
|
|
19 |
private class DedupBlackboardWorkflowJobListener extends BlackboardWorkflowJobListener { |
|
20 |
|
|
21 |
public DedupBlackboardWorkflowJobListener(final Engine engine, final NodeToken token) { |
|
22 |
super(engine, token); |
|
23 |
} |
|
24 |
|
|
25 |
@Override |
|
26 |
protected void onDone(final BlackboardJob job) { |
|
27 |
|
|
28 |
final DedupConfigurationOrchestration confs = dedupConfigurations(getToken()); |
|
29 |
|
|
30 |
confs.getConfigurations().poll(); |
|
31 |
|
|
32 |
log.info("checking dedup configs queue, size: " + confs.getConfigurations().size()); |
|
33 |
|
|
34 |
if (CollectionUtils.isEmpty(confs.getConfigurations())) { |
|
35 |
log.info("dedup similarity scan done"); |
|
36 |
super.complete(job, "done"); |
|
37 |
} else { |
|
38 |
log.debug("remaining confs: " + confs); |
|
39 |
|
|
40 |
getToken().getEnv().setAttribute(getDedupConfigSequenceParam(), confs.toString()); |
|
41 |
|
|
42 |
super.complete(job, Arc.DEFAULT_ARC); |
|
43 |
} |
|
44 |
} |
|
45 |
} |
|
46 |
|
|
47 |
@Override |
|
48 |
protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) { |
|
49 |
return new DedupBlackboardWorkflowJobListener(engine, token); |
|
50 |
} |
|
51 |
|
|
52 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.5.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupSimilarityToActionsJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import org.apache.commons.lang.StringUtils; |
|
4 |
import org.apache.commons.logging.Log; |
|
5 |
import org.apache.commons.logging.LogFactory; |
|
6 |
|
|
7 |
import com.googlecode.sarasvati.NodeToken; |
|
8 |
|
|
9 |
import eu.dnetlib.data.proto.DedupSimilarityProtos.DedupSimilarity; |
|
10 |
import eu.dnetlib.data.proto.RelTypeProtos.RelType; |
|
11 |
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType; |
|
12 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
13 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
14 |
import eu.dnetlib.msro.rmi.MSROException; |
|
15 |
|
|
16 |
public class DedupSimilarityToActionsJobNode extends DedupConfigurationAwareJobNode { |
|
17 |
|
|
18 |
private static final Log log = LogFactory.getLog(DedupSimilarityToActionsJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
19 |
|
|
20 |
@Override |
|
21 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { |
|
22 |
super.prepareJob(job, token); |
|
23 |
|
|
24 |
final String entityType = token.getEnv().getAttribute("entityType"); |
|
25 |
if (StringUtils.isBlank(entityType)) |
|
26 |
throw new MSROException("unable to find wf param: entityType"); |
|
27 |
|
|
28 |
String cf = "_" + SubRelType.dedupSimilarity + "_" + DedupSimilarity.RelName.isSimilarTo; |
|
29 |
switch (Type.valueOf(entityType)) { |
|
30 |
|
|
31 |
case organization: |
|
32 |
cf = RelType.organizationOrganization + cf; |
|
33 |
break; |
|
34 |
case person: |
|
35 |
cf = RelType.personPerson + cf; |
|
36 |
break; |
|
37 |
case result: |
|
38 |
cf = RelType.resultResult + cf; |
|
39 |
break; |
|
40 |
default: |
|
41 |
throw new MSROException("invalid parameter entityType: " + entityType); |
|
42 |
} |
|
43 |
|
Also available in: Unified diff
[maven-release-plugin] copy for tag dnet-deduplication-1.5.8