前两篇我们介绍了JDBC和Cassandra的gRPC streaming实现。相对MongoDB来说,JDBC和Cassandra支持字符类型的query语句SQL,CQL,所以把query指令转换成protobuf structures是简单直接的。而MongoDB没有提供字符类的query,所以我们必须进行MongoDB query涉及的所有类型与protobuf类型的相互转换,实现gRPC功能会复杂的多。我们在这篇讨论里先介绍MongoDB query的protobuf转换。
在前面的MongoDB-Engine讨论里我们设计了个MGOContext作为JVM内部传递MongoDB query的数据结构:
- case class MGOContext(
- dbName: String,
- collName: String,
- actionType: MGO_ACTION_TYPE = MGO_QUERY,
- action: Option[MGOCommands] = None
- ) {
- ctx =>
- def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
- def setCollName(name: String): MGOContext = ctx.copy(collName = name)
- def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
- def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
- }
下面是这个结构支持的action清单:
- object MGOCommands {
- case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands
- case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands
- /* org.mongodb.scala.FindObservable
- import com.mongodb.async.client.FindIterable
- val resultDocType = FindIterable[Document]
- val resultOption = FindObservable(resultDocType)
- .maxScan(...)
- .limit(...)
- .sort(...)
- .project(...) */
- case class Find(filter: Option[Bson] = None,
- andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
- firstOnly: Boolean = false) extends MGOCommands
- case class DocumentStream(filter: Option[Bson] = None,
- andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
- ) extends MGOCommands
- case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
- case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
- case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
- case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
- case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
- case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
- case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
- }
- object MGOAdmins {
- case class DropCollection(collName: String) extends MGOCommands
- case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
- case class ListCollection(dbName: String) extends MGOCommands
- case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
- case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
- case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
- case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
- case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
- }
可以看到,我们必须把Bson、Document、FindObservable这几个类型对应到protobuf格式。下面是.proto文件里的部分内容:
- message MGODocument {
- bytes document = 1;
- }
- message MGOBson {
- bytes bson = 1;
- }
- message ResultTransformer { //FindObservable
- int32 optType = 1;
- MGOBson bsonParam = 2;
- int32 valueParam = 3;
- }
- message MGOAdminOptons {
- string tarName = 1;
- repeated MGOBson bsonParam = 2;
- OptionAny options = 3;
- string objName = 4;
- }
- message MGOOperations { //MGOContext
- string dbName = 1;
- string collName = 2;
- int32 commandType = 3;
- repeated MGOBson bsonParam = 4;
- repeated ResultTransformer resultOptions = 5;
- OptionAny options = 6;
- repeated MGODocument documents = 7;
- google.protobuf.BoolValue only = 8;
- MGOAdminOptons adminOptions = 9;
- }
首先,Document是个serializable类,可以直接进行序列/反序列化:
- val po = Document (
- "ponum" -> "po18012301",
- "vendor" -> "The smartphone compay",
- "remarks" -> "urgent, rush order",
- "podtl" -> Seq(
- Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
- Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
- )
- )
- println(po)
- val pobytes = marshal(po)
- println(s"po bytes: ${pobytes}")
- val po1 = unmarshal[Document](pobytes)
- println(s"back to po document: $po1")
下一个是Bson,它是个java interface:
- /**
- * An interface for types that are able to render themselves into a {@code BsonDocument}.
- *
- * @since 3.0
- */
- public interface Bson {
- /**
- * Render the filter into a BsonDocument.
- *
- * @param documentClass the document class in scope for the collection. This parameter may be ignored, but it may be used to alter
- * the structure of the returned {@code BsonDocument} based on some knowledge of the document class.
- * @param codecRegistry the codec registry. This parameter may be ignored, but it may be used to look up {@code Codec} instances for
- * the document class or any other related class.
- * @param <TDocument> the type of the document class
- * @return the BsonDocument
- */
- <TDocument> BsonDocument toBsonDocument(Class<TDocument> documentClass, CodecRegistry codecRegistry);
- }
Bson只是一个interface,不是serilizable,不过BsonDocument可以:
- /**
- * A type-safe container for a BSON document. This class should NOT be sub-classed by third parties.
- *
- * @since 3.0
- */
- public class BsonDocument extends BsonValue implements Map<String, BsonValue>, Cloneable, Bson, Serializable {...}
所以我们可以用BsonDocument来进行序列/反序列后在再用它来构建一个新的Bson对象:
- def bsonToProto(bson: Bson) =
- MGOBson(marshal(bson.toBsonDocument(
- classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
- def protoToBson(proto: MGOBson): Bson = new Bson {
- val bsdoc = unmarshal[BsonDocument](proto.bson)
- override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
- }
最后是这个FindObservable:这个类型的应用场景是这样的:
- /* org.mongodb.scala.FindObservable
- import com.mongodb.async.client.FindIterable
- val resultDocType = FindIterable[Document]
- val resultOption = FindObservable(resultDocType)
- .maxScan(...)
- .limit(...)
- .sort(...)
- .project(...) */
- case class Find(filter: Option[Bson] = None,
- andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
- firstOnly: Boolean = false) extends MGOCommands
FindObservable类型的效果可以是一连串施用的结果,因为是FindObservable[A] => FindObservable[A]这样的款式,所以我们可以用一串FindObservable[Document]来进行序列/反序列化处理,然后再重新串连施用来获得最终的FindObservable。FindObservable对应的protobuf结构如下:
- message ResultTransformer { //FindObservable
- int32 optType = 1;
- MGOBson bsonParam = 2;
- int32 valueParam = 3;
- }
- type FOD_TYPE = Int
- val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
- val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
- val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
- val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
- val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
- //Sets a document describing the fields to return for all matching documents
- val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
- val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
- //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
- val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
- //Sets the cursor type
- val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
- //Sets the hint for which index to use. A null value means no hint is set
- val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
- //Sets the exclusive upper bound for a specific index. A null value means no max is set
- val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
- //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
- val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
- //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
- val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
- //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents
- case class ResultOptions(
- optType: FOD_TYPE,
- bsonParam: Option[Bson] = None,
- valueParam: Int = 0 ){
- def toProto = new sdp.grpc.services.ResultTransformer(
- optType = this.optType,
- bsonParam = this.bsonParam.map {b => sdp.grpc.services.MGOBson(marshal(b))},
- valueParam = this.valueParam
- )
- def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
- optType match {
- case FOD_FIRST => find
- case FOD_FILTER => find.filter(bsonParam.get)
- case FOD_LIMIT => find.limit(valueParam)
- case FOD_SKIP => find.skip(valueParam)
- case FOD_PROJECTION => find.projection(bsonParam.get)
- case FOD_SORT => find.sort(bsonParam.get)
- case FOD_PARTIAL => find.partial(valueParam != 0)
- case FOD_CURSORTYPE => find
- case FOD_HINT => find.hint(bsonParam.get)
- case FOD_MAX => find.max(bsonParam.get)
- case FOD_MIN => find.min(bsonParam.get)
- case FOD_RETURNKEY => find.returnKey(valueParam != 0)
- case FOD_SHOWRECORDID => find.showRecordId(valueParam != 0)
- }
- }
- }
- object ResultOptions {
- def fromProto(msg: sdp.grpc.services.ResultTransformer) = new ResultOptions(
- optType = msg.optType,
- bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
- valueParam = msg.valueParam
- )
- }
我们可以用这个ResultOptions类型的toProto,fromProto来进行protobuf的转换处理。然后用aggregation实现连串施用:
- def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = findObj =>
- rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
下面这个函数示范了Find Context的反序列:
- def CtxFromProto(proto: MGOOperations): MGOContext = proto.commandType match {
- case MGO_COMMAND_FIND => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_QUERY,
- action = Some(Find())
- )
- def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = findObj =>
- rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
- (proto.bsonParam, proto.resultOptions, proto.only) match {
- case (Nil, Nil, None) => ctx
- case (Nil, Nil, Some(b)) => ctx.setCommand(Find(None, None, b))
- case (bp,Nil,None) => ctx.setCommand(
- Find(Some(protoToBson(bp.head)),None,false))
- case (bp,Nil,Some(b)) => ctx.setCommand(
- Find(Some(protoToBson(bp.head)),None,b))
- case (bp,fo,None) => {
- ctx.setCommand(
- Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),false))
- }
- case (bp,fo,Some(b)) => {
- ctx.setCommand(
- Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),b))
- }
- case _ => ctx
- }
- }
- }
具体的应用示范例如下:
- val eqState = equal("state","California")
- val proj = exclude("rowid","_id")
- val rtxfmr = Seq(
- ResultOptions(
- optType = FOD_LIMIT,
- valueParam = 3)
- ,ResultOptions(
- optType = FOD_PROJECTION,
- bsonParam = Some(proj))
- )
- val protoCtx = MGOProtoMsg(
- dbName = "testdb",
- collName = "aqmrpt",
- commandType = MGO_COMMAND_FIND,
- bsonParam = Seq(eqState),
- resultOptions = rtxfmr
- ).toProto
- val findCtx = CtxFromProto(protoCtx)
- val futFind = mgoQuery[Seq[Document]](findCtx)
- futFind.onComplete {
- case Success(docs) => docs.asInstanceOf[Seq[Document]].foreach{doc => println(doc.toJson())}
- case Failure(e) => println(e.getMessage)
- }
下面是本次讨论的部分源代码:
MongoDBEngine.scala
- package sdp.mongo.engine
- import java.text.SimpleDateFormat
- import akka.NotUsed
- import akka.stream.alpakka.mongodb.scaladsl._
- import akka.stream.scaladsl.{Flow, Sink, Source}
- import org.mongodb.scala.MongoClient
- import org.mongodb.scala.bson.collection.immutable.Document
- import org.bson.conversions.Bson
- import org.mongodb.scala._
- import org.mongodb.scala.model._
- import java.util.Calendar
- import scala.collection.JavaConverters._
- import sdp.file.Streaming._
- import akka.stream.Materializer
- import org.mongodb.scala.bson.{BsonArray, BsonBinary}
- import scala.concurrent._
- import scala.concurrent.duration._
- import sdp.logging.LogSupport
- object MGOContext {
- type MGO_ACTION_TYPE = Int
- val MGO_QUERY = 0
- val MGO_UPDATE = 1
- val MGO_ADMIN = 2
- trait MGOCommands
- object MGOCommands {
- case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands
- case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands
- /* org.mongodb.scala.FindObservable
- import com.mongodb.async.client.FindIterable
- val resultDocType = FindIterable[Document]
- val resultOption = FindObservable(resultDocType)
- .maxScan(...)
- .limit(...)
- .sort(...)
- .project(...) */
- case class Find(filter: Option[Bson] = None,
- andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
- firstOnly: Boolean = false) extends MGOCommands
- case class DocumentStream(filter: Option[Bson] = None,
- andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
- ) extends MGOCommands
- case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
- case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
- case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
- case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
- case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
- case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
- case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
- }
- object MGOAdmins {
- case class DropCollection(collName: String) extends MGOCommands
- case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
- case class ListCollection(dbName: String) extends MGOCommands
- case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
- case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
- case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
- case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
- case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
- }
- case class MGOContext(
- dbName: String,
- collName: String,
- actionType: MGO_ACTION_TYPE = MGO_QUERY,
- action: Option[MGOCommands] = None
- ) {
- ctx =>
- def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
- def setCollName(name: String): MGOContext = ctx.copy(collName = name)
- def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
- def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
- }
- object MGOContext {
- def apply(db: String, coll: String) = new MGOContext(db, coll)
- }
- case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
- ctxs =>
- def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
- def appendContext(ctx: MGOContext): MGOBatContext =
- ctxs.copy(contexts = contexts :+ ctx)
- }
- object MGOBatContext {
- def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
- }
- type MGODate = java.util.Date
- def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
- val ca = Calendar.getInstance()
- ca.set(yyyy,mm,dd)
- ca.getTime()
- }
- def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
- val ca = Calendar.getInstance()
- ca.set(yyyy,mm,dd,hr,min,sec)
- ca.getTime()
- }
- def mgoDateTimeNow: MGODate = {
- val ca = Calendar.getInstance()
- ca.getTime
- }
- def mgoDateToString(dt: MGODate, formatString: String): String = {
- val fmt= new SimpleDateFormat(formatString)
- fmt.format(dt)
- }
- type MGOBlob = BsonBinary
- type MGOArray = BsonArray
- def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
- implicit mat: Materializer) = FileToByteArray(fileName,timeOut)
- def mgoBlobToFile(blob: MGOBlob, fileName: String)(
- implicit mat: Materializer) = ByteArrayToFile(blob.getData,fileName)
- def mgoGetStringOrNone(doc: Document, fieldName: String) = {
- if (doc.keySet.contains(fieldName))
- Some(doc.getString(fieldName))
- else None
- }
- def mgoGetIntOrNone(doc: Document, fieldName: String) = {
- if (doc.keySet.contains(fieldName))
- Some(doc.getInteger(fieldName))
- else None
- }
- def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
- if (doc.keySet.contains(fieldName))
- Some(doc.getLong(fieldName))
- else None
- }
- def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
- if (doc.keySet.contains(fieldName))
- Some(doc.getDouble(fieldName))
- else None
- }
- def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
- if (doc.keySet.contains(fieldName))
- Some(doc.getBoolean(fieldName))
- else None
- }
- def mgoGetDateOrNone(doc: Document, fieldName: String) = {
- if (doc.keySet.contains(fieldName))
- Some(doc.getDate(fieldName))
- else None
- }
- def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
- if (doc.keySet.contains(fieldName))
- doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
- else None
- }
- def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
- if (doc.keySet.contains(fieldName))
- doc.get(fieldName).asInstanceOf[Option[MGOArray]]
- else None
- }
- def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
- (arr.getValues.asScala.toList)
- .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
- }
- type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
- }
- object MGOEngine extends LogSupport {
- import MGOContext._
- import MGOCommands._
- import MGOAdmins._
- object TxUpdateMode {
- private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
- implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
- log.info(s"mgoTxUpdate> calling ...")
- observable.map(clientSession => {
- val transactionOptions =
- TransactionOptions.builder()
- .readConcern(ReadConcern.SNAPSHOT)
- .writeConcern(WriteConcern.MAJORITY).build()
- clientSession.startTransaction(transactionOptions)
- val fut = Future.traverse(ctxs.contexts) { ctx =>
- mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
- }
- Await.ready(fut, 3 seconds)
- clientSession
- })
- }
- private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
- log.info(s"commitAndRetry> calling ...")
- observable.recoverWith({
- case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
- log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
- commitAndRetry(observable)
- }
- case e: Exception => {
- log.error(s"commitAndRetry> Exception during commit ...: $e")
- throw e
- }
- })
- }
- private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
- log.info(s"runTransactionAndRetry> calling ...")
- observable.recoverWith({
- case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
- log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
- runTransactionAndRetry(observable)
- }
- })
- }
- def mgoTxBatch(ctxs: MGOBatContext)(
- implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {
- log.info(s"mgoTxBatch> MGOBatContext: ${ctxs}")
- val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
- val commitTransactionObservable: SingleObservable[Completed] =
- updateObservable.flatMap(clientSession => clientSession.commitTransaction())
- val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
- runTransactionAndRetry(commitAndRetryObservable)
- }.toFuture()
- }
- def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {
- log.info(s"mgoUpdateBatch> MGOBatContext: ${ctxs}")
- if (ctxs.tx) {
- TxUpdateMode.mgoTxBatch(ctxs)
- } else {
- val fut = Future.traverse(ctxs.contexts) { ctx =>
- mgoUpdate[Completed](ctx).map(identity) }
- Await.ready(fut, 3 seconds)
- Future.successful(new Completed)
- }
- }
- // T => FindIterable e.g List[Document]
- def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): Future[T] = {
- log.info(s"mgoQuery> MGOContext: ${ctx}")
- val db = client.getDatabase(ctx.dbName)
- val coll = db.getCollection(ctx.collName)
- if ( ctx.action == None) {
- log.error(s"mgoQuery> uery action cannot be null!")
- throw new IllegalArgumentException("query action cannot be null!")
- }
- ctx.action.get match {
- /* count */
- case Count(Some(filter), Some(opt)) => //SingleObservable
- coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
- .toFuture().asInstanceOf[Future[T]]
- case Count(Some(filter), None) => //SingleObservable
- coll.countDocuments(filter).toFuture()
- .asInstanceOf[Future[T]]
- case Count(None, None) => //SingleObservable
- coll.countDocuments().toFuture()
- .asInstanceOf[Future[T]]
- /* distinct */
- case Distict(field, Some(filter)) => //DistinctObservable
- coll.distinct(field, filter).toFuture()
- .asInstanceOf[Future[T]]
- case Distict(field, None) => //DistinctObservable
- coll.distinct((field)).toFuture()
- .asInstanceOf[Future[T]]
- /* find */
- case Find(None, None, false) => //FindObservable
- if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
- else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
- case Find(None, None, true) => //FindObservable
- if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
- else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
- case Find(Some(filter), None, false) => //FindObservable
- if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
- else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
- case Find(Some(filter), None, true) => //FindObservable
- if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
- else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
- case Find(None, Some(next), _) => //FindObservable
- if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
- else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
- case Find(Some(filter), Some(next), _) => //FindObservable
- if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
- else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
- /* aggregate AggregateObservable*/
- case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
- /* mapReduce MapReduceObservable*/
- case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
- /* list collection */
- case ListCollection(dbName) => //ListConllectionObservable
- client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
- }
- }
- //T => Completed, result.UpdateResult, result.DeleteResult
- def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] =
- mgoUpdateObservable[T](ctx).toFuture()
- def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
- log.info(s"mgoUpdateObservable> MGOContext: ${ctx}")
- val db = client.getDatabase(ctx.dbName)
- val coll = db.getCollection(ctx.collName)
- if ( ctx.action == None) {
- log.error(s"mgoUpdateObservable> uery action cannot be null!")
- throw new IllegalArgumentException("query action cannot be null!")
- }
- ctx.action.get match {
- /* insert */
- case Insert(docs, Some(opt)) => //SingleObservable[Completed]
- if (docs.size > 1)
- coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
- else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
- case Insert(docs, None) => //SingleObservable
- if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
- else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
- /* delete */
- case Delete(filter, None, onlyOne) => //SingleObservable
- if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
- else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
- case Delete(filter, Some(opt), onlyOne) => //SingleObservable
- if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
- else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
- /* replace */
- case Replace(filter, replacement, None) => //SingleObservable
- coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
- case Replace(filter, replacement, Some(opt)) => //SingleObservable
- coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
- /* update */
- case Update(filter, update, None, onlyOne) => //SingleObservable
- if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
- else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
- case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
- if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
- else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
- /* bulkWrite */
- case BulkWrite(commands, None) => //SingleObservable
- coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
- case BulkWrite(commands, Some(opt)) => //SingleObservable
- coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
- }
- }
- def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): SingleObservable[Completed] = {
- log.info(s"mgoAdmin> MGOContext: ${ctx}")
- val db = client.getDatabase(ctx.dbName)
- val coll = db.getCollection(ctx.collName)
- if ( ctx.action == None) {
- log.error(s"mgoAdmin> uery action cannot be null!")
- throw new IllegalArgumentException("query action cannot be null!")
- }
- ctx.action.get match {
- /* drop collection */
- case DropCollection(collName) => //SingleObservable
- val coll = db.getCollection(collName)
- coll.drop()
- /* create collection */
- case CreateCollection(collName, None) => //SingleObservable
- db.createCollection(collName)
- case CreateCollection(collName, Some(opt)) => //SingleObservable
- db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions])
- /* list collection
- case ListCollection(dbName) => //ListConllectionObservable
- client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
- */
- /* create view */
- case CreateView(viewName, viewOn, pline, None) => //SingleObservable
- db.createView(viewName, viewOn, pline)
- case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
- db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions])
- /* create index */
- case CreateIndex(key, None) => //SingleObservable
- coll.createIndex(key).asInstanceOf[SingleObservable[Completed]]
- case CreateIndex(key, Some(opt)) => //SingleObservable
- coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[SingleObservable[Completed]]
- /* drop index */
- case DropIndexByName(indexName, None) => //SingleObservable
- coll.dropIndex(indexName)
- case DropIndexByName(indexName, Some(opt)) => //SingleObservable
- coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions])
- case DropIndexByKey(key, None) => //SingleObservable
- coll.dropIndex(key)
- case DropIndexByKey(key, Some(opt)) => //SingleObservable
- coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions])
- case DropAllIndexes(None) => //SingleObservable
- coll.dropIndexes()
- case DropAllIndexes(Some(opt)) => //SingleObservable
- coll.dropIndexes(opt.asInstanceOf[DropIndexOptions])
- }
- }
- /*
- def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
- val db = client.getDatabase(ctx.dbName)
- val coll = db.getCollection(ctx.collName)
- ctx.action match {
- /* count */
- case Count(Some(filter), Some(opt)) => //SingleObservable
- coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
- .toFuture().asInstanceOf[Future[T]]
- case Count(Some(filter), None) => //SingleObservable
- coll.countDocuments(filter).toFuture()
- .asInstanceOf[Future[T]]
- case Count(None, None) => //SingleObservable
- coll.countDocuments().toFuture()
- .asInstanceOf[Future[T]]
- /* distinct */
- case Distict(field, Some(filter)) => //DistinctObservable
- coll.distinct(field, filter).toFuture()
- .asInstanceOf[Future[T]]
- case Distict(field, None) => //DistinctObservable
- coll.distinct((field)).toFuture()
- .asInstanceOf[Future[T]]
- /* find */
- case Find(None, None, optConv, false) => //FindObservable
- if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
- else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
- case Find(None, None, optConv, true) => //FindObservable
- if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
- else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
- case Find(Some(filter), None, optConv, false) => //FindObservable
- if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
- else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
- case Find(Some(filter), None, optConv, true) => //FindObservable
- if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
- else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
- case Find(None, Some(next), optConv, _) => //FindObservable
- if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
- else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
- case Find(Some(filter), Some(next), optConv, _) => //FindObservable
- if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
- else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
- /* aggregate AggregateObservable*/
- case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
- /* mapReduce MapReduceObservable*/
- case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
- /* insert */
- case Insert(docs, Some(opt)) => //SingleObservable[Completed]
- if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
- .asInstanceOf[Future[T]]
- else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
- .asInstanceOf[Future[T]]
- case Insert(docs, None) => //SingleObservable
- if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
- else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
- /* delete */
- case Delete(filter, None, onlyOne) => //SingleObservable
- if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
- else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
- case Delete(filter, Some(opt), onlyOne) => //SingleObservable
- if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
- else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
- /* replace */
- case Replace(filter, replacement, None) => //SingleObservable
- coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
- case Replace(filter, replacement, Some(opt)) => //SingleObservable
- coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
- /* update */
- case Update(filter, update, None, onlyOne) => //SingleObservable
- if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
- else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
- case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
- if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
- else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
- /* bulkWrite */
- case BulkWrite(commands, None) => //SingleObservable
- coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
- case BulkWrite(commands, Some(opt)) => //SingleObservable
- coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]
- /* drop collection */
- case DropCollection(collName) => //SingleObservable
- val coll = db.getCollection(collName)
- coll.drop().toFuture().asInstanceOf[Future[T]]
- /* create collection */
- case CreateCollection(collName, None) => //SingleObservable
- db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
- case CreateCollection(collName, Some(opt)) => //SingleObservable
- db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
- /* list collection */
- case ListCollection(dbName) => //ListConllectionObservable
- client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
- /* create view */
- case CreateView(viewName, viewOn, pline, None) => //SingleObservable
- db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
- case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
- db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
- /* create index */
- case CreateIndex(key, None) => //SingleObservable
- coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
- case CreateIndex(key, Some(opt)) => //SingleObservable
- coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
- /* drop index */
- case DropIndexByName(indexName, None) => //SingleObservable
- coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
- case DropIndexByName(indexName, Some(opt)) => //SingleObservable
- coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
- case DropIndexByKey(key, None) => //SingleObservable
- coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
- case DropIndexByKey(key, Some(opt)) => //SingleObservable
- coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
- case DropAllIndexes(None) => //SingleObservable
- coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
- case DropAllIndexes(Some(opt)) => //SingleObservable
- coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
- }
- }
- */
- def mongoStream(ctx: MGOContext)(
- implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
- log.info(s"mongoStream> MGOContext: ${ctx}")
- val db = client.getDatabase(ctx.dbName)
- val coll = db.getCollection(ctx.collName)
- if ( ctx.action == None) {
- log.error(s"mongoStream> uery action cannot be null!")
- throw new IllegalArgumentException("query action cannot be null!")
- }
- ctx.action.get match {
- case DocumentStream(None, None) =>
- MongoSource(coll.find())
- case DocumentStream(Some(filter), None) =>
- MongoSource(coll.find(filter))
- case DocumentStream(None, Some(next)) =>
- MongoSource(next(coll.find()))
- case DocumentStream(Some(filter), Some(next)) =>
- MongoSource(next(coll.find(filter)))
- }
- }
- }
- object MongoActionStream {
- import MGOContext._
- case class StreamingInsert[A](dbName: String,
- collName: String,
- converter: A => Document,
- parallelism: Int = 1
- ) extends MGOCommands
- case class StreamingDelete[A](dbName: String,
- collName: String,
- toFilter: A => Bson,
- parallelism: Int = 1,
- justOne: Boolean = false
- ) extends MGOCommands
- case class StreamingUpdate[A](dbName: String,
- collName: String,
- toFilter: A => Bson,
- toUpdate: A => Bson,
- parallelism: Int = 1,
- justOne: Boolean = false
- ) extends MGOCommands
- case class InsertAction[A](ctx: StreamingInsert[A])(
- implicit mongoClient: MongoClient) {
- val database = mongoClient.getDatabase(ctx.dbName)
- val collection = database.getCollection(ctx.collName)
- def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
- Flow[A].map(ctx.converter)
- .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
- }
- case class UpdateAction[A](ctx: StreamingUpdate[A])(
- implicit mongoClient: MongoClient) {
- val database = mongoClient.getDatabase(ctx.dbName)
- val collection = database.getCollection(ctx.collName)
- def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
- if (ctx.justOne) {
- Flow[A]
- .mapAsync(ctx.parallelism)(a =>
- collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
- } else
- Flow[A]
- .mapAsync(ctx.parallelism)(a =>
- collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
- }
- case class DeleteAction[A](ctx: StreamingDelete[A])(
- implicit mongoClient: MongoClient) {
- val database = mongoClient.getDatabase(ctx.dbName)
- val collection = database.getCollection(ctx.collName)
- def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
- if (ctx.justOne) {
- Flow[A]
- .mapAsync(ctx.parallelism)(a =>
- collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
- } else
- Flow[A]
- .mapAsync(ctx.parallelism)(a =>
- collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
- }
- }
- object MGOHelpers {
- implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
- override val converter: (Document) => String = (doc) => doc.toJson
- }
- implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
- override val converter: (C) => String = (doc) => doc.toString
- }
- trait ImplicitObservable[C] {
- val observable: Observable[C]
- val converter: (C) => String
- def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
- def headResult() = Await.result(observable.head(), 10 seconds)
- def printResults(initial: String = ""): Unit = {
- if (initial.length > 0) print(initial)
- results().foreach(res => println(converter(res)))
- }
- def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
- }
- def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
- Await.result(fut, timeOut)
- }
- def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
- Await.result(fut, timeOut)
- }
- import monix.eval.Task
- import monix.execution.Scheduler.Implicits.global
- final class FutureToTask[A](x: => Future[A]) {
- def asTask: Task[A] = Task.deferFuture[A](x)
- }
- final class TaskToFuture[A](x: => Task[A]) {
- def asFuture: Future[A] = x.runAsync
- }
- }
MgoProtoConversion.scala
- package sdp.mongo.engine
- import org.mongodb.scala.bson.collection.immutable.Document
- import org.bson.conversions.Bson
- import sdp.grpc.services._
- import protobuf.bytes.Converter._
- import com.google.protobuf.ByteString
- import MGOContext._
- import MGOAdmins._
- import MGOCommands._
- import org.bson.BsonDocument
- import org.bson.codecs.configuration.CodecRegistry
- import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
- import org.mongodb.scala.FindObservable
- object MgoProtoConvertion {
- /* org.mongodb.scala.FindObservable
- import com.mongodb.async.client.FindIterable
- val resultDocType = FindIterable[Document]
- val resultOption = FindObservable(resultDocType)
- .maxScan(...)
- .limit(...)
- .sort(...)
- .project(...) */
- type FOD_TYPE = Int
- val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
- val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
- val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
- val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
- val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
- //Sets a document describing the fields to return for all matching documents
- val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
- val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
- //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
- val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
- //Sets the cursor type
- val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
- //Sets the hint for which index to use. A null value means no hint is set
- val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
- //Sets the exclusive upper bound for a specific index. A null value means no max is set
- val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
- //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
- val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
- //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
- val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
- //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents
- case class ResultOptions(
- optType: FOD_TYPE,
- bsonParam: Option[Bson] = None,
- valueParam: Int = 0 ){
- def toProto = new sdp.grpc.services.ResultTransformer(
- optType = this.optType,
- bsonParam = this.bsonParam.map {b => sdp.grpc.services.MGOBson(marshal(b))},
- valueParam = this.valueParam
- )
- def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
- optType match {
- case FOD_FIRST => find
- case FOD_FILTER => find.filter(bsonParam.get)
- case FOD_LIMIT => find.limit(valueParam)
- case FOD_SKIP => find.skip(valueParam)
- case FOD_PROJECTION => find.projection(bsonParam.get)
- case FOD_SORT => find.sort(bsonParam.get)
- case FOD_PARTIAL => find.partial(valueParam != 0)
- case FOD_CURSORTYPE => find
- case FOD_HINT => find.hint(bsonParam.get)
- case FOD_MAX => find.max(bsonParam.get)
- case FOD_MIN => find.min(bsonParam.get)
- case FOD_RETURNKEY => find.returnKey(valueParam != 0)
- case FOD_SHOWRECORDID => find.showRecordId(valueParam != 0)
- }
- }
- }
- object ResultOptions {
- def fromProto(msg: sdp.grpc.services.ResultTransformer) = new ResultOptions(
- optType = msg.optType,
- bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
- valueParam = msg.valueParam
- )
- }
- type MGO_COMMAND_TYPE = Int
- val MGO_COMMAND_FIND = 0
- val MGO_COMMAND_COUNT = 20
- val MGO_COMMAND_DISTICT = 21
- val MGO_COMMAND_DOCUMENTSTREAM = 1
- val MGO_COMMAND_AGGREGATE = 2
- val MGO_COMMAND_INSERT = 3
- val MGO_COMMAND_DELETE = 4
- val MGO_COMMAND_REPLACE = 5
- val MGO_COMMAND_UPDATE = 6
- val MGO_ADMIN_DROPCOLLECTION = 8
- val MGO_ADMIN_CREATECOLLECTION = 9
- val MGO_ADMIN_LISTCOLLECTION = 10
- val MGO_ADMIN_CREATEVIEW = 11
- val MGO_ADMIN_CREATEINDEX = 12
- val MGO_ADMIN_DROPINDEXBYNAME = 13
- val MGO_ADMIN_DROPINDEXBYKEY = 14
- val MGO_ADMIN_DROPALLINDEXES = 15
- case class MGOAdminCtx(
- tarName: String = "",
- bsonParam: Seq[Bson] = Nil,
- options: Option[Any] = None,
- objName: String = ""
- ){
- def toProto = sdp.grpc.services.MGOAdminOptons(
- tarName = this.tarName,
- bsonParam = this.bsonParam.map {b => sdp.grpc.services.MGOBson(marshal(b))},
- objName = this.objName,
- options = this.options.map(b => OptionAny(marshal(b)))
- )
- }
- object MGOAdminCtx {
- def fromProto(msg: sdp.grpc.services.MGOAdminOptons) = new MGOAdminCtx(
- tarName = msg.tarName,
- bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson))
- )
- }
- case class MGOProtoMsg(
- dbName: String = "",
- collName: String = "",
- commandType: MGO_COMMAND_TYPE,
- bsonParam: Seq[Bson] = Nil,
- resultOptions: Seq[ResultOptions] = Nil,
- options: ByteString = com.google.protobuf.ByteString.EMPTY,
- documents: Seq[Document] = Nil,
- only: Boolean = false,
- adminOptions: Option[MGOAdminCtx] = None
- ){
- def toProto = new sdp.grpc.services.MGOOperations(
- dbName = this.dbName,
- collName = this.collName,
- commandType = this.commandType,
- bsonParam = this.bsonParam.map(bsonToProto),
- resultOptions = this.resultOptions.map(_.toProto),
- documents = this.documents.map(d => sdp.grpc.services.MGODocument(marshal(d))),
- only = Some(this.only),
- adminOptions = this.adminOptions.map(_.toProto)
- )
- }
- object MGOProtoMsg {
- def fromProto(msg: sdp.grpc.services.MGOOperations) = new MGOProtoMsg(
- dbName = msg.dbName,
- collName = msg.collName,
- commandType = msg.commandType,
- bsonParam = msg.bsonParam.map(protoToBson),
- resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r))
- )
- }
- def bsonToProto(bson: Bson) =
- MGOBson(marshal(bson.toBsonDocument(
- classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
- def protoToBson(proto: MGOBson): Bson = new Bson {
- val bsdoc = unmarshal[BsonDocument](proto.bson)
- override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
- }
- def CtxFromProto(proto: MGOOperations): MGOContext = proto.commandType match {
- case MGO_COMMAND_FIND => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_QUERY,
- action = Some(Find())
- )
- def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = findObj =>
- rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
- (proto.bsonParam, proto.resultOptions, proto.only) match {
- case (Nil, Nil, None) => ctx
- case (Nil, Nil, Some(b)) => ctx.setCommand(Find(None, None, b))
- case (bp,Nil,None) => ctx.setCommand(
- Find(Some(protoToBson(bp.head)),None,false))
- case (bp,Nil,Some(b)) => ctx.setCommand(
- Find(Some(protoToBson(bp.head)),None,b))
- case (bp,fo,None) => {
- ctx.setCommand(
- Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),false))
- }
- case (bp,fo,Some(b)) => {
- ctx.setCommand(
- Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),b))
- }
- case _ => ctx
- }
- }
- }
- }
BytesConverter.scala
- package protobuf.bytes
- import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
- import com.google.protobuf.ByteString
- object Converter {
- def marshal(value: Any): ByteString = {
- val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(stream)
- oos.writeObject(value)
- oos.close()
- ByteString.copyFrom(stream.toByteArray())
- }
- def unmarshal[A](bytes: ByteString): A = {
- val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
- val value = ois.readObject()
- ois.close()
- value.asInstanceOf[A]
- }
- }
FindDemo.scala
- package demo.sdp.mgo.localapp
- import akka.actor.ActorSystem
- import akka.stream.ActorMaterializer
- import org.mongodb.scala._
- import scala.util._
- import scala.collection.JavaConverters._
- import sdp.mongo.engine._
- import org.mongodb.scala.model._
- import akka.stream.scaladsl.{Sink, Source}
- import org.bson.codecs.configuration.CodecRegistry
- import org.mongodb.scala.bson.{BsonDocument, BsonValue}
- import scalikejdbc._
- import sdp.jdbc.engine._
- import sdp.jdbc.config._
- object ProtoTests extends App {
- import MGOContext._
- import MGOEngine._
- import MGOCommands._
- import MongoActionStream._
- import MgoProtoConvertion._
- import org.mongodb.scala.model._
- import Projections._
- import Filters._
- implicit val system = ActorSystem()
- implicit val mat = ActorMaterializer()
- implicit val ec = system.dispatcher
- val clientSettings: MongoClientSettings = MongoClientSettings.builder()
- .applyToClusterSettings {b =>
- b.hosts(List(new ServerAddress("localhost")).asJava)
- }.build()
- implicit val client: MongoClient = MongoClient(clientSettings)
- val eqState = equal("state","California")
- val proj = exclude("rowid","_id")
- val rtxfmr = Seq(
- ResultOptions(
- optType = FOD_LIMIT,
- valueParam = 3)
- ,ResultOptions(
- optType = FOD_PROJECTION,
- bsonParam = Some(proj))
- )
- val protoCtx = MGOProtoMsg(
- dbName = "testdb",
- collName = "aqmrpt",
- commandType = MGO_COMMAND_FIND,
- bsonParam = Seq(eqState),
- resultOptions = rtxfmr
- ).toProto
- val findCtx = CtxFromProto(protoCtx)
- val futFind = mgoQuery[Seq[Document]](findCtx)
- futFind.onComplete {
- case Success(docs) => docs.asInstanceOf[Seq[Document]].foreach{doc => println(doc.toJson())}
- case Failure(e) => println(e.getMessage)
- }
- scala.io.StdIn.readLine()
- system.terminate()
- }