Project

General

Profile

1
package eu.dnetlib.data.objectstore.s3;
2

    
3
import com.amazonaws.auth.AWSCredentials;
4
import com.amazonaws.auth.AWSStaticCredentialsProvider;
5
import com.amazonaws.auth.BasicAWSCredentials;
6
import com.amazonaws.client.builder.AwsClientBuilder;
7
import com.amazonaws.services.s3.AmazonS3;
8
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
9
import com.amazonaws.services.s3.model.ListObjectsV2Request;
10
import com.amazonaws.services.s3.model.ListObjectsV2Result;
11
import com.amazonaws.services.s3.model.S3ObjectSummary;
12
import com.google.common.collect.Lists;
13
import com.mongodb.client.MongoCollection;
14
import com.mongodb.client.model.Filters;
15
import eu.dnetlib.data.objectstore.modular.ObjectStoreRecord;
16
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
17
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
18
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
19
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
20
import eu.dnetlib.enabling.resultset.ResultSetListener;
21
import org.apache.commons.lang3.StringUtils;
22
import org.apache.commons.lang3.tuple.ImmutableTriple;
23
import org.apache.commons.lang3.tuple.Triple;
24
import org.apache.commons.logging.Log;
25
import org.apache.commons.logging.LogFactory;
26
import org.bson.Document;
27

    
28
import java.io.*;
29
import java.nio.file.Files;
30
import java.nio.file.Path;
31
import java.nio.file.StandardCopyOption;
32
import java.util.concurrent.atomic.AtomicInteger;
33
import java.util.regex.Pattern;
34

    
35
public class S3ObjectStore implements ObjectStore {
36

    
37
    //CONSTANT VARIABLE NAME
38
    private static final String S3_REGION = "eu-west-3";
39
    private static final String URI_FIELD = "uri";
40
    private static final String ID_FIELD = "id";
41
    private static final String MIME_FIELD = "mime";
42
    public static final String ORIGINAL_OBJECT_FIELD = "originalObject";
43
    public static final String TIMESTAMP_FIELD = "timestamp";
44
    public static final String MD5_FIELD = "md5Sum";
45
    public static final String SIZE_FIELD = "size";
46

    
47

    
48
    private final String id;
49
    private final String interpretation;
50

    
51
    private final String s3AccessKey;
52
    private final String s3SecretKey;
53
    private final String s3EndPoint;
54
    private final String objectStoreBucket;
55

    
56

    
57
    private final MongoCollection<Document> mongoCollection;
58

    
59
    private AmazonS3 client;
60

    
61
    private static final Log log = LogFactory.getLog(S3ObjectStore.class);
62

    
63
    public S3ObjectStore(final String identifier, final String interpretation, final String s3AccessKey, final String s3SecretKey, final String s3EndPoint, final String objectStoreBucket, final MongoCollection<Document> mongoCollection) throws ObjectStoreServiceException {
64
        this.id = identifier;
65
        this.interpretation = interpretation;
66
        this.s3AccessKey = s3AccessKey;
67
        this.s3SecretKey = s3SecretKey;
68
        this.mongoCollection = mongoCollection;
69
        this.s3EndPoint = s3EndPoint;
70
        this.objectStoreBucket = objectStoreBucket;
71
    }
72

    
73

    
74
    private AmazonS3 initializeClient() throws ObjectStoreServiceException {
75
        try {
76
            final AWSCredentials credentials = new BasicAWSCredentials(this.s3AccessKey, this.s3SecretKey);
77

    
78

    
79
            final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials))
80
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(this.s3EndPoint, S3_REGION))
81
                    .build();
82
            if (s3 == null)
83
                throw new ObjectStoreServiceException("Cannot initialize s3 client because is NULL");
84
            return s3;
85
        } catch (Throwable e) {
86
            log.error("An Error happen on initialize client ", e);
87
            throw new ObjectStoreServiceException("Cannot initialize s3 client", e);
88
        }
89
    }
90

    
91

    
92
    @Override
93
    public String getId() {
94
        return this.id;
95
    }
96

    
97
    @Override
