Skip to content

Commit 80bf48f

Browse files
brkyvz authored and marmbrus committed
[SPARK-14555] First cut of Python API for Structured Streaming
## What changes were proposed in this pull request?

This patch provides a first cut of Python APIs for structured streaming. This PR provides the new classes:
- ContinuousQuery
- Trigger
- ProcessingTime

in pyspark under `pyspark.sql.streaming`.

In addition, it contains the new methods added under:
- `DataFrameWriter`
  a) `startStream`
  b) `trigger`
  c) `queryName`
- `DataFrameReader`
  a) `stream`
- `DataFrame`
  a) `isStreaming`

This PR doesn't contain all methods exposed for `ContinuousQuery`, for example:
- `exception`
- `sourceStatuses`
- `sinkStatus`

They may be added in a follow-up.

This PR also contains some very minor doc fixes on the Scala side.

## How was this patch tested?

Python doc tests

TODO:
- [ ] verify Python docs look good

Author: Burak Yavuz <brkyvz@gmail.com>
Author: Burak Yavuz <burak@databricks.com>

Closes apache#12320 from brkyvz/stream-python.
1 parent 8342778 commit 80bf48f

19 files changed

Lines changed: 397 additions & 48 deletions

File tree

python/pyspark/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
3838
"""
3939

40+
from functools import wraps
4041
import types
4142

4243
from pyspark.conf import SparkConf
@@ -84,6 +85,20 @@ def copy_func(f, name=None, sinceversion=None, doc=None):
8485
return fn
8586

8687

88+
def keyword_only(func):
    """
    A decorator that forces keyword arguments in the wrapped method
    and saves actual input keyword arguments in `_input_kwargs`.

    At most one positional argument (the receiver, i.e. ``self``) is
    accepted; any further positional argument raises :class:`TypeError`.

    The keyword arguments of the most recent call are stored in two places:

    * on the returned wrapper function, so ``self.method._input_kwargs``
      keeps working.  The wrapper is a single object shared by every
      instance, so this value is overwritten by each call.
    * on the receiver itself (``self._input_kwargs``) when the call has a
      positional receiver that accepts new attributes, so each instance
      can read back its own arguments.

    :param func: the method to wrap.
    :return: a wrapper with the same name and docstring as `func`.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        if len(args) > 1:
            raise TypeError("Method %s forces keyword arguments." % func.__name__)
        # Shared, function-level slot: kept for callers that read
        # `self.method._input_kwargs`.
        wrapper._input_kwargs = kwargs
        if args:
            # Per-receiver slot, so one instance's call does not clobber what
            # another instance reads.  Best effort only: receivers that cannot
            # take new attributes (e.g. `__slots__`, built-in values) are skipped.
            try:
                args[0]._input_kwargs = kwargs
            except (AttributeError, TypeError):
                pass
        return func(*args, **kwargs)
    return wrapper
100+
101+
87102
# for back compatibility
88103
from pyspark.sql import SQLContext, HiveContext, Row
89104

python/pyspark/ml/classification.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import operator
1919
import warnings
2020

21+
from pyspark import since, keyword_only
2122
from pyspark.ml import Estimator, Model
2223
from pyspark.ml.param.shared import *
2324
from pyspark.ml.regression import (

python/pyspark/ml/clustering.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# limitations under the License.
1616
#
1717

18-
from pyspark import since
18+
from pyspark import since, keyword_only
1919
from pyspark.ml.util import *
2020
from pyspark.ml.wrapper import JavaEstimator, JavaModel
2121
from pyspark.ml.param.shared import *

python/pyspark/ml/evaluation.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717

1818
from abc import abstractmethod, ABCMeta
1919

20-
from pyspark import since
20+
from pyspark import since, keyword_only
2121
from pyspark.ml.wrapper import JavaParams
2222
from pyspark.ml.param import Param, Params, TypeConverters
2323
from pyspark.ml.param.shared import HasLabelCol, HasPredictionCol, HasRawPredictionCol
24-
from pyspark.ml.util import keyword_only
2524
from pyspark.mllib.common import inherit_doc
2625

2726
__all__ = ['Evaluator', 'BinaryClassificationEvaluator', 'RegressionEvaluator',

python/pyspark/ml/feature.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121

2222
from py4j.java_collections import JavaArray
2323

24-
from pyspark import since
24+
from pyspark import since, keyword_only
2525
from pyspark.rdd import ignore_unicode_prefix
2626
from pyspark.ml.param.shared import *
27-
from pyspark.ml.util import keyword_only, JavaMLReadable, JavaMLWritable
27+
from pyspark.ml.util import JavaMLReadable, JavaMLWritable
2828
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer, _jvm
2929
from pyspark.mllib.common import inherit_doc
3030
from pyspark.mllib.linalg import _convert_to_vector

python/pyspark/ml/pipeline.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@
2020
if sys.version > '3':
2121
basestring = str
2222

23-
from pyspark import SparkContext
24-
from pyspark import since
23+
from pyspark import since, keyword_only, SparkContext
2524
from pyspark.ml import Estimator, Model, Transformer
2625
from pyspark.ml.param import Param, Params
27-
from pyspark.ml.util import keyword_only, JavaMLWriter, JavaMLReader, MLReadable, MLWritable
26+
from pyspark.ml.util import JavaMLWriter, JavaMLReader, MLReadable, MLWritable
2827
from pyspark.ml.wrapper import JavaParams
2928
from pyspark.mllib.common import inherit_doc
3029

python/pyspark/ml/recommendation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# limitations under the License.
1616
#
1717

18-
from pyspark import since
18+
from pyspark import since, keyword_only
1919
from pyspark.ml.util import *
2020
from pyspark.ml.wrapper import JavaEstimator, JavaModel
2121
from pyspark.ml.param.shared import *

python/pyspark/ml/regression.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import warnings
1919

20-
from pyspark import since
20+
from pyspark import since, keyword_only
2121
from pyspark.ml.param.shared import *
2222
from pyspark.ml.util import *
2323
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper

python/pyspark/ml/tests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import tempfile
4343
import numpy as np
4444

45+
from pyspark import keyword_only
4546
from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer
4647
from pyspark.ml.classification import (
4748
LogisticRegression, DecisionTreeClassifier, OneVsRest, OneVsRestModel)
@@ -52,7 +53,6 @@
5253
from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
5354
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
5455
from pyspark.ml.tuning import *
55-
from pyspark.ml.util import keyword_only
5656
from pyspark.ml.util import MLWritable, MLWriter
5757
from pyspark.ml.wrapper import JavaParams
5858
from pyspark.mllib.linalg import Vectors, DenseVector, SparseVector

python/pyspark/ml/tuning.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
import numpy as np
2020

2121
from pyspark import SparkContext
22-
from pyspark import since
22+
from pyspark import since, keyword_only
2323
from pyspark.ml import Estimator, Model
2424
from pyspark.ml.param import Params, Param, TypeConverters
2525
from pyspark.ml.param.shared import HasSeed
26-
from pyspark.ml.util import keyword_only, JavaMLWriter, JavaMLReader, MLReadable, MLWritable
26+
from pyspark.ml.util import JavaMLWriter, JavaMLReader, MLReadable, MLWritable
2727
from pyspark.ml.wrapper import JavaParams
2828
from pyspark.sql.functions import rand
2929
from pyspark.mllib.common import inherit_doc, _py2java

0 commit comments

Comments
 (0)