Project

General

Profile

1 43783 sandro.lab
package eu.dnetlib.resolver.mdstore.plugin;
2
3
import com.mongodb.DBObject;
4
import com.mongodb.client.MongoCollection;
5 45816 sandro.lab
import eu.dnetlib.dli.DLIUtils;
6 49184 sandro.lab
import eu.dnetlib.dli.resolver.model.CompletionStatus;
7
import eu.dnetlib.dli.resolver.model.DLIObjectRelation;
8
import eu.dnetlib.dli.resolver.model.DLIResolvedObject;
9
import eu.dnetlib.dli.resolver.model.PID;
10 44356 claudio.at
import eu.dnetlib.dli.resolver.model.serializer.ResolverSerializer;
11 49184 sandro.lab
import eu.dnetlib.pid.resolver.PIDResolver;
12
import eu.dnetlib.pid.resolver.model.ObjectType;
13 44050 sandro.lab
import eu.dnetlib.resolver.parser.DLIParser;
14 43783 sandro.lab
import org.antlr.stringtemplate.StringTemplate;
15
import org.apache.commons.lang3.StringUtils;
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18
19 49184 sandro.lab
import java.util.HashMap;
20
import java.util.List;
21
import java.util.Map;
22
import java.util.concurrent.BlockingQueue;
23
import java.util.concurrent.Callable;
24 45816 sandro.lab
25 43783 sandro.lab
/**
26
 * Created by sandro on 9/22/16.
27
 */
