Project

General

Profile

1
package eu.dnetlib.resolver.mdstore.plugin;
2

    
3
import java.util.List;
4
import java.util.concurrent.BlockingQueue;
5
import java.util.concurrent.Callable;
6

    
7
import com.mongodb.DBObject;
8
import com.mongodb.client.MongoCollection;
9
import eu.dnetlib.resolver.PIDResolver;
10
import eu.dnetlib.resolver.model.CompletionStatus;
11
import eu.dnetlib.resolver.model.ObjectRelation;
12
import eu.dnetlib.resolver.model.PID;
13
import eu.dnetlib.resolver.model.ResolvedObject;
14
import eu.dnetlib.resolver.model.serializer.ResolverSerializer;
15
import eu.dnetlib.resolver.parser.DLIParser;
16
import org.antlr.stringtemplate.StringTemplate;
17
import org.apache.commons.lang3.StringUtils;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20

    
21
/**
22
 * Created by sandro on 9/22/16.
23
 */
24
public class RecordResolver implements Callable<Boolean> {
25

    
26
	private static final Log log = LogFactory.getLog(RecordResolver.class);
27

    
28
	private final DLIParser parser = new DLIParser();
29
	private List<PIDResolver> pluginResolver;
30
	private BlockingQueue<DBObject> inputQueue;
31
	private MongoCollection<DBObject> outputCollection;
32

    
33
	private ResolverSerializer serializer;
34

    
35
	public String resolve(final String inputRecord) {
36

    
37
		final ResolvedObject inputObject = parser.parse(inputRecord);
38
		boolean shouldUpdate = false;
39
		if (inputObject.getCompletionStatus() == null || !inputObject.getCompletionStatus().equals(CompletionStatus.complete.toString())) {
40
			shouldUpdate = shouldUpdate || tryToResolveRecord(inputObject);
41
		}
42
		if (inputObject.getRelations() != null) {
43
			for (ObjectRelation rel : inputObject.getRelations()) {
44
				final String dnetId = tryToResolveRelation(rel.getTargetPID());
45
				if (dnetId != null && !StringUtils.isEmpty(dnetId)) {
46
					rel.setTargetPID(new PID(dnetId, "dnet"));
47
					shouldUpdate = true;
48
				}
49
			}
50
			if (shouldUpdate) {
51
				final String newXML = serializer.serializeReplacingXML(inputRecord, inputObject);
52
				return newXML;
53
			}
54
		}
55
		return null;
56
	}
57

    
58
	public void setTemplate(final StringTemplate template) {
59
		serializer.setDmfTemplate(template);
60
	}
61

    
62
	private String tryToResolveRelation(final PID currentPid) {
63
		if (currentPid.getId() != null && currentPid.getId().contains("dli_resolver")) {
64
			return null;
65
		}
66
		for (PIDResolver resolver : pluginResolver) {
67
			final String currentIdentifier = resolver.retrieveDnetID(currentPid.getId(), currentPid.getType());
68
			if (currentIdentifier != null && !StringUtils.isBlank(currentIdentifier)) {
69
				return currentIdentifier;
70
			}
71
		}
72
		return null;
73
	}
74

    
75
	private boolean tryToResolveRecord(final ResolvedObject object) {
76

    
77
		for (PIDResolver resolver : pluginResolver) {
78
			final ResolvedObject resolvedObject = resolver.retrievePID(object.getPid(), object.getPidType());
79
			if (resolvedObject != null &&
80
					resolvedObject.getCompletionStatus() != null &&
81
					resolvedObject.getCompletionStatus().toString().equals(CompletionStatus.complete.toString())) {
82
				{
83
					object.setAuthors(resolvedObject.getAuthors());
84
					object.setTitles(resolvedObject.getTitles());
85
					object.setCompletionStatus(resolvedObject.getCompletionStatus());
86
					object.setDate(resolvedObject.getDate());
87
					object.getDatasourceProvenance().addAll(resolvedObject.getDatasourceProvenance());
88
					object.setDescription(resolvedObject.getDescription());
89
					object.setSubjects(resolvedObject.getSubjects());
90
					object.setType(resolvedObject.getType());
91
					return true;
92
				}
93
			}
94
		}
95
		return false;
96
	}
97

    
98
	public void setPluginResolver(final List<PIDResolver> pluginResolver) {
99
		this.pluginResolver = pluginResolver;
100
	}
101

    
102
	@Override
103
	public Boolean call() throws Exception {
104
		DBObject currentObject = inputQueue.take();
105
		int i = 0;
106
		while (currentObject != ResolverMDStorePlugin.DONE) {
107
			try {
108
				final String currentRecord = (String) currentObject.get("body");
109

    
110
				final String resolvedRecord = resolve(currentRecord);
111
				if (resolvedRecord != null) {
112
					currentObject.put("body", resolvedRecord);
113
					outputCollection.insertOne(currentObject);
114
				}
115

    
116
				currentObject = inputQueue.take();
117

    
118
				if (i++ % 100 == 0)
119
					log.debug(Thread.currentThread().getId() + " total object resolved: " + i);
120
			} catch (Throwable e) {
121
				log.error("Error on resolving objects ", e);
122
				return false;
123
			}
124
		}
125
		if (currentObject == ResolverMDStorePlugin.DONE) {
126
			inputQueue.put(currentObject);
127
		}
128
		return true;
129

    
130
	}
131

    
132
	public BlockingQueue<DBObject> getInputQueue() {
133
		return inputQueue;
134
	}
135

    
136
	public void setInputQueue(final BlockingQueue<DBObject> inputQueue) {
137
		this.inputQueue = inputQueue;
138
	}
139

    
140
	public MongoCollection<DBObject> getOutputCollection() {
141
		return outputCollection;
142
	}
143

    
144
	public void setOutputCollection(final MongoCollection<DBObject> outputCollection) {
145
		this.outputCollection = outputCollection;
146
	}
147

    
148
	public void setSerializer(final ResolverSerializer serializer) {
149
		final ResolverSerializer tmp = new ResolverSerializer();
150
		tmp.setPmfTemplate(new StringTemplate(serializer.getPmfTemplate().getTemplate()));
151
		tmp.setDmfTemplate(new StringTemplate(serializer.getDmfTemplate().getTemplate()));
152
		tmp.setScholixTemplate(new StringTemplate(serializer.getScholixTemplate().getTemplate()));
153
		this.serializer = tmp;
154
	}
155
}
(1-1/2)