经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
PICE(4):MongoDBStreaming - gRPC Protobuf conversion
来源:cnblogs  作者:雪川大虫  时间:2018/9/25 20:27:58  对本文有异议

   前两篇我们介绍了JDBC和Cassandra的gRPC streaming实现。相对MongoDB来说,JDBC和Cassandra支持字符类型的query语句SQL,CQL,所以把query指令转换成protobuf structures是简单直接的。而MongoDB没有提供字符类的query,所以我们必须进行MongoDB query涉及的所有类型与protobuf类型的相互转换,实现gRPC功能会复杂的多。我们在这篇讨论里先介绍MongoDB query的protobuf转换。

在前面的MongoDB-Engine讨论里我们设计了个MGOContext作为JVM内部传递MongoDB query的数据结构:

  1. case class MGOContext(
  2. dbName: String,
  3. collName: String,
  4. actionType: MGO_ACTION_TYPE = MGO_QUERY,
  5. action: Option[MGOCommands] = None
  6. ) {
  7. ctx =>
  8. def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
  9. def setCollName(name: String): MGOContext = ctx.copy(collName = name)
  10. def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
  11. def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
  12. }

 

下面是这个结构支持的action清单:

  1. object MGOCommands {
  2. case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands
  3. case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands
  4. /* org.mongodb.scala.FindObservable
  5. import com.mongodb.async.client.FindIterable
  6. val resultDocType = FindIterable[Document]
  7. val resultOption = FindObservable(resultDocType)
  8. .maxScan(...)
  9. .limit(...)
  10. .sort(...)
  11. .project(...) */
  12. case class Find(filter: Option[Bson] = None,
  13. andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
  14. firstOnly: Boolean = false) extends MGOCommands
  15. case class DocumentStream(filter: Option[Bson] = None,
  16. andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
  17. ) extends MGOCommands
  18. case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
  19. case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
  20. case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
  21. case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
  22. case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
  23. case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
  24. case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
  25. }
  26. object MGOAdmins {
  27. case class DropCollection(collName: String) extends MGOCommands
  28. case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
  29. case class ListCollection(dbName: String) extends MGOCommands
  30. case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
  31. case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
  32. case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
  33. case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
  34. case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
  35. }

可以看到,我们必须把Bson、Document、FindObservable这几个类型对应到protobuf格式。下面是.proto文件里的部分内容:

  1. message MGODocument {
  2. bytes document = 1;
  3. }
  4. message MGOBson {
  5. bytes bson = 1;
  6. }
  7. message ResultTransformer { //FindObservable
  8. int32 optType = 1;
  9. MGOBson bsonParam = 2;
  10. int32 valueParam = 3;
  11. }
  12. message MGOAdminOptons {
  13. string tarName = 1;
  14. repeated MGOBson bsonParam = 2;
  15. OptionAny options = 3;
  16. string objName = 4;
  17. }
  18. message MGOOperations { //MGOContext
  19. string dbName = 1;
  20. string collName = 2;
  21. int32 commandType = 3;
  22. repeated MGOBson bsonParam = 4;
  23. repeated ResultTransformer resultOptions = 5;
  24. OptionAny options = 6;
  25. repeated MGODocument documents = 7;
  26. google.protobuf.BoolValue only = 8;
  27. MGOAdminOptons adminOptions = 9;
  28. }

首先,Document是个serializable类,可以直接进行序列/反序列化:

  1. val po = Document (
  2. "ponum" -> "po18012301",
  3. "vendor" -> "The smartphone compay",
  4. "remarks" -> "urgent, rush order",
  5. "podtl" -> Seq(
  6. Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
  7. Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
  8. )
  9. )
  10. println(po)
  11. val pobytes = marshal(po)
  12. println(s"po bytes: ${pobytes}")
  13. val po1 = unmarshal[Document](pobytes)
  14. println(s"back to po document: $po1")

下一个是Bson,它是个java interface:

  1. /**
  2. * An interface for types that are able to render themselves into a {@code BsonDocument}.
  3. *
  4. * @since 3.0
  5. */
  6. public interface Bson {
  7. /**
  8. * Render the filter into a BsonDocument.
  9. *
  10. * @param documentClass the document class in scope for the collection. This parameter may be ignored, but it may be used to alter
  11. * the structure of the returned {@code BsonDocument} based on some knowledge of the document class.
  12. * @param codecRegistry the codec registry. This parameter may be ignored, but it may be used to look up {@code Codec} instances for
  13. * the document class or any other related class.
  14. * @param <TDocument> the type of the document class
  15. * @return the BsonDocument
  16. */
  17. <TDocument> BsonDocument toBsonDocument(Class<TDocument> documentClass, CodecRegistry codecRegistry);
  18. }

Bson只是一个interface,不是serilizable,不过BsonDocument可以:

  1. /**
  2. * A type-safe container for a BSON document. This class should NOT be sub-classed by third parties.
  3. *
  4. * @since 3.0
  5. */
  6. public class BsonDocument extends BsonValue implements Map<String, BsonValue>, Cloneable, Bson, Serializable {...}

