Project

General

Profile

1
package eu.dnetlib.data.objectstore.s3;
2

    
3
import com.amazonaws.ClientConfiguration;
4
import com.amazonaws.Protocol;
5
import com.amazonaws.auth.AWSCredentials;
6
import com.amazonaws.auth.AWSStaticCredentialsProvider;
7
import com.amazonaws.auth.BasicAWSCredentials;
8
import com.amazonaws.client.builder.AwsClientBuilder;
9
import com.amazonaws.services.s3.AmazonS3;
10
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
11
import com.amazonaws.services.s3.model.ListObjectsV2Request;
12
import com.amazonaws.services.s3.model.ListObjectsV2Result;
13
import com.amazonaws.services.s3.model.S3ObjectSummary;
14
import com.google.common.collect.Lists;
15
import com.mongodb.client.MongoCollection;
16
import com.mongodb.client.model.Filters;
17
import eu.dnetlib.data.objectstore.modular.ObjectStoreRecord;
18
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
19
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
20
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
21
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
22
import eu.dnetlib.enabling.resultset.ResultSetListener;
23
import org.apache.commons.lang3.StringUtils;
24
import org.apache.commons.lang3.tuple.ImmutableTriple;
25
import org.apache.commons.lang3.tuple.Triple;
26
import org.apache.commons.logging.Log;
27
import org.apache.commons.logging.LogFactory;
28
import org.bson.Document;
29

    
30
import java.io.*;
31
import java.nio.file.Files;
32
import java.nio.file.Path;
33
import java.nio.file.StandardCopyOption;
34
import java.util.concurrent.atomic.AtomicInteger;
35
import java.util.regex.Pattern;
36

    
37
public class S3ObjectStore implements ObjectStore {
38

    
39
    //CONSTANT VARIABLE NAME
40
    private static final String S3_REGION = "eu-west-3";
41
    private static final String URI_FIELD = "uri";
42
    private static final String ID_FIELD = "id";
43
    private static final String MIME_FIELD = "mime";
44
    private static final String ORIGINAL_OBJECT_FIELD = "originalObject";
45
    private static final String TIMESTAMP_FIELD = "timestamp";
46
    private static final String MD5_FIELD = "md5Sum";
47
    private static final String SIZE_FIELD = "size";
48

    
49

    
50
    private final String id;
51
    private final String interpretation;
52

    
53
    private final String s3AccessKey;
54
    private final String s3SecretKey;
55
    private final String s3EndPoint;
56
    private final String objectStoreBucket;
57

    
58

    
59
    private final MongoCollection<Document> mongoCollection;
60

    
61
    private AmazonS3 client;
62

    
63
    private static final Log log = LogFactory.getLog(S3ObjectStore.class);
64

    
65
    public S3ObjectStore(final String identifier, final String interpretation, final String s3AccessKey, final String s3SecretKey, final String s3EndPoint, final String objectStoreBucket, final MongoCollection<Document> mongoCollection) throws ObjectStoreServiceException {
66
        this.id = identifier;
67
        this.interpretation = interpretation;
68
        this.s3AccessKey = s3AccessKey;
69
        this.s3SecretKey = s3SecretKey;
70
        this.mongoCollection = mongoCollection;
71
        this.s3EndPoint = s3EndPoint;
72
        this.objectStoreBucket = objectStoreBucket;
73
    }
74

    
75

    
76
    private AmazonS3 initializeClient() throws ObjectStoreServiceException {
77
        try {
78
            final AWSCredentials credentials = new BasicAWSCredentials(this.s3AccessKey, this.s3SecretKey);
79
            final ClientConfiguration cfg = new ClientConfiguration().withProtocol(Protocol.HTTPS);
80
            final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials))
81
                    .withClientConfiguration(cfg)
82
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(this.s3EndPoint, S3_REGION))
83
                    .build();
84
            if (s3 == null)
85
                throw new ObjectStoreServiceException("Cannot initialize s3 client because is NULL");
86
            return s3;
87
        } catch (Throwable e) {
88
            log.error("An Error happen on initialize client ", e);
89
            throw new ObjectStoreServiceException("Cannot initialize s3 client", e);
90
        }
91
    }
