Project

General

Profile

« Previous | Next » 

Revision 37515

[maven-release-plugin] copy for tag dnet-deduplication-1.1.8

View differences:

modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/deploy.info
1
{
2
	"type_source": "SVN", 
3
	"goal": "package -U source:jar", 
4
	"url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet40/modules/dnet-deduplication/trunk", 
5
	"deploy_repository": "dnet4-snapshots", 
6
	"version": "4", 
7
	"mail": "alessia.bardi@isti.cnr.it", 
8
	"deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet4-snapshots", 
9
	"name": "dnet-deduplication"
10
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/test/java/eu/dnetlib/msro/workflows/dedup/conf/DedupConfigurationOrchestrationTest.java
1
package eu.dnetlib.msro.workflows.dedup.conf;
2

  
3
import static org.junit.Assert.assertNotNull;
4
import static org.junit.Assert.assertTrue;
5

  
6
import java.io.IOException;
7
import java.util.Queue;
8

  
9
import org.junit.Before;
10
import org.junit.Test;
11

  
12
import com.google.common.collect.Lists;
13

  
14
import eu.dnetlib.pace.config.DedupConfig;
15

  
16
public class DedupConfigurationOrchestrationTest {
17

  
18
	public DedupConfigurationOrchestration dco;
19

  
20
	@Before
21
	public void setUp() throws IOException {
22

  
23
		final Entity e = new Entity("result", "50", "Publication");
24

  
25
		final String actionSetId = "001";
26
		final Queue<DedupConfig> configurations = Lists.newLinkedList();
27

  
28
		configurations.add(DedupConfig.loadDefault());
29

  
30
		dco = new DedupConfigurationOrchestration(e, actionSetId, configurations);
31
		assertNotNull(dco);
32
		assertNotNull(dco.getActionSetId());
33
		assertNotNull(dco.getEntity());
34
		assertNotNull(dco.getConfigurations());
35
	}
36

  
37
	@Test
38
	public void testSerialization() {
39

  
40
		final String json = dco.toString();
41
		final DedupConfigurationOrchestration anotherDco = DedupConfigurationOrchestration.fromJSON(json);
42
		assertNotNull(anotherDco);
43
		assertTrue(json.equals(anotherDco.toString()));
44
	}
45
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/test/java/eu/dnetlib/msro/workflows/dedup/SimilarityMeshBuilderTest.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import java.util.List;
4

  
5
import org.junit.Before;
6
import org.junit.Test;
7

  
8
import com.google.common.collect.Lists;
9

  
10
import eu.dnetlib.data.proto.TypeProtos.Type;
11
import eu.dnetlib.msro.workflows.hadoop.utils.Similarity;
12
import eu.dnetlib.msro.workflows.hadoop.utils.SimilarityMeshBuilder;
13

  
14
public class SimilarityMeshBuilderTest {
15

  
16
	private List<String> list;
17

  
18
	@Before
19
	public void setUp() throws Exception {
20
		list = Lists.newArrayList();
21
		for (int i = 0; i < 10; i++) {
22
			list.add(i + "");
23
		}
24
	}
25

  
26
	@Test
27
	public void test() {
28
		final List<Similarity> combinations = SimilarityMeshBuilder.build(Type.result, list);
29

  
30
		System.out.println(combinations);
31
		System.out.println(combinations.size());
32

  
33
	}
34

  
35
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/PrepareConfiguredActionSetJobNode.java
1
package eu.dnetlib.msro.workflows.actions;
2

  
3
import java.util.List;
4
import java.util.Map;
5

  
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

  
9
import com.google.common.collect.Lists;
10
import com.google.common.collect.Maps;
11
import com.google.gson.Gson;
12
import com.googlecode.sarasvati.Arc;
13
import com.googlecode.sarasvati.NodeToken;
14

  
15
import eu.dnetlib.actionmanager.set.RawSet;
16
import eu.dnetlib.miscutils.datetime.DateUtils;
17
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration;
18
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
19

  
20
/**
21
 * The Class PrepareConfiguredActionSetJobNode.
22
 */
23
public class PrepareConfiguredActionSetJobNode extends SimpleJobNode {
24

  
25
	/**
26
	 * logger.
27
	 */
28
	private static final Log log = LogFactory.getLog(PrepareConfiguredActionSetJobNode.class);
29

  
30
	/** The dedup config sequence param. */
31
	private String dedupConfigSequenceParam;
32

  
33
	/** The job property. */
34
	private String jobProperty;
35

  
36
	/*
37
	 * (non-Javadoc)
38
	 *
39
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
40
	 */
41
	@Override
42
	protected String execute(final NodeToken token) throws Exception {
43

  
44
		final List<Map<String, String>> setList = Lists.newArrayList();
45

  
46
		final Map<String, String> set = Maps.newHashMap();
47

  
48
		set.put("rawset", RawSet.newInstance().getId());
49
		set.put("creationDate", DateUtils.now_ISO8601());
50
		set.put("set", getActionSetId(token));
51
		set.put("enabled", "true");
52
		set.put("jobProperty", getJobProperty());
53

  
54
		token.getEnv().setAttribute(set.get("jobProperty"), set.get("rawset"));
55

  
56
		setList.add(set);
57
		final String sets = new Gson().toJson(setList);
58
		log.debug("built set: " + sets);
59

  
60
		token.getEnv().setAttribute("sets", sets);
61

  
62
		return Arc.DEFAULT_ARC;
63
	}
64

  
65
	/**
66
	 * Gets the action set id.
67
	 *
68
	 * @param token
69
	 *            the token
70
	 * @return the action set id
71
	 */
72
	private String getActionSetId(final NodeToken token) {
73
		final String json = token.getEnv().getAttribute(getDedupConfigSequenceParam());
74
		final DedupConfigurationOrchestration dco = DedupConfigurationOrchestration.fromJSON(json);
75
		final String actionSetId = dco.getActionSetId();
76
		log.info("found actionSetId in workflow env: " + actionSetId);
77
		return actionSetId;
78
	}
79

  
80
	/**
81
	 * Gets the dedup config sequence param.
82
	 *
83
	 * @return the dedup config sequence param
84
	 */
85
	public String getDedupConfigSequenceParam() {
86
		return dedupConfigSequenceParam;
87
	}
88

  
89
	/**
90
	 * Sets the dedup config sequence param.
91
	 *
92
	 * @param dedupConfigSequenceParam
93
	 *            the new dedup config sequence param
94
	 */
95
	public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) {
96
		this.dedupConfigSequenceParam = dedupConfigSequenceParam;
97
	}
98

  
99
	/**
100
	 * Gets the job property.
101
	 *
102
	 * @return the job property
103
	 */
104
	public String getJobProperty() {
105
		return jobProperty;
106
	}
107

  
108
	/**
109
	 * Sets the job property.
110
	 *
111
	 * @param jobProperty
112
	 *            the new job property
113
	 */
114
	public void setJobProperty(final String jobProperty) {
115
		this.jobProperty = jobProperty;
116
	}
117

  
118
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/CleanActionSetsProfileJobNode.java
1
package eu.dnetlib.msro.workflows.actions;
2

  
3
import javax.annotation.Resource;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Required;
8

  
9
import com.googlecode.sarasvati.Arc;
10
import com.googlecode.sarasvati.NodeToken;
11

  
12
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
13
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
14
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
15

  
16
public class CleanActionSetsProfileJobNode extends SimpleJobNode {
17

  
18
	/**
19
	 * logger.
20
	 */
21
	private static final Log log = LogFactory.getLog(CleanActionSetsProfileJobNode.class);
22

  
23
	@Resource
24
	private UniqueServiceLocator serviceLocator;
25

  
26
	private String xupdate;
27

  
28
	@Override
29
	protected String execute(final NodeToken token) throws Exception {
30
		log.info("updating Action Sets profiles: " + getXupdate());
31
		serviceLocator.getService(ISRegistryService.class).executeXUpdate(getXupdate());
32
		return Arc.DEFAULT_ARC;
33
	}
34

  
35
	public String getXupdate() {
36
		return xupdate;
37
	}
38

  
39
	@Required
40
	public void setXupdate(final String xupdate) {
41
		this.xupdate = xupdate;
42
	}
43

  
44
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/UpdateSetsJobNode.java
1
package eu.dnetlib.msro.workflows.actions;
2

  
3
import java.io.StringReader;
4
import java.util.List;
5
import java.util.Map;
6

  
7
import javax.annotation.Resource;
8

  
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.dom4j.Attribute;
12
import org.dom4j.Document;
13
import org.dom4j.Element;
14
import org.dom4j.io.SAXReader;
15

  
16
import com.google.gson.Gson;
17
import com.googlecode.sarasvati.Arc;
18
import com.googlecode.sarasvati.NodeToken;
19

  
20
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
21
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
22
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
23
import eu.dnetlib.miscutils.datetime.DateUtils;
24
import eu.dnetlib.msro.rmi.MSROException;
25
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
26

  
27
public class UpdateSetsJobNode extends SimpleJobNode {
28

  
29
	/**
30
	 * logger.
31
	 */
32
	private static final Log log = LogFactory.getLog(UpdateSetsJobNode.class);
33

  
34
	@Resource
35
	private UniqueServiceLocator serviceLocator;
36

  
37
	@Override
38
	protected String execute(final NodeToken token) throws Exception {
39

  
40
		@SuppressWarnings("unchecked")
41
		final List<Map<String, String>> sets = new Gson().fromJson(token.getEnv().getAttribute("sets"), List.class);
42

  
43
		final String lastUpdate = DateUtils.now_ISO8601();
44
		for (Map<String, String> set : sets) {
45

  
46
			// update only the enabled sets.
47
			if (isEnabled(set)) {
48
				log.info("updating set: " + set.toString());
49
				addLatestRawSet(set, lastUpdate);
50
			} else {
51
				log.info("skip set update: " + set.toString());
52
			}
53
		}
54

  
55
		return Arc.DEFAULT_ARC;
56
	}
57

  
58
	private boolean isEnabled(final Map<String, String> set) {
59
		return set.containsKey("enabled") && set.get("enabled").equals("true");
60
	}
61

  
62
	public void addLatestRawSet(final Map<String, String> set, final String lastUpdate) throws MSROException {
63
		final String q = "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') where $x//SET/@id = '" + set.get("set")
64
				+ "' return $x";
65
		try {
66
			final String profile = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(q);
67
			final Document doc = new SAXReader().read(new StringReader(profile));
68
			final String profId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
69
			final Element latest = (Element) doc.selectSingleNode("//RAW_SETS/LATEST");
70
			final Element expired = ((Element) doc.selectSingleNode("//RAW_SETS")).addElement("EXPIRED");
71

  
72
			for (Object o : latest.attributes()) {
73
				final Attribute a = (Attribute) o;
74
				expired.addAttribute(a.getName(), a.getValue());
75
			}
76

  
77
			latest.addAttribute("id", set.get("rawset"));
78
			latest.addAttribute("creationDate", set.get("creationDate"));
79
			latest.addAttribute("lastUpdate", lastUpdate);
80

  
81
			serviceLocator.getService(ISRegistryService.class).updateProfile(profId, doc.asXML(), "ActionManagerSetDSResourceType");
82
		} catch (Exception e) {
83
			String msg = "Error updating profile of set: " + set;
84
			log.error(msg);
85
			throw new MSROException(msg, e);
86
		}
87
	}
88

  
89
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/PromoteActionsJobNode.java
1
package eu.dnetlib.msro.workflows.actions;
2

  
3
import org.apache.commons.lang.StringUtils;
4

  
5
import com.googlecode.sarasvati.NodeToken;
6

  
7
import eu.dnetlib.actionmanager.rmi.ActionManagerService;
8
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
9
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
10

  
11
public class PromoteActionsJobNode extends BlackboardJobNode {
12

  
13
	public static final String ALL_SETS = "ALL SETS";
14

  
15
	private String set;
16

  
17
	@Override
18
	protected String obtainServiceId(final NodeToken token) {
19
		return getServiceLocator().getServiceId(ActionManagerService.class);
20
	}
21

  
22
	@Override
23
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
24
		job.setAction("PROMOTE");
25
		if (!StringUtils.isBlank(getSet()) && !getSet().equals(ALL_SETS)) {
26
			job.getParameters().put("set", getSet());
27
			token.getEnv().setAttribute("set", getSet());
28
		}
29
	}
30

  
31
	public String getSet() {
32
		return set;
33
	}
34

  
35
	public void setSet(final String set) {
36
		this.set = set;
37
	}
38

  
39
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/GarbageSetsJobNode.java
1
package eu.dnetlib.msro.workflows.actions;
2

  
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5

  
6
import com.googlecode.sarasvati.NodeToken;
7

  
8
import eu.dnetlib.actionmanager.rmi.ActionManagerService;
9
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
10
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
11

  
12
public class GarbageSetsJobNode extends BlackboardJobNode {
13

  
14
	/**
15
	 * logger.
16
	 */
17
	private static final Log log = LogFactory.getLog(GarbageSetsJobNode.class);
18

  
19
	@Override
20
	protected String obtainServiceId(final NodeToken token) {
21
		return getServiceLocator().getServiceId(ActionManagerService.class);
22
	}
23

  
24
	@Override
25
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
26
		log.info("preparing garbage collection for Action sets Job");
27
		job.setAction("GARBAGE");
28
	}
29

  
30
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/actions/PrepareActionSetsJobNode.java
1
package eu.dnetlib.msro.workflows.actions;
2

  
3
import java.util.List;
4
import java.util.Map;
5

  
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

  
9
import com.google.common.base.Predicate;
10
import com.google.common.collect.Maps;
11
import com.google.gson.Gson;
12
import com.googlecode.sarasvati.Arc;
13
import com.googlecode.sarasvati.NodeToken;
14

  
15
import eu.dnetlib.actionmanager.set.RawSet;
16
import eu.dnetlib.miscutils.datetime.DateUtils;
17
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
18

  
19
public class PrepareActionSetsJobNode extends SimpleJobNode {
20

  
21
	/**
22
	 * logger.
23
	 */
24
	private static final Log log = LogFactory.getLog(PrepareActionSetsJobNode.class);
25

  
26
	private String sets;
27

  
28
	@Override
29
	protected String execute(final NodeToken token) throws Exception {
30

  
31
		final List<Map<String, String>> setList = getSetList();
32
		final String now = DateUtils.now_ISO8601();
33

  
34
		for (Map<String, String> set : setList) {
35
			set.put("rawset", RawSet.newInstance().getId());
36
			set.put("creationDate", now);
37

  
38
			if (set.get("enabled").equals("true")) {
39
				log.info("preparing set: " + simplifySetInfo(set));
40
			}
41
			// setting the job properties needed to name the rawsets
42
			token.getEnv().setAttribute(set.get("jobProperty"), set.get("rawset"));
43
		}
44

  
45
		token.getEnv().setAttribute("sets", new Gson().toJson(setList));
46

  
47
		return Arc.DEFAULT_ARC;
48
	}
49

  
50
	private Map<String, String> simplifySetInfo(final Map<String, String> set) {
51
		return Maps.filterKeys(set, new Predicate<String>() {
52

  
53
			@Override
54
			public boolean apply(final String k) {
55

  
56
				return k.equals("set") || k.equals("rawset");
57
			}
58
		});
59
	}
60

  
61
	@SuppressWarnings("unchecked")
62
	protected List<Map<String, String>> getSetList() {
63
		return new Gson().fromJson(getSets(), List.class);
64
	}
65

  
66
	public String getSets() {
67
		return sets;
68
	}
69

  
70
	public void setSets(final String sets) {
71
		this.sets = sets;
72
	}
73

  
74
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupConfigurationSetterJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import org.apache.commons.lang.StringUtils;
4
import org.springframework.beans.factory.annotation.Autowired;
5

  
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.NodeToken;
8

  
9
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration;
10
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader;
11
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
12

  
13
public class DedupConfigurationSetterJobNode extends AsyncJobNode {
14

  
15
	private String dedupConfigSequence;
16

  
17
	private String dedupConfigSequenceParam;
18

  
19
	@Autowired
20
	private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader;
21

  
22
	@Override
23
	protected String execute(final NodeToken token) throws Exception {
24

  
25
		if (StringUtils.isBlank(getDedupConfigSequence())) throw new IllegalArgumentException("missing configuration sequence");
26

  
27
		final DedupConfigurationOrchestration dedupOrchestration = dedupOrchestrationLoader.loadByActionSetId(getDedupConfigSequence());
28

  
29
		token.getEnv().setAttribute("entityType", dedupOrchestration.getEntity().getName());
30
		token.getEnv().setAttribute("entityTypeId", dedupOrchestration.getEntity().getCode());
31

  
32
		token.getEnv().setAttribute(getDedupConfigSequenceParam(), dedupOrchestration.toString());
33

  
34
		return Arc.DEFAULT_ARC;
35
	}
36

  
37
	public String getDedupConfigSequence() {
38
		return dedupConfigSequence;
39
	}
40

  
41
	public void setDedupConfigSequence(final String dedupConfigSequence) {
42
		this.dedupConfigSequence = dedupConfigSequence;
43
	}
44

  
45
	public String getDedupConfigSequenceParam() {
46
		return dedupConfigSequenceParam;
47
	}
48

  
49
	public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) {
50
		this.dedupConfigSequenceParam = dedupConfigSequenceParam;
51
	}
52

  
53
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupDuplicateScanJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import org.apache.commons.collections.CollectionUtils;
4
import org.apache.commons.logging.Log;
5
import org.apache.commons.logging.LogFactory;
6

  
7
import com.googlecode.sarasvati.Arc;
8
import com.googlecode.sarasvati.Engine;
9
import com.googlecode.sarasvati.NodeToken;
10

  
11
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
12
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration;
13
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
14

  
15
public class DedupDuplicateScanJobNode extends DedupConfigurationAwareJobNode {
16

  
17
	private static final Log log = LogFactory.getLog(DedupDuplicateScanJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
18

  
19
	private class DedupBlackboardWorkflowJobListener extends BlackboardWorkflowJobListener {
20

  
21
		public DedupBlackboardWorkflowJobListener(final Engine engine, final NodeToken token) {
22
			super(engine, token);
23
		}
24

  
25
		@Override
26
		protected void onDone(final BlackboardJob job) {
27

  
28
			final DedupConfigurationOrchestration confs = dedupConfigurations(getToken());
29

  
30
			confs.getConfigurations().poll();
31

  
32
			log.info("checking dedup configs queue, size: " + confs.getConfigurations().size());
33

  
34
			if (CollectionUtils.isEmpty(confs.getConfigurations())) {
35
				log.info("dedup similarity scan done");
36
				super.complete(job, "done");
37
			} else {
38
				log.debug("remaining confs: " + confs);
39

  
40
				getToken().getEnv().setAttribute(getDedupConfigSequenceParam(), confs.toString());
41

  
42
				super.complete(job, Arc.DEFAULT_ARC);
43
			}
44
		}
45
	}
46

  
47
	@Override
48
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
49
		return new DedupBlackboardWorkflowJobListener(engine, token);
50
	}
51

  
52
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/BuildSimilarityMeshJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.List;
6
import java.util.Queue;
7

  
8
import javax.annotation.Resource;
9
import javax.xml.ws.wsaddressing.W3CEndpointReference;
10

  
11
import org.antlr.stringtemplate.StringTemplate;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.dom4j.Document;
15
import org.dom4j.DocumentException;
16
import org.dom4j.Node;
17
import org.dom4j.io.SAXReader;
18
import org.springframework.beans.factory.annotation.Required;
19

  
20
import com.google.common.collect.Lists;
21
import com.google.common.collect.Queues;
22
import com.googlecode.sarasvati.Arc;
23
import com.googlecode.sarasvati.NodeToken;
24

  
25
import eu.dnetlib.data.proto.TypeProtos.Type;
26
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
27
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
28
import eu.dnetlib.msro.workflows.hadoop.utils.Similarity;
29
import eu.dnetlib.msro.workflows.hadoop.utils.SimilarityMeshBuilder;
30
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
31

  
32
public class BuildSimilarityMeshJobNode extends AsyncJobNode {
33

  
34
	private static final Log log = LogFactory.getLog(BuildSimilarityMeshJobNode.class);
35

  
36
	/** The result set factory. */
37
	@Resource(name = "iterableResultSetFactory")
38
	private IterableResultSetFactory resultSetFactory;
39

  
40
	/** The result set client factory. */
41
	@Resource(name = "resultSetClientFactory")
42
	private ResultSetClientFactory resultSetClientFactory;
43

  
44
	private StringTemplate similarity;
45

  
46
	private String inputEprParam;
47

  
48
	private String outputEprParam;
49

  
50
	@Override
51
	protected String execute(final NodeToken token) throws Exception {
52

  
53
		final String inputEpr = token.getEnv().getAttribute(getInputEprParam());
54

  
55
		final Iterator<String> rsClient = resultSetClientFactory.getClient(inputEpr).iterator();
56
		final Queue<Object> queue = Queues.newLinkedBlockingQueue();
57
		final SAXReader reader = new SAXReader();
58

  
59
		if (rsClient.hasNext()) {
60
			populateQueue(queue, reader, rsClient.next());
61
		}
62

  
63
		final W3CEndpointReference eprOut = resultSetFactory.createIterableResultSet(new Iterable<String>() {
64

  
65
			@Override
66
			public Iterator<String> iterator() {
67
				return new Iterator<String>() {
68

  
69
					@Override
70
					public boolean hasNext() {
71
						synchronized (queue) {
72
							return !queue.isEmpty();
73
						}
74
					}
75

  
76
					@Override
77
					public String next() {
78
						synchronized (queue) {
79
							final Object o = queue.poll();
80
							while (queue.isEmpty() && rsClient.hasNext()) {
81
								populateQueue(queue, reader, rsClient.next());
82
							}
83
							return buildSimilarity((Similarity) o);
84
						}
85
					}
86

  
87
					@Override
88
					public void remove() {
89
						throw new UnsupportedOperationException();
90
					}
91
				};
92
			}
93
		});
94

  
95
		token.getEnv().setAttribute(getOutputEprParam(), eprOut.toString());
96

  
97
		return Arc.DEFAULT_ARC;
98
	}
99

  
100
	private void populateQueue(final Queue<Object> q, final SAXReader r, final String xml) {
101
		try {
102
			final Document d = r.read(new StringReader(xml));
103
			final String groupid = d.valueOf("//FIELD[@name='id']");
104
			final List<?> items = d.selectNodes("//FIELD[@name='group']/ITEM");
105
			final String entitytype = d.valueOf("//FIELD[@name='entitytype']");
106
			final List<String> group = Lists.newArrayList();
107
			for (final Object id : items) {
108
				group.add(((Node) id).getText());
109
			}
110
			// compute the full mesh
111
			final Type type = Type.valueOf(entitytype);
112

  
113
			final List<Similarity> mesh = SimilarityMeshBuilder.build(type, group);
114
			// total += mesh.size();
115
			if (log.isDebugEnabled()) {
116
				log.debug(String.format("built mesh for group '%s', size %d", groupid, mesh.size()));
117
			}
118
			for (final Similarity s : mesh) {
119
				if (log.isDebugEnabled()) {
120
					log.debug(String.format("adding to queue: %s", s.toString()));
121
				}
122
				q.add(s);
123
			}
124
		} catch (final DocumentException e) {
125
			log.error("invalid document: " + xml);
126
		}
127
	}
128

  
129
	private String buildSimilarity(final Similarity s) {
130
		final StringTemplate template = new StringTemplate(getSimilarity().getTemplate());
131

  
132
		template.setAttribute("source", s.getPair().getKey());
133
		template.setAttribute("target", s.getPair().getValue());
134
		template.setAttribute("type", s.getType().toString());
135

  
136
		final String res = template.toString();
137
		return res;
138
	}
139

  
140
	public String getInputEprParam() {
141
		return inputEprParam;
142
	}
143

  
144
	public void setInputEprParam(final String inputEprParam) {
145
		this.inputEprParam = inputEprParam;
146
	}
147

  
148
	public String getOutputEprParam() {
149
		return outputEprParam;
150
	}
151

  
152
	public void setOutputEprParam(final String outputEprParam) {
153
		this.outputEprParam = outputEprParam;
154
	}
155

  
156
	public StringTemplate getSimilarity() {
157
		return similarity;
158
	}
159

  
160
	@Required
161
	public void setSimilarity(final StringTemplate similarity) {
162
		this.similarity = similarity;
163
	}
164

  
165
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/conf/DedupConfigurationOrchestrationLoader.java
1
package eu.dnetlib.msro.workflows.dedup.conf;
2

  
3
import java.io.StringReader;
4
import java.util.List;
5
import java.util.Queue;
6

  
7
import javax.annotation.Resource;
8

  
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.dom4j.Document;
12
import org.dom4j.DocumentException;
13
import org.dom4j.Element;
14
import org.dom4j.io.SAXReader;
15

  
16
import com.google.common.collect.Lists;
17

  
18
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
19
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
20
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
21
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
22
import eu.dnetlib.pace.config.DedupConfig;
23

  
24
/**
25
 * The Class DedupConfigurationOrchestrationLoader.
26
 */
27
public class DedupConfigurationOrchestrationLoader {
28

  
29
	/** The Constant log. */
30
	private static final Log log = LogFactory.getLog(DedupConfigurationOrchestrationLoader.class);
31

  
32
	/** The service locator. */
33
	@Resource
34
	private UniqueServiceLocator serviceLocator;
35

  
36
	/**
37
	 * Load the dedup orchestration profile from the IS.
38
	 *
39
	 * @param id
40
	 *            the id
41
	 * @return the dedup configuration orchestration
42
	 * @throws ISLookUpDocumentNotFoundException
43
	 *             the IS look up document not found exception
44
	 * @throws ISLookUpException
45
	 *             the IS look up exception
46
	 * @throws DocumentException
47
	 *             the document exception
48
	 */
49
	public DedupConfigurationOrchestration loadByActionSetId(final String id) throws Exception {
50

  
51
		final ISLookUpService isLookUpService = serviceLocator.getService(ISLookUpService.class);
52

  
53
		final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", id);
54
		log.info("loading dedup orchestration: " + xquery);
55

  
56
		return parseOrchestrationProfile(isLookUpService, isLookUpService.getResourceProfileByQuery(xquery));
57
	}
58

  
59
	public List<DedupConfigurationOrchestration> loadByEntityName(final String entityName) throws Exception {
60

  
61
		final ISLookUpService isLookUpService = serviceLocator.getService(ISLookUpService.class);
62

  
63
		final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ENTITY/@name = '%s']", entityName);
64
		log.info("loading dedup orchestration: " + xquery);
65

  
66
		final List<DedupConfigurationOrchestration> res = Lists.newArrayList();
67

  
68
		for (final String profile : isLookUpService.quickSearchProfile(xquery)) {
69
			res.add(parseOrchestrationProfile(isLookUpService, profile));
70
		}
71

  
72
		return res;
73
	}
74

  
75
	private DedupConfigurationOrchestration parseOrchestrationProfile(final ISLookUpService isLookUpService, final String dedupOrchestation)
76
			throws DocumentException,
77
			ISLookUpException, ISLookUpDocumentNotFoundException {
78
		final Document doc = new SAXReader().read(new StringReader(dedupOrchestation));
79

  
80
		final Element e = (Element) doc.selectSingleNode("//DEDUPLICATION/ENTITY");
81
		final Entity entity = new Entity(e.attributeValue("name"), e.attributeValue("code"), e.attributeValue("label"));
82

  
83
		final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id");
84
		final Queue<DedupConfig> configurations = Lists.newLinkedList();
85

  
86
		for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) {
87
			configurations.add(loadConfig(isLookUpService, actionSetId, o));
88
		}
89

  
90
		final DedupConfigurationOrchestration dco = new DedupConfigurationOrchestration(entity, actionSetId, configurations);
91

  
92
		log.debug("loaded dedup configuration orchestration: " + dco.toString());
93
		log.info("loaded dedup configuration orchestration, size: " + dco.getConfigurations().size());
94
		return dco;
95
	}
96

  
97
	private DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o) throws ISLookUpException,
98
			ISLookUpDocumentNotFoundException {
99
		final Element s = (Element) o;
100
		final String configProfileId = s.attributeValue("id");
101
		final String conf =
102
				isLookUpService.getResourceProfileByQuery(String.format(
103
						"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
104
						configProfileId));
105

  
106
		final DedupConfig dedupConfig = DedupConfig.load(conf);
107
		dedupConfig.getWf().setConfigurationId(actionSetId);
108
		return dedupConfig;
109
	}
110
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/conf/Entity.java
1
package eu.dnetlib.msro.workflows.dedup.conf;
2

  
3
import com.google.gson.GsonBuilder;
4

  
5
/**
6
 * The Class Entity.
7
 */
8
public class Entity {
9

  
10
	/** The name. */
11
	private String name;
12

  
13
	/** The code. */
14
	private String code;
15

  
16
	/** The label. */
17
	private String label;
18

  
19
	public Entity() {}
20

  
21
	/**
22
	 * Instantiates a new entity.
23
	 *
24
	 * @param name
25
	 *            the name
26
	 * @param code
27
	 *            the code
28
	 * @param label
29
	 *            the label
30
	 */
31
	public Entity(final String name, final String code, final String label) {
32
		super();
33
		this.setName(name);
34
		this.setCode(code);
35
		this.setLabel(label);
36
	}
37

  
38
	/**
39
	 * Gets the name.
40
	 *
41
	 * @return the name
42
	 */
43
	public String getName() {
44
		return name;
45
	}
46

  
47
	/**
48
	 * Gets the code.
49
	 *
50
	 * @return the code
51
	 */
52
	public String getCode() {
53
		return code;
54
	}
55

  
56
	/**
57
	 * Gets the label.
58
	 *
59
	 * @return the label
60
	 */
61
	public String getLabel() {
62
		return label;
63
	}
64

  
65
	public void setName(final String name) {
66
		this.name = name;
67
	}
68

  
69
	public void setCode(final String code) {
70
		this.code = code;
71
	}
72

  
73
	public void setLabel(final String label) {
74
		this.label = label;
75
	}
76

  
77
	/*
78
	 * (non-Javadoc)
79
	 *
80
	 * @see java.lang.Object#toString()
81
	 */
82
	@Override
83
	public String toString() {
84
		return new GsonBuilder().setPrettyPrinting().create().toJson(this);
85
	}
86

  
87
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/conf/DedupConfigurationOrchestration.java
1
package eu.dnetlib.msro.workflows.dedup.conf;
2

  
3
import java.util.Queue;
4

  
5
import com.google.gson.Gson;
6
import com.google.gson.GsonBuilder;
7

  
8
import eu.dnetlib.pace.config.DedupConfig;
9

  
10
/**
11
 * The Class DedupConfigurationOrchestration.
12
 */
13
public class DedupConfigurationOrchestration {
14

  
15
	/** The entity. */
16
	private Entity entity;
17

  
18
	/** The action set id. */
19
	private String actionSetId;
20

  
21
	/** The configurations. */
22
	private Queue<DedupConfig> configurations;
23

  
24
	public DedupConfigurationOrchestration() {}
25

  
26
	/**
27
	 * Instantiates a new dedup configuration orchestration.
28
	 *
29
	 * @param entity
30
	 *            the entity
31
	 * @param actionSetId
32
	 *            the action set id
33
	 * @param configurations
34
	 *            the configurations
35
	 */
36
	public DedupConfigurationOrchestration(final Entity entity, final String actionSetId, final Queue<DedupConfig> configurations) {
37
		super();
38
		this.setEntity(entity);
39
		this.setActionSetId(actionSetId);
40
		this.setConfigurations(configurations);
41
	}
42

  
43
	/**
44
	 * Gets the entity.
45
	 *
46
	 * @return the entity
47
	 */
48
	public Entity getEntity() {
49
		return entity;
50
	}
51

  
52
	/**
53
	 * Gets the action set id.
54
	 *
55
	 * @return the action set id
56
	 */
57
	public String getActionSetId() {
58
		return actionSetId;
59
	}
60

  
61
	/**
62
	 * Gets the configurations.
63
	 *
64
	 * @return the configurations
65
	 */
66
	public Queue<DedupConfig> getConfigurations() {
67
		return configurations;
68
	}
69

  
70
	public void setEntity(final Entity entity) {
71
		this.entity = entity;
72
	}
73

  
74
	public void setActionSetId(final String actionSetId) {
75
		this.actionSetId = actionSetId;
76
	}
77

  
78
	public void setConfigurations(final Queue<DedupConfig> configurations) {
79
		this.configurations = configurations;
80
	}
81

  
82
	/**
83
	 * From json.
84
	 *
85
	 * @param json
86
	 *            the json
87
	 * @return the dedup configuration orchestration
88
	 */
89
	public static DedupConfigurationOrchestration fromJSON(final String json) {
90
		return new Gson().fromJson(json, DedupConfigurationOrchestration.class);
91
	}
92

  
93
	/*
94
	 * (non-Javadoc)
95
	 * 
96
	 * @see java.lang.Object#toString()
97
	 */
98
	@Override
99
	public String toString() {
100
		return new GsonBuilder().setPrettyPrinting().create().toJson(this);
101
	}
102

  
103
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupCheckEntitySequenceJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import java.util.Queue;
4

  
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.springframework.beans.factory.annotation.Autowired;
9

  
10
import com.google.common.base.Function;
11
import com.google.common.base.Splitter;
12
import com.google.common.collect.Iterables;
13
import com.google.common.collect.Lists;
14
import com.googlecode.sarasvati.Arc;
15
import com.googlecode.sarasvati.NodeToken;
16

  
17
import eu.dnetlib.msro.rmi.MSROException;
18
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration;
19
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader;
20
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
21

  
22
public class DedupCheckEntitySequenceJobNode extends AsyncJobNode {
23

  
24
	private static final Log log = LogFactory.getLog(DedupCheckEntitySequenceJobNode.class);
25

  
26
	@Autowired
27
	private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader;
28

  
29
	private String dedupConfigSequenceParam;
30

  
31
	private String entitySequence;
32

  
33
	@Override
34
	protected String execute(final NodeToken token) throws Exception {
35

  
36
		if (StringUtils.isBlank(getEntitySequence())) throw new MSROException("missing entity sequence, e.g. a csv: organization,person,result");
37

  
38
		if (token.getFullEnv().hasAttribute(DedupGrouperJobNode.DEDUP_GROUPER_LOOPER)) {
39
			log.info("reset env variable: " + DedupGrouperJobNode.DEDUP_GROUPER_LOOPER + " to zero");
40
			token.getFullEnv().setAttribute(DedupGrouperJobNode.DEDUP_GROUPER_LOOPER, 0);
41
		}
42

  
43
		if (!token.getEnv().hasAttribute("entitySequence")) {
44

  
45
			log.info("parsing config sequence: " + getEntitySequence());
46

  
47
			token.getEnv().setAttribute("entitySequence", getEntitySequence());
48

  
49
			final Iterable<String> sequence = Splitter.on(",").omitEmptyStrings().split(getEntitySequence());
50
			final Queue<DedupConfigurationOrchestration> q =
51
					Lists.newLinkedList(Iterables.transform(sequence, new Function<String, DedupConfigurationOrchestration>() {
52

  
53
						@Override
54
						public DedupConfigurationOrchestration apply(final String entityName) {
55
							try {
56
								final DedupConfigurationOrchestration dco = Iterables.getFirst(dedupOrchestrationLoader.loadByEntityName(entityName), null);
57
								if (dco == null) throw new RuntimeException("unable to find DedupOrchestration profile for entity type: " + entityName);
58
								return dco;
59
							} catch (final Throwable e) {
60
								throw new RuntimeException("", e);
61
							}
62
						}
63
					}));
64

  
65
			log.info("built sequence of dedup orchestration profiles, size: " + q.size());
66
			final DedupConfigurationOrchestration dco = q.remove();
67
			log.info("closing mesh for entity: " + dco.getEntity().getName());
68
			setDedupConfParams(token, dco);
69
			token.getEnv().setTransientAttribute("entitySequenceQueue", q);
70

  
71
			return Arc.DEFAULT_ARC;
72
		}
73

  
74
		@SuppressWarnings("unchecked")
75
		final Queue<DedupConfigurationOrchestration> q = (Queue<DedupConfigurationOrchestration>) token.getEnv().getTransientAttribute("entitySequenceQueue");
76

  
77
		if (!q.isEmpty()) {
78
			log.info("remaining dedup orchestration profiles: " + q.size());
79
			final DedupConfigurationOrchestration dco = q.remove();
80
			log.info("closing mesh for entity: " + dco.getEntity().getName());
81

  
82
			setDedupConfParams(token, dco);
83
			return Arc.DEFAULT_ARC;
84
		}
85

  
86
		log.info("completed closing mesh for entities: " + getEntitySequence());
87
		return "done";
88

  
89
	}
90

  
91
	private void setDedupConfParams(final NodeToken token, final DedupConfigurationOrchestration dco) {
92
		token.getEnv().setAttribute("entityType", dco.getEntity().getName());
93
		token.getEnv().setAttribute("entityTypeId", dco.getEntity().getCode());
94
		token.getEnv().setAttribute(getDedupConfigSequenceParam(), dco.toString());
95
	}
96

  
97
	public String getEntitySequence() {
98
		return entitySequence;
99
	}
100

  
101
	public void setEntitySequence(final String entitySequence) {
102
		this.entitySequence = entitySequence;
103
	}
104

  
105
	public String getDedupConfigSequenceParam() {
106
		return dedupConfigSequenceParam;
107
	}
108

  
109
	public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) {
110
		this.dedupConfigSequenceParam = dedupConfigSequenceParam;
111
	}
112

  
113
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupGrouperJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5

  
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.Engine;
8
import com.googlecode.sarasvati.NodeToken;
9

  
10
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
11
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
12

  
13
public class DedupGrouperJobNode extends DedupConfigurationAwareJobNode {
14

  
15
	// TODO factor out this constant, it should be a configuration parameter
16
	public static final int DEDUP_GROUPER_MAX_LOOPS = 10;
17

  
18
	public static final String DEDUP_GROUPER_LOOPER = "dedup.grouper.looper";
19
	public static final String DEDUP_GROUPER_CURR_WRITTEN_RELS = "dedup.grouper.written.rels";
20
	public static final String DEDUP_GROUPER_PREV_WRITTEN_RELS = "dedup.grouper.prev.written.rels";
21

  
22
	private static final Log log = LogFactory.getLog(DedupGrouperJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
23

  
24
	private class DedupBlackboardWorkflowJobListener extends BlackboardWorkflowJobListener {
25

  
26
		public DedupBlackboardWorkflowJobListener(final Engine engine, final NodeToken token) {
27
			super(engine, token);
28
		}
29

  
30
		@Override
31
		protected void onDone(final BlackboardJob job) {
32

  
33
			final int times = currentIteration(getToken());
34
			final String curr = job.getParameters().get(DEDUP_GROUPER_CURR_WRITTEN_RELS);
35

  
36
			if (times == 0) {
37
				getToken().getFullEnv().setAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS, -1);
38
			}
39

  
40
			if ((times >= DEDUP_GROUPER_MAX_LOOPS) || isStable(getToken(), curr)) {
41
				super.complete(job, "done");
42
			} else {
43
				log.info("incrementing dedup.grouper.looper to " + (times + 1));
44
				getToken().getFullEnv().setAttribute(DEDUP_GROUPER_LOOPER, times + 1);
45
				getToken().getFullEnv().setAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS, curr);
46
				super.complete(job, Arc.DEFAULT_ARC);
47
			}
48
		}
49
	}
50

  
51
	private int currentIteration(final NodeToken token) {
52
		try {
53
			final String sTimes = token.getFullEnv().getAttribute(DEDUP_GROUPER_LOOPER);
54
			log.info("read dedup.grouper.looper from fullEnv: '" + sTimes + "'");
55
			return Integer.parseInt(sTimes);
56
		} catch (final NumberFormatException e) {
57
			log.info("got empty dedup.grouper.looper, initializing to 0");
58
			return 0;
59
		}
60
	}
61

  
62
	private boolean isStable(final NodeToken token, final String sCurr) {
63
		final String sPrev = token.getFullEnv().getAttribute(DEDUP_GROUPER_PREV_WRITTEN_RELS);
64

  
65
		log.info("Comparing written rels, prev=" + sPrev + ", curr=" + sCurr);
66
		try {
67
			final boolean b = Integer.parseInt(sCurr) == Integer.parseInt(sPrev);
68
			if (b) {
69
				log.info("  --- The number of written rels is STABLE");
70
			}
71
			return b;
72
		} catch (final Exception e) {
73
			log.error("Invalid parsing of written rels counters - curr: " + sCurr + ", prev: " + sPrev);
74
			return false;
75
		}
76
	}
77

  
78
	@Override
79
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
80
		return new DedupBlackboardWorkflowJobListener(engine, token);
81
	}
82

  
83
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/DedupCheckConfigurationJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import java.util.List;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Autowired;
8

  
9
import com.googlecode.sarasvati.Arc;
10
import com.googlecode.sarasvati.NodeToken;
11

  
12
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
13
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
14
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
15
import eu.dnetlib.msro.rmi.MSROException;
16
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestration;
17
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
18

  
19
public class DedupCheckConfigurationJobNode extends AsyncJobNode {
20

  
21
	private static final Log log = LogFactory.getLog(DedupCheckConfigurationJobNode.class);
22

  
23
	@Autowired
24
	private UniqueServiceLocator serviceLocator;
25

  
26
	private String dedupConfigSequenceParam;
27

  
28
	@Override
29
	protected String execute(final NodeToken token) throws Exception {
30

  
31
		final String dcoJson = token.getEnv().getAttribute(getDedupConfigSequenceParam());
32

  
33
		final DedupConfigurationOrchestration dco = DedupConfigurationOrchestration.fromJSON(dcoJson);
34

  
35
		if (!existActionSetProfile(dco)) throw new MSROException("missing action set profile: " + dco.getActionSetId());
36

  
37
		log.info("found action set profile: " + dco.getActionSetId());
38

  
39
		return Arc.DEFAULT_ARC;
40
	}
41

  
42
	private boolean existActionSetProfile(final DedupConfigurationOrchestration dco) throws ISLookUpException {
43
		final String xquery = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'DedupOrchestrationDSResourceType' and .//ACTION_SET/@id='%s'] return 1";
44
		log.info("looking for action set profile, id: " + dco.getActionSetId());
45
		final List<String> actionSets = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(xquery, dco.getActionSetId()));
46
		return !actionSets.isEmpty();
47
	}
48

  
49
	public String getDedupConfigSequenceParam() {
50
		return dedupConfigSequenceParam;
51
	}
52

  
53
	public void setDedupConfigSequenceParam(final String dedupConfigSequenceParam) {
54
		this.dedupConfigSequenceParam = dedupConfigSequenceParam;
55
	}
56

  
57
}
modules/dnet-deduplication/tags/dnet-deduplication-1.1.8/src/main/java/eu/dnetlib/msro/workflows/dedup/PrepareDedupIndexJobNode.java
1
package eu.dnetlib.msro.workflows.dedup;
2

  
3
import java.util.List;
4
import java.util.Map;
5

  
6
import javax.annotation.Resource;
7

  
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.springframework.beans.factory.annotation.Autowired;
12
import org.springframework.beans.factory.annotation.Required;
13

  
14
import com.google.common.collect.Iterables;
15
import com.google.gson.Gson;
16
import com.googlecode.sarasvati.Arc;
17
import com.googlecode.sarasvati.NodeToken;
18

  
19
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
20
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
21
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
22
import eu.dnetlib.miscutils.datetime.DateUtils;
23
import eu.dnetlib.msro.workflows.dedup.conf.DedupConfigurationOrchestrationLoader;
24
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
25

  
26
public class PrepareDedupIndexJobNode extends SimpleJobNode {
27

  
28
	/**
29
	 * logger.
30
	 */
31
	private static final Log log = LogFactory.getLog(PrepareDedupIndexJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
32

  
33
	@Resource
34
	private UniqueServiceLocator serviceLocator;
35

  
36
	@Autowired
37
	private DedupConfigurationOrchestrationLoader dedupOrchestrationLoader;
38

  
39
	private String rottenRecordsPathParam;
40

  
41
	private String hbaseTable;
42

  
43
	private String dedupConfig;
44

  
45
	@Override
46
	protected String execute(final NodeToken token) throws Exception {
47

  
48
		log.info("start preparing job");
49

  
50
		final String fields = getFields(env("format", token), env("layout", token));
51
		token.getEnv().setAttribute("index.fields", fields);
52

  
53
		if (!StringUtils.isBlank(getRottenRecordsPathParam())) {
54
			token.getEnv().setAttribute(getRottenRecordsPathParam(), "/tmp" + getFileName(token, "rottenrecords"));
55
		}
56

  
57
		token.getEnv().setAttribute("index.solr.url", getIndexSolrUrlZk());
58
		token.getEnv().setAttribute("index.solr.collection", getCollectionName(token));
59

  
60
		token.getEnv().setAttribute("index.shutdown.wait.time", getIndexSolrShutdownWait());
61
		token.getEnv().setAttribute("index.buffer.flush.threshold", getIndexBufferFlushTreshold());
62
		token.getEnv().setAttribute("index.solr.sim.mode", isFeedingSimulationMode());
63

  
64
		token.getEnv().setAttribute("index.feed.timestamp", DateUtils.now_ISO8601());
65

  
66
		@SuppressWarnings("unchecked")
67
		final List<Map<String, String>> sets = new Gson().fromJson(token.getEnv().getAttribute("sets"), List.class);
68

  
69
		token.getEnv().setAttribute("actionset", Iterables.getOnlyElement(sets).get("set"));
70
		token.getEnv().setAttribute("entityType", token.getEnv().getAttribute("entityType"));
71
		token.getEnv().setAttribute("entityTypeId", token.getEnv().getAttribute("entityTypeId"));
72

  
73
		return Arc.DEFAULT_ARC;
74
	}
75

  
76
	private String getFields(final String format, final String layout) throws ISLookUpException {
77
		return isLookup(String
78
				.format("<FIELDS>{for $x in collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']/FIELDS/FIELD return $x[string(@path)]}</FIELDS>",
79
						format, layout));
80
	}
81

  
82
	public String getIndexSolrUrlZk() throws ISLookUpException {
83
		return isLookup("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
84
	}
85

  
86
	public String getIndexSolrShutdownWait() throws ISLookUpException {
87
		return queryForServiceProperty("solr:feedingShutdownTolerance");
88
	}
89

  
90
	public String getIndexBufferFlushTreshold() throws ISLookUpException {
91
		return queryForServiceProperty("solr:feedingBufferFlushThreshold");
92
	}
93

  
94
	public String isFeedingSimulationMode() throws ISLookUpException {
95
		return queryForServiceProperty("solr:feedingSimulationMode");
96
	}
97

  
98
	private String queryForServiceProperty(final String key) throws ISLookUpException {
99
		return isLookup("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff