Project

General

Profile

« Previous | Next » 

Revision 55759

Reintegrated branch solr75 into trunk (merged revisions -r53788:HEAD).

View differences:

modules/dnet-deduplication/trunk/deploy.info
1 1
{
2 2
	"type_source": "SVN", 
3 3
	"goal": "package -U source:jar", 
4
	"url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-deduplication/trunk", 
4
	"url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-deduplication/trunk",
5 5
	"deploy_repository": "dnet45-snapshots", 
6 6
	"version": "4", 
7 7
	"mail": "alessia.bardi@isti.cnr.it", 
modules/dnet-deduplication/trunk/src/test/java/eu/dnetlib/data/dedup/DedupInspectorFunctionsTest.java
1
package eu.dnetlib.data.dedup;
2

  
3
import com.google.common.collect.Lists;
4
import eu.dnetlib.data.proto.OafProtos.Oaf;
5
import com.google.common.base.Function;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.junit.Before;
9
import org.junit.Test;
10

  
11
import java.util.Map;
12

  
13
public class DedupInspectorFunctionsTest {
14

  
15
    private static final Log log = LogFactory.getLog(DedupInspectorFunctionsTest.class);
16

  
17
    private String orgFromIndex = "<record>CAESigMIFCK7ARK4ARIlCiNJbnRlcm5hdGlvbmFsIFRlbm5pcyBGZWRlcmF0aW9uIEx0ZCoHCgVm\r\nYWxzZTIHCgVmYWxzZToHCgVmYWxzZUIHCgVmYWxzZUoHCgVmYWxzZVIHCgVmYWxzZVoHCgVmYWxz\r\nZWIHCgVmYWxzZWoHCgVmYWxzZXIHCgVmYWxzZYIBNAoCR0ISDlVuaXRlZCBLaW5nZG9tGg5kbmV0\r\nOmNvdW50cmllcyIOZG5ldDpjb3VudHJpZXNCMnJjdWtfX19fX19fXzo6ODYxOTIxQzMtNjcyMy00\r\nOUMwLUIyQ0UtRDcyODJBOUMxRTk2SkkKMTEwfG9wZW5haXJlX19fXzo6YWIyZDMzMTA3NDFlYTgw\r\nZDNiODcyNmY2NTE1MDI4NTgSFFJlc2VhcmNoIENvdW5jaWxzIFVLWgoyMDE3LTExLTA0YjEyMHxy\r\nY3VrX19fX19fX186OjVjYjFmZTZhYjg1NDcwMTBiMjJkOTAyN2U3MjUyMzVkagoyMDE5LTA1LTE5\r\nIqsBCAEQARoDMC45IiRkZWR1cC1zaW1pbGFyaXR5LW9yZ2FuaXphdGlvbi1zaW1wbGUqegoic3lz\r\naW1wb3J0OmNyb3Nzd2FsazplbnRpdHlyZWdpc3RyeRIic3lzaW1wb3J0OmNyb3Nzd2FsazplbnRp\r\ndHlyZWdpc3RyeRoXZG5ldDpwcm92ZW5hbmNlX2FjdGlvbnMiF2RuZXQ6cHJvdmVuYW5jZV9hY3Rp\r\nb25z\r\n\n</record>";
18

  
19
    private DedupIndexDAO dao;
20

  
21
    @Before
22
    public void setUp() {
23
        dao = new DedupIndexDAO();
24
    }
25

  
26
    @Test
27
    public void test_1() {
28

  
29
        Oaf oaf = dao.getXml2OafFunction().apply(orgFromIndex);
30
        System.out.println(oaf);
31

  
32

  
33
        Map<String, String> map = dao.getOaf2FieldMapFunction("organization", Lists.newArrayList("legalname", "legalshortname", "country", "websiteurl", "provenance")).apply(oaf);
34

  
35
        System.out.println(map);
36

  
37

  
38
    }
39

  
40
}
modules/dnet-deduplication/trunk/src/main/java/eu/dnetlib/msro/workflows/hadoop/hbase/DefineHBaseOpenaireSchemaJobNode.java
5 5
import com.googlecode.sarasvati.Arc;
6 6
import com.googlecode.sarasvati.NodeToken;
7 7
import eu.dnetlib.openaire.hadoop.utils.HBaseTableUtils;
8
import org.apache.commons.lang3.StringUtils;
8 9
import org.apache.commons.logging.Log;
9 10
import org.apache.commons.logging.LogFactory;
10 11

  
......
12 13

  
13 14
	private static final Log log = LogFactory.getLog(DefineHBaseOpenaireSchemaJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
14 15

  
16
	private String schema;
17

  
15 18
	@Override
16 19
	protected String execute(final NodeToken token) throws Exception {
17 20

  
18
		final Set<String> columns = HBaseTableUtils.listAllColumns();
19
		log.info("table definition: " + columns);
20
		token.getEnv().setAttribute(getTableColumnsParamName(), asCSV(columns));
21
		final String schemaOverride = StringUtils.isNotBlank(getSchema()) ? getSchema() : asCSV(HBaseTableUtils.listAllColumns());
22
		log.info("table definition: " + schemaOverride);
23
		token.getEnv().setAttribute(getTableColumnsParamName(), schemaOverride);
21 24

  
22 25
		return Arc.DEFAULT_ARC;
23 26
	}
24 27

  
28
	public String getSchema() {
29
		return schema;
30
	}
31

  
32
	public void setSchema(final String schema) {
33
		this.schema = schema;
34
	}
25 35
}
modules/dnet-deduplication/trunk/src/main/java/eu/dnetlib/data/dedup/DedupIndexDAO.java
12 12
import com.google.common.collect.Lists;
13 13
import com.google.common.collect.Maps;
14 14
import com.google.common.collect.Sets;
15
import com.google.protobuf.GeneratedMessage;
15 16
import eu.dnetlib.data.mapreduce.util.OafDecoder;
16 17
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
17 18
import eu.dnetlib.data.proto.OafProtos.Oaf;
19
import eu.dnetlib.data.transform.AbstractProtoMapper;
18 20
import eu.dnetlib.data.transform.OafEntityMerger;
19 21
import eu.dnetlib.data.transform.SolrProtoMapper;
20 22
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
......
23 25
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24 26
import eu.dnetlib.functionality.index.client.IndexClient;
25 27
import eu.dnetlib.functionality.index.client.IndexClientException;
26
import eu.dnetlib.functionality.index.client.ResolvingIndexClientFactory;
27 28
import eu.dnetlib.functionality.index.client.response.LookupResponse;
29
import eu.dnetlib.functionality.index.client.solr.SolrIndexClient;
30
import eu.dnetlib.functionality.index.client.solr.SolrIndexClientFactory;
28 31
import eu.dnetlib.functionality.modular.ui.dedup.SimilarityGroup;
29 32
import eu.dnetlib.pace.config.DedupConfig;
33
import eu.dnetlib.pace.model.Field;
34
import eu.dnetlib.pace.model.FieldDef;
35
import eu.dnetlib.pace.model.FieldValueImpl;
36
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
30 37
import org.apache.commons.codec.binary.Base64;
31 38
import org.apache.commons.lang.StringUtils;
32 39
import org.apache.commons.logging.Log;
33 40
import org.apache.commons.logging.LogFactory;
34
import org.apache.solr.client.solrj.impl.CloudSolrServer;
41

  
42
import org.apache.solr.client.solrj.impl.CloudSolrClient;
35 43
import org.apache.solr.common.SolrInputDocument;
36 44
import org.dom4j.DocumentException;
37 45
import org.springframework.beans.factory.annotation.Autowired;
......
74 82
	 * The index client factory.
75 83
	 */
76 84
	@Autowired
77
	private ResolvingIndexClientFactory indexClientFactory;
85
	private SolrIndexClientFactory indexClientFactory;
78 86

  
79 87
	private IndexClient indexClient = null;
80 88

  
81
	@Value("${dnet.dedup.index.format}")
82
	private String indexFormat;
83

  
84 89
	@Value("${dnet.dedup.index.collection}")
85 90
	private String dedupIndexCollection;
86 91

  
......
119 124

  
120 125
		log.info("starting index update");
121 126

  
122
		final CloudSolrServer solrServer = getSolrServer();
123
		try {
124
			final SolrProtoMapper mapper = initProtoMapper();
127
		final SolrIndexClient indexClient = (SolrIndexClient) getIndexClient();
128
		final SolrProtoMapper mapper = initProtoMapper();
125 129

  
126
			final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
127
			final List<SolrInputDocument> buffer = Lists.newLinkedList();
130
		final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
131
		final List<SolrInputDocument> buffer = Lists.newLinkedList();
128 132

  
129
			// mark as deleted all the documents in the group
130
			final List<Oaf> groupDocs = markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet()))));