28
public class RecordResolver implements Callable<Boolean> {
29
30
	private static final Log log = LogFactory.getLog(RecordResolver.class);
31 44050 sandro.lab
32
	private final DLIParser parser = new DLIParser();
33 45917 sandro.lab
    private final Long timestamp;
34
    private List<PIDResolver> pluginResolver;
35 43783 sandro.lab
	private BlockingQueue<DBObject> inputQueue;
36
	private MongoCollection<DBObject> outputCollection;
37 44050 sandro.lab
	private ResolverSerializer serializer;
38
39 45917 sandro.lab
    public RecordResolver(final long ts) {
40
        this.timestamp = Long.valueOf(ts);
41
    }
42
43 46475 sandro.lab
    private String resolve(final String inputRecord) {
44 44050 sandro.lab
45 49184 sandro.lab
        final DLIResolvedObject inputObject = parser.parse(inputRecord);
46
        if (inputObject != null && StringUtils.isNoneBlank(inputObject.getResolvedDate()))
47 45835 sandro.lab
			return null;
48 45628 sandro.lab
49
		if (inputObject != null && !StringUtils.isBlank(inputObject.getPid())) {
50
			log.debug("trying to resolve " + inputObject.getPid());
51
		}
52
53 43783 sandro.lab
		boolean shouldUpdate = false;
54
		if (inputObject.getCompletionStatus() == null || !inputObject.getCompletionStatus().equals(CompletionStatus.complete.toString())) {
55
			shouldUpdate = shouldUpdate || tryToResolveRecord(inputObject);
56
		}
57
		if (inputObject.getRelations() != null) {
58 49184 sandro.lab
            for (DLIObjectRelation rel : inputObject.getRelations()) {
59 45816 sandro.lab
                final Map<String, ObjectType> resolvedRelation = tryToResolveRelation(rel.getTargetPID());
60
                if (resolvedRelation != null && !resolvedRelation.isEmpty()) {
61
                    resolvedRelation.entrySet()
62
                            .forEach(e -> {
63
                                rel.setTargetPID(new PID(e.getKey(), "dnet"));
64
                                rel.setTargetType(e.getValue());
65
                            });
66
                    shouldUpdate = true;
67 43783 sandro.lab
				}
68
			}
69
			if (shouldUpdate) {
70
				final String newXML = serializer.serializeReplacingXML(inputRecord, inputObject);
71
				return newXML;
72
			}
73
		}
74
		return null;
75
	}
76
77
	public void setTemplate(final StringTemplate template) {
78 43939 sandro.lab
		serializer.setDmfTemplate(template);
79 43783 sandro.lab
	}
80
81 45816 sandro.lab
    private Map<String, ObjectType> tryToResolveRelation(final PID currentPid) {
82
        if (currentPid.getId() != null && currentPid.getId().contains("dli_resolver")) {
83 43783 sandro.lab
			return null;
84
		}
85 45628 sandro.lab
		log.debug("tryToResolveRelation " + currentPid);
86
87 43783 sandro.lab
		for (PIDResolver resolver : pluginResolver) {
88 49184 sandro.lab
            final DLIResolvedObject currentIdentifier = (DLIResolvedObject) resolver.retrievePID(currentPid.getId(), currentPid.getType());
89 45816 sandro.lab
90
91
            if (currentIdentifier != null && !StringUtils.isBlank(currentIdentifier.getPid()) && currentIdentifier.getPid().toLowerCase().equals(currentPid.getId().toLowerCase())) {
92
                final HashMap<String, ObjectType> result = new HashMap<>();
93 45835 sandro.lab
				result.put("dli_resolver::" + DLIUtils.generateIdentifier(currentIdentifier.getPid(), currentIdentifier.getPidType()),
94
						currentIdentifier.getType());
95 45816 sandro.lab
                return result;
96
            }
97 43783 sandro.lab
		}
98
		return null;
99
	}
100
101 49184 sandro.lab
    private boolean tryToResolveRecord(final DLIResolvedObject object) {
102 43783 sandro.lab
103
		for (PIDResolver resolver : pluginResolver) {
104 49184 sandro.lab
            final DLIResolvedObject resolvedObject = (DLIResolvedObject) resolver.retrievePID(object.getPid(), object.getPidType());
105
            if (resolvedObject != null &&
106 43783 sandro.lab
					resolvedObject.getCompletionStatus() != null &&
107
					resolvedObject.getCompletionStatus().toString().equals(CompletionStatus.complete.toString())) {
108
				{
109
					object.setAuthors(resolvedObject.getAuthors());
110
					object.setTitles(resolvedObject.getTitles());
111
					object.setCompletionStatus(resolvedObject.getCompletionStatus());
112
					object.setDate(resolvedObject.getDate());
113
					object.getDatasourceProvenance().addAll(resolvedObject.getDatasourceProvenance());
114 44352 sandro.lab
					object.setDescription(resolvedObject.getDescription());
115
					object.setSubjects(resolvedObject.getSubjects());
116 43783 sandro.lab
					object.setType(resolvedObject.getType());
117 45628 sandro.lab
					log.debug("Record Resolved by " + resolver.getClass().getCanonicalName() + "  PID: " + object.getPid());
118 43783 sandro.lab
					return true;
119
				}
120
			}
121
		}
122 45628 sandro.lab
		log.debug("Record NOT Resolved  PID: " + object.getPid());
123 43783 sandro.lab
		return false;
124
	}
125
126
	public void setPluginResolver(final List<PIDResolver> pluginResolver) {
127
		this.pluginResolver = pluginResolver;
128
	}
129
130
	@Override
131
	public Boolean call() throws Exception {
132 46475 sandro.lab
133
        log.info("START HERE!");
134
135 43783 sandro.lab
		DBObject currentObject = inputQueue.take();
136 44283 sandro.lab
		int i = 0;
137 45917 sandro.lab
        String currentRecord = null;
138 46216 sandro.lab
        double sumTotal = 0;
139 45917 sandro.lab
        while (currentObject != ResolverMDStorePlugin.DONE) {
140 43783 sandro.lab
			try {
141 45917 sandro.lab
                currentRecord = (String) currentObject.get("body");
142
                if (currentObject.get("resolved_ts") == null) {
143 46216 sandro.lab
                    final double start = System.currentTimeMillis();
144 45917 sandro.lab
                    final String resolvedRecord = resolve(currentRecord);
145
                    if (resolvedRecord != null) {
146
                        currentObject.put("body", resolvedRecord);
147
                        currentObject.removeField("_id");
148
                        currentObject.put("resolved_ts", timestamp);
149
                        outputCollection.insertOne(currentObject);
150
                    }
151 46216 sandro.lab
                    final double total = System.currentTimeMillis() - start;
152
                    sumTotal += total;
153
154 45917 sandro.lab
                }
155 46475 sandro.lab
156 43783 sandro.lab
				currentObject = inputQueue.take();
157 46216 sandro.lab
                if (i++ % 100 == 0) {
158 46475 sandro.lab
                    log.debug(Thread.currentThread().getId() + " total object resolved: " + i + "in average time " + (sumTotal / 100) + "ms");
159 46216 sandro.lab
                    sumTotal = 0;
160
                }
161
            } catch (Throwable e) {
162 45917 sandro.lab
                log.error("Error on resolving objects " + currentRecord, e);
163
                return false;
164 43783 sandro.lab
			}
165
		}
166
		if (currentObject == ResolverMDStorePlugin.DONE) {
167
			inputQueue.put(currentObject);
168
		}
169
		return true;
170
171
	}
172
173
	public BlockingQueue<DBObject> getInputQueue() {
174
		return inputQueue;
175
	}
176
177
	public void setInputQueue(final BlockingQueue<DBObject> inputQueue) {
178
		this.inputQueue = inputQueue;
179
	}
180
181
	public MongoCollection<DBObject> getOutputCollection() {
182
		return outputCollection;
183
	}
184
185
	public void setOutputCollection(final MongoCollection<DBObject> outputCollection) {
186
		this.outputCollection = outputCollection;
187
	}
188 44050 sandro.lab
189
	public void setSerializer(final ResolverSerializer serializer) {
190
		final ResolverSerializer tmp = new ResolverSerializer();
191
		tmp.setPmfTemplate(new StringTemplate(serializer.getPmfTemplate().getTemplate()));
192
		tmp.setDmfTemplate(new StringTemplate(serializer.getDmfTemplate().getTemplate()));
193
		tmp.setScholixTemplate(new StringTemplate(serializer.getScholixTemplate().getTemplate()));
194
		this.serializer = tmp;
195
	}
196 43783 sandro.lab
}