Revision 41680
Added by Eri Katsari about 8 years ago
modules/dnet-openaire-stats/branches/full_export/src/main/java/eu/dnetlib/data/mapreduce/hbase/statsExport/exporters/UsageStatsExporter.java | ||
---|---|---|
1 |
/* |
|
2 |
package eu.dnetlib.data.mapreduce.hbase.statsExport.exporters; |
|
3 |
|
|
4 |
import eu.dnetlib.data.mapreduce.hbase.statsExport.daos.UsageStatsDAO; |
|
5 |
import org.apache.log4j.Logger; |
|
6 |
|
|
7 |
public class UsageStatsExporter { |
|
8 |
|
|
9 |
|
|
10 |
private Logger log = Logger.getLogger(this.getClass()); |
|
11 |
private UsageStatsDAO usageStatsDAO; |
|
12 |
|
|
13 |
public UsageStatsExporter(String dbUrl, String dbDriver, String delim,String outputPath) throws Exception { |
|
14 |
usageStatsDAO = new UsageStatsDAO(dbUrl, dbDriver, delim, outputPath); |
|
15 |
} |
|
16 |
|
|
17 |
public void exportUsageStats(String entity) throws Exception { |
|
18 |
usageStatsDAO.getUsageStatistics(entity); |
|
19 |
log.info("Extracted Stats for entity " + entity); |
|
20 |
|
|
21 |
} |
|
22 |
|
|
23 |
|
|
24 |
public UsageStatsDAO getUsageStatsDAO() { |
|
25 |
return usageStatsDAO; |
|
26 |
} |
|
27 |
|
|
28 |
public void setUsageStatsDAO(UsageStatsDAO usageStatsDAO) { |
|
29 |
this.usageStatsDAO = usageStatsDAO; |
|
30 |
} |
|
31 |
} |
|
32 |
*/ |
modules/dnet-openaire-stats/branches/full_export/src/main/java/eu/dnetlib/data/mapreduce/hbase/statsExport/exporters/ContextExporter.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.statsExport.exporters; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.ContextTransformer; |
|
4 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
5 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
6 |
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; |
|
7 |
import org.apache.hadoop.conf.Configuration; |
|
8 |
import org.apache.hadoop.fs.FSDataOutputStream; |
|
9 |
import org.apache.hadoop.fs.FileSystem; |
|
10 |
import org.apache.hadoop.fs.Path; |
|
11 |
import org.apache.log4j.Logger; |
|
12 |
|
|
13 |
import java.util.ArrayList; |
|
14 |
import java.util.List; |
|
15 |
|
|
16 |
public class ContextExporter { |
|
17 |
private ContextTransformer contextTransformer = new ContextTransformer(); |
|
18 |
private String outputPath; |
|
19 |
private Logger log = Logger.getLogger(this.getClass()); |
|
20 |
|
|
21 |
|
|
22 |
private ArrayList<String> context = new ArrayList<String>(); |
|
23 |
private ArrayList<String> category = new ArrayList<String>(); |
|
24 |
private ArrayList<String> concept = new ArrayList<String>(); |
|
25 |
|
|
26 |
public ContextExporter(String outputPath, String contextMap, boolean readFromUrl) throws Exception { |
|
27 |
if (!outputPath.endsWith("/")) { |
|
28 |
outputPath += "/"; |
|
29 |
} |
|
30 |
this.outputPath = outputPath; |
|
31 |
if (readFromUrl) { |
|
32 |
readFromUrl(contextMap); |
|
33 |
} else { |
|
34 |
readFromBuffer(contextMap); |
|
35 |
} |
|
36 |
|
|
37 |
} |
|
38 |
|
|
39 |
public void readFromUrl(String url) throws Exception { |
|
40 |
|
|
41 |
List<String> concepts = getContextResouces(url); |
|
42 |
log.info("Returned concept " + concepts.size() |
|
43 |
); |
|
44 |
|
|
45 |
for (String data : concepts) { |
|
46 |
log.info("++++++++++++++ Transforming concept data "); |
|
47 |
String res = contextTransformer.transformXSL(data); |
|
48 |
|
|
49 |
processData(res); |
|
50 |
} |
|
51 |
|
|
52 |
writeData(this.context, "context"); |
|
53 |
writeData(this.category, "category"); |
|
54 |
writeData(this.concept, "concept"); |
|
55 |
|
|
56 |
|
|
57 |
} |
|
58 |
|
|
59 |
private void readFromBuffer(String contextMap) throws Exception { |
|
60 |
|
|
61 |
if (contextMap == null || contextMap.isEmpty()) { |
|
62 |
log.error("Context Resources file is empty."); |
|
63 |
throw new Exception("Context Resources file is empty."); |
|
64 |
} |
|
65 |
|
|
66 |
String data = contextTransformer.transformXSL(contextMap); |
|
67 |
|
|
68 |
log.info(data); |
|
69 |
processData(data); |
|
70 |
} |
|
71 |
|
|
72 |
private List<String> getContextResouces(String url) throws ISLookUpException { |
|
73 |
ISLookUpService lookUpService; |
|
74 |
|
|
75 |
JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); |
|
76 |
factory.setServiceClass(ISLookUpService.class); |
|
77 |
factory.setAddress(url); |
|
78 |
|
|
79 |
lookUpService = (ISLookUpService) factory.create(); |
|
80 |
// for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType'] |
|
81 |
// [.//RESOURCE_KIND/@value='ContextDSResources'] return $x |
|
82 |
return lookUpService.quickSearchProfile("//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType'][.//RESOURCE_KIND/@value='ContextDSResources']"); |
|
83 |
} |
|
84 |
|
|
85 |
|
|
86 |
private void writeData(ArrayList<String> dataList, String listName) throws Exception { |
|
87 |
if (dataList.isEmpty()) return; |
|
88 |
log.info(listName + " size " + dataList.size()); |
|
89 |
|
|
90 |
String data = new String(); |
|
91 |
for (int i = 0; i < dataList.size(); i++) { |
|
92 |
|
|
93 |
data += dataList.get(i); |
|
94 |
|
|
95 |
|
|
96 |
} |
|
97 |
|
|
98 |
|
|
99 |
data = data.substring(0, data.lastIndexOf("\n")); |
|
100 |
|
|
101 |
|
|
102 |
flushString(data, outputPath + listName); |
|
103 |
|
|
104 |
|
|
105 |
} |
|
106 |
|
|
107 |
private void processData(String data) throws Exception { |
|
108 |
try { |
|
109 |
|
|
110 |
String[] split = data.split("COPY\n"); |
|
111 |
|
|
112 |
if (split.length > 0) { |
|
113 |
context.add(split[0]); |
|
114 |
} |
|
115 |
|
|
116 |
|
|
117 |
if (split.length > 1) { |
|
118 |
category.add(split[1]); |
|
119 |
} |
|
120 |
|
|
121 |
if (split.length > 2) { |
|
122 |
|
|
123 |
concept.add(split[2]); |
|
124 |
} |
|
125 |
|
|
126 |
} catch (Exception e) { |
|
127 |
String msg = " Unable to create file with context, " + "concept and category values in output path " + outputPath + ". Reason: "; |
|
128 |
log.error(msg); |
|
129 |
throw new Exception(msg, e); |
|
130 |
} |
|
131 |
|
|
132 |
} |
|
133 |
|
|
134 |
private void flushString(String data, String destination) throws Exception { |
|
135 |
|
|
136 |
FSDataOutputStream fin = null; |
|
137 |
try { |
|
138 |
|
|
139 |
log.info("***********************Writing data:***********************\n" + data); |
|
140 |
log.info("*********************** data:***********************\n"); |
|
141 |
FileSystem fs = FileSystem.get(new Configuration()); |
|
142 |
fin = fs.create(new Path(destination), true); |
|
143 |
|
|
144 |
fin.write(data.getBytes()); |
|
145 |
|
|
146 |
} catch (Exception e) { |
|
147 |
log.error("Failed to write exported data to a file : ", e); |
|
148 |
throw new Exception("Failed to write exported data to a file : " + e.toString(), e); |
|
149 |
|
|
150 |
} finally { |
|
151 |
|
|
152 |
fin.close(); |
|
153 |
|
|
154 |
} |
|
155 |
} |
|
156 |
|
|
157 |
public String getOutputPath() { |
|
158 |
return outputPath; |
|
159 |
} |
|
160 |
|
|
161 |
public void setOutputPath(String outputPath) { |
|
162 |
this.outputPath = outputPath; |
|
163 |
} |
|
164 |
|
|
165 |
} |
modules/dnet-openaire-stats/branches/full_export/src/main/java/eu/dnetlib/data/mapreduce/hbase/statsExport/drivers/SqoopDriver.java | ||
---|---|---|
51 | 51 |
/** |
52 | 52 |
* Driver for the Sqoop tool. Calls the Sqoop Client for each <input file, |
53 | 53 |
* destination table> pair given in the @tables argument. |
54 |
*
|
|
54 |
* |
|
55 | 55 |
* Needed parameters ( connection, database name, etc are set in class |
56 | 56 |
* parameters when an new Sqoop Driver instance is created. |
57 |
*
|
|
57 |
* |
|
58 | 58 |
* @param tables |
59 | 59 |
*/ |
60 | 60 |
public void run(Multimap<String, String> tables) throws Exception { |
... | ... | |
65 | 65 |
|
66 | 66 |
String[] str = { "export", "-Dsqoop.export.records.per.statement =" + RecsPerStatement, "-Dsqoop.export.statements.per.transaction = " + StatementPerTrans, "-Dmapreduce.job.reduces = " + sqoopReducersCount, |
67 | 67 |
|
68 |
"--connect", connectionUrl, "--table", table.getKey(), |
|
68 |
"--connect", connectionUrl, "--table", table.getKey(),
|
|
69 | 69 |
|
70 |
"--export-dir", table.getValue(), |
|
70 |
"--export-dir", table.getValue(),
|
|
71 | 71 |
|
72 |
"--input-fields-terminated-by", delim,
|
|
72 |
"--input-fields-terminated-by", delim,
|
|
73 | 73 |
// "--input-enclosed-by", enclosed, |
74 |
"--optionally-enclosed-by", enclosed, |
|
74 |
"--optionally-enclosed-by", enclosed,
|
|
75 | 75 |
|
76 |
"--verbose", "--username", dbUser, "--password", dbPass, |
|
77 |
"--driver", "org.postgresql.Driver", "--batch", "--mapreduce-job-name", "Sqoop Stats Import Job for " + table.getKey(), "--m", sqoopReducersCount }; |
|
76 |
"--verbose", "--username", dbUser, "--password", dbPass, "--driver", "org.postgresql.Driver", "--batch", "--mapreduce-job-name", "Sqoop Stats Import Job for " + table.getKey(), "--m", sqoopReducersCount }; |
|
78 | 77 |
|
79 | 78 |
int ret = Sqoop.runTool(str); |
80 | 79 |
|
... | ... | |
223 | 222 |
|
224 | 223 |
/** |
225 | 224 |
* Cleans up auto-generated Sqoop class files |
226 |
*
|
|
225 |
* |
|
227 | 226 |
* @param table |
228 | 227 |
*/ |
229 | 228 |
|
Also available in: Unified diff
updates for multible output names and tables in job.properties