131
			buffer.addAll(asIndexDocs(oaf2solr, groupDocs));
133
		// mark as deleted all the documents in the group
134
		final List<Oaf> groupDocs = markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet()))));
135
		buffer.addAll(asIndexDocs(oaf2solr, groupDocs));
132 136

  
133
			// elect a new representative
134
			final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(getDedupConf(group), newRootId(group), groupDocs).build());
135
			final String newRootId = (String) newRoot.getFieldValue("objidentifier");
136
			// newRoot.setField("actionset", dedupConf.getWf().getConfigurationId());
137
			buffer.add(newRoot);
137
		// elect a new representative
138
		final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(getDedupConf(group), newRootId(group), groupDocs).build());
139
		final String newRootId = (String) newRoot.getFieldValue("objidentifier");
140
		// newRoot.setField("actionset", dedupConf.getWf().getConfigurationId());
141
		buffer.add(newRoot);
138 142

  
139
			// mark as non deleted the documents taken away from the group
140
			final List<Oaf> dissimDocs = markUnDeleted(asOafBuilder(parseBase64(queryIndex(unique(group.getDissimilar()), group.getActionSet()))));
141
			buffer.addAll(asIndexDocs(oaf2solr, dissimDocs));
143
		// mark as non deleted the documents taken away from the group
