Revision 37515
Added by Claudio Atzori over 9 years ago
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/deploy.info | ||
---|---|---|
1 |
{ |
|
2 |
"type_source": "SVN", |
|
3 |
"goal": "package -U source:jar", |
|
4 |
"url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet40/modules/dnet-deduplication/trunk", |
|
5 |
"deploy_repository": "dnet4-snapshots", |
|
6 |
"version": "4", |
|
7 |
"mail": "alessia.bardi@isti.cnr.it", |
|
8 |
"deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet4-snapshots", |
|
9 |
"name": "dnet-deduplication" |
|
10 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/test/java/eu/dnetlib/msro/workflows/dedup/conf/DedupConfigurationOrchestrationTest.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup.conf; |
|
2 |
|
|
3 |
import static org.junit.Assert.assertNotNull; |
|
4 |
import static org.junit.Assert.assertTrue; |
|
5 |
|
|
6 |
import java.io.IOException; |
|
7 |
import java.util.Queue; |
|
8 |
|
|
9 |
import org.junit.Before; |
|
10 |
import org.junit.Test; |
|
11 |
|
|
12 |
import com.google.common.collect.Lists; |
|
13 |
|
|
14 |
import eu.dnetlib.pace.config.DedupConfig; |
|
15 |
|
|
16 |
public class DedupConfigurationOrchestrationTest { |
|
17 |
|
|
18 |
public DedupConfigurationOrchestration dco; |
|
19 |
|
|
20 |
@Before |
|
21 |
public void setUp() throws IOException { |
|
22 |
|
|
23 |
final Entity e = new Entity("result", "50", "Publication"); |
|
24 |
|
|
25 |
final String actionSetId = "001"; |
|
26 |
final Queue<DedupConfig> configurations = Lists.newLinkedList(); |
|
27 |
|
|
28 |
configurations.add(DedupConfig.loadDefault()); |
|
29 |
|
|
30 |
dco = new DedupConfigurationOrchestration(e, actionSetId, configurations); |
|
31 |
assertNotNull(dco); |
|
32 |
assertNotNull(dco.getActionSetId()); |
|
33 |
assertNotNull(dco.getEntity()); |
|
34 |
assertNotNull(dco.getConfigurations()); |
|
35 |
} |
|
36 |
|
|
37 |
@Test |
|
38 |
public void testSerialization() { |
|
39 |
|
|
40 |
final String json = dco.toString(); |
|
41 |
final DedupConfigurationOrchestration anotherDco = DedupConfigurationOrchestration.fromJSON(json); |
|
42 |
assertNotNull(anotherDco); |
|
43 |
assertTrue(json.equals(anotherDco.toString())); |
|
44 |
} |
|
45 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/test/java/eu/dnetlib/msro/workflows/dedup/SimilarityMeshBuilderTest.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
|
|
5 |
import org.junit.Before; |
|
6 |
import org.junit.Test; |
|
7 |
|
|
8 |
import com.google.common.collect.Lists; |
|
9 |
|
|
10 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
11 |
import eu.dnetlib.msro.workflows.hadoop.utils.Similarity; |
|
12 |
import eu.dnetlib.msro.workflows.hadoop.utils.SimilarityMeshBuilder; |
|
13 |
|
|
14 |
public class SimilarityMeshBuilderTest { |
|
15 |
|
|
16 |
private List<String> list; |
|
17 |
|
|
18 |
@Before |
|
19 |
public void setUp() throws Exception { |
|
20 |
list = Lists.newArrayList(); |
|
21 |
for (int i = 0; i < 10; i++) { |
|
22 |
list.add(i + ""); |
|
23 |
} |
|
24 |
} |
|
25 |
|
|
26 |
@Test |
|
27 |
public void test() { |
|
28 |
final List<Similarity> combinations = SimilarityMeshBuilder.build(Type.result, list); |
|
29 |
|
|
30 |
System.out.println(combinations); |
|
31 |
System.out.println(combinations.size()); |
|
32 |
|
|
33 |
} |
|
34 |
|
|
35 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/PrepareConfiguredActionSetJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.actions; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
|
|
9 |
import com.google.common.collect.Lists; |
|
10 |
import com.google.common.collect.Maps; |
|
11 |
import com.google.gson.Gson; |
|
12 |
import com.googlecode.sarasvati.Arc; |
|
13 |
import com.googlecode.sarasvati.NodeToken; |
|
14 |
|
|
15 |
import eu.dnetlib.actionmanager.set.RawSet; |
|
16 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
17 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
18 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
19 |
|
|
20 |
/** |
|
21 |
* The Class PrepareConfiguredActionSetJobNode. |
|
22 |
*/ |
|
23 |
public class PrepareConfiguredActionSetJobNode extends SimpleJobNode { |
|
24 |
|
|
25 |
/** |
|
26 |
* logger. |
|
27 |
*/ |
|
28 |
private static final Log log = LogFactory.getLog(PrepareConfiguredActionSetJobNode.class); |
|
29 |
|
|
30 |
/** The dedup config sequence param. */ |
|
31 |
private String dedupConfigSequenceParam; |
|
32 |
|
|
33 |
/** The job property. */ |
|
34 |
private String jobProperty; |
|
35 |
|
|
36 |
/* |
|
37 |
* (non-Javadoc) |
|
38 |
* |
|
39 |
* @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken) |
|
40 |
*/ |
|
41 |
@Override |
|
42 |
protected String execute(final NodeToken token) throws Exception { |
|
43 |
|
|
44 |
final List<Map<String, String>> setList = Lists.newArrayList(); |
|
45 |
|
|
46 |
final Map<String, String> set = Maps.newHashMap(); |
|
47 |
|
|
48 |
set.put("rawset", RawSet.newInstance().getId()); |
|
49 |
set.put("creationDate", DateUtils.now_ISO8601()); |
|
50 |
set.put("set", getActionSetId(token)); |
|
51 |
set.put("enabled", "true"); |
|
52 |
set.put("jobProperty", getJobProperty()); |
|
53 |
|
|
54 |
token.getEnv().setAttribute(set.get("jobProperty"), set.get("rawset")); |
|
55 |
|
|
56 |
setList.add(set); |
|
57 |
final String sets = new Gson().toJson(setList); |
|
58 |
log.debug("built set: " + sets); |
|
59 |
|
|
60 |
token.getEnv().setAttribute("sets", sets); |
|
61 |
|
|
62 |
return Arc.DEFAULT_ARC; |
|
63 |
} |
|
64 |
|
|
65 |
/** |
|
66 |
* Gets the action set id. |
|
67 |
* |
|
68 |
* @param token |
|
69 |
* the token |
|
70 |
* @return the action set id |
|
71 |
*/ |
|
72 |
private String getActionSetId(final NodeToken token) { |
|
73 |
final String json = token.getEnv().getAttribute(getDedupConfigSequenceParam()); |
|
74 |
final DedupConfigurationOrchestration dco = DedupConfigurationOrchestration.fromJSON(json); |
|
75 |
final String actionSetId = dco.getActionSetId(); |
|
76 |
log.info("found actionSetId in workflow env: " + actionSetId); |
|
77 |
return actionSetId; |
|
78 |
} |
|
79 |
|
|
80 |
/** |
|
81 |
* Gets the dedup config sequence param. |
|
82 |
* |
|
83 |
* @return the dedup config sequence param |
|
84 |
*/ |
|
85 |
public String getDedupConfigSequenceParam() { |
|
86 |
return dedupConfigSequenceParam; |
|
87 |
} |
|
88 |
|
|
89 |
/** |
|
90 |
* Sets the dedup config sequence param. |
|
91 |
* |
|
92 |
* @param dedupConfigSequenceParam |
|
93 |
* the new dedup config sequence param |
|
94 |
*/ |
|
95 |
public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) { |
|
96 |
this.dedupConfigSequenceParam = dedupConfigSequenceParam; |
|
97 |
} |
|
98 |
|
|
99 |
/** |
|
100 |
* Gets the job property. |
|
101 |
* |
|
102 |
* @return the job property |
|
103 |
*/ |
|
104 |
public String getJobProperty() { |
|
105 |
return jobProperty; |
|
106 |
} |
|
107 |
|
|
108 |
/** |
|
109 |
* Sets the job property. |
|
110 |
* |
|
111 |
* @param jobProperty |
|
112 |
* the new job property |
|
113 |
*/ |
|
114 |
public void setJobProperty(final String jobProperty) { |
|
115 |
this.jobProperty = jobProperty; |
|
116 |
} |
|
117 |
|
|
118 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/CleanActionSetsProfileJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.actions; |
|
2 |
|
|
3 |
import javax.annotation.Resource; |
|
4 |
|
|
5 |
import org.apache.commons.logging.Log; |
|
6 |
import org.apache.commons.logging.LogFactory; |
|
7 |
import org.springframework.beans.factory.annotation.Required; |
|
8 |
|
|
9 |
import com.googlecode.sarasvati.Arc; |
|
10 |
import com.googlecode.sarasvati.NodeToken; |
|
11 |
|
|
12 |
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService; |
|
13 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
14 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
15 |
|
|
16 |
public class CleanActionSetsProfileJobNode extends SimpleJobNode { |
|
17 |
|
|
18 |
/** |
|
19 |
* logger. |
|
20 |
*/ |
|
21 |
private static final Log log = LogFactory.getLog(CleanActionSetsProfileJobNode.class); |
|
22 |
|
|
23 |
@Resource |
|
24 |
private UniqueServiceLocator serviceLocator; |
|
25 |
|
|
26 |
private String xupdate; |
|
27 |
|
|
28 |
@Override |
|
29 |
protected String execute(final NodeToken token) throws Exception { |
|
30 |
log.info("updating Action Sets profiles: " + getXupdate()); |
|
31 |
serviceLocator.getService(ISRegistryService.class).executeXUpdate(getXupdate()); |
|
32 |
return Arc.DEFAULT_ARC; |
|
33 |
} |
|
34 |
|
|
35 |
public String getXupdate() { |
|
36 |
return xupdate; |
|
37 |
} |
|
38 |
|
|
39 |
@Required |
|
40 |
public void setXupdate(final String xupdate) { |
|
41 |
this.xupdate = xupdate; |
|
42 |
} |
|
43 |
|
|
44 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/UpdateSetsJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.actions; |
|
2 |
|
|
3 |
import java.io.StringReader; |
|
4 |
import java.util.List; |
|
5 |
import java.util.Map; |
|
6 |
|
|
7 |
import javax.annotation.Resource; |
|
8 |
|
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
import org.dom4j.Attribute; |
|
12 |
import org.dom4j.Document; |
|
13 |
import org.dom4j.Element; |
|
14 |
import org.dom4j.io.SAXReader; |
|
15 |
|
|
16 |
import com.google.gson.Gson; |
|
17 |
import com.googlecode.sarasvati.Arc; |
|
18 |
import com.googlecode.sarasvati.NodeToken; |
|
19 |
|
|
20 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
21 |
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService; |
|
22 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
23 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
24 |
import eu.dnetlib.msro.rmi.MSROException; |
|
25 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
26 |
|
|
27 |
public class UpdateSetsJobNode extends SimpleJobNode { |
|
28 |
|
|
29 |
/** |
|
30 |
* logger. |
|
31 |
*/ |
|
32 |
private static final Log log = LogFactory.getLog(UpdateSetsJobNode.class); |
|
33 |
|
|
34 |
@Resource |
|
35 |
private UniqueServiceLocator serviceLocator; |
|
36 |
|
|
37 |
@Override |
|
38 |
protected String execute(final NodeToken token) throws Exception { |
|
39 |
|
|
40 |
@SuppressWarnings("unchecked") |
|
41 |
final List<Map<String, String>> sets = new Gson().fromJson(token.getEnv().getAttribute("sets"), List.class); |
|
42 |
|
|
43 |
final String lastUpdate = DateUtils.now_ISO8601(); |
|
44 |
for (Map<String, String> set : sets) { |
|
45 |
|
|
46 |
// update only the enabled sets. |
|
47 |
if (isEnabled(set)) { |
|
48 |
log.info("updating set: " + set.toString()); |
|
49 |
addLatestRawSet(set, lastUpdate); |
|
50 |
} else { |
|
51 |
log.info("skip set update: " + set.toString()); |
|
52 |
} |
|
53 |
} |
|
54 |
|
|
55 |
return Arc.DEFAULT_ARC; |
|
56 |
} |
|
57 |
|
|
58 |
private boolean isEnabled(final Map<String, String> set) { |
|
59 |
return set.containsKey("enabled") && set.get("enabled").equals("true"); |
|
60 |
} |
|
61 |
|
|
62 |
public void addLatestRawSet(final Map<String, String> set, final String lastUpdate) throws MSROException { |
|
63 |
final String q = "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') where $x//SET/@id = '" + set.get("set") |
|
64 |
+ "' return $x"; |
|
65 |
try { |
|
66 |
final String profile = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(q); |
|
67 |
final Document doc = new SAXReader().read(new StringReader(profile)); |
|
68 |
final String profId = doc.valueOf("//RESOURCE_IDENTIFIER/@value"); |
|
69 |
final Element latest = (Element) doc.selectSingleNode("//RAW_SETS/LATEST"); |
|
70 |
final Element expired = ((Element) doc.selectSingleNode("//RAW_SETS")).addElement("EXPIRED"); |
|
71 |
|
|
72 |
for (Object o : latest.attributes()) { |
|
73 |
final Attribute a = (Attribute) o; |
|
74 |
expired.addAttribute(a.getName(), a.getValue()); |
|
75 |
} |
|
76 |
|
|
77 |
latest.addAttribute("id", set.get("rawset")); |
|
78 |
latest.addAttribute("creationDate", set.get("creationDate")); |
|
79 |
latest.addAttribute("lastUpdate", lastUpdate); |
|
80 |
|
|
81 |
serviceLocator.getService(ISRegistryService.class).updateProfile(profId, doc.asXML(), "ActionManagerSetDSResourceType"); |
|
82 |
} catch (Exception e) { |
|
83 |
String msg = "Error updating profile of set: " + set; |
|
84 |
log.error(msg); |
|
85 |
throw new MSROException(msg, e); |
|
86 |
} |
|
87 |
} |
|
88 |
|
|
89 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/PromoteActionsJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.actions; |
|
2 |
|
|
3 |
import org.apache.commons.lang.StringUtils; |
|
4 |
|
|
5 |
import com.googlecode.sarasvati.NodeToken; |
|
6 |
|
|
7 |
import eu.dnetlib.actionmanager.rmi.ActionManagerService; |
|
8 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
9 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
|
10 |
|
|
11 |
public class PromoteActionsJobNode extends BlackboardJobNode { |
|
12 |
|
|
13 |
public static final String ALL_SETS = "ALL SETS"; |
|
14 |
|
|
15 |
private String set; |
|
16 |
|
|
17 |
@Override |
|
18 |
protected String obtainServiceId(final NodeToken token) { |
|
19 |
return getServiceLocator().getServiceId(ActionManagerService.class); |
|
20 |
} |
|
21 |
|
|
22 |
@Override |
|
23 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { |
|
24 |
job.setAction("PROMOTE"); |
|
25 |
if (!StringUtils.isBlank(getSet()) && !getSet().equals(ALL_SETS)) { |
|
26 |
job.getParameters().put("set", getSet()); |
|
27 |
token.getEnv().setAttribute("set", getSet()); |
|
28 |
} |
|
29 |
} |
|
30 |
|
|
31 |
public String getSet() { |
|
32 |
return set; |
|
33 |
} |
|
34 |
|
|
35 |
public void setSet(final String set) { |
|
36 |
this.set = set; |
|
37 |
} |
|
38 |
|
|
39 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/GarbageSetsJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.actions; |
|
2 |
|
|
3 |
import org.apache.commons.logging.Log; |
|
4 |
import org.apache.commons.logging.LogFactory; |
|
5 |
|
|
6 |
import com.googlecode.sarasvati.NodeToken; |
|
7 |
|
|
8 |
import eu.dnetlib.actionmanager.rmi.ActionManagerService; |
|
9 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
10 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
|
11 |
|
|
12 |
public class GarbageSetsJobNode extends BlackboardJobNode { |
|
13 |
|
|
14 |
/** |
|
15 |
* logger. |
|
16 |
*/ |
|
17 |
private static final Log log = LogFactory.getLog(GarbageSetsJobNode.class); |
|
18 |
|
|
19 |
@Override |
|
20 |
protected String obtainServiceId(final NodeToken token) { |
|
21 |
return getServiceLocator().getServiceId(ActionManagerService.class); |
|
22 |
} |
|
23 |
|
|
24 |
@Override |
|
25 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { |
|
26 |
log.info("preparing garbage collection for Action sets Job"); |
|
27 |
job.setAction("GARBAGE"); |
|
28 |
} |
|
29 |
|
|
30 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/PrepareActionSetsJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.actions; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
|
|
9 |
import com.google.common.base.Predicate; |
|
10 |
import com.google.common.collect.Maps; |
|
11 |
import com.google.gson.Gson; |
|
12 |
import com.googlecode.sarasvati.Arc; |
|
13 |
import com.googlecode.sarasvati.NodeToken; |
|
14 |
|
|
15 |
import eu.dnetlib.actionmanager.set.RawSet; |
|
16 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
17 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
18 |
|
|
19 |
public class PrepareActionSetsJobNode extends SimpleJobNode { |
|
20 |
|
|
21 |
/** |
|
22 |
* logger. |
|
23 |
*/ |
|
24 |
private static final Log log = LogFactory.getLog(PrepareActionSetsJobNode.class); |
|
25 |
|
|
26 |
private String sets; |
|
27 |
|
|
28 |
@Override |
|
29 |
protected String execute(final NodeToken token) throws Exception { |
|
30 |
|
|
31 |
final List<Map<String, String>> setList = getSetList(); |
|
32 |
final String now = DateUtils.now_ISO8601(); |
|
33 |
|
|
34 |
for (Map<String, String> set : setList) { |
|
35 |
set.put("rawset", RawSet.newInstance().getId()); |
|
36 |
set.put("creationDate", now); |
|
37 |
|
|
38 |
if (set.get("enabled").equals("true")) { |
|
39 |
log.info("preparing set: " + simplifySetInfo(set)); |
|
40 |
} |
|
41 |
// setting the job properties needed to name the rawsets |
|
42 |
token.getEnv().setAttribute(set.get("jobProperty"), set.get("rawset")); |
|
43 |
} |
|
44 |
|
|
45 |
token.getEnv().setAttribute("sets", new Gson().toJson(setList)); |
|
46 |
|
|
47 |
return Arc.DEFAULT_ARC; |
|
48 |
} |
|
49 |
|
|
50 |
private Map<String, String> simplifySetInfo(final Map<String, String> set) { |
|
51 |
return Maps.filterKeys(set, new Predicate<String>() { |
|
52 |
|
|
53 |
@Override |
|
54 |
public boolean apply(final String k) { |
|
55 |
|
|
56 |
return k.equals("set") || k.equals("rawset"); |
|
57 |
} |
|
58 |
}); |
|
59 |
} |
|
60 |
|
|
61 |
@SuppressWarnings("unchecked") |
|
62 |
protected List<Map<String, String>> getSetList() { |
|
63 |
return new Gson().fromJson(getSets(), List.class); |
|
64 |
} |
|
65 |
|
|
66 |
public String getSets() { |
|
67 |
return sets; |
|
68 |
} |
|
69 |
|
|
70 |
public void setSets(final String sets) { |
|
71 |
this.sets = sets; |
|
72 |
} |
|
73 |
|
|
74 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupConfigurationSetterJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import org.apache.commons.lang.StringUtils; |
|
4 |
import org.springframework.beans.factory.annotation.Autowired; |
|
5 |
|
|
6 |
import com.googlecode.sarasvati.Arc; |
|
7 |
import com.googlecode.sarasvati.NodeToken; |
|
8 |
|
|
9 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
10 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader; |
|
11 |
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; |
|
12 |
|
|
13 |
public class DedupConfigurationSetterJobNode extends AsyncJobNode { |
|
14 |
|
|
15 |
private String dedupConfigSequence; |
|
16 |
|
|
17 |
private String dedupConfigSequenceParam; |
|
18 |
|
|
19 |
@Autowired |
|
20 |
private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader; |
|
21 |
|
|
22 |
@Override |
|
23 |
protected String execute(final NodeToken token) throws Exception { |
|
24 |
|
|
25 |
if (StringUtils.isBlank(getDedupConfigSequence())) throw new IllegalArgumentException("missing configuration sequence"); |
|
26 |
|
|
27 |
final DedupConfigurationOrchestration dedupOrchestration = dedupOrchestrationLoader.loadByActionSetId(getDedupConfigSequence()); |
|
28 |
|
|
29 |
token.getEnv().setAttribute("entityType", dedupOrchestration.getEntity().getName()); |
|
30 |
token.getEnv().setAttribute("entityTypeId", dedupOrchestration.getEntity().getCode()); |
|
31 |
|
|
32 |
token.getEnv().setAttribute(getDedupConfigSequenceParam(), dedupOrchestration.toString()); |
|
33 |
|
|
34 |
return Arc.DEFAULT_ARC; |
|
35 |
} |
|
36 |
|
|
37 |
public String getDedupConfigSequence() { |
|
38 |
return dedupConfigSequence; |
|
39 |
} |
|
40 |
|
|
41 |
public void setDedupConfigSequence(final String dedupConfigSequence) { |
|
42 |
this.dedupConfigSequence = dedupConfigSequence; |
|
43 |
} |
|
44 |
|
|
45 |
public String getDedupConfigSequenceParam() { |
|
46 |
return dedupConfigSequenceParam; |
|
47 |
} |
|
48 |
|
|
49 |
public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) { |
|
50 |
this.dedupConfigSequenceParam = dedupConfigSequenceParam; |
|
51 |
} |
|
52 |
|
|
53 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupDuplicateScanJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import org.apache.commons.collections.CollectionUtils; |
|
4 |
import org.apache.commons.logging.Log; |
|
5 |
import org.apache.commons.logging.LogFactory; |
|
6 |
|
|
7 |
import com.googlecode.sarasvati.Arc; |
|
8 |
import com.googlecode.sarasvati.Engine; |
|
9 |
import com.googlecode.sarasvati.NodeToken; |
|
10 |
|
|
11 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
12 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
13 |
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener; |
|
14 |
|
|
15 |
public class DedupDuplicateScanJobNode extends DedupConfigurationAwareJobNode { |
|
16 |
|
|
17 |
private static final Log log = LogFactory.getLog(DedupDuplicateScanJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
18 |
|
|
19 |
private class DedupBlackboardWorkflowJobListener extends BlackboardWorkflowJobListener { |
|
20 |
|
|
21 |
public DedupBlackboardWorkflowJobListener(final Engine engine, final NodeToken token) { |
|
22 |
super(engine, token); |
|
23 |
} |
|
24 |
|
|
25 |
@Override |
|
26 |
protected void onDone(final BlackboardJob job) { |
|
27 |
|
|
28 |
final DedupConfigurationOrchestration confs = dedupConfigurations(getToken()); |
|
29 |
|
|
30 |
confs.getConfigurations().poll(); |
|
31 |
|
|
32 |
log.info("checking dedup configs queue, size: " + confs.getConfigurations().size()); |
|
33 |
|
|
34 |
if (CollectionUtils.isEmpty(confs.getConfigurations())) { |
|
35 |
log.info("dedup similarity scan done"); |
|
36 |
super.complete(job, "done"); |
|
37 |
} else { |
|
38 |
log.debug("remaining confs: " + confs); |
|
39 |
|
|
40 |
getToken().getEnv().setAttribute(getDedupConfigSequenceParam(), confs.toString()); |
|
41 |
|
|
42 |
super.complete(job, Arc.DEFAULT_ARC); |
|
43 |
} |
|
44 |
} |
|
45 |
} |
|
46 |
|
|
47 |
@Override |
|
48 |
protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) { |
|
49 |
return new DedupBlackboardWorkflowJobListener(engine, token); |
|
50 |
} |
|
51 |
|
|
52 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/BuildSimilarityMeshJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import java.io.StringReader; |
|
4 |
import java.util.Iterator; |
|
5 |
import java.util.List; |
|
6 |
import java.util.Queue; |
|
7 |
|
|
8 |
import javax.annotation.Resource; |
|
9 |
import javax.xml.ws.wsaddressing.W3CEndpointReference; |
|
10 |
|
|
11 |
import org.antlr.stringtemplate.StringTemplate; |
|
12 |
import org.apache.commons.logging.Log; |
|
13 |
import org.apache.commons.logging.LogFactory; |
|
14 |
import org.dom4j.Document; |
|
15 |
import org.dom4j.DocumentException; |
|
16 |
import org.dom4j.Node; |
|
17 |
import org.dom4j.io.SAXReader; |
|
18 |
import org.springframework.beans.factory.annotation.Required; |
|
19 |
|
|
20 |
import com.google.common.collect.Lists; |
|
21 |
import com.google.common.collect.Queues; |
|
22 |
import com.googlecode.sarasvati.Arc; |
|
23 |
import com.googlecode.sarasvati.NodeToken; |
|
24 |
|
|
25 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
26 |
import eu.dnetlib.enabling.resultset.IterableResultSetFactory; |
|
27 |
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory; |
|
28 |
import eu.dnetlib.msro.workflows.hadoop.utils.Similarity; |
|
29 |
import eu.dnetlib.msro.workflows.hadoop.utils.SimilarityMeshBuilder; |
|
30 |
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; |
|
31 |
|
|
32 |
public class BuildSimilarityMeshJobNode extends AsyncJobNode { |
|
33 |
|
|
34 |
private static final Log log = LogFactory.getLog(BuildSimilarityMeshJobNode.class); |
|
35 |
|
|
36 |
/** The result set factory. */ |
|
37 |
@Resource(name = "iterableResultSetFactory") |
|
38 |
private IterableResultSetFactory resultSetFactory; |
|
39 |
|
|
40 |
/** The result set client factory. */ |
|
41 |
@Resource(name = "resultSetClientFactory") |
|
42 |
private ResultSetClientFactory resultSetClientFactory; |
|
43 |
|
|
44 |
private StringTemplate similarity; |
|
45 |
|
|
46 |
private String inputEprParam; |
|
47 |
|
|
48 |
private String outputEprParam; |
|
49 |
|
|
50 |
@Override |
|
51 |
protected String execute(final NodeToken token) throws Exception { |
|
52 |
|
|
53 |
final String inputEpr = token.getEnv().getAttribute(getInputEprParam()); |
|
54 |
|
|
55 |
final Iterator<String> rsClient = resultSetClientFactory.getClient(inputEpr).iterator(); |
|
56 |
final Queue<Object> queue = Queues.newLinkedBlockingQueue(); |
|
57 |
final SAXReader reader = new SAXReader(); |
|
58 |
|
|
59 |
if (rsClient.hasNext()) { |
|
60 |
populateQueue(queue, reader, rsClient.next()); |
|
61 |
} |
|
62 |
|
|
63 |
final W3CEndpointReference eprOut = resultSetFactory.createIterableResultSet(new Iterable<String>() { |
|
64 |
|
|
65 |
@Override |
|
66 |
public Iterator<String> iterator() { |
|
67 |
return new Iterator<String>() { |
|
68 |
|
|
69 |
@Override |
|
70 |
public boolean hasNext() { |
|
71 |
synchronized (queue) { |
|
72 |
return !queue.isEmpty(); |
|
73 |
} |
|
74 |
} |
|
75 |
|
|
76 |
@Override |
|
77 |
public String next() { |
|
78 |
synchronized (queue) { |
|
79 |
final Object o = queue.poll(); |
|
80 |
while (queue.isEmpty() && rsClient.hasNext()) { |
|
81 |
populateQueue(queue, reader, rsClient.next()); |
|
82 |
} |
|
83 |
return buildSimilarity((Similarity) o); |
|
84 |
} |
|
85 |
} |
|
86 |
|
|
87 |
@Override |
|
88 |
public void remove() { |
|
89 |
throw new UnsupportedOperationException(); |
|
90 |
} |
|
91 |
}; |
|
92 |
} |
|
93 |
}); |
|
94 |
|
|
95 |
token.getEnv().setAttribute(getOutputEprParam(), eprOut.toString()); |
|
96 |
|
|
97 |
return Arc.DEFAULT_ARC; |
|
98 |
} |
|
99 |
|
|
100 |
private void populateQueue(final Queue<Object> q, final SAXReader r, final String xml) { |
|
101 |
try { |
|
102 |
final Document d = r.read(new StringReader(xml)); |
|
103 |
final String groupid = d.valueOf("//FIELD[@name='id']"); |
|
104 |
final List<?> items = d.selectNodes("//FIELD[@name='group']/ITEM"); |
|
105 |
final String entitytype = d.valueOf("//FIELD[@name='entitytype']"); |
|
106 |
final List<String> group = Lists.newArrayList(); |
|
107 |
for (final Object id : items) { |
|
108 |
group.add(((Node) id).getText()); |
|
109 |
} |
|
110 |
// compute the full mesh |
|
111 |
final Type type = Type.valueOf(entitytype); |
|
112 |
|
|
113 |
final List<Similarity> mesh = SimilarityMeshBuilder.build(type, group); |
|
114 |
// total += mesh.size(); |
|
115 |
if (log.isDebugEnabled()) { |
|
116 |
log.debug(String.format("built mesh for group '%s', size %d", groupid, mesh.size())); |
|
117 |
} |
|
118 |
for (final Similarity s : mesh) { |
|
119 |
if (log.isDebugEnabled()) { |
|
120 |
log.debug(String.format("adding to queue: %s", s.toString())); |
|
121 |
} |
|
122 |
q.add(s); |
|
123 |
} |
|
124 |
} catch (final DocumentException e) { |
|
125 |
log.error("invalid document: " + xml); |
|
126 |
} |
|
127 |
} |
|
128 |
|
|
129 |
private String buildSimilarity(final Similarity s) { |
|
130 |
final StringTemplate template = new StringTemplate(getSimilarity().getTemplate()); |
|
131 |
|
|
132 |
template.setAttribute("source", s.getPair().getKey()); |
|
133 |
template.setAttribute("target", s.getPair().getValue()); |
|
134 |
template.setAttribute("type", s.getType().toString()); |
|
135 |
|
|
136 |
final String res = template.toString(); |
|
137 |
return res; |
|
138 |
} |
|
139 |
|
|
140 |
public String getInputEprParam() { |
|
141 |
return inputEprParam; |
|
142 |
} |
|
143 |
|
|
144 |
public void setInputEprParam(final String inputEprParam) { |
|
145 |
this.inputEprParam = inputEprParam; |
|
146 |
} |
|
147 |
|
|
148 |
public String getOutputEprParam() { |
|
149 |
return outputEprParam; |
|
150 |
} |
|
151 |
|
|
152 |
public void setOutputEprParam(final String outputEprParam) { |
|
153 |
this.outputEprParam = outputEprParam; |
|
154 |
} |
|
155 |
|
|
156 |
public StringTemplate getSimilarity() { |
|
157 |
return similarity; |
|
158 |
} |
|
159 |
|
|
160 |
@Required |
|
161 |
public void setSimilarity(final StringTemplate similarity) { |
|
162 |
this.similarity = similarity; |
|
163 |
} |
|
164 |
|
|
165 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/conf/DedupConfigurationOrchestrationLoader.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup.conf; |
|
2 |
|
|
3 |
import java.io.StringReader; |
|
4 |
import java.util.List; |
|
5 |
import java.util.Queue; |
|
6 |
|
|
7 |
import javax.annotation.Resource; |
|
8 |
|
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
import org.dom4j.Document; |
|
12 |
import org.dom4j.DocumentException; |
|
13 |
import org.dom4j.Element; |
|
14 |
import org.dom4j.io.SAXReader; |
|
15 |
|
|
16 |
import com.google.common.collect.Lists; |
|
17 |
|
|
18 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException; |
|
19 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
20 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
21 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
22 |
import eu.dnetlib.pace.config.DedupConfig; |
|
23 |
|
|
24 |
/** |
|
25 |
* The Class DedupConfigurationOrchestrationLoader. |
|
26 |
*/ |
|
27 |
public class DedupConfigurationOrchestrationLoader { |
|
28 |
|
|
29 |
/** The Constant log. */ |
|
30 |
private static final Log log = LogFactory.getLog(DedupConfigurationOrchestrationLoader.class); |
|
31 |
|
|
32 |
/** The service locator. */ |
|
33 |
@Resource |
|
34 |
private UniqueServiceLocator serviceLocator; |
|
35 |
|
|
36 |
/** |
|
37 |
* Load the dedup orchestration profile from the IS. |
|
38 |
* |
|
39 |
* @param id |
|
40 |
* the id |
|
41 |
* @return the dedup configuration orchestration |
|
42 |
* @throws ISLookUpDocumentNotFoundException |
|
43 |
* the IS look up document not found exception |
|
44 |
* @throws ISLookUpException |
|
45 |
* the IS look up exception |
|
46 |
* @throws DocumentException |
|
47 |
* the document exception |
|
48 |
*/ |
|
49 |
public DedupConfigurationOrchestration loadByActionSetId(final String id) throws Exception { |
|
50 |
|
|
51 |
final ISLookUpService isLookUpService = serviceLocator.getService(ISLookUpService.class); |
|
52 |
|
|
53 |
final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", id); |
|
54 |
log.info("loading dedup orchestration: " + xquery); |
|
55 |
|
|
56 |
return parseOrchestrationProfile(isLookUpService, isLookUpService.getResourceProfileByQuery(xquery)); |
|
57 |
} |
|
58 |
|
|
59 |
public List<DedupConfigurationOrchestration> loadByEntityName(final String entityName) throws Exception { |
|
60 |
|
|
61 |
final ISLookUpService isLookUpService = serviceLocator.getService(ISLookUpService.class); |
|
62 |
|
|
63 |
final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ENTITY/@name = '%s']", entityName); |
|
64 |
log.info("loading dedup orchestration: " + xquery); |
|
65 |
|
|
66 |
final List<DedupConfigurationOrchestration> res = Lists.newArrayList(); |
|
67 |
|
|
68 |
for (final String profile : isLookUpService.quickSearchProfile(xquery)) { |
|
69 |
res.add(parseOrchestrationProfile(isLookUpService, profile)); |
|
70 |
} |
|
71 |
|
|
72 |
return res; |
|
73 |
} |
|
74 |
|
|
75 |
private DedupConfigurationOrchestration parseOrchestrationProfile(final ISLookUpService isLookUpService, final String dedupOrchestation) |
|
76 |
throws DocumentException, |
|
77 |
ISLookUpException, ISLookUpDocumentNotFoundException { |
|
78 |
final Document doc = new SAXReader().read(new StringReader(dedupOrchestation)); |
|
79 |
|
|
80 |
final Element e = (Element) doc.selectSingleNode("//DEDUPLICATION/ENTITY"); |
|
81 |
final Entity entity = new Entity(e.attributeValue("name"), e.attributeValue("code"), e.attributeValue("label")); |
|
82 |
|
|
83 |
final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id"); |
|
84 |
final Queue<DedupConfig> configurations = Lists.newLinkedList(); |
|
85 |
|
|
86 |
for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) { |
|
87 |
configurations.add(loadConfig(isLookUpService, actionSetId, o)); |
|
88 |
} |
|
89 |
|
|
90 |
final DedupConfigurationOrchestration dco = new DedupConfigurationOrchestration(entity, actionSetId, configurations); |
|
91 |
|
|
92 |
log.debug("loaded dedup configuration orchestration: " + dco.toString()); |
|
93 |
log.info("loaded dedup configuration orchestration, size: " + dco.getConfigurations().size()); |
|
94 |
return dco; |
|
95 |
} |
|
96 |
|
|
97 |
private DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o) throws ISLookUpException, |
|
98 |
ISLookUpDocumentNotFoundException { |
|
99 |
final Element s = (Element) o; |
|
100 |
final String configProfileId = s.attributeValue("id"); |
|
101 |
final String conf = |
|
102 |
isLookUpService.getResourceProfileByQuery(String.format( |
|
103 |
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", |
|
104 |
configProfileId)); |
|
105 |
|
|
106 |
final DedupConfig dedupConfig = DedupConfig.load(conf); |
|
107 |
dedupConfig.getWf().setConfigurationId(actionSetId); |
|
108 |
return dedupConfig; |
|
109 |
} |
|
110 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/conf/Entity.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup.conf; |
|
2 |
|
|
3 |
import com.google.gson.GsonBuilder; |
|
4 |
|
|
5 |
/** |
|
6 |
* The Class Entity. |
|
7 |
*/ |
|
8 |
public class Entity { |
|
9 |
|
|
10 |
/** The name. */ |
|
11 |
private String name; |
|
12 |
|
|
13 |
/** The code. */ |
|
14 |
private String code; |
|
15 |
|
|
16 |
/** The label. */ |
|
17 |
private String label; |
|
18 |
|
|
19 |
public Entity() {} |
|
20 |
|
|
21 |
/** |
|
22 |
* Instantiates a new entity. |
|
23 |
* |
|
24 |
* @param name |
|
25 |
* the name |
|
26 |
* @param code |
|
27 |
* the code |
|
28 |
* @param label |
|
29 |
* the label |
|
30 |
*/ |
|
31 |
public Entity(final String name, final String code, final String label) { |
|
32 |
super(); |
|
33 |
this.setName(name); |
|
34 |
this.setCode(code); |
|
35 |
this.setLabel(label); |
|
36 |
} |
|
37 |
|
|
38 |
/** |
|
39 |
* Gets the name. |
|
40 |
* |
|
41 |
* @return the name |
|
42 |
*/ |
|
43 |
public String getName() { |
|
44 |
return name; |
|
45 |
} |
|
46 |
|
|
47 |
/** |
|
48 |
* Gets the code. |
|
49 |
* |
|
50 |
* @return the code |
|
51 |
*/ |
|
52 |
public String getCode() { |
|
53 |
return code; |
|
54 |
} |
|
55 |
|
|
56 |
/** |
|
57 |
* Gets the label. |
|
58 |
* |
|
59 |
* @return the label |
|
60 |
*/ |
|
61 |
public String getLabel() { |
|
62 |
return label; |
|
63 |
} |
|
64 |
|
|
65 |
public void setName(final String name) { |
|
66 |
this.name = name; |
|
67 |
} |
|
68 |
|
|
69 |
public void setCode(final String code) { |
|
70 |
this.code = code; |
|
71 |
} |
|
72 |
|
|
73 |
public void setLabel(final String label) { |
|
74 |
this.label = label; |
|
75 |
} |
|
76 |
|
|
77 |
/* |
|
78 |
* (non-Javadoc) |
|
79 |
* |
|
80 |
* @see java.lang.Object#toString() |
|
81 |
*/ |
|
82 |
@Override |
|
83 |
public String toString() { |
|
84 |
return new GsonBuilder().setPrettyPrinting().create().toJson(this); |
|
85 |
} |
|
86 |
|
|
87 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/conf/DedupConfigurationOrchestration.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup.conf; |
|
2 |
|
|
3 |
import java.util.Queue; |
|
4 |
|
|
5 |
import com.google.gson.Gson; |
|
6 |
import com.google.gson.GsonBuilder; |
|
7 |
|
|
8 |
import eu.dnetlib.pace.config.DedupConfig; |
|
9 |
|
|
10 |
/** |
|
11 |
* The Class DedupConfigurationOrchestration. |
|
12 |
*/ |
|
13 |
public class DedupConfigurationOrchestration { |
|
14 |
|
|
15 |
/** The entity. */ |
|
16 |
private Entity entity; |
|
17 |
|
|
18 |
/** The action set id. */ |
|
19 |
private String actionSetId; |
|
20 |
|
|
21 |
/** The configurations. */ |
|
22 |
private Queue<DedupConfig> configurations; |
|
23 |
|
|
24 |
public DedupConfigurationOrchestration() {} |
|
25 |
|
|
26 |
/** |
|
27 |
* Instantiates a new dedup configuration orchestration. |
|
28 |
* |
|
29 |
* @param entity |
|
30 |
* the entity |
|
31 |
* @param actionSetId |
|
32 |
* the action set id |
|
33 |
* @param configurations |
|
34 |
* the configurations |
|
35 |
*/ |
|
36 |
public DedupConfigurationOrchestration(final Entity entity, final String actionSetId, final Queue<DedupConfig> configurations) { |
|
37 |
super(); |
|
38 |
this.setEntity(entity); |
|
39 |
this.setActionSetId(actionSetId); |
|
40 |
this.setConfigurations(configurations); |
|
41 |
} |
|
42 |
|
|
43 |
/** |
|
44 |
* Gets the entity. |
|
45 |
* |
|
46 |
* @return the entity |
|
47 |
*/ |
|
48 |
public Entity getEntity() { |
|
49 |
return entity; |
|
50 |
} |
|
51 |
|
|
52 |
/** |
|
53 |
* Gets the action set id. |
|
54 |
* |
|
55 |
* @return the action set id |
|
56 |
*/ |
|
57 |
public String getActionSetId() { |
|
58 |
return actionSetId; |
|
59 |
} |
|
60 |
|
|
61 |
/** |
|
62 |
* Gets the configurations. |
|
63 |
* |
|
64 |
* @return the configurations |
|
65 |
*/ |
|
66 |
public Queue<DedupConfig> getConfigurations() { |
|
67 |
return configurations; |
|
68 |
} |
|
69 |
|
|
70 |
public void setEntity(final Entity entity) { |
|
71 |
this.entity = entity; |
|
72 |
} |
|
73 |
|
|
74 |
public void setActionSetId(final String actionSetId) { |
|
75 |
this.actionSetId = actionSetId; |
|
76 |
} |
|
77 |
|
|
78 |
public void setConfigurations(final Queue<DedupConfig> configurations) { |
|
79 |
this.configurations = configurations; |
|
80 |
} |
|
81 |
|
|
82 |
/** |
|
83 |
* From json. |
|
84 |
* |
|
85 |
* @param json |
|
86 |
* the json |
|
87 |
* @return the dedup configuration orchestration |
|
88 |
*/ |
|
89 |
public static DedupConfigurationOrchestration fromJSON(final String json) { |
|
90 |
return new Gson().fromJson(json, DedupConfigurationOrchestration.class); |
|
91 |
} |
|
92 |
|
|
93 |
/* |
|
94 |
* (non-Javadoc) |
|
95 |
* |
|
96 |
* @see java.lang.Object#toString() |
|
97 |
*/ |
|
98 |
@Override |
|
99 |
public String toString() { |
|
100 |
return new GsonBuilder().setPrettyPrinting().create().toJson(this); |
|
101 |
} |
|
102 |
|
|
103 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupCheckEntitySequenceJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import java.util.Queue; |
|
4 |
|
|
5 |
import org.apache.commons.lang.StringUtils; |
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
import org.springframework.beans.factory.annotation.Autowired; |
|
9 |
|
|
10 |
import com.google.common.base.Function; |
|
11 |
import com.google.common.base.Splitter; |
|
12 |
import com.google.common.collect.Iterables; |
|
13 |
import com.google.common.collect.Lists; |
|
14 |
import com.googlecode.sarasvati.Arc; |
|
15 |
import com.googlecode.sarasvati.NodeToken; |
|
16 |
|
|
17 |
import eu.dnetlib.msro.rmi.MSROException; |
|
18 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
19 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader; |
|
20 |
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; |
|
21 |
|
|
22 |
public class DedupCheckEntitySequenceJobNode extends AsyncJobNode { |
|
23 |
|
|
24 |
private static final Log log = LogFactory.getLog(DedupCheckEntitySequenceJobNode.class); |
|
25 |
|
|
26 |
@Autowired |
|
27 |
private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader; |
|
28 |
|
|
29 |
private String dedupConfigSequenceParam; |
|
30 |
|
|
31 |
private String entitySequence; |
|
32 |
|
|
33 |
@Override |
|
34 |
protected String execute(final NodeToken token) throws Exception { |
|
35 |
|
|
36 |
if (StringUtils.isBlank(getEntitySequence())) throw new MSROException("missing entity sequence, e.g. a csv: organization,person,result"); |
|
37 |
|
|
38 |
if (token.getFullEnv().hasAttribute(DedupGrouperJobNode.DEDUP_GROUPER_LOOPER)) { |
|
39 |
log.info("reset env variable: " + DedupGrouperJobNode.DEDUP_GROUPER_LOOPER + " to zero"); |
|
40 |
token.getFullEnv().setAttribute(DedupGrouperJobNode.DEDUP_GROUPER_LOOPER, 0); |
|
41 |
} |
|
42 |
|
|
43 |
if (!token.getEnv().hasAttribute("entitySequence")) { |
|
44 |
|
|
45 |
log.info("parsing config sequence: " + getEntitySequence()); |
|
46 |
|
|
47 |
token.getEnv().setAttribute("entitySequence", getEntitySequence()); |
|
48 |
|
|
49 |
final Iterable<String> sequence = Splitter.on(",").omitEmptyStrings().split(getEntitySequence()); |
|
50 |
final Queue<DedupConfigurationOrchestration> q = |
|
51 |
Lists.newLinkedList(Iterables.transform(sequence, new Function<String, DedupConfigurationOrchestration>() { |
|
52 |
|
|
53 |
@Override |
|
54 |
public DedupConfigurationOrchestration apply(final String entityName) { |
|
55 |
try { |
|
56 |
final DedupConfigurationOrchestration dco = Iterables.getFirst(dedupOrchestrationLoader.loadByEntityName(entityName), null); |
|
57 |
if (dco == null) throw new RuntimeException("unable to find DedupOrchestration profile for entity type: " + entityName); |
|
58 |
return dco; |
|
59 |
} catch (final Throwable e) { |
|
60 |
throw new RuntimeException("", e); |
|
61 |
} |
|
62 |
} |
|
63 |
})); |
|
64 |
|
|
65 |
log.info("built sequence of dedup orchestration profiles, size: " + q.size()); |
|
66 |
final DedupConfigurationOrchestration dco = q.remove(); |
|
67 |
log.info("closing mesh for entity: " + dco.getEntity().getName()); |
|
68 |
setDedupConfParams(token, dco); |
|
69 |
token.getEnv().setTransientAttribute("entitySequenceQueue", q); |
|
70 |
|
|
71 |
return Arc.DEFAULT_ARC; |
|
72 |
} |
|
73 |
|
|
74 |
@SuppressWarnings("unchecked") |
|
75 |
final Queue<DedupConfigurationOrchestration> q = (Queue<DedupConfigurationOrchestration>) token.getEnv().getTransientAttribute("entitySequenceQueue"); |
|
76 |
|
|
77 |
if (!q.isEmpty()) { |
|
78 |
log.info("remaining dedup orchestration profiles: " + q.size()); |
|
79 |
final DedupConfigurationOrchestration dco = q.remove(); |
|
80 |
log.info("closing mesh for entity: " + dco.getEntity().getName()); |
|
81 |
|
|
82 |
setDedupConfParams(token, dco); |
|
83 |
return Arc.DEFAULT_ARC; |
|
84 |
} |
|
85 |
|
|
86 |
log.info("completed closing mesh for entities: " + getEntitySequence()); |
|
87 |
return "done"; |
|
88 |
|
|
89 |
} |
|
90 |
|
|
91 |
private void setDedupConfParams(final NodeToken token, final DedupConfigurationOrchestration dco) { |
|
92 |
token.getEnv().setAttribute("entityType", dco.getEntity().getName()); |
|
93 |
token.getEnv().setAttribute("entityTypeId", dco.getEntity().getCode()); |
|
94 |
token.getEnv().setAttribute(getDedupConfigSequenceParam(), dco.toString()); |
|
95 |
} |
|
96 |
|
|
97 |
public String getEntitySequence() { |
|
98 |
return entitySequence; |
|
99 |
} |
|
100 |
|
|
101 |
public void setEntitySequence(final String entitySequence) { |
|
102 |
this.entitySequence = entitySequence; |
|
103 |
} |
|
104 |
|
|
105 |
public String getDedupConfigSequenceParam() { |
|
106 |
return dedupConfigSequenceParam; |
|
107 |
} |
|
108 |
|
|
109 |
public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) { |
|
110 |
this.dedupConfigSequenceParam = dedupConfigSequenceParam; |
|
111 |
} |
|
112 |
|
|
113 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupGrouperJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import org.apache.commons.logging.Log; |
|
4 |
import org.apache.commons.logging.LogFactory; |
|
5 |
|
|
6 |
import com.googlecode.sarasvati.Arc; |
|
7 |
import com.googlecode.sarasvati.Engine; |
|
8 |
import com.googlecode.sarasvati.NodeToken; |
|
9 |
|
|
10 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
11 |
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener; |
|
12 |
|
|
13 |
public class DedupGrouperJobNode extends DedupConfigurationAwareJobNode { |
|
14 |
|
|
15 |
// TODO factor out this constant, it should be a configuration parameter |
|
16 |
public static final int DEDUP_GROUPER_MAX_LOOPS = 10; |
|
17 |
|
|
18 |
public static final String DEDUP_GROUPER_LOOPER = "dedup.grouper.looper"; |
|
19 |
public static final String DEDUP_GROUPER_CURR_WRITTEN_RELS = "dedup.grouper.written.rels"; |
|
20 |
public static final String DEDUP_GROUPER_PREV_WRITTEN_RELS = "dedup.grouper.prev.written.rels"; |
|
21 |
|
|
22 |
private static final Log log = LogFactory.getLog(DedupGrouperJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
23 |
|
|
24 |
private class DedupBlackboardWorkflowJobListener extends BlackboardWorkflowJobListener { |
|
25 |
|
|
26 |
public DedupBlackboardWorkflowJobListener(final Engine engine, final NodeToken token) { |
|
27 |
super(engine, token); |
|
28 |
} |
|
29 |
|
|
30 |
@Override |
|
31 |
protected void onDone(final BlackboardJob job) { |
|
32 |
|
|
33 |
final int times = currentIteration(getToken()); |
|
34 |
final String curr = job.getParameters().get(DEDUP_GROUPER_CURR_WRITTEN_RELS); |
|
35 |
|
|
36 |
if (times == 0) { |
|
37 |
getToken().getFullEnv().setAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS, -1); |
|
38 |
} |
|
39 |
|
|
40 |
if ((times >= DEDUP_GROUPER_MAX_LOOPS) || isStable(getToken(), curr)) { |
|
41 |
super.complete(job, "done"); |
|
42 |
} else { |
|
43 |
log.info("incrementing dedup.grouper.looper to " + (times + 1)); |
|
44 |
getToken().getFullEnv().setAttribute(DEDUP_GROUPER_LOOPER, times + 1); |
|
45 |
getToken().getFullEnv().setAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS, curr); |
|
46 |
super.complete(job, Arc.DEFAULT_ARC); |
|
47 |
} |
|
48 |
} |
|
49 |
} |
|
50 |
|
|
51 |
private int currentIteration(final NodeToken token) { |
|
52 |
try { |
|
53 |
final String sTimes = token.getFullEnv().getAttribute(DEDUP_GROUPER_LOOPER); |
|
54 |
log.info("read dedup.grouper.looper from fullEnv: '" + sTimes + "'"); |
|
55 |
return Integer.parseInt(sTimes); |
|
56 |
} catch (final NumberFormatException e) { |
|
57 |
log.info("got empty dedup.grouper.looper, initializing to 0"); |
|
58 |
return 0; |
|
59 |
} |
|
60 |
} |
|
61 |
|
|
62 |
private boolean isStable(final NodeToken token, final String sCurr) { |
|
63 |
final String sPrev = token.getFullEnv().getAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS); |
|
64 |
|
|
65 |
log.info("Comparing written rels, prev=" + sPrev + ", curr=" + sCurr); |
|
66 |
try { |
|
67 |
final boolean b = Integer.parseInt(sCurr) == Integer.parseInt(sPrev); |
|
68 |
if (b) { |
|
69 |
log.info(" --- The number of written rels is STABLE"); |
|
70 |
} |
|
71 |
return b; |
|
72 |
} catch (final Exception e) { |
|
73 |
log.error("Invalid parsing of written rels counters - curr: " + sCurr + ", prev: " + sPrev); |
|
74 |
return false; |
|
75 |
} |
|
76 |
} |
|
77 |
|
|
78 |
@Override |
|
79 |
protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) { |
|
80 |
return new DedupBlackboardWorkflowJobListener(engine, token); |
|
81 |
} |
|
82 |
|
|
83 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupCheckConfigurationJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
|
|
5 |
import org.apache.commons.logging.Log; |
|
6 |
import org.apache.commons.logging.LogFactory; |
|
7 |
import org.springframework.beans.factory.annotation.Autowired; |
|
8 |
|
|
9 |
import com.googlecode.sarasvati.Arc; |
|
10 |
import com.googlecode.sarasvati.NodeToken; |
|
11 |
|
|
12 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
13 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
14 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
15 |
import eu.dnetlib.msro.rmi.MSROException; |
|
16 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration; |
|
17 |
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; |
|
18 |
|
|
19 |
public class DedupCheckConfigurationJobNode extends AsyncJobNode { |
|
20 |
|
|
21 |
private static final Log log = LogFactory.getLog(DedupCheckConfigurationJobNode.class); |
|
22 |
|
|
23 |
@Autowired |
|
24 |
private UniqueServiceLocator serviceLocator; |
|
25 |
|
|
26 |
private String dedupConfigSequenceParam; |
|
27 |
|
|
28 |
@Override |
|
29 |
protected String execute(final NodeToken token) throws Exception { |
|
30 |
|
|
31 |
final String dcoJson = token.getEnv().getAttribute(getDedupConfigSequenceParam()); |
|
32 |
|
|
33 |
final DedupConfigurationOrchestration dco = DedupConfigurationOrchestration.fromJSON(dcoJson); |
|
34 |
|
|
35 |
if (!existActionSetProfile(dco)) throw new MSROException("missing action set profile: " + dco.getActionSetId()); |
|
36 |
|
|
37 |
log.info("found action set profile: " + dco.getActionSetId()); |
|
38 |
|
|
39 |
return Arc.DEFAULT_ARC; |
|
40 |
} |
|
41 |
|
|
42 |
private boolean existActionSetProfile(final DedupConfigurationOrchestration dco) throws ISLookUpException { |
|
43 |
final String xquery = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'DedupOrchestrationDSResourceType' and .//ACTION_SET/@id='%s'] return 1"; |
|
44 |
log.info("looking for action set profile, id: " + dco.getActionSetId()); |
|
45 |
final List<String> actionSets = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(xquery, dco.getActionSetId())); |
|
46 |
return !actionSets.isEmpty(); |
|
47 |
} |
|
48 |
|
|
49 |
public String getDedupConfigSequenceParam() { |
|
50 |
return dedupConfigSequenceParam; |
|
51 |
} |
|
52 |
|
|
53 |
public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) { |
|
54 |
this.dedupConfigSequenceParam = dedupConfigSequenceParam; |
|
55 |
} |
|
56 |
|
|
57 |
} |
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/PrepareDedupIndexJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.dedup; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import javax.annotation.Resource; |
|
7 |
|
|
8 |
import org.apache.commons.lang.StringUtils; |
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
import org.springframework.beans.factory.annotation.Autowired; |
|
12 |
import org.springframework.beans.factory.annotation.Required; |
|
13 |
|
|
14 |
import com.google.common.collect.Iterables; |
|
15 |
import com.google.gson.Gson; |
|
16 |
import com.googlecode.sarasvati.Arc; |
|
17 |
import com.googlecode.sarasvati.NodeToken; |
|
18 |
|
|
19 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
20 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
21 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
22 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
23 |
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader; |
|
24 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
25 |
|
|
26 |
public class PrepareDedupIndexJobNode extends SimpleJobNode { |
|
27 |
|
|
28 |
/** |
|
29 |
* logger. |
|
30 |
*/ |
|
31 |
private static final Log log = LogFactory.getLog(PrepareDedupIndexJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
32 |
|
|
33 |
@Resource |
|
34 |
private UniqueServiceLocator serviceLocator; |
|
35 |
|
|
36 |
@Autowired |
|
37 |
private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader; |
|
38 |
|
|
39 |
private String rottenRecordsPathParam; |
|
40 |
|
|
41 |
private String hbaseTable; |
|
42 |
|
|
43 |
private String dedupConfig; |
|
44 |
|
|
45 |
@Override |
|
46 |
protected String execute(final NodeToken token) throws Exception { |
|
47 |
|
|
48 |
log.info("start preparing job"); |
|
49 |
|
|
50 |
final String fields = getFields(env("format", token), env("layout", token)); |
|
51 |
token.getEnv().setAttribute("index.fields", fields); |
|
52 |
|
|
53 |
if (!StringUtils.isBlank(getRottenRecordsPathParam())) { |
|
54 |
token.getEnv().setAttribute(getRottenRecordsPathParam(), "/tmp" + getFileName(token, "rottenrecords")); |
|
55 |
} |
|
56 |
|
|
57 |
token.getEnv().setAttribute("index.solr.url", getIndexSolrUrlZk()); |
|
58 |
token.getEnv().setAttribute("index.solr.collection", getCollectionName(token)); |
|
59 |
|
|
60 |
token.getEnv().setAttribute("index.shutdown.wait.time", getIndexSolrShutdownWait()); |
|
61 |
token.getEnv().setAttribute("index.buffer.flush.threshold", getIndexBufferFlushTreshold()); |
|
62 |
token.getEnv().setAttribute("index.solr.sim.mode", isFeedingSimulationMode()); |
|
63 |
|
|
64 |
token.getEnv().setAttribute("index.feed.timestamp", DateUtils.now_ISO8601()); |
|
65 |
|
|
66 |
@SuppressWarnings("unchecked") |
|
67 |
final List<Map<String, String>> sets = new Gson().fromJson(token.getEnv().getAttribute("sets"), List.class); |
|
68 |
|
|
69 |
token.getEnv().setAttribute("actionset", Iterables.getOnlyElement(sets).get("set")); |
|
70 |
token.getEnv().setAttribute("entityType", token.getEnv().getAttribute("entityType")); |
|
71 |
token.getEnv().setAttribute("entityTypeId", token.getEnv().getAttribute("entityTypeId")); |
|
72 |
|
|
73 |
return Arc.DEFAULT_ARC; |
|
74 |
} |
|
75 |
|
|
76 |
private String getFields(final String format, final String layout) throws ISLookUpException { |
|
77 |
return isLookup(String |
|
78 |
.format("<FIELDS>{for $x in collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']/FIELDS/FIELD return $x[string(@path)]}</FIELDS>", |
|
79 |
format, layout)); |
|
80 |
} |
|
81 |
|
|
82 |
public String getIndexSolrUrlZk() throws ISLookUpException { |
|
83 |
return isLookup("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()"); |
|
84 |
} |
|
85 |
|
|
86 |
public String getIndexSolrShutdownWait() throws ISLookUpException { |
|
87 |
return queryForServiceProperty("solr:feedingShutdownTolerance"); |
|
88 |
} |
|
89 |
|
|
90 |
public String getIndexBufferFlushTreshold() throws ISLookUpException { |
|
91 |
return queryForServiceProperty("solr:feedingBufferFlushThreshold"); |
|
92 |
} |
|
93 |
|
|
94 |
public String isFeedingSimulationMode() throws ISLookUpException { |
|
95 |
return queryForServiceProperty("solr:feedingSimulationMode"); |
|
96 |
} |
|
97 |
|
|
98 |
private String queryForServiceProperty(final String key) throws ISLookUpException { |
|
99 |
return isLookup("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='" |
Also available in: Unified diff
[maven-release-plugin] copy for tag dnet-deduplication-1.1.8