Project

General

Profile

1 43783 sandro.lab
package eu.dnetlib.resolver.mdstore.plugin;
2
3 45816 sandro.lab
import java.util.HashMap;
4 43783 sandro.lab
import java.util.List;
5 45816 sandro.lab
import java.util.Map;
6 43783 sandro.lab
import java.util.concurrent.BlockingQueue;
7
import java.util.concurrent.Callable;
8
9
import com.mongodb.DBObject;
10
import com.mongodb.client.MongoCollection;
11 45816 sandro.lab
import eu.dnetlib.dli.DLIUtils;
12 44356 claudio.at
import eu.dnetlib.dli.resolver.PIDResolver;
13 45816 sandro.lab
import eu.dnetlib.dli.resolver.model.*;
14 44356 claudio.at
import eu.dnetlib.dli.resolver.model.serializer.ResolverSerializer;
15 45816 sandro.lab
import eu.dnetlib.miscutils.collections.Pair;
16 44050 sandro.lab
import eu.dnetlib.resolver.parser.DLIParser;
17 43783 sandro.lab
import org.antlr.stringtemplate.StringTemplate;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
22 45816 sandro.lab
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
23
24 43783 sandro.lab
/**
25
 * Created by sandro on 9/22/16.
26
 */
27
public class RecordResolver implements Callable<Boolean> {
28
29
	private static final Log log = LogFactory.getLog(RecordResolver.class);
30 44050 sandro.lab
31
	private final DLIParser parser = new DLIParser();
32 45917 sandro.lab
    private final Long timestamp;
33
    private List<PIDResolver> pluginResolver;
34 43783 sandro.lab
	private BlockingQueue<DBObject> inputQueue;
35
	private MongoCollection<DBObject> outputCollection;
36 44050 sandro.lab
	private ResolverSerializer serializer;
37
38 45917 sandro.lab
    public RecordResolver(final long ts) {
39
        this.timestamp = Long.valueOf(ts);
40
    }
41
42 46475 sandro.lab
    private String resolve(final String inputRecord) {
43 44050 sandro.lab
44
		final ResolvedObject inputObject = parser.parse(inputRecord);
45 45835 sandro.lab
		if (inputObject != null && StringUtils.isNoneBlank(inputObject.getResolvedDate()))
46
			return null;
47 45628 sandro.lab
48
		if (inputObject != null && !StringUtils.isBlank(inputObject.getPid())) {
49
			log.debug("trying to resolve " + inputObject.getPid());
50
		}
51
52 43783 sandro.lab
		boolean shouldUpdate = false;
53
		if (inputObject.getCompletionStatus() == null || !inputObject.getCompletionStatus().equals(CompletionStatus.complete.toString())) {
54
			shouldUpdate = shouldUpdate || tryToResolveRecord(inputObject);
55
		}
56
		if (inputObject.getRelations() != null) {
57
			for (ObjectRelation rel : inputObject.getRelations()) {
58 45816 sandro.lab
                final Map<String, ObjectType> resolvedRelation = tryToResolveRelation(rel.getTargetPID());
59
                if (resolvedRelation != null && !resolvedRelation.isEmpty()) {
60
                    resolvedRelation.entrySet()
61
                            .forEach(e -> {
62
                                rel.setTargetPID(new PID(e.getKey(), "dnet"));
63
                                rel.setTargetType(e.getValue());
64
                            });
65
                    shouldUpdate = true;
66 43783 sandro.lab
				}
67
			}
68
			if (shouldUpdate) {
69
				final String newXML = serializer.serializeReplacingXML(inputRecord, inputObject);
70
				return newXML;
71
			}
72
		}
73
		return null;
74
	}
75
76
	public void setTemplate(final StringTemplate template) {
77 43939 sandro.lab
		serializer.setDmfTemplate(template);
78 43783 sandro.lab
	}
79
80 45816 sandro.lab
    private Map<String, ObjectType> tryToResolveRelation(final PID currentPid) {
81
        if (currentPid.getId() != null && currentPid.getId().contains("dli_resolver")) {
82 43783 sandro.lab
			return null;
83
		}
84 45628 sandro.lab
		log.debug("tryToResolveRelation " + currentPid);
85
86 43783 sandro.lab
		for (PIDResolver resolver : pluginResolver) {
87 45816 sandro.lab
            final ResolvedObject currentIdentifier = resolver.retrievePID(currentPid.getId(), currentPid.getType());
88
89
90
            if (currentIdentifier != null && !StringUtils.isBlank(currentIdentifier.getPid()) && currentIdentifier.getPid().toLowerCase().equals(currentPid.getId().toLowerCase())) {
91
                final HashMap<String, ObjectType> result = new HashMap<>();
92 45835 sandro.lab
				result.put("dli_resolver::" + DLIUtils.generateIdentifier(currentIdentifier.getPid(), currentIdentifier.getPidType()),
93
						currentIdentifier.getType());
94 45816 sandro.lab
                return result;
95
            }
96 43783 sandro.lab
		}
97
		return null;
98
	}
99
100
	private boolean tryToResolveRecord(final ResolvedObject object) {
101
102
		for (PIDResolver resolver : pluginResolver) {
103
			final ResolvedObject resolvedObject = resolver.retrievePID(object.getPid(), object.getPidType());
104
			if (resolvedObject != null &&
105
					resolvedObject.getCompletionStatus() != null &&
106
					resolvedObject.getCompletionStatus().toString().equals(CompletionStatus.complete.toString())) {
107
				{
108
					object.setAuthors(resolvedObject.getAuthors());
109
					object.setTitles(resolvedObject.getTitles());
110
					object.setCompletionStatus(resolvedObject.getCompletionStatus());
111
					object.setDate(resolvedObject.getDate());
112
					object.getDatasourceProvenance().addAll(resolvedObject.getDatasourceProvenance());
113 44352 sandro.lab
					object.setDescription(resolvedObject.getDescription());
114
					object.setSubjects(resolvedObject.getSubjects());
115 43783 sandro.lab
					object.setType(resolvedObject.getType());
116 45628 sandro.lab
					log.debug("Record Resolved by " + resolver.getClass().getCanonicalName() + "  PID: " + object.getPid());
117 43783 sandro.lab
					return true;
118
				}
119
			}
120
		}
121 45628 sandro.lab
		log.debug("Record NOT Resolved  PID: " + object.getPid());
122 43783 sandro.lab
		return false;
123
	}
124
125
	public void setPluginResolver(final List<PIDResolver> pluginResolver) {
126
		this.pluginResolver = pluginResolver;
127
	}
128
129
	@Override
130
	public Boolean call() throws Exception {
131 46475 sandro.lab
132
        log.info("START HERE!");
133
134 43783 sandro.lab
		DBObject currentObject = inputQueue.take();
135 44283 sandro.lab
		int i = 0;
136 45917 sandro.lab
        String currentRecord = null;
137 46216 sandro.lab
        double sumTotal = 0;
138 45917 sandro.lab
        while (currentObject != ResolverMDStorePlugin.DONE) {
139 43783 sandro.lab
			try {
140 45917 sandro.lab
                currentRecord = (String) currentObject.get("body");
141
                if (currentObject.get("resolved_ts") == null) {
142 46216 sandro.lab
                    final double start = System.currentTimeMillis();
143 45917 sandro.lab
                    final String resolvedRecord = resolve(currentRecord);
144
                    if (resolvedRecord != null) {
145
                        currentObject.put("body", resolvedRecord);
146
                        currentObject.removeField("_id");
147
                        currentObject.put("resolved_ts", timestamp);
148
                        outputCollection.insertOne(currentObject);
149
                    }
150 46216 sandro.lab
                    final double total = System.currentTimeMillis() - start;
151
                    sumTotal += total;
152
153 45917 sandro.lab
                }
154 46475 sandro.lab
155 43783 sandro.lab
				currentObject = inputQueue.take();
156 46216 sandro.lab
                if (i++ % 100 == 0) {
157 46475 sandro.lab
                    log.debug(Thread.currentThread().getId() + " total object resolved: " + i + "in average time " + (sumTotal / 100) + "ms");
158 46216 sandro.lab
                    sumTotal = 0;
159
                }
160
            } catch (Throwable e) {
161 45917 sandro.lab
                log.error("Error on resolving objects " + currentRecord, e);
162
                return false;
163 43783 sandro.lab
			}
164
		}
165
		if (currentObject == ResolverMDStorePlugin.DONE) {
166
			inputQueue.put(currentObject);
167
		}
168
		return true;
169
170
	}
171
172
	public BlockingQueue<DBObject> getInputQueue() {
173
		return inputQueue;
174
	}
175
176
	public void setInputQueue(final BlockingQueue<DBObject> inputQueue) {
177
		this.inputQueue = inputQueue;
178
	}
179
180
	public MongoCollection<DBObject> getOutputCollection() {
181
		return outputCollection;
182
	}
183
184
	public void setOutputCollection(final MongoCollection<DBObject> outputCollection) {
185
		this.outputCollection = outputCollection;
186
	}
187 44050 sandro.lab
188
	public void setSerializer(final ResolverSerializer serializer) {
189
		final ResolverSerializer tmp = new ResolverSerializer();
190
		tmp.setPmfTemplate(new StringTemplate(serializer.getPmfTemplate().getTemplate()));
191
		tmp.setDmfTemplate(new StringTemplate(serializer.getDmfTemplate().getTemplate()));
192
		tmp.setScholixTemplate(new StringTemplate(serializer.getScholixTemplate().getTemplate()));
193
		this.serializer = tmp;
194
	}
195 43783 sandro.lab
}