Connect to a Stream
Create a Stream
data.uecore.io:4222
This is the server DNS to connect a client to UE Streams
Begin by logging into the portal and clicking "Simple Streams" on the left navigation.
When the streams window opens, you'll see two tab options at the top. The default will be "Streams" and next to that will be "Subjects". Remain on Streams.
Click the "New Stream" button and fill in the fields on the right panel as shown. Notice that there are no spaces in the name. Spaces are not allowed.
If you then scroll down a bit, you can associate subjects to this stream. If you leave it blank, a subject with the same name as the stream will be created and associated. We'll leave it blank as shown. Click "Create New Stream".
This will create the stream and you'll now see it on the list of available streams in the left panel of the window. If you select the card, the right panel will shift to show the current configuration of the stream including its limits, thresholds, and data within the stream. You will also see buttons to edit, see live messages, or request access.
Request Access
Connecting to this stream is as simple as clicking "Request Access". Click the button and fill in the form as shown. Access is always within the context of a subject, even when applied to a stream. We are going to request both publisher and subscriber access to the associated subject (e.g., my-new-stream).
Streams require a Consumer for subscriptions. Your access can come with a number of reserved consumer names to which you will be given full admin permissions to create and destroy from within your code. If you need other named Consumers later, you can centrally add them from Access Management.
The difference between reserved consumers you request here vs those you create centrally later, is that unlike reserved consumers, you will not be able to create or destroy centrally created consumers from your code, you can only leverage them for access.
By default every access requests provisions one reserved consumer if you leave this field blank. Otherwise you can provide a number up to 10. We will leave it blank and click "Request Access".
This will result in a success window with some information shown. Scroll past each screen shot for a breakdown of the information.
- ID - This is the internal UE Stream identifier for this access record
- Publish Access - This shows to which subject this access record can publish
- Subscribe Access - This shows to which subject his access record can subscribe
- Public Key - This is the public key for the secret seed needed for the connection. Within the UE Stream API, ID and Public Key are interchangeable for most requests.
- Custom Inboxes - The streaming API requires this for your client to send acknowledgements.
In order to connect to this stream, you will need a JWT and a Seed, along with a few additional details such as the Custom Inbox value and Reserved Consumer name (below). The JWT is a representation of all of the permissions associated to this access request. If you were to decode the JWT, you would see the breakdown of your access (not shown - you can copy it into jwt.io if you are curious).
The JWT is set to expire and with UE Streams, this is always the case. For sustained connections, you are expected to utilize the Client ID and Client Secret shown below to continuously refresh the JWT. It is provided here to allow a quick connection and debug.
NEVER SHARE OR POST THESE VALUES
The image depicts data that is not active as a learning tool. You should never share or post the Seed or Client Secret in a non-secure environment. Doing so might allow someone to gain access to the data stream.
- Seed - This is the private key value that is required to make a secure connection to the subject using one of the open source clients.
- Client Secret - This is a client credential secret to allow new JWTs to be requested from the JWT API. Instructions on how to do this are provided under the Sustained Connection section of this page.
- Client ID - This is the client credential id for to allow new JWTs to be requested.
Finally, you'll want to retrieve your Reserved Consumer name as well. On the portal left navigation, click "Access Management". This will open a window where all access connections you've requested are visible. Click the access connection we just created.
On the right panel, you'll see the details of this connection. You can actually disable or even delete this connection from here. You can also view all consumers. Click the blue consumers button.
On the resulting modal window, you'll see all of your consumers, which for now is just the reserved consumer. Click it and copy the name for later.
Reserved Consumers
These will be available on the initial success window soon so that you do not have to go find them on every request.
Hello World
This hello world example will show you how to use the provided data to immediately connect to the stream, subscribe, or publish. The example is in Node.js JavaScript.
Publish
const {connect, StringCodec, credsAuthenticator } = require('nats');
const sc = StringCodec();
const natsServer = 'data.uecore.io:4222';
const stream = 'my-new-stream';
//subject has the same name as the stream in this example
const subject = 'my-new-stream';
const inbox = '_INBOX_my-new-stream_XRGz1ztHyW';
const jwt = 'COPY-YOUR-JWT-HERE';
const seed = 'COPY-YOUR-SEED-HERE';
const credentials = `-----BEGIN NATS USER JWT-----
${jwt}
------END NATS USER JWT------
************************* IMPORTANT *************************
NKEY Seed printed below can be used sign and prove identity.
NKEYs are sensitive and should be treated as secrets.
-----BEGIN USER NKEY SEED-----
${seed}
------END USER NKEY SEED------
`;
(async () => {
const nc = await connect({
debug: true,
servers: natsServer,
authenticator: credsAuthenticator(new TextEncoder().encode(credentials)),
inboxPrefix: inbox, //needed for streams connections
name: 'hello-world-publish'
});
const js = await nc.jetstream();
for(let i = 0; i<10; i++) {
const obj = { message: `hello world ${i}`};
// message must be a string
const message = JSON.stringify(obj);
await js.publish(subject, sc.encode(message));
console.info(`Published ${i}`)
}
nc.drain();
})();
You can verify that the above worked by accessing your stream on the Simple Stream dashboard and clicking "my-new-stream" on the card view. At the top of the right panel, click "messages">"Query".
Then you can select the last message in the stream and view it. You should see something like this:
Subscribe
const {connect, StringCodec, credsAuthenticator, consumerOpts, createInbox } = require('nats');
const sc = StringCodec();
const natsServer = 'data.uecore.io:4222';
const stream = 'my-new-stream';
//subject has the same name as the stream in this example
const subject = 'my-new-stream';
const inbox = '_INBOX_my-new-stream_XRGz1ztHyW';
const jwt = 'COPY-YOUR-JWT-HERE';
const seed = 'COPY-YOUR-SEED-HERE';
const consumer = 'RESERVED_CONSUMER_DemoStream-0_MY-NEW-STREAM_BrbYH5Km9N';
const credentials = `-----BEGIN NATS USER JWT-----
${jwt}
------END NATS USER JWT------
************************* IMPORTANT *************************
NKEY Seed printed below can be used sign and prove identity.
NKEYs are sensitive and should be treated as secrets.
-----BEGIN USER NKEY SEED-----
${seed}
------END USER NKEY SEED------
`;
(async () => {
const nc = await connect({
debug: true,
servers: natsServer,
authenticator: credsAuthenticator(new TextEncoder().encode(credentials)),
inboxPrefix: inbox, //needed for streams connections
name: 'hello-world-subscribe'
});
const js = await nc.jetstream();
const jsm = await nc.jetstreamManager();
const streamInfo = await jsm.streams.info(stream);
console.info('YOUR STREAM INFO', streamInfo);
// setup the consumer
const opts = consumerOpts();
opts.durable(consumer);
opts.manualAck();
opts.ackExplicit();
opts.deliverTo(createInbox(inbox));
// subscribe with consumer
let sub = await js.subscribe(`${subject}`, opts);
for await (const m of sub) {
const d = sc.decode(m.data);
let data;
try {
// if its json, parse it
data = JSON.parse(d);
} catch(e) {
data = d;
}
console.info(`Received message: ${m.seq} ---`, data);
m.ack();
}
})();
If you run the subscribe script and then separately kick off the publish script, you'll see your messages come through the subject to your subscription.
If your JWT expires and you're still just debugging, you can click "Access Management" on the left nav of the portal, find your access object, and click
"GENERATE A TEMPORARY ACCESS JWT".
Sustained Connection
Creating a sustained connection is not significantly different from the hello world demo above. The main difference is that you'll need two extra steps. First, you need a function which can request access to UE Streams through UE Auth and then request a JWT from UE Streams. Second, you'll need to update your connection to catch authorization errors that occur when the JWT expires so you can update the JWT and reconnect. We can think of the whole flow like so:
For a detailed code example checkout the recipe for this approach!
Updated almost 2 years ago