Revision 45419
Added by Claudio Atzori over 7 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/FindPersonCoauthorsReducer.java

package eu.dnetlib.data.mapreduce.hbase.dedup;

public class FindPersonCoauthorsReducer {

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/FindPersonCoauthorsMapper.java

//package eu.dnetlib.data.mapreduce.hbase.dedup;
//
//import java.io.IOException;
//import java.util.List;
//import java.util.NavigableMap;
//import java.util.stream.Collectors;
//
//import org.apache.hadoop.hbase.client.Result;
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
//import org.apache.hadoop.hbase.util.Bytes;
//import org.apache.hadoop.io.Text;
//import org.apache.http.HttpResponse;
//import org.apache.http.client.HttpClient;
//import org.apache.http.client.methods.HttpPost;
//import org.apache.http.client.methods.HttpUriRequest;
//import org.apache.http.impl.client.DefaultHttpClient;
//import org.apache.http.params.BasicHttpParams;
//import org.apache.http.params.HttpParams;
//
//import com.google.common.collect.Lists;
//import com.google.gson.Gson;
//
//import eu.dnetlib.data.mapreduce.hbase.VolatileColumnFamily;
//import eu.dnetlib.data.proto.RelTypeProtos.RelType;
//
//public class FindPersonCoauthorsMapper extends TableMapper<Text, Text> {
//
//    private final HttpClient client = new DefaultHttpClient();
//
//    private final String url = "http://146.48.87.97:8888/addData";
//
//    @Override
//    protected void setup(final Context context) {
//        // url = context.getConfiguration().get("dedup.person.coauthors.service.url");
//    }
//
//    @Override
//    protected void map(final ImmutableBytesWritable rowkey, final Result row, final Context context) throws IOException, InterruptedException {
//        final NavigableMap<byte[], byte[]> candidates = row.getFamilyMap(Bytes.toBytes(VolatileColumnFamily.dedupPerson.toString()));
//        if ((candidates == null) || candidates.isEmpty()) return;
//
//        final List<String> coauthors = Lists.newArrayList();
//        final byte[] family = Bytes.toBytes(RelType.personPublication.toString());
//        coauthors.addAll(row.getFamilyMap(family).keySet().stream().map(Bytes::toString).collect(Collectors.toList()));
//
//        for (final byte[] candidate : candidates.keySet()) {
//            emit(context, Bytes.toString(candidate), coauthors);
//        }
//    }
//
//    private void emit(final Context context, final String candidate, final List<String> coauthors) {
//        try {
//            final HttpUriRequest request = new HttpPost(url);
//            final HttpParams params = new BasicHttpParams();
//            params.setParameter("id", candidate);
//            params.setParameter("data", (new Gson().toJson(coauthors)));
//            request.setParams(params);
//            final HttpResponse response = client.execute(request);
//            context.getCounter("HTTP call", "code " + response.getStatusLine().getStatusCode()).increment(1);
//        } catch (final Exception e) {
//            context.getCounter("HTTP call", "Exception " + e.getClass()).increment(1);
//        }
//    }
//}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/DedupUtils.java

//package eu.dnetlib.data.mapreduce.util;
//
import java.nio.ByteBuffer;

import eu.dnetlib.data.proto.DNGFProtos.DNGFRel;
import eu.dnetlib.data.proto.DNGFProtos.DNGFRel.Builder;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;

import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

public class DedupUtils {

    public static final String CF_SEPARATOR = "_";

    public static final String ROOT = "dedup_wf";

    public static final String BODY_S = "body";

    public static final byte[] BODY_B = Bytes.toBytes(BODY_S);

    public static String dedupPrefix(final String dedupRun) {
        return "|" + ROOT + "_" + dedupRun + "::";
    }

    public static String newId(final String id, final String dedupRun) {
        if ((dedupRun == null) || (dedupRun.length() != 3)) throw new IllegalArgumentException("wrong dedupRun param");

        return id.replaceFirst("\\|.*\\:\\:", dedupPrefix(dedupRun));
    }

    public static byte[] newIdBytes(final String s, final String dedupRun) {
        return newId(s, dedupRun).getBytes();
    }

    public static byte[] newIdBytes(final ByteBuffer b, final String dedupRun) {
        return newId(new String(b.array()), dedupRun).getBytes();
    }

    public static boolean isRoot(final String s) {
        return s.contains(ROOT);
    }

    public static boolean isRoot(final ImmutableBytesWritable s) {
        return isRoot(s.copyBytes());
    }

    public static boolean isRoot(final byte[] s) {
        return isRoot(new String(s));
    }

    public static String getDedupCF_merges(final Type type) {
        return getRelType(type) + CF_SEPARATOR + "dedup" + CF_SEPARATOR + "merges";
    }

    public static String getDedupCF_merges(final String type) {
        return getDedupCF_merges(Type.valueOf(type));
    }

    public static byte[] getDedupCF_mergesBytes(final Type type) {
        return Bytes.toBytes(getDedupCF_merges(type));
    }

    public static byte[] getDedupCF_mergesBytes(final String type) {
        return getDedupCF_mergesBytes(Type.valueOf(type));
    }

    public static String getDedupCF_mergedIn(final Type type) {
        return getRelType(type) + CF_SEPARATOR + "dedup" + CF_SEPARATOR + "isMergedIn";
    }

    public static String getDedupCF_mergedIn(final String type) {
        return getDedupCF_mergedIn(Type.valueOf(type));
    }

    public static byte[] getDedupCF_mergedInBytes(final Type type) {
        return Bytes.toBytes(getDedupCF_mergedIn(type));
    }

    public static byte[] getDedupCF_mergedInBytes(final String type) {
        return getDedupCF_mergedInBytes(Type.valueOf(type));
    }

    public static String getSimilarityCF(final Type type) {
        return getRelType(type) + CF_SEPARATOR + "dedupSimilarity" + CF_SEPARATOR + "isSimilarTo";
    }

    public static String getSimilarityCF(final String type) {
        return getSimilarityCF(Type.valueOf(type));
    }

    public static byte[] getSimilarityCFBytes(final Type type) {
        return Bytes.toBytes(getSimilarityCF(type));
    }

    public static byte[] getSimilarityCFBytes(final String type) {
        return getSimilarityCFBytes(Type.valueOf(type));
    }

    public static String getRelTypeString(final Type type) {
        return getRelType(type).toString();
    }

    public static String getRelType(final Type type) {
        switch (type) {
        case organization:
            return "organizationOrganization";
        case person:
            return "personPerson";
        case publication:
            return "publicationPublication";
        case dataset:
            return "datasetDataset";
        default:
            throw new IllegalArgumentException("Deduplication not supported for entity type: " + type);
        }
    }

    public static ColumnFamily decodeCF(final byte[] b) {
        final String[] s = new String(b).split(CF_SEPARATOR);
        return new DedupUtils().getCF(RelType.valueOf(s[0]), SubRelType.valueOf(s[1]));
    }

    private ColumnFamily getCF(final RelType relType, final SubRelType subRelType) {
        return new ColumnFamily(relType, subRelType);
    }

    public static DNGFRel.Builder getDedup(final DedupConfig dedupConf, final String from, final String to, final String relClass) {
        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());

        RelDescriptor descriptor = new RelDescriptor(relClass);

        final Builder oafRel =
                DNGFRel.newBuilder()
                        .setRelType(Qualifier.newBuilder()
                                .setClassid(descriptor.getTermCode()).setClassname(descriptor.getTermCode())
                                .setSchemeid(descriptor.getOntologyCode()).setSchemename(descriptor.getOntologyCode()))
                        .setSource(new String(from)).setSourceType(DNGFRowKeyDecoder.decode(new String(from)).getType())
                        .setTarget(new String(to)).setTargetType(DNGFRowKeyDecoder.decode(new String(to)).getType())
                        .setChild(false);
        switch (type) {
        case organization:
            oafRel.setOrganizationOrganization(OrganizationOrganization.newBuilder().setDedup(
                    DedupUtils.dedup(relClass, "dnet:organization_organization_relations")));
            break;
        case person:
            oafRel.setPersonPerson(PersonPerson.newBuilder().setDedup(DedupUtils.dedup(relClass, "dnet:person_person_relations")));
            break;
        case publication:
            oafRel.setPublicationPublication(PublicationPublication.newBuilder().setDedup(DedupUtils.dedup(relClass, "dnet:publication_publication_relations")));
            break;
        case dataset:
            oafRel.setDatasetDataset(DatasetDataset.newBuilder().setDedup(DedupUtils.dedup(relClass, "dnet:dataset_dataset_relations")));
            break;
        default:
            throw new IllegalArgumentException("Deduplication not supported for entity type: " + dedupConf.getWf().getEntityType());
        }
        return oafRel;
    }

    private static Dedup.Builder dedup(final Dedup.RelName relClass, final String scheme) {
        return Dedup.newBuilder().setRelMetadata(
                RelMetadata.newBuilder().setSemantics(
                        Qualifier.newBuilder().setClassid(relClass.toString()).setClassname(relClass.toString()).setSchemeid(scheme).setSchemename(scheme)));
    }

    class ColumnFamily {

        private final RelType relType;
        private final SubRelType subRelType;

        public ColumnFamily(final RelType relType, final SubRelType subRelType) {
            this.relType = relType;
            this.subRelType = subRelType;
        }

        @Override
        public String toString() {
            return getRelType() + CF_SEPARATOR + getSubRelType();
        }

        public RelType getRelType() {
            return relType;
        }

        public SubRelType getSubRelType() {
            return subRelType;
        }

    }

}
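Note: the helpers above compose HBase column family names from the entity type plus the fixed separator, and rewrite record identifiers under the dedup_wf namespace. A minimal illustration of the expected values (the sample identifier is made up, and the snippet only exercises the static helpers shown above):

import eu.dnetlib.data.proto.TypeProtos.Type;

public class DedupUtilsNamingExample {

    public static void main(final String[] args) {
        // composed as getRelType(type) + "_" + subRelType + "_" + relName
        System.out.println(DedupUtils.getDedupCF_merges(Type.publication));   // publicationPublication_dedup_merges
        System.out.println(DedupUtils.getDedupCF_mergedIn(Type.publication)); // publicationPublication_dedup_isMergedIn
        System.out.println(DedupUtils.getSimilarityCF(Type.publication));     // publicationPublication_dedupSimilarity_isSimilarTo

        // newId() swaps the "|<namespace>::" prefix for the dedup root prefix "|dedup_wf_<run>::"
        // (the input id below is a hypothetical example of a "<type>|<namespace>::<hash>" identifier)
        System.out.println(DedupUtils.newId("50|od______1234::0123456789abcdef", "001"));
        // -> 50|dedup_wf_001::0123456789abcdef
    }
}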
modules/dnet-mapreduce-jobs/trunk/src/test/java/eu/dnetlib/data/transform/HBaseReadTest.java

package eu.dnetlib.data.transform;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import eu.dnetlib.data.graph.model.DNGFDecoder;
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset;
import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset.GeoLocation;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.*;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

/**
 * Created by claudio on 05/09/16.
 */
public class HBaseReadTest {

    private static final String TABLE_NAME = "db_wds";
    private Resource confIn = new ClassPathResource("eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties");

    private HTable table;

    @Before
    public void setUp() throws IOException {

        final Configuration conf = new Configuration();

        for(final String line : IOUtils.readLines(confIn.getInputStream())) {
            System.out.println("line = " + line);
            if (!line.trim().isEmpty() && !line.startsWith("#")) {
                final String[] split = line.split("=");
                conf.set(split[0].trim(), split[1].trim());
            }
        }

        table = new HTable(conf, Bytes.toBytes(TABLE_NAME));
    }

    @Ignore
    @Test
    public void testReadGeoLocations() throws IOException {

        final Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("dataset"), Bytes.toBytes("body"));

        final ResultScanner rs = table.getScanner(scan);

        System.out.println("start iteration");

        final DescriptiveStatistics statN = new DescriptiveStatistics();
        final AtomicInteger invalid = new AtomicInteger(0);
        rs.forEach(r -> {
            final byte[] b = r.getValue(Bytes.toBytes("dataset"), Bytes.toBytes("body"));
            final DNGFDecoder d = DNGFDecoder.decode(b, WdsDataset.geolocation);
            final List<GeoLocation> geoList = d.getDNGF().getEntity().getDataset().getMetadata().getExtension(WdsDataset.geolocation);
            geoList.forEach(g -> g.getBoxList().forEach(box -> {
                if (StringUtils.isNotBlank(box)) {
                    final String[] split = box.trim().split(" ");
                    try {
                        statN.addValue(split.length);
                        Assert.assertTrue("bad number of coordinates", split.length == 4);

                        // Rect(minX=-180.0,maxX=180.0,minY=-90.0,maxY=90.0)

                        Assert.assertTrue("minX=-180", Double.parseDouble(split[1]) >= -180.0);
                        Assert.assertTrue("maxX= 180", Double.parseDouble(split[3]) <= 180.0);
                        Assert.assertTrue("minY= -90", Double.parseDouble(split[0]) >= -90.0);
                        Assert.assertTrue("maxY= 90", Double.parseDouble(split[2]) <= 90.0);

                        //maxY must be >= minY: 90.0 to -90.0
                        Assert.assertTrue("maxY must be >= minY", Double.parseDouble(split[2]) >= Double.parseDouble(split[0]));

                        //maxY must be >= minY: 90.0 to -90.0
                        Assert.assertTrue("maxX must be >= minX", Double.parseDouble(split[3]) >= Double.parseDouble(split[1]));
                    } catch (AssertionError e) {
                        invalid.set(invalid.get() + 1);
                        //System.err.println(String.format("document %s has %s coordinates: %s", d.getDNGF().getEntity().getId(), split.length, e.getMessage()));
                        //throw e;
                    }
                }
            }));
        });

        rs.close();

        System.out.println(String.format("stat N: %s", statN));
        System.out.println(String.format("invalid N: %s", invalid.get()));
    }


    @After
    public void tearDown() throws IOException {
        table.close();
    }
}
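Note: the assertions in testReadGeoLocations read each box as four space-separated values in the order minY minX maxY maxX. A standalone restatement of the same checks, on a made-up sample value:

public class BoundingBoxCheckExample {

    public static void main(final String[] args) {
        final String box = "-33.9 18.3 -33.8 18.5"; // hypothetical "minY minX maxY maxX"
        final String[] split = box.trim().split(" ");

        if (split.length != 4) throw new IllegalArgumentException("bad number of coordinates");

        final double minY = Double.parseDouble(split[0]);
        final double minX = Double.parseDouble(split[1]);
        final double maxY = Double.parseDouble(split[2]);
        final double maxX = Double.parseDouble(split[3]);

        // same bounds enforced by the test: longitudes in [-180, 180], latitudes in [-90, 90]
        if (minX < -180.0 || maxX > 180.0 || minY < -90.0 || maxY > 90.0) throw new IllegalArgumentException("out of range");
        // and max >= min on both axes
        if (maxY < minY || maxX < minX) throw new IllegalArgumentException("inverted box");

        System.out.println("box ok");
    }
}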
modules/dnet-mapreduce-jobs/trunk/src/test/resources/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties

dnet.clustername = DM

#CORE-SITE
fs.defaultFS = hdfs://nmis-hadoop-cluster

hadoop.security.authentication = simple
hadoop.security.auth_to_local = DEFAULT

hadoop.rpc.socket.factory.class.default = org.apache.hadoop.net.StandardSocketFactory

#HBASE-SITE
hbase.rootdir = hdfs://nmis-hadoop-cluster/hbase

hbase.security.authentication = simple
zookeeper.znode.rootserver = root-region-server
hbase.zookeeper.quorum = quorum1.t.hadoop.research-infrastructures.eu,quorum2.t.hadoop.research-infrastructures.eu,quorum3.t.hadoop.research-infrastructures.eu,quorum4.t.hadoop.research-infrastructures.eu,jobtracker.t.hadoop.research-infrastructures.eu
hbase.zookeeper.property.clientPort = 2182
hbase.zookeeper.client.port = 2182
zookeeper.znode.parent = /hbase

#HDFS-SITE
dfs.replication = 2
dfs.nameservices = nmis-hadoop-cluster
dfs.ha.namenodes.nmis-hadoop-cluster = nn1,nn2

dfs.namenode.rpc-address.nmis-hadoop-cluster.nn1=quorum1.t.hadoop.research-infrastructures.eu:8020
dfs.namenode.http-address.nmis-hadoop-cluster.nn1=quorum1.t.hadoop.research-infrastructures.eu:50070
dfs.namenode.rpc-address.nmis-hadoop-cluster.nn2=quorum2.t.hadoop.research-infrastructures.eu:8020
dfs.namenode.http-address.nmis-hadoop-cluster.nn2=quorum2.t.hadoop.research-infrastructures.eu:50070

dfs.client.failover.proxy.provider.nmis-hadoop-cluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

#MAPRED-SITE
mapred.job.tracker = nmis-hadoop-jt
mapred.jobtrackers.nmis-hadoop-jt = jt1,jt2

mapred.jobtracker.rpc-address.nmis-hadoop-jt.jt1 = jobtracker.t.hadoop.research-infrastructures.eu:8021
mapred.jobtracker.rpc-address.nmis-hadoop-jt.jt2 = quorum4.t.hadoop.research-infrastructures.eu:8022

mapred.client.failover.proxy.provider.nmis-hadoop-jt = org.apache.hadoop.mapred.ConfiguredFailoverProxyProvider

mapred.mapper.new-api = true
mapred.reducer.new-api = true

#OOZIE SERVER
oozie.service.loc = http://oozie.t.hadoop.research-infrastructures.eu:11000/oozie
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/experiment/PublicationAnalysisMapper.java

//package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
//
//import java.io.IOException;
//import java.util.List;
//
//import eu.dnetlib.data.mapreduce.util.DNGFDecoder;
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
//import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
//import eu.dnetlib.data.proto.PublicationProtos.Publication;
//import org.apache.commons.lang3.StringUtils;
//import org.apache.hadoop.hbase.client.Result;
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
//import org.apache.hadoop.io.NullWritable;
//
///**
// * Created by claudio on 22/04/16.
// */
//public class PublicationAnalysisMapper extends TableMapper<NullWritable, NullWritable> {
//
//    public static final String RESULT = "result";
//    private static final int MAX_DESCRIPTIONS = 50;
//
//    @Override
//    protected void setup(final Context context) throws IOException, InterruptedException {
//        super.setup(context);
//    }
//
//    @Override
//    protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
//
//        if (new String(key.copyBytes()).contains("dedup_wf")) {
//            context.getCounter(RESULT, "roots").increment(1);
//            return;
//        }
//
//        final byte[] body = value.getValue(RESULT.getBytes(), DedupUtils.BODY_B);
//        if (body == null) {
//            context.getCounter(RESULT, "missing body").increment(1);
//            return;
//        }
//        final DNGFDecoder decoder = DNGFDecoder.decode(body);
//        final Publication publication = decoder.getEntity().getPublication();
//        if (publication.getMetadata().getResulttype().getClassid().equals("dataset")) {
//            context.getCounter(RESULT, "dataset").increment(1);
//            return;
//        } else {
//            context.getCounter(RESULT, "publication").increment(1);
//        }
//
//        if (publication.getMetadata().getDescriptionCount() > MAX_DESCRIPTIONS) {
//            context.getCounter(RESULT, "abstracts > " + MAX_DESCRIPTIONS).increment(1);
//        } else {
//            context.getCounter(RESULT, "abstracts: " + publication.getMetadata().getDescriptionCount()).increment(1);
//        }
//
//        final List<StringField> descList = publication.getMetadata().getDescriptionList();
//
//        boolean empty = true;
//        for(StringField desc : descList) {
//            empty = empty && StringUtils.isBlank(desc.getValue());
//        }
//
//        context.getCounter(RESULT, "empty abstract: " + empty).increment(1);
//    }
//
//    @Override
//    protected void cleanup(final Context context) throws IOException, InterruptedException {
//        super.cleanup(context);
//    }
//}

package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;

import java.io.IOException;
import java.util.List;

import eu.dnetlib.data.graph.model.DNGFDecoder;
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
import eu.dnetlib.data.proto.PublicationProtos.Publication;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;

/**
 * Created by claudio on 22/04/16.
 */
public class PublicationAnalysisMapper extends TableMapper<NullWritable, NullWritable> {

    public static final String RESULT = "result";
    private static final int MAX_DESCRIPTIONS = 50;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    @Override
    protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {

        if (new String(key.copyBytes()).contains("dedup_wf")) {
            context.getCounter(RESULT, "roots").increment(1);
            return;
        }

        final byte[] body = value.getValue(RESULT.getBytes(), HBaseTableDAO.cfMetadataByte());
        if (body == null) {
            context.getCounter(RESULT, "missing body").increment(1);
            return;
        }
        final DNGFDecoder decoder = DNGFDecoder.decode(body);
        final Publication publication = decoder.getEntity().getPublication();
        if (publication.getMetadata().getResulttype().getClassid().equals("dataset")) {
            context.getCounter(RESULT, "dataset").increment(1);
            return;
        } else {
            context.getCounter(RESULT, "publication").increment(1);
        }

        if (publication.getMetadata().getDescriptionCount() > MAX_DESCRIPTIONS) {
            context.getCounter(RESULT, "abstracts > " + MAX_DESCRIPTIONS).increment(1);
        } else {
            context.getCounter(RESULT, "abstracts: " + publication.getMetadata().getDescriptionCount()).increment(1);
        }

        final List<StringField> descList = publication.getMetadata().getDescriptionList();

        boolean empty = true;
        for(StringField desc : descList) {
            empty = empty && StringUtils.isBlank(desc.getValue());
        }

        context.getCounter(RESULT, "empty abstract: " + empty).increment(1);
    }

    @Override
    protected void cleanup(final Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupMarkDeletedEntityMapper.java

//package eu.dnetlib.data.mapreduce.hbase.dedup;
//
//import java.io.IOException;
//import java.util.Map;
//
//import eu.dnetlib.data.mapreduce.JobParams;
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
//import eu.dnetlib.data.proto.DNGFProtos.DNGF;
//import eu.dnetlib.data.proto.DNGFProtos.DNGFRel.Builder;
//import eu.dnetlib.data.proto.KindProtos.Kind;
//import eu.dnetlib.data.proto.SubRelProtos.Dedup;
//import eu.dnetlib.data.proto.TypeProtos.Type;
//import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
//import eu.dnetlib.pace.config.DedupConfig;
//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
//import org.apache.hadoop.hbase.client.Put;
//import org.apache.hadoop.hbase.client.Result;
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
//import org.apache.hadoop.hbase.util.Bytes;
//
//public class DedupMarkDeletedEntityMapper extends TableMapper<ImmutableBytesWritable, Put> {
//
//    private static final Log log = LogFactory.getLog(DedupMarkDeletedEntityMapper.class);
//
//    private DedupConfig dedupConf;
//
//    @Override
//    protected void setup(final Context context) throws IOException, InterruptedException {
//        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
//        log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
//    }
//
//    @Override
//    protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
//        // System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
//
//        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
//        final Map<byte[], byte[]> mergedIn = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(type));
//
//        if ((mergedIn != null) && !mergedIn.isEmpty()) {
//
//            final byte[] row = rowkey.copyBytes();
//
//            // marks the original body deleted
//            emitBody(context, row, value.getValue(Bytes.toBytes(type.toString()), DedupUtils.BODY_B));
//
//        } else {
//            context.getCounter(type.toString(), "row not merged").increment(1);
//        }
//    }
//
//
//    private void emitBody(final Context context, final byte[] row, final byte[] body) throws IOException, InterruptedException {
//        final String type = dedupConf.getWf().getEntityType();
//        if (body == null) {
//            context.getCounter(type, "missing body").increment(1);
//            System.err.println("missing body: " + new String(row));
//            return;
//        }
//        final DNGF prototype = DNGF.parseFrom(body);
//
//        if (prototype.getDataInfo().getDeletedbyinference()) {
//            context.getCounter(type, "bodies already deleted").increment(1);
//        } else {
//            final DNGF.Builder oafRoot = DNGF.newBuilder(prototype);
//            oafRoot.getDataInfoBuilder().setDeletedbyinference(true).setInferred(true).setInferenceprovenance(dedupConf.getWf().getConfigurationId());
//            final byte[] family = Bytes.toBytes(type);
//            final Put put = new Put(row).add(family, DedupUtils.BODY_B, oafRoot.build().toByteArray());
//            put.setWriteToWAL(JobParams.WRITE_TO_WAL);
//            context.write(new ImmutableBytesWritable(row), put);
//            context.getCounter(type, "bodies marked deleted").increment(1);
//        }
//    }
//
//    private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
//        final Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
//        final DNGF oaf =
//                DNGF.newBuilder()
//                        .setKind(Kind.relation)
//                        .setLastupdatetimestamp(System.currentTimeMillis())
//                        .setDataInfo(
//                                AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
//                                        dedupConf.getWf().getConfigurationId())).setRel(oafRel)
//                        .build();
//        return oaf.toByteArray();
//    }
//
//}

package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.Map;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class DedupMarkDeletedEntityMapper extends TableMapper<ImmutableBytesWritable, Put> {

    private static final Log log = LogFactory.getLog(DedupMarkDeletedEntityMapper.class);

    private DedupConfig dedupConf;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
        log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
    }

    @Override
    protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
        // System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));

        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
        final Map<byte[], byte[]> mergedIn = value.getFamilyMap(HBaseTableDAO.getDedupQualifier_mergedInBytes(type));

        if ((mergedIn != null) && !mergedIn.isEmpty()) {

            final byte[] row = rowkey.copyBytes();

            // marks the original body deleted
            emitBody(context, row, value.getValue(Bytes.toBytes(type.toString()), HBaseTableDAO.cfMetadataByte()));

        } else {
            context.getCounter(type.toString(), "row not merged").increment(1);
        }
    }


    private void emitBody(final Context context, final byte[] row, final byte[] body) throws IOException, InterruptedException {
        final String type = dedupConf.getWf().getEntityType();
        if (body == null) {
            context.getCounter(type, "missing body").increment(1);
            System.err.println("missing body: " + new String(row));
            return;
        }
        final DNGF prototype = DNGF.parseFrom(body);

        if (prototype.getDataInfo().getDeletedbyinference()) {
            context.getCounter(type, "bodies already deleted").increment(1);
        } else {
            final DNGF.Builder oafRoot = DNGF.newBuilder(prototype);
            oafRoot.getDataInfoBuilder().setDeletedbyinference(true).setInferred(true).setInferenceprovenance(dedupConf.getWf().getConfigurationId());
            final byte[] family = Bytes.toBytes(type);
            final Put put = new Put(row).add(family, HBaseTableDAO.cfMetadataByte(), oafRoot.build().toByteArray());
            put.setWriteToWAL(JobParams.WRITE_TO_WAL);
            context.write(new ImmutableBytesWritable(row), put);
            context.getCounter(type, "bodies marked deleted").increment(1);
        }
    }

}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/UpdateMerger.java

 6 |  6 | import java.util.Map.Entry;
 7 |  7 |
 8 |  8 | import com.google.common.collect.Maps;
   |  9 | import eu.dnetlib.data.graph.model.DNGFDecoder;
   | 10 | import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
 9 | 11 | import eu.dnetlib.data.proto.WdsDatasetProtos.WdsDataset;
10 | 12 | import org.apache.hadoop.hbase.util.Bytes;
11 | 13 | import org.apache.hadoop.mapreduce.Mapper.Context;
.. | .. |
39 | 41 | private static DNGF doMerge(final Context context, final Map<String, byte[]> map)
40 | 42 |         throws InvalidProtocolBufferException {
41 | 43 |
42 |    |     final byte[] value = map.get(DedupUtils.BODY_S);
   | 44 |     final byte[] value = map.get(HBaseTableDAO.cfMetadata());
43 | 45 |     if (value == null) return null;
44 | 46 |
45 | 47 |     DNGF.Builder builder = DNGF.newBuilder(DNGFDecoder.decode(value, WdsDataset.geolocation).getDNGF());
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/AbstractRecordFactory.java

10 | 10 | import eu.dnetlib.data.mapreduce.hbase.index.config.ContextMapper;
11 | 11 | import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
12 | 12 | import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
13 |    | import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
14 | 13 | import eu.dnetlib.data.proto.DNGFProtos;
15 | 14 | import eu.dnetlib.data.proto.FieldTypeProtos;
16 | 15 | import eu.dnetlib.data.proto.PublicationProtos;
.. | .. |
34 | 33 | protected List<DNGFDecoder> children = Lists.newLinkedList();
35 | 34 | protected EntityConfigTable entityConfigTable;
36 | 35 | protected ContextMapper contextMapper;
37 |    | protected RelClasses relClasses;
   | 36 |
38 | 37 | protected Map<String, Integer> counters = Maps.newHashMap();
39 | 38 | protected boolean entityDefaults;
40 | 39 | protected boolean relDefaults;
.. | .. |
49 | 48 |     this.relDefaults = relDefaults;
50 | 49 |     this.childDefaults = childDefaults;
51 | 50 |     this.ontologies = ontologies;
52 |    |
53 | 51 | }
54 | 52 |
55 | 53 | public Map<String, Integer> getRelCounters() {
56 | 54 |     return relCounters;
57 | 55 | }
58 | 56 |
59 |    | public RelClasses getRelClasses() {
60 |    |     return relClasses;
61 |    | }
62 |    |
63 | 57 | public void setMainEntity(final DNGFDecoder mainEntity) {
64 | 58 |     this.mainEntity = mainEntity;
65 | 59 |     this.key = mainEntity.decodeEntity().getId();
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupDeleteSimRelMapper.java

//package eu.dnetlib.data.mapreduce.hbase.dedup;
//
//import java.io.IOException;
//import java.util.Map;
//
//import org.apache.hadoop.hbase.client.Delete;
//import org.apache.hadoop.hbase.client.Result;
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
//import org.apache.hadoop.io.Writable;
//
//import eu.dnetlib.data.mapreduce.JobParams;
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
//import eu.dnetlib.data.proto.TypeProtos.Type;
//import eu.dnetlib.pace.config.DedupConfig;
//
//public class DedupDeleteSimRelMapper extends TableMapper<ImmutableBytesWritable, Writable> {
//
//    private DedupConfig dedupConf;
//
//    private ImmutableBytesWritable outKey;
//
//    @Override
//    protected void setup(final Context context) throws IOException, InterruptedException {
//        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
//        System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
//
//        outKey = new ImmutableBytesWritable();
//    }
//
//    @Override
//    protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
//        // System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
//
//        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
//
//        final Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));
//
//        if ((similarRels != null) && !similarRels.isEmpty()) {
//
//            final byte[] row = rowkey.copyBytes();
//            final Delete delete = new Delete(row);
//
//            for (final byte[] q : similarRels.keySet()) {
//                delete.deleteColumns(DedupUtils.getSimilarityCFBytes(type), q);
//            }
//
//            outKey.set(row);
//            context.write(outKey, delete);
//            context.getCounter(dedupConf.getWf().getEntityType(), "similarity deleted").increment(similarRels.size());
//
//        } else {
//            context.getCounter(dedupConf.getWf().getEntityType(), "row not in similarity mesh").increment(1);
//        }
//    }
//}

package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.Map;

import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Writable;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.config.DedupConfig;

public class DedupDeleteSimRelMapper extends TableMapper<ImmutableBytesWritable, Writable> {

    private DedupConfig dedupConf;

    private ImmutableBytesWritable outKey;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
        System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());

        outKey = new ImmutableBytesWritable();
    }

    @Override
    protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
        // System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));

        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());

        final Map<byte[], byte[]> similarRels = value.getFamilyMap(HBaseTableDAO.getSimilarityQualifierBytes(type));

        if ((similarRels != null) && !similarRels.isEmpty()) {

            final byte[] row = rowkey.copyBytes();
            final Delete delete = new Delete(row);

            for (final byte[] q : similarRels.keySet()) {
                delete.deleteColumns(HBaseTableDAO.getSimilarityQualifierBytes(type), q);
            }

            outKey.set(row);
            context.write(outKey, delete);
            context.getCounter(dedupConf.getWf().getEntityType(), "similarity deleted").increment(similarRels.size());

        } else {
            context.getCounter(dedupConf.getWf().getEntityType(), "row not in similarity mesh").increment(1);
        }
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsReducer.java

//package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
//
//import java.io.IOException;
//import java.nio.ByteBuffer;
//import java.util.Set;
//
//import com.google.common.collect.Sets;
//import eu.dnetlib.data.mapreduce.JobParams;
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
//import eu.dnetlib.data.proto.DNGFProtos.DNGF;
//import eu.dnetlib.data.proto.DNGFProtos.DNGFRel;
//import eu.dnetlib.data.proto.KindProtos.Kind;
//import eu.dnetlib.data.proto.SubRelProtos.Dedup;
//import eu.dnetlib.data.proto.TypeProtos.Type;
//import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
//import eu.dnetlib.pace.config.DedupConfig;
//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
//import org.apache.hadoop.hbase.client.Put;
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
//import org.apache.hadoop.hbase.mapreduce.TableReducer;
//import org.apache.hadoop.hbase.util.Bytes;
//import org.apache.hadoop.io.Text;
//
///**
// * Created by claudio on 15/10/15.
// */
//public class ConnectedComponentsReducer extends TableReducer<Text, VertexWritable, ImmutableBytesWritable> {
//
//    private static final Log log = LogFactory.getLog(ConnectedComponentsReducer.class);
//
//    private DedupConfig dedupConf;
//
//    private byte[] cfMergedIn;
//
//    private byte[] cfMerges;
//
//    @Override
//    protected void setup(final Context context) {
//
//        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
//        log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
//
//        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
//        cfMergedIn = DedupUtils.getDedupCF_mergedInBytes(type);
//        cfMerges = DedupUtils.getDedupCF_mergesBytes(type);
//    }
//
//    @Override
//    protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
//
//        final Set<String> set = Sets.newHashSet();
//
//        for(VertexWritable v : values) {
//            for(Text t : v.getEdges()) {
//                set.add(t.toString());
//            }
//        }
//
//        final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());
//
//        for(String q : set) {
//            final byte[] qb = Bytes.toBytes(q);
//            emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn));
//            emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges));
//
//            context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
//        }
//
//    }
//
//    private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
//            InterruptedException {
//        final Put put = new Put(from).add(cf, to, value);
//        put.setWriteToWAL(JobParams.WRITE_TO_WAL);
//        context.write(new ImmutableBytesWritable(from), put);
//    }
//
//    private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
//        final DNGFRel.Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
//        final DNGF oaf =
//                DNGF.newBuilder()
//                        .setKind(Kind.relation)
//                        .setLastupdatetimestamp(System.currentTimeMillis())
//                        .setDataInfo(
//                                AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
//                                        dedupConf.getWf().getConfigurationId())).setRel(oafRel)
//                        .build();
//        return oaf.toByteArray();
//    }
//
//}

package eu.dnetlib.data.mapreduce.hbase.dedup.cc;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;

import com.google.common.collect.Sets;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
import eu.dnetlib.data.proto.DNGFProtos.DNGFRel;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

/**
 * Created by claudio on 15/10/15.
 */
public class ConnectedComponentsReducer extends TableReducer<Text, VertexWritable, ImmutableBytesWritable> {

    private static final Log log = LogFactory.getLog(ConnectedComponentsReducer.class);

    private DedupConfig dedupConf;

    private byte[] cfMergedIn;

    private byte[] cfMerges;

    @Override
    protected void setup(final Context context) {

        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
        log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());

        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
        cfMergedIn = HBaseTableDAO.getDedupQualifier_mergedInBytes(type);
        cfMerges = HBaseTableDAO.getDedupQualifier_mergesBytes(type);
    }

    @Override
    protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {

        final Set<String> set = Sets.newHashSet();

        for(VertexWritable v : values) {
            for(Text t : v.getEdges()) {
                set.add(t.toString());
            }
        }

        final byte[] root = HBaseTableDAO.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());

        for(String q : set) {
            final byte[] qb = Bytes.toBytes(q);
            emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, "isMergedIn"));
            emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, "merges"));

            context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
        }

    }

    private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
            InterruptedException {
        final Put put = new Put(from).add(cf, to, value);
        put.setWriteToWAL(JobParams.WRITE_TO_WAL);
        context.write(new ImmutableBytesWritable(from), put);
    }

    private byte[] buildRel(final byte[] from, final byte[] to, final String relClass) {
        final DNGFRel.Builder oafRel = HBaseTableDAO.getDedup(new String(from), new String(to), relClass);
        final DNGF oaf =
                DNGF.newBuilder()
                        .setKind(Kind.relation)
                        .setLastupdatetimestamp(System.currentTimeMillis())
                        .setDataInfo(
                                AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
                                        dedupConf.getWf().getConfigurationId())).setRel(oafRel)
                        .build();
        return oaf.toByteArray();
    }

}
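Note: for every connected component the reducer writes the merge relation in both directions: the member row gets an isMergedIn column pointing at the root, and the root row gets a merges column pointing back at the member. A minimal sketch of the two Puts emitted per member (the identifiers are hypothetical, the column family names assume HBaseTableDAO keeps the <relType>_dedup_* naming used by the old DedupUtils above, and the empty byte array stands in for the serialized DNGF relation produced by buildRel):

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class DedupRelPutSketch {

    public static void main(final String[] args) {
        final byte[] member = Bytes.toBytes("50|od______5678::duplicate");
        final byte[] root = Bytes.toBytes("50|dedup_wf_001::representative");
        final byte[] rel = new byte[0]; // placeholder for the serialized DNGF relation

        // member -> root, stored on the member row
        final Put mergedIn = new Put(member).add(Bytes.toBytes("publicationPublication_dedup_isMergedIn"), root, rel);
        // root -> member, stored on the root row
        final Put merges = new Put(root).add(Bytes.toBytes("publicationPublication_dedup_merges"), member, rel);

        System.out.println(mergedIn);
        System.out.println(merges);
    }
}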
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupMapper.java | ||
---|---|---|
1 |
//package eu.dnetlib.data.mapreduce.hbase.dedup; |
|
2 |
// |
|
3 |
//import java.io.IOException; |
|
4 |
//import java.util.Collection; |
|
5 |
//import java.util.List; |
|
6 |
//import java.util.Map; |
|
7 |
// |
|
8 |
//import com.google.common.collect.Maps; |
|
9 |
//import eu.dnetlib.data.mapreduce.JobParams; |
|
10 |
//import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
11 |
//import eu.dnetlib.data.mapreduce.util.DNGFDecoder; |
|
12 |
//import eu.dnetlib.data.proto.DNGFProtos.DNGFEntity; |
|
13 |
//import eu.dnetlib.data.proto.TypeProtos.Type; |
|
14 |
//import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner; |
|
15 |
//import eu.dnetlib.pace.config.DedupConfig; |
|
16 |
//import eu.dnetlib.pace.model.MapDocument; |
|
17 |
//import eu.dnetlib.pace.model.ProtoDocumentBuilder; |
|
18 |
//import org.apache.commons.logging.Log; |
|
19 |
//import org.apache.commons.logging.LogFactory; |
|
20 |
//import org.apache.hadoop.hbase.client.Result; |
|
21 |
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
22 |
//import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
23 |
//import org.apache.hadoop.hbase.util.Bytes; |
|
24 |
//import org.apache.hadoop.io.Text; |
|
25 |
// |
|
26 |
//public class DedupMapper extends TableMapper<Text, ImmutableBytesWritable> { |
|
27 |
// |
|
28 |
// private static final Log log = LogFactory.getLog(DedupMapper.class); |
|
29 |
// |
|
30 |
// private DedupConfig dedupConf; |
|
31 |
// |
|
32 |
// private Map<String, List<String>> blackListMap = Maps.newHashMap(); |
|
33 |
// |
|
34 |
// private Text outKey; |
|
35 |
// |
|
36 |
// private ImmutableBytesWritable ibw; |
|
37 |
// |
|
38 |
// @Override |
|
39 |
// protected void setup(final Context context) throws IOException, InterruptedException { |
|
40 |
// |
|
41 |
// final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF); |
|
42 |
// |
|
43 |
// log.info("pace conf strings"); |
|
44 |
// log.info("pace conf: " + dedupConfJson); |
|
45 |
// |
|
46 |
// dedupConf = DedupConfig.load(dedupConfJson); |
|
47 |
// |
|
48 |
// blackListMap = dedupConf.getPace().getBlacklists(); |
|
49 |
// |
|
50 |
// outKey = new Text(); |
|
51 |
// ibw = new ImmutableBytesWritable(); |
|
52 |
// |
|
53 |
// log.info("pace conf"); |
|
54 |
// log.info("entity type: " + dedupConf.getWf().getEntityType()); |
|
55 |
// log.info("clustering: " + dedupConf.getPace().getClustering()); |
|
56 |
// log.info("conditions: " + dedupConf.getPace().getConditions()); |
|
57 |
// log.info("fields: " + dedupConf.getPace().getModel()); |
|
58 |
// log.info("blacklists: " + blackListMap); |
|
59 |
// log.info("wf conf: " + dedupConf.toString()); |
|
60 |
// } |
|
61 |
// |
|
62 |
// @Override |
|
63 |
// protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException { |
|
64 |
// // log.info("got key: " + new String(keyIn.copyBytes())); |
|
65 |
// |
|
66 |
// final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B); |
|
67 |
// |
|
68 |
// if (body != null) { |
|
69 |
// |
|
70 |
// final DNGFDecoder decoder = DNGFDecoder.decode(body); |
|
71 |
// if (decoder.getDNGF().getDataInfo().getDeletedbyinference()) { |
|
72 |
// context.getCounter(dedupConf.getWf().getEntityType(), "deleted by inference").increment(1); |
|
73 |
// return; |
|
74 |
// } |
|
75 |
// |
|
76 |
// final DNGFEntity entity = decoder.getEntity(); |
|
77 |
// |
|
78 |
// context.getCounter(entity.getType().toString(), "decoded").increment(1); |
|
79 |
// |
|
80 |
// if (entity.getType().equals(Type.valueOf(dedupConf.getWf().getEntityType()))) { |
|
81 |
// |
|
82 |
// // GeneratedMessage metadata = DNGFEntityDecoder.decode(entity).getEntity(); |
|
83 |
// final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel()); |
|
84 |
// emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap)); |
|
85 |
// } |
|
86 |
// } else { |
|
87 |
// context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1); |
|
88 |
// } |
|
89 |
// } |
|
90 |
// |
|
91 |
// private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException { |
|
92 |
// for (final String ngram : ngrams) { |
|
93 |
// outKey.set(ngram); |
|
94 |
// ibw.set(doc.toByteArray()); |
|
95 |
// context.write(outKey, ibw); |
|
96 |
// } |
|
97 |
// } |
|
98 |
// |
|
99 |
//} |
|
package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Maps;
import eu.dnetlib.data.graph.model.DNGFDecoder;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
import eu.dnetlib.data.proto.DNGFProtos.DNGFEntity;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class DedupMapper extends TableMapper<Text, ImmutableBytesWritable> {

    private static final Log log = LogFactory.getLog(DedupMapper.class);

    private DedupConfig dedupConf;

    private Map<String, List<String>> blackListMap = Maps.newHashMap();

    private Text outKey;

    private ImmutableBytesWritable ibw;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {

        final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);

        log.info("pace conf strings");
        log.info("pace conf: " + dedupConfJson);

        dedupConf = DedupConfig.load(dedupConfJson);

        blackListMap = dedupConf.getPace().getBlacklists();

        outKey = new Text();
        ibw = new ImmutableBytesWritable();

        log.info("pace conf");
        log.info("entity type: " + dedupConf.getWf().getEntityType());
        log.info("clustering: " + dedupConf.getPace().getClustering());
        log.info("conditions: " + dedupConf.getPace().getConditions());
        log.info("fields: " + dedupConf.getPace().getModel());
        log.info("blacklists: " + blackListMap);
        log.info("wf conf: " + dedupConf.toString());
    }

    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
        // log.info("got key: " + new String(keyIn.copyBytes()));

        final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), HBaseTableDAO.cfMetadataByte());

        if (body != null) {

            final DNGFDecoder decoder = DNGFDecoder.decode(body);
            if (decoder.getDNGF().getDataInfo().getDeletedbyinference()) {
                context.getCounter(dedupConf.getWf().getEntityType(), "deleted by inference").increment(1);
                return;
            }

            final DNGFEntity entity = decoder.getEntity();

            context.getCounter(entity.getType().toString(), "decoded").increment(1);

            if (entity.getType().equals(Type.valueOf(dedupConf.getWf().getEntityType()))) {

                // GeneratedMessage metadata = DNGFEntityDecoder.decode(entity).getEntity();
                final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
                emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
            }
        } else {
            context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
        }
    }

    private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
        for (final String ngram : ngrams) {
            outKey.set(ngram);
            ibw.set(doc.toByteArray());
            context.write(outKey, ibw);
        }
    }

}
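Note: this revision only touches the mapper; the job wiring lives elsewhere in dnet-mapreduce-jobs and is not part of this diff. Purely as orientation, a driver for a TableMapper of this shape could look like the sketch below. The table name, scan settings and the class name DedupMapperDriverSketch are assumptions introduced here for illustration; only JobParams.DEDUP_CONF and the mapper's output types (Text, ImmutableBytesWritable) come from the code above.

// Hypothetical driver sketch, not part of this revision.
package eu.dnetlib.data.mapreduce.hbase.dedup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import eu.dnetlib.data.mapreduce.JobParams;

public class DedupMapperDriverSketch {

    public static Job createJob(final String sourceTable, final String dedupConfJson) throws Exception {
        final Configuration conf = HBaseConfiguration.create();
        // the serialized DedupConfig that DedupMapper.setup() reads back from the job configuration
        conf.set(JobParams.DEDUP_CONF, dedupConfJson);

        final Job job = Job.getInstance(conf, "dedup map phase (sketch)");
        job.setJarByClass(DedupMapperDriverSketch.class);

        final Scan scan = new Scan();
        scan.setCaching(100);       // assumed scanner caching value
        scan.setCacheBlocks(false); // full scans should not pollute the region server block cache

        // scan the source table and feed each row to DedupMapper
        TableMapReduceUtil.initTableMapperJob(sourceTable, scan, DedupMapper.class,
                Text.class, ImmutableBytesWritable.class, job);

        // the real workflow also configures a reducer that groups documents by clustering key (omitted here)
        return job;
    }
}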
modules/dnet-mapreduce-jobs/trunk/pom.xml | ||
---|---|---|
314 | 314 | </dependency>
315 | 315 |
316 | 316 | <dependency>
    | 317 |     <groupId>org.apache.commons</groupId>
    | 318 |     <artifactId>commons-lang3</artifactId>
    | 319 |     <version>${commons.lang.version}</version>
    | 320 | </dependency>
    | 321 | <dependency>
317 | 322 |     <groupId>com.typesafe</groupId>
318 | 323 |     <artifactId>config</artifactId>
319 | 324 |     <version>1.3.0</version>
... | ... |
324 | 329 |     <artifactId>mongo-java-driver</artifactId>
325 | 330 |     <version>${mongodb.driver.version}</version>
326 | 331 | </dependency>
327 |     | <dependency>
    | 332 | <dependency>
328 | 333 |     <groupId>org.elasticsearch</groupId>
329 | 334 |     <artifactId>elasticsearch-hadoop-mr</artifactId>
330 | 335 |     <version>2.0.2</version>
... | ... |
335 | 340 |     <version>1.8.5</version>
336 | 341 |     <scope>test</scope>
337 | 342 | </dependency>
338 |     |
339 |     |
340 | 343 | <dependency>
    | 344 |     <groupId>org.apache.zookeeper</groupId>
    | 345 |     <artifactId>zookeeper</artifactId>
    | 346 |     <version>3.4.5-cdh4.3.0</version>
    | 347 |     <scope>test</scope>
    | 348 | </dependency>
    | 349 | <dependency>
341 | 350 |     <groupId>org.json</groupId>
342 | 351 |     <artifactId>json</artifactId>
343 | 352 |     <version>20160212</version>
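The two additions above pull in commons-lang3 (versioned through the ${commons.lang.version} property, presumably defined in the module's or parent's <properties> section outside this diff) and a test-scoped ZooKeeper artifact matching the CDH4 stack. Purely as an illustration of the kind of helper the new commons-lang3 dependency enables, here is a hypothetical snippet that is not part of the revision:

// Hypothetical example only: typical commons-lang3 usage in the m/r jobs module.
package eu.dnetlib.data.mapreduce.util;

import org.apache.commons.lang3.StringUtils;

public class FieldCleaner {

    /** Returns a trimmed value, or null when the input is null, empty or whitespace-only. */
    public static String normalize(final String value) {
        return StringUtils.isBlank(value) ? null : value.trim();
    }
}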
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/SimpleDedupPersonMapper.java | ||
---|---|---|
//package eu.dnetlib.data.mapreduce.hbase.dedup;
//
//import java.io.IOException;
//
//import org.apache.hadoop.hbase.client.Result;
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
//import org.apache.hadoop.io.Text;
//
//import eu.dnetlib.data.mapreduce.JobParams;
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
//import eu.dnetlib.data.mapreduce.util.DNGFDecoder;
//import eu.dnetlib.pace.config.DedupConfig;
//import eu.dnetlib.pace.model.Person;
//
//public class SimpleDedupPersonMapper extends TableMapper<Text, ImmutableBytesWritable> {
//
//    private DedupConfig dedupConf;
//
//    private Text rowKey;
//
//    private ImmutableBytesWritable ibw;
//
//    @Override
//    protected void setup(final Context context) throws IOException, InterruptedException {
//        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
//        rowKey = new Text();
//        ibw = new ImmutableBytesWritable();
//    }
//
//    @Override
//    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
//        // System.out.println("got key: " + new String(keyIn.copyBytes()));
//
//        if (DedupUtils.isRoot(new String(keyIn.copyBytes()))) {
//            context.getCounter(dedupConf.getWf().getEntityType(), "roots skipped").increment(1);
//            return;
//        }
//        final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B);
//
//        if (body != null) {
//            try {
//                final DNGFDecoder decoder = DNGFDecoder.decode(body);
//
//                final String hash = new Person(decoder.getEntity().getPerson().getMetadata().getFullname().getValue(), false).hash();
//                // String hash = new Person(getPersonName(decoder), true).hash();
//
//                rowKey.set(hash);
//                ibw.set(body);
//                context.write(rowKey, ibw);
//
//            } catch (final Throwable e) {
//                System.out.println("GOT EX " + e);
//                e.printStackTrace(System.err);
//                context.getCounter(dedupConf.getWf().getEntityType(), e.getClass().toString()).increment(1);
//            }
//        } else {
//            context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
//        }
//    }
//
//    // private String getPersonName(DNGFDecoder decoder) {
//    // Metadata m = decoder.getEntity().getPerson().getMetadata();
//    // String secondnames = Joiner.on(" ").join(m.getSecondnamesList());
//    //
//    // return isValid(m.getFullname()) ? m.getFullname() : (secondnames + ", " + m.getFirstname());
//    // }
//
//    // private boolean isValid(String fullname) {
//    // return fullname != null && !fullname.isEmpty();
//    // }
//
//}
|
package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;

import eu.dnetlib.data.graph.model.DNGFDecoder;
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.Person;

public class SimpleDedupPersonMapper extends TableMapper<Text, ImmutableBytesWritable> {

    private DedupConfig dedupConf;

    private Text rowKey;

    private ImmutableBytesWritable ibw;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
        rowKey = new Text();
        ibw = new ImmutableBytesWritable();
    }

    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
        // System.out.println("got key: " + new String(keyIn.copyBytes()));

        if (HBaseTableDAO.isRoot(new String(keyIn.copyBytes()))) {
            context.getCounter(dedupConf.getWf().getEntityType(), "roots skipped").increment(1);
            return;
        }
        final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), HBaseTableDAO.cfMetadataByte());

        if (body != null) {
            try {
                final DNGFDecoder decoder = DNGFDecoder.decode(body);

                final String hash = new Person(decoder.getEntity().getPerson().getMetadata().getFullname().getValue(), false).hash();
                // String hash = new Person(getPersonName(decoder), true).hash();

                rowKey.set(hash);
                ibw.set(body);
                context.write(rowKey, ibw);

            } catch (final Throwable e) {
                System.out.println("GOT EX " + e);
                e.printStackTrace(System.err);
                context.getCounter(dedupConf.getWf().getEntityType(), e.getClass().toString()).increment(1);
            }
        } else {
            context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
        }
    }
}
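SimpleDedupPersonMapper keys each person body by a hash of the full name, so all bodies with an identical name hash reach the same reduce group. The matching reducer is not part of this revision; the class below is only a sketch, under that assumption, of how such a group could be inspected, here by emitting candidate duplicate groups with more than one member. The class name and counter labels are hypothetical.

// Hypothetical companion reducer, not part of this revision: one reduce group
// per full-name hash; groups with more than one body are candidate duplicates.
package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SimpleDedupPersonGroupCounterSketch extends Reducer<Text, ImmutableBytesWritable, Text, Text> {

    private final Text outValue = new Text();

    @Override
    protected void reduce(final Text hash, final Iterable<ImmutableBytesWritable> bodies, final Context context)
            throws IOException, InterruptedException {
        int size = 0;
        for (final ImmutableBytesWritable body : bodies) {
            size++; // only counting: the serialized person bodies are not decoded in this sketch
        }
        context.getCounter("simplededupperson", "groups").increment(1);
        if (size > 1) {
            outValue.set(String.valueOf(size));
            context.write(hash, outValue);
        }
    }
}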
work in progress, adapting m/r jobs to the updated graph domain version