Project

General

Profile

1
package eu.dnetlib.msro.workflows.dedup;
2

    
3
import java.util.Queue;
4

    
5
import org.apache.commons.collections.CollectionUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

    
9
import com.googlecode.sarasvati.Arc;
10
import com.googlecode.sarasvati.Engine;
11
import com.googlecode.sarasvati.NodeToken;
12

    
13
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
14
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
15
import eu.dnetlib.pace.config.DedupConfig;
16

    
17
public class DedupDuplicateScanJobNode extends DedupConfigurationAwareJobNode {
18

    
19
	private static final Log log = LogFactory.getLog(DedupDuplicateScanJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
20

    
21
	private class DedupBlackboardWorkflowJobListener extends BlackboardWorkflowJobListener {
22

    
23
		public DedupBlackboardWorkflowJobListener(final Engine engine, final NodeToken token) {
24
			super(engine, token);
25
		}
26

    
27
		@Override
28
		protected void onDone(final BlackboardJob job) {
29

    
30
			final Queue<DedupConfig> confs = dedupConfigurations(getToken());
31

    
32
			confs.poll();
33

    
34
			log.info("checking dedup configs queue, size: " + confs.size() + " configs: " + confs);
35

    
36
			if (CollectionUtils.isEmpty(confs)) {
37
				log.info("dedup similarity scan done");
38
				super.complete(job, "done");
39
			} else {
40
				log.info("remaining confs: " + confs);
41

    
42
				getToken().getEnv().setAttribute(getDedupConfigSequenceParam(), confs.toString());
43

    
44
				super.complete(job, Arc.DEFAULT_ARC);
45
			}
46
		}
47
	}
48

    
49
	@Override
50
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
51
		return new DedupBlackboardWorkflowJobListener(engine, token);
52
	}
53

    
54
}
(4-4/8)