92

    
93

    
94
    @Override
95
    public String getId() {
96
        return this.id;
97
    }
98

    
99
    @Override
100
    public String getInterpretation() {
101
        return this.interpretation;
102
    }
103

    
104
    @Override
105
    public int feed(Iterable<ObjectStoreRecord> iterable, boolean upsert) throws ObjectStoreServiceException {
106
        final AtomicInteger count = new AtomicInteger();
107
        iterable.forEach(objectStoreRecord -> {
108
            try {
109
                feedObjectRecord(objectStoreRecord);
110
                count.incrementAndGet();
111
            } catch (ObjectStoreServiceException e) {
112
                log.error("Error on saving file in a temporary Folder");
113
            }
114
        });
115
        return count.intValue();
116
    }
117

    
118
    @Override
119
    public int feedMetadataRecord(Iterable<MetadataObjectRecord> iterable, boolean b) throws ObjectStoreServiceException {
120
        final AtomicInteger count = new AtomicInteger();
121
        iterable.forEach(mor -> {
122
            final ObjectStoreRecord r = new ObjectStoreRecord();
123
            r.setInputStream(new ByteArrayInputStream(mor.getRecord().getBytes()));
124
            final ObjectStoreFile fileMetadata = new ObjectStoreFile();
125
            fileMetadata.setObjectID(mor.getId());
126
            fileMetadata.setMimeType(mor.getMime());
127
            r.setFileMetadata(fileMetadata);
128
            try {
129
                feedObjectRecord(r);
130
                count.incrementAndGet();
131
            } catch (ObjectStoreServiceException e) {
132
                log.error("Unable to store record r", e);
133
            }
134
        });
135
        return count.intValue();
136
    }
137

    
138
    @Override
139
    public String feedObjectRecord(ObjectStoreRecord objectStoreRecord) throws ObjectStoreServiceException {
140
        if (client == null)
141
            this.client = initializeClient();
142
        try {
143
            long start = System.currentTimeMillis();
144
            final Triple<String, Long, Path> result = saveAndGenerateMD5(objectStoreRecord.getInputStream());
145
            log.debug("Total time to download into fileSystem " + (System.currentTimeMillis() - start));
146
            if (StringUtils.isNotBlank(result.getLeft())) {
147
                log.debug("Saving object with ID: " + objectStoreRecord.getFileMetadata().getObjectID() + " on s3 ");
148
                start = System.currentTimeMillis();
149
                this.client.putObject(objectStoreBucket, id + "/" + objectStoreRecord.getFileMetadata().getObjectID(), result.getRight().toFile());
150
                log.debug("Total time to put into the ObjectStore " + (System.currentTimeMillis() - start));
151
                log.debug("Deleting file");
152
                boolean deleteSuccess = Files.deleteIfExists(result.getRight());
153
                log.debug("Temporary file deleting success " + deleteSuccess);
154
                log.debug("Saving object on s3 ");
155
                double timestamp = System.currentTimeMillis();
156
                Document metadata = new Document()
157
                        .append(ID_FIELD, objectStoreRecord.getFileMetadata().getObjectID())
158
                        .append(MIME_FIELD, objectStoreRecord.getFileMetadata().getMimeType())
159
                        .append(ORIGINAL_OBJECT_FIELD, objectStoreRecord.getFileMetadata().toJSON())
160
                        .append(TIMESTAMP_FIELD, timestamp)
161
                        .append(MD5_FIELD, result.getLeft())
162
                        .append(SIZE_FIELD, result.getMiddle())
163
                        .append(URI_FIELD, String.format("s3://%s/%s/%s", objectStoreBucket, id, objectStoreRecord.getFileMetadata().getObjectID()));
164
                log.debug("saving metadata object to the collection: " + metadata.toString());
165
                start = System.currentTimeMillis();
166
                mongoCollection.insertOne(metadata);
167
                log.debug("Total time to sav in Mongo " + (System.currentTimeMillis() - start));
168

    
169
            }
170
        } catch (IOException e) {
171
            log.error("Error on saving file in a temporary Folder");
172
            throw new ObjectStoreServiceException(e);
173

    
174
        }
175
        return null;
176
    }