144
		final List<Oaf> dissimDocs = markUnDeleted(asOafBuilder(parseBase64(queryIndex(unique(group.getDissimilar()), group.getActionSet()))));
145
		buffer.addAll(asIndexDocs(oaf2solr, dissimDocs));
142 146

  
143
			log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));
147
		log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));
144 148

  
145
			// add the changes to the server
146
			addStatus = solrServer.add(buffer).getStatus();
147
			log.debug("solr add status: " + addStatus);
149
		// add the changes to the server
150
		addStatus = indexClient.feed(buffer);
151
		log.debug("solr add status: " + addStatus);
148 152

  
149
			// delete the old representatives, avoiding to remove the current one (if it didn't change)
150
			log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
151
			for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) {
152
				solrServer.deleteById(mapper.getRecordId(rootId, group.getActionSet()));
153
			}
153
		// delete the old representatives, avoiding to remove the current one (if it didn't change)
154
		log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
155
		for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) {
156
			indexClient.remove(mapper.getRecordId(rootId, group.getActionSet()));
157
		}
154 158

  
155
			commitStatus = solrServer.commit().getStatus();
159
		commitStatus = indexClient.commit().getStatus();
156 160

  
157
			log.debug("solr commit status: " + commitStatus);
158
		} finally {
159
			solrServer.shutdown();
160
		}
161
		log.debug("solr commit status: " + commitStatus);
161 162

  
162 163
		return (addStatus == 0) && (commitStatus == 0);
163 164
	}
......
170 171
		return Iterables.transform(r, getXml2OafFunction());
171 172
	}
172 173

  
173
	private Function<String, Oaf> getXml2OafFunction() {
174
	protected Function<String, Oaf> getXml2OafFunction() {
174 175
		return s -> {
175 176
			// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
176 177
			final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
......
189 190
				serviceLocator
190 191
						.getService(ISLookUpService.class)
191 192
						.getResourceProfileByQuery(
192
								"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='" + indexFormat
193
										+ "']//LAYOUT[@name='index']/FIELDS"));
193
								"collection('')//RESOURCE_PROFILE["
194
										+ ".//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and "
195
										+ ".//NAME='OPENAIRE']//LAYOUT[@name='index']/FIELDS"));
194 196
	}