所以我们可以用BsonDocument来进行序列/反序列后在再用它来构建一个新的Bson对象:

  1. def bsonToProto(bson: Bson) =
  2. MGOBson(marshal(bson.toBsonDocument(
  3. classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
  4. def protoToBson(proto: MGOBson): Bson = new Bson {
  5. val bsdoc = unmarshal[BsonDocument](proto.bson)
  6. override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
  7. }

最后是这个FindObservable:这个类型的应用场景是这样的:

  1. /* org.mongodb.scala.FindObservable
  2. import com.mongodb.async.client.FindIterable
  3. val resultDocType = FindIterable[Document]
  4. val resultOption = FindObservable(resultDocType)
  5. .maxScan(...)
  6. .limit(...)
  7. .sort(...)
  8. .project(...) */
  9. case class Find(filter: Option[Bson] = None,
  10. andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
  11. firstOnly: Boolean = false) extends MGOCommands

FindObservable类型的效果可以是一连串施用的结果,因为是FindObservable[A] => FindObservable[A]这样的款式,所以我们可以用一串FindObservable[Document]来进行序列/反序列化处理,然后再重新串连施用来获得最终的FindObservable。FindObservable对应的protobuf结构如下:

  1. message ResultTransformer { //FindObservable
  2. int32 optType = 1;
  3. MGOBson bsonParam = 2;
  4. int32 valueParam = 3;
  5. }
  6. type FOD_TYPE = Int
  7. val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
  8. val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
  9. val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
  10. val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
  11. val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
  12. //Sets a document describing the fields to return for all matching documents
  13. val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
  14. val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
  15. //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
  16. val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
  17. //Sets the cursor type
  18. val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
  19. //Sets the hint for which index to use. A null value means no hint is set
  20. val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
  21. //Sets the exclusive upper bound for a specific index. A null value means no max is set
  22. val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
  23. //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
  24. val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
  25. //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
  26. val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
  27. //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents
  28. case class ResultOptions(
  29. optType: FOD_TYPE,
  30. bsonParam: Option[Bson] = None,
  31. valueParam: Int = 0 ){
  32. def toProto = new sdp.grpc.services.ResultTransformer(
  33. optType = this.optType,
  34. bsonParam = this.bsonParam.map {b => sdp.grpc.services.MGOBson(marshal(b))},
  35. valueParam = this.valueParam
  36. )
  37. def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
  38. optType match {
  39. case FOD_FIRST => find
  40. case FOD_FILTER => find.filter(bsonParam.get)
  41. case FOD_LIMIT => find.limit(valueParam)
  42. case FOD_SKIP => find.skip(valueParam)
  43. case FOD_PROJECTION => find.projection(bsonParam.get)
  44. case FOD_SORT => find.sort(bsonParam.get)
  45. case FOD_PARTIAL => find.partial(valueParam != 0)
  46. case FOD_CURSORTYPE => find
  47. case FOD_HINT => find.hint(bsonParam.get)
  48. case FOD_MAX => find.max(bsonParam.get)
  49. case FOD_MIN => find.min(bsonParam.get)
  50. case FOD_RETURNKEY => find.returnKey(valueParam != 0)
  51. case FOD_SHOWRECORDID => find.showRecordId(valueParam != 0)
  52. }
  53. }
  54. }
  55. object ResultOptions {
  56. def fromProto(msg: sdp.grpc.services.ResultTransformer) = new ResultOptions(
  57. optType = msg.optType,
  58. bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
  59. valueParam = msg.valueParam
  60. )
  61. }

我们可以用这个ResultOptions类型的toProto,fromProto来进行protobuf的转换处理。然后用aggregation实现连串施用:

  1. def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = findObj =>
  2. rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))

下面这个函数示范了Find Context的反序列:

  1. def CtxFromProto(proto: MGOOperations): MGOContext = proto.commandType match {
  2. case MGO_COMMAND_FIND => {
  3. var ctx = new MGOContext(
  4. dbName = proto.dbName,
  5. collName = proto.collName,
  6. actionType = MGO_QUERY,
  7. action = Some(Find())
  8. )
  9. def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = findObj =>
  10. rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
  11. (proto.bsonParam, proto.resultOptions, proto.only) match {
  12. case (Nil, Nil, None) => ctx
  13. case (Nil, Nil, Some(b)) => ctx.setCommand(Find(None, None, b))
  14. case (bp,Nil,None) => ctx.setCommand(
  15. Find(Some(protoToBson(bp.head)),None,false))
  16. case (bp,Nil,Some(b)) => ctx.setCommand(
  17. Find(Some(protoToBson(bp.head)),None,b))
  18. case (bp,fo,None) => {
  19. ctx.setCommand(
  20. Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),false))
  21. }
  22. case (bp,fo,Some(b)) => {
  23. ctx.setCommand(
  24. Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),b))
  25. }
  26. case _ => ctx
  27. }
  28. }
  29. }

具体的应用示范例如下:

  1. val eqState = equal("state","California")
  2. val proj = exclude("rowid","_id")
  3. val rtxfmr = Seq(
  4. ResultOptions(
  5. optType = FOD_LIMIT,
  6. valueParam = 3)
  7. ,ResultOptions(
  8. optType = FOD_PROJECTION,
  9. bsonParam = Some(proj))
  10. )
  11. val protoCtx = MGOProtoMsg(
  12. dbName = "testdb",
  13. collName = "aqmrpt",
  14. commandType = MGO_COMMAND_FIND,
  15. bsonParam = Seq(eqState),
  16. resultOptions = rtxfmr
  17. ).toProto
  18. val findCtx = CtxFromProto(protoCtx)
  19. val futFind = mgoQuery[Seq[Document]](findCtx)
  20. futFind.onComplete {
  21. case Success(docs) => docs.asInstanceOf[Seq[Document]].foreach{doc => println(doc.toJson())}
  22. case Failure(e) => println(e.getMessage)
  23. } 

下面是本次讨论的部分源代码:

