Mengenal masalahnya
Dalam pengembangan komersial, banyak kasus penggunaan pembelajaran mesin menyiratkan arsitektur multi-tenant dan memerlukan pelatihan model terpisah untuk setiap klien dan / atau pengguna.
Sebagai contoh, pertimbangkan untuk memperkirakan pembelian dan permintaan produk tertentu menggunakan pembelajaran mesin. Jika Anda menjalankan rantai toko ritel, Anda dapat menggunakan data riwayat pembelian pelanggan dan total permintaan untuk produk tersebut untuk memprediksi biaya dan volume pembelian untuk setiap toko satu per satu.
Paling sering, dalam kasus seperti itu, untuk menerapkan model, Anda menulis layanan Flask dan meletakkannya di container Docker. Ada banyak contoh server pembelajaran mesin model tunggal, tetapi ketika harus menerapkan beberapa model, pengembang memiliki beberapa opsi yang tersedia untuk menyelesaikan masalah.
Dalam aplikasi multi-tenant, jumlah tenant tidak diketahui sebelumnya dan bisa dibilang tidak terbatas - pada satu saat Anda mungkin hanya memiliki satu klien, dan pada saat lain Anda dapat menyajikan model terpisah untuk setiap pengguna kepada ribuan pengguna. Di sinilah keterbatasan pendekatan penerapan standar mulai muncul:
Jika kita menerapkan kontainer Docker untuk setiap klien, maka kita akan mendapatkan aplikasi yang sangat besar dan mahal yang akan cukup sulit untuk dikelola.
Penampung tunggal, dalam gambar yang semua modelnya ada, juga tidak berfungsi untuk kita, karena ribuan model dapat bekerja di server, dan model baru ditambahkan saat runtime.
Keputusan
, . , Airflow S3, ML — .
ML — , : -> .
, :
Model — , ; SklearnModel, TensorFlowModel, MyCustomModel . .
ModelInfoRepository — , userid -> modelid. , SQAlchemyModelInfoRepository.
ModelRepository — , ID. FileSystemRepository, S3Repository .
from abc import ABC
class Model(ABC):
@abstractmethod
def predict(self, data: pd.DataFrame) -> np.ndarray:
raise NotImplementedError
class ModelInfoRepository(ABC):
@abstractmethod
def get_model_id_by_user_id(self, user_id: str) -> str:
raise NotImplementedError
class ModelRepository(ABC):
@abstractmethod
def get_model(self, model_id: str) -> Model:
raise NotImplementedError
, sklearn, Amazon S3 userid -> modelid, .
class SklearnModel(Model):
def __init__(self, model):
self.model = model
def predict(self, data: pd.DataFrame):
return self.model.predict(data)
class SQAlchemyModelInfoRepository(ModelInfoRepository):
def __init__(self, sqalchemy_session: Session):
self.session = sqalchemy_session
def get_model_id_by_user_id(user_id: str) -> str:
# implementation goes here, query a table in any Database
class S3ModelRepository(ModelRepository):
def __init__(self, s3_client):
self.s3_client = s3_client
def get_model(self, model_id: str) -> Model:
# load and deserialize pickle from S3, implementation goes here
:
def make_app(model_info_repository: ModelInfoRepository,
model_repsitory: ModelRepository) -> Flask:
app = Flask("multi-model-server")
@app.predict("/predict/<user_id>")
def predict(user_id):
model_id = model_info_repository.get_model_id_by_user_id(user_id)
model = model_repsitory.get_model(model_id)
data = pd.DataFrame(request.json())
predictions = model.predict(data)
return jsonify(predictions.tolist())
return app
, Flask ; sklearn TensorFlow S3 , Flask .
, , . , . cachetools:
from cachetools import Cache
class CachedModelRepository(ModelRepository):
def __init__(self, model_repository: ModelRepository, cache: Cache):
self.model_repository = model_repository
self.cache = cache
@abstractmethod
def get_model(self, model_id: str) -> Model:
if model_id not in self.cache:
self.cache[model_id] = self.model_repository.get_model(model_id)
return self.cache[model_id]
:
from cachetools import LRUCache
model_repository = CachedModelRepository(
S3ModelRepository(s3_client),
LRUCache(max_size=10)
)
- , . , , MLOps . . , . №4 Google: , - .