
Commit a7ae77a

huaxingao authored and zhengruifeng committed
[SPARK-30662][ML][PYSPARK] Put back the API changes for HasBlockSize in ALS/MLP
### What changes were proposed in this pull request?
Add `HasBlockSize` to the shared Params in both Scala and Python, and make ALS/MLP extend `HasBlockSize`.

### Why are the changes needed?
Adding `HasBlockSize` to ALS lets the user specify the blockSize. Making `HasBlockSize` a shared param lets both ALS and MLP use it.

### Does this PR introduce any user-facing change?
Yes:
- `ALS.setBlockSize`/`ALS.getBlockSize`
- `ALSModel.setBlockSize`/`ALSModel.getBlockSize`

### How was this patch tested?
Manually tested. Also added doctests.

Closes apache#27501 from huaxingao/spark_30662.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: zhengruifeng <[email protected]>
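For context, here is a minimal PySpark sketch of the new user-facing API (not part of the patch). It assumes an active SparkSession and a ratings DataFrame `df` with "user", "item", and "rating" columns:

```python
# Minimal sketch of the new API; `df` is an assumed ratings DataFrame.
from pyspark.ml.recommendation import ALS

als = ALS(userCol="user", itemCol="item", ratingCol="rating")
print(als.getBlockSize())    # 4096, the default ALS inherits from HasBlockSize
als.setBlockSize(2048)       # returns the estimator itself, so calls can chain
model = als.fit(df)
print(model.getBlockSize())  # 2048, copied from the estimator to the model
```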
1 parent e1cd4d9 commit a7ae77a

File tree: 8 files changed, +109 −55 lines


mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala

+1 −21

```diff
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
 
 /** Params for Multilayer Perceptron. */
 private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
-  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
+  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
 
   import MultilayerPerceptronClassifier._
 
@@ -54,26 +54,6 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
   @Since("1.5.0")
   final def getLayers: Array[Int] = $(layers)
 
-  /**
-   * Block size for stacking input data in matrices to speed up the computation.
-   * Data is stacked within partitions. If block size is more than remaining data in
-   * a partition then it is adjusted to the size of this data.
-   * Recommended size is between 10 and 1000.
-   * Default: 128
-   *
-   * @group expertParam
-   */
-  @Since("1.5.0")
-  final val blockSize: IntParam = new IntParam(this, "blockSize",
-    "Block size for stacking input data in matrices. Data is stacked within partitions." +
-      " If block size is more than remaining data in a partition then " +
-      "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
-    ParamValidators.gt(0))
-
-  /** @group expertGetParam */
-  @Since("1.5.0")
-  final def getBlockSize: Int = $(blockSize)
-
   /**
    * The solver algorithm for optimization.
    * Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
```

mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala

+5 −1

```diff
@@ -104,7 +104,11 @@ private[shared] object SharedParamsCodeGen {
       isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"),
     ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " +
       "each row is for training or for validation. False indicates training; true indicates " +
-      "validation.")
+      "validation."),
+    ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
+      "stacked within partitions. If block size is more than remaining data in a partition " +
+      "then it is adjusted to the size of this data.",
+      isValid = "ParamValidators.gt(0)", isExpertParam = true)
   )
 
   val code = genSharedParams(params)
```

mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala

+17 −0

```diff
@@ -578,4 +578,21 @@ trait HasValidationIndicatorCol extends Params {
   /** @group getParam */
   final def getValidationIndicatorCol: String = $(validationIndicatorCol)
 }
+
+/**
+ * Trait for shared param blockSize. This trait may be changed or
+ * removed between minor versions.
+ */
+@DeveloperApi
+trait HasBlockSize extends Params {
+
+  /**
+   * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data..
+   * @group expertParam
+   */
+  final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", ParamValidators.gt(0))
+
+  /** @group expertGetParam */
+  final def getBlockSize: Int = $(blockSize)
+}
 // scalastyle:on
```

mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala

+34 −12

```diff
@@ -54,7 +54,8 @@ import org.apache.spark.util.random.XORShiftRandom
 /**
  * Common params for ALS and ALSModel.
  */
-private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
+private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
+  with HasBlockSize {
   /**
    * Param for the column name for user ids. Ids must be integers. Other
    * numeric types are supported for this column, but will be cast to integers as long as they
@@ -125,6 +126,8 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo
 
   /** @group expertGetParam */
   def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)
+
+  setDefault(blockSize -> 4096)
 }
 
 /**
@@ -288,6 +291,15 @@ class ALSModel private[ml] (
   @Since("2.2.0")
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
+  /**
+   * Set block size for stacking input data in matrices.
+   * Default is 4096.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
   private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
     if (featuresA != null && featuresB != null) {
       var dotProduct = 0.0f
@@ -351,7 +363,7 @@
    */
   @Since("2.2.0")
   def recommendForAllUsers(numItems: Int): DataFrame = {
-    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
+    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
   }
 
   /**
@@ -366,7 +378,7 @@
   @Since("2.3.0")
   def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
-    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
+    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
   }
 
   /**
@@ -377,7 +389,7 @@
    */
   @Since("2.2.0")
   def recommendForAllItems(numUsers: Int): DataFrame = {
-    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
+    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
   }
 
   /**
@@ -392,7 +404,7 @@
   @Since("2.3.0")
   def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
-    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
+    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
   }
 
   /**
@@ -441,11 +453,12 @@
       dstFactors: DataFrame,
       srcOutputColumn: String,
       dstOutputColumn: String,
-      num: Int): DataFrame = {
+      num: Int,
+      blockSize: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._
 
-    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
-    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
+    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
     val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
       .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
       .flatMap { case (srcIter, dstIter) =>
@@ -483,11 +496,10 @@
 
   /**
    * Blockifies factors to improve the efficiency of cross join
-   * TODO: SPARK-20443 - expose blockSize as a param?
    */
   private def blockify(
       factors: Dataset[(Int, Array[Float])],
-      blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
+      blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
     import factors.sparkSession.implicits._
     factors.mapPartitions(_.grouped(blockSize))
   }
@@ -654,6 +666,15 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
   @Since("2.2.0")
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
+  /**
+   * Set block size for stacking input data in matrices.
+   * Default is 4096.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.0.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+
   /**
    * Sets both numUserBlocks and numItemBlocks to the specific value.
   *
@@ -683,7 +704,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
     instr.logDataset(dataset)
     instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
       itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
-      seed, intermediateStorageLevel, finalStorageLevel)
+      seed, intermediateStorageLevel, finalStorageLevel, blockSize)
 
     val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
       numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@@ -694,7 +715,8 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
       checkpointInterval = $(checkpointInterval), seed = $(seed))
     val userDF = userFactors.toDF("id", "features")
     val itemDF = itemFactors.toDF("id", "features")
-    val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
+    val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
+      .setParent(this)
     copyValues(model)
   }
```
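Note that the param replaces the hard-coded 4096 that `blockify` used before, so the block size of the `crossJoin` in `recommendForAll` is now tunable. A hedged PySpark sketch of the same knob on the model side (`model` is assumed to be a fitted `ALSModel`; the Python API is used here for brevity):

```python
# Hedged sketch: blockSize controls how factor rows are grouped into blocks
# before the crossJoin inside recommendForAll.
model.setBlockSize(2048)                # smaller blocks, more of them
top10 = model.recommendForAllUsers(10)  # uses $(blockSize) instead of the old 4096
top10.show(truncate=False)
```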

python/pyspark/ml/classification.py

+8 −14

```diff
@@ -2153,7 +2153,7 @@ def sigma(self):
 
 
 class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
-                                  HasTol, HasStepSize, HasSolver):
+                                  HasTol, HasStepSize, HasSolver, HasBlockSize):
     """
     Params for :py:class:`MultilayerPerceptronClassifier`.
 
@@ -2164,11 +2164,6 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H
                   "E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " +
                   "neurons and output layer of 10 neurons.",
                   typeConverter=TypeConverters.toListInt)
-    blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " +
-                      "matrices. Data is stacked within partitions. If block size is more than " +
-                      "remaining data in a partition then it is adjusted to the size of this " +
-                      "data. Recommended size is between 10 and 1000, default is 128.",
-                      typeConverter=TypeConverters.toInt)
     solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
                    "options: l-bfgs, gd.", typeConverter=TypeConverters.toString)
     initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.",
@@ -2181,13 +2176,6 @@ def getLayers(self):
         """
         return self.getOrDefault(self.layers)
 
-    @since("1.6.0")
-    def getBlockSize(self):
-        """
-        Gets the value of blockSize or its default value.
-        """
-        return self.getOrDefault(self.blockSize)
-
     @since("2.0.0")
     def getInitialWeights(self):
         """
@@ -2211,11 +2199,17 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPer
     ... (1.0, Vectors.dense([0.0, 1.0])),
     ... (1.0, Vectors.dense([1.0, 0.0])),
     ... (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])
-    >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
+    >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
     >>> mlp.setMaxIter(100)
     MultilayerPerceptronClassifier...
     >>> mlp.getMaxIter()
     100
+    >>> mlp.getBlockSize()
+    128
+    >>> mlp.setBlockSize(1)
+    MultilayerPerceptronClassifier...
+    >>> mlp.getBlockSize()
+    1
     >>> model = mlp.fit(df)
     >>> model.setFeaturesCol("features")
     MultilayerPerceptronClassificationModel...
```
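A short sketch mirroring the updated doctest above: MLP's `blockSize` param and getter now come from the shared `HasBlockSize` mixin, with the 128 default unchanged.

```python
# Sketch based on the updated doctest; only the param's home moved.
from pyspark.ml.classification import MultilayerPerceptronClassifier

mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
print(mlp.getBlockSize())  # 128; the getter is now inherited from HasBlockSize
mlp.setBlockSize(1)        # setter behaves exactly as before
```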

python/pyspark/ml/param/_shared_params_code_gen.py

+4 −1

```diff
@@ -164,7 +164,10 @@ def get$Name(self):
      "'euclidean'", "TypeConverters.toString"),
     ("validationIndicatorCol", "name of the column that indicates whether each row is for " +
      "training or for validation. False indicates training; true indicates validation.",
-     None, "TypeConverters.toString")]
+     None, "TypeConverters.toString"),
+    ("blockSize", "block size for stacking input data in matrices. Data is stacked within "
+     "partitions. If block size is more than remaining data in a partition then it is "
+     "adjusted to the size of this data.", None, "TypeConverters.toInt")]
 
     code = []
     for name, doc, defaultValueStr, typeConverter in shared:
```

python/pyspark/ml/param/shared.py

+17 −0

```diff
@@ -580,3 +580,20 @@ def getValidationIndicatorCol(self):
         Gets the value of validationIndicatorCol or its default value.
         """
         return self.getOrDefault(self.validationIndicatorCol)
+
+
+class HasBlockSize(Params):
+    """
+    Mixin for param blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
+    """
+
+    blockSize = Param(Params._dummy(), "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", typeConverter=TypeConverters.toInt)
+
+    def __init__(self):
+        super(HasBlockSize, self).__init__()
+
+    def getBlockSize(self):
+        """
+        Gets the value of blockSize or its default value.
+        """
+        return self.getOrDefault(self.blockSize)
```
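Because the mixin lives in the public `pyspark.ml.param.shared` module, other Params classes can reuse it instead of redeclaring the param. A hedged sketch (`MyParams` is hypothetical, not part of Spark):

```python
# Hypothetical example: reusing the new mixin in a custom Params class.
from pyspark.ml.param.shared import HasBlockSize

class MyParams(HasBlockSize):
    def __init__(self):
        super(MyParams, self).__init__()
        # the mixin declares blockSize but sets no default, so we set our own
        self._setDefault(blockSize=128)

params = MyParams()
print(params.getBlockSize())  # 128
```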

python/pyspark/ml/recommendation.py

+23 −6

```diff
@@ -28,7 +28,7 @@
 
 
 @inherit_doc
-class _ALSModelParams(HasPredictionCol):
+class _ALSModelParams(HasPredictionCol, HasBlockSize):
     """
     Params for :py:class:`ALS` and :py:class:`ALSModel`.
 
@@ -223,6 +223,8 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
     0.1
     >>> als.clear(als.regParam)
     >>> model = als.fit(df)
+    >>> model.getBlockSize()
+    4096
     >>> model.getUserCol()
     'user'
     >>> model.setUserCol("user")
@@ -282,21 +284,22 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB
                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                  ratingCol="rating", nonnegative=False, checkpointInterval=10,
                  intermediateStorageLevel="MEMORY_AND_DISK",
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
         """
         __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                  implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                  ratingCol="rating", nonnegative=false, checkpointInterval=10, \
                  intermediateStorageLevel="MEMORY_AND_DISK", \
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
         """
         super(ALS, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
         self._setDefault(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
                          implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
                          ratingCol="rating", nonnegative=False, checkpointInterval=10,
                          intermediateStorageLevel="MEMORY_AND_DISK",
-                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
+                         blockSize=4096)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -306,13 +309,13 @@ def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItem
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                   ratingCol="rating", nonnegative=False, checkpointInterval=10,
                   intermediateStorageLevel="MEMORY_AND_DISK",
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
         """
         setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                   ratingCol="rating", nonnegative=False, checkpointInterval=10, \
                   intermediateStorageLevel="MEMORY_AND_DISK", \
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
         Sets params for ALS.
         """
         kwargs = self._input_kwargs
@@ -443,6 +446,13 @@ def setSeed(self, value):
         """
         return self._set(seed=value)
 
+    @since("3.0.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
 
 class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
     """
@@ -479,6 +489,13 @@ def setPredictionCol(self, value):
         """
         return self._set(predictionCol=value)
 
+    @since("3.0.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
     @property
     @since("1.4.0")
     def rank(self):
```
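A sketch of the widened Python signatures: `blockSize` can now be passed at construction or via `setParams`, and the fitted model carries it over, as the new doctest shows. It assumes `df` is a ratings DataFrame.

```python
# Sketch of the widened constructor; default shown explicitly for clarity.
from pyspark.ml.recommendation import ALS

als = ALS(rank=10, blockSize=4096)  # blockSize is a new keyword argument
model = als.fit(df)
print(model.getBlockSize())         # 4096, matching the new doctest
```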