MongoDBEngine.scala

  1. package sdp.mongo.engine
  2. import java.text.SimpleDateFormat
  3. import akka.NotUsed
  4. import akka.stream.alpakka.mongodb.scaladsl._
  5. import akka.stream.scaladsl.{Flow, Sink, Source}
  6. import org.mongodb.scala.MongoClient
  7. import org.mongodb.scala.bson.collection.immutable.Document
  8. import org.bson.conversions.Bson
  9. import org.mongodb.scala._
  10. import org.mongodb.scala.model._
  11. import java.util.Calendar
  12. import scala.collection.JavaConverters._
  13. import sdp.file.Streaming._
  14. import akka.stream.Materializer
  15. import org.mongodb.scala.bson.{BsonArray, BsonBinary}
  16. import scala.concurrent._
  17. import scala.concurrent.duration._
  18. import sdp.logging.LogSupport
  19. object MGOContext {
  20. type MGO_ACTION_TYPE = Int
  21. val MGO_QUERY = 0
  22. val MGO_UPDATE = 1
  23. val MGO_ADMIN = 2
  24. trait MGOCommands
  25. object MGOCommands {
  26. case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands
  27. case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands
  28. /* org.mongodb.scala.FindObservable
  29. import com.mongodb.async.client.FindIterable
  30. val resultDocType = FindIterable[Document]
  31. val resultOption = FindObservable(resultDocType)
  32. .maxScan(...)
  33. .limit(...)
  34. .sort(...)
  35. .project(...) */
  36. case class Find(filter: Option[Bson] = None,
  37. andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
  38. firstOnly: Boolean = false) extends MGOCommands
  39. case class DocumentStream(filter: Option[Bson] = None,
  40. andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
  41. ) extends MGOCommands
  42. case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
  43. case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
  44. case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
  45. case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
  46. case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
  47. case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
  48. case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
  49. }
  50. object MGOAdmins {
  51. case class DropCollection(collName: String) extends MGOCommands
  52. case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
  53. case class ListCollection(dbName: String) extends MGOCommands
  54. case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
  55. case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
  56. case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
  57. case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
  58. case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
  59. }
  60. case class MGOContext(
  61. dbName: String,
  62. collName: String,
  63. actionType: MGO_ACTION_TYPE = MGO_QUERY,
  64. action: Option[MGOCommands] = None
  65. ) {
  66. ctx =>
  67. def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
  68. def setCollName(name: String): MGOContext = ctx.copy(collName = name)
  69. def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
  70. def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
  71. }
  72. object MGOContext {
  73. def apply(db: String, coll: String) = new MGOContext(db, coll)
  74. }
  75. case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
  76. ctxs =>
  77. def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
  78. def appendContext(ctx: MGOContext): MGOBatContext =
  79. ctxs.copy(contexts = contexts :+ ctx)
  80. }
  81. object MGOBatContext {
  82. def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
  83. }
  84. type MGODate = java.util.Date
  85. def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
  86. val ca = Calendar.getInstance()
  87. ca.set(yyyy,mm,dd)
  88. ca.getTime()
  89. }
  90. def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
  91. val ca = Calendar.getInstance()
  92. ca.set(yyyy,mm,dd,hr,min,sec)
  93. ca.getTime()
  94. }
  95. def mgoDateTimeNow: MGODate = {
  96. val ca = Calendar.getInstance()
  97. ca.getTime
  98. }
  99. def mgoDateToString(dt: MGODate, formatString: String): String = {
  100. val fmt= new SimpleDateFormat(formatString)
  101. fmt.format(dt)
  102. }
  103. type MGOBlob = BsonBinary
  104. type MGOArray = BsonArray
  105. def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
  106. implicit mat: Materializer) = FileToByteArray(fileName,timeOut)
  107. def mgoBlobToFile(blob: MGOBlob, fileName: String)(
  108. implicit mat: Materializer) = ByteArrayToFile(blob.getData,fileName)
  109. def mgoGetStringOrNone(doc: Document, fieldName: String) = {
  110. if (doc.keySet.contains(fieldName))
  111. Some(doc.getString(fieldName))
  112. else None
  113. }
  114. def mgoGetIntOrNone(doc: Document, fieldName: String) = {
  115. if (doc.keySet.contains(fieldName))
  116. Some(doc.getInteger(fieldName))
  117. else None
  118. }
  119. def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
  120. if (doc.keySet.contains(fieldName))
  121. Some(doc.getLong(fieldName))
  122. else None
  123. }
  124. def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
  125. if (doc.keySet.contains(fieldName))
  126. Some(doc.getDouble(fieldName))
  127. else None
  128. }
  129. def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
  130. if (doc.keySet.contains(fieldName))
  131. Some(doc.getBoolean(fieldName))
  132. else None
  133. }
  134. def mgoGetDateOrNone(doc: Document, fieldName: String) = {
  135. if (doc.keySet.contains(fieldName))
  136. Some(doc.getDate(fieldName))
  137. else None
  138. }
  139. def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
  140. if (doc.keySet.contains(fieldName))
  141. doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
  142. else None
  143. }
  144. def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
  145. if (doc.keySet.contains(fieldName))
  146. doc.get(fieldName).asInstanceOf[Option[MGOArray]]
  147. else None
  148. }
  149. def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
  150. (arr.getValues.asScala.toList)
  151. .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
  152. }
  153. type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
  154. }
  155. object MGOEngine extends LogSupport {
  156. import MGOContext._
  157. import MGOCommands._
  158. import MGOAdmins._
  159. object TxUpdateMode {
  160. private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
  161. implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
  162. log.info(s"mgoTxUpdate> calling ...")
  163. observable.map(clientSession => {
  164. val transactionOptions =
  165. TransactionOptions.builder()
  166. .readConcern(ReadConcern.SNAPSHOT)
  167. .writeConcern(WriteConcern.MAJORITY).build()
  168. clientSession.startTransaction(transactionOptions)
  169. val fut = Future.traverse(ctxs.contexts) { ctx =>
  170. mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
  171. }
  172. Await.ready(fut, 3 seconds)
  173. clientSession
  174. })
  175. }
  176. private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
  177. log.info(s"commitAndRetry> calling ...")
  178. observable.recoverWith({
  179. case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
  180. log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
  181. commitAndRetry(observable)
  182. }
  183. case e: Exception => {
  184. log.error(s"commitAndRetry> Exception during commit ...: $e")
  185. throw e
  186. }
  187. })
  188. }
  189. private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
  190. log.info(s"runTransactionAndRetry> calling ...")
  191. observable.recoverWith({
  192. case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
  193. log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
  194. runTransactionAndRetry(observable)
  195. }
  196. })
  197. }
  198. def mgoTxBatch(ctxs: MGOBatContext)(
  199. implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {
  200. log.info(s"mgoTxBatch> MGOBatContext: ${ctxs}")
  201. val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
  202. val commitTransactionObservable: SingleObservable[Completed] =
  203. updateObservable.flatMap(clientSession => clientSession.commitTransaction())
  204. val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
  205. runTransactionAndRetry(commitAndRetryObservable)
  206. }.toFuture()
  207. }
  208. def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): Future[Completed] = {
  209. log.info(s"mgoUpdateBatch> MGOBatContext: ${ctxs}")
  210. if (ctxs.tx) {
  211. TxUpdateMode.mgoTxBatch(ctxs)
  212. } else {
  213. val fut = Future.traverse(ctxs.contexts) { ctx =>
  214. mgoUpdate[Completed](ctx).map(identity) }
  215. Await.ready(fut, 3 seconds)
  216. Future.successful(new Completed)
  217. }
  218. }
  219. // T => FindIterable e.g List[Document]
  220. def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): Future[T] = {
  221. log.info(s"mgoQuery> MGOContext: ${ctx}")
  222. val db = client.getDatabase(ctx.dbName)
  223. val coll = db.getCollection(ctx.collName)
  224. if ( ctx.action == None) {
  225. log.error(s"mgoQuery> uery action cannot be null!")
  226. throw new IllegalArgumentException("query action cannot be null!")
  227. }
  228. ctx.action.get match {
  229. /* count */
  230. case Count(Some(filter), Some(opt)) => //SingleObservable
  231. coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
  232. .toFuture().asInstanceOf[Future[T]]
  233. case Count(Some(filter), None) => //SingleObservable
  234. coll.countDocuments(filter).toFuture()
  235. .asInstanceOf[Future[T]]
  236. case Count(None, None) => //SingleObservable
  237. coll.countDocuments().toFuture()
  238. .asInstanceOf[Future[T]]
  239. /* distinct */
  240. case Distict(field, Some(filter)) => //DistinctObservable
  241. coll.distinct(field, filter).toFuture()
  242. .asInstanceOf[Future[T]]
  243. case Distict(field, None) => //DistinctObservable
  244. coll.distinct((field)).toFuture()
  245. .asInstanceOf[Future[T]]
  246. /* find */
  247. case Find(None, None, false) => //FindObservable
  248. if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
  249. else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
  250. case Find(None, None, true) => //FindObservable
  251. if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
  252. else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
  253. case Find(Some(filter), None, false) => //FindObservable
  254. if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
  255. else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
  256. case Find(Some(filter), None, true) => //FindObservable
  257. if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
  258. else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
  259. case Find(None, Some(next), _) => //FindObservable
  260. if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
  261. else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
  262. case Find(Some(filter), Some(next), _) => //FindObservable
  263. if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
  264. else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
  265. /* aggregate AggregateObservable*/
  266. case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
  267. /* mapReduce MapReduceObservable*/
  268. case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
  269. /* list collection */
  270. case ListCollection(dbName) => //ListConllectionObservable
  271. client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
  272. }
  273. }
  274. //T => Completed, result.UpdateResult, result.DeleteResult
  275. def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] =
  276. mgoUpdateObservable[T](ctx).toFuture()
  277. def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
  278. log.info(s"mgoUpdateObservable> MGOContext: ${ctx}")
  279. val db = client.getDatabase(ctx.dbName)
  280. val coll = db.getCollection(ctx.collName)
  281. if ( ctx.action == None) {
  282. log.error(s"mgoUpdateObservable> uery action cannot be null!")
  283. throw new IllegalArgumentException("query action cannot be null!")
  284. }
  285. ctx.action.get match {
  286. /* insert */
  287. case Insert(docs, Some(opt)) => //SingleObservable[Completed]
  288. if (docs.size > 1)
  289. coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
  290. else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
  291. case Insert(docs, None) => //SingleObservable
  292. if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
  293. else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
  294. /* delete */
  295. case Delete(filter, None, onlyOne) => //SingleObservable
  296. if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
  297. else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
  298. case Delete(filter, Some(opt), onlyOne) => //SingleObservable
  299. if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
  300. else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
  301. /* replace */
  302. case Replace(filter, replacement, None) => //SingleObservable
  303. coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
  304. case Replace(filter, replacement, Some(opt)) => //SingleObservable
  305. coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
  306. /* update */
  307. case Update(filter, update, None, onlyOne) => //SingleObservable
  308. if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
  309. else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
  310. case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
  311. if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
  312. else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
  313. /* bulkWrite */
  314. case BulkWrite(commands, None) => //SingleObservable
  315. coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
  316. case BulkWrite(commands, Some(opt)) => //SingleObservable
  317. coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
  318. }
  319. }
  320. def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): SingleObservable[Completed] = {
  321. log.info(s"mgoAdmin> MGOContext: ${ctx}")
  322. val db = client.getDatabase(ctx.dbName)
  323. val coll = db.getCollection(ctx.collName)
  324. if ( ctx.action == None) {
  325. log.error(s"mgoAdmin> uery action cannot be null!")
  326. throw new IllegalArgumentException("query action cannot be null!")
  327. }
  328. ctx.action.get match {
  329. /* drop collection */
  330. case DropCollection(collName) => //SingleObservable
  331. val coll = db.getCollection(collName)
  332. coll.drop()
  333. /* create collection */
  334. case CreateCollection(collName, None) => //SingleObservable
  335. db.createCollection(collName)
  336. case CreateCollection(collName, Some(opt)) => //SingleObservable
  337. db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions])
  338. /* list collection
  339. case ListCollection(dbName) => //ListConllectionObservable
  340. client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
  341. */
  342. /* create view */
  343. case CreateView(viewName, viewOn, pline, None) => //SingleObservable
  344. db.createView(viewName, viewOn, pline)
  345. case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
  346. db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions])
  347. /* create index */
  348. case CreateIndex(key, None) => //SingleObservable
  349. coll.createIndex(key).asInstanceOf[SingleObservable[Completed]]
  350. case CreateIndex(key, Some(opt)) => //SingleObservable
  351. coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[SingleObservable[Completed]]
  352. /* drop index */
  353. case DropIndexByName(indexName, None) => //SingleObservable
  354. coll.dropIndex(indexName)
  355. case DropIndexByName(indexName, Some(opt)) => //SingleObservable
  356. coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions])
  357. case DropIndexByKey(key, None) => //SingleObservable
  358. coll.dropIndex(key)
  359. case DropIndexByKey(key, Some(opt)) => //SingleObservable
  360. coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions])
  361. case DropAllIndexes(None) => //SingleObservable
  362. coll.dropIndexes()
  363. case DropAllIndexes(Some(opt)) => //SingleObservable
  364. coll.dropIndexes(opt.asInstanceOf[DropIndexOptions])
  365. }
  366. }
  367. /*
  368. def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
  369. val db = client.getDatabase(ctx.dbName)
  370. val coll = db.getCollection(ctx.collName)
  371. ctx.action match {
  372. /* count */
  373. case Count(Some(filter), Some(opt)) => //SingleObservable
  374. coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
  375. .toFuture().asInstanceOf[Future[T]]
  376. case Count(Some(filter), None) => //SingleObservable
  377. coll.countDocuments(filter).toFuture()
  378. .asInstanceOf[Future[T]]
  379. case Count(None, None) => //SingleObservable
  380. coll.countDocuments().toFuture()
  381. .asInstanceOf[Future[T]]
  382. /* distinct */
  383. case Distict(field, Some(filter)) => //DistinctObservable
  384. coll.distinct(field, filter).toFuture()
  385. .asInstanceOf[Future[T]]
  386. case Distict(field, None) => //DistinctObservable
  387. coll.distinct((field)).toFuture()
  388. .asInstanceOf[Future[T]]
  389. /* find */
  390. case Find(None, None, optConv, false) => //FindObservable
  391. if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
  392. else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
  393. case Find(None, None, optConv, true) => //FindObservable
  394. if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
  395. else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
  396. case Find(Some(filter), None, optConv, false) => //FindObservable
  397. if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
  398. else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
  399. case Find(Some(filter), None, optConv, true) => //FindObservable
  400. if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
  401. else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
  402. case Find(None, Some(next), optConv, _) => //FindObservable
  403. if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
  404. else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
  405. case Find(Some(filter), Some(next), optConv, _) => //FindObservable
  406. if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
  407. else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
  408. /* aggregate AggregateObservable*/
  409. case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
  410. /* mapReduce MapReduceObservable*/
  411. case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
  412. /* insert */
  413. case Insert(docs, Some(opt)) => //SingleObservable[Completed]
  414. if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
  415. .asInstanceOf[Future[T]]
  416. else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
  417. .asInstanceOf[Future[T]]
  418. case Insert(docs, None) => //SingleObservable
  419. if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
  420. else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
  421. /* delete */
  422. case Delete(filter, None, onlyOne) => //SingleObservable
  423. if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
  424. else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
  425. case Delete(filter, Some(opt), onlyOne) => //SingleObservable
  426. if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
  427. else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
  428. /* replace */
  429. case Replace(filter, replacement, None) => //SingleObservable
  430. coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
  431. case Replace(filter, replacement, Some(opt)) => //SingleObservable
  432. coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
  433. /* update */
  434. case Update(filter, update, None, onlyOne) => //SingleObservable
  435. if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
  436. else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
  437. case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
  438. if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
  439. else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
  440. /* bulkWrite */
  441. case BulkWrite(commands, None) => //SingleObservable
  442. coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
  443. case BulkWrite(commands, Some(opt)) => //SingleObservable
  444. coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]
  445. /* drop collection */
  446. case DropCollection(collName) => //SingleObservable
  447. val coll = db.getCollection(collName)
  448. coll.drop().toFuture().asInstanceOf[Future[T]]
  449. /* create collection */
  450. case CreateCollection(collName, None) => //SingleObservable
  451. db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
  452. case CreateCollection(collName, Some(opt)) => //SingleObservable
  453. db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
  454. /* list collection */
  455. case ListCollection(dbName) => //ListConllectionObservable
  456. client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
  457. /* create view */
  458. case CreateView(viewName, viewOn, pline, None) => //SingleObservable
  459. db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
  460. case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
  461. db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
  462. /* create index */
  463. case CreateIndex(key, None) => //SingleObservable
  464. coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
  465. case CreateIndex(key, Some(opt)) => //SingleObservable
  466. coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
  467. /* drop index */
  468. case DropIndexByName(indexName, None) => //SingleObservable
  469. coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
  470. case DropIndexByName(indexName, Some(opt)) => //SingleObservable
  471. coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
  472. case DropIndexByKey(key, None) => //SingleObservable
  473. coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
  474. case DropIndexByKey(key, Some(opt)) => //SingleObservable
  475. coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
  476. case DropAllIndexes(None) => //SingleObservable
  477. coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
  478. case DropAllIndexes(Some(opt)) => //SingleObservable
  479. coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
  480. }
  481. }
  482. */
  483. def mongoStream(ctx: MGOContext)(
  484. implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
  485. log.info(s"mongoStream> MGOContext: ${ctx}")
  486. val db = client.getDatabase(ctx.dbName)
  487. val coll = db.getCollection(ctx.collName)
  488. if ( ctx.action == None) {
  489. log.error(s"mongoStream> uery action cannot be null!")
  490. throw new IllegalArgumentException("query action cannot be null!")
  491. }
  492. ctx.action.get match {
  493. case DocumentStream(None, None) =>
  494. MongoSource(coll.find())
  495. case DocumentStream(Some(filter), None) =>
  496. MongoSource(coll.find(filter))
  497. case DocumentStream(None, Some(next)) =>
  498. MongoSource(next(coll.find()))
  499. case DocumentStream(Some(filter), Some(next)) =>
  500. MongoSource(next(coll.find(filter)))
  501. }
  502. }
  503. }
  504. object MongoActionStream {
  505. import MGOContext._
  506. case class StreamingInsert[A](dbName: String,
  507. collName: String,
  508. converter: A => Document,
  509. parallelism: Int = 1
  510. ) extends MGOCommands
  511. case class StreamingDelete[A](dbName: String,
  512. collName: String,
  513. toFilter: A => Bson,
  514. parallelism: Int = 1,
  515. justOne: Boolean = false
  516. ) extends MGOCommands
  517. case class StreamingUpdate[A](dbName: String,
  518. collName: String,
  519. toFilter: A => Bson,
  520. toUpdate: A => Bson,
  521. parallelism: Int = 1,
  522. justOne: Boolean = false
  523. ) extends MGOCommands
  524. case class InsertAction[A](ctx: StreamingInsert[A])(
  525. implicit mongoClient: MongoClient) {
  526. val database = mongoClient.getDatabase(ctx.dbName)
  527. val collection = database.getCollection(ctx.collName)
  528. def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
  529. Flow[A].map(ctx.converter)
  530. .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
  531. }
  532. case class UpdateAction[A](ctx: StreamingUpdate[A])(
  533. implicit mongoClient: MongoClient) {
  534. val database = mongoClient.getDatabase(ctx.dbName)
  535. val collection = database.getCollection(ctx.collName)
  536. def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
  537. if (ctx.justOne) {
  538. Flow[A]
  539. .mapAsync(ctx.parallelism)(a =>
  540. collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
  541. } else
  542. Flow[A]
  543. .mapAsync(ctx.parallelism)(a =>
  544. collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
  545. }
  546. case class DeleteAction[A](ctx: StreamingDelete[A])(
  547. implicit mongoClient: MongoClient) {
  548. val database = mongoClient.getDatabase(ctx.dbName)
  549. val collection = database.getCollection(ctx.collName)
  550. def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
  551. if (ctx.justOne) {
  552. Flow[A]
  553. .mapAsync(ctx.parallelism)(a =>
  554. collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
  555. } else
  556. Flow[A]
  557. .mapAsync(ctx.parallelism)(a =>
  558. collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
  559. }
  560. }
  561. object MGOHelpers {
  562. implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
  563. override val converter: (Document) => String = (doc) => doc.toJson
  564. }
  565. implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
  566. override val converter: (C) => String = (doc) => doc.toString
  567. }
  568. trait ImplicitObservable[C] {
  569. val observable: Observable[C]
  570. val converter: (C) => String
  571. def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
  572. def headResult() = Await.result(observable.head(), 10 seconds)
  573. def printResults(initial: String = ""): Unit = {
  574. if (initial.length > 0) print(initial)
  575. results().foreach(res => println(converter(res)))
  576. }
  577. def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  578. }
  579. def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
  580. Await.result(fut, timeOut)
  581. }
  582. def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
  583. Await.result(fut, timeOut)
  584. }
  585. import monix.eval.Task
  586. import monix.execution.Scheduler.Implicits.global
  587. final class FutureToTask[A](x: => Future[A]) {
  588. def asTask: Task[A] = Task.deferFuture[A](x)
  589. }
  590. final class TaskToFuture[A](x: => Task[A]) {
  591. def asFuture: Future[A] = x.runAsync
  592. }
  593. }

