Project

General

Profile

1
package eu.dnetlib.resolver.mdstore.plugin;
2

    
3
import java.util.HashMap;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.concurrent.BlockingQueue;
7
import java.util.concurrent.Callable;
8

    
9
import com.mongodb.DBObject;
10
import com.mongodb.client.MongoCollection;
11
import eu.dnetlib.dli.DLIUtils;
12
import eu.dnetlib.dli.resolver.PIDResolver;
13
import eu.dnetlib.dli.resolver.model.*;
14
import eu.dnetlib.dli.resolver.model.serializer.ResolverSerializer;
15
import eu.dnetlib.miscutils.collections.Pair;
16
import eu.dnetlib.resolver.parser.DLIParser;
17
import org.antlr.stringtemplate.StringTemplate;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21

    
22
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
23

    
24
/**
25
 * Created by sandro on 9/22/16.
26
 */
27
public class RecordResolver implements Callable<Boolean> {
28

    
29
	private static final Log log = LogFactory.getLog(RecordResolver.class);
30

    
31
	private final DLIParser parser = new DLIParser();
32
    private final Long timestamp;
33
    private List<PIDResolver> pluginResolver;
34
	private BlockingQueue<DBObject> inputQueue;
35
	private MongoCollection<DBObject> outputCollection;
36
	private ResolverSerializer serializer;
37

    
38
    public RecordResolver(final long ts) {
39
        this.timestamp = Long.valueOf(ts);
40
    }
41

    
42
    private String resolve(final String inputRecord) {
43

    
44
		final ResolvedObject inputObject = parser.parse(inputRecord);
45
		if (inputObject != null && StringUtils.isNoneBlank(inputObject.getResolvedDate()))
46
			return null;
47

    
48
		if (inputObject != null && !StringUtils.isBlank(inputObject.getPid())) {
49
			log.debug("trying to resolve " + inputObject.getPid());
50
		}
51

    
52
		boolean shouldUpdate = false;
53
		if (inputObject.getCompletionStatus() == null || !inputObject.getCompletionStatus().equals(CompletionStatus.complete.toString())) {
54
			shouldUpdate = shouldUpdate || tryToResolveRecord(inputObject);
55
		}
56
		if (inputObject.getRelations() != null) {
57
			for (ObjectRelation rel : inputObject.getRelations()) {
58
                final Map<String, ObjectType> resolvedRelation = tryToResolveRelation(rel.getTargetPID());
59
                if (resolvedRelation != null && !resolvedRelation.isEmpty()) {
60
                    resolvedRelation.entrySet()
61
                            .forEach(e -> {
62
                                rel.setTargetPID(new PID(e.getKey(), "dnet"));
63
                                rel.setTargetType(e.getValue());
64
                            });
65
                    shouldUpdate = true;
66
				}
67
			}
68
			if (shouldUpdate) {
69
				final String newXML = serializer.serializeReplacingXML(inputRecord, inputObject);
70
				return newXML;
71
			}
72
		}
73
		return null;
74
	}
75

    
76
	public void setTemplate(final StringTemplate template) {
77
		serializer.setDmfTemplate(template);
78
	}
79

    
80
    private Map<String, ObjectType> tryToResolveRelation(final PID currentPid) {
81
        if (currentPid.getId() != null && currentPid.getId().contains("dli_resolver")) {
82
			return null;
83
		}
84
		log.debug("tryToResolveRelation " + currentPid);
85

    
86
		for (PIDResolver resolver : pluginResolver) {
87
            final ResolvedObject currentIdentifier = resolver.retrievePID(currentPid.getId(), currentPid.getType());
88

    
89

    
90
            if (currentIdentifier != null && !StringUtils.isBlank(currentIdentifier.getPid()) && currentIdentifier.getPid().toLowerCase().equals(currentPid.getId().toLowerCase())) {
91
                final HashMap<String, ObjectType> result = new HashMap<>();
92
				result.put("dli_resolver::" + DLIUtils.generateIdentifier(currentIdentifier.getPid(), currentIdentifier.getPidType()),
93
						currentIdentifier.getType());
94
                return result;
95
            }
96
		}
97
		return null;
98
	}
99

    
100
	private boolean tryToResolveRecord(final ResolvedObject object) {
101

    
102
		for (PIDResolver resolver : pluginResolver) {
103
			final ResolvedObject resolvedObject = resolver.retrievePID(object.getPid(), object.getPidType());
104
			if (resolvedObject != null &&
105
					resolvedObject.getCompletionStatus() != null &&
106
					resolvedObject.getCompletionStatus().toString().equals(CompletionStatus.complete.toString())) {
107
				{
108
					object.setAuthors(resolvedObject.getAuthors());
109
					object.setTitles(resolvedObject.getTitles());
110
					object.setCompletionStatus(resolvedObject.getCompletionStatus());
111
					object.setDate(resolvedObject.getDate());
112
					object.getDatasourceProvenance().addAll(resolvedObject.getDatasourceProvenance());
113
					object.setDescription(resolvedObject.getDescription());
114
					object.setSubjects(resolvedObject.getSubjects());
115
					object.setType(resolvedObject.getType());
116
					log.debug("Record Resolved by " + resolver.getClass().getCanonicalName() + "  PID: " + object.getPid());
117
					return true;
118
				}
119
			}
120
		}
121
		log.debug("Record NOT Resolved  PID: " + object.getPid());
122
		return false;
123
	}
124

    
125
	public void setPluginResolver(final List<PIDResolver> pluginResolver) {
126
		this.pluginResolver = pluginResolver;
127
	}
128

    
129
	@Override
130
	public Boolean call() throws Exception {
131

    
132
        log.info("START HERE!");
133

    
134
		DBObject currentObject = inputQueue.take();
135
		int i = 0;
136
        String currentRecord = null;
137
        double sumTotal = 0;
138
        while (currentObject != ResolverMDStorePlugin.DONE) {
139
			try {
140
                currentRecord = (String) currentObject.get("body");
141
                if (currentObject.get("resolved_ts") == null) {
142
                    final double start = System.currentTimeMillis();
143
                    final String resolvedRecord = resolve(currentRecord);
144
                    if (resolvedRecord != null) {
145
                        currentObject.put("body", resolvedRecord);
146
                        currentObject.removeField("_id");
147
                        currentObject.put("resolved_ts", timestamp);
148
                        outputCollection.insertOne(currentObject);
149
                    }
150
                    final double total = System.currentTimeMillis() - start;
151
                    sumTotal += total;
152

    
153
                }
154

    
155
				currentObject = inputQueue.take();
156
                if (i++ % 100 == 0) {
157
                    log.debug(Thread.currentThread().getId() + " total object resolved: " + i + "in average time " + (sumTotal / 100) + "ms");
158
                    sumTotal = 0;
159
                }
160
            } catch (Throwable e) {
161
                log.error("Error on resolving objects " + currentRecord, e);
162
                return false;
163
			}
164
		}
165
		if (currentObject == ResolverMDStorePlugin.DONE) {
166
			inputQueue.put(currentObject);
167
		}
168
		return true;
169

    
170
	}
171

    
172
	public BlockingQueue<DBObject> getInputQueue() {
173
		return inputQueue;
174
	}
175

    
176
	public void setInputQueue(final BlockingQueue<DBObject> inputQueue) {
177
		this.inputQueue = inputQueue;
178
	}
179

    
180
	public MongoCollection<DBObject> getOutputCollection() {
181
		return outputCollection;
182
	}
183

    
184
	public void setOutputCollection(final MongoCollection<DBObject> outputCollection) {
185
		this.outputCollection = outputCollection;
186
	}
187

    
188
	public void setSerializer(final ResolverSerializer serializer) {
189
		final ResolverSerializer tmp = new ResolverSerializer();
190
		tmp.setPmfTemplate(new StringTemplate(serializer.getPmfTemplate().getTemplate()));
191
		tmp.setDmfTemplate(new StringTemplate(serializer.getDmfTemplate().getTemplate()));
192
		tmp.setScholixTemplate(new StringTemplate(serializer.getScholixTemplate().getTemplate()));
193
		this.serializer = tmp;
194
	}
195
}
(1-1/2)