98
    public String getInterpretation() {
99
        return this.interpretation;
100
    }
101

    
102
    @Override
103
    public int feed(Iterable<ObjectStoreRecord> iterable, boolean upsert) throws ObjectStoreServiceException {
104
        final AtomicInteger count = new AtomicInteger();
105
        iterable.forEach(objectStoreRecord -> {
106
            try {
107
                feedObjectRecord(objectStoreRecord);
108
                count.incrementAndGet();
109
            } catch (ObjectStoreServiceException e) {
110
                log.error("Error on saving file in a temporary Folder");
111
            }
112
        });
113
        return count.intValue();
114
    }
115

    
116
    @Override
117
    public int feedMetadataRecord(Iterable<MetadataObjectRecord> iterable, boolean b) throws ObjectStoreServiceException {
118
        final AtomicInteger count = new AtomicInteger();
119
        iterable.forEach(mor -> {
120
            final ObjectStoreRecord r = new ObjectStoreRecord();
121
            r.setInputStream(new ByteArrayInputStream(mor.getRecord().getBytes()));
122
            final ObjectStoreFile fileMetadata = new ObjectStoreFile();
123
            fileMetadata.setObjectID(mor.getId());
124
            fileMetadata.setMimeType(mor.getMime());
125
            r.setFileMetadata(fileMetadata);
126
            try {
127
                feedObjectRecord(r);
128
                count.incrementAndGet();
129
            } catch (ObjectStoreServiceException e) {
130
                log.error("Unable to store record r", e);
131
            }
132
        });
133
        return count.intValue();
134
    }
135

    
136
    @Override
137
    public String feedObjectRecord(ObjectStoreRecord objectStoreRecord) throws ObjectStoreServiceException {
138
        if (client == null)
139
            this.client = initializeClient();
140
        try {
141
            long start = System.currentTimeMillis();
142
            final Triple<String, Long, Path> result = saveAndGenerateMD5(objectStoreRecord.getInputStream());
143
            log.debug("Total time to download into fileSystem " + (System.currentTimeMillis() - start));
144
            if (StringUtils.isNotBlank(result.getLeft())) {
145
                log.debug("Saving object with ID: " + objectStoreRecord.getFileMetadata().getObjectID() + " on s3 ");
146
                start = System.currentTimeMillis();
147
                this.client.putObject(objectStoreBucket, id + "/" + objectStoreRecord.getFileMetadata().getObjectID(), result.getRight().toFile());
148
                log.debug("Total time to put into the ObjectStore " + (System.currentTimeMillis() - start));
149
                log.debug("Deleting file");
150
                boolean deleteSuccess = Files.deleteIfExists(result.getRight());
151
                log.debug("Temporary file deleting success " + deleteSuccess);
152
                log.debug("Saving object on s3 ");
153
                double timestamp = System.currentTimeMillis();
154
                Document metadata = new Document()
155
                        .append(ID_FIELD, objectStoreRecord.getFileMetadata().getObjectID())
156
                        .append(MIME_FIELD, objectStoreRecord.getFileMetadata().getMimeType())
157
                        .append(ORIGINAL_OBJECT_FIELD, objectStoreRecord.getFileMetadata().toJSON())
158
                        .append(TIMESTAMP_FIELD, timestamp)
159
                        .append(MD5_FIELD, result.getLeft())
160
                        .append(SIZE_FIELD, result.getMiddle())
161
                        .append(URI_FIELD, String.format("s3://%s/%s/%s", objectStoreBucket, id, objectStoreRecord.getFileMetadata().getObjectID()));
162
                log.debug("saving metadata object to the collection: " + metadata.toString());
163
                start = System.currentTimeMillis();
164
                mongoCollection.insertOne(metadata);
165
                log.debug("Total time to sav in Mongo " + (System.currentTimeMillis() - start));
166

    
167
            }
168
        } catch (IOException e) {
169
            log.error("Error on saving file in a temporary Folder");
170
            throw new ObjectStoreServiceException(e);
171

    
172
        }
173
        return null;
174
    }
175

    
176
    @Override