MgoProtoConversion.scala

  1. package sdp.mongo.engine
  2. import org.mongodb.scala.bson.collection.immutable.Document
  3. import org.bson.conversions.Bson
  4. import sdp.grpc.services._
  5. import protobuf.bytes.Converter._
  6. import com.google.protobuf.ByteString
  7. import MGOContext._
  8. import MGOAdmins._
  9. import MGOCommands._
  10. import org.bson.BsonDocument
  11. import org.bson.codecs.configuration.CodecRegistry
  12. import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
  13. import org.mongodb.scala.FindObservable
  14. object MgoProtoConvertion {
  15. /* org.mongodb.scala.FindObservable
  16. import com.mongodb.async.client.FindIterable
  17. val resultDocType = FindIterable[Document]
  18. val resultOption = FindObservable(resultDocType)
  19. .maxScan(...)
  20. .limit(...)
  21. .sort(...)
  22. .project(...) */
  23. type FOD_TYPE = Int
  24. val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
  25. val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
  26. val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
  27. val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
  28. val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
  29. //Sets a document describing the fields to return for all matching documents
  30. val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
  31. val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
  32. //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
  33. val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
  34. //Sets the cursor type
  35. val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
  36. //Sets the hint for which index to use. A null value means no hint is set
  37. val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
  38. //Sets the exclusive upper bound for a specific index. A null value means no max is set
  39. val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
  40. //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
  41. val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
  42. //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
  43. val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
  44. //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents
  45. case class ResultOptions(
  46. optType: FOD_TYPE,
  47. bsonParam: Option[Bson] = None,
  48. valueParam: Int = 0 ){
  49. def toProto = new sdp.grpc.services.ResultTransformer(
  50. optType = this.optType,
  51. bsonParam = this.bsonParam.map {b => sdp.grpc.services.MGOBson(marshal(b))},
  52. valueParam = this.valueParam
  53. )
  54. def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
  55. optType match {
  56. case FOD_FIRST => find
  57. case FOD_FILTER => find.filter(bsonParam.get)
  58. case FOD_LIMIT => find.limit(valueParam)
  59. case FOD_SKIP => find.skip(valueParam)
  60. case FOD_PROJECTION => find.projection(bsonParam.get)
  61. case FOD_SORT => find.sort(bsonParam.get)
  62. case FOD_PARTIAL => find.partial(valueParam != 0)
  63. case FOD_CURSORTYPE => find
  64. case FOD_HINT => find.hint(bsonParam.get)
  65. case FOD_MAX => find.max(bsonParam.get)
  66. case FOD_MIN => find.min(bsonParam.get)
  67. case FOD_RETURNKEY => find.returnKey(valueParam != 0)
  68. case FOD_SHOWRECORDID => find.showRecordId(valueParam != 0)
  69. }
  70. }
  71. }
  72. object ResultOptions {
  73. def fromProto(msg: sdp.grpc.services.ResultTransformer) = new ResultOptions(
  74. optType = msg.optType,
  75. bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
  76. valueParam = msg.valueParam
  77. )
  78. }
  79. type MGO_COMMAND_TYPE = Int
  80. val MGO_COMMAND_FIND = 0
  81. val MGO_COMMAND_COUNT = 20
  82. val MGO_COMMAND_DISTICT = 21
  83. val MGO_COMMAND_DOCUMENTSTREAM = 1
  84. val MGO_COMMAND_AGGREGATE = 2
  85. val MGO_COMMAND_INSERT = 3
  86. val MGO_COMMAND_DELETE = 4
  87. val MGO_COMMAND_REPLACE = 5
  88. val MGO_COMMAND_UPDATE = 6
  89. val MGO_ADMIN_DROPCOLLECTION = 8
  90. val MGO_ADMIN_CREATECOLLECTION = 9
  91. val MGO_ADMIN_LISTCOLLECTION = 10
  92. val MGO_ADMIN_CREATEVIEW = 11
  93. val MGO_ADMIN_CREATEINDEX = 12
  94. val MGO_ADMIN_DROPINDEXBYNAME = 13
  95. val MGO_ADMIN_DROPINDEXBYKEY = 14
  96. val MGO_ADMIN_DROPALLINDEXES = 15
  97. case class MGOAdminCtx(
  98. tarName: String = "",
  99. bsonParam: Seq[Bson] = Nil,
  100. options: Option[Any] = None,
  101. objName: String = ""
  102. ){
  103. def toProto = sdp.grpc.services.MGOAdminOptons(
  104. tarName = this.tarName,
  105. bsonParam = this.bsonParam.map {b => sdp.grpc.services.MGOBson(marshal(b))},
  106. objName = this.objName,
  107. options = this.options.map(b => OptionAny(marshal(b)))
  108. )
  109. }
  110. object MGOAdminCtx {
  111. def fromProto(msg: sdp.grpc.services.MGOAdminOptons) = new MGOAdminCtx(
  112. tarName = msg.tarName,
  113. bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson))
  114. )
  115. }
  116. case class MGOProtoMsg(
  117. dbName: String = "",
  118. collName: String = "",
  119. commandType: MGO_COMMAND_TYPE,
  120. bsonParam: Seq[Bson] = Nil,
  121. resultOptions: Seq[ResultOptions] = Nil,
  122. options: ByteString = com.google.protobuf.ByteString.EMPTY,
  123. documents: Seq[Document] = Nil,
  124. only: Boolean = false,
  125. adminOptions: Option[MGOAdminCtx] = None
  126. ){
  127. def toProto = new sdp.grpc.services.MGOOperations(
  128. dbName = this.dbName,
  129. collName = this.collName,
  130. commandType = this.commandType,
  131. bsonParam = this.bsonParam.map(bsonToProto),
  132. resultOptions = this.resultOptions.map(_.toProto),
  133. documents = this.documents.map(d => sdp.grpc.services.MGODocument(marshal(d))),
  134. only = Some(this.only),
  135. adminOptions = this.adminOptions.map(_.toProto)
  136. )
  137. }
  138. object MGOProtoMsg {
  139. def fromProto(msg: sdp.grpc.services.MGOOperations) = new MGOProtoMsg(
  140. dbName = msg.dbName,
  141. collName = msg.collName,
  142. commandType = msg.commandType,
  143. bsonParam = msg.bsonParam.map(protoToBson),
  144. resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r))
  145. )
  146. }
  147. def bsonToProto(bson: Bson) =
  148. MGOBson(marshal(bson.toBsonDocument(
  149. classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
  150. def protoToBson(proto: MGOBson): Bson = new Bson {
  151. val bsdoc = unmarshal[BsonDocument](proto.bson)
  152. override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
  153. }
  154. def CtxFromProto(proto: MGOOperations): MGOContext = proto.commandType match {
  155. case MGO_COMMAND_FIND => {
  156. var ctx = new MGOContext(
  157. dbName = proto.dbName,
  158. collName = proto.collName,
  159. actionType = MGO_QUERY,
  160. action = Some(Find())
  161. )
  162. def toResultTransformer(rts: Seq[ResultTransformer]): FindObservable[Document] => FindObservable[Document] = findObj =>
  163. rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
  164. (proto.bsonParam, proto.resultOptions, proto.only) match {
  165. case (Nil, Nil, None) => ctx
  166. case (Nil, Nil, Some(b)) => ctx.setCommand(Find(None, None, b))
  167. case (bp,Nil,None) => ctx.setCommand(
  168. Find(Some(protoToBson(bp.head)),None,false))
  169. case (bp,Nil,Some(b)) => ctx.setCommand(
  170. Find(Some(protoToBson(bp.head)),None,b))
  171. case (bp,fo,None) => {
  172. ctx.setCommand(
  173. Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),false))
  174. }
  175. case (bp,fo,Some(b)) => {
  176. ctx.setCommand(
  177. Find(Some(protoToBson(bp.head)),Some(toResultTransformer(fo)),b))
  178. }
  179. case _ => ctx
  180. }
  181. }
  182. }
  183. }

