Support for S3 and MinIO

2021-11-18 19:16:44 +01:00
parent 63e801ad05
commit c05ee888eb
10 changed files with 604 additions and 21 deletions

View File

@@ -6,9 +6,11 @@ convict.addFormat(require('convict-format-with-validator').ipaddress)
export enum StorageType {
  Local = 'local',
  // S3 = 's3',
  Minio = 'minio',
  S3 = 's3',
  // GCS = 'gcs',
  // Azure = 'azure',
  // B2 = 'b2',
}
export enum URLClean {
@@ -53,6 +55,14 @@ const config = convict({
    env: 'ADDRESS',
  },
  // Logging
  logLevel: {
    doc: 'The level of logging to use.',
    format: ['trace', 'debug', 'info', 'warn', 'error', 'fatal'],
    default: 'info',
    env: 'LOG_LEVEL',
  },
  // Security
  allowedDomains: {
    doc: 'The domains that are allowed to be used as image sources',
@@ -97,6 +107,72 @@ const config = convict({
    default: './assets',
    env: 'LOCAL_ASSETS',
  },
  // Minio storage
  minio: {
    accessKey: {
      doc: 'The access key for Minio',
      format: String,
      default: '',
      env: 'MINIO_ACCESS_KEY',
      sensitive: true,
    },
    secretKey: {
      doc: 'The secret key for Minio',
      format: String,
      default: '',
      env: 'MINIO_SECRET_KEY',
      sensitive: true,
    },
    endpoint: {
      doc: 'The endpoint for Minio',
      format: String,
      default: '',
      env: 'MINIO_ENDPOINT',
    },
    bucket: {
      doc: 'The bucket to use for Minio',
      format: String,
      default: '',
      env: 'MINIO_BUCKET',
    },
    region: {
      doc: 'The region for Minio',
      format: String,
      default: '',
      env: 'MINIO_REGION',
    },
  },
  // S3 storage
  s3: {
    bucket: {
      doc: 'The S3 bucket to use',
      format: String,
      default: '',
      env: 'S3_BUCKET',
    },
    region: {
      doc: 'The S3 region to use',
      format: String,
      default: '',
      env: 'S3_REGION',
    },
    accessKey: {
      doc: 'The S3 access key id to use',
      format: String,
      default: '',
      env: 'S3_ACCESS_KEY_ID',
      sensitive: true,
    },
    secretKey: {
      doc: 'The S3 secret access key to use',
      format: String,
      default: '',
      env: 'S3_SECRET_ACCESS_KEY',
      sensitive: true,
    },
  },
})
for (const file of ['morphus.yaml', 'morphus.yml']) {

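With these options in place, a backend is configured entirely through environment variables. A minimal sketch of selecting MinIO before the config loads; the values are placeholders, and the STORAGE variable name is an assumption, since the binding for the storage selector sits outside this hunk:

// Hypothetical bootstrap (placeholder values; the STORAGE name is assumed).
process.env.STORAGE = 'minio'
process.env.MINIO_ENDPOINT = 'https://minio.example.com:9000'
process.env.MINIO_ACCESS_KEY = 'minio-access'
process.env.MINIO_SECRET_KEY = 'minio-secret'
process.env.MINIO_BUCKET = 'morphus'
process.env.MINIO_REGION = 'us-east-1'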
View File

@@ -187,9 +187,12 @@ export const image: RouteHandlerMethod = async (request, reply) => {
  // @ts-ignore
  reply.expires(new Date(Date.now() + ms(Config.maxAge)))
  let stream: NodeJS.ReadableStream = (await storage.exists(q.hash))
    ? await storage.readStream(q.hash)
    : await transform(q)
  let stream: NodeJS.ReadableStream
  try {
    stream = await storage.readStream(q.hash)
  } catch (err) {
    stream = await transform(q)
  }
  reply.code(200).headers({
    'Content-Type': `image/${q.format?.name}`,

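The handler now attempts the cached read first and falls back to transforming on failure, rather than calling exists() and then reading; this closes the gap where the object could vanish between the check and the read, and saves a storage round trip. The same pattern, condensed into a generic sketch with illustrative helper names:

// Read-or-compute fallback: try the cache, regenerate on any read failure.
async function readOrCompute<T>(read: () => Promise<T>, compute: () => Promise<T>): Promise<T> {
  try {
    return await read() // fast path: already stored
  } catch {
    return await compute() // miss or read error: regenerate
  }
}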
View File

@@ -6,7 +6,7 @@ import { init as initStorage } from './storage'
import { init as initMiddleware } from './fastify/middleware'
import { init as initHooks } from './fastify/hooks'
export const App = fastify({ logger: { prettyPrint: true } })
export const App = fastify({ logger: { prettyPrint: true, level: Config.logLevel } })
process.on('SIGINT', async function () {
  App.log.info('Stopping server')
@@ -19,7 +19,7 @@ async function main() {
  try {
    // Internal
    initConfig(App)
    initStorage()
    await initStorage(App)
    // Fastify
    initMiddleware(App)

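The logger level is no longer fixed at fastify's default but follows the new logLevel setting. A standalone sketch of the same wiring, reading LOG_LEVEL directly instead of through convict:

import fastify from 'fastify'

// Assumes pino's standard level names, matching the config's format list above.
const app = fastify({ logger: { level: process.env.LOG_LEVEL ?? 'info' } })
app.log.debug('visible only when the level is debug or trace')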
View File

@@ -1,30 +1,63 @@
import { FastifyInstance } from 'fastify'
import { Config, StorageType } from '../config'
import { Local } from './local'
import { Minio } from './minio'
export abstract class Storage {
  abstract init(): Promise<void>
  abstract read(path: string): Promise<Buffer>
  abstract write(path: string, data: Buffer): Promise<void>
  abstract exists(path: string): Promise<boolean>
  abstract delete(path: string): Promise<void>
  abstract readStream(path: string): Promise<NodeJS.ReadableStream>
  abstract writeStream(path: string): Promise<NodeJS.WritableStream>
  // list(path: string): Promise<string[]>
  abstract init(): Promise<void>
  // list(path: string): Promise<string[]>
  abstract exists(path: string): Promise<boolean>
  abstract delete(path: string): Promise<void>
}
export let storage: Storage
export async function init() {
export async function init(App: FastifyInstance) {
  if (!storage) {
    switch (Config.storage) {
      case StorageType.Local:
        storage = new Local(Config.localAssets)
        break
      case StorageType.S3:
        // storage = new S3({
        //   accessKeyId: Config.s3.accessKey,
        //   secretAccessKey: Config.s3.secretKey,
        //   bucket: Config.s3.bucket,
        //   region: Config.s3.region,
        // })
        storage = new Minio({
          accessKey: Config.s3.accessKey,
          secretKey: Config.s3.secretKey,
          bucket: Config.s3.bucket,
          region: Config.s3.region,
          endpoint: 'https://s3.amazonaws.com',
        })
        break
      case StorageType.Minio:
        storage = new Minio({
          accessKey: Config.minio.accessKey,
          secretKey: Config.minio.secretKey,
          endpoint: Config.minio.endpoint,
          region: Config.minio.region,
          bucket: Config.minio.bucket,
        })
        break
      default:
        throw new Error(`Unknown storage type: ${Config.storage}`)
    }
    await storage.init()
    try {
      await storage.init()
      App.log.debug(`Storage initialized: ${Config.storage}`)
    } catch (e) {
      App.log.error(`Storage initialization failed: ${Config.storage}`)
      process.exit(1)
    }
  }
}

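Once init has resolved, the rest of the code base talks only to the storage singleton and never to a concrete backend. A usage sketch, assuming a fastify App instance is in scope and using an illustrative key:

// Sketch: `key` is a placeholder for the sha3 content hashes used elsewhere.
await init(App) // selects Local, S3, or Minio based on Config.storage
const key = 'deadbeef'
if (await storage.exists(key)) {
  const data = await storage.read(key)
  App.log.debug(`read ${data.length} bytes for ${key}`)
}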
src/storage/minio.ts (new file, 64 lines)
View File

@@ -0,0 +1,64 @@
import { Client } from 'minio'
import { PassThrough } from 'stream'
import { Storage } from '.'
import { StreamUtils } from '../utils/utils'
export type MinioConfig = {
  accessKey: string
  secretKey: string
  endpoint: string
  region?: string
  bucket: string
}
export class Minio implements Storage {
  client: Client

  constructor(private options: MinioConfig) {
    // The minio SDK takes host, port, and SSL flag separately, so split the endpoint URL.
    const url = new URL(this.options.endpoint)
    this.client = new Client({
      accessKey: options.accessKey,
      secretKey: options.secretKey,
      endPoint: url.hostname,
      // url.port is '' when the endpoint has no explicit port; fall back to the protocol default.
      port: url.port ? parseInt(url.port, 10) : url.protocol === 'https:' ? 443 : 80,
      useSSL: url.protocol === 'https:',
      region: options.region,
    })
  }

  async init(): Promise<void> {
    // Fail fast when the bucket is missing: the storage factory treats a rejected init as fatal.
    if (!(await this.client.bucketExists(this.options.bucket))) {
      throw new Error(`Bucket not found: ${this.options.bucket}`)
    }
  }

  async read(path: string): Promise<Buffer> {
    const stream = await this.client.getObject(this.options.bucket, path)
    return StreamUtils.toBuffer(stream)
  }

  async write(path: string, data: Buffer): Promise<void> {
    const stream = StreamUtils.fromBuffer(data)
    await this.client.putObject(this.options.bucket, path, stream)
  }

  async readStream(path: string): Promise<NodeJS.ReadableStream> {
    return this.client.getObject(this.options.bucket, path)
  }

  async writeStream(path: string): Promise<NodeJS.WritableStream> {
    // Hand back a PassThrough and upload from it in the background;
    // note that upload errors are not surfaced to the writer here.
    const stream = new PassThrough()
    this.client.putObject(this.options.bucket, path, stream)
    return stream
  }

  async exists(path: string): Promise<boolean> {
    try {
      await this.client.statObject(this.options.bucket, path)
      return true
    } catch {
      return false
    }
  }

  delete(path: string): Promise<void> {
    throw new Error('Minio.delete is not implemented yet')
  }
}
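A standalone usage sketch of the wrapper; the endpoint, bucket, and credentials are placeholders:

const store = new Minio({
  accessKey: 'minio-access',
  secretKey: 'minio-secret',
  endpoint: 'https://minio.example.com:9000',
  bucket: 'morphus',
})
await store.init() // rejects if the bucket is missing
await store.write('greeting.txt', Buffer.from('hello'))
console.log((await store.read('greeting.txt')).toString()) // 'hello'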

View File

@@ -6,8 +6,8 @@ import { ComplexParameter, TransformQueryBase } from '../controllers/image'
import { storage } from '../storage'
import { sha3, splitter } from '../utils/utils'
async function downloadImage(url: string): Promise<NodeJS.ReadableStream> {
  const disk = await storage.writeStream(sha3(url))
async function downloadAndSaveImage(url: string, path: string): Promise<NodeJS.ReadableStream> {
  const disk = await storage.writeStream(path)
  return new Promise((resolve) => {
    get(url, (res) => {
      const out = new PassThrough()
@@ -19,10 +19,11 @@ async function downloadImage(url: string): Promise<NodeJS.ReadableStream> {
export async function getImage(url: string): Promise<NodeJS.ReadableStream> {
  const id = sha3(url)
  if (!(await storage.exists(id))) {
    return await downloadImage(url)
  try {
    return await storage.readStream(id)
  } catch {
    return await downloadAndSaveImage(url, id)
  }
  return await storage.readStream(id)
}
function applyOperation(pipeline: sharp.Sharp, { name, options }: ComplexParameter<string, any>): sharp.Sharp {

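downloadAndSaveImage now writes under a caller-supplied path (the URL's hash) instead of hashing internally, so getImage controls the cache key. The Promise body is truncated in the hunk above; a sketch of the likely shape, with the tee made explicit — the response is piped both into storage and back to the caller:

import { get } from 'https'
import { PassThrough } from 'stream'

// Sketch only: `disk` stands in for the storage write stream obtained above.
function tee(url: string, disk: NodeJS.WritableStream): Promise<NodeJS.ReadableStream> {
  return new Promise((resolve) => {
    get(url, (res) => {
      const out = new PassThrough()
      res.pipe(disk) // persist for future requests
      res.pipe(out) // serve the current request
      resolve(out)
    })
  })
}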
View File

@@ -1,7 +1,6 @@
import { createHash } from 'crypto'
import { validateSync, ValidatorOptions, ValidationError as VE } from 'class-validator'
import { PassThrough, Readable } from 'stream'
import { NullableStringOrRegexpArray } from '../config'
export class ValidationError extends Error {
  override message: string
@@ -47,3 +46,21 @@ export function testForPrefixOrRegexp(str: string, values: (string | RegExp)[]):
  }
  return false
}
export class StreamUtils {
  // Wrap a Buffer in a Readable: the data is pushed up front and EOF marked immediately.
  static fromBuffer(buffer: Buffer) {
    const stream = new Readable()
    stream.push(buffer)
    stream.push(null)
    return stream
  }

  // Drain a stream into a single Buffer, rejecting on stream errors.
  static toBuffer(stream: NodeJS.ReadableStream) {
    return new Promise<Buffer>((resolve, reject) => {
      const chunks: Buffer[] = []
      stream.on('data', (chunk) => chunks.push(chunk))
      stream.on('error', reject)
      stream.on('end', () => resolve(Buffer.concat(chunks)))
    })
  }
}
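A round-trip check of the new helpers: fromBuffer feeds toBuffer, and the copy should equal the input.

async function demo() {
  const original = Buffer.from('morphus')
  const copy = await StreamUtils.toBuffer(StreamUtils.fromBuffer(original))
  console.log(copy.equals(original)) // true
}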