@@ -141,27 +141,31 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
   }

   test("uncaching temp table") {
-    testData.select("key").createOrReplaceTempView("tempTable1")
-    testData.select("key").createOrReplaceTempView("tempTable2")
-    spark.catalog.cacheTable("tempTable1")
+    withTempView("tempTable1", "tempTable2") {
+      testData.select("key").createOrReplaceTempView("tempTable1")
+      testData.select("key").createOrReplaceTempView("tempTable2")
+      spark.catalog.cacheTable("tempTable1")

-    assertCached(sql("SELECT COUNT(*) FROM tempTable1"))
-    assertCached(sql("SELECT COUNT(*) FROM tempTable2"))
+      assertCached(sql("SELECT COUNT(*) FROM tempTable1"))
+      assertCached(sql("SELECT COUNT(*) FROM tempTable2"))

-    // Is this valid?
-    uncacheTable("tempTable2")
+      // Is this valid?
+      uncacheTable("tempTable2")

-    // Should this be cached?
-    assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
+      // Should this be cached?
+      assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
+    }
   }

   test("too big for memory") {
-    val data = "*" * 1000
-    sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).toDF()
-      .createOrReplaceTempView("bigData")
-    spark.table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
-    assert(spark.table("bigData").count() === 200000L)
-    spark.table("bigData").unpersist(blocking = true)
+    withTempView("bigData") {
+      val data = "*" * 1000
+      sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).toDF()
+        .createOrReplaceTempView("bigData")
+      spark.table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
+      assert(spark.table("bigData").count() === 200000L)
+      spark.table("bigData").unpersist(blocking = true)
+    }
   }

   test("calling .cache() should use in-memory columnar caching") {
@@ -225,12 +229,14 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
   }

   test("SELECT star from cached table") {
-    sql("SELECT * FROM testData").createOrReplaceTempView("selectStar")
-    spark.catalog.cacheTable("selectStar")
-    checkAnswer(
-      sql("SELECT * FROM selectStar WHERE key = 1"),
-      Seq(Row(1, "1")))
-    uncacheTable("selectStar")
+    withTempView("selectStar") {
+      sql("SELECT * FROM testData").createOrReplaceTempView("selectStar")
+      spark.catalog.cacheTable("selectStar")
+      checkAnswer(
+        sql("SELECT * FROM selectStar WHERE key = 1"),
+        Seq(Row(1, "1")))
+      uncacheTable("selectStar")
+    }
   }

   test("Self-join cached") {
@@ -375,102 +381,112 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
   }

   test("Drops temporary table") {
-    testData.select("key").createOrReplaceTempView("t1")
-    spark.table("t1")
-    spark.catalog.dropTempView("t1")
-    intercept[AnalysisException](spark.table("t1"))
+    withTempView("t1") {
+      testData.select("key").createOrReplaceTempView("t1")
+      spark.table("t1")
+      spark.catalog.dropTempView("t1")
+      intercept[AnalysisException](spark.table("t1"))
+    }
   }

   test("Drops cached temporary table") {
-    testData.select("key").createOrReplaceTempView("t1")
-    testData.select("key").createOrReplaceTempView("t2")
-    spark.catalog.cacheTable("t1")
+    withTempView("t1", "t2") {
+      testData.select("key").createOrReplaceTempView("t1")
+      testData.select("key").createOrReplaceTempView("t2")
+      spark.catalog.cacheTable("t1")

-    assert(spark.catalog.isCached("t1"))
-    assert(spark.catalog.isCached("t2"))
+      assert(spark.catalog.isCached("t1"))
+      assert(spark.catalog.isCached("t2"))

-    spark.catalog.dropTempView("t1")
-    intercept[AnalysisException](spark.table("t1"))
-    assert(!spark.catalog.isCached("t2"))
+      spark.catalog.dropTempView("t1")
+      intercept[AnalysisException](spark.table("t1"))
+      assert(!spark.catalog.isCached("t2"))
+    }
   }

   test("Clear all cache") {
-    sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
-    sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
-    spark.catalog.cacheTable("t1")
-    spark.catalog.cacheTable("t2")
-    spark.catalog.clearCache()
-    assert(spark.sharedState.cacheManager.isEmpty)
-
-    sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
-    sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
-    spark.catalog.cacheTable("t1")
-    spark.catalog.cacheTable("t2")
-    sql("Clear CACHE")
-    assert(spark.sharedState.cacheManager.isEmpty)
+    withTempView("t1", "t2") {
+      sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
+      sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
+      spark.catalog.cacheTable("t1")
+      spark.catalog.cacheTable("t2")
+      spark.catalog.clearCache()
+      assert(spark.sharedState.cacheManager.isEmpty)
+
+      sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
+      sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
+      spark.catalog.cacheTable("t1")
+      spark.catalog.cacheTable("t2")
+      sql("Clear CACHE")
+      assert(spark.sharedState.cacheManager.isEmpty)
+    }
   }

   test("Ensure accumulators to be cleared after GC when uncacheTable") {
-    sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
-    sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
+    withTempView("t1", "t2") {
+      sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
+      sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")

-    spark.catalog.cacheTable("t1")
-    spark.catalog.cacheTable("t2")
+      spark.catalog.cacheTable("t1")
+      spark.catalog.cacheTable("t2")

-    sql("SELECT * FROM t1").count()
-    sql("SELECT * FROM t2").count()
-    sql("SELECT * FROM t1").count()
-    sql("SELECT * FROM t2").count()
+      sql("SELECT * FROM t1").count()
+      sql("SELECT * FROM t2").count()
+      sql("SELECT * FROM t1").count()
+      sql("SELECT * FROM t2").count()
+
+      val toBeCleanedAccIds = new HashSet[Long]
+
+      val accId1 = spark.table("t1").queryExecution.withCachedData.collect {
+        case i: InMemoryRelation => i.cacheBuilder.sizeInBytesStats.id
+      }.head
+      toBeCleanedAccIds += accId1
+
+      val accId2 = spark.table("t1").queryExecution.withCachedData.collect {
+        case i: InMemoryRelation => i.cacheBuilder.sizeInBytesStats.id
+      }.head
+      toBeCleanedAccIds += accId2
+
+      val cleanerListener = new CleanerListener {
+        def rddCleaned(rddId: Int): Unit = {}
+        def shuffleCleaned(shuffleId: Int): Unit = {}
+        def broadcastCleaned(broadcastId: Long): Unit = {}
+        def accumCleaned(accId: Long): Unit = {
+          toBeCleanedAccIds.synchronized { toBeCleanedAccIds -= accId }
+        }
+        def checkpointCleaned(rddId: Long): Unit = {}
+      }
+      spark.sparkContext.cleaner.get.attachListener(cleanerListener)

-    val toBeCleanedAccIds = new HashSet[Long]
+      uncacheTable("t1")
+      uncacheTable("t2")

-    val accId1 = spark.table("t1").queryExecution.withCachedData.collect {
-      case i: InMemoryRelation => i.cacheBuilder.sizeInBytesStats.id
-    }.head
-    toBeCleanedAccIds += accId1
+      System.gc()

-    val accId2 = spark.table("t1").queryExecution.withCachedData.collect {
-      case i: InMemoryRelation => i.cacheBuilder.sizeInBytesStats.id
-    }.head
-    toBeCleanedAccIds += accId2
-
-    val cleanerListener = new CleanerListener {
-      def rddCleaned(rddId: Int): Unit = {}
-      def shuffleCleaned(shuffleId: Int): Unit = {}
-      def broadcastCleaned(broadcastId: Long): Unit = {}
-      def accumCleaned(accId: Long): Unit = {
-        toBeCleanedAccIds.synchronized { toBeCleanedAccIds -= accId }
+      eventually(timeout(10.seconds)) {
+        assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty },
+          "batchStats accumulators should be cleared after GC when uncacheTable")
       }
-      def checkpointCleaned(rddId: Long): Unit = {}
-    }
-    spark.sparkContext.cleaner.get.attachListener(cleanerListener)
-
-    uncacheTable("t1")
-    uncacheTable("t2")
-
-    System.gc()

-    eventually(timeout(10.seconds)) {
-      assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty },
-        "batchStats accumulators should be cleared after GC when uncacheTable")
+      assert(AccumulatorContext.get(accId1).isEmpty)
+      assert(AccumulatorContext.get(accId2).isEmpty)
     }