BytesConverter.scala

  1. package protobuf.bytes
  2. import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
  3. import com.google.protobuf.ByteString
  4. object Converter {
  5. def marshal(value: Any): ByteString = {
  6. val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
  7. val oos = new ObjectOutputStream(stream)
  8. oos.writeObject(value)
  9. oos.close()
  10. ByteString.copyFrom(stream.toByteArray())
  11. }
  12. def unmarshal[A](bytes: ByteString): A = {
  13. val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  14. val value = ois.readObject()
  15. ois.close()
  16. value.asInstanceOf[A]
  17. }
  18. }

FindDemo.scala

  1. package demo.sdp.mgo.localapp
  2. import akka.actor.ActorSystem
  3. import akka.stream.ActorMaterializer
  4. import org.mongodb.scala._
  5. import scala.util._
  6. import scala.collection.JavaConverters._
  7. import sdp.mongo.engine._
  8. import org.mongodb.scala.model._
  9. import akka.stream.scaladsl.{Sink, Source}
  10. import org.bson.codecs.configuration.CodecRegistry
  11. import org.mongodb.scala.bson.{BsonDocument, BsonValue}
  12. import scalikejdbc._
  13. import sdp.jdbc.engine._
  14. import sdp.jdbc.config._
  15. object ProtoTests extends App {
  16. import MGOContext._
  17. import MGOEngine._
  18. import MGOCommands._
  19. import MongoActionStream._
  20. import MgoProtoConvertion._
  21. import org.mongodb.scala.model._
  22. import Projections._
  23. import Filters._
  24. implicit val system = ActorSystem()
  25. implicit val mat = ActorMaterializer()
  26. implicit val ec = system.dispatcher
  27. val clientSettings: MongoClientSettings = MongoClientSettings.builder()
  28. .applyToClusterSettings {b =>
  29. b.hosts(List(new ServerAddress("localhost")).asJava)
  30. }.build()
  31. implicit val client: MongoClient = MongoClient(clientSettings)
  32. val eqState = equal("state","California")
  33. val proj = exclude("rowid","_id")
  34. val rtxfmr = Seq(
  35. ResultOptions(
  36. optType = FOD_LIMIT,
  37. valueParam = 3)
  38. ,ResultOptions(
  39. optType = FOD_PROJECTION,
  40. bsonParam = Some(proj))
  41. )
  42. val protoCtx = MGOProtoMsg(
  43. dbName = "testdb",
  44. collName = "aqmrpt",
  45. commandType = MGO_COMMAND_FIND,
  46. bsonParam = Seq(eqState),
  47. resultOptions = rtxfmr
  48. ).toProto
  49. val findCtx = CtxFromProto(protoCtx)
  50. val futFind = mgoQuery[Seq[Document]](findCtx)
  51. futFind.onComplete {
  52. case Success(docs) => docs.asInstanceOf[Seq[Document]].foreach{doc => println(doc.toJson())}
  53. case Failure(e) => println(e.getMessage)
  54. }
  55. scala.io.StdIn.readLine()
  56. system.terminate()
  57. }

 

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号