177
    public ResultSetListener deliver(final Long from, final Long until) throws ObjectStoreServiceException {
178
        S3ObjectStoreResultSetListener resultSet = new S3ObjectStoreResultSetListener();
179
        resultSet.setMongoCollection(this.mongoCollection);
180
        resultSet.setObjectStoreID(getId());
181
        resultSet.setFromDate(from);
182
        resultSet.setUntilDate(until);
183
        return resultSet;
184
    }
185

    
186
    @Override
187
    public ResultSetListener deliverIds(final Iterable<String> ids) throws ObjectStoreServiceException {
188
        S3ObjectStoreResultSetListener resultSet = new S3ObjectStoreResultSetListener();
189
        resultSet.setMongoCollection(this.mongoCollection);
190
        resultSet.setObjectStoreID(getId());
191
        resultSet.setRecords(Lists.newArrayList(ids));
192
        return resultSet;
193
    }
194

    
195
    @Override
196
    public ObjectStoreFile deliverObject(String objectId) throws ObjectStoreServiceException {
197
        Document resultQuery = this.mongoCollection.find(Filters.eq("id", objectId)).first();
198
        return ObjectStoreS3Utility.build(resultQuery);
199
    }
200

    
201
    @Override
202
    public int getSize() throws ObjectStoreServiceException {
203
        return (int) this.mongoCollection.count();
204
    }
205

    
206
    @Override
207
    public void deleteObject(String objectId) throws ObjectStoreServiceException {
208
        final Document response =this.mongoCollection.findOneAndDelete(Filters.eq("id", objectId));
209
        if (response == null)
210
            throw new ObjectStoreServiceException("Error document not found with objectId: "+objectId);
211

    
212
        if (this.client == null)
213
            this.client = initializeClient();
214

    
215
        this.client.deleteObject(this.objectStoreBucket, String.format("%s/%s", this.id, response.get(ID_FIELD)));
216
    }
217

    
218
    @Override
219
    public String getObject(String objectId) throws ObjectStoreServiceException {
220

    
221
        Document response = this.mongoCollection.find(Filters.eq("id", objectId)).first();
222
        if (response == null || !response.containsKey(URI_FIELD))
223
            return null;
224
        return response.getString(URI_FIELD);
225
    }
226

    
227
    @Override
228
    public boolean existIDStartsWith(String startId) throws ObjectStoreServiceException {
229
        return this.mongoCollection.count(Filters.regex("id", Pattern.compile(startId))) > 0;
230
    }
231

    
232
    @Override
233
    public boolean dropContent() throws ObjectStoreServiceException {
234
        if (this.client == null) {
235
            this.client = initializeClient();
236
        }
237
        final ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(objectStoreBucket).withPrefix(id);
238
        ListObjectsV2Result result;
239
        do {
240
            result = this.client.listObjectsV2(req);
241

    
242
            for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
243
                log.debug(String.format(" - %s (size: %d)\n", objectSummary.getKey(), objectSummary.getSize()));
244
                this.client.deleteObject("openaire-sandro-test", objectSummary.getKey());
245
                log.debug("Object Deleted");
246
            }
247
            String token = result.getNextContinuationToken();
248
            log.debug("Next Continuation Token: " + token);
249
            req.setContinuationToken(token);
250
        } while (result.isTruncated());
251

    
252
        this.mongoCollection.drop();
253
        return true;
254
    }
255

    
256
    private static Triple<String, Long, Path> saveAndGenerateMD5(final InputStream inputStream) throws IOException {
257
        if (inputStream == null) return null;
258

    
259
        File tempFile = File.createTempFile("obsFile", ".tmp");
260
        log.debug("Temp file On Default Location: " + tempFile.getAbsolutePath());
261
        String md5 = null;
262
        long size = 0;
263
        try {
264
            Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
265

    
266
            final FileInputStream fis = new FileInputStream(tempFile);
267
            md5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(fis);
268
            fis.close();
269
            size = Files.size(tempFile.toPath());
270

    
271
        } catch (IOException e1) {
272
            log.error(e1);
273
        }
274
        return new ImmutableTriple<>(md5, size, tempFile.toPath());
275
    }
276
}
(3-3/5)