177

    
178
    @Override
179
    public ResultSetListener deliver(final Long from, final Long until) throws ObjectStoreServiceException {
180
        S3ObjectStoreResultSetListener resultSet = new S3ObjectStoreResultSetListener();
181
        resultSet.setMongoCollection(this.mongoCollection);
182
        resultSet.setObjectStoreID(getId());
183
        resultSet.setFromDate(from);
184
        resultSet.setUntilDate(until);
185
        return resultSet;
186
    }
187

    
188
    @Override
189
    public ResultSetListener deliverIds(final Iterable<String> ids) throws ObjectStoreServiceException {
190
        S3ObjectStoreResultSetListener resultSet = new S3ObjectStoreResultSetListener();
191
        resultSet.setMongoCollection(this.mongoCollection);
192
        resultSet.setObjectStoreID(getId());
193
        resultSet.setRecords(Lists.newArrayList(ids));
194
        return resultSet;
195
    }
196

    
197
    @Override
198
    public ObjectStoreFile deliverObject(String objectId) throws ObjectStoreServiceException {
199
        Document resultQuery = this.mongoCollection.find(Filters.eq("id", objectId)).first();
200
        if (resultQuery!= null)
201
            return ObjectStoreS3Utility.build(resultQuery);
202
        else return null;
203
    }
204

    
205
    @Override
206
    public int getSize() throws ObjectStoreServiceException {
207
        return (int) this.mongoCollection.count();
208
    }
209

    
210
    @Override
211
    public void deleteObject(String objectId) throws ObjectStoreServiceException {
212
        final Document response =this.mongoCollection.findOneAndDelete(Filters.eq("id", objectId));
213
        if (response == null)
214
            throw new ObjectStoreServiceException("Error document not found with objectId: "+objectId);
215

    
216
        if (this.client == null)
217
            this.client = initializeClient();
218

    
219
        this.client.deleteObject(this.objectStoreBucket, String.format("%s/%s", this.id, response.get(ID_FIELD)));
220
    }
221

    
222
    @Override
223
    public String getObject(String objectId) throws ObjectStoreServiceException {
224

    
225
        Document response = this.mongoCollection.find(Filters.eq("id", objectId)).first();
226
        if (response == null || !response.containsKey(URI_FIELD))
227
            return null;
228
        return response.getString(URI_FIELD);
229
    }
230

    
231
    @Override
232
    public boolean existIDStartsWith(String startId) throws ObjectStoreServiceException {
233
        return this.mongoCollection.count(Filters.regex("id", Pattern.compile(startId))) > 0;
234
    }
235

    
236
    @Override
237
    public boolean dropContent() throws ObjectStoreServiceException {
238
        if (this.client == null) {
239
            this.client = initializeClient();
240
        }
241
        final ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(objectStoreBucket).withPrefix(id);
242
        ListObjectsV2Result result;
243
        do {
244
            result = this.client.listObjectsV2(req);
245

    
246
            for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
247
                log.debug(String.format(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize()));
248
                this.client.deleteObject("openaire-sandro-test", objectSummary.getKey());
249
                log.debug("Object Deleted");
250
            }
251
            String token = result.getNextContinuationToken();
252
            log.debug("Next Continuation Token: " + token);
253
            req.setContinuationToken(token);
254
        } while (result.isTruncated());
255

    
256
        this.mongoCollection.drop();
257
        return true;
258
    }
259

    
260
    private static Triple<String, Long, Path> saveAndGenerateMD5(final InputStream inputStream) throws IOException {
261
        if (inputStream == null) return null;
262

    
263
        File tempFile = File.createTempFile("obsFile", ".tmp");
264
        log.debug("Temp file On Default Location: " + tempFile.getAbsolutePath());
265
        String md5 = null;
266
        long size = 0;
267
        try {
268
            Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
269

    
270
            final FileInputStream fis = new FileInputStream(tempFile);
271
            md5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(fis);
272
            fis.close();
273
            size = Files.size(tempFile.toPath());
274

    
275
        } catch (IOException e1) {
276
            log.error(e1);
277
        }
278
        return new ImmutableTriple<>(md5, size, tempFile.toPath());
279
    }
280
}
(3-3/5)