Skip to content
This repository was archived by the owner on Jun 7, 2020. It is now read-only.

Commit

Permalink
Merge pull request #560 from RocketChat/develop
Browse files Browse the repository at this point in the history
[RELEASE] Merge develop into master
  • Loading branch information
rafaelks authored Nov 28, 2017
2 parents 1f3713b + 92ee59e commit 5bbd3f7
Show file tree
Hide file tree
Showing 199 changed files with 3,505 additions and 3,369 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ git clone https://github.com/RocketChat/Rocket.Chat.Android

### Code style guide

Before submitting a PR you should follow our [Coding Style](https://github.com/RocketChat/Rocket.Chat.Android/blob/develop/CODING_STYLE.md).
Before submitting a PR you should follow our [Coding Style](https://github.com/RocketChat/java-code-styles/blob/master/CODING_STYLE.md).
11 changes: 0 additions & 11 deletions android-ddp/build.gradle
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
apply plugin: 'com.android.library'
apply plugin: 'me.tatarka.retrolambda'

buildscript {
repositories {
jcenter()
}
dependencies {
classpath 'com.android.tools.build:gradle:2.3.3'
classpath 'me.tatarka:gradle-retrolambda:3.6.1'
classpath 'me.tatarka.retrolambda.projectlombok:lombok.ast:0.2.3.a2'
}
}
android {
compileSdkVersion rootProject.ext.compileSdkVersion
buildToolsVersion rootProject.ext.buildToolsVersion
Expand Down
60 changes: 0 additions & 60 deletions android-ddp/gen_listeners.py

This file was deleted.

232 changes: 171 additions & 61 deletions android-ddp/src/main/java/chat/rocket/android_ddp/DDPClient.java
Original file line number Diff line number Diff line change
@@ -1,73 +1,183 @@
package chat.rocket.android_ddp;


import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.annotations.Nullable;
import android.text.TextUtils;

import org.json.JSONArray;
import org.json.JSONException;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import bolts.Task;
import bolts.TaskCompletionSource;
import chat.rocket.android.log.RCLog;
import chat.rocket.android_ddp.rx.RxWebSocketCallback;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import okhttp3.OkHttpClient;

public class DDPClient {
// reference: https://github.com/eddflrs/meteor-ddp/blob/master/meteor-ddp.js

private final DDPClientImpl impl;

public DDPClient(OkHttpClient client) {
impl = new DDPClientImpl(this, client);
}

public Task<DDPClientCallback.Connect> connect(String url) {
return connect(url, null);
}

public Task<DDPClientCallback.Connect> connect(String url, String session) {
TaskCompletionSource<DDPClientCallback.Connect> task = new TaskCompletionSource<>();
impl.connect(task, url, session);
return task.getTask();
}

public Task<DDPClientCallback.Ping> ping(@Nullable String id) {
TaskCompletionSource<DDPClientCallback.Ping> task = new TaskCompletionSource<>();
impl.ping(task, id);
return task.getTask();
}

public Maybe<DDPClientCallback.Base> doPing(@Nullable String id) {
return impl.ping(id);
}

public Task<DDPClientCallback.RPC> rpc(String method, JSONArray params, String id,
long timeoutMs) {
TaskCompletionSource<DDPClientCallback.RPC> task = new TaskCompletionSource<>();
impl.rpc(task, method, params, id, timeoutMs);
return task.getTask();
}

public Task<DDPSubscription.Ready> sub(String id, String name, JSONArray params) {
TaskCompletionSource<DDPSubscription.Ready> task = new TaskCompletionSource<>();
impl.sub(task, name, params, id);
return task.getTask();
}

public Task<DDPSubscription.NoSub> unsub(String id) {
TaskCompletionSource<DDPSubscription.NoSub> task = new TaskCompletionSource<>();
impl.unsub(task, id);
return task.getTask();
}

public Flowable<DDPSubscription.Event> getSubscriptionCallback() {
return impl.getDDPSubscription();
}

public Task<RxWebSocketCallback.Close> getOnCloseCallback() {
return impl.getOnCloseCallback();
}

public void close() {
impl.close(1000, "closed by DDPClient#close()");
}
// reference: https://github.com/eddflrs/meteor-ddp/blob/master/meteor-ddp.js
public static final int REASON_CLOSED_BY_USER = 1000;
public static final int REASON_NETWORK_ERROR = 1001;

private static volatile DDPClient singleton;
private static volatile OkHttpClient client;
private final DDPClientImpl impl;
private final AtomicReference<String> hostname = new AtomicReference<>();

public static void initialize(OkHttpClient okHttpClient) {
client = okHttpClient;
}

public static DDPClient get() {
DDPClient result = singleton;
if (result == null) {
synchronized (DDPClient.class) {
result = singleton;
if (result == null) {
singleton = result = new DDPClient(client);
}
}
}
return result;
}

private DDPClient(OkHttpClient client) {
impl = new DDPClientImpl(this, client);
}

private Task<DDPClientCallback.Connect> connect(String url, String session) {
hostname.set(url);
TaskCompletionSource<DDPClientCallback.Connect> task = new TaskCompletionSource<>();
impl.connect(task, url, session);
return task.getTask();
}

private Task<DDPClientCallback.Ping> ping(@Nullable String id) {
TaskCompletionSource<DDPClientCallback.Ping> task = new TaskCompletionSource<>();
impl.ping(task, id);
return task.getTask();
}

private Maybe<DDPClientCallback.Base> doPing(@Nullable String id) {
return impl.ping(id);
}

private Task<DDPSubscription.Ready> sub(String id, String name, JSONArray params) {
TaskCompletionSource<DDPSubscription.Ready> task = new TaskCompletionSource<>();
impl.sub(task, name, params, id);
return task.getTask();
}

private Task<DDPSubscription.NoSub> unsub(String id) {
TaskCompletionSource<DDPSubscription.NoSub> task = new TaskCompletionSource<>();
impl.unsub(task, id);
return task.getTask();
}

public Task<RxWebSocketCallback.Close> getOnCloseCallback() {
return impl.getOnCloseCallback();
}

public void close() {
impl.close(REASON_CLOSED_BY_USER, "closed by DDPClient#close()");
}

/**
* check WebSocket connectivity with ping.
*/
public Task<Void> ping() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return ping(pingId)
.continueWithTask(task -> {
if (task.isFaulted()) {
RCLog.d(task.getError(), "ping[%s] xxx failed xxx", pingId);
return Task.forError(task.getError());
} else {
RCLog.d("pong[%s] <", pingId);
return Task.forResult(null);
}
});
}

/**
* check WebSocket connectivity with ping.
*/
public Maybe<DDPClientCallback.Base> doPing() {
final String pingId = UUID.randomUUID().toString();
RCLog.d("ping[%s] >", pingId);
return doPing(pingId);
}

/**
* Connect to WebSocket server with DDP client.
*/
public Task<DDPClientCallback.Connect> connect(@NonNull String hostname, @Nullable String session,
boolean usesSecureConnection) {
final String protocol = usesSecureConnection ? "wss://" : "ws://";
return connect(protocol + hostname + "/websocket", session);
}

/**
* Subscribe with DDP client.
*/
public Task<DDPSubscription.Ready> subscribe(final String name, JSONArray param) {
final String subscriptionId = UUID.randomUUID().toString();
RCLog.d("sub:[%s]> %s(%s)", subscriptionId, name, param);
return sub(subscriptionId, name, param);
}

/**
* Unsubscribe with DDP client.
*/
public Task<DDPSubscription.NoSub> unsubscribe(final String subscriptionId) {
RCLog.d("unsub:[%s]>", subscriptionId);
return unsub(subscriptionId);
}

/**
* Returns Observable for handling DDP subscription.
*/
public Flowable<DDPSubscription.Event> getSubscriptionCallback() {
return impl.getDDPSubscription();
}

/**
* Execute raw RPC.
*/
public Task<DDPClientCallback.RPC> rpc(String methodCallId, String methodName, String params,
long timeoutMs) {
TaskCompletionSource<DDPClientCallback.RPC> task = new TaskCompletionSource<>();
RCLog.d("rpc:[%s]> %s(%s) timeout=%d", methodCallId, methodName, params, timeoutMs);
if (TextUtils.isEmpty(params)) {
impl.rpc(task, methodName, null, methodCallId, timeoutMs);
return task.getTask().continueWithTask(task_ -> {
if (task_.isFaulted()) {
RCLog.d("rpc:[%s]< error = %s", methodCallId, task_.getError());
} else {
RCLog.d("rpc:[%s]< result = %s", methodCallId, task_.getResult().result);
}
return task_;
});
}

try {
impl.rpc(task, methodName, new JSONArray(params), methodCallId, timeoutMs);
return task.getTask().continueWithTask(task_ -> {
if (task_.isFaulted()) {
RCLog.d("rpc:[%s]< error = %s", methodCallId, task_.getError());
} else {
RCLog.d("rpc:[%s]< result = %s", methodCallId, task_.getResult().result);
}
return task_;
});
} catch (JSONException exception) {
return Task.forError(exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import android.support.annotation.NonNull;
import android.support.annotation.Nullable;

import org.json.JSONObject;

public class DDPClientCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ private static String extractMsg(JSONObject response) {
}
}

public void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url,
/* package */ void connect(final TaskCompletionSource<DDPClientCallback.Connect> task, final String url,
String session) {
try {
flowable = websocket.connect(url).autoConnect(2);
CompositeDisposable disposables = new CompositeDisposable();

disposables.add(
flowable.retry().filter(callback -> callback instanceof RxWebSocketCallback.Open)
flowable.filter(callback -> callback instanceof RxWebSocketCallback.Open)
.subscribe(
callback ->
sendMessage("connect",
Expand Down Expand Up @@ -115,6 +115,7 @@ public Maybe<DDPClientCallback.Base> ping(@Nullable final String id) {

if (requested) {
return flowable.filter(callback -> callback instanceof RxWebSocketCallback.Message)
.timeout(8, TimeUnit.SECONDS)
.map(callback -> ((RxWebSocketCallback.Message) callback).responseBodyString)
.map(DDPClientImpl::toJson)
.filter(response -> "pong".equalsIgnoreCase(extractMsg(response)))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chat.rocket.android_ddp;

import android.support.annotation.NonNull;

import org.json.JSONArray;
import org.json.JSONObject;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package chat.rocket.android_ddp.rx;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import chat.rocket.android.log.RCLog;
import io.reactivex.BackpressureStrategy;
Expand Down
Loading

0 comments on commit 5bbd3f7

Please sign in to comment.