-
-    assert(AccumulatorContext.get(accId1).isEmpty)
-    assert(AccumulatorContext.get(accId2).isEmpty)
   }

   test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") {
-    sparkContext.parallelize((1, 1) :: (2, 2) :: Nil)
-      .toDF("key", "value").selectExpr("key", "value", "key+1").createOrReplaceTempView("abc")
-    spark.catalog.cacheTable("abc")
-
-    val sparkPlan = sql(
-      """select a.key, b.key, c.key from
-        |abc a join abc b on a.key=b.key
-        |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan
-
-    assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 3)
-    assert(sparkPlan.collect { case e: RDDScanExec => e }.size === 0)
+    withTempView("abc") {
+      sparkContext.parallelize((1, 1) :: (2, 2) :: Nil)
+        .toDF("key", "value").selectExpr("key", "value", "key+1").createOrReplaceTempView("abc")
+      spark.catalog.cacheTable("abc")
+
+      val sparkPlan = sql(
+        """select a.key, b.key, c.key from
+          |abc a join abc b on a.key=b.key
+          |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan
+
+      assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 3)
+      assert(sparkPlan.collect { case e: RDDScanExec => e }.size === 0)
+    }
   }

   /**
@@ -628,26 +644,30 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
   }

   test("SPARK-15870 DataFrame can't execute after uncacheTable") {
-    val selectStar = sql("SELECT * FROM testData WHERE key = 1")
-    selectStar.createOrReplaceTempView("selectStar")
+    withTempView("selectStar") {
+      val selectStar = sql("SELECT * FROM testData WHERE key = 1")
+      selectStar.createOrReplaceTempView("selectStar")

-    spark.catalog.cacheTable("selectStar")
-    checkAnswer(
-      selectStar,
-      Seq(Row(1, "1")))
+      spark.catalog.cacheTable("selectStar")
+      checkAnswer(
+        selectStar,
+        Seq(Row(1, "1")))

-    uncacheTable("selectStar")
-    checkAnswer(
-      selectStar,
-      Seq(Row(1, "1")))
+      uncacheTable("selectStar")
+      checkAnswer(
+        selectStar,
+        Seq(Row(1, "1")))
+    }
   }

   test("SPARK-15915 Logical plans should use canonicalized plan when override sameResult") {
-    val localRelation = Seq(1, 2, 3).toDF()
-    localRelation.createOrReplaceTempView("localRelation")
+    withTempView("localRelation") {
+      val localRelation = Seq(1, 2, 3).toDF()
+      localRelation.createOrReplaceTempView("localRelation")

-    spark.catalog.cacheTable("localRelation")
-    assert(getNumInMemoryRelations(localRelation) == 1)
+      spark.catalog.cacheTable("localRelation")
+      assert(getNumInMemoryRelations(localRelation) == 1)
+    }
   }

   test("SPARK-19093 Caching in side subquery") {