195 197

  
196
	private CloudSolrServer getSolrServer() {
197
		final String zk = getIndexSolrUrlZk();
198
		log.info(String.format("initializing solr client for collection %s, zk url: %s", dedupIndexCollection, zk));
199
		final CloudSolrServer solrServer = new CloudSolrServer(zk);
200
		solrServer.setDefaultCollection(dedupIndexCollection);
201

  
202
		return solrServer;
203
	}
204

  
205
	private String getIndexSolrUrlZk() {
206
		try {
207
			return getResourceProfileByQuery(
208
					"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
209
		} catch (final ISLookUpException e) {
210
			throw new IllegalStateException("unable to read solr ZK url from service profile", e);
211
		}
212
	}
213

  
214 198
	private String getResourceProfileByQuery(final String xquery) throws ISLookUpException {
215 199
		log.debug("quering for service property: " + xquery);
216 200
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
......
218 202
		return res;
219 203
	}
220 204

  
221
	private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
205
	protected Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
222 206
		return oaf -> {
223 207

  
224
			final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
225 208
			final Map<String, String> res = Maps.newHashMap();
226 209
			final String oafId = cleanId(oaf.getEntity().getId());
227 210

  
......
237 220
			res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
238 221

  
239 222
			for (final String fieldName : fields) {
240
				res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName))));
223

  
224
				res.put(fieldName, Joiner.on("; ").skipNulls().join(getFieldValues((GeneratedMessage) oaf.getEntity(), fieldName, paths.get(type).get(fieldName))));
241 225
			}
242 226

  
243 227
			return res;
244 228
		};
245 229
	}
246 230

  
231
	private List<String> getFieldValues(final GeneratedMessage m, final String fieldName, final String path) {
232
		return new SolrDocumentMapper().processPath(m, fieldName, path).stream()
233
				.map(o -> o.toString())
234
				.collect(Collectors.toCollection(LinkedList::new));
235
	}
236

  
237
	class SolrDocumentMapper extends AbstractProtoMapper {
238

  
239
		public List<Object> processPath(final GeneratedMessage m, final String fieldName, final String path) {
240
			final FieldDef fd = new FieldDef();
241
			fd.setName(fieldName);
242
			return processPath(m, fd, path);
243
		}
244
	}
245

  
247 246
	private String cleanId(final String id) {
248 247
		return id.replaceFirst(ID_PREFIX_REGEX, "");
249 248
	}
250 249

  
251 250
	private IndexClient getIndexClient() throws IndexClientException, ISLookUpDocumentNotFoundException, ISLookUpException {
252 251
		if (indexClient == null) {
253
			indexClient = indexClientFactory.getClient(indexFormat, "index", "dedup", "solr");
252
			indexClient = indexClientFactory.getClient(dedupIndexCollection);
254 253
		}
255 254
		return indexClient;
256 255
	}
modules/dnet-deduplication/trunk/pom.xml
10 10
	<groupId>eu.dnetlib</groupId>
11 11
	<artifactId>dnet-deduplication</artifactId>
12 12
	<packaging>jar</packaging>
13
	<version>1.6.1-SNAPSHOT</version>
13
	<version>1.6.7-SNAPSHOT</version>
14 14
	<scm>
15 15
		<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-deduplication/trunk</developerConnection>
16 16
	</scm>
......
38 38

  
39 39
		<dependency>
40 40
			<groupId>eu.dnetlib</groupId>
41
			<artifactId>dnet-index-solr-client</artifactId>
42
			<version>[2.4.2]</version>
41
			<artifactId>dnet-index-client</artifactId>
42
			<version>[2.3.3-solr75]</version>
43 43
		</dependency>
44 44

  
45 45
		<dependency>
46 46
			<groupId>eu.dnetlib</groupId>
47 47
			<artifactId>dnet-openaireplus-mapping-utils</artifactId>
48
			<version>[6.0.0,7.0.0)</version>
48
			<version>[6.2.21-solr75]</version>
49
			<exclusions>
50
				<exclusion>
51
					<groupId>eu.dnetlib</groupId>
52
					<artifactId>dnet-hadoop-commons</artifactId>
53
				</exclusion>
54
			</exclusions>
49 55
		</dependency>
50 56

  
51 57

  

Also available in: Unified diff