ObservableSSH作ってみた

Unity3DからSSHでサーバ接続する

 以前から実現してみたかった、Unity3D から Linux サーバへの接続用クラスがだいぶ形になってきました。とかいいつつ、SSH 接続する DLL は SSH.NET を使わせていただいてます。

DLL のインストール方法は DLL をビルドして、ここにいれとくだけで多分使えます。

多分。 っていうのは以前はプロジェクトの URL が別のところで、ビルド済の DLL があった記憶があるんですが、これはまた調べてレポートします。

ObservableSSH.cs

とりすーぷさんの最初のアドバイスを参考に Observable.Create を使ったバージョン。 SSHコネクションを作成し、ストリームの入出力を管理します。モノビヘイビアだと、IObservable を AddTo すればラクチンなのかなとも思うのですが、こっちは呼び出し元の方で Dispose を使ってもらう必要がありますが、呼び出し側は割とお手軽に呼べるかなとは思います。

using System;
using System.IO;
using System.Linq;
using System.Threading;
using UnityEngine;
using Renci.SshNet;
using UniRx;

public class ObservableSSH : IDisposable {

    public ConnectionInfo con = null;
    public SshClient sshClient = null;
    public bool loop = true;
    public ShellStream shellStream = null;
    public Subject<string> writeSshSubject = null;
    public Subject<string> stdOutSubject = null;
    public string commandPrefix = "env LANG=en_US.UTF-8";
    public CompositeDisposable compositeDisposable = new CompositeDisposable();

    StreamWriter stdInStreamWriter = null;
    ReactiveProperty<ObservableSSHBuffer> bufferReactiveProperty = new ReactiveProperty<ObservableSSHBuffer>(new ObservableSSHBuffer());

    /// <summary>
    /// コンストラクタ
    /// </summary>
    /// <param name="host">接続先ホスト名</param>
    /// <param name="user">ユーザ名</param>
    /// <param name="password">パスワード</param>
    public ObservableSSH(string host=null, string user=null, string password=null) {

        // SSH接続
        con = new ConnectionInfo(host, 22, user, new AuthenticationMethod[] { new PasswordAuthenticationMethod(user, password) });
        sshClient = new SshClient(con);
        sshClient.Connect();
        shellStream = sshClient.CreateShellStream("console", 800, 100, 800, 600, 1024);

        // SSH書き込みストリーム
        stdInStreamWriter = new StreamWriter(shellStream);
        stdInStreamWriter.AutoFlush = true;

        // コマンド入力をSSH送信
        writeSshSubject = new Subject<string>();
        writeSshSubject
            .Subscribe(command => {
                try {
                    stdInStreamWriter.WriteLine(commandPrefix + " " + command);
                } catch (Exception ex) {
                    Debug.LogErrorFormat("{0}\n{1}", ex.Message, ex.StackTrace);
                    loop = false;
                }
            })
            .AddTo(compositeDisposable);

        // SSH読み込みストリームオープン
        SshReader(shellStream)
            .Subscribe()
            .AddTo(compositeDisposable);

        // 標準出力サブジェクト
        stdOutSubject = new Subject<string>();

        // SSH読み込みストリームを改行に分解し、標準出力のサブジェクトへOnNextする
        bufferReactiveProperty
            .Where(buffer => buffer != null && buffer.count > 0)
            .Subscribe(buffer => {
                while (true) {
                    var newline = buffer.ReadLine();
                    if (newline == null)
                        break;
                    stdOutSubject.OnNext(newline);
                }
            })
            .AddTo(compositeDisposable);
    }

    /// <summary>
    /// SSH読み込みストリーム処理
    /// </summary>
    /// <param name="ssh_stream"></param>
    /// <returns></returns>
    private IObservable<string> SshReader(ShellStream ssh_stream) {
        return Observable.Create<string>(observer => {
            Thread.Sleep(300);
            using (StreamReader reader = new StreamReader(ssh_stream)) {
                while (loop) {

                    if (reader.EndOfStream)
                        continue;

                    if (ssh_stream.CanRead && reader.Peek() != -1) {
                        try {
                            bufferReactiveProperty.SetValueAndForceNotify(bufferReactiveProperty.Value.UpdateFromReadStream(reader));
                        } catch (Exception ex) {
                            observer.OnError(ex);
                            loop = false;
                        }
                    }
                }
            }
            return Disposable.Create(() => { });
        })
        .SubscribeOn(Scheduler.ThreadPool);
    }

    /// <summary>
    /// 接続クローズ及びリソース開放
    /// </summary>
    public void Dispose() {
        loop = false;
        if (stdInStreamWriter != null) {
            stdInStreamWriter.Close();
            stdInStreamWriter.Dispose();
        }
        stdOutSubject.OnCompleted();
        if (sshClient != null) {
            if (sshClient.IsConnected) {
                sshClient.Disconnect();
            }
            sshClient.Dispose();
        }
        compositeDisposable.Dispose();
    }
}

ObservableSSHBuffer.cs

クラスの名前が良くない気がするので、良い名前を検討中。 こいつは StreamReader に入ってくるデータが必ずしも1行づつではないので、このクラスを経由して、出来るだけ1行づつ取得できるようにしてます。

using System.Linq;
using System.IO;

/// <summary>
/// SSHストリームからの受信データをバッファし、改行毎に分割するためのクラス
/// </summary>
class ObservableSSHBuffer {

    /// <summary>
    /// ストリームからの読み込みバッファ
    /// </summary>
    public char[] buffer = Enumerable.Repeat((char)0, 4096).ToArray();

    /// <summary>
    /// バッファへの次回読み込み位置
    /// </summary>
    public int index = 0;

    /// <summary>
    /// 総文字数
    /// </summary>
    public int count = 0;

    /// <summary>
    /// 受け取ったストリームリーダからバッファへの読み込み
    /// </summary>
    /// <param name="reader"></param>
    /// <returns></returns>
    public ObservableSSHBuffer UpdateFromReadStream(StreamReader reader) {
        // ストリームからバッファに読み込み、読み込んだ文字数を取得する
        var count_up = reader.Read(buffer, index, buffer.Length - index - 1);
        //Debug.LogFormat("count_up ← ({0}) = reader.Read(buffer, index({1}), buffer.Length - index - 1 ({2}); // count={3}", count_up, index, buffer.Length - index - 1, count);
        // 総文字数にバッファから読み込んだ文字数を加算する
        count += count_up;
        // バッファへの次回読み込み位置にバッファから読み込んだ文字数を加算する
        index += count_up;
        // デバッグ情報
        //Debug.LogFormat("*** READ FROM STREAM *** {0}", string.Join(" ", Enumerable.Range(0, count).Select(x => string.Format("[{0} = {1}:{2}]", x, ((int)buffer[x]).ToString(), new string(buffer[x], 1))).ToArray()));
        return this;
    }

    /// <summary>
    /// バッファから1行取得する
    /// </summary>
    /// <param name="force">改行コードが存在しなくても強引に取得する</param>
    /// <returns></returns>
    public string ReadLine(bool force = false) {

        var newline = (string)null;

        // 強制取得
        if (force) {
            index = 0;
            count = 0;
            newline = new string(buffer);
            for (int ii = 0; ii < count; ii++) {
                buffer[ii] = (char)0;
            }
            return newline;
        }

        // 読み込んだバッファ内の改行コードの位置を取得する(0を未発見扱いにするため、1オリジンに変換する)
        var find = Enumerable.Range(0, count).Where(i => buffer[i] == '\n').Select(i => i + 1).FirstOrDefault();

        //Debug.LogFormat("count = {0} / find = {1}", count, find);

        // 改行コード発見時のコード
        if (find > 0) {
            var backup_char = buffer[find]; // 行末に\0を入れる前にバックアップ
            buffer[find] = (char)0;         // 行末部分に\0を設定
            newline = new string(buffer);   // 文字列を取得
            buffer[find] = backup_char;     // \0で上書きしたキャラクターを復元

            // バッファに残ったキャラクターを左につめる
            for (int ii = find, jj = 0; ii < count; ii++, jj++) {
                buffer[jj] = buffer[ii];
                buffer[ii] = (char)0;
            }
            var remaining = count - find; // バッファに残った文字数
            for (int ii = remaining; ii < find + 1; ii++) {
                buffer[ii] = (char)0; // 取得した文字列部分を0で初期化する
            }

            count -= find;
            index -= find;

            //Debug.LogFormat("*** find *** {0} / count = {1} / index = {2}", newline, count, index);
            //Debug.LogFormat("*** READ LINE *** {0}", string.Join(" ", Enumerable.Range(0, count).Select(x => string.Format("[{0} = {1}:{2}]", x, ((int)buffer[x]).ToString(), new string(buffer[x], 1))).ToArray()));
        }

        return newline;
    }

}

TestSSH2.cs

テスト用コードです。
ヒエラルキの適当なオブジェクトにこいつを割り当て、インスペクタで host, user, password を設定します。秘密鍵を使ったバージョンはまた次回あたりにでも。

ログインして、ls します。それだけ。

ObservableSSH を new した後、標準出力(stdOutSubject)と、SSHコマンド送信(writeSshSubject)を使ってサーバオペレーションを Unity3D から行えるようになるといいなぁと。 とりあえずいつも使ってるサーバ設定レシピをまずこいつにやらしてみよう。

標準出力は stdOutSubject のストリームに1行づつ流れてくるので、マクロっぽい事や、tail のログ監視で特定文字列に反応するとかができそうな気がします。

でもこれ、複数のコネクション作るとスレッドいっぱいできちゃうのかな? 複数サーバ監視で使うならマイクロコルーチンを使って沢山接続を作るというのもありなのでしょうか。 どっちにしてもまだできたばっかりなので、最適化やチューニングはこれからということで。

using UnityEngine;
using UniRx;

public class TestSSH2 : MonoBehaviour {

    public string host;
    public string user;
    public string password;

    ObservableSSH observableSsh;

	// Use this for initialization
	void Start () {
        observableSsh = new ObservableSSH(host: host, user: user, password: password);
        observableSsh
            .stdOutSubject
            .Subscribe(line => Debug.Log(line))
            .AddTo(observableSsh.compositeDisposable);

        observableSsh
            .writeSshSubject.OnNext("ls -l");
	}

    void OnDestroy() {
        observableSsh.Dispose();
    }

}

次なる野望

これが一段落したら、次は XML から生成する uGUI と UniRx の自動設定に手をつけよう。

